Java api操作hdfs(二)

編程語言 HDFS Java Hadoop 陸小鳳一笑 2017-05-07

接著上一篇文章講hdfs的Java操作;

/*

* 文件備份狀態 getFileBlockLocations

*/

@Test

public void test06() throws Exception {

//1.配置器

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

//2.文件系統

FileSystem fs = FileSystem.get(conf);

//3.已存在的,必須是文件

Path path = new Path("hdfs:/input/abc.txt");

//4.文件狀態

FileStatus status = fs.getFileStatus(path);

//5.文件塊

//BlockLocation[] blockLocations = fs.getFileBlockLocations(status, 0, status.getLen()); //方法1,傳入文件的FileStatus

BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0, status.getLen()); //方法2,傳入文件的Path

int blockLen = blockLocations.length;

System.err.println("塊數量:"+blockLen); //如果文件不夠大,就不會分塊,即得到1

for (int i = 0; i < blockLen; i++) {

//得到塊文件大小

long sizes = blockLocations[i].getLength();

System.err.println("塊大小:"+sizes);

//按照備份數量得到全部主機名

String[] hosts = blockLocations[i].getHosts();

for (String host : hosts) {

System.err.println("主機名:"+host);

}

//按照備份數量得到全部主機名

String[] names = blockLocations[i].getNames();

for (String name : names) {

System.err.println("IP:"+ name);

}

}

}

@Test

public void test07() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem fs = FileSystem.get(conf);

Path path = new Path("hdfs:/input/abc.txt");

FSDataInputStream is = fs.open(path);

FileStatus stat = fs.getFileStatus(path);

byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];

is.readFully(0, buffer);

is.close();

fs.close();

System.out.println(new String(buffer));

}

/*

* 複製上傳文件 copyFromLocalFile

*/

@Test

public void test08() throws Exception {

// 1.創建配置器

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

//2.創建文件系統

FileSystem hdfs = FileSystem.get(conf);

//3.創建可供hadoop使用的文件系統路徑

Path src = new Path("E:\\info.log"); //本地目錄/文件

//Path src = new Path("file:/usr/abc.txt"); //本地目錄/文件

Path dst = new Path("hdfs:/"); //目標目錄/文件

// 4.拷貝本地文件上傳(本地文件,目標路徑)

hdfs.copyFromLocalFile(src, dst);

System.out.println("文件上傳成功至:" + conf.get("fs.default.name"));

// 5.列出HDFS上的文件

FileStatus[] fs = hdfs.listStatus(dst);

for (FileStatus f : fs) {

System.out.println(f.getPath());

}

//Path path = new Path("hdfs:/abc.txt");

Path path = new Path("hdfs:/info.log");

FSDataInputStream is = hdfs.open(path);

FileStatus stat = hdfs.getFileStatus(path);

byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];

is.readFully(0, buffer);

is.close();

hdfs.close();

System.out.println("文件內容:" + new String(buffer));

}

/*

* 複製下載文件 copyToLocalFile

*

*/

@Test

public void test09() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem hdfs = FileSystem.get(conf);

//創建HDFS源路徑和本地目標路徑

Path src = new Path("hdfs:/input/abc.txt"); //目標目錄/文件

Path dst = new Path("E:\\abc.log"); //本地目錄/文件

//拷貝本地文件上傳(本地文件,目標路徑)

hdfs.copyToLocalFile(src, dst);

}

/*

* 創建目錄 mkdir

*/

@Test

public void test10() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem hdfs = FileSystem.get(conf);

//創建目錄

hdfs.mkdirs(new Path("hdfs:/newname"));

}

/*

* 創建文件/目錄 create

*/

@Test

public void test11() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem hdfs = FileSystem.get(conf);

// 使用HDFS數據輸出流(寫)對象 在HDSF上根目錄創建一個文件夾,其內再創建文件

FSDataOutputStream out = hdfs.create(new Path("hdfs:/name/abc.txt"));

