开发者社区 > 博文 > GeoMesa-Hbase源码篇 ——写流程
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

GeoMesa-Hbase源码篇 ——写流程

  • 京东城市JUST团队
  • 2021-01-18
  • IP归属:未知
  • 41720浏览

第一章   写入操作需要的参数

以前述的利用Java API方式来进行数据写入为例,可以看出,在写入时,主要使用了FeatureWriter来执行写入操作的,而在最后写入之前,事先对这个writer对象设定了DataStoreSimpleFeatureTypeSimpleFeature三个参数。

其中DataStore封装了连接信息(例如针对Hbasehconnection)、metadata(例如针对HbaseHBaseBackedMetadata)、状态信息(分布式相关)、FeatureWriter的工厂信息、其他配置信息等等。

SimpleFeatureType中封装了schema的信息,其中包括了索引信息、解释器的信息(主要用来解析SimpleFeatureType的字符串里的不同字段信息)、默认地理信息、type的名称等等。

SimpleFeature主要封装了一行数据的信息,其中包括了一行数据的idfeatureType(相当于DBMS中的schema)、内容、索引的映射关系等等。

1.     private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) {  

2.             try {  

3.                 System.out.println("write test data");  

4.                 FeatureWriter<SimpleFeatureType, SimpleFeature> writer =  

5.                         datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);  

6.                 SimpleFeature toWrite = writer.next();  

7.                 toWrite.setAttributes(feature.getAttributes());  

8.                 ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());  

9.                 toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);  

10.               toWrite.getUserData().putAll(feature.getUserData());  

11.               writer.write();  

12.               // 关闭流  

13.               writer.close();  

14.           } catch (IOException e) {  

15.               e.printStackTrace();  

16.           }  

17.   }  

第二章   写入方式的不同

当需要实现客户端对geomesa以及hbase进行读写操作时,可以选择多种操作方式,例如可以采用命令行、Java APIJavaRDDSpark APIpySpark等等方式来进行。

第一节   利用命令行来写入

第二节   利用Java API来写入

相比于其他方式,使用Java API理论上来说效率更高一些,不过如果不是在服务器当中运行程序,而是在本地IDE上执行这个操作,这个方式相对来说就比较慢了。

为了给读者一个参考,这里提供了一个最简化版本的Demo,同样,在之后章节里面可能会出现一些数据写入的例子,同样是以这个Demo为依据的。

在这个Demo当中,只生成了一行数据,其中包括taxiIdString)、时间数据(Date)、空间数据(Point)、descriptionString),简化的数据可以加快实验速度。SimpleFeatureType的名称为index-text02vatalog的名称为“test”

1.     public class IODemo01 {  

2.       

3.         /** 

4.          * 这个方法主要设定了表名"index-text02" 

5.          * schema结构"taxiId:String,dtg:Date,*geom:Point:srid=4326" 

6.          * @return SimpleFeatureType,即建表的schema表结构 

7.          */  

8.         public SimpleFeatureType getSimpleFeatureType() {  

9.             SimpleFeatureType sft = SimpleFeatureTypes.createType("index-text02""taxiId:String,dtg:Date,*geom:Point:srid=4326,description:String");  

10.           sft.getUserData().put(SimpleFeatureTypes.DEFAULT_DATE_KEY, "dtg");  

11.           return sft;  

12.       }  

13.     

14.       public static void main(String[] args) {  

15.     

16.           Map<String, String> params = new HashMap<>();  

17.           IODemo01 demo01 = new IODemo01();  

18.     

19.           try {  

20.               //创建datastore  

21.               params.put("hbase.catalog""test");  

22.               params.put("hbase.zookeepers""116.196.121.47:2181");  

23.     

24.               DataStore datastore = DataStoreFinder.getDataStore(params);  

25.     

26.               // 创建schema  

27.               SimpleFeatureType sft = demo01.getSimpleFeatureType();  

28.               System.out.println(sft);  

29.               System.out.println(datastore);  

30.               datastore.createSchema(sft);  

31.     

32.               //获取Features  

33.               SimpleFeature feature = demo01.getData();  

34.     

35.               //写入Features  

36.               demo01.writeFeature(datastore, sft, feature);  

37.           } catch (Exception e) {  

38.               e.printStackTrace();  

39.           }  

40.     

41.       }  

42.     

43.       /** 

44.        * 这个方法主要用来将封装好的feature存入到对应的datastore 

45.        * @param datastore 数据源,可以选择的有很多,此处是HBaseDataStore 

46.        * @param sft 表结构 

47.        * @param feature 封装好的一行数据 

48.        */  

49.       private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) {  

50.           try {  

51.               System.out.println("write test data");  

52.               FeatureWriter<SimpleFeatureType, SimpleFeature> writer =  

53.                       datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);  

54.               SimpleFeature toWrite = writer.next();  

55.               toWrite.setAttributes(feature.getAttributes());  

56.               ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());  

