1. 基本流程
建立索引(在此处的索引是动词,指在写入操作中,将数据进行解构并组织,使其能够更快地进行查询)、插入feature都是写操作。基于Hbase进行写单个feature的流程如下图所示(由于GeoMesa最具特色的是其基于Z曲线的三维时空索引,因此在此处以Z3索引为例进行展示):
以下是写入单个feature的流程:
- 客户端向GeoMesa发送写请求
- GeoMesa对传来的feature进行数据的预处理,主要包括featureID的处理、epoch的封装等等,将feature数据和SimpleFeatureType数据封装成为能够进行下一步操作的数据类型。
- GeoMesa中的索引机制会根据feature中的数据判断建立的索引类型,然后利用sfcurve中的相关方法,将时间、空间数据格式化,并整合成为一个序列。
- GeoMesa会利用Kryo序列化机制,将第三步产生的序列进行序列化,转换成为适合Hbase存储的格式。
- 最后利用缓存修改器中的put方法,调用Hbase API,将之前处理好的数据写入到Hbase当中。
2. 数据预处理
2.1 分析feature中的id
在调用write方法后,首先会进入org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter类中,调用write方法。
- override def write(): Unit = {
- if (currentFeature == null) {
- throw new IllegalStateException("next() must be called before write()")
- }
- writeFeature(currentFeature)
- currentFeature = null
- }
更进一步,geomesa会调用writeFeature方法,完成写入操作的具体实现就是利用此方法来实现的。
- protected def writeFeature(feature: SimpleFeature): Unit = {
- //查看是否有一个指定id,如果没有,系统会生成一个id
- val featureWithFid = GeoMesaFeatureWriter.featureWithFid(sft, feature)
- val wrapped = wrapFeature(featureWithFid)
-
- val converted = try {
- writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }
- } catch {
- case NonFatal(e) =>
- val attributes = s"${featureWithFid.getID}:${featureWithFid.getAttributes.asScala.mkString("|")}"
- throw new IllegalArgumentException(s"Error indexing feature '$attributes'", e)
- }
- converted.foreach { case (mutator, writes) => executeWrite(mutator, writes) }
- statUpdater.add(featureWithFid)
- }
由于用户在创建feature时,有可能设定了id,也可能没有设定id,因此在开始,geoemesa需要对feature进行判断,如果有设定id,就使用用户指定的id,如果没有,就使用uuid随机生成一个id。
具体实现是在featureWithFid方法当中。首先,系统会对feature进行判断,也就是Hint.PROVIED_FID,如果为true,就使用此fid,如果没有,就利用idGenerator进行生成id的操作。
- def featureWithFid(sft: SimpleFeatureType, feature: SimpleFeature): SimpleFeature = {
- if (feature.getUserData.containsKey(Hints.PROVIDED_FID)) {
- withFid(sft, feature, feature.getUserData.get(Hints.PROVIDED_FID).toString)
- } else if (feature.getUserData.containsKey(Hints.USE_PROVIDED_FID) &&
- feature.getUserData.get(Hints.USE_PROVIDED_FID).asInstanceOf[Boolean]) {
- feature
- } else {
- withFid(sft, feature, idGenerator.createId(sft, feature))
- }
- }
2.2 Feature格式的转换
由于原生的SimpleFeature当中并没有包含Hbase相关的一些属性,不利于SimpleFeature最终插入到Hbase表当中,因此在这一步,GeoMesa将SimpleFeature对象转换为HbaseFeature对象。
这个过程是在org.locationtech.geomesa.hbase.data.HBaseFeatureWriter类当中进行的。具体实现方法是wrapFeature方法。
override protected def wrapFeature(feature: SimpleFeature): HBaseFeature = wrapper(feature)
接下来调用org.locationtech.geomesa.hbase.data.HBaseFeature类当中的wrapper方法。
- def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {
- val serializers = HBaseColumnGroups.serializers(sft)
- val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)
- (feature) => new HBaseFeature(feature, serializers, idSerializer)
- }
从这个方法中可以看出,在转换Feature格式的过程当中,新的HbaseFeature当中又封装了一些序列化的对象。IdSerializer在此时是空的对象,而serializers对象已经被赋值了,是KyroFeatureSerializer中的内部类MutableActiveSerializer的一个实例。可以看出,在进行数据序列化的过程当中,GeoMesa-Hbase使用的是Kyro序列化机制。
2.3 确定分片
在进行格式转化之后,回到GeoMesaFeatureWriter类的writeFeature方法当中进行封装。
writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }
但是这个时候的数据虽然可以进行序列化、也能够调用Hbase的API插入到Hbase当中,但是由于时空数据往往会分布不均匀,可能会产生严重的数据倾斜问题,所以这是需要将数据进行分片存储。这个方法是在org.locationtech.geomesa.index.index.BaseFeatureIndex的writer方法当中实现的。
- override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {
- val sharing = sft.getTableSharingBytes
- val shards = shardStrategy(sft)
- val toIndexKey = keySpace.toIndexKeyBytes(sft)
- mutator(sharing, shards, toIndexKey, createInsert)
- }
在这个方法当中看到有四个参数,其中比较重要的是shards和toIndexKey,shards是负责分片存储策略实现的,toIndexKey是负责建立索引的参数。
在进一步的方法调用中,GeoMesa会判断id是用户指定的还是系统自动生成的uuid,如果是用户指定的,就会调用StringToBytes方法,将用户指定的id当成一个字符串来进行hash运算。如果是系统生成的uuid,GeoMesa就会调用uuidToBytes方法,将uuid直接进行hash运算。
在执行完上述操作后,geomesa就开始调用toIndexKey来进行索引的建立工作了。
3. Z曲线处理
3.1 获取Z曲线的value值
在GeoMesa将feature存入到Hbase的过程当中,GeoMesa会生成相应的索引值,而这个过程是通过调用org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace中的toIndexKeyBytes方法实现的。
- override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {
- val z3 = sfc(sft.getZ3Interval)
- val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)
- val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("Z3 index requires a valid date"))
- val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)
-
- getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)
- }
而在其进一步调用的getZValueBytes方法当中,可以看到这个索引值具体是如何封装的。
- private def getZValueBytes(z3: Z3SFC,
- geomIndex: Int,
- dtgIndex: Int,
- timeToIndex: TimeToBinnedTime,
- lenient: Boolean)
- (prefix: Seq[Array[Byte]],
- feature: SimpleFeature,
- suffix: Array[Byte]): Seq[Array[Byte]] = {
- val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]
- if (geom == null) {
- throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")
- }
- val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]
- val time = if (dtg == null) { 0 } else { dtg.getTime }
- val BinnedTime(b, t) = timeToIndex(time)
- val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {
- case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)
- }
-
- // 创建字节数组,用来封装索引信息
- val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
- var i = 0
- prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
- ByteArrays.writeShort(b, bytes, i)
- ByteArrays.writeLong(z, bytes, i + 2)
- System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
- Seq(bytes)
- }
在方法的前半段,从第9行到第17行,该方法实现了如下功能:首先将地理信息提取出来,转换为本身的Point对象,并且判断是否有地理信息,之后会将时间信息提取出来,调用相关的方法进一步建立索引。
3.2 将时间信息利用Binned机制进行转换
由于时间是无限的,因此为了适应GeoHash二分编码的规则,与经纬度信息形成八叉树的索引结构,GeoMesa内置了Binned机制,将时间以公元1970年1月1日零时零分为起始,按照天、周(默认)、月、年四种区间,形成相对有限的chunk进行计量,以利于后期的二分操作。
但是因为索引长度是有限的,因此这个方式会有一些局限性,主要体现在表示的时间长度比较有限。如果以天为单位,偏移量的单位就是毫秒,最大表示的时间为公元2059年9月18日;如果以周为单位,这个也是GeoMesa默认的一种设置,偏移量的单位就是秒,最大表示的时间为公元2598年1月4日;如果以月为单位,偏移量的单位也是秒,最大表示的时间为公元4700年8月31日;如果以年为单位,偏移量的单位就是分钟,最大表示的时间为公元34737年12月31日。
一般情况下,由于项目生命周期和时间粒度的权衡下,以周为时间周期进行计算是比较合理的,因此现在的GeoMesa默认的设置同样是这样的。
这个过程具体是由org.locationtech.geomesa.curve.BinnedTime类承担的,而具体到现在写入数据的过程,主要是调用timeToBinnedTime方法来实现的。
- def timeToBinnedTime(period: TimePeriod): TimeToBinnedTime = {
- period match {
- case TimePeriod.Day => toDayAndMillis
- case TimePeriod.Week => toWeekAndSeconds
- case TimePeriod.Month => toMonthAndSeconds
- case TimePeriod.Year => toYearAndMinutes
- }
- }
更进一步的调用,可以点开toWeekAndSeconds方法进行查看。
3.3 建立时空索引
这一步是GeoMesa建立时空索引的核心部分,在执行这一步之前,用户需要将经纬度、时间的参数都准备好,经纬度的信息都是Double类型的数据,时间的信息需要转换成为Long类型的数据,然后进入org.locationtech.geomesa.curve.Z3SFC类中,执行index方法。
- override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {
- try {
- require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max && t >= time.min && t <= time.max,
- s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}], [${time.min},${time.max}]): $x, $y, $t")
- Z3(lon.normalize(x), lat.normalize(y), time.normalize(t))
- } catch {
- case _: IllegalArgumentException if lenient => lenientIndex(x, y, t)
- }
- }
在上述源码的第4行,可以看到,经度、维度和时间的数据都调用了normalize方法。这是对这些数据进行格式化处理。
在此之后,对于经过格式化的数据,GeoMesa利用org.locationtech.sfcurve.zorder.Z3类对这些数据进行了再次的封装。
- def apply(x: Int, y: Int, z: Int): Z3 = {
- new Z3(split(x) | split(y) << 1 | split(z) << 2)
- }
其中,会调用一些切分、编码、解码的方法,在此不赘述。
4. 数据序列化
在前述的Z3IndexKeySpace类中的getZValueBytes方法的后半段(第20行到第28行),主要实现了前面处理数据的序列化操作。
- private def getZValueBytes(z3: Z3SFC,
- geomIndex: Int,
- dtgIndex: Int,
- timeToIndex: TimeToBinnedTime,
- lenient: Boolean)
- (prefix: Seq[Array[Byte]],
- feature: SimpleFeature,
- suffix: Array[Byte]): Seq[Array[Byte]] = {
- ...
-
- // 创建字节数组,用来封装索引信息
- val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
- var i = 0
- prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
- ByteArrays.writeShort(b, bytes, i)
- ByteArrays.writeLong(z, bytes, i + 2)
- System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
- Seq(bytes)
- }
4.1 将数据封装成Long类型的数据
在上一步中,数据已经被整合成为符合Z3填充曲线规则的格式了,同时用Z3封装好了,接下来就是将这些数据解析成为一个long类型数据并进一步转化成为能够存入到Hbase当中的二进制序列。
实现这个功能的类是org.locationtech.geomesa.utils.index.ByteArrays类,而实现这个过程需要先后调用其中的writeShort方法和writeLong方法。如下:
WriteShort方法:
- def writeShort(short: Short, bytes: Array[Byte], offset: Int = 0): Unit = {
- bytes(offset) = (short >> 8).asInstanceOf[Byte]
- bytes(offset + 1) = short.asInstanceOf[Byte]
- }
WriteLong方法:
- def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {
- bytes(offset ) = ((long >> 56) & 0xff).asInstanceOf[Byte]
- bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]
- bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]
- bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]
- bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]
- bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]
- bytes(offset + 6) = ((long >> 8) & 0xff).asInstanceOf[Byte]
- bytes(offset + 7) = (long & 0xff).asInstanceOf[Byte]
- }
4.2 利用mutator将key数据进行封装
上述数据都处理完成以后,mutator会将index、shards、feature等数据进行封装
- private def mutator(sharing: Array[Byte],
- shards: ShardStrategy,
- toIndexKey: (Seq[Array[Byte]], SimpleFeature, Array[Byte]) => Seq[Array[Byte]],
- operation: (Array[Byte], F) => W)
- (feature: F): Seq[W] = {
- toIndexKey(Seq(sharing, shards(feature)), feature.feature, feature.idBytes).map(operation.apply(_, feature))
- }
5. 写入HBase
Geomesa中利用HTable与Hbase进行交互,而封装数据的put对象存在于org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中,通过调用createInsert方法,来实现这个过程,该方法返回一个Mutation对象,其中就封装了数据的一次写操作。
- override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {
- val put = new Put(row)
- feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))
- feature.visibility.foreach(put.setCellVisibility)
- put.setDurability(HBaseIndexAdapter.durability)
- }
5.1 插入之前的序列化操作
由于Hbase本身存储数据是以二进制的格式将数据进行存储的,最终将数据存储到HDFS当中的HFile当中,所以这个持久化的过程必然会涉及到数据的序列化的问题。而在这个过程当中,GeoMesa主要用了Kryo的序列化方式。
kryo是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的体积。当然,序列化的方式有多种多样,GeoMesa也提供了例如avro、prottobuffer等序列化机制,用户在进行开发的过程中,可以进行选择。
这个过程主要涉及到两个类,HbaseFeature类以及org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类。HbaseFeature类承担了数据封装和调用方法的作用,同样也是贯穿整个写操作的一个基础类。KryoFeatureSerialization类则是承担了数据序列化的基础性工作,具体实现序列化功能的是serialize方法:
- override def serialize(sf: SimpleFeature): Array[Byte] = {
- val output = KryoFeatureSerialization.getOutput(null)
- writeFeature(sf, output)
- output.toBytes
- }
5.2 将数据插入到HBase当中
经过比较复杂的底层转换,最终实现数据插入的是利用org.apache.hadoop.hbase.client.Put类当中的addImmutable方法。
- public Put addImmutable(byte [] family, byte [] qualifier, long ts, byte [] value) {
- if (ts < 0) {
- throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
- }
- List<Cell> list = getCellList(family);
- KeyValue kv = createPutKeyValue(family, qualifier, ts, value);
- list.add(kv);
- familyMap.put(family, list);
- return this;
- }