开发者社区 > 博文 > GeoMesa-Hbase源码篇 ——序列化机制
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

GeoMesa-Hbase源码篇 ——序列化机制

  • 京东城市JUST团队
  • 2021-01-18
  • IP归属:未知
  • 1150浏览


第一章   各种序列化机制

第一节   Kryo序列化机制

1     Kryo概述

Kryo是一种快速高效的Java对象图(Object graph)序列化框架。该项目的目标是打造速度、效率和易于使用的API。当对象需要持久化时,无论是用于文件、数据库还是通过网络,该项目都很有用。

Kryo还可以执行自动深层浅层的复制/克隆。这是从对象直接复制到对象,而不是objectbytesobject

1.1    Java对象图(Object graph

对于传统的数据模型当中,例如在一个UML图当中,指定的是类之间的关系。而Java对象图指定的是对象之间的关系。

在物理表示上,对象图是有向图,可能是循环的。当存储在内存中时,对象使用其属性和函数表占用内存的不同段,而关系由高级语言中的指针或者不同类型的全局程序来进行处理。

比如说,一个Car类当中由Wheel类组成,在UML图当中,表示为CarWheel,指定的仅仅是类之间的关系。而在对象图当中,由于一辆车会有四只轮子,可以标定这四个Wheel对象为frontLeftfrontRightbackLeftbackRight。邻接列表就能够表示为cCar{frontLeftWheelfrontRightWheelbackLeftWheelbackRightWheel}

1.2    深拷贝(deep clone)和浅拷贝(shallow copy

浅拷贝只是复制了对象中对其他对象的引用,而没有再复制引用所指向的实际的对象。深拷贝与浅拷贝不同,在进行深拷贝的过程当中,实际上将对象所指向的对象也重新创建了一份。

如下图,现在需要拷贝一个obj1对象,它包含了两个对象:containedObj1containedObj2。针对不同的场景,可能会采用不同的拷贝方案。

       对于浅拷贝来说,我们需要做的就是创建一个新的obj1对象,然后将这个新对象的引用指向原有的obj1对象所属的containedObj1对象和containedObj2对象。而在java当中实现浅拷贝可以通过clone方法来实现。

       而对于深拷贝来说,我们需要做的首先是创建一个obj2对象,然后创建它的所属对象containedObj3对象和containedObj4对象,最后将obj2对象的引用指向这两个新创建的子对象。在java当中实现深拷贝主要是通过序列化和反序列化来实现的,所以实现深拷贝的前提是对象一定是能够被序列化的,这一点很重要。

这两种拷贝方式虽然生成的原理是不同的,但是在效果上有很多共同点:

1.     浅拷贝和深拷贝都是通过复制创建了一个新的对象,也就是 x.clone() != x

2.     浅拷贝和深拷贝的对象所属的类是一致的,也就是 x.clone().getClass() == x.getClass()

3.     浅拷贝和深拷贝的对象在某些情况下, 也就是从内容角度来说,它所包含的内容是一致的 x.clone().equals(x)

2     Kryo特性

2.1    可变长度编码

IO类提供了读取和写入变长intvarInt)和longvarLong)值的方法。这是通过使用每个字节的第8位来指示是否使用更多的字节来完成序列化的过程,这意味着varInt使用1-5个字节来表示int类型数据,而varLong使用1-9个字节来表示long类型的数据。使用可变长度编码会造成更大的系统开销,但是序列化的数据会比java原生的序列化机制小很多。

在写入可变长编码时,可以针对正值或负值来优化该值。例如,如果进行了正值优化,一个字节就能够表示从0127的所有数值,两个字节就能够表示从12816383中的所有数字。但是如果没有进行正值优化,一个字节就只能表示从-6463中的所有数字,两个字节就只能够表示从648191和从-8192-65之间的所有数字。因此在实际业务当中,有时没有负数,因此需要根据具体的业务需求进行具体的优化。

在进行可变长度编码也会对输入和输出缓冲区造成一些压力,为了解决这个问题,一些方法允许缓冲区决定是写入固定长度还是可变长度值。

方法

描述

writeInt(int)

写入一个4个字节的int类型

writeVarInt(int, boolean)

写入一个1-5个字节的int类型

writeInt(int, boolean)

写入一个4字节或者1-5字节的int类型(这个模式由缓冲区来决定)

writeLong(long)

写入一个8字节的long类型

writeVarLong(long, boolean)

写入一个1-9字节的long类型

writeLong(long, boolean)

写入一个8字节的或者1-9字节的long类型(这个模式由缓冲区来决定)

       除了上述这些单个数据的序列化方法,kryo也可以对多个数据进行批处理。而这些方法当中,有一些方法也是能够支持可变长编码的。如下表

