开发者社区 > 博文 > GeoMesa-HBase原理篇——写入过程
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

GeoMesa-HBase原理篇——写入过程

  • 京东城市JUST团队
  • 2021-01-22
  • IP归属:未知
  • 27280浏览


1. 基本流程

建立索引(在此处的索引是动词,指在写入操作中,将数据进行解构并组织,使其能够更快地进行查询)、插入feature都是写操作。基于Hbase进行写单个feature的流程如下图所示(由于GeoMesa最具特色的是其基于Z曲线的三维时空索引,因此在此处以Z3索引为例进行展示):

以下是写入单个feature的流程:

  1. 客户端向GeoMesa发送写请求
  2. GeoMesa对传来的feature进行数据的预处理,主要包括featureID的处理、epoch的封装等等,将feature数据和SimpleFeatureType数据封装成为能够进行下一步操作的数据类型。
  3. GeoMesa中的索引机制会根据feature中的数据判断建立的索引类型,然后利用sfcurve中的相关方法,将时间、空间数据格式化,并整合成为一个序列。
  4. GeoMesa会利用Kryo序列化机制,将第三步产生的序列进行序列化,转换成为适合Hbase存储的格式。
  5. 最后利用缓存修改器中的put方法,调用Hbase API,将之前处理好的数据写入到Hbase当中。

2. 数据预处理

2.1 分析feature中的id

在调用write方法后,首先会进入org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter类中,调用write方法。

  1. override def write(): Unit = {
  2. if (currentFeature == null) {
  3. throw new IllegalStateException("next() must be called before write()")
  4. }
  5. writeFeature(currentFeature)
  6. currentFeature = null
  7. }

更进一步,geomesa会调用writeFeature方法,完成写入操作的具体实现就是利用此方法来实现的。

  1. protected def writeFeature(feature: SimpleFeature): Unit = {
  2. //查看是否有一个指定id,如果没有,系统会生成一个id
  3. val featureWithFid = GeoMesaFeatureWriter.featureWithFid(sft, feature)
  4. val wrapped = wrapFeature(featureWithFid)
  5. val converted = try {
  6. writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }
  7. } catch {
  8. case NonFatal(e) =>
  9. val attributes = s"${featureWithFid.getID}:${featureWithFid.getAttributes.asScala.mkString("|")}"
  10. throw new IllegalArgumentException(s"Error indexing feature '$attributes'", e)
  11. }
  12. converted.foreach { case (mutator, writes) => executeWrite(mutator, writes) }
  13. statUpdater.add(featureWithFid)
  14. }

由于用户在创建feature时,有可能设定了id,也可能没有设定id,因此在开始,geoemesa需要对feature进行判断,如果有设定id,就使用用户指定的id,如果没有,就使用uuid随机生成一个id。

具体实现是在featureWithFid方法当中。首先,系统会对feature进行判断,也就是Hint.PROVIED_FID,如果为true,就使用此fid,如果没有,就利用idGenerator进行生成id的操作。

  1. def featureWithFid(sft: SimpleFeatureType, feature: SimpleFeature): SimpleFeature = {
  2. if (feature.getUserData.containsKey(Hints.PROVIDED_FID)) {
  3. withFid(sft, feature, feature.getUserData.get(Hints.PROVIDED_FID).toString)
  4. } else if (feature.getUserData.containsKey(Hints.USE_PROVIDED_FID) &&
  5. feature.getUserData.get(Hints.USE_PROVIDED_FID).asInstanceOf[Boolean]) {
  6. feature
  7. } else {
  8. withFid(sft, feature, idGenerator.createId(sft, feature))
  9. }
  10. }

2.2 Feature格式的转换

由于原生的SimpleFeature当中并没有包含Hbase相关的一些属性,不利于SimpleFeature最终插入到Hbase表当中,因此在这一步,GeoMesa将SimpleFeature对象转换为HbaseFeature对象。

