// Continuing from the previous article: Java operations on HDFS.
/*
 * File block locations: getFileBlockLocations
 */
@Test
public void test06() throws Exception {
    // 1. Configuration
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    // 2. File system
    FileSystem fs = FileSystem.get(conf);
    try {
        // 3. Must refer to an already-existing file (not a directory)
        Path path = new Path("hdfs:/input/abc.txt");
        // 4. File status
        FileStatus status = fs.getFileStatus(path);
        // 5. Block locations — the Path overload; the FileStatus overload
        //    fs.getFileBlockLocations(status, 0, status.getLen()) is equivalent.
        BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen());
        int blockLen = blockLocations.length;
        System.err.println("塊數量:" + blockLen); // a file smaller than one block yields 1
        for (int i = 0; i < blockLen; i++) {
            // size of this block
            long sizes = blockLocations[i].getLength();
            System.err.println("塊大小:" + sizes);
            // host name of every replica holding this block
            String[] hosts = blockLocations[i].getHosts();
            for (String host : hosts) {
                System.err.println("主機名:" + host);
            }
            // ip:port of every replica holding this block
            // (the original comment wrongly said "host names" here)
            String[] names = blockLocations[i].getNames();
            for (String name : names) {
                System.err.println("IP:" + name);
            }
        }
    } finally {
        fs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Read a whole HDFS file into memory: open + readFully
 */
@Test
public void test07() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("hdfs:/input/abc.txt");
    FSDataInputStream is = fs.open(path);
    FileStatus stat = fs.getFileStatus(path);
    // Math.toIntExact throws on files > Integer.MAX_VALUE instead of the
    // original Integer.parseInt(String.valueOf(long)) round-trip.
    byte[] buffer = new byte[Math.toIntExact(stat.getLen())];
    is.readFully(0, buffer);
    is.close();
    fs.close();
    // Explicit charset: the no-arg String(byte[]) uses the platform default,
    // which breaks non-ASCII content on some JVMs.
    System.out.println(new String(buffer, "UTF-8"));
}
/*
 * Upload a local file: copyFromLocalFile
 */
@Test
public void test08() throws Exception {
    // 1. Configuration
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    // 2. File system
    FileSystem hdfs = FileSystem.get(conf);
    // 3. Paths usable by Hadoop (a Linux source would be e.g. "file:/usr/abc.txt")
    Path src = new Path("E:\\info.log"); // local file
    Path dst = new Path("hdfs:/");       // destination directory
    // 4. Copy the local file up: (source, destination)
    hdfs.copyFromLocalFile(src, dst);
    System.out.println("文件上傳成功至:" + conf.get("fs.default.name"));
    // 5. List what is now under the destination
    FileStatus[] fs = hdfs.listStatus(dst);
    for (FileStatus f : fs) {
        System.out.println(f.getPath());
    }
    // Read the uploaded file back to verify its content
    Path path = new Path("hdfs:/info.log");
    FSDataInputStream is = hdfs.open(path);
    FileStatus stat = hdfs.getFileStatus(path);
    // Math.toIntExact replaces the Integer.parseInt(String.valueOf(long)) round-trip
    byte[] buffer = new byte[Math.toIntExact(stat.getLen())];
    is.readFully(0, buffer);
    is.close();
    hdfs.close();
    System.out.println("文件內容:" + new String(buffer, "UTF-8")); // explicit charset
}
/*
 * Download a file: copyToLocalFile
 */