方法

描述

writeInts(int[])

批量写入4个字节的int类型数据

writeInts(int[], boolean)

批量写入4个字节或者1-5个字节的int类型(这个模式由缓冲区来决定)

writeLongs(long[])

批量写入8字节的long类型

writeLongs(long[], boolean)

批量写入8字节的或者1-9字节的long类型(这个模式由缓冲区来决定)

 

3     Kryo操作方法

3.1    Kryo的安装

如果使用Kryo的官方版本,需要在pom.xml文件当中添加如下代码:

1.     <dependency>  

2.         <groupId>com.esotericsoftware</groupId>  

3.         <artifactId>kryo</artifactId>  

4.         <version>4.0.0</version>  

5.     </dependency> 

如果由于在类路径当中使用了不同版本的asm而遇到问题,则需要使用如下依赖:

1.     <dependency>  

2.         <groupId>com.esotericsoftware</groupId>  

3.         <artifactId>kryo-shaded</artifactId>  

4.         <version>4.0.0</version>  

5.     </dependency>  

如果要测试kryo的最新快照,需要在pom.xml中添加如下代码段:

1.     <repository>  

2.        <id>sonatype-snapshots</id>  

3.        <name>sonatype snapshots repo</name>  

4.        <url>https://oss.sonatype.org/content/repositories/snapshots</url>  

5.     </repository>  

6.       

7.     <dependency>  

8.        <groupId>com.esotericsoftware</groupId>  

9.        <artifactId>kryo</artifactId>  

10.       <version>4.0.1-SNAPSHOT</version>  

11.   </dependency>  

如果您使用没有 Maven Kryo,请注意 Kryo jar 文件有几个外部依赖关系,它们的JAR也需要添加到类路径中。这些依赖关系是 MinLog logging library  Objenesis library

1.     <repository>  

2.         <groupId>com.esotericsoftware</groupId>  

3.         <artifactId>minlog</artifactId>  

4.         <version>1.3.2-SNAPSHOT</version>  

5.     </dependency>  

4     Kryo性能测试

 

1.     kryo序列化后的字节数组是:  

2.     1 40 0 1 -125 -27 -68 -96 -28 -72 -119  

3.       

4.     java序列化后的字节数组是:  

5.     -84 -19 0 5 115 114 0 28 99 111 109 46 106 100 46 105 99 105 116 121 46 100 109 112 46 107 114 121 111 46 80 101 114 115 111 110 -69 -52 107 -91 16 46 117 81 2 0 3 73 0 3 97 103 101 76 0 3 109 97 112 116 0 15 76 106 97 118 97 47 117 116 105 108 47 77 97 112 59 76 0 4 110 97 109 101 116 0 18 76 106 97 118 97 47 108 97 110 103 47 83 116 114 105 110 103 59 120 112 0 0 0 20 112 116 0 6 -27 -68 -96 -28 -72 -119 

 

 

第二章   GeoMesa序列化机制

第一节   概述

GeoMesa当中,由于大量需要与内存交互以及网络传输,因此序列化的过程也就必不可少了。与很多大数据的框架一样,鉴于Java原生的序列化机制会造成大量的数据冗余,而且序列化的时间也比较长,因此GeoMesa同样采用了一些序列化框架来解决这个问题。

对于序列化框架的选择,原生的GeoMesa采用了两种选择,一种是avro框架,这个框架是hadoop生态的很多框架所通用的一种框架,因此在与hdfshbase进行网络通信的时候会有比较好的兼容性;另一种是kryo框架,也是GeoMesa默认的一种对value值进行序列化的一种框架。

1.     object SerializationType extends Enumeration {  

2.       type SerializationType = Value  

3.       val KRYO: Value = Value("kryo")  

4.       val AVRO: Value = Value("avro")  

5.     }  

对于数据进行序列化的过程主要分为两大部分:对于SimpleFeatureType的序列化过程、对SimpleFeature的序列化过程。

第二节   GeoMesavalue结构

 

第三节   GeoMesa序列化流程

1     流程概述

首先,在利用GeoMesa进行数据的写入和查询的过程当中,一般直接操作的都是SimpleFeature对象。那么如何判定这些SimpleFeature对象是不是需要公用一个连接,插入到一个存储空间内呢?这个过程GeoMesa是利用了在本地的一个缓存LocalCache来进行控制的。

在我们写入SimpleFeatureType信息的时候,GeoMesa会利用序列化机制,将SimpleFeatureType里面封装的属性结构信息利用序列化器进行序列化,存储在本地的缓存(localCache)当中。当我们需要写入或者查询数据时,GeoMesa会将本地缓存当中的SimpleFeatureType相关的信息反序列化出来与SimpleFeature当中的SimpleFeatureType信息进行比对。这样就可以保证始终有一个全局的schema来控制表结构的信息。