57.               toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);  

58.               toWrite.getUserData().putAll(feature.getUserData());  

59.               writer.write();  

60.               // 关闭流  

61.               writer.close();  

62.           } catch (IOException e) {  

63.               e.printStackTrace();  

64.           }  

65.     

66.       }  

67.     

68.       /** 

69.        * 这个方法主要是将非结构化的数据转换为feature对象 

70.        * @return feature对象 

71.        */  

72.       private SimpleFeature getData() {  

73.           SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getSimpleFeatureType());  

74.           DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US);  

75.           builder.set("taxiId""A SA2342");  

76.           builder.set("dtg", Date.from(LocalDateTime.parse("2008-02-02 13:30:49", dateFormat).toInstant(ZoneOffset.UTC)));  

77.           builder.set("geom""POINT (116.31412 39.89454)");  

78.           builder.set("description""这是一辆套牌的车");  

79.           builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);  

80.           return builder.buildFeature("A SA2342");  

81.       }  

82.   }  

 

 

第三节   利用Spark API来写入

第三章   针对不同DataStore的写入流程

第一节   HBaseDataStore的写入过程

1     基本流程

建立索引(在此处的索引是动词,指在写入操作中,将数据进行解构并组织,使其能够更快地进行查询)、插入feature都是写操作。基于Hbase进行写单个feature的流程如下图所示(由于GeoMesa最具特色的是其基于Z曲线的三维时空索引,因此在此处以Z3索引为例进行展示):

以下是写入单个feature的流程:

1)     客户端向GeoMesa发送写请求

2)     GeoMesa对传来的feature进行数据的预处理,主要包括featureID的处理、epoch的封装等等,将feature数据和SimpleFeatureType数据封装成为能够进行下一步操作的数据类型。

3)     GeoMesa中的索引机制会根据feature中的数据判断建立的索引类型,然后利用sfcurve中的相关方法,将时间、空间数据格式化,并整合成为一个序列。

4)     GeoMesa会利用Kryo序列化机制,将第三步产生的序列进行序列化,转换成为适合Hbase存储的格式。

5)     最后利用缓存修改器中的put方法,调用Hbase API,将之前处理好的数据写入到Hbase当中。

2     数据预处理

2.1  分析feature中的id

在调用write方法后,首先会进入org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter类中,调用write方法。

1.     override def write(): Unit = {  

2.           if (currentFeature == null) {  

3.             throw new IllegalStateException("next() must be called before write()")  

4.           }  

5.           writeFeature(currentFeature)  

6.           currentFeature = null  

7.         }  

更进一步,geomesa会调用writeFeature方法,完成写入操作的具体实现就是利用此方法来实现的。

1.     protected def writeFeature(feature: SimpleFeature): Unit = {  

2.         //查看是否有一个指定id,如果没有,系统会生成一个id  

3.         val featureWithFid = GeoMesaFeatureWriter.featureWithFid(sft, feature)  

4.         val wrapped = wrapFeature(featureWithFid)  

5.           

6.         val converted = try {  

7.           writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }  

8.         } catch {  

9.           case NonFatal(e) =>  

10.           val attributes = s"${featureWithFid.getID}:${featureWithFid.getAttributes.asScala.mkString("|")}"  

11.           throw new IllegalArgumentException(s"Error indexing feature '$attributes'", e)  

12.       }  