这个过程是在org.locationtech.geomesa.hbase.data.HBaseFeatureWriter类当中进行的。具体实现方法是wrapFeature方法。

override protected def wrapFeature(feature: SimpleFeature): HBaseFeature = wrapper(feature) 

接下来调用org.locationtech.geomesa.hbase.data.HBaseFeature类当中的wrapper方法。

  1. def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {
  2. val serializers = HBaseColumnGroups.serializers(sft)
  3. val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)
  4. (feature) => new HBaseFeature(feature, serializers, idSerializer)
  5. }

从这个方法中可以看出,在转换Feature格式的过程当中,新的HbaseFeature当中又封装了一些序列化的对象。IdSerializer在此时是空的对象,而serializers对象已经被赋值了,是KyroFeatureSerializer中的内部类MutableActiveSerializer的一个实例。可以看出,在进行数据序列化的过程当中,GeoMesa-Hbase使用的是Kyro序列化机制。

2.3 确定分片

在进行格式转化之后,回到GeoMesaFeatureWriter类的writeFeature方法当中进行封装。

writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }    

但是这个时候的数据虽然可以进行序列化、也能够调用Hbase的API插入到Hbase当中,但是由于时空数据往往会分布不均匀,可能会产生严重的数据倾斜问题,所以这是需要将数据进行分片存储。这个方法是在org.locationtech.geomesa.index.index.BaseFeatureIndex的writer方法当中实现的。

  1. override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {
  2. val sharing = sft.getTableSharingBytes
  3. val shards = shardStrategy(sft)
  4. val toIndexKey = keySpace.toIndexKeyBytes(sft)
  5. mutator(sharing, shards, toIndexKey, createInsert)
  6. }

在这个方法当中看到有四个参数,其中比较重要的是shards和toIndexKey,shards是负责分片存储策略实现的,toIndexKey是负责建立索引的参数。

在进一步的方法调用中,GeoMesa会判断id是用户指定的还是系统自动生成的uuid,如果是用户指定的,就会调用StringToBytes方法,将用户指定的id当成一个字符串来进行hash运算。如果是系统生成的uuid,GeoMesa就会调用uuidToBytes方法,将uuid直接进行hash运算。

在执行完上述操作后,geomesa就开始调用toIndexKey来进行索引的建立工作了。

3. Z曲线处理

3.1 获取Z曲线的value值

在GeoMesa将feature存入到Hbase的过程当中,GeoMesa会生成相应的索引值,而这个过程是通过调用org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace中的toIndexKeyBytes方法实现的。

  1. override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {
  2. val z3 = sfc(sft.getZ3Interval)
  3. val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)
  4. val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("Z3 index requires a valid date"))
  5. val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)
  6. getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)
  7. }

而在其进一步调用的getZValueBytes方法当中,可以看到这个索引值具体是如何封装的。

  1. private def getZValueBytes(z3: Z3SFC,
  2. geomIndex: Int,
  3. dtgIndex: Int,
  4. timeToIndex: TimeToBinnedTime,
  5. lenient: Boolean)
  6. (prefix: Seq[Array[Byte]],
  7. feature: SimpleFeature,
  8. suffix: Array[Byte]): Seq[Array[Byte]] = {
  9. val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]
  10. if (geom == null) {
  11. throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")
  12. }
  13. val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]
  14. val time = if (dtg == null) { 0 } else { dtg.getTime }
  15. val BinnedTime(b, t) = timeToIndex(time)
  16. val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {
  17. case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)
  18. }
  19. // 创建字节数组,用来封装索引信息
  20. val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
  21. var i = 0
  22. prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
  23. ByteArrays.writeShort(b, bytes, i)
  24. ByteArrays.writeLong(z, bytes, i + 2)
  25. System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
  26. Seq(bytes)
  27. }