而对于SimpleFeature的序列化过程主要使用的是Kryo框架,通过将FeatureId和各个属性信息转换成为字节数组,作为Value值存储进RowValue对象当中,最终存储进Hbase当中。

 

2     基本流程

2.1    SimpleFeatureType的序列化

对于SimpleFeatureType的序列化过程其实没有出现在写入SimpleFeature的过程当中,而是当创建FeatureWriter时就进行了创建。如下代码当中,第二行就是创建FeatureWriter的过程。可以看到,此时并没有传入SimpleFeature的参数,仅仅是将SimpleFeatureTypeTypeName和事务的配置传入FeatureWriter对象当中。结合前面创建datastore的过程可以知道,此时SimpleFeatureType的全量信息都已经被传入到了datastore当中,因此可以看出这一步是只与SimpleFeatureType相关的,与SimpleFeature没有关系。

1.     private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) throws IOException {  

2.             try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer =  datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {  

3.                 System.out.println("write test data");  

4.                 SimpleFeature toWrite = writer.next();  

5.                 toWrite.setAttributes(feature.getAttributes());  

6.                 if (toWrite.getIdentifier() instanceof FeatureIdImpl) {  

7.                     ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());  

8.                 }  

9.                 toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);  

10.               toWrite.getUserData().putAll(feature.getUserData());  

11.               writer.write();  

12.           }  

13.     

14.       }  

接下来就是整个对于SimpleFeatureType的序列化过程:

1.    序列化的引入阶段

2.    序列化参数的配置

3.    构造序列化器

2.1.1    序列化的引入阶段

由于序列化过程是非常底层的一个过程,因此往往会有很深的调用过程,在此对这个过程进行简单的介绍。首先从Demo的代码当中可以看出,我们调用的是org.locationtech.geomesa.index.geotools.GeoMesaDataStore类中的getFeatureWriterAppend方法。

1.     override def getFeatureWriterAppend(typeName: String, transaction: Transaction): FlushableFeatureWriter = {  

2.         val sft = getSchema(typeName)  

3.         if (sft == null) {  

4.           throw new IOException(s"Schema '$typeName' has not been initialized. Please call 'createSchema' first.")  

5.         }  

6.         if (transaction != Transaction.AUTO_COMMIT) {  

7.           logger.warn("Ignoring transaction - not supported")  

8.         }  

9.        featureWriterFactory.createFeatureWriter(sft, manager.indices(sft, mode = IndexMode.Write), None)  

10.     }  

这个方法主要完成了对schema和事务的操作,最后将这些操作的结果封装在FeatureWirter对象当中回传给用户。其中与我们序列化过程有关的是第9行的代码,这一行调用了org.locationtech.geomesa.hbase.data.HBaseFeatureWriter.HBaseFeatureWriterFactory类中的createFeatureWriter方法。

