第一章
写入操作需要的参数
以前述的利用Java API方式来进行数据写入为例,可以看出,在写入时,主要使用了FeatureWriter来执行写入操作的,而在最后写入之前,事先对这个writer对象设定了DataStore、SimpleFeatureType、SimpleFeature三个参数。
其中DataStore封装了连接信息(例如针对Hbase的hconnection)、metadata(例如针对Hbase的HBaseBackedMetadata)、状态信息(分布式相关)、FeatureWriter的工厂信息、其他配置信息等等。
SimpleFeatureType中封装了schema的信息,其中包括了索引信息、解释器的信息(主要用来解析SimpleFeatureType的字符串里的不同字段信息)、默认地理信息、type的名称等等。
SimpleFeature主要封装了一行数据的信息,其中包括了一行数据的id、featureType(相当于DBMS中的schema)、内容、索引的映射关系等等。
1.
private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) {
2.
try {
3.
System.out.println("write test data");
4.
FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
5.
datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);
6.
SimpleFeature toWrite = writer.next();
7.
toWrite.setAttributes(feature.getAttributes());
8.
((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
9.
toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
10.
toWrite.getUserData().putAll(feature.getUserData());
11.
writer.write();
12.
// 关闭流
13.
writer.close();
14.
} catch (IOException e) {
15.
e.printStackTrace();
16.
}
17.
}
当需要实现客户端对geomesa以及hbase进行读写操作时,可以选择多种操作方式,例如可以采用命令行、Java API、JavaRDD、Spark API、pySpark等等方式来进行。
相比于其他方式,使用Java API理论上来说效率更高一些,不过如果不是在服务器当中运行程序,而是在本地IDE上执行这个操作,这个方式相对来说就比较慢了。
为了给读者一个参考,这里提供了一个最简化版本的Demo,同样,在之后章节里面可能会出现一些数据写入的例子,同样是以这个Demo为依据的。
在这个Demo当中,只生成了一行数据,其中包括taxiId(String)、时间数据(Date)、空间数据(Point)、description(String),简化的数据可以加快实验速度。SimpleFeatureType的名称为index-text02,vatalog的名称为“test”。
1.
public class IODemo01 {
2.
3.
/**
4.
* 这个方法主要设定了表名"index-text02",
5.
* 和schema结构"taxiId:String,dtg:Date,*geom:Point:srid=4326"
6.
* @return SimpleFeatureType,即建表的schema表结构
7.
*/
8.
public SimpleFeatureType getSimpleFeatureType() {
9.
SimpleFeatureType sft = SimpleFeatureTypes.createType("index-text02", "taxiId:String,dtg:Date,*geom:Point:srid=4326,description:String");
10.
sft.getUserData().put(SimpleFeatureTypes.DEFAULT_DATE_KEY, "dtg");
11.
return sft;
12.
}
13.
14.
public static void main(String[] args) {
15.
16.
Map<String, String> params = new HashMap<>();
17.
IODemo01 demo01 = new IODemo01();
18.
19.
try {
20.
//创建datastore
21.
params.put("hbase.catalog", "test");
22.
params.put("hbase.zookeepers", "116.196.121.47:2181");
23.
24.
DataStore datastore = DataStoreFinder.getDataStore(params);
25.
26.
// 创建schema
27.
SimpleFeatureType sft = demo01.getSimpleFeatureType();
28.
System.out.println(sft);
29.
System.out.println(datastore);
30.
datastore.createSchema(sft);
31.
32.
//获取Features
33.
SimpleFeature feature = demo01.getData();
34.
35.
//写入Features
36.
demo01.writeFeature(datastore, sft, feature);
37.
} catch (Exception e) {
38.
e.printStackTrace();
39.
}
40.
41.
}
42.
43.
/**
44.
* 这个方法主要用来将封装好的feature存入到对应的datastore中
45.
* @param datastore 数据源,可以选择的有很多,此处是HBaseDataStore
46.
* @param sft 表结构
47.
* @param feature 封装好的一行数据
48.
*/
49.
private void writeFeature(DataStore datastore, SimpleFeatureType sft, SimpleFeature feature) {
50.
try {
51.
System.out.println("write test data");
52.
FeatureWriter<SimpleFeatureType, SimpleFeature> writer =
53.
datastore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);
54.
SimpleFeature toWrite = writer.next();
55.
toWrite.setAttributes(feature.getAttributes());
56.
((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
57.
toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
58.
toWrite.getUserData().putAll(feature.getUserData());
59.
writer.write();
60.
// 关闭流
61.
writer.close();
62.
} catch (IOException e) {
63.
e.printStackTrace();
64.
}
65.
66.
}
67.
68.
/**
69.
* 这个方法主要是将非结构化的数据转换为feature对象
70.
* @return feature对象
71.
*/
72.
private SimpleFeature getData() {
73.
SimpleFeatureBuilder builder = new SimpleFeatureBuilder(getSimpleFeatureType());
74.
DateTimeFormatter dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.US);
75.
builder.set("taxiId", "黑A SA2342");
76.
builder.set("dtg", Date.from(LocalDateTime.parse("2008-02-02 13:30:49", dateFormat).toInstant(ZoneOffset.UTC)));
77.
builder.set("geom", "POINT (116.31412 39.89454)");
78.
builder.set("description", "这是一辆套牌的车");
79.
builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
80.
return builder.buildFeature("黑A SA2342");
81.
}
82.
}
建立索引(在此处的索引是动词,指在写入操作中,将数据进行解构并组织,使其能够更快地进行查询)、插入feature都是写操作。基于Hbase进行写单个feature的流程如下图所示(由于GeoMesa最具特色的是其基于Z曲线的三维时空索引,因此在此处以Z3索引为例进行展示):
以下是写入单个feature的流程:
1)
客户端向GeoMesa发送写请求
2)
GeoMesa对传来的feature进行数据的预处理,主要包括featureID的处理、epoch的封装等等,将feature数据和SimpleFeatureType数据封装成为能够进行下一步操作的数据类型。
3)
GeoMesa中的索引机制会根据feature中的数据判断建立的索引类型,然后利用sfcurve中的相关方法,将时间、空间数据格式化,并整合成为一个序列。
4)
GeoMesa会利用Kryo序列化机制,将第三步产生的序列进行序列化,转换成为适合Hbase存储的格式。
5)
最后利用缓存修改器中的put方法,调用Hbase API,将之前处理好的数据写入到Hbase当中。
在调用write方法后,首先会进入org.locationtech.geomesa.index.geotools.GeoMesaFeatureWriter类中,调用write方法。
1.
override def write(): Unit = {
2.
if (currentFeature == null) {
3.
throw new IllegalStateException("next() must be called before write()")
4.
}
5.
writeFeature(currentFeature)
6.
currentFeature = null
7.
}
更进一步,geomesa会调用writeFeature方法,完成写入操作的具体实现就是利用此方法来实现的。
1.
protected def writeFeature(feature: SimpleFeature): Unit = {
2.
//查看是否有一个指定id,如果没有,系统会生成一个id
3.
val featureWithFid = GeoMesaFeatureWriter.featureWithFid(sft, feature)
4.
val wrapped = wrapFeature(featureWithFid)
5.
6.
val converted = try {
7.
writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }
8.
} catch {
9.
case NonFatal(e) =>
10.
val attributes = s"${featureWithFid.getID}:${featureWithFid.getAttributes.asScala.mkString("|")}"
11.
throw new IllegalArgumentException(s"Error indexing feature '$attributes'", e)
12.
}
13.
converted.foreach { case (mutator, writes) => executeWrite(mutator, writes) }
14.
statUpdater.add(featureWithFid)
15.
}
由于用户在创建feature时,有可能设定了id,也可能没有设定id,因此在开始,geoemesa需要对feature进行判断,如果有设定id,就使用用户指定的id,如果没有,就使用uuid随机生成一个id。
具体实现是在featureWithFid方法当中。首先,系统会对feature进行判断,也就是Hint.PROVIED_FID,如果为true,就使用此fid,如果没有,就利用idGenerator进行生成id的操作。
1.
def featureWithFid(sft: SimpleFeatureType, feature: SimpleFeature): SimpleFeature = {
2.
if (feature.getUserData.containsKey(Hints.PROVIDED_FID)) {
3.
withFid(sft, feature, feature.getUserData.get(Hints.PROVIDED_FID).toString)
4.
} else if (feature.getUserData.containsKey(Hints.USE_PROVIDED_FID) &&
5.
feature.getUserData.get(Hints.USE_PROVIDED_FID).asInstanceOf[Boolean]) {
6.
feature
7.
} else {
8.
withFid(sft, feature, idGenerator.createId(sft, feature))
9. }
10. }
由于原生的SimpleFeature当中并没有包含Hbase相关的一些属性,不利于SimpleFeature最终插入到Hbase表当中,因此在这一步,GeoMesa将SimpleFeature对象转换为HbaseFeature对象。
这个过程是在org.locationtech.geomesa.hbase.data.HBaseFeatureWriter类当中进行的。具体实现方法是wrapFeature方法。
1.
override protected def wrapFeature(feature: SimpleFeature): HBaseFeature = wrapper(feature)
接下来调用org.locationtech.geomesa.hbase.data.HBaseFeature类当中的wrapper方法。
1.
def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {
2.
val serializers = HBaseColumnGroups.serializers(sft)
3.
val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)
4.
(feature) => new HBaseFeature(feature, serializers, idSerializer)
5.
}
从这个方法中可以看出,在转换Feature格式的过程当中,新的HbaseFeature当中又封装了一些序列化的对象。IdSerializer在此时是空的对象,而serializers对象已经被赋值了,是KyroFeatureSerializer中的内部类MutableActiveSerializer的一个实例。可以看出,在进行数据序列化的过程当中,GeoMesa-Hbase使用的是Kyro序列化机制。
在进行格式转化之后,回到GeoMesaFeatureWriter类的writeFeature方法当中进行封装。
1.
writers(featureWithFid).map { case (mutator, convert) => (mutator, convert(wrapped)) }
但是这个时候的数据虽然可以进行序列化、也能够调用Hbase的API插入到Hbase当中,但是由于时空数据往往会分布不均匀,可能会产生严重的数据倾斜问题,所以这是需要将数据进行分片存储。这个方法是在org.locationtech.geomesa.index.index.BaseFeatureIndex的writer方法当中实现的。
1.
override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {
2.
val sharing = sft.getTableSharingBytes
3.
val shards = shardStrategy(sft)
4.
val toIndexKey = keySpace.toIndexKeyBytes(sft)
5.
mutator(sharing, shards, toIndexKey, createInsert)
6.
}
在这个方法当中看到有四个参数,其中比较重要的是shards和toIndexKey,shards是负责分片存储策略实现的,toIndexKey是负责建立索引的参数。
在进一步的方法调用中,GeoMesa会判断id是用户指定的还是系统自动生成的uuid,如果是用户指定的,就会调用StringToBytes方法,将用户指定的id当成一个字符串来进行hash运算。如果是系统生成的uuid,GeoMesa就会调用uuidToBytes方法,将uuid直接进行hash运算。
在执行完上述操作后,geomesa就开始调用toIndexKey来进行索引的建立工作了。
在GeoMesa将feature存入到Hbase的过程当中,GeoMesa会生成相应的索引值,而这个过程是通过调用org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace中的toIndexKeyBytes方法实现的。
1.
override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {
2.
val z3 = sfc(sft.getZ3Interval)
3.
val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)
4.
val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("Z3 index requires a valid date"))
5.
val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)
6.
7.
getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)
8.
}
而在其进一步调用的getZValueBytes方法当中,可以看到这个索引值具体是如何封装的。
1.
private def getZValueBytes(z3: Z3SFC,
2.
geomIndex: Int,
3.
dtgIndex: Int,
4.
timeToIndex: TimeToBinnedTime,
5.
lenient: Boolean)
6.
(prefix: Seq[Array[Byte]],
7.
feature: SimpleFeature,
8.
suffix: Array[Byte]): Seq[Array[Byte]] = {
9.
val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]
10.
if (geom == null) {
11.
throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")
12.
}
13.
val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]
14.
val time = if (dtg == null) { 0 } else { dtg.getTime }
15.
val BinnedTime(b, t) = timeToIndex(time)
16.
val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {
17.
case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)
18.
}
19.
20.
// create the byte array - allocate a single array up front to contain everything
21.
val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
22.
var i = 0
23.
prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
24.
ByteArrays.writeShort(b, bytes, i)
25.
ByteArrays.writeLong(z, bytes, i + 2)
26.
System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
27.
Seq(bytes)
28.
}
在方法的前半段,从第9行到第17行,该方法实现了如下功能:首先将地理信息提取出来,转换为本身的Point对象,并且判断是否有地理信息,之后会将时间信息提取出来,调用相关的方法进一步建立索引。
由于时间是无限的,因此为了适应GeoHash二分编码的规则,与经纬度信息形成八叉树的索引结构,GeoMesa内置了Binned机制,将时间以公元1970年1月1日零时零分为起始,按照天、周(默认)、月、年四种区间,形成相对有限的chunk进行计量,以利于后期的二分操作。
但是因为索引长度是有限的,因此这个方式会有一些局限性,主要体现在表示的时间长度比较有限。如果以天为单位,偏移量的单位就是毫秒,最大表示的时间为公元2059年9月18日;如果以周为单位,这个也是GeoMesa默认的一种设置,偏移量的单位就是秒,最大表示的时间为公元2598年1月4日;如果以月为单位,偏移量的单位也是秒,最大表示的时间为公元4700年8月31日;如果以年为单位,偏移量的单位就是分钟,最大表示的时间为公元34737年12月31日。
一般情况下,由于项目生命周期和时间粒度的权衡下,以周为时间周期进行计算是比较合理的,因此现在的GeoMesa默认的设置同样是这样的。
这个过程具体是由org.locationtech.geomesa.curve.BinnedTime类承担的,而具体到现在写入数据的过程,主要是调用timeToBinnedTime方法来实现的。
1.
def timeToBinnedTime(period: TimePeriod): TimeToBinnedTime = {
2.
period match {
3.
case TimePeriod.Day => toDayAndMillis
4.
case TimePeriod.Week => toWeekAndSeconds
5.
case TimePeriod.Month => toMonthAndSeconds
6.
case TimePeriod.Year => toYearAndMinutes
7.
}
8.
}
更进一步的调用,可以点开toWeekAndSeconds方法进行查看。
这一步是GeoMesa建立时空索引的核心部分,在执行这一步之前,用户需要将经纬度、时间的参数都准备好,经纬度的信息都是Double类型的数据,时间的信息需要转换成为Long类型的数据,然后进入org.locationtech.geomesa.curve.Z3SFC类中,执行index方法。
1.
override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {
2.
try {
3.
require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max && t >= time.min && t <= time.max,
4.
s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}], [${time.min},${time.max}]): $x, $y, $t")
5.
Z3(lon.normalize(x), lat.normalize(y), time.normalize(t))
6.
} catch {
7.
case _: IllegalArgumentException if lenient => lenientIndex(x, y, t)
8.
}
9.
}
在上述源码的第4行,可以看到,经度、维度和时间的数据都调用了normalize方法。这是对这些数据进行格式化处理。
在此之后,对于经过格式化的数据,GeoMesa利用org.locationtech.sfcurve.zorder.Z3类对这些数据进行了再次的封装。
1. def apply(x: Int, y: Int, z: Int): Z3 = {
2. new Z3(split(x) | split(y) << 1 | split(z) << 2)
3. }
其中,会调用一些切分、编码、解码的方法,在此不赘述。
在前述的Z3IndexKeySpace类中的getZValueBytes方法的后半段(第20行到第28行),主要实现了前面处理数据的序列化操作。
1.
private def getZValueBytes(z3: Z3SFC,
2.
geomIndex: Int,
3.
dtgIndex: Int,
4.
timeToIndex: TimeToBinnedTime,
5.
lenient: Boolean)
6.
(prefix: Seq[Array[Byte]],
7.
feature: SimpleFeature,
8.
suffix: Array[Byte]): Seq[Array[Byte]] = {
9.
val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]
10.
if (geom == null) {
11.
throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")
12.
}
13.
val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]
14.
val time = if (dtg == null) { 0 } else { dtg.getTime }
15.
val BinnedTime(b, t) = timeToIndex(time)
16.
val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {
17.
case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)
18.
}
19.
20.
// create the byte array - allocate a single array up front to contain everything
21.
val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)
22.
var i = 0
23.
prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }
24.
ByteArrays.writeShort(b, bytes, i)
25.
ByteArrays.writeLong(z, bytes, i + 2)
26.
System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)
27.
Seq(bytes)
28.
}
在上一步中,数据已经被整合成为符合Z3填充曲线规则的格式了,同时用Z3封装好了,接下来就是将这些数据解析成为一个long类型数据并进一步转化成为能够存入到Hbase当中的二进制序列。
实现这个功能的类是org.locationtech.geomesa.utils.index.ByteArrays类,而实现这个过程需要先后调用其中的writeShort方法和writeLong方法。如下:
WriteShort方法:
1.
def writeShort(short: Short, bytes: Array[Byte], offset: Int = 0): Unit = {
2.
bytes(offset) = (short >> 8).asInstanceOf[Byte]
3.
bytes(offset + 1) = short.asInstanceOf[Byte]
4.
}
WriteLong方法:
1.
def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {
2.
bytes(offset ) = ((long >> 56) & 0xff).asInstanceOf[Byte]
3.
bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]
4.
bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]
5.
bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]
6.
bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]
7.
bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]
8.
bytes(offset + 6) = ((long >> 8) & 0xff).asInstanceOf[Byte]
9.
bytes(offset + 7) = (long & 0xff).asInstanceOf[Byte]
10.
}
上述数据都处理完成以后,mutator会将index、shards、feature等数据进行封装
1. private def mutator(sharing: Array[Byte],
2. shards: ShardStrategy,
3. toIndexKey: (Seq[Array[Byte]], SimpleFeature, Array[Byte]) => Seq[Array[Byte]],
4. operation: (Array[Byte], F) => W)
5. (feature: F): Seq[W] = {
6. toIndexKey(Seq(sharing, shards(feature)), feature.feature, feature.idBytes).map(operation.apply(_, feature))
7. }
Geomesa中利用HTable与Hbase进行交互,而封装数据的put对象存在于org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中,通过调用createInsert方法,来实现这个过程,该方法返回一个Mutation对象,其中就封装了数据的一次写操作。
1.
override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {
2.
val put = new Put(row)
3.
feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))
4.
feature.visibility.foreach(put.setCellVisibility)
5.
put.setDurability(HBaseIndexAdapter.durability)
6.
}
由于Hbase本身存储数据是以二进制的格式将数据进行存储的,最终将数据存储到HDFS当中的HFile当中,所以这个持久化的过程必然会涉及到数据的序列化的问题。而在这个过程当中,GeoMesa主要用了Kryo的序列化方式。
kryo是一个高性能的序列化/反序列化工具,由于其变长存储特性并使用了字节码生成机制,拥有较高的运行速度和较小的体积。当然,序列化的方式有多种多样,GeoMesa也提供了例如avro、prottobuffer等序列化机制,用户在进行开发的过程中,可以进行选择。
这个过程主要涉及到两个类,HbaseFeature类以及org.locationtech.geomesa.features.kryo.impl.KryoFeatureSerialization类。HbaseFeature类承担了数据封装和调用方法的作用,同样也是贯穿整个写操作的一个基础类。KryoFeatureSerialization类则是承担了数据序列化的基础性工作,具体实现序列化功能的是serialize方法:
1.
override def serialize(sf: SimpleFeature): Array[Byte] = {
2.
val output = KryoFeatureSerialization.getOutput(null)
3.
writeFeature(sf, output)
4.
output.toBytes
5.
}
经过比较复杂的底层转换,最终实现数据插入的是利用org.apache.hadoop.hbase.client.Put类当中的addImmutable方法。
1.
public Put addImmutable(byte [] family, byte [] qualifier, long ts, byte [] value) {
2.
if (ts < 0) {
3.
throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
4.
}
5.
List<Cell> list = getCellList(family);
6.
KeyValue kv = createPutKeyValue(family, qualifier, ts, value);
7.
list.add(kv);
8.
familyMap.put(family, list);
9.
return this;
10.
}
这个方法里,在对KeyValue进行设置以后,进一步调用add方法,此时就将数据存储到Hbase当中了。
1
改造的背景
1.1 GeoMesa-Hbase本身的问题
Geomesa在向Hbase写入数据时,是通过HTable和Put对象来实现的。在写入的时候,需要指定列族名、列名和Cell的value值,而这些指定的内容都需要经过序列化才能进入到Hbase当中。
在原生的GeoMesa当中,value值是将整个feature的所有信息都封装在一起,统一进行序列化的。虽然这样能够让存储的空间更小,避免了查询多个字段的问题,但是这样的方式使得查询没有针对性,在进行查询时,一些价值不大的信息也会被查询到,极大地占用了资源。
1.2 预期目标
希望能够通过一些源码级别的修改,找到GeoMesa与Hbase进行交互的位置,将插入过程从单列写入改造成为多列写入,将整个的feature信息存储由存储为一个列转变为分属性存储在不同的列中。以此来提升查询的针对性和效率。
2
过程分析
在GeoMesa当中,主要负责插入数据是org.locationtech.geomesa.hbase.index.HBaseIndexAdapter类当中的createInsert方法,在这个方法当中,GeoMesa创建了Put对象,一个Put对象对应一个行键,这个行键是已经经过时空索引处理过后的二进制数组。
1.
override protected def createInsert(row: Array[Byte], feature: HBaseFeature): Mutation = {
2.
val put = new Put(row)
3.
feature.values.foreach(v => put.addImmutable(v.cf, v.cq, v.value))
4.
feature.visibility.foreach(put.setCellVisibility)
5.
put.setDurability(HBaseIndexAdapter.durability)
6. }
然后这个put对象设定了若干个value,但是在默认情况下,这个feature对象中的value只有一个对象,就是整个feature当中的所有信息。而具体设定这些信息的则是通过org.locationtech.geomesa.hbase.data.HBaseFeature当中的values值来设定的。
1.
lazy val values: Seq[RowValue] = serializers.map { case (colFamily, serializer) =>
2.
new RowValue(colFamily, HBaseColumnGroups.default, serializer.serialize(feature))
3.
}
从上面的代码可以看出,values的类型是封装了若干个RowValue对象的序列,而在其内部设定时依然是单个RowValue对象,也就是说,values封装的是serializers遍历之后的所有RowValue对象。
在创建RowValue对象时,其中设定列名的是第二个参数,默认情况下是HBaseColumnGroups.default,这个参数存在于org.locationtech.geomesa.hbase.index.HBaseColumnGroups类当中,默认情况下是“d”。也就是说默认情况下,写入的数据只有一个列,列名为“d”。
1. override val default: Array[Byte] = Bytes.toBytes("d")
3
改造方法
3.1 切入点
由上述分析可以看出,主要可以修改的点有三处:
首先,可以修改HBaseColumnGroups类当中的default参数,这样的话就可以修改列名。但是这样的修改方式只是将列名修改过来了,虽然在Hbase当中会显示新的列名,但是本质上依然是整个feature进行序列化,整个存储的过程。而且这样的修改方式也会连带地修改列族名,由修改过的实验可以看出,在进行写入操作时,列族名也会调用这个default参数。
其次还可以修改HBaseIndexAdapter类当中的createInsert方法,在进行feature.values的遍历时,可以进行拆分。但是这种方式仍然是在RowValue封装好以后才进行遍历的,本质上value值已经被序列化了,再进行拆分已经没有价值了。
最好的方案是修改HBaseFeature类当中的values,因为RowValue是在这个函数内生成的,因此可以进行一些更为细致的操作。
3.2 修改过程
首先需要在values函数当中,判断feature里面的属性存储情况,以便抽取不同的数据,将这些数据分成不同的RowValue进行封装,存储。
1. lazy val values: Seq[RowValue] = serializers.flatMap { case (colFamily, serializer) =>
2. // Fid
3. println(feature.getAttribute(0))
4. // Date
5. println(feature.getAttribute(1))
6. // Geometry
7. println(feature.getAttribute(2))
8. // Description
9. println(feature.getAttribute(3))
执行修改后的程序,可以看到这些属性成功被取出来了。
接下来就是将这些数据封装在不同的RowValue对象当中。修改如下:
1.
lazy val values: Seq[RowValue] = serializers.flatMap { case (colFamily, serializer) =>
2.
Seq(
3.
new RowValue(colFamily, Bytes.toBytes("Fid"), Bytes.toBytes(feature.getAttribute(0).toString)),
4.
new RowValue(colFamily, Bytes.toBytes("Date"), Bytes.toBytes(feature.getAttribute(1).toString)),
5.
new RowValue(colFamily, Bytes.toBytes("Geometry"), Bytes.toBytes(feature.getAttribute(2).toString)),
6.
new RowValue(colFamily, Bytes.toBytes("Description"), Bytes.toBytes(feature.getAttribute(3).toString)))
7.
}
3.3 运行结果
在默认情况下,value当中将feature的所有数据都放在了一起,呈现出来的就是只有一列。
而经过修改以后,不同属性的数据都已经被放在了不同的列当中。
4
存在的问题
最大的问题就是这样的修改可能会导致数据无法通过原有GeoMesa来进行查询的问题,因为原有GeoMesa依然是通过列族名“d”和列名“d”来进行查询的。
还有一个问题就是和原有序列化机制可能会出现冲突的问题。因为如果写入的序列化机制和读取的序列化机制产生冲突,就会报版本错误的问题。