在方法的前半段,从第9行到第17行,该方法实现了如下功能:首先将地理信息提取出来,转换为本身的Point对象,并且判断是否有地理信息,之后会将时间信息提取出来,调用相关的方法进一步建立索引。

3.2 将时间信息利用Binned机制进行转换

由于时间是无限的,因此为了适应GeoHash二分编码的规则,与经纬度信息形成八叉树的索引结构,GeoMesa内置了Binned机制,将时间以公元1970年1月1日零时零分为起始,按照天、周(默认)、月、年四种区间,形成相对有限的chunk进行计量,以利于后期的二分操作。

但是因为索引长度是有限的,因此这个方式会有一些局限性,主要体现在表示的时间长度比较有限。如果以天为单位,偏移量的单位就是毫秒,最大表示的时间为公元2059年9月18日;如果以周为单位,这个也是GeoMesa默认的一种设置,偏移量的单位就是秒,最大表示的时间为公元2598年1月4日;如果以月为单位,偏移量的单位也是秒,最大表示的时间为公元4700年8月31日;如果以年为单位,偏移量的单位就是分钟,最大表示的时间为公元34737年12月31日。

一般情况下,由于项目生命周期和时间粒度的权衡下,以周为时间周期进行计算是比较合理的,因此现在的GeoMesa默认的设置同样是这样的。

这个过程具体是由org.locationtech.geomesa.curve.BinnedTime类承担的,而具体到现在写入数据的过程,主要是调用timeToBinnedTime方法来实现的。

  1. def timeToBinnedTime(period: TimePeriod): TimeToBinnedTime = {
  2. period match {
  3. case TimePeriod.Day => toDayAndMillis
  4. case TimePeriod.Week => toWeekAndSeconds
  5. case TimePeriod.Month => toMonthAndSeconds
  6. case TimePeriod.Year => toYearAndMinutes
  7. }
  8. }

更进一步的调用,可以点开toWeekAndSeconds方法进行查看。

3.3 建立时空索引

这一步是GeoMesa建立时空索引的核心部分,在执行这一步之前,用户需要将经纬度、时间的参数都准备好,经纬度的信息都是Double类型的数据,时间的信息需要转换成为Long类型的数据,然后进入org.locationtech.geomesa.curve.Z3SFC类中,执行index方法。

  1. override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {
  2. try {
  3. require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max && t >= time.min && t <= time.max,
  4. s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}], [${time.min},${time.max}]): $x, $y, $t")
  5. Z3(lon.normalize(x), lat.normalize(y), time.normalize(t))
  6. } catch {
  7. case _: IllegalArgumentException if lenient => lenientIndex(x, y, t)
  8. }
  9. }

在上述源码的第4行,可以看到,经度、维度和时间的数据都调用了normalize方法。这是对这些数据进行格式化处理。

在此之后,对于经过格式化的数据,GeoMesa利用org.locationtech.sfcurve.zorder.Z3类对这些数据进行了再次的封装。

  1. def apply(x: Int, y: Int, z: Int): Z3 = {
  2. new Z3(split(x) | split(y) << 1 | split(z) << 2)
  3. }

其中,会调用一些切分、编码、解码的方法,在此不赘述。

4. 数据序列化

在前述的Z3IndexKeySpace类中的getZValueBytes方法的后半段(第20行到第28行),主要实现了前面处理数据的序列化操作。

  1. private def getZValueBytes(z3: Z3SFC,
  2. geomIndex: Int,
  3. dtgIndex: Int,
  4. timeToIndex: TimeToBinnedTime,
  5. lenient: Boolean)
  6. (prefix: Seq[Array[Byte]],
  7. feature: SimpleFeature,
  8. suffix: Array[Byte]): Seq[Array[Byte]] = {
  9. ...
  10. // 创建字节数组,用来封装索引信息
  11. val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
  12. var i = 0
  13. prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
  14. ByteArrays.writeShort(b, bytes, i)
  15. ByteArrays.writeLong(z, bytes, i + 2)
  16. System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
  17. Seq(bytes)
  18. }