13.       converted.foreach { case (mutator, writes) => executeWrite(mutator, writes) }  

14.       statUpdater.add(featureWithFid)  

15.     }  

 

由于用户在创建feature时,有可能设定了id,也可能没有设定id,因此在开始,geoemesa需要对feature进行判断,如果有设定id,就使用用户指定的id,如果没有,就使用uuid随机生成一个id

具体实现是在featureWithFid方法当中。首先,系统会对feature进行判断,也就是Hint.PROVIED_FID,如果为true,就使用此fid,如果没有,就利用idGenerator进行生成id的操作。

1.     def featureWithFid(sft: SimpleFeatureType, feature: SimpleFeature): SimpleFeature = {  

2.         if (feature.getUserData.containsKey(Hints.PROVIDED_FID)) {  

3.           withFid(sft, feature, feature.getUserData.get(Hints.PROVIDED_FID).toString)  

4.         } else if (feature.getUserData.containsKey(Hints.USE_PROVIDED_FID) &&  

5.             feature.getUserData.get(Hints.USE_PROVIDED_FID).asInstanceOf[Boolean]) {  

6.           feature  

7.         } else {  

8.           withFid(sft, feature, idGenerator.createId(sft, feature))  

9.      }  

10.   }  

2.2  Feature格式转换

由于原生的SimpleFeature当中并没有包含Hbase相关的一些属性,不利于SimpleFeature最终插入到Hbase表当中,因此在这一步,GeoMesaSimpleFeature对象转换为HbaseFeature对象。

这个过程是在org.locationtech.geomesa.hbase.data.HBaseFeatureWriter类当中进行的。具体实现方法是wrapFeature方法。

1.     override protected def wrapFeature(feature: SimpleFeature): HBaseFeature = wrapper(feature)  

接下来调用org.locationtech.geomesa.hbase.data.HBaseFeature类当中的wrapper方法。

1.     def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {  

2.         val serializers = HBaseColumnGroups.serializers(sft)  

3.         val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)  

4.         (feature) => new HBaseFeature(feature, serializers, idSerializer)  

5.       }  

从这个方法中可以看出,在转换Feature格式的过程当中,新的HbaseFeature当中又封装了一些序列化的对象。IdSerializer在此时是空的对象,而serializers对象已经被赋值了,是KyroFeatureSerializer中的内部类MutableActiveSerializer的一个实例。可以看出,在进行数据序列化的过程当中,GeoMesa-Hbase使用的是Kyro序列化机制。

2.3  确定分片

在进行格式转化之后,回到GeoMesaFeatureWriter类的writeFeature方法当中进行封装。

1.     writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }    

但是这个时候的数据虽然可以进行序列化、也能够调用HbaseAPI插入到Hbase当中,但是由于时空数据往往会分布不均匀,可能会产生严重的数据倾斜问题,所以这是需要将数据进行分片存储。这个方法是在org.locationtech.geomesa.index.index.BaseFeatureIndexwriter方法当中实现的。

1.     override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {  

2.         val sharing = sft.getTableSharingBytes  

3.         val shards = shardStrategy(sft)  

4.         val toIndexKey = keySpace.toIndexKeyBytes(sft)  

5.         mutator(sharing, shards, toIndexKey, createInsert)  

6.       }  

在这个方法当中看到有四个参数,其中比较重要的是shardstoIndexKeyshards是负责分片存储策略实现的,toIndexKey是负责建立索引的参数。

在进一步的方法调用中,GeoMesa会判断id是用户指定的还是系统自动生成的uuid,如果是用户指定的,就会调用StringToBytes方法,将用户指定的id当成一个字符串来进行hash运算。如果是系统生成的uuidGeoMesa就会调用uuidToBytes方法,将uuid直接进行hash运算。

