开发者社区 > 博文 > GeoMesa-HBase操作篇——Java API基本操作
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

GeoMesa-HBase操作篇——Java API基本操作

  • 京东城市JUST团队
  • 2021-01-22
  • IP归属:未知
  • 40560浏览

1. 创建DataStore

1.1 创建Option

由于之后的操作可能涉及到多个操作,因此利用Options类来对这些操作信息进行封装

  1. public Options createOptions(Param[] parameters) {
  2. Options options = new Options();
  3. for (Param p: parameters) {
  4. if (!p.isDeprecated()) {
  5. Option opt = Option.builder(null)
  6. .longOpt(p.getName())
  7. .argName(p.getName())
  8. .hasArg()
  9. .desc(p.getDescription().toString())
  10. .required(p.isRequired())
  11. .build();
  12. options.addOption(opt);
  13. }
  14. }
  15. return options;
  16. }

1.2 解析参数

由于geomesa的信息并不是利用configure来进行封装的,而是利用args来进行传输和解析的,因此对此需要进行规定。在这里使用了默认的解析器(DefaultParser)

  1. public CommandLine parseArgs(Class<?> caller, Options options, String[] args) throws ParseException {
  2. try {
  3. return new DefaultParser().parse(options, args);
  4. } catch (ParseException e) {
  5. System.err.println(e.getMessage());
  6. HelpFormatter formatter = new HelpFormatter();
  7. formatter.printHelp(caller.getName(), options);
  8. throw e;
  9. }
  10. }

1.3 获取DataStore参数

在此方法中主要是为了利用上述的解析器和获取的操作参数,最终封装成为一个Map,key为操作名称,value为操作的值。而在此方法中,zookeepers为zk集群的地址信息,catalog为在hbase当中的目录名称(类似于RDBMS中的数据库的概念),但是为了使此方法更加灵活,这个参数没有写死,而是作为一个参数进行传入。

  1. public Map<String, String> getDataStoreParams(String catalog) throws ParseException {
  2. String[] args ={"--hbase.catalog", catalog, "--hbase.zookeepers", "localhost:2181"};
  3. Options options = this.createOptions(new HBaseDataStoreFactory().getParametersInfo());
  4. CommandLine command = this.parseArgs(getClass(), options, args);
  5. Map<String, String> params = new HashMap<>();
  6. // 检查Option里面有没有值,如果没有,就会赋值
  7. for (Option opt: options.getOptions()) {
  8. String value = command.getOptionValue(opt.getLongOpt());
  9. if (value != null) {
  10. params.put(opt.getArgName(), value);
  11. }
  12. }
  13. return params;
  14. }

1.4 创建DataStore

  1. public DataStore createHbaseDataStore(String catalog) throws IOException, ParseException {
  2. Map<String, String> params = this.getDataStoreParams(catalog);
  3. return DataStoreFinder.getDataStore(params);
  4. }

2. 创建SimpleFeatureType

SimpleFeatureType是Geotools当中表示数据类型的一个概念,相当于一种封装地理数据的数据格式。理解上可以约等于关系型数据库当中的Schema,但是有一些差别。

  1. public SimpleFeatureType getSimpleFeatureType() {
  2. if (sft == null) {
  3. StringBuilder attributes = new StringBuilder();
  4. attributes.append("taxiId:String,");
  5. attributes.append("dtg:Date,");
  6. attributes.append("*geom:Point:srid=4326,");
  7. attributes.append("speed:string,");
  8. attributes.append("isDriven:string");
  9. sft = SimpleFeatureTypes.createType(getTypeName(), attributes.toString());
  10. sft.getUserData().put(SimpleFeatureTypes.DEFAULT_DATE_KEY, "dtg");
  11. }
  12. return sft;
  13. }

其中第6行当中的“srid”是GIS当中的一个空间参考标识符。而此处的“srid=4326”表示这些数据对应的WGS 84空间参考系统。

第10行中的getTypeName返回值也是String类型,用户也可以直接属于一个type名称(String类型)

3. 创建schema

dataStore.createSchema(sft);  

这一步主要是为了将sft传入特定dataStore中的createSchema方法当中,执行创建Schema的操作。

