第一章
各种序列化机制
第一节
Kryo序列化机制
1
Kryo概述
Kryo是一种快速高效的Java对象图(Object graph)序列化框架。该项目的目标是打造速度、效率和易于使用的API。当对象需要持久化时,无论是用于文件、数据库还是通过网络,该项目都很有用。
Kryo还可以执行自动深层浅层的复制/克隆。这是从对象直接复制到对象,而不是object→bytes→object。
1.1
Java对象图(Object graph)
对于传统的数据模型当中,例如在一个UML图当中,指定的是类之间的关系。而Java对象图指定的是对象之间的关系。
在物理表示上,对象图是有向图,可能是循环的。当存储在内存中时,对象使用其属性和函数表占用内存的不同段,而关系由高级语言中的指针或者不同类型的全局程序来进行处理。
比如说,一个Car类当中由Wheel类组成,在UML图当中,表示为Car→Wheel,指定的仅仅是类之间的关系。而在对象图当中,由于一辆车会有四只轮子,可以标定这四个Wheel对象为frontLeft,frontRight,backLeft和backRight。邻接列表就能够表示为c:Car→{frontLeft:Wheel,frontRight:Wheel,backLeft:Wheel,backRight:Wheel}
1.2
深拷贝(deep clone)和浅拷贝(shallow copy)
浅拷贝只是复制了对象中对其他对象的引用,而没有再复制引用所指向的实际的对象。深拷贝与浅拷贝不同,在进行深拷贝的过程当中,实际上将对象所指向的对象也重新创建了一份。
如下图,现在需要拷贝一个obj1对象,它包含了两个对象:containedObj1和containedObj2。针对不同的场景,可能会采用不同的拷贝方案。
对于浅拷贝来说,我们需要做的就是创建一个新的obj1对象,然后将这个新对象的引用指向原有的obj1对象所属的containedObj1对象和containedObj2对象。而在java当中实现浅拷贝可以通过clone方法来实现。
而对于深拷贝来说,我们需要做的首先是创建一个obj2对象,然后创建它的所属对象containedObj3对象和containedObj4对象,最后将obj2对象的引用指向这两个新创建的子对象。在java当中实现深拷贝主要是通过序列化和反序列化来实现的,所以实现深拷贝的前提是对象一定是能够被序列化的,这一点很重要。
这两种拷贝方式虽然生成的原理是不同的,但是在效果上有很多共同点:
1.
浅拷贝和深拷贝都是通过复制创建了一个新的对象,也就是 x.clone() != x
2.
浅拷贝和深拷贝的对象所属的类是一致的,也就是 x.clone().getClass() == x.getClass()
3.
浅拷贝和深拷贝的对象在某些情况下, 也就是从内容角度来说,它所包含的内容是一致的 x.clone().equals(x)
2
Kryo特性
2.1
可变长度编码
IO类提供了读取和写入变长int(varInt)和long(varLong)值的方法。这是通过使用每个字节的第8位来指示是否使用更多的字节来完成序列化的过程,这意味着varInt使用1-5个字节来表示int类型数据,而varLong使用1-9个字节来表示long类型的数据。使用可变长度编码会造成更大的系统开销,但是序列化的数据会比java原生的序列化机制小很多。
在写入可变长编码时,可以针对正值或负值来优化该值。例如,如果进行了正值优化,一个字节就能够表示从0到127的所有数值,两个字节就能够表示从128到16383中的所有数字。但是如果没有进行正值优化,一个字节就只能表示从-64到63中的所有数字,两个字节就只能够表示从64到8191和从-8192到-65之间的所有数字。因此在实际业务当中,有时没有负数,因此需要根据具体的业务需求进行具体的优化。
在进行可变长度编码也会对输入和输出缓冲区造成一些压力,为了解决这个问题,一些方法允许缓冲区决定是写入固定长度还是可变长度值。
方法
|
描述
|
writeInt(int)
|
写入一个4个字节的int类型
|
writeVarInt(int, boolean)
|
写入一个1-5个字节的int类型
|
writeInt(int, boolean)
|
写入一个4字节或者1-5字节的int类型(这个模式由缓冲区来决定)
|
writeLong(long)
|
写入一个8字节的long类型
|
writeVarLong(long, boolean)
|
写入一个1-9字节的long类型
|
writeLong(long, boolean)
|
写入一个8字节的或者1-9字节的long类型(这个模式由缓冲区来决定)
|
除了上述这些单个数据的序列化方法,kryo也可以对多个数据进行批处理。而这些方法当中,有一些方法也是能够支持可变长编码的。如下表
方法
|
描述
|
writeInts(int[])
|
批量写入4个字节的int类型数据
|
writeInts(int[], boolean)
|
批量写入4个字节或者1-5个字节的int类型(这个模式由缓冲区来决定)
|
writeLongs(long[])
|
批量写入8字节的long类型
|
writeLongs(long[], boolean)
|
批量写入8字节的或者1-9字节的long类型(这个模式由缓冲区来决定)
|
3
Kryo操作方法
3.1
Kryo的安装
如果使用Kryo的官方版本,需要在pom.xml文件当中添加如下代码:
1.
<dependency>
2.
<groupId>com.esotericsoftware</groupId>
3.
<artifactId>kryo</artifactId>
4.
<version>4.0.0</version>
5.
</dependency>
如果由于在类路径当中使用了不同版本的asm而遇到问题,则需要使用如下依赖:
1.
<dependency>
2.
<groupId>com.esotericsoftware</groupId>
3.
<artifactId>kryo-shaded</artifactId>
4.
<version>4.0.0</version>
5.
</dependency>
如果要测试kryo的最新快照,需要在pom.xml中添加如下代码段:
1.
<repository>
2.
<id>sonatype-snapshots</id>
3.
<name>sonatype snapshots repo</name>
4.
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
5.
</repository>
6.
7.
<dependency>
8.
<groupId>com.esotericsoftware</groupId>
9.
<artifactId>kryo</artifactId>
10.
<version>4.0.1-SNAPSHOT</version>
11.
</dependency>
如果您使用没有 Maven 的 Kryo,请注意
Kryo jar 文件有几个外部依赖关系,它们的JAR也需要添加到类路径中。这些依赖关系是 MinLog logging
library 和 Objenesis
library。
1.
<repository>
2.
<groupId>com.esotericsoftware</groupId>
3.
<artifactId>minlog</artifactId>
4.
<version>1.3.2-SNAPSHOT</version>
5.
</dependency>
4
Kryo性能测试
1.
kryo序列化后的字节数组是:
2.
1 40 0 1 -125 -27 -68 -96 -28 -72 -119
3.
4.
java序列化后的字节数组是:
5.
-84 -19 0 5 115 114 0 28 99 111 109 46 106 100 46 105 99 105 116 121 46 100 109 112 46 107 114 121 111 46 80 101 114 115 111 110 -69 -52 107 -91 16 46 117 81 2 0 3 73 0 3 97 103 101 76 0 3 109 97 112 116 0 15 76 106 97 118 97 47 117 116 105 108 47 77 97 112 59 76 0 4 110 97 109 101 116 0 18 76 106 97 118 97 47 108 97 110 103 47 83 116 114 105 110 103 59 120 112 0 0 0 20 112 116 0 6 -27 -68 -96 -28 -72 -119
第二章
GeoMesa序列化机制
第一节
概述
在GeoMesa当中,由于大量需要与内存交互以及网络传输,因此序列化的过程也就必不可少了。与很多大数据的框架一样,鉴于Java原生的序列化机制会造成大量的数据冗余,而且序列化的时间也比较长,因此GeoMesa同样采用了一些序列化框架来解决这个问题。
对于序列化框架的选择,原生的GeoMesa采用了两种选择,一种是avro框架,这个框架是hadoop生态的很多框架所通用的一种框架,因此在与hdfs、hbase进行网络通信的时候会有比较好的兼容性;另一种是kryo框架,也是GeoMesa默认的一种对value值进行序列化的一种框架。
1.
object SerializationType extends Enumeration {
2.
type SerializationType = Value
3.
val KRYO: Value = Value("kryo")
4.
val AVRO: Value = Value("avro")
5.
}
对于数据进行序列化的过程主要分为两大部分:对于SimpleFeatureType的序列化过程、对SimpleFeature的序列化过程。
第二节
GeoMesa的value结构
第三节
GeoMesa序列化流程
首先,在利用GeoMesa进行数据的写入和查询的过程当中,一般直接操作的都是SimpleFeature对象。那么如何判定这些SimpleFeature对象是不是需要公用一个连接,插入到一个存储空间内呢?这个过程GeoMesa是利用了在本地的一个缓存LocalCache来进行控制的。
在我们写入SimpleFeatureType信息的时候,GeoMesa会利用序列化机制,将SimpleFeatureType里面封装的属性结构信息利用序列化器进行序列化,存储在本地的缓存(localCache)当中。当我们需要写入或者查询数据时,GeoMesa会将本地缓存当中的SimpleFeatureType相关的信息反序列化出来与SimpleFeature当中的SimpleFeatureType信息进行比对。这样就可以保证始终有一个全局的schema来控制表结构的信息。
而对于SimpleFeature的序列化过程主要使用的是Kryo框架,通过将FeatureId和各个属性信息转换成为字节数组,作为Value值存储进RowValue对象当中,最终存储进Hbase当中。
2
基本流程
2.1
SimpleFeatureType的序列化
对于SimpleFeatureType的序列化过程其实没有出现在写入SimpleFeature的过程当中,而是当创建FeatureWriter时就进行了创建。如下代码当中,第二行就是创建FeatureWriter的过程。可以看到,此时并没有传入SimpleFeature的参数,仅仅是将SimpleFeatureType的TypeName和事务的配置传入FeatureWriter对象当中。结合前面创建datastore的过程可以知道,此时SimpleFeatureType的全量信息都已经被传入到了datastore当中,因此可以看出这一步是只与SimpleFeatureType相关的,与SimpleFeature没有关系。
1.
private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) throws IOException {
2.
try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer = datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT)) {
3.
System.out.println("write test data");
4.
SimpleFeature toWrite = writer.next();
5.
toWrite.setAttributes(feature.getAttributes());
6.
if (toWrite.getIdentifier() instanceof FeatureIdImpl) {
7.
((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
8.
}
9.
toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
10.
toWrite.getUserData().putAll(feature.getUserData());
11.
writer.write();
12.
}
13.
14.
}
接下来就是整个对于SimpleFeatureType的序列化过程:
1. 序列化的引入阶段
2. 序列化参数的配置
3. 构造序列化器
2.1.1
序列化的引入阶段
由于序列化过程是非常底层的一个过程,因此往往会有很深的调用过程,在此对这个过程进行简单的介绍。首先从Demo的代码当中可以看出,我们调用的是org.locationtech.geomesa.index.geotools.GeoMesaDataStore类中的getFeatureWriterAppend方法。
1.
override def getFeatureWriterAppend(typeName: String, transaction: Transaction): FlushableFeatureWriter = {
2.
val sft = getSchema(typeName)
3.
if (sft == null) {
4.
throw new IOException(s"Schema '$typeName' has not been initialized. Please call 'createSchema' first.")
5.
}
6.
if (transaction != Transaction.AUTO_COMMIT) {
7.
logger.warn("Ignoring transaction - not supported")
8.
}
9.
featureWriterFactory.createFeatureWriter(sft, manager.indices(sft, mode = IndexMode.Write), None)
10.
}
这个方法主要完成了对schema和事务的操作,最后将这些操作的结果封装在FeatureWirter对象当中回传给用户。其中与我们序列化过程有关的是第9行的代码,这一行调用了org.locationtech.geomesa.hbase.data.HBaseFeatureWriter.HBaseFeatureWriterFactory类中的createFeatureWriter方法。
1.
override def createFeatureWriter(sft: SimpleFeatureType,
2.
indices: Seq[HBaseFeatureIndexType],
3.
filter: Option[Filter]): FlushableFeatureWriter = {
4.
(TablePartition(ds, sft), filter) match {
5.
case (None, None) =>
6.
new HBaseFeatureWriter(sft, ds, indices, null, null)
7.
with HBaseTableFeatureWriterType with HBaseAppendFeatureWriterType
8.
9.
case (None, Some(f)) =>
10.
new HBaseFeatureWriter(sft, ds, indices, f, null)
11.
with HBaseTableFeatureWriterType with HBaseModifyFeatureWriterType
12.
13.
case (Some(p), None) =>
14.
new HBaseFeatureWriter(sft, ds, indices, null, p)
15.
with HBasePartitionedFeatureWriterType with HBaseAppendFeatureWriterType
16.
17.
case (Some(p), Some(f)) =>
18.
new HBaseFeatureWriter(sft, ds, indices, f, p)
19.
with HBasePartitionedFeatureWriterType with HBaseModifyFeatureWriterType
20.
}
在这里我们可以看到,GeoMesa在此进行了一个匹配的过程,但是每一个分支的结果都是创建了一个HbaseFeatureWriter对象。
1.
abstract class HBaseFeatureWriter(val sft: SimpleFeatureType,
2.
val ds: HBaseDataStore,
3.
val indices: Seq[HBaseFeatureIndexType],
4.
val filter: Filter,
5.
val partition: TablePartition) extends HBaseFeatureWriterType {
6.
7.
private val wrapper = HBaseFeature.wrapper(sft)
在创建HbaseFeatureWriter对象时,我们可以在这个类当中看到,在这个对象当中有一个参数wrapper,调用
org.locationtech.geomesa.hbase.data.HBaseFeature伴生对象当中的wrapper方法。
1.
def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {
2.
val serializers = HBaseColumnGroups.serializers(sft)
3.
val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)
4.
(feature) => new HBaseFeature(feature, serializers, idSerializer)
5.
}
在这个方法中,才真正开始根据Hbase这种datastore与序列化器进行匹配,从上述代码的第2行可以看出调用了org.locationtech.geomesa.index.conf.ColumnGroups特性当中的serializers方法。
1.
def serializers(sft: SimpleFeatureType): Seq[(T, SimpleFeatureSerializer)] = {
2.
apply(sft).map { case (colFamily, subset) =>
3.
if (colFamily.eq(default)) {
4.
(colFamily, KryoFeatureSerializer(subset, SerializationOptions.withoutId))
5.
} else {
6.
(colFamily, new ProjectingKryoFeatureSerializer(sft, subset, SerializationOptions.withoutId))
7.
}
8.
}
9.
}
经过了上述这些复杂的调用过程,才真正进入到了对于SimpleFeatureType的序列化过程。
2.1.2
序列化参数的配置
接着上文的方法调用过程,接下来实现序列化的是org.locationtech.geomesa.index.conf.ColumnGroups伴生对象的apply方法。
1.
def apply(sft: SimpleFeatureType): Seq[(T, SimpleFeatureType)] = {
2.
val key = CacheKeyGenerator.cacheKey(sft)
3.
var groups = cache.getIfPresent(key)
4.
if (groups == null) {
5.
val map = scala.collection.mutable.Map.empty[String, SimpleFeatureTypeBuilder]
6.
7.
sft.getAttributeDescriptors.asScala.foreach { descriptor =>
8.
descriptor.getColumnGroups().foreach { group =>
9.
map.getOrElseUpdate(group, new SimpleFeatureTypeBuilder()).add(descriptor)
10.
}
11.
}
12.
13.
val sfts = map.map { case (group, builder) =>
14.
builder.setName(sft.getTypeName)
15.
val subset = builder.buildFeatureType()
16.
subset.getUserData.putAll(sft.getUserData)
17.
(convert(group), subset)
18.
} + (default -> sft)
19.
20.
// return the smallest groups first
21.
groups = sfts.toSeq.sortBy(_._2.getAttributeCount)
22.
23.
cache.put(key, groups)
24.
}
25.
groups
26.
}
从这段代码可以看出,这个方法主要分为三个部分,一开始进行了各种参数的准备,之后就是对于这些参数的封装过程,最后将这些封装好的参数放入缓存当中。
首先,在参数准备阶段,程序对于传入的key值进行了判断,此处的key不是全部的schema信息,而是SimpleFeatureType的名称以及各个字段的信息。例如在demo当中,此处的key值内容是“index-text02;taxiId:String,dtg:Date,geom:Point,description:String”。
然后就是根据这个key值从缓存当中获取相应的对象,这个过程会调用com.github.benmanes.caffeine.cache.LocalManualCache类中的getIfPresent方法。
1.
@Override
2.
default @Nullable V getIfPresent(Object key) {
3.
return cache().getIfPresent(key, /* recordStats */ true);
4.
}
进一步,程序会调用com.github.benmanes.caffeine.cache.BoundedLocalCache类中的getIfPresent方法。
1.
@Override
2.
public V getIfPresent(Object key, boolean recordStats) {
3.
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
4.
if (node == null) {
5.
if (recordStats) {
6.
statsCounter().recordMisses(1);
7.
}
8.
return null;
9.
}
10.
long now = expirationTicker().read();
11.
if (hasExpired(node, now)) {
12.
if (recordStats) {
13.
statsCounter().recordMisses(1);
14.
}
15.
scheduleDrainBuffers();
16.
return null;
17.
}
18.
19.
@SuppressWarnings("unchecked")
20.
K castedKey = (K) key;
21.
V value = node.getValue();
22.
23.
if (!isComputingAsync(node)) {
24.
setVariableTime(node, expireAfterRead(node, castedKey, value, now));
25.
setAccessTime(node, now);
26.
}
27.
afterRead(node, now, recordStats);
28.
return value;
29.
}
之后返回apply方法以后,程序会判断获取到的对象是不是空值,如果获取到的是个空值,说明在缓存当中是没有key值对应的信息的,这个时候就会根据传入的SimpleFeatureType进行一些操作。
1.
if (groups == null) {
2.
val map = scala.collection.mutable.Map.empty[String, SimpleFeatureTypeBuilder]
3.
4.
sft.getAttributeDescriptors.asScala.foreach { descriptor =>
5.
descriptor.getColumnGroups().foreach { group =>
6.
map.getOrElseUpdate(group, new SimpleFeatureTypeBuilder()).add(descriptor)
7.
}
8.
}
参数sfts其实是一个tuple,其中第一个元素就是列族名的字节表达,第二个元素是一个SimpleFeatureTypeImpl,也就是SimpleFeatureType的实现类的对象。之后这个tuple就会被封装进groups内。这个groups最终会被放进缓存当中。
1.
val sfts = map.map { case (group, builder) =>
2.
builder.setName(sft.getTypeName)
3.
val subset = builder.buildFeatureType()
4.
subset.getUserData.putAll(sft.getUserData)
5.
(convert(group), subset)
6.
} + (default -> sft)
7.
8.
// return the smallest groups first
9.
groups = sfts.toSeq.sortBy(_._2.getAttributeCount)
10.
11. cache.put(key, groups)
2.1.3
构造序列化器
接下来进行序列化器的构造,在这里有四种序列化器,分别是可变的延迟加载序列化器、可变的活跃序列化器、不变的延迟加载序列化器、不变的活跃序列化器。调用的是org.locationtech.geomesa.features.kryo.KryoFeatureSerializer伴生对象的apply方法,可以看出最终创建了不同的序列化器对象。
1.
def apply(sft: SimpleFeatureType, options: Set[SerializationOption] = Set.empty): KryoFeatureSerializer = {
2.
(options.immutable, options.isLazy) match {
3.
case (true, true) => new ImmutableLazySerializer(sft, options)
4.
case (true, false) => new ImmutableActiveSerializer(sft, options)
5.
case (false, true) => new MutableLazySerializer(sft, options)
6.
case (false, false) => new MutableActiveSerializer(sft, options)
7.
}
8.
}
2.2
SimpleFeature的序列化
具体负责将SimpleFeature进行序列化的过程在org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类的writeFeature方法中实现,如下:
1.
private def writeFeature(sf: SimpleFeature, output: Output): Unit = {
2.
val offsets = KryoFeatureSerialization.getOffsets(cacheKey, writers.length)
3.
val offset = output.position()
4.
output.writeInt(VERSION, true)
5.
output.setPosition(offset + 5) // leave 4 bytes to write the offsets
6.
if (withId) {
7.
// TODO optimize for uuids?
8.
output.writeString(sf.getID)
9.
}
10.
// write attributes and keep track off offset into byte array
11.
var i = 0
12.
while (i < writers.length) {
13.
offsets(i) = output.position() - offset
14.
writers(i)(output, sf.getAttribute(i))
15.
i += 1
16.
}
17.
// write the offsets - variable width
18.
i = 0
19.
val offsetStart = output.position() - offset
20.
while (i < writers.length) {
21.
output.writeInt(offsets(i), true)
22.
i += 1
23.
}
24.
// got back and write the start position for the offsets
25.
val end = output.position()
26.
output.setPosition(offset + 1)
27.
output.writeInt(offsetStart)
28.
// reset the position back to the end of the buffer so the bytes aren't lost, and we can keep writing user data
29.
output.setPosition(end)
30.
31.
if (withUserData) {
32.
KryoUserDataSerialization.serialize(output, sf.getUserData)
33.
}
34.
}
从这段代码可以看出,SimpleFeature的序列化过程可以分成如下几个部分:
1. 序列化版本号和FeatureId(第2行到第9行)
2. 序列化各个属性值(第10行到第16行)
3. 序列化属性的偏移量(第17行到第29行)
4. 序列化用户设置的参数(第31行到第33行)
接下来就对各个部分进行介绍。
2.2.1
序列化版本号和FeatureId
首先,GeoMesa会创建一个容量为1024的字节数组
,会将此时的版本号进行序列化以后放在第一个位置。这个地方用到了Kryo框架当中的可变长机制,虽然数据类型是Int类型,正常应该占用4个字节的长度,但是在此处,程序会对这个数据进行判断,如果这个数据是否能用少量的字节数表达,将冗余的位数进行忽略。例如:在GeoMesa当中默认的版本号是2,如果按照通常的方式,需要四个字节来进行存储:00000000 00000000 00000000 00000010,而在此处由于这种机制的存在,只需要用一个字节来进行存储:00000010,这样就能够大量的节省存储资源。
接着,缓冲区的游标会向后调整5位,因为在value设计时,从第2位到第5位预留出来,用来存储各个属性的偏移量的数据。从第6位开始写入FeatureId。结合此处的源码,可以看出FeatureId本身的序列化使用的是原生的针对String类型的序列化机制。
1.
val offsets = KryoFeatureSerialization.getOffsets(cacheKey, writers.length)
2.
val offset = output.position()
3.
output.writeInt(VERSION, true)
4.
output.setPosition(offset + 5) // leave 4 bytes to write the offsets
5.
if (withId) {
6.
// TODO optimize for uuids?
7.
output.writeString(sf.getID)
8.
}
2.2.2
序列化各个属性值
接下来,系统开始对SimpleFeature当中的属性进行遍历,将这些属性分别进行序列化。
1.
var i = 0
2.
while (i < writers.length) {
3.
offsets(i) = output.position() - offset
4.
writers(i)(output, sf.getAttribute(i))
5.
i += 1
6.
}
在遍历的过程当中,对于每一个属性值,根据这些属性的不同类型,GeoMesa会进行类型匹配,分配给不同的序列化器来进行序列化。具体的过程实现是org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类中的matchWriter方法
1.
private [geomesa] def matchWriter(bindings: Seq[ObjectType], descriptor: AttributeDescriptor): (Output, AnyRef) => Unit = {
2.
import org.locationtech.geomesa.utils.geotools.RichAttributeDescriptors.RichAttributeDescriptor
3.
bindings.head match {
4.
case ObjectType.STRING =>
5.
(o: Output, v: AnyRef) => o.writeString(v.asInstanceOf[String]) // write string supports nulls
6.
case ObjectType.INT =>
7.
val w = (o: Output, v: AnyRef) => o.writeInt(v.asInstanceOf[Int])
8.
writeNullable(w)
9.
case ObjectType.LONG =>
10.
val w = (o: Output, v: AnyRef) => o.writeLong(v.asInstanceOf[Long])
11.
writeNullable(w)
12.
case ObjectType.FLOAT =>
13.
val w = (o: Output, v: AnyRef) => o.writeFloat(v.asInstanceOf[Float])
14.
writeNullable(w)
15.
case ObjectType.DOUBLE =>
16.
val w = (o: Output, v: AnyRef) => o.writeDouble(v.asInstanceOf[Double])
17.
writeNullable(w)
18.
case ObjectType.BOOLEAN =>
19.
val w = (o: Output, v: AnyRef) => o.writeBoolean(v.asInstanceOf[Boolean])
20.
writeNullable(w)
21.
case ObjectType.DATE =>
22.
val w = (o: Output, v: AnyRef) => o.writeLong(v.asInstanceOf[Date].getTime)
23.
writeNullable(w)
24.
case ObjectType.UUID =>
25.
val w = (o: Output, v: AnyRef) => {
26.
val uuid = v.asInstanceOf[UUID]
27.
o.writeLong(uuid.getMostSignificantBits)
28.
o.writeLong(uuid.getLeastSignificantBits)
29.
}
30.
writeNullable(w)
31.
case ObjectType.GEOMETRY =>
32.
// null checks are handled by geometry serializer
33.
descriptor.getPrecision match {
34.
case GeometryPrecision.FullPrecision =>
35.
(o: Output, v: AnyRef) => KryoGeometrySerialization.serializeWkb(o, v.asInstanceOf[Geometry])
36.
case precision: GeometryPrecision.TwkbPrecision =>
37.
(o: Output, v: AnyRef) => KryoGeometrySerialization.serialize(o, v.asInstanceOf[Geometry], precision)
38.
}
39.
case ObjectType.JSON =>
40.
(o: Output, v: AnyRef) => KryoJsonSerialization.serialize(o, v.asInstanceOf[String])
41.
case ObjectType.LIST =>
42.
val valueWriter = matchWriter(bindings.drop(1), descriptor)
43.
(o: Output, v: AnyRef) => {
44.
val list = v.asInstanceOf[java.util.List[AnyRef]]
45.
if (list == null) {
46.
o.writeInt(-1, true)
47.
} else {
48.
o.writeInt(list.size(), true)
49.
val iter = list.iterator()
50.
while (iter.hasNext) {
51.
valueWriter(o, iter.next())
52.
}
53.
}
54.
}
55.
case ObjectType.MAP =>
56.
val keyWriter = matchWriter(bindings.slice(1, 2), descriptor)
57.
val valueWriter = matchWriter(bindings.drop(2), descriptor)
58.
(o: Output, v: AnyRef) => {
59.
val map = v.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
60.
if (map == null) {
61.
o.writeInt(-1, true)
62.
} else {
63.
o.writeInt(map.size(), true)
64.
val iter = map.entrySet.iterator()
65.
while (iter.hasNext) {
66.
val entry = iter.next()
67.
keyWriter(o, entry.getKey)
68.
valueWriter(o, entry.getValue)
69.
}
70.
}
71.
}
72.
case ObjectType.BYTES =>
73.
(o: Output, v: AnyRef) => {
74.
val arr = v.asInstanceOf[Array[Byte]]
75.
if (arr == null) {
76.
o.writeInt(-1, true)
77.
} else {
78.
o.writeInt(arr.length, true)
79.
o.writeBytes(arr)
80.
}
81.
}
82.
}
83.
}
从上面的代码可以看出,在进行属性序列化的过程中,程序会首先对数据进行类型匹配,这些类型有基本数据类型:String、Int、Float、Double、Boolean,还有特殊数据类型:Date、Geometry、UUID、Json,还有集合数据类型:List、Map。由于大部分数据类型的序列化方式比较简单,在此不再赘述。其中比较特殊的是Geometry的序列化方式比较特殊,在此对地理信息的数据类型进行详细介绍。
在前面的matchWriter方法当中,可以看出在进行类型匹配时,对于地理相关的信息,统一归类为Geometry类型。如果我们需要序列化的数据为Point、Linestring等等这些具体的地理类型,则由org.locationtech.geomesa.features.serialization.WkbSerialization类中的serializeWkb方法来进行进一步的类型匹配。
1.
def serializeWkb(out: T, geometry: Geometry): Unit = {
2.
if (geometry == null) { out.writeByte(NULL_BYTE) } else {
3.
out.writeByte(NOT_NULL_BYTE)
4.
geometry match {
5.
case g: Point => writePoint(out, g)
6.
case g: LineString => writeLineString(out, g)
7.
case g: Polygon => writePolygon(out, g)
8.
case g: MultiPoint => writeGeometryCollection(out, WkbSerialization.MultiPoint, g)
9.
case g: MultiLineString => writeGeometryCollection(out, WkbSerialization.MultiLineString, g)
10.
case g: MultiPolygon => writeGeometryCollection(out, WkbSerialization.MultiPolygon, g)
11.
case g: GeometryCollection => writeGeometryCollection(out, WkbSerialization.GeometryCollection, g)
12.
}
13.
}
14.
}
在上述代码当中,可以看出,GeoMesa对于地理信息类型又分为Point、LineString、Polygon、MultiPoint、MultiLineString、MultiPolygon、GeometrCollection这些类型。在进一步的序列化过程中,针对不同的类型,有不同的序列化方法与之对应。这些方法都存在于org.locationtech.geomesa.features.serialization.WkbSerialization类中。为了介绍简便,在此以writePoint方法为例。
1.
private def writePoint(out: T, g: Point): Unit = {
2.
val coords = g.getCoordinateSequence
3.
val (flag, writeDims) = if (coords.getDimension == 2) { (Point2d, false) } else { (Point, true) }
4.
out.writeInt(flag, optimizePositive = true)
5.
writeCoordinateSequence(out, coords, writeLength = false, writeDims)
6.
}
首先,GeoMesa会对point类型数据进行转化,调用org.locationtech.jts.geom.Point类中的getCoordinateSequence方法,将Point的WKT形式转换为Coordinate对象。例如:原来的“POINT (116.31412 39.89454)”转换为coordinates=” ((116.31412, 39.89454, NaN))”。其中NaN其实表示的是高度信息,由于在此处没有高度信息,因此显示为NaN。
1.
public CoordinateSequence getCoordinateSequence() {
2.
return coordinates;
3.
}
返回到writePoint方法中,接下来会对点的维度进行判断,如果是二维的点,就会设定flag参数为1,如果是多维的点,就会设定flag参数为8。
1.
val (flag, writeDims) = if (coords.getDimension == 2) { (Point2d, false) } else { (Point, true) }
在这段代码当中可以看到程序对flag进行了赋值,这个值就是关于地理信息的参数,这些关于点的参数配置在org.locationtech.geomesa.features.serialization.WkbSerialization当中。
1.
object WkbSerialization {
2.
3.
// 2-d values - corresponds to org.locationtech.jts.io.WKBConstants
4.
val Point2d: Int = 1
5.
val LineString2d: Int = 2
6.
val Polygon2d: Int = 3
7.
8.
val MultiPoint: Int = 4
9.
val MultiLineString: Int = 5
10.
val MultiPolygon: Int = 6
11.
val GeometryCollection: Int = 7
12.
13.
// n-dimensional values
14.
val Point: Int = 8
15.
val LineString: Int = 9
16.
val Polygon: Int = 10
17.
}
最后对于经纬度数据的序列化过程在WkbSerialization类的writeCoordinateSequence方法当中。下列代码的第13行到21行会对点数据内部的参数进行遍历,根据点的维度来控制遍历的次数,最后分维度来将不同维度的数据进行序列化。
1.
private def writeCoordinateSequence(out: T,
2.
coords: CoordinateSequence,
3.
writeLength: Boolean,
4.
writeDimensions: Boolean): Unit = {
5.
val dims = coords.getDimension
6.
if (writeLength) {
7.
out.writeInt(coords.size(), optimizePositive = true)
8.
}
9.
if (writeDimensions) {
10.
out.writeInt(dims, optimizePositive = true)
11.
}
12.
var i = 0
13.
while (i < coords.size()) {
14.
val coord = coords.getCoordinate(i)
15.
var j = 0
16.
while (j < dims) {
17. out.writeDouble(coord.getOrdinate(j))
18.
j += 1
19.
}
20.
i += 1
21.
}
22.
}
接下来,返回到writeFeature方法中,进行下一步的操作。
2.2.3
序列化属性的偏移量
当完成各个属性值的序列化以后,系统虽然已经将各个属性值序列化成字节数组,但是它现在并不知道每个属性值都是从什么位置开始序列化的。为了解决这个问题,回到之前的writeFeature方法之后,GeoMesa开始对每个属性的偏移量进行序列化。具体实现过程如下:
1.
i = 0
2.
val offsetStart = output.position() - offset
3.
while (i < writers.length) {
4.
output.writeInt(offsets(i), true)
5.
i += 1
6.
}
7.
val end = output.position()
8.
output.setPosition(offset + 1)
9.
output.writeInt(offsetStart)
10.
output.setPosition(end)
2.2.4
序列化用户设置的参数
在很多情况下,用户都需要对SimpleFeature进行一系列的设置,这些设置一般都会存在于SimpleFeature对象中的userData属性当中。在序列化的过程中,如果SimpleFeature对象当中存在这些用户设置的信息,我们同样需要对这些信息进行序列化。
1.
if (withUserData) {
2.
KryoUserDataSerialization.serialize(output, sf.getUserData)
3.
}
此处会调用针对UserData的序列化方法,具体的实现存在于org.locationtech.geomesa.features.kryo.serialization.KryoUserDataSerialization类的serialize方法当中。
1.
override def serialize(out: Output, javaMap: java.util.Map[AnyRef, AnyRef]): Unit = {
2.
import scala.collection.JavaConverters._
3.
4.
val map = javaMap.asScala
5.
6.
// may not be able to write all entries - must pre-filter to know correct count
7.
val skip = new java.util.HashSet[AnyRef]()
8.
map.foreach { case (k, _) => if (k == null || !canSerialize(k)) { skip.add(k) } }
9.
10.
val toWrite = if (skip.isEmpty) { map } else {
11.
logger.warn(s"Skipping serialization of entries: " +
12.
map.collect { case (k, v) if skip.contains(k) => s"$k->$v" }.mkString("[", "],[", "]"))
13.
map.filterNot { case (k, _) => skip.contains(k) }
14.
}
15.
16.
out.writeInt(toWrite.size) // don't use positive optimized version for back compatibility
17.
18.
toWrite.foreach { case (key, value) =>
19.
out.writeString(baseClassMappings.getOrElse(key.getClass, key.getClass.getName))
20.
write(out, key)
21.
if (value == null) {
22.
out.writeString(nullMapping)
23.
} else {
24.
out.writeString(baseClassMappings.getOrElse(value.getClass, value.getClass.getName))
25.
write(out, value)
26.
}
27.
}
28.
}