在执行完上述操作后,geomesa就开始调用toIndexKey来进行索引的建立工作了。

3     Z曲线处理

3.1  获取Z曲线的value

GeoMesafeature存入到Hbase的过程当中,GeoMesa会生成相应的索引值,而这个过程是通过调用org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace中的toIndexKeyBytes方法实现的。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.         val z3 = sfc(sft.getZ3Interval)  

3.         val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)  

4.         val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("Z3 index requires a valid date"))  

5.         val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)  

6.       

7.         getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)  

8.       }  

而在其进一步调用的getZValueBytes方法当中,可以看到这个索引值具体是如何封装的。

1.     private def getZValueBytes(z3: Z3SFC,  

2.                                  geomIndex: Int,  

3.                                  dtgIndex: Int,  

4.                                  timeToIndex: TimeToBinnedTime,  

5.                                  lenient: Boolean)  

6.                                 (prefix: Seq[Array[Byte]],  

7.                                  feature: SimpleFeature,  

8.                                  suffix: Array[Byte]): Seq[Array[Byte]] = {  

9.         val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]  

10.       if (geom == null) {  

11.         throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")  

12.       }  

13.       val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]  

14.       val time = if (dtg == null) { 0 } else { dtg.getTime }  

15.       val BinnedTime(b, t) = timeToIndex(time)  

16.       val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {  

17.         case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)  

18.       }  

19.     

20.       // create the byte array - allocate a single array up front to contain everything  

21.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

22.       var i = 0  

23.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

24.       ByteArrays.writeShort(b, bytes, i)  

25.       ByteArrays.writeLong(z, bytes, i + 2)  

26.       System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)  

27.       Seq(bytes)  

28.     }  

在方法的前半段,从第9行到第17行,该方法实现了如下功能:首先将地理信息提取出来,转换为本身的Point对象,并且判断是否有地理信息,之后会将时间信息提取出来,调用相关的方法进一步建立索引。

3.2  将时间信息利用Binned机制进行转换

由于时间是无限的,因此为了适应GeoHash二分编码的规则,与经纬度信息形成八叉树的索引结构,GeoMesa内置了Binned机制,将时间以公元197011日零时零分为起始,按照天、周(默认)、月、年四种区间,形成相对有限的chunk进行计量,以利于后期的二分操作。

但是因为索引长度是有限的,因此这个方式会有一些局限性,主要体现在表示的时间长度比较有限。如果以天为单位,偏移量的单位就是毫秒,最大表示的时间为公元2059918日;如果以周为单位,这个也是GeoMesa默认的一种设置,偏移量的单位就是秒,最大表示的时间为公元259814日;如果以月为单位,偏移量的单位也是秒,最大表示的时间为公元4700831日;如果以年为单位,偏移量的单位就是分钟,最大表示的时间为公元347371231日。

一般情况下,由于项目生命周期和时间粒度的权衡下,以周为时间周期进行计算是比较合理的,因此现在的GeoMesa默认的设置同样是这样的。

这个过程具体是由org.locationtech.geomesa.curve.BinnedTime类承担的,而具体到现在写入数据的过程,主要是调用timeToBinnedTime方法来实现的。

1.     def timeToBinnedTime(period: TimePeriod): TimeToBinnedTime = {  

2.         period match {  

3.           case TimePeriod.Day   => toDayAndMillis  

4.           case TimePeriod.Week  => toWeekAndSeconds  

5.           case TimePeriod.Month => toMonthAndSeconds  

6.           case TimePeriod.Year  => toYearAndMinutes  

7.         }  

8.       }

更进一步的调用,可以点开toWeekAndSeconds方法进行查看。

3.3  建立时空索引

这一步是GeoMesa建立时空索引的核心部分,在执行这一步之前,用户需要将经纬度、时间的参数都准备好,经纬度的信息都是Double类型的数据,时间的信息需要转换成为Long类型的数据,然后进入org.locationtech.geomesa.curve.Z3SFC类中,执行index方法。

