1. 创建DataStore
1.1 创建Option
由于之后的操作可能涉及到多个操作,因此利用Options类来对这些操作信息进行封装
- public Options createOptions(Param[] parameters) {
- Options options = new Options();
- for (Param p: parameters) {
- if (!p.isDeprecated()) {
- Option opt = Option.builder(null)
- .longOpt(p.getName())
- .argName(p.getName())
- .hasArg()
- .desc(p.getDescription().toString())
- .required(p.isRequired())
- .build();
- options.addOption(opt);
- }
- }
- return options;
- }
1.2 解析参数
由于geomesa的信息并不是利用configure来进行封装的,而是利用args来进行传输和解析的,因此对此需要进行规定。在这里使用了默认的解析器(DefaultParser)
- public CommandLine parseArgs(Class<?> caller, Options options, String[] args) throws ParseException {
- try {
- return new DefaultParser().parse(options, args);
- } catch (ParseException e) {
- System.err.println(e.getMessage());
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp(caller.getName(), options);
- throw e;
- }
- }
1.3 获取DataStore参数
在此方法中主要是为了利用上述的解析器和获取的操作参数,最终封装成为一个Map,key为操作名称,value为操作的值。而在此方法中,zookeepers为zk集群的地址信息,catalog为在hbase当中的目录名称(类似于RDBMS中的数据库的概念),但是为了使此方法更加灵活,这个参数没有写死,而是作为一个参数进行传入。
- public Map<String, String> getDataStoreParams(String catalog) throws ParseException {
- String[] args ={"--hbase.catalog", catalog, "--hbase.zookeepers", "localhost:2181"};
- Options options = this.createOptions(new HBaseDataStoreFactory().getParametersInfo());
- CommandLine command = this.parseArgs(getClass(), options, args);
- Map<String, String> params = new HashMap<>();
- // 检查Option里面有没有值,如果没有,就会赋值
- for (Option opt: options.getOptions()) {
- String value = command.getOptionValue(opt.getLongOpt());
- if (value != null) {
- params.put(opt.getArgName(), value);
- }
- }
- return params;
- }
1.4 创建DataStore
- public DataStore createHbaseDataStore(String catalog) throws IOException, ParseException {
- Map<String, String> params = this.getDataStoreParams(catalog);
- return DataStoreFinder.getDataStore(params);
- }
2. 创建SimpleFeatureType
SimpleFeatureType是Geotools当中表示数据类型的一个概念,相当于一种封装地理数据的数据格式。理解上可以约等于关系型数据库当中的Schema,但是有一些差别。
- public SimpleFeatureType getSimpleFeatureType() {
- if (sft == null) {
- StringBuilder attributes = new StringBuilder();
- attributes.append("taxiId:String,");
- attributes.append("dtg:Date,");
- attributes.append("*geom:Point:srid=4326,");
- attributes.append("speed:string,");
- attributes.append("isDriven:string");
-
- sft = SimpleFeatureTypes.createType(getTypeName(), attributes.toString());
- sft.getUserData().put(SimpleFeatureTypes.DEFAULT_DATE_KEY, "dtg");
- }
- return sft;
- }
其中第6行当中的“srid”是GIS当中的一个空间参考标识符。而此处的“srid=4326”表示这些数据对应的WGS 84空间参考系统。
第10行中的getTypeName返回值也是String类型,用户也可以直接属于一个type名称(String类型)
3. 创建schema
dataStore.createSchema(sft);
这一步主要是为了将sft传入特定dataStore中的createSchema方法当中,执行创建Schema的操作。
4. 读取数据
此时就要获取相关的数据进行插入。Geomesa的数据插入支持多种格式文件,例如csv、txt,此处以csv为例。
- public List<SimpleFeature> getTestData() {
- if (features == null) {
- List<SimpleFeature> features = new ArrayList<>();
-
- URL input = getClass().getClassLoader().getResource("test.csv");
- if (input == null) {
- throw new RuntimeException("Couldn't load resource test.csv");
- }
-
- // 设定日期的格式,此处需要用到正则表达式
- DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.CHINA);
-
- // 使用geotools当中的SimpleFeatureType来生成我们需要的Feature
- SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getSimpleFeatureType());
-
- try (CSVParser parser = CSVParser.parse(input, StandardCharsets.UTF_8, CSVFormat.DEFAULT)) {
- for (CSVRecord record : parser) {
- // 对于每个字段和csv文件的对应关系进行设定
- builder.set("taxiId", record.get(0));
- builder.set("dtg", Date.from(LocalDateTime.parse(record.get(1), dateFormat).toInstant(ZoneOffset.UTC)));
- double longitude = Double.parseDouble(record.get(3));
- double latitude = Double.parseDouble(record.get(2));
-
- builder.set("geom", "POINT (" + longitude + " " + latitude + ")");
- builder.set("speed", record.get(4));
- builder.set("isDriven", record.get(5));
-
- // 向Geotools确认要用用户自行设定的id
- builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
-
- // 设定一个id,此处是taxiID
- features.add(builder.buildFeature(record.get(1)));
- }
- } catch (IOException e) {
- throw new RuntimeException("Error reading t-drive data:", e);
- }
- this.features = Collections.unmodifiableList(features);
- }
- return features;
- }
注意:GeoMesa的数据库结构有三层,catalog、type、feature,可以类比关系型数据库当中的数据库、表和行。此处返回的features就是很多行数据的集合。
注意:此处的数据插入是将全量的数据先存入内存当中,这样很容易造成内存溢出的问题,因此建议批量插入。
5. 写入数据查询数据
获取到数据之后就可以将这些数据写入到geomesa当中了。
- public void writeFeatures(DataStore datastore, SimpleFeatureType sft, List<SimpleFeature> features) throws IOException {
- if (features.size() > 0) {
- System.out.println("Writing test data");
- try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
- datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
- for (SimpleFeature feature : features) {
- try {
- SimpleFeature toWrite = writer.next();
- toWrite.setAttributes(feature.getAttributes());
- ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
- toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
- toWrite.getUserData().putAll(feature.getUserData());
- writer.write();
- }catch (Exception e){
- logger.debug("Invalid GDELT record: " + e.toString() + " " + feature.getAttributes());
- }
- }
- }
- }
- }
6. 查询数据
在第6行和第7行出现了两个String类型的字符串,这是ECQL,一种针对地理信息的查询语句,其语法和通常的SQL略有不同。
- public List<Query> getTestQueries() {
- if (queries == null) {
- try {
- List<Query> queries = new ArrayList<>();
-
- String during = "dtg DURING 2159-01-01T00:00:00.000Z/2169-01-01T00:00:00.000Z";
- String bbox = "bbox(geom,128,48,133,53)";
-
- //时空查询
- queries.add(new Query(getTypeName(), ECQL.toFilter(bbox+" AND "+during)));
- //空间查询
- queries.add(new Query(getTypeName(), ECQL.toFilter(bbox)));
- //时间查询
- queries.add(new Query(getTypeName(), ECQL.toFilter(during)));
- // basic spatio-temporal query with projection down to a few attributes
- this.queries = Collections.unmodifiableList(queries);
- } catch (CQLException e) {
- throw new RuntimeException("Error creating filter:", e);
- }
- }
- return queries;
- }