// 在文件中寫入一行數據,必須使用UTF-8

out.write("你好,Hello !".getBytes("UTF-8"));

out = hdfs.create(new Path("/name/alizee.txt"));

out.write("世界,Hello !".getBytes("UTF-8"));

out.close();

FSDataInputStream is = hdfs.open(new Path("hdfs:/name/alizee.txt"));

FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/name/alizee.txt"));

byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];

is.readFully(0, buffer);

is.close();

// FSDataInputStream are = hdfs.open(new Path("hdfs:/name/abc.txt"));

// FileStatus stat1 = hdfs.getFileStatus(new Path("hdfs:/name/abc.txt"));

// byte[] buffer1 = new byte[Integer.parseInt(String.valueOf(stat1.getLen()))];

// are.readFully(0, buffer1);

// are.close();

hdfs.close();

System.out.println(new String(buffer));

// System.out.println(new String(buffer1));

}

/*

* 創建空文件 createNewFile

*/

@Test

public void test12() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem hdfs = FileSystem.get(conf);

//創建空文件

hdfs.createNewFile(new Path("hdfs:/newfile.txt"));

}

/*

* 寫入文件 append

*

*/

@Test

public void test13() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem hdfs = FileSystem.get(conf);

//創建空文件

FSDataOutputStream out = hdfs.append(new Path("hdfs:/newfile.txt"));

out.write("使用append方法寫入文件\n".getBytes("UTF-8"));

out.close();

out = hdfs.append(new Path("/newfile.txt"));

out.write("再次寫入!!!\n".getBytes("UTF-8"));

out.close();

}

/*

* 重命名文件 rename

*/

@Test

public void test14() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem fs = FileSystem.get(conf);

//重命名:fs.rename(源文件,新文件)

boolean rename = fs.rename(new Path("/newfile.txt"), new Path("/newfile2.txt"));

System.out.println(rename);

}

/*

*刪除文件 delete

*/

@Test

public void test15() throws Exception {

Configuration conf = new Configuration();

conf.set("fs.default.name", "hdfs://192.168.109.130:9000");

FileSystem fs = FileSystem.get(conf);

//判斷刪除(路徑,true。false=非空時不刪除,拋RemoteException、IOException異常)

boolean delete = fs.delete(new Path("hdfs:/newfile2.txt"), true);

System.out.println("執行刪除:"+delete);

//FileSystem關閉時執行

boolean exit = fs.deleteOnExit(new Path("/info.log"));

System.out.println("執行刪除:"+exit);

fs.close();

}

/**

* 查找某個文件在HDFS集群的位置

* */

@Test

public void test16(){

try {

Configuration conf = new Configuration();

URI uri = new URI("hdfs://192.168.109.130:9000");

FileSystem fs = FileSystem.get(uri, conf);

Path dfs = new Path("hdfs://192.168.109.130:9000/input/abc.txt");

FileStatus fileStatus = fs.getFileStatus(dfs);

BlockLocation[] blkLocations = fs.getFileBlockLocations(fileStatus,0, fileStatus.getLen() );

int blockLen = blkLocations.length;

System.out.println("blockLen of length : " +blockLen );

for( int i=0;i<blockLen; i++){

String[] hosts = blkLocations[i].getHosts();

System.out.println("Block " + i +" Location: " + hosts[i]);

}

} catch (IllegalArgumentException e) {

e.printStackTrace();

} catch (URISyntaxException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

/**

* 獲取HDFS集群上所有節點名稱:

* */

@Test

public void test17(){

try {

Configuration conf = new Configuration();

URI uri = new URI("hdfs://192.168.109.130:9000");

FileSystem fs = FileSystem.get(uri,conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

int dataNodeLen = dataNodeStats.length;

for( int i=0; i<dataNodeLen;i++){

names[i] = dataNodeStats[i].getHostName();

System.out.println("Node " + i + " Name: "+ names[i] );

}

} catch (URISyntaxException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

}

}

相關推薦

推薦中...