1.     override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {  

2.         try {  

3.           require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max && t >= time.min && t <= time.max,  

4.             s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}], [${time.min},${time.max}]): $x, $y, $t")  

5.           Z3(lon.normalize(x), lat.normalize(y), time.normalize(t))  

6.         } catch {  

7.           case _: IllegalArgumentException if lenient => lenientIndex(x, y, t)  

8.         }  

9.       }  

在上述源码的第4行,可以看到,经度、维度和时间的数据都调用了normalize方法。这是对这些数据进行格式化处理。

在此之后,对于经过格式化的数据,GeoMesa利用org.locationtech.sfcurve.zorder.Z3类对这些数据进行了再次的封装。

1.  def apply(x: Int, y:  Int, z: Int): Z3 = {  

2.      new Z3(split(x) | split(y) << 1 | split(z) << 2)  

3.    }  

其中,会调用一些切分、编码、解码的方法,在此不赘述。

4     数据序列化

在前述的Z3IndexKeySpace类中的getZValueBytes方法的后半段(第20行到第28行),主要实现了前面处理数据的序列化操作。

1.     private def getZValueBytes(z3: Z3SFC,  

2.                                  geomIndex: Int,  

3.                                  dtgIndex: Int,  

4.                                  timeToIndex: TimeToBinnedTime,  

5.                                  lenient: Boolean)  

6.                                 (prefix: Seq[Array[Byte]],  

7.                                  feature: SimpleFeature,  

8.                                  suffix: Array[Byte]): Seq[Array[Byte]] = {  

9.         val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]  

10.       if (geom == null) {  

11.         throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")  

12.       }  

13.       val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]  

14.       val time = if (dtg == null) { 0 } else { dtg.getTime }  

15.       val BinnedTime(b, t) = timeToIndex(time)  

16.       val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {  

17.         case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)  

18.       }  

19.     

20.       // create the byte array - allocate a single array up front to contain everything  

21.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

22.       var i = 0  

23.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

24.       ByteArrays.writeShort(b, bytes, i)  

25.       ByteArrays.writeLong(z, bytes, i + 2)  

26.       System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)  

27.       Seq(bytes)  

28.     }  

4.1  将数据封装成为Long类型的数据

在上一步中,数据已经被整合成为符合Z3填充曲线规则的格式了,同时用Z3封装好了,接下来就是将这些数据解析成为一个long类型数据并进一步转化成为能够存入到Hbase当中的二进制序列。

实现这个功能的类是org.locationtech.geomesa.utils.index.ByteArrays类,而实现这个过程需要先后调用其中的writeShort方法和writeLong方法。如下:

WriteShort方法:

1.     def writeShort(short: Short, bytes: Array[Byte], offset: Int = 0): Unit = {  

2.         bytes(offset) = (short >> 8).asInstanceOf[Byte]  

3.         bytes(offset + 1) = short.asInstanceOf[Byte]  

4.       }  

WriteLong方法:

1.     def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {  

2.         bytes(offset    ) = ((long >> 56) & 0xff).asInstanceOf[Byte]  

3.         bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]  

4.         bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]  

5.         bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]  

6.         bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]  

7.         bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]  

8.         bytes(offset + 6) = ((long >> 8)  & 0xff).asInstanceOf[Byte]  

9.         bytes(offset + 7) =  (long        & 0xff).asInstanceOf[Byte]  

10.     }  

4.2  利用mutatorkey的数据进行封装

上述数据都处理完成以后,mutator会将indexshardsfeature等数据进行封装

1.  private def mutator(sharing: Array[Byte],  

2.                        shards: ShardStrategy,  

3.                        toIndexKey: (Seq[Array[Byte]], SimpleFeature, Array[Byte]) => Seq[Array[Byte]],  

4.                        operation: (Array[Byte], F) => W)  

