分页读取hdfs文件。
通过流是可以随机读的。
准备数据
放到 /jimo/linux.csv
# cat linux.csv
0,0
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
9,9
10,10
11,11
12,12
13,13
14,14
15,15
16,16
17,17
18,18
19,19
第一次读取10行
读取时记录读了多少个字符
final FileSystem fs = getFileSystem();
int batch = 10;
long pos = 0;
final String filePath = "/jimo/linux.csv";
final FSDataInputStream fis = fs.open(new Path(filePath));
try (InputStreamReader inputStreamReader = new InputStreamReader(fis);
BufferedReader br = new BufferedReader(inputStreamReader)) {
String line;
int row = 1;
while (row <= batch && (line = br.readLine()) != null) {
System.out.println(line);
// System.out.println("pos---" + fis.getPos() + ",ir="); // 每次都是100
row++;
// 如果是windows文件,需要加2,因为\r\n, linux和mac只需要加1
pos += line.length() + 1;
}
// 这样获取是一批数据,not ok
// pos = fis.getPos();
System.out.println("POS=======" + pos); // 40
}
实验:通过 fis.getPos()
获取的是一批数据的长度,这里实验数据比较小,所以全获取了一直是100,一批默认是16k。
于是通过手动计算每行的长度之和。
第二次读取10行
再接着读后面10行
// 再次从pos读取
final FSDataInputStream fis2 = fs.open(new Path(filePath));
// 偏移
fis2.seek(pos);
try (InputStreamReader inputStreamReader = new InputStreamReader(fis2);
BufferedReader br = new BufferedReader(inputStreamReader)) {
// br.skip(pos);
String line;
int row = 1;
while (row <= batch && (line = br.readLine()) != null) {
System.out.println(line);
row++;
}
}
虽然 getPos
不好使,但 seek
还是好使的。
完整代码
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@Test
public void testSeekRead() throws Exception {
final FileSystem fs = getFileSystem();
int batch = 10;
long pos = 0;
final String filePath = "/jimo/linux.csv";
final FSDataInputStream fis = fs.open(new Path(filePath));
try (InputStreamReader inputStreamReader = new InputStreamReader(fis);
BufferedReader br = new BufferedReader(inputStreamReader)) {
String line;
int row = 1;
while (row <= batch && (line = br.readLine()) != null) {
System.out.println(line);
// System.out.println("pos---" + fis.getPos() + ",ir="); // 每次都是100
row++;
// 如果是windows文件,需要加2,因为\r\n, linux和mac只需要加1
pos += line.length() + 1;
}
// 这样获取是一批数据,not ok
// pos = fis.getPos();
System.out.println("POS=======" + pos); // 40
}
// 再次从pos读取
final FSDataInputStream fis2 = fs.open(new Path(filePath));
// 偏移
fis2.seek(pos);
try (InputStreamReader inputStreamReader = new InputStreamReader(fis2);
BufferedReader br = new BufferedReader(inputStreamReader)) {
// br.skip(pos);
String line;
int row = 1;
while (row <= batch && (line = br.readLine()) != null) {
System.out.println(line);
row++;
}
}
}
private FileSystem getFileSystem() throws IOException, InterruptedException, URISyntaxException {
// 此Util是从文件中读取hadoop集群配置
Configuration conf = HadoopUtils.getHadoopConfiguration("D:\\export\\hdfs-conf");
conf.set("dfs.client.use.datanode.hostname", "true");
String hdfsPath = "hdfs://10.10.10.10:9000";
return FileSystem.get(new URI(hdfsPath), conf, "hdfs");
}