开发者社区 > 博文 > 分页读取hdfs文件
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

分页读取hdfs文件

  • 京东城市JUST团队
  • 2021-01-22
  • IP归属:未知
  • 31040浏览

分页读取hdfs文件。

通过流是可以随机读的。

准备数据

放到 /jimo/linux.csv

# cat linux.csv 
0,0
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10
11,11
12,12
13,13
14,14
15,15
16,16
17,17
18,18
19,19

第一次读取10行

读取时记录读了多少个字符

final FileSystem fs = getFileSystem();
int batch = 10;
long pos = 0;

final String filePath = "/jimo/linux.csv";
final FSDataInputStream fis = fs.open(new Path(filePath));
try (InputStreamReader inputStreamReader = new InputStreamReader(fis);
	 BufferedReader br = new BufferedReader(inputStreamReader)) {
	String line;
	int row = 1;
	while (row <= batch && (line = br.readLine()) != null) {
		System.out.println(line);
		// System.out.println("pos---" + fis.getPos() + ",ir="); // 每次都是100
		row++;
		// 如果是windows文件,需要加2,因为\r\n, linux和mac只需要加1
		pos += line.length() + 1;
	}
	// 这样获取是一批数据,not ok
	// pos = fis.getPos();
	System.out.println("POS=======" + pos); // 40
}

实验:通过 fis.getPos() 获取的是一批数据的长度,这里实验数据比较小,所以全获取了一直是100,一批默认是16k。

于是通过手动计算每行的长度之和。

第二次读取10行

再接着读后面10行

// 再次从pos读取
final FSDataInputStream fis2 = fs.open(new Path(filePath));
// 偏移
fis2.seek(pos);
try (InputStreamReader inputStreamReader = new InputStreamReader(fis2);
	 BufferedReader br = new BufferedReader(inputStreamReader)) {
	// br.skip(pos);
	String line;
	int row = 1;
	while (row <= batch && (line = br.readLine()) != null) {
		System.out.println(line);
		row++;
	}
}

虽然 getPos 不好使,但 seek还是好使的。

完整代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;


   @Test
    public void testSeekRead() throws Exception {

        final FileSystem fs = getFileSystem();
        int batch = 10;
        long pos = 0;

        final String filePath = "/jimo/linux.csv";
        final FSDataInputStream fis = fs.open(new Path(filePath));
        try (InputStreamReader inputStreamReader = new InputStreamReader(fis);
             BufferedReader br = new BufferedReader(inputStreamReader)) {
            String line;
            int row = 1;
            while (row <= batch && (line = br.readLine()) != null) {
                System.out.println(line);
                // System.out.println("pos---" + fis.getPos() + ",ir="); // 每次都是100
                row++;
                // 如果是windows文件,需要加2,因为\r\n, linux和mac只需要加1
                pos += line.length() + 1;
            }
            // 这样获取是一批数据,not ok
            // pos = fis.getPos();
            System.out.println("POS=======" + pos); // 40
        }

        // 再次从pos读取
        final FSDataInputStream fis2 = fs.open(new Path(filePath));
        // 偏移
        fis2.seek(pos);
        try (InputStreamReader inputStreamReader = new InputStreamReader(fis2);
             BufferedReader br = new BufferedReader(inputStreamReader)) {
            // br.skip(pos);
            String line;
            int row = 1;
            while (row <= batch && (line = br.readLine()) != null) {
                System.out.println(line);
                row++;
            }
        }
    }

    private FileSystem getFileSystem() throws IOException, InterruptedException, URISyntaxException {
		// 此Util是从文件中读取hadoop集群配置
        Configuration conf = HadoopUtils.getHadoopConfiguration("D:\\export\\hdfs-conf");
        conf.set("dfs.client.use.datanode.hostname", "true");
        String hdfsPath = "hdfs://10.10.10.10:9000";
        return FileSystem.get(new URI(hdfsPath), conf, "hdfs");
    }
共0条评论