5.                       (feature: F): Seq[W] = {  

6.      toIndexKey(Seq(sharing, shards(feature)), feature.feature, feature.idBytes).map(operation.apply(_, feature))  

7.    }  

5     写入Hbase

Geomesa中利用HTableHbase进行交互,而封装数据的put对象存在于org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中,通过调用createInsert方法,来实现这个过程,该方法返回一个Mutation对象,其中就封装了数据的一次写操作。

1.     override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {  

2.       val put = new Put(row)  

3.       feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))  

4.       feature.visibility.foreach(put.setCellVisibility)  

5.       put.setDurability(HBaseIndexAdapter.durability)  

6.     }  

5.1  插入之前的序列化操作

由于Hbase本身存储数据是以二进制的格式将数据进行存储的,最终将数据存储到HDFS当中的HFile当中,所以这个持久化的过程必然会涉及到数据的序列化的问题。而在这个过程当中,GeoMesa主要用了Kryo的序列化方式。

kryo是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的体积。当然,序列化的方式有多种多样,GeoMesa也提供了例如avroprottobuffer等序列化机制,用户在进行开发的过程中,可以进行选择。

这个过程主要涉及到两个类,HbaseFeature类以及org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类。HbaseFeature类承担了数据封装和调用方法的作用,同样也是贯穿整个写操作的一个基础类。KryoFeatureSerialization类则是承担了数据序列化的基础性工作,具体实现序列化功能的是serialize方法:

1.     override def serialize(sf: SimpleFeature): Array[Byte] = {  

2.         val output = KryoFeatureSerialization.getOutput(null)  

3.         writeFeature(sf, output)  

4.         output.toBytes  

5.       }  

5.2  将数据插入到Hbase当中

经过比较复杂的底层转换,最终实现数据插入的是利用org.apache.hadoop.hbase.client.Put类当中的addImmutable方法。

1.     public Put addImmutable(byte [] family, byte [] qualifier, long ts, byte [] value) {  

2.         if (ts < 0) {  

3.           throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);  

4.         }  

5.         List<Cell> list = getCellList(family);  

6.         KeyValue kv = createPutKeyValue(family, qualifier, ts, value);  

7.         list.add(kv);  

8.         familyMap.put(family, list);  

9.         return this;  

10.     }  

这个方法里,在对KeyValue进行设置以后,进一步调用add方法,此时就将数据存储到Hbase当中了。

第二节   AccumuloDataStore的写入过程

第三节   BigtableDataStore的写入过程

第四节   CassandraDataStore的写入过程

第五节   LamdaDataStore的写入过程

第四章   功能方面的探索

第一节   单列变多列

1     改造的背景

1.1  GeoMesa-Hbase本身的问题

Geomesa在向Hbase写入数据时,是通过HTablePut对象来实现的。在写入的时候,需要指定列族名、列名和Cellvalue值,而这些指定的内容都需要经过序列化才能进入到Hbase当中。

在原生的GeoMesa当中,value值是将整个feature的所有信息都封装在一起,统一进行序列化的。虽然这样能够让存储的空间更小,避免了查询多个字段的问题,但是这样的方式使得查询没有针对性,在进行查询时,一些价值不大的信息也会被查询到,极大地占用了资源。

1.2  预期目标

希望能够通过一些源码级别的修改,找到GeoMesaHbase进行交互的位置,将插入过程从单列写入改造成为多列写入,将整个的feature信息存储由存储为一个列转变为分属性存储在不同的列中。以此来提升查询的针对性和效率。

2     过程分析

GeoMesa当中,主要负责插入数据是org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中的createInsert方法,在这个方法当中,GeoMesa创建了Put对象,一个Put对象对应一个行键,这个行键是已经经过时空索引处理过后的二进制数组。

1.     override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {  

2.       val put = new Put(row)  

3.       feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))  

4.       feature.visibility.foreach(put.setCellVisibility)  

5.       put.setDurability(HBaseIndexAdapter.durability)  

6.   