1.     override def createFeatureWriter(sft: SimpleFeatureType,  

2.                                          indices: Seq[HBaseFeatureIndexType],  

3.                                          filter: Option[Filter]): FlushableFeatureWriter = {  

4.           (TablePartition(ds, sft), filter) match {  

5.             case (None, None) =>  

6.               new HBaseFeatureWriter(sft, ds, indices, nullnull)  

7.                   with HBaseTableFeatureWriterType with HBaseAppendFeatureWriterType  

8.       

9.             case (None, Some(f)) =>  

10.             new HBaseFeatureWriter(sft, ds, indices, f, null)  

11.                 with HBaseTableFeatureWriterType with HBaseModifyFeatureWriterType  

12.     

13.           case (Some(p), None) =>  

14.             new HBaseFeatureWriter(sft, ds, indices, null, p)  

15.                 with HBasePartitionedFeatureWriterType with HBaseAppendFeatureWriterType  

16.     

17.           case (Some(p), Some(f)) =>  

18.             new HBaseFeatureWriter(sft, ds, indices, f, p)  

19.                 with HBasePartitionedFeatureWriterType with HBaseModifyFeatureWriterType  

20.         }  

在这里我们可以看到,GeoMesa在此进行了一个匹配的过程,但是每一个分支的结果都是创建了一个HbaseFeatureWriter对象。

1.     abstract class HBaseFeatureWriter(val sft: SimpleFeatureType,  

2.                                       val ds: HBaseDataStore,  

3.                                       val indices: Seq[HBaseFeatureIndexType],  

4.                                       val filter: Filter,  

5.                                       val partition: TablePartition) extends HBaseFeatureWriterType {  

6.       

7.       private val wrapper = HBaseFeature.wrapper(sft)  

在创建HbaseFeatureWriter对象时,我们可以在这个类当中看到,在这个对象当中有一个参数wrapper,调用 org.locationtech.geomesa.hbase.data.HBaseFeature伴生对象当中的wrapper方法。

1.     def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {  

2.         val serializers = HBaseColumnGroups.serializers(sft)  

3.         val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)  

4.         (feature) => new HBaseFeature(feature, serializers, idSerializer)  

5.       }  

在这个方法中,才真正开始根据Hbase这种datastore与序列化器进行匹配,从上述代码的第2行可以看出调用了org.locationtech.geomesa.index.conf.ColumnGroups特性当中的serializers方法。

1.     def serializers(sft: SimpleFeatureType): Seq[(T, SimpleFeatureSerializer)] = {  

2.         apply(sft).map { case (colFamily, subset) =>  

3.           if (colFamily.eq(default)) {  

4.             (colFamily, KryoFeatureSerializer(subset, SerializationOptions.withoutId))  

5.           } else {  

6.             (colFamily, new ProjectingKryoFeatureSerializer(sft, subset, SerializationOptions.withoutId)) 

7.           }  

8.         }  

9.       }  

经过了上述这些复杂的调用过程,才真正进入到了对于SimpleFeatureType的序列化过程。

2.1.2    序列化参数的配置

接着上文的方法调用过程,接下来实现序列化的是org.locationtech.geomesa.index.conf.ColumnGroups伴生对象的apply方法。

1.     def apply(sft: SimpleFeatureType): Seq[(T, SimpleFeatureType)] = {  

2.         val key = CacheKeyGenerator.cacheKey(sft)  

3.         var groups = cache.getIfPresent(key)  

4.         if (groups == null) {  

5.           val map = scala.collection.mutable.Map.empty[String, SimpleFeatureTypeBuilder]  

6.       

7.           sft.getAttributeDescriptors.asScala.foreach { descriptor =>  

8.             descriptor.getColumnGroups().foreach { group =>  

9.               map.getOrElseUpdate(group, new SimpleFeatureTypeBuilder()).add(descriptor)  

10.           }  

11.         }  

12.     

13.         val sfts = map.map { case (group, builder) =>  

14.           builder.setName(sft.getTypeName)  

15.           val subset = builder.buildFeatureType()  

16.           subset.getUserData.putAll(sft.getUserData)  

17.           (convert(group), subset)  

18.         } + (default -> sft)  

19.     

20.         // return the smallest groups first  

21.         groups = sfts.toSeq.sortBy(_._2.getAttributeCount)  

22.     

23.         cache.put(key, groups)  

24.       }  

25.       groups  

26.     }  

从这段代码可以看出,这个方法主要分为三个部分,一开始进行了各种参数的准备,之后就是对于这些参数的封装过程,最后将这些封装好的参数放入缓存当中。

首先,在参数准备阶段,程序对于传入的key值进行了判断,此处的key不是全部的schema信息,而是SimpleFeatureType的名称以及各个字段的信息。例如在demo当中,此处的key值内容是“index-text02;taxiId:String,dtg:Date,geom:Point,description:String”。

然后就是根据这个key值从缓存当中获取相应的对象,这个过程会调用com.github.benmanes.caffeine.cache.LocalManualCache类中的getIfPresent方法。

1.     @Override  

2.       default @Nullable V getIfPresent(Object key) {  

3.         return cache().getIfPresent(key, /* recordStats */ true);  

4.       }  

进一步,程序会调用com.github.benmanes.caffeine.cache.BoundedLocalCache类中的getIfPresent方法。

1.     @Override  

2.       public V getIfPresent(Object key, boolean recordStats) {  

3.         Node<K, V> node = data.get(nodeFactory.newLookupKey(key));  

4.         if (node == null) {  

5.           if (recordStats) {  

6.             statsCounter().recordMisses(1);  

7.           }  

8.           return null;  

9.         }  

10.       long now = expirationTicker().read();  

11.       if (hasExpired(node, now)) {  

12.         if (recordStats) {  

13.           statsCounter().recordMisses(1);  

14.         }  

15.         scheduleDrainBuffers();  

16.         return null;  

17.       }  

18.     

19.       @SuppressWarnings("unchecked")  

20.       K castedKey = (K) key;  

21.       V value = node.getValue();  

22.     

23.       if (!isComputingAsync(node)) {  

24.         setVariableTime(node, expireAfterRead(node, castedKey, value, now));  

25.         setAccessTime(node, now);  

26.       }  

27.       afterRead(node, now, recordStats);  

28.       return value;  

29.     }  

之后返回apply方法以后,程序会判断获取到的对象是不是空值,如果获取到的是个空值,说明在缓存当中是没有key值对应的信息的,这个时候就会根据传入的SimpleFeatureType进行一些操作。

1.         if (groups == null) {  

2.           val map = scala.collection.mutable.Map.empty[String, SimpleFeatureTypeBuilder]  

3.       

4.           sft.getAttributeDescriptors.asScala.foreach { descriptor =>  

5.             descriptor.getColumnGroups().foreach { group =>  

6.               map.getOrElseUpdate(group, new SimpleFeatureTypeBuilder()).add(descriptor)  

7.             }  

8.           }  

参数sfts其实是一个tuple,其中第一个元素就是列族名的字节表达,第二个元素是一个SimpleFeatureTypeImpl,也就是SimpleFeatureType的实现类的对象。之后这个tuple就会被封装进groups内。这个groups最终会被放进缓存当中。

1.           val sfts = map.map { case (group, builder) =>  

2.             builder.setName(sft.getTypeName)  

3.             val subset = builder.buildFeatureType()  

4.             subset.getUserData.putAll(sft.getUserData)  

5.             (convert(group), subset)  

6.           } + (default -> sft)  

7.       

8.           // return the smallest groups first  

9.           groups = sfts.toSeq.sortBy(_._2.getAttributeCount)  

10.     

11.        cache.put(key, groups)  

2.1.3    构造序列化器

接下来进行序列化器的构造,在这里有四种序列化器,分别是可变的延迟加载序列化器、可变的活跃序列化器、不变的延迟加载序列化器、不变的活跃序列化器。调用的是org.locationtech.geomesa.features.kryo.KryoFeatureSerializer伴生对象的apply方法,可以看出最终创建了不同的序列化器对象。

1.     def apply(sft: SimpleFeatureType, options: Set[SerializationOption] = Set.empty): KryoFeatureSerializer = {  

2.         (options.immutable, options.isLazy) match {  

3.           case (true,  true)  => new ImmutableLazySerializer(sft, options)  

4.           case (true,  false) => new ImmutableActiveSerializer(sft, options)  

5.           case (false, true)  => new MutableLazySerializer(sft, options)  

6.           case (false, false) => new MutableActiveSerializer(sft, options)  

7.         }  

8.       }  

 

2.2    SimpleFeature的序列化

具体负责将SimpleFeature进行序列化的过程在org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类的writeFeature方法中实现,如下:

1.     private def writeFeature(sf: SimpleFeature, output: Output): Unit = {  

2.         val offsets = KryoFeatureSerialization.getOffsets(cacheKey, writers.length)  

3.         val offset = output.position()  

4.         output.writeInt(VERSION, true)  

5.         output.setPosition(offset + 5) // leave 4 bytes to write the offsets  

6.         if (withId) {  

7.           // TODO optimize for uuids?  

8.           output.writeString(sf.getID)  

9.         }  

10.       // write attributes and keep track off offset into byte array  

11.       var i = 0  

12.       while (i < writers.length) {  

13.         offsets(i) = output.position() - offset  

14.         writers(i)(output, sf.getAttribute(i))  

15.         i += 1  

16.       }  

17.       // write the offsets - variable width  

18.       i = 0  

19.       val offsetStart = output.position() - offset  

20.       while (i < writers.length) {  

21.         output.writeInt(offsets(i), true)  

22.         i += 1  

23.       }  

24.       // got back and write the start position for the offsets  

25.       val end = output.position()  

26.       output.setPosition(offset + 1)  

27.       output.writeInt(offsetStart)  

28.       // reset the position back to the end of the buffer so the bytes aren't lost, and we can keep writing user data  

29.       output.setPosition(end)  

30.     

31.       if (withUserData) {  

32.         KryoUserDataSerialization.serialize(output, sf.getUserData)  

33.       }  

34.     }  

从这段代码可以看出,SimpleFeature的序列化过程可以分成如下几个部分:

1.    序列化版本号和FeatureId(第2行到第9行)

2.    序列化各个属性值(第10行到第16行)

3.    序列化属性的偏移量(第17行到第29行)

4.    序列化用户设置的参数(第31行到第33行)

接下来就对各个部分进行介绍。

2.2.1    序列化版本号和FeatureId

首先,GeoMesa会创建一个容量为1024的字节数组 ,会将此时的版本号进行序列化以后放在第一个位置。这个地方用到了Kryo框架当中的可变长机制,虽然数据类型是Int类型,正常应该占用4个字节的长度,但是在此处,程序会对这个数据进行判断,如果这个数据是否能用少量的字节数表达,将冗余的位数进行忽略。例如:在GeoMesa当中默认的版本号是2,如果按照通常的方式,需要四个字节来进行存储:00000000 00000000 00000000 00000010,而在此处由于这种机制的存在,只需要用一个字节来进行存储:00000010,这样就能够大量的节省存储资源。

接着,缓冲区的游标会向后调整5位,因为在value设计时,从第2位到第5位预留出来,用来存储各个属性的偏移量的数据。从第6位开始写入FeatureId。结合此处的源码,可以看出FeatureId本身的序列化使用的是原生的针对String类型的序列化机制。

1.         val offsets = KryoFeatureSerialization.getOffsets(cacheKey, writers.length)  

2.         val offset = output.position()  

3.        output.writeInt(VERSION, true)  

4.         output.setPosition(offset + 5) // leave 4 bytes to write the offsets  

5.         if (withId) {  

6.           // TODO optimize for uuids?  

7.          output.writeString(sf.getID)  

8.         }  

2.2.2    序列化各个属性值

接下来,系统开始对SimpleFeature当中的属性进行遍历,将这些属性分别进行序列化。

1.         var i = 0  

2.         while (i < writers.length) {  

3.           offsets(i) = output.position() - offset  

4.           writers(i)(output, sf.getAttribute(i))  

5.           i += 1  

6.         }  

在遍历的过程当中,对于每一个属性值,根据这些属性的不同类型,GeoMesa会进行类型匹配,分配给不同的序列化器来进行序列化。具体的过程实现是org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类中的matchWriter方法

1.     private [geomesa] def matchWriter(bindings: Seq[ObjectType], descriptor: AttributeDescriptor): (Output, AnyRef) => Unit = {  

2.         import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor  

3.         bindings.head match {  

4.           case ObjectType.STRING =>  

5.             (o: Output, v: AnyRef) => o.writeString(v.asInstanceOf[String]) // write string supports nulls  

6.           case ObjectType.INT =>  

7.             val w = (o: Output, v: AnyRef) => o.writeInt(v.asInstanceOf[Int])  

8.             writeNullable(w)  

9.           case ObjectType.LONG =>  

10.           val w = (o: Output, v: AnyRef) => o.writeLong(v.asInstanceOf[Long])  

11.           writeNullable(w)  

12.         case ObjectType.FLOAT =>  

13.           val w = (o: Output, v: AnyRef) => o.writeFloat(v.asInstanceOf[Float])  

14.           writeNullable(w)  

15.         case ObjectType.DOUBLE =>  

16.           val w = (o: Output, v: AnyRef) => o.writeDouble(v.asInstanceOf[Double])  

17.           writeNullable(w)  

18.         case ObjectType.BOOLEAN =>  

19.           val w = (o: Output, v: AnyRef) => o.writeBoolean(v.asInstanceOf[Boolean])  

20.           writeNullable(w)  

21.         case ObjectType.DATE =>  

22.           val w = (o: Output, v: AnyRef) => o.writeLong(v.asInstanceOf[Date].getTime)  

23.           writeNullable(w)  

24.         case ObjectType.UUID =>  

25.           val w = (o: Output, v: AnyRef) => {  

26.             val uuid = v.asInstanceOf[UUID]  

27.             o.writeLong(uuid.getMostSignificantBits)  

28.             o.writeLong(uuid.getLeastSignificantBits)  

29.           }  

30.           writeNullable(w)  

31.         case ObjectType.GEOMETRY =>  

32.           // null checks are handled by geometry serializer  

33.           descriptor.getPrecision match {  

34.             case GeometryPrecision.FullPrecision =>  

35.               (o: Output, v: AnyRef) => KryoGeometrySerialization.serializeWkb(o, v.asInstanceOf[Geometry])  

36.             case precision: GeometryPrecision.TwkbPrecision =>  

37.               (o: Output, v: AnyRef) => KryoGeometrySerialization.serialize(o, v.asInstanceOf[Geometry], precision)  

38.           }  

39.         case ObjectType.JSON =>  

40.           (o: Output, v: AnyRef) => KryoJsonSerialization.serialize(o, v.asInstanceOf[String])  

41.         case ObjectType.LIST =>  

42.           val valueWriter = matchWriter(bindings.drop(1), descriptor)  

43.           (o: Output, v: AnyRef) => {  

44.             val list = v.asInstanceOf[java.util.List[AnyRef]]  

45.             if (list == null) {  

46.               o.writeInt(-1, true)  

47.             } else {  

48.               o.writeInt(list.size(), true)  

49.               val iter = list.iterator()  

50.               while (iter.hasNext) {  

51.                 valueWriter(o, iter.next())  

52.               }  

53.             }  

54.           }  

55.         case ObjectType.MAP =>  

56.           val keyWriter = matchWriter(bindings.slice(1, 2), descriptor)  

57.           val valueWriter = matchWriter(bindings.drop(2), descriptor)  

58.           (o: Output, v: AnyRef) => {  

59.             val map = v.asInstanceOf[java.util.Map[AnyRef, AnyRef]]  

60.             if (map == null) {  

61.               o.writeInt(-1, true)  

62.             } else {  

63.               o.writeInt(map.size(), true)  

64.               val iter = map.entrySet.iterator()  

65.               while (iter.hasNext) {  

66.                 val entry = iter.next()  

67.                 keyWriter(o, entry.getKey)  

68.                 valueWriter(o, entry.getValue)  

69.               }  

70.             }  

71.           }  

72.         case ObjectType.BYTES =>  

73.           (o: Output, v: AnyRef) => {  

74.             val arr = v.asInstanceOf[Array[Byte]]  

75.             if (arr == null) {  

76.               o.writeInt(-1, true)  

77.             } else {  

78.               o.writeInt(arr.length, true)  

79.               o.writeBytes(arr)  

80.             }  

81.           }  

82.       }  

83.     }  

从上面的代码可以看出,在进行属性序列化的过程中,程序会首先对数据进行类型匹配,这些类型有基本数据类型:StringIntFloatDoubleBoolean,还有特殊数据类型:DateGeometryUUIDJson,还有集合数据类型:ListMap。由于大部分数据类型的序列化方式比较简单,在此不再赘述。其中比较特殊的是Geometry的序列化方式比较特殊,在此对地理信息的数据类型进行详细介绍。

在前面的matchWriter方法当中,可以看出在进行类型匹配时,对于地理相关的信息,统一归类为Geometry类型。如果我们需要序列化的数据为PointLinestring等等这些具体的地理类型,则由org.locationtech.geomesa.features.serialization.WkbSerialization类中的serializeWkb方法来进行进一步的类型匹配。

1.     def serializeWkb(out: T, geometry: Geometry): Unit = {  

2.         if (geometry == null) { out.writeByte(NULL_BYTE) } else {  

3.           out.writeByte(NOT_NULL_BYTE)  

4.           geometry match {  

5.             case g: Point              => writePoint(out, g)  

6.             case g: LineString         => writeLineString(out, g)  

7.             case g: Polygon            => writePolygon(out, g)  

8.             case g: MultiPoint         => writeGeometryCollection(out, WkbSerialization.MultiPoint, g)  

9.             case g: MultiLineString    => writeGeometryCollection(out, WkbSerialization.MultiLineString, g)  

10.           case g: MultiPolygon       => writeGeometryCollection(out, WkbSerialization.MultiPolygon, g)  

11.           case g: GeometryCollection => writeGeometryCollection(out, WkbSerialization.GeometryCollection, g)  

12.         }  

13.       }  

14.     }  

在上述代码当中,可以看出,GeoMesa对于地理信息类型又分为PointLineStringPolygonMultiPointMultiLineStringMultiPolygonGeometrCollection这些类型。在进一步的序列化过程中,针对不同的类型,有不同的序列化方法与之对应。这些方法都存在于org.locationtech.geomesa.features.serialization.WkbSerialization类中。为了介绍简便,在此以writePoint方法为例。

1.     private def writePoint(out: T, g: Point): Unit = {  

2.         val coords = g.getCoordinateSequence  

3.         val (flag, writeDims) = if (coords.getDimension == 2) { (Point2d, false) } else { (Point, true) }  

4.         out.writeInt(flag, optimizePositive = true)  

5.         writeCoordinateSequence(out, coords, writeLength = false, writeDims)  

6.       }  

首先,GeoMesa会对point类型数据进行转化,调用org.locationtech.jts.geom.Point类中的getCoordinateSequence方法,将PointWKT形式转换为Coordinate对象。例如:原来的“POINT (116.31412 39.89454)”转换为coordinates=” ((116.31412, 39.89454, NaN))”。其中NaN其实表示的是高度信息,由于在此处没有高度信息,因此显示为NaN

1.     public CoordinateSequence getCoordinateSequence() {  

2.         return coordinates;  

3.       }  

返回到writePoint方法中,接下来会对点的维度进行判断,如果是二维的点,就会设定flag参数为1,如果是多维的点,就会设定flag参数为8

1.         val (flag, writeDims) = if (coords.getDimension == 2) { (Point2d, false) } else { (Point, true) }  

在这段代码当中可以看到程序对flag进行了赋值,这个值就是关于地理信息的参数,这些关于点的参数配置在org.locationtech.geomesa.features.serialization.WkbSerialization当中。

1.     object WkbSerialization {  

2.       

3.       // 2-d values - corresponds to org.locationtech.jts.io.WKBConstants  

4.       val Point2d: Int            = 1  

5.       val LineString2d: Int       = 2  

6.       val Polygon2d: Int          = 3  

7.       

8.       val MultiPoint: Int         = 4  

9.       val MultiLineString: Int    = 5  

10.     val MultiPolygon: Int       = 6  

11.     val GeometryCollection: Int = 7  

12.     

13.     // n-dimensional values  

14.     val Point: Int              = 8  

15.     val LineString: Int         = 9  

16.     val Polygon: Int            = 10  

17.   }  

最后对于经纬度数据的序列化过程在WkbSerialization类的writeCoordinateSequence方法当中。下列代码的第13行到21行会对点数据内部的参数进行遍历,根据点的维度来控制遍历的次数,最后分维度来将不同维度的数据进行序列化。

1.     private def writeCoordinateSequence(out: T,  

2.                                           coords: CoordinateSequence,  

3.                                           writeLength: Boolean,  

4.                                           writeDimensions: Boolean): Unit = {  

5.         val dims = coords.getDimension  

6.         if (writeLength) {  

7.           out.writeInt(coords.size(), optimizePositive = true)  

8.         }  

9.         if (writeDimensions) {  

10.         out.writeInt(dims, optimizePositive = true)  

11.       }  

12.       var i = 0  

13.       while (i < coords.size()) {  

14.         val coord = coords.getCoordinate(i)  

15.         var j = 0  

16.         while (j < dims) {  

17.          out.writeDouble(coord.getOrdinate(j))  

18.           j += 1  

19.         }  

20.         i += 1  

21.       }  

22.     }  

接下来,返回到writeFeature方法中,进行下一步的操作。

2.2.3    序列化属性的偏移量

当完成各个属性值的序列化以后,系统虽然已经将各个属性值序列化成字节数组,但是它现在并不知道每个属性值都是从什么位置开始序列化的。为了解决这个问题,回到之前的writeFeature方法之后,GeoMesa开始对每个属性的偏移量进行序列化。具体实现过程如下:

1.         i = 0  

2.         val offsetStart = output.position() - offset  

3.         while (i < writers.length) {  

4.           output.writeInt(offsets(i), true)  

5.           i += 1  

6.         }  

7.         val end = output.position()  

8.         output.setPosition(offset + 1)  

9.         output.writeInt(offsetStart)  

10.       output.setPosition(end)  

2.2.4    序列化用户设置的参数

在很多情况下,用户都需要对SimpleFeature进行一系列的设置,这些设置一般都会存在于SimpleFeature对象中的userData属性当中。在序列化的过程中,如果SimpleFeature对象当中存在这些用户设置的信息,我们同样需要对这些信息进行序列化。

1.         if (withUserData) {  

2.           KryoUserDataSerialization.serialize(output, sf.getUserData)  

3.         }  

此处会调用针对UserData的序列化方法,具体的实现存在于org.locationtech.geomesa.features.kryo.serialization.KryoUserDataSerialization类的serialize方法当中。

1.     override def serialize(out: Output, javaMap: java.util.Map[AnyRef, AnyRef]): Unit = {  

2.         import scala.collection.JavaConverters._  

3.       

4.         val map = javaMap.asScala  

5.       

6.         // may not be able to write all entries - must pre-filter to know correct count  

7.         val skip = new java.util.HashSet[AnyRef]()  

8.         map.foreach { case (k, _) => if (k == null || !canSerialize(k)) { skip.add(k) } }  

9.       

10.       val toWrite = if (skip.isEmpty) { map } else {  

11.         logger.warn(s"Skipping serialization of entries: " +  

12.             map.collect { case (k, v) if skip.contains(k) => s"$k->$v" }.mkString("[""],[""]"))  

13.         map.filterNot { case (k, _) => skip.contains(k) }  

14.       }  

15.     

16.       out.writeInt(toWrite.size) // don't use positive optimized version for back compatibility  

17.     

18.       toWrite.foreach { case (key, value) =>  

19.         out.writeString(baseClassMappings.getOrElse(key.getClass, key.getClass.getName))  

20.         write(out, key)  

21.         if (value == null) {  

22.           out.writeString(nullMapping)  

23.         } else {  

24.           out.writeString(baseClassMappings.getOrElse(value.getClass, value.getClass.getName))  

25.           write(out, value)  

26.         }  

27.       }  

28.     }  

第三章   异常处理

第四章   系统特性

第五章   思考和总结

共0条评论