@Test
public void test09() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem hdfs = FileSystem.get(conf);
    try {
        // HDFS source and local destination
        Path src = new Path("hdfs:/input/abc.txt"); // HDFS file
        Path dst = new Path("E:\\abc.log");         // local target
        // Copy the HDFS file down (the original comment wrongly said "upload")
        hdfs.copyToLocalFile(src, dst);
    } finally {
        hdfs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Create a directory: mkdirs
 */
@Test
public void test10() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem hdfs = FileSystem.get(conf);
    try {
        // mkdirs creates missing parents as well; returns true on success
        hdfs.mkdirs(new Path("hdfs:/newname"));
    } finally {
        hdfs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Create files (and parent directories implicitly): create
 */
@Test
public void test11() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem hdfs = FileSystem.get(conf);
    // Writing through an HDFS output stream creates /name and the file in one step.
    FSDataOutputStream out = hdfs.create(new Path("hdfs:/name/abc.txt"));
    // Content must be written as UTF-8 bytes
    out.write("你好,Hello !".getBytes("UTF-8"));
    // BUG FIX: the original reassigned `out` without closing the first stream,
    // leaking it and leaving abc.txt possibly unflushed (HDFS persists data on close).
    out.close();
    out = hdfs.create(new Path("/name/alizee.txt"));
    out.write("世界,Hello !".getBytes("UTF-8"));
    out.close();
    // Read alizee.txt back to verify
    FSDataInputStream is = hdfs.open(new Path("hdfs:/name/alizee.txt"));
    FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/name/alizee.txt"));
    // Math.toIntExact replaces the Integer.parseInt(String.valueOf(long)) round-trip
    byte[] buffer = new byte[Math.toIntExact(stat.getLen())];
    is.readFully(0, buffer);
    is.close();
    hdfs.close();
    System.out.println(new String(buffer, "UTF-8")); // explicit charset
}
/*
 * Create an empty file: createNewFile
 */
@Test
public void test12() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem hdfs = FileSystem.get(conf);
    try {
        // Creates a zero-length file; returns false if it already exists
        hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
    } finally {
        hdfs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Append to an existing file: append
 * NOTE(review): assumes the cluster has append support enabled — confirm
 * dfs.support.append on the target cluster.
 */
@Test
public void test13() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem hdfs = FileSystem.get(conf);
    try {
        // First append; each stream must be closed before reopening the file
        FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt"));
        out.write("使用append方法寫入文件\n".getBytes("UTF-8"));
        out.close();
        // Second append to the same file
        out = hdfs.append(new Path("/newfile.txt"));
        out.write("再次寫入!!!\n".getBytes("UTF-8"));
        out.close();
    } finally {
        hdfs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Rename a file: rename
 */
@Test
public void test14() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem fs = FileSystem.get(conf);
    try {
        // rename(source, target) — returns false (does not throw) on failure
        boolean rename = fs.rename(new Path("/newfile.txt"), new Path("/newfile2.txt"));
        System.out.println(rename);
    } finally {
        fs.close(); // was missing: FileSystem handle leaked
    }
}
/*
 * Delete files: delete / deleteOnExit
 */
@Test
public void test15() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.109.130:9000");
    FileSystem fs = FileSystem.get(conf);
    // delete(path, recursive): with recursive=false a non-empty directory is
    // not removed and a RemoteException/IOException is thrown instead.
    Path target = new Path("hdfs:/newfile2.txt");
    boolean delete = fs.delete(target, true);
    System.out.println("執行刪除:" + delete);
    // deleteOnExit defers the removal until this FileSystem is closed.
    boolean exit = fs.deleteOnExit(new Path("/info.log"));
    System.out.println("執行刪除:" + exit);
    fs.close();
}
/**
 * Locate the blocks of a file across the HDFS cluster.
 */
@Test
public void test16() {
    try {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.109.130:9000");
        FileSystem fs = FileSystem.get(uri, conf);
        try {
            Path dfs = new Path("hdfs://192.168.109.130:9000/input/abc.txt");
            FileStatus fileStatus = fs.getFileStatus(dfs);
            BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
            int blockLen = blkLocations.length;
            System.out.println("blockLen of length : " + blockLen);
            for (int i = 0; i < blockLen; i++) {
                // BUG FIX: the original printed hosts[i], indexing the per-block
                // replica array by the *block* index — this throws
                // ArrayIndexOutOfBoundsException whenever block index >= replica
                // count. Print every replica host of each block instead.
                for (String host : blkLocations[i].getHosts()) {
                    System.out.println("Block " + i + " Location: " + host);
                }
            }
        } finally {
            fs.close(); // was missing: FileSystem handle leaked
        }
    } catch (IllegalArgumentException | URISyntaxException | IOException e) {
        e.printStackTrace();
    }
}
/**
 * List the host names of all data nodes in the HDFS cluster.
 */
@Test
public void test17() {
    try {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://192.168.109.130:9000");
        FileSystem fs = FileSystem.get(uri, conf);
        try {
            // getDataNodeStats is DistributedFileSystem-specific, so downcast
            DistributedFileSystem hdfs = (DistributedFileSystem) fs;
            DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
            int dataNodeLen = dataNodeStats.length;
            String[] names = new String[dataNodeLen];
            for (int i = 0; i < dataNodeLen; i++) {
                names[i] = dataNodeStats[i].getHostName();
                System.out.println("Node " + i + " Name: " + names[i]);
            }
        } finally {
            fs.close(); // was missing: FileSystem handle leaked
        }
    } catch (URISyntaxException | IOException e) {
        e.printStackTrace();
    }
}