然后这个put对象设定了若干个value,但是在默认情况下,这个feature对象中的value只有一个对象,就是整个feature当中的所有信息。而具体设定这些信息的则是通过org.locationtech.geomesa.hbase.data.HBaseFeature当中的values值来设定的。

1.     lazy val values: Seq[RowValue] = serializers.map { case (colFamily, serializer) =>  

2.         new RowValue(colFamily, HBaseColumnGroups.default, serializer.serialize(feature))  

3.       }  

从上面的代码可以看出,values的类型是封装了若干个RowValue对象的序列,而在其内部设定时依然是单个RowValue对象,也就是说,values封装的是serializers遍历之后的所有RowValue对象。

在创建RowValue对象时,其中设定列名的是第二个参数,默认情况下是HBaseColumnGroups.default,这个参数存在于org.locationtech.geomesa.hbase.index.HBaseColumnGroups类当中,默认情况下是“d”。也就是说默认情况下,写入的数据只有一个列,列名为“d”

1.  override val default: Array[Byte] = Bytes.toBytes("d")  

3     改造方法

3.1  切入点

由上述分析可以看出,主要可以修改的点有三处:

首先,可以修改HBaseColumnGroups类当中的default参数,这样的话就可以修改列名。但是这样的修改方式只是将列名修改过来了,虽然在Hbase当中会显示新的列名,但是本质上依然是整个feature进行序列化,整个存储的过程。而且这样的修改方式也会连带地修改列族名,由修改过的实验可以看出,在进行写入操作时,列族名也会调用这个default参数。

其次还可以修改HBaseIndexAdapter类当中的createInsert方法,在进行feature.values的遍历时,可以进行拆分。但是这种方式仍然是在RowValue封装好以后才进行遍历的,本质上value值已经被序列化了,再进行拆分已经没有价值了。

最好的方案是修改HBaseFeature类当中的values,因为RowValue是在这个函数内生成的,因此可以进行一些更为细致的操作。

3.2  修改过程

首先需要在values函数当中,判断feature里面的属性存储情况,以便抽取不同的数据,将这些数据分成不同的RowValue进行封装,存储。

1.  lazy val values: Seq[RowValue] = serializers.flatMap { case (colFamily, serializer) =>  

2.    // Fid  

3.    println(feature.getAttribute(0))  

4.    // Date  

5.    println(feature.getAttribute(1))  

6.    // Geometry  

7.    println(feature.getAttribute(2))  

8.    // Description  

9.    println(feature.getAttribute(3))  

执行修改后的程序,可以看到这些属性成功被取出来了。

接下来就是将这些数据封装在不同的RowValue对象当中。修改如下:

1.     lazy val values: Seq[RowValue] = serializers.flatMap { case (colFamily, serializer) =>  

2.       Seq(  

3.           new RowValue(colFamily, Bytes.toBytes("Fid"), Bytes.toBytes(feature.getAttribute(0).toString)),  

4.           new RowValue(colFamily, Bytes.toBytes("Date"), Bytes.toBytes(feature.getAttribute(1).toString)),  

5.           new RowValue(colFamily, Bytes.toBytes("Geometry"), Bytes.toBytes(feature.getAttribute(2).toString)),  

6.           new RowValue(colFamily, Bytes.toBytes("Description"), Bytes.toBytes(feature.getAttribute(3).toString)))  

7.     }  

3.3  运行结果

在默认情况下,value当中将feature的所有数据都放在了一起,呈现出来的就是只有一列。

而经过修改以后,不同属性的数据都已经被放在了不同的列当中。

 

4     存在的问题

最大的问题就是这样的修改可能会导致数据无法通过原有GeoMesa来进行查询的问题,因为原有GeoMesa依然是通过列族名“d”和列名“d”来进行查询的。

还有一个问题就是和原有序列化机制可能会出现冲突的问题。因为如果写入的序列化机制和读取的序列化机制产生冲突,就会报版本错误的问题。

第五章   异常处理

第六章   系统特性

第七章   思考和总结

共0条评论