4. 读取数据

此时就要获取相关的数据进行插入。Geomesa的数据插入支持多种格式文件,例如csv、txt,此处以csv为例。

  1. public List<SimpleFeature> getTestData() {
  2. if (features == null) {
  3. List<SimpleFeature> features = new ArrayList<>();
  4. URL input = getClass().getClassLoader().getResource("test.csv");
  5. if (input == null) {
  6. throw new RuntimeException("Couldn't load resource test.csv");
  7. }
  8. // 设定日期的格式,此处需要用到正则表达式
  9. DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA);
  10. // 使用geotools当中的SimpleFeatureType来生成我们需要的Feature
  11. SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getSimpleFeatureType());
  12. try (CSVParser parser = CSVParser.parse(input, StandardCharsets.UTF_8, CSVFormat.DEFAULT)) {
  13. for (CSVRecord record : parser) {
  14. // 对于每个字段和csv文件的对应关系进行设定
  15. builder.set("taxiId", record.get(0));
  16. builder.set("dtg", Date.from(LocalDateTime.parse(record.get(1), dateFormat).toInstant(ZoneOffset.UTC)));
  17. double longitude = Double.parseDouble(record.get(3));
  18. double latitude = Double.parseDouble(record.get(2));
  19. builder.set("geom", "POINT (" + longitude + " " + latitude + ")");
  20. builder.set("speed", record.get(4));
  21. builder.set("isDriven", record.get(5));
  22. // 向Geotools确认要用用户自行设定的id
  23. builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
  24. // 设定一个id,此处是taxiID
  25. features.add(builder.buildFeature(record.get(1)));
  26. }
  27. } catch (IOException e) {
  28. throw new RuntimeException("Error reading t-drive data:", e);
  29. }
  30. this.features = Collections.unmodifiableList(features);
  31. }
  32. return features;
  33. }

注意:GeoMesa的数据库结构有三层,catalog、type、feature,可以类比关系型数据库当中的数据库、表和行。此处返回的features就是很多行数据的集合。

注意:此处的数据插入是将全量的数据先存入内存当中,这样很容易造成内存溢出的问题,因此建议批量插入。

5. 写入数据查询数据

获取到数据之后就可以将这些数据写入到geomesa当中了。

  1. public void writeFeatures(DataStore datastore, SimpleFeatureType sft, List<SimpleFeature> features) throws IOException {
  2. if (features.size() > 0) {
  3. System.out.println("Writing test data");
  4. try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
  5. datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
  6. for (SimpleFeature feature : features) {
  7. try {
  8. SimpleFeature toWrite = writer.next();
  9. toWrite.setAttributes(feature.getAttributes());
  10. ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
  11. toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
  12. toWrite.getUserData().putAll(feature.getUserData());
  13. writer.write();
  14. }catch (Exception e){
  15. logger.debug("Invalid GDELT record: " + e.toString() + " " + feature.getAttributes());
  16. }
  17. }
  18. }
  19. }
  20. }

6. 查询数据

在第6行和第7行出现了两个String类型的字符串,这是ECQL,一种针对地理信息的查询语句,其语法和通常的SQL略有不同。

  1. public List<Query> getTestQueries() {
  2. if (queries == null) {
  3. try {
  4. List<Query> queries = new ArrayList<>();
  5. String during = "dtg DURING 2159-01-01T00:00:00.000Z/2169-01-01T00:00:00.000Z";
  6. String bbox = "bbox(geom,128,48,133,53)";
  7. //时空查询
  8. queries.add(new Query(getTypeName(), ECQL.toFilter(bbox+" AND "+during)));
  9. //空间查询
  10. queries.add(new Query(getTypeName(), ECQL.toFilter(bbox)));
  11. //时间查询
  12. queries.add(new Query(getTypeName(), ECQL.toFilter(during)));
  13. // basic spatio-temporal query with projection down to a few attributes
  14. this.queries = Collections.unmodifiableList(queries);
  15. } catch (CQLException e) {
  16. throw new RuntimeException("Error creating filter:", e);
  17. }
  18. }
  19. return queries;
  20. }

 

共0条评论