4.1 将数据封装成Long类型的数据

在上一步中,数据已经被整合成为符合Z3填充曲线规则的格式了,同时用Z3封装好了,接下来就是将这些数据解析成为一个long类型数据并进一步转化成为能够存入到Hbase当中的二进制序列。

实现这个功能的类是org.locationtech.geomesa.utils.index.ByteArrays类,而实现这个过程需要先后调用其中的writeShort方法和writeLong方法。如下:

WriteShort方法:

  1. def writeShort(short: Short, bytes: Array[Byte], offset: Int = 0): Unit = {
  2. bytes(offset) = (short >> 8).asInstanceOf[Byte]
  3. bytes(offset + 1) = short.asInstanceOf[Byte]
  4. }

WriteLong方法:

  1. def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {
  2. bytes(offset ) = ((long >> 56) & 0xff).asInstanceOf[Byte]
  3. bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]
  4. bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]
  5. bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]
  6. bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]
  7. bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]
  8. bytes(offset + 6) = ((long >> 8) & 0xff).asInstanceOf[Byte]
  9. bytes(offset + 7) = (long & 0xff).asInstanceOf[Byte]
  10. }

4.2 利用mutator将key数据进行封装

上述数据都处理完成以后,mutator会将index、shards、feature等数据进行封装

  1. private def mutator(sharing: Array[Byte],
  2. shards: ShardStrategy,
  3. toIndexKey: (Seq[Array[Byte]], SimpleFeature, Array[Byte]) => Seq[Array[Byte]],
  4. operation: (Array[Byte], F) => W)
  5. (feature: F): Seq[W] = {
  6. toIndexKey(Seq(sharing, shards(feature)), feature.feature, feature.idBytes).map(operation.apply(_, feature))
  7. }

5. 写入HBase

Geomesa中利用HTable与Hbase进行交互,而封装数据的put对象存在于org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中,通过调用createInsert方法,来实现这个过程,该方法返回一个Mutation对象,其中就封装了数据的一次写操作。

  1. override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {
  2. val put = new Put(row)
  3. feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))
  4. feature.visibility.foreach(put.setCellVisibility)
  5. put.setDurability(HBaseIndexAdapter.durability)
  6. }

5.1 插入之前的序列化操作

由于Hbase本身存储数据是以二进制的格式将数据进行存储的,最终将数据存储到HDFS当中的HFile当中,所以这个持久化的过程必然会涉及到数据的序列化的问题。而在这个过程当中,GeoMesa主要用了Kryo的序列化方式。

kryo是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的体积。当然,序列化的方式有多种多样,GeoMesa也提供了例如avro、prottobuffer等序列化机制,用户在进行开发的过程中,可以进行选择。

这个过程主要涉及到两个类,HbaseFeature类以及org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类。HbaseFeature类承担了数据封装和调用方法的作用,同样也是贯穿整个写操作的一个基础类。KryoFeatureSerialization类则是承担了数据序列化的基础性工作,具体实现序列化功能的是serialize方法:

  1. override def serialize(sf: SimpleFeature): Array[Byte] = {
  2. val output = KryoFeatureSerialization.getOutput(null)
  3. writeFeature(sf, output)
  4. output.toBytes
  5. }

5.2 将数据插入到HBase当中

经过比较复杂的底层转换,最终实现数据插入的是利用org.apache.hadoop.hbase.client.Put类当中的addImmutable方法。

  1. public Put addImmutable(byte [] family, byte [] qualifier, long ts, byte [] value) {
  2. if (ts < 0) {
  3. throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
  4. }
  5. List<Cell> list = getCellList(family);
  6. KeyValue kv = createPutKeyValue(family, qualifier, ts, value);
  7. list.add(kv);
  8. familyMap.put(family, list);
  9. return this;
  10. }

 

共0条评论