开发者社区 > 博文 > GeoMesa-Hbase源码篇 ——索引机制
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

GeoMesa-Hbase源码篇 ——索引机制

  • 京东城市JUST团队
  • 2021-01-18
  • IP归属:未知
  • 66080浏览


第一章   概述

第一节   GeoMesa索引简介

GeoMesa 索引的基本原理计算将三维(经度、纬度、时间)的数据按照Z曲线进行降维,得到一维数据作为Key使用,方便在key-value数据库中进行查询。 实际的Key结构比简单的键值对更复杂。在AccumuloGeoMesa索引的结构如图:

第二节   分类

索引名称

索引解释

Z2 [ z2]

Z2索引使用二维Z阶曲线来索引点数据的纬度和经度。如果要素类型具有几何类型,则将创建此索引 Point。这用于有效地响应具有空间组件但没有时间组件的查询。

Z3 [ z3]

Z3索引使用三维Z阶曲线来索引点数据的纬度,经度和时间。如果要素类型具有几何类型Point且具有时间属性,则将创建此索引。这用于有效地回答具有空间和时间组件的查询。

XZ2 [ xz2]

XZ2索引使用XZ-ordering [1]的二维实现来索引非点数据的纬度和经度。XZ排序是Z-排序的扩展,设计用于空间扩展对象(即非点几何,如线串或多边形)。如果要素类型具有非Point几何图形,则将创建此索引。这用于有效地回答具有空间组件但没有时间组件的查询。

XZ3 [ xz3]

XZ3索引使用XZ-ordering [1]的三维实现来索引非点数据的纬度,经度和时间。如果要素类型具有非Point几何并且具有时间属性,则将创建此索引。这用于有效地回答具有空间和时间组件的查询。 

Record / ID [ id]

记录索引使用功能ID作为主键。它用于ID的任何查询。此外,某些属性查询可能最终从记录索引中检索数据。

Attribute [ attr]

属性索引使用属性值作为主索引键。这允许在没有时空组件的情况下快速检索查询。属性索引包括辅助时空密钥,其可以改进具有多个谓词的查询。

 

第三节   传统空间索引方式

GeoMesa的数据存储使用 key-value数据库。key-value数据库是一种NoSQL数据库,其数据按照键值对的形式进行组织、索引和存储。 AccumuloHBaseGoogle Cloud Bigtable对这些键进行排序,并可将它们存储在任意数量的节点(服务器)上。

当使用key-value数据库时,Key的良好设计可以使应用程序更高效的运行。与关系数据库不同的是,key-value数据库中会频繁的使用key进行查询。例如在订单数据库中,会以订单号作为key进行存储,当用户查询订单的时候,即可用通过订单号直接查询到订单数据并返回该条数据。AccumuloHBaseCloud Bigtable都是使用类似的机制进行工作,GeoMesa同样也是使用该机制来进行数据组织。GeoMesa根据时空数据的特点,实现了生成包含时空信息的Key的算法,算法的基本思想如图: 

图中的红线被称为空间填充曲线,又称为Z曲线。该线顺序访问每个单元格一次,并且能够保证访问次序的唯一性。 Z曲线也能用于高分辨率地图,如图:

Z曲线上的每个点都可以赋予一个顺序值,通过这个顺序值,GeoMesa将经纬度表示为一个整数,这样就将二维数据降为一维数据,可以作为key-value数据库中的key使用。因为Z曲线支持多维数据,所以GeoMesa也支持将多维数据降为一维数据,作为key使用。

第四节   GeoMesa时空索引机制原理

如何对时间空间数据进行有效地编码,这是一个GIS研究工作者一直在研究的问题。只有让这些数据能够快速的被查询到,这样的体系才是有用的。而这个工作的关键在于query planner上了。

1.   建立时空索引

在此处介绍的案例是基于accumulo data storeGeoMesa基本的一个data store)的。这个框架是一个分布式的键值对数据源。而在给时空数据建立索引的过程就是要将三维的数据进行降维处理,即让经度、维度、时间数据转化成为单一维度的数据。这个特定的降维过程就是被index-schema的格式所定义的。

下面的图片就展示了一个简单的利用空间填充曲线来进行降维的过程。在这个例子当中,index-schema的格式是10个比特,遵循的模式就是“YXTTYXTTYX”。在这个过程当中,三个维度都呈现在Z型曲线当中。这个编码方式就是GeoMesa当中默认的一种index-schema的编码格式。

在前面的两个条带就展示了逻辑顺序的存储过程和物理顺序的存储过程。

2.   Query Planning

例如,我们需要进行一个查询,查询条件如下:

l  -180 ≤ longitude < 45

l  -90 ≤ latitude < 22.5

l  0 < time < 9 (on an arbitrary scale of 0 to 16 for this illustration)

这个对应的查询范围正好是三个维度的最低值。下图就可以展示出这些key对应的查询位置:

这个例子使用的是10 bit的编码格式,所以在立方体当中只有1024个查询结果。默认的GeoMesa index-schema利用了超过55 bit,而这样利用穷举来进行查询的效率会非常的低。不过幸运的是,Geohash在进行位置信息的编码时,对于String类型的字符串的存储是分层的,因此这样就可以大大的降低查询的资源开销。

第五节   不同Data Store的索引设计

1.   Hbase Data StorerowKey设计

1.1    针对Point的时间+空间三维索引(Z3

ShardKey(1 byte)+Epoch Week(2 bytes)+Z3(x,y,t)(8 bytes)+FeatureID

1.2    针对Point的空间索引(Z2

ShardKey(1 byte)+Z2(x,y)(2 bytes)+FeatureID

1.3    针对复杂空间对象(如Polygon)的时间+空间三维索引(XZ3

ShardKey(1 byte)+Epoch Week(2 bytes)+XZ3(minX, minY, maxX, maxY)(8 bytes)+FeatureID

1.4    针对复杂空间对象(如Polygon)的空间二维索引(XZ2

ShardKey(1 byte)+XZ2(minX, minY, maxX, maxY)(8 bytes)+FeatureID

1.5    Attribute索引

IdxBytes(2 bytes)+ShardKey(1 byte)+AttrValue+SplitByte(1 byte)+SecondaryIndex(Z3/XZ3/Z2/XZ2)+FeatureID

1.6    ID索引

FeatureID

 

第二章   理论准备

第一节   数据标准化方法

1.   数据标准化的背景

数据的标准化(normalization)是将数据按比例缩放,使之落入一个小的特定区间。在某些比较和评价的指标处理中经常会用到,去除数据的单位限制,将其转化为无量纲的纯数值,便于不同单位或量级的指标能够进行比较和加权。

假设对3名新生婴儿体重(567)和3名成年人的体重(150151152)差异的大小进行对比分析,从表面上看,两组人员的平均差异均为1斤,由此便得出两组人员的体重差异程度相同显然是不合适,因为两者的体重水平不在同一等级上,即量纲不同。

对多个指标进行综合分析,假设对商品的运营指标销售量、销售额、浏览量进行综合评价或聚类分析,由于各指标间的水平相差很大,如果直接进行分析会突出数值较高的指标在综合分析中的作用,从而使各个指标以不等权参与运算。

因此,常常需要先对数据进行标准化,对各统计指标进行无量纲化处理,消除量纲影响和变量自身变异大小和数值大小的影响。

而对于时空数据,同样存在这样的量纲不统一的问题。例如时 间上,其单位为秒、分钟、小时,而在空间上,其单位又根据坐标系的不同,会选择不同的量度体系。如果选择大地坐标系,这个量度的标准就是度,如果选择空间直角坐标系,这个量度就变成了米、千米。因此这个矛盾同样是非常突出的,需要通过数据标准化的方式来解决。

GeoMesa当中,在进行数据的预处理的过程当中,就用到了数据标准化的方法,使得时空数据可操作。而数据标准化的方法在统计学中非常多,其中比较有代表性的方法就是Max-Min标准化方法和Z-score标准化方法。

2.   Max-Min标准化/离差标准化

该方法将某个变量的观察值减去该变量的最小值,然后除以该变量的离差,其标准化的数值落到[0,1]区间,转换函数为:x=(x-min)/(max-min),其中max为样本的最大值,min为样本的最小值。其过程可以表达如下

对序列x1x2xn进行变换:

则新序列y1y2yn  [0,1] 无量纲。

该方法对原始数据进行线性变换,保持原始数据之间的联系,其缺陷是当有新数据加入时,可能导致maxmin的变化,转换函数需要重新定义。GeoMesa当中采用的数据标准化方法就是这种方法。

3.   Z-score 标准化/标准差标准化/零均值标准化

该方法将某变量中的观察值减去该变量的平均数,然后除以该变量的标准差,标准化后的数据符合标准正态分布。转化为数学表达式如下:

上式中,x是原始数值,u是样本均值,σ是样本标准差。回顾下正态分布的基本性质,若xN(u,σ2),则有

其中,N0,1)表示标准正态分布。

可以看出,z-score标准化方法试图将原始数据集标准化成均值为0,方差为1且接近于标准正态分布的数据集。然而,一旦原始数据的分布 接近于一般正态分布,则标准化的效果会不好。该方法比较适合数据量大的场景(即样本足够多,现在都流行大数据,因此可以比较放心地用)。此外,相对于min-max归一化方法,该方法不仅能够去除量纲,还能够把所有维度的变量一视同仁(因为每个维度都服从均值为0、方差1的正态分布),在最后计算距离时各个维度数据发挥了相同的作用,避免了不同量纲的选取对距离计算产生的巨大影响。所以,涉及到计算点与点之间的距离,如利用距离度量来计算相似度、PCALDA,聚类分析等,并且数据量大(近似正态分布),可考虑该方法。相反地,如果想保留原始数据中由标准差所反映的潜在权重关系应该选择min-max归一化,

第二节   GeoHash算法原理

4.   GeoHash概述

Geohash是将整个地图或者某个分割所得的区域进行一次划分,由于采用的是base32编码方式。即Geohash中的每一个字母或者数字(如wx4g0e中的w)都是由5bits组成(2^5 = 32base32),这5bits可以有32中不同的组合(0~31),这样我们可以将整个地图区域分为32个区域,通过00000 ~ 11111来标识这32个区域。第一次对地图划分后的情况如下图所示(每个区域中的编号对应于该区域所对应的编码):

Geohash01串序列是经度01序列和纬度01序列中的数字交替进行排列的,偶数位对应的序列为经度序列,奇数位对应的序列为纬度序列,在进行第一次划分时,Geohash01序列中的前5bits11100),那么这5bits中有3bits是表示经度,2bits表示纬度,所以第一次划分时,是将经度划分成8个区段(2^3 = 8),将纬度划分为4个区段(2^2 = 4),这样就形成了32个区域。如下图:

5.   GeoHash的实现流程

2.1    将经纬度转化为二进制编码

以经纬度值:(116.389550 39.928167)进行算法说明,对纬度39.928167进行逼近编码 (地球纬度区间是[-90,90]

1)     区间[-90,90]进行二分为[-90,0),[0,90],称为左右区间,可以确定39.928167属于右区间[0,90],给标记为1

2)     接着将区间[0,90]进行二分为 [0,45),[45,90],可以确定39.928167属于左区间 [0,45),给标记为0

3)     递归上述过程39.928167总是属于某个区间[a,b]。随着每次迭代区间[a,b]总在缩小,并越来越逼近39.928167

4)     如果给定的纬度x39.928167)属于左区间,则记录0,如果属于右区间则记录1,序列的长度跟给定的区间划分次数有关

2.2    合并经纬度数据为一个数据

偶数位放经度,奇数位放纬度,把2串编码组合生成新串如下图:

2.3    按照base32编码集进行编码

首先将11100 11101 00100 01111 0000  01101转成十进制,对应着2829415013 十进制对应的base32编码就是wx4g0e,如下图

6.   GeoHash算法的问题

现在大多数采用GeoHash算法的软件采用的都是Peano曲线(Z曲线),这种曲线最大的问题就是容易出现空间的突变问题。例如在下面右图当中,虽然编码为0111的点和编码为1000的点虽然在编码上是非常接近的,但是实际上,两个点的空间位置是非常远的。

所以在实际操作当中,往往在计算空间距离时,会对每个点周围的点进行计算,这算是对于这种索引机制的一种补充。

7.   GeoHash算法的展望

在现有的空间填充曲线理论当中,已经有了一些方案,如下图。可以看出,这些填充曲线都是为了能够将空间当中所有的数据利用一定的规律来进行排序,实现降维的操作。结合前面GeoHash算法面临的问题,可以看出,在众多的空间填充曲线的解决方案当中,希尔伯特曲线(Hilbert)的突变性是最低的,同样这个方案也是很多空间数据开发者推荐的一个方案。但是这种方案的计算开销比较大,而且算法实现是比较困难的,尤其是如果延伸到三维数据,这样的编码往往会更加复杂。所以大多数现有的利用GeoHash算法的软件采用的也都是peano曲线,不过在以后,随着空间数据库的发展,这个方案是一个非常不错的发展方向。

第三章   XZ索引概述

随着大数据时代的到来,越来越多的行业出现了海量数据存储和查询的需求。其中,与地理信息相关的行业同样面临这样的问题,而如何在数据库当中建立这些数据的索引成为了问题的关键。但是相对于传统的数据类型,例如:整型、浮点型数据等,地理信息承载了更多的数据,而且其数据类型更加复杂多样,例如:PointLineStringPolygon等等。为了更好地为这些数据建立索引,提升数据的写入和查询效率,Christian Böhm, Gerald Klump Hans-Peter Kriegel基于传统的空间填充曲线理论提出了XZ曲线来解决这个问题。本章的内容来源于他们在1999年的论文《XZ-Ordering: A Space-Filling Curve

for Objects with Spatial Extension》,本章将这篇论文的主要原理进行翻译和解释,旨在帮助读者更好地理解这个方法。

第一节   研究动机

在过去的二十多年时间内,对于空间索引的研究出现了大量的成果。但是大量的研究成果都是基于树形结构,例如R-tree及其变体。除此以外,大家也逐渐对于将地理信息整合起来的方法产生了兴趣。

地理信息系统(GIS)是一种数据密集型的系统,其中包含了主题属性和地理属性。所谓主题属性就是往往被我们存储在关系型数据库当中的数据,这种方式可以保证这些属性的易于统计和管理,但是这种方式并不适合存储地理属性,因为地理属性往往更加复杂,无法存储在传统的关系型数据库当中。所以在以往的存储方案当中,主题属性往往是存储在关系型数据库当中(MysqlOracle等),地理属性往往存储在基于文件的多维度混合索引结构当中,例如:GmlShp文件等等。不过近些年一些关系型数据库也逐渐可以支持地理信息的存储了,比如PostGIS等。

1.     <gml:featureMember>  

2.         <fj>  

3.        <fj.FeatureID>1</fj.FeatureID>  

4.        <fj.AREA>2360164864</fj.AREA>  

5.        <fj.Geometry>  

6.          <gml:Polygon>  

7.         <gml:outerBoundaryIs>  

8.           <gml:LinearRing>  

9.             <gml:coordinates>  

10.           713239.3125,2938893.0  713174.5,2938715.0         ......  

11.           </gml:coordinates>  

12.         </gml:LinearRing>  

13.       </gml:outerBoundaryIs>  

14.        </gml:Polygon>  

15.      </fj.Geometry>  

16.       </fj>  

17.   </gml:featureMember>  

这种方案有很多缺点,尤其是将数据分别存储在数据库系统是很难维持的。如果一个更新操作既包含地理信息也包含主题信息,如果主题信息更新失败,这里的地理信息也一定要更新失败,这样才能够保证数据的同步,反之亦然。所以为了实现这个功能,我们就需要一个针对异构型数据的分布式提交模型。而且文件系统和数据库系统通常有不同的数据安全策略,备份和读写方式,文件系统不能保证物理和逻辑数据相互独立。

而经过分析,无论是使用面向对象数据库还是单纯的关系型数据库,都需要将地理属性映射进关系模型当中。而这方面比较成熟的方案是基于空间填充曲线来管理数据,这样的办法可以粗略地保证空间上距离较近的两个点,在映射成的一维曲线上距离也比较近。而GeoMesa使用的同样是这种方式来处理地理信息(PointLineStringPolygon等等)。而在本章主要讨论的是LineStringPolygon这种比较复杂的地理属性。

第二节   以往的两种解决方式

从图中可以看出,如果延续Z曲线的逻辑,我们就会遇到这样的问题,一个Polygon会占到多个网格。一个最简单的方法就是把这个Polygon占用的所有网格都存储在数据库当中。但是这样的方案很明显会占用很多个格子,而且需要存储的数据过多,造成数据冗余。

而另一种方式就可以将数据转化成为一个近似的value来进行存储了。主要的方法就是首先将地理空间进行四分,如果有一个四分块被实体切割,就进行递归,继续切分下去;如果超过一个四分块被实体切割,就停止。使用四分序列来存储那个点。这种方式的优势非常明显,就是能够将地理信息转化成为一个单一的值。但是它的缺陷也是非常突出的:首先,这些唯一值的长度是不统一的。如果采用这样的方式,那我们存储在数据库当中的key值就会千变万化。除此以外,对于一些比较小的地理实体,它会产生非常大的近似偏差。事实上,如果用户采用这种方式,这个最终的值越短,产生的偏差就越大,而且这样产生的索引,在查询过程当中的命中率也就越差。

       为了解决数据冗余的问题,Orenstein提出了两个概念来控制四分序列码的长度,长度边界(size-bound)和错误边界(error-bound)。具体的实现方式在后文中会有详述。

为了优化这些算法,我们还可以在空间填充曲线上下功夫。除了Z曲线,还有其他几种曲线来进行选择。其中,通常来说最优的曲线是Hilbert曲线,这样的数据突变是最平缓的,但是通常用的最多的是Z曲线,因为这样计算起来更加简便,而且算法也更容易实现。

第三节   Geomesa中的方案

通过前文可以知道,映射空间实体需要解决的问题如下:如何避免数据冗余以及如何解决四分序列长度多样化的问题。为此,有如下三个需要解决的问题:将元素和网格进行叠加;使用一种复杂的编码结构;在查询过程当中引入一个为间隔设计的有效的算法。

1.   将元素和网格进行叠加

设定 s 是一个区域的四分序列,而且|s|是它的长度。例如右上方区域的宽度和高度就是0.5|s|-1

引理1:四分编码序列的最小长度和最大长度:

现有一个地理实体四分序列串s的长度|s|,它的高度h和宽度w的范围可以限定为:

其中:

引理2:近似错误的最大值:

现有的近似值错误不是无上限的。在论文当中,通过作者的证明,最大的近似错误不会超过15。尽管15这个数字也非常大,不过现在这个方式使得近似错误能够可控了。

2.   为四分序列编码

引理3:在R(s)区域当中的网格数量和元素数量。

设定网格的分解等级为g,四分序列s的长度|s|=l,一个地理元素里面网格的数量为:

相应的元素数量为:

一个四分序列s = <q0q1…qi…ql-1>的序列化编码C(s)对应

引理4:编码序列的顺序存储

将不太相等的编码序列顺序与象限编码顺序相对应

引理5:编码序列的最小化。

不存在从象限序列集到自然数集的映射,其需要比自然数更小的自然数的间隔C(s)

第四节   查询过程

1.   插入和删除

2.   从窗口查询到时间间隔设置

3.   一个有效的间隔算法

 

第四章   Z2索引

第三节   Z2索引概述

对于通常的空间查询,最主要的数据是经度和纬度的数据。而如果直接将这两个数据分为两个字段存储在数据库当中的话,就可能剥离点的位置信息,尤其是在计算点的距离等场景下,这种存储方式可能会出现很大的系统开销。为了能够将空间点的位置信息也能够反映在存储的数据当中,就需要将经度和纬度两个数据尽可能封装成为一个数据。这样就出现了使用空间填充曲线来解决这个问题的算法——GeoHash算法。

第四节   Z2索引的原理

1.   概述

Z2索引当中,为了实现空间索引,其内部可以分为如下几个机制:

1)     分片存储机制

2)     空间索引机制

3)     Fid机制

4)     多个数据的封装

2.   分片存储机制

Z2索引当中,针对的主要是空间数据,尤其是由经度、纬度确定的空间点数据。但是在实际情况当中,空间点数据往往是不均匀的。例如:在北京市的人口密度分布图中可以看出,在市中心的地区,人口密度比较高,相应的关于人的数据会更多一些,而远离市区的地方,人口密度比较低,相应的数据就会少很多。类似的情况也发生在很多其他需要空间数据作为支撑的场景当中。

而过度的数据不平衡容易引发数据倾斜问题,尤其是对于Hbase这种基于列的分布式数据存储系统,数据倾斜问题可能会引发局部热点数据,进而引发服务器的连环宕机的问题。为了解决这个问题。GeoMesa采用了分片机制,在存储数据之前,会将经过GeoHash算法处理过后的空间索引前面添加一个负责分片的字节数据,这样就能够平衡各部分的负担。

3.   空间索引机制

在构建空间索引的过程当中,采用的依然是geoHash算法。首先,geomesa对经纬度的数据进行极值标准化操作,将经纬度的数据由Double类型转化为Int类型,然后对这两个数据进行合并操作,形成一个long类型的数据,最后与之前产生的数据封装成一个字节数组,作为rowkey存储数据系统当中。

4.   多个数据的封装

最后一步就是对于所有数据进行封装。

第五节   Z2索引的代码实现

建立索引的过程其实都是整合在写流程当中的,整体的写流程见源码篇(写流程)。这一节主要选取跟建立索引相关的部分进行介绍。建立索引的方法入口在org.locationtech.geomesa.index.index.BaseFeatureIndex特性当中的writer方法当中。

1.     override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {  

2.         val sharing = sft.getTableSharingBytes  

3.         val shards = shardStrategy(sft)  

4.         val toIndexKey = keySpace.toIndexKeyBytes(sft)  

5.         mutator(sharing, shards, toIndexKey, createInsert)  

6.     }  

第二行当中的sharingtable sharing机制有关。第三行中的shards参数就是选取的分片位置,第四行当中的toIndexKey主要是对于时空索引的数据,第五行当中的mutator方法包含了对分片信息、时空索引、每一行的value等数据的封装。

1.   获取分片

writer方法当中的sharing机制是跟accumulo本身的机制密切相关的,hbase没有类似的机制,因此如果使用Hbase DataStore,这个参数返回一个空的byte数组。

1.        val sharing = sft.getTableSharingBytes  

具体调用的方法如下:

1.     def getTableSharingBytes: Array[Byte] = if (sft.isTableSharing) {  

2.           sft.getTableSharingPrefix.getBytes(StandardCharsets.UTF_8)  

3.         } else {  

4.           Array.empty[Byte]  

5.     }  

writer方法当中的的shards参数表示的是选取的分片标号。

1.        val shards = shardStrategy(sft)  

之后的方法调用会根据索引类型的不同调用相应的方法。在此处,针对Z3索引机制,geomesa调用的是org.locationtech.geomesa.index.index.z3.Z3Index中的shardStrategy方法,在这里封装了一个SimpleFeatureType对象,返回一个ShardStrategy,也就是封装了分片策略的对象。

1.     override protected def shardStrategy(sft: SimpleFeatureType): ShardStrategy = ZShardStrategy(sft)  

接下来调用ZshardStrategyapply方法,获取具体的shard值:

1.     object ZShardStrategy {  

2.         def apply(sft: SimpleFeatureType): ShardStrategy = ShardStrategy(sft.getZShards)  

3.       }  

Z_SPLITS_KEY里面封装了切片的相关属性,例如在示例当中,这个值其实是“geomesa.z.splits”字符串,最后的4指的是切片的总数。用户可以通过调节这里的参数,可以根据自己的业务场景从Java API当中直接进行设置,也可以在源码级别进行修改。

1.     def getZShards: Int = userData[String](Z_SPLITS_KEY).map(_.toInt).getOrElse(4)  

之后回到shardStrategy方法以后会调用org.locationtech.geomesa.index.index.ShardStrategy伴生对象当中的apply方法,在这个方法当中,会传入前面设定的分片数量。此时底层会对这个分片数量进行一个判断,如果分片数少于2,这个方法就会返回一个空的Array,也就是会使这个分片机制失效,如果count大于等于2geomesa才会执行接下来的分配分片的操作。

1.     def apply(count: Int): ShardStrategy = {  

2.         if (count < 2) { NoShardStrategy } else {  

3.           var strategy = instances.get(count)  

4.           if (strategy == null) {  

5.             strategy = new ShardStrategyImpl(SplitArrays(count))  

6.             instances.put(count, strategy)  

7.           }  

8.           strategy  

9.         }  

10.     }  

在程序执行到第5行时,程序会进一步进入org.locationtech.geomesa.index.utils.SplitArrays类的伴生对象,调用其apply方法,进行进一步的切片操作。在这个方法中有多个参数,其中splitArraysMap参数本质上是一个concurrentHashMap,因此这个操作也是具有一定的并发能力的。

1.     def apply(numSplits: Int): IndexedSeq[Array[Byte]] = {  

2.         if (numSplits < 2) { EmptySplits } else {  

3.           var splits = splitArraysMap.get(numSplits)  

4.           if (splits == null) {  

5.             splits = (0 until numSplits).map(_.toByte).toArray.map(Array(_)).toIndexedSeq  

6.             splitArraysMap.put(numSplits, splits)  

7.           }  

8.           splits  

9.         }  

10.     }  

最终返回给writer方法的其实是一个Array[byte],里面对于分片机制进行了预处理。

2.   获取空间索引

首先获取空间索引的入口是org.locationtech.geomesa.index.index.z2.Z2IndexKeySpace类中的getZValueBytes方法,方法内容如下:

1.     private def getZValueBytes(geomIndex: Int,  

2.                                lenient: Boolean)  

3.                               (prefix: Seq[Array[Byte]],  

4.                                feature: SimpleFeature,  

5.                                suffix: Array[Byte]): Seq[Array[Byte]] = {  

6.       val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]  

7.       if (geom == null) {  

8.         throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")  

9.       }  

10.     

11.     val z = try { sfc.index(geom.getX, geom.getY, lenient).z } catch {  

12.       case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry: $geom", e)  

13.     }  

14.     

15.     // 创建一个字节数组用来封装所有的数据  

16.     val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 8 + suffix.length)  

17.     var i = 0  

18.     prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

19.     ByteArrays.writeLong(z, bytes, i)  

20.     System.arraycopy(suffix, 0, bytes, i + 8, suffix.length)  

21.     Seq(bytes)  

22.   }  

这个方法内部封装了很多内容,大体可以分为两个部分,首先是获取空间索引的部分(第6行到13行),接着是将多个数据封装成为字节数组(第15行到第21行),这个过程在下一节介绍,本节重点介绍获取空间索引的部分。

负责构建空间索引的是org.locationtech.geomesa.curve.Z2SFC类中的index方法,其内部实现如下:

1.     override def index(x: Double, y: Double, lenient: Boolean = false): Z2 = {  

2.       try {  

3.         require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max,  

4.           s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}]): $x, $y")  

5.         Z2(lon.normalize(x), lat.normalize(y))  

6.       } catch {  

7.         case _: IllegalArgumentException if lenient => lenientIndex(x, y)  

8.       }  

9.     }  

在这个方法内部,首先系统对于传入的数据进行了一个判断,通过require的内容可以看出,这里需要传入的经纬度数据进行合法性的判断。如果判断通过,就会进入空间索引的核心部分。

首先,经度和纬度数据会经过一个极值标准化的过程,这个操作出现在org.locationtech.geomesa.curve.NormalizedDimension.BitNormalizedDimension类的normalize方法当中。

1.     override def normalize(x: Double): Int =  

2.       if (x >= max) { maxIndex } else { math.floor((x - min) * normalizer).toInt }  

经过这个极值标准化的过程,经度纬度的数据就会转化为一个Int类型的数据,回传给index方法。这两个转化完成的数据接下来就会作为参数,参与构建Z2的一个实例,这个过程当中,会调用org.locationtech.sfcurve.zorder.Z2类的apply方法。

1.     def apply(x: Int, y:  Int): Z2 = new Z2(split(x) | split(y) << 1)  

在这个方法内部,可以看出,两个数据又会作为参数传入split方法当中,这个split方法就是org.locationtech.sfcurve.zorder.Z2类中的split方法。这个split方法的主要作用就是对数据进行离散化处理,为下一步经纬度的数据相互穿插做准备。

1.     override def split(value: Long): Long = {  

2.       var x: Long = value & MaxMask  

3.       x = (x ^ (x << 32)) & 0x00000000ffffffffL  

4.       x = (x ^ (x << 16)) & 0x0000ffff0000ffffL  

5.       x = (x ^ (x <<  8)) & 0x00ff00ff00ff00ffL  

6.       x = (x ^ (x <<  4)) & 0x0f0f0f0f0f0f0f0fL   

7.       x = (x ^ (x <<  2)) & 0x3333333333333333L  

8.       x = (x ^ (x <<  1)) & 0x5555555555555555L 

9.       x  

10.   }  

最终,两个数据在离散化处理之后,在apply方法中,通过移位运算和“|”位运算,就整合成为了一个数据,这就是Z2索引当中最核心的空间索引部分。

3.   获取Fid

获取Fid的方法与写流程当中的过程比较类似。同样是调用从UserData当中取出Hints.PROVIDED_FID,如果用户没有设定id,就会随机生成一个uuid作为存储时用到的id

1.     override def createId(sft: SimpleFeatureType, sf: SimpleFeature): String = {  

2.         if (sft.getGeometryDescriptor == null) {  

3.           // no geometry in this feature type - just use a random UUID  

4.           UUID.randomUUID().toString  

5.         } else {  

6.           Z3UuidGenerator.createUuid(sft, sf).toString  

7.         }  

8.       }  

除此以外,为了使其能够整合进rowKey当中,还需要对这个id值进行序列化的操作。这个操作是在wrapper当中实现的,在第3行的代码当中。

1.     def wrapper(sft: SimpleFeatureType): (SimpleFeature) => HBaseFeature = {  

2.         val serializers = HBaseColumnGroups.serializers(sft)  

3.         val idSerializer = GeoMesaFeatureIndex.idToBytes(sft)  

4.         (feature) => new HBaseFeature(feature, serializers, idSerializer)  

5.       }  

进入idToBytes方法内部,可以看出geomesa此时还会对id进行一个判断。根据进一步的追踪可以知道,如果是此处的iduuid,会使用其本身的序列化机制;如果此处的id不是uuid,那么它的序列化是按照UTF-8的编码规则来操作的,在此不再赘述了。

1.     def idToBytes(sft: SimpleFeatureType): String => Array[Byte] =  

2.         if (sft.isUuidEncoded) { uuidToBytes } else { stringToBytes }  

4.   多个数据的封装

最终实现多个数据封装的过程依然是在org.locationtech.geomesa.index.index.z2.Z2IndexKeySpace类中的getZValueBytes方法的后半段。

1.       // 创建一个字节数组用来封装所有的数据  

2.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 8 + suffix.length)  

3.       var i = 0  

4.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

5.       ByteArrays.writeLong(z, bytes, i)  

6.       System.arraycopy(suffix, 0, bytes, i + 8, suffix.length)  

7.       Seq(bytes)  

首先在创建字节数组时,是根据前缀、空间索引和后缀的长度来进行创建的,其中前缀里面主要封装了shard的数据,占了一字节的位置,然后的8位就是空间索引占用的长度,后缀来自于FeatureId经过序列化的结果,所以其长度取决于FeatureId的序列化结果的长度。由于FeatureId的长度不确定,因此字节数组整体的长度也是不确定的。

在第5行中,空间索引经过处理以后插入到了字节数组当中。此时,系统调用了org.locationtech.geomesa.utils.index.ByteArrays类中的writeLong方法,可以看出,在这个方法当中,geomesa对于64位的空间索引按照8位一组,分割成了8个字节。

1.     def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {  

2.       bytes(offset    ) = ((long >> 56) & 0xff).asInstanceOf[Byte]  

3.       bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]  

4.       bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]  

5.       bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]  

6.       bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]  

7.       bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]  

8.       bytes(offset + 6) = ((long >> 8)  & 0xff).asInstanceOf[Byte]  

9.       bytes(offset + 7) =  (long        & 0xff).asInstanceOf[Byte]  

10.   }  

最后,这些数据经过封装,形成了字节数组,返回给geomesa,作为rowkey进行下一步的操作。

第五章   Z3索引

第一节   Z3索引概述

在大量的场景当中,我们不仅仅需要进行时间的查询、空间的查询,也需要对时间和空间的组合进行查询。在以往的机制当中,往往使用的是多级过滤或者多级索引的机制,这样的话很多时候无法对数据进行精确定位,会产生一些冗余的过程。Z3索引就是为了解决这个问题而产生的。

Z3索引当中,我们将时间和空间的数据进行类似GeoHash算法的分片和穿插,最后形成一个能够承担索引任务的数据,最后将这个数据进行一系列的封装,以适应具体的业务场景。

第二节   Z3索引的原理

1.   概述

Z3索引当中,为了解决将时间空间数据进行综合索引的功能,其内部可以分为如下几个机制:

5)     分片存储机制

6)     Epoch Week机制

7)     时空索引机制

8)     Fid机制

9)     多个数据的封装

2.   分片存储机制

由于我们存储的数据主要针对的是时间数据和空间数据。对时间数据来说,每天的数据量随着时间的分布是不均匀的。例如:在2016年、2017年、2018年三年的大年初一的沪昆高速上车流量的统计图中,可以看出,在不同时段、不同年份的车流量数据都是分布不均的,这样也会导致这些车辆产生的数据量会有所不同。这样如果直接将数据存入,很可能会在Hbase里面产生数据倾斜的问题。

为了解决这个问题,geomesa当中会在key值的第一个字节存储一个shard值,这个值主要是通过分片的方式,将这些数据随机分配到不同的shard当中,这样就可以尽量避免数据倾斜的问题。

[l1] 

 

3.   Epoch Week机制

在前文当中已经描述了关于空间方面的geohash算法。而对于时间数据来说,这样就很难直接通过二分的方式来进行划分和编码了。因为大地坐标系当中,经纬度是有明确的范围的,经度的范围就是从东经180°到西经180°,维度的范围就是从南纬90°到北纬90°。

但是对于时间来说,这个量度是无限的,因此为了给时间的编码提供一种相对明确的范围,geomesa当中采用了Epoch机制。这种机制就是以1970110点的时间作为起点,将feature当中的时间与这个起点的距离包含多少个星期作为Epoch Week。在每一个week内部,可以进行类似于geohash算法当中的二分法,最终形成一个可以标定时间的唯一数据[l2] 

 

4.   时空索引机制

Z2索引以及geohash算法当中,可以看出,我们利用到了一种四叉树的索引结构,首先将限定范围的空间进行横向和纵向的二分,当分出来的每一个小区域满足精度需求的时候,就可以利用空间填充曲线将这些小区域进行串联,最终实现二维空间向一维数据的降维。

Z3索引当中,同样通过类似的方法,但是前提是在经过上一步中对于Epoch week的限定,这样就可以确定时间数据的范围。而在下一步的操作中,同样是对经度、维度、时间数据进行极值标准化,然后在二进制当中进行穿插,最终形成的数据就是我们需要的时空索引。

这种索引方式利用的是八叉树的结构,同样也能够实现数据的唯一对应,但是也存在geohash算法当中的突变问题。另一方面,geomesa采用的依然是Z-order填充曲线,因此这个矛盾就会显得尤为突出了。

    

上述的这些索引过程可以分为一下几个步骤:

1)     首先对于经度、维度、时间数据进行极值标准化的操作,将这些数据进行类型的统一。

2)     在类型统一过后,将三个结果值当中没两位之间插入两个0,这样就会将这些数值的位数扩充为原来的3倍,而插入的0就是其他维度数据插入的位置。

3)     将扩充位数之后的数据进行适当移位,然后做位运算,得出一个整合了多个数据的数值

4)     将这个数值按照一定的位数规则,转化为字节数组,以便于进一步转化为索引字符串。

5.   Fid机制

在具体数据的设定当中,每一行feature在插入时都需要一个id作为标识符,这个标识符可以是用户设定好的,如果用户没有进行设定,geomesa底层会给这个feature随机生成一个uuid值作为标识featureid

6.   多个数据的封装

最终的封装过程比较简单,首先为了最大程度的实现rowkeyhbase当中的散列,geomesashard值放在了rowkey的开头,最大程度避免数据倾斜的问题。

接下来是Epoch Week,这个值相对于时空数据来说,变化是相对均匀的,可以实现rowkey排列的时序性相对较强。但是这样也会导致一个现象,当利用geomesa进行取数据的时候,从同一个week里的数据相对来说查询速度较快,而不同week里的数据查询速度相对较慢。

接下来是时空索引和Fid,这部分的数据是最可能出现数据倾斜的部分,将这些数据放在最后也是为了尽可能避免这个问题的发生。

第三节   Z3索引的代码实现

建立索引的过程其实都是整合在写流程当中的,整体的写流程见源码篇(写流程)。这一节主要选取跟建立索引相关的部分进行介绍。建立索引的方法入口在org.locationtech.geomesa.index.index.BaseFeatureIndex特性当中的writer方法当中。

7.     override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {  

8.         val sharing = sft.getTableSharingBytes  

9.         val shards = shardStrategy(sft)  

10.       val toIndexKey = keySpace.toIndexKeyBytes(sft)  

11.       mutator(sharing, shards, toIndexKey, createInsert)  

12.   }  

第二行当中的sharingtable sharing机制有关。第三行中的shards参数就是选取的分片位置,第四行当中的toIndexKey主要是对于时空索引的数据,第五行当中的mutator方法包含了对分片信息、时空索引、每一行的value等数据的封装。

1.   获取分片

获取分片的机制与Z2索引的过程相同,在此不再赘述。

2.   获取Epoch Week

在前述的writer方法当中,进一步执行到toIndexKey参数。

1.        val toIndexKey = keySpace.toIndexKeyBytes(sft)  

在这里调用到了org.locationtech.geomesa.index.index.z3.Z3IndexKeySpace类的伴生对象当中的toIndexKeyBytes方法。在这个方法当中,封装了很多信息。第2到第4行只是在获取Epoch Week之前的数据预处理过程,其中z3参数封装了空间填充曲线的相关信息,geomIndex参数表示了地理元素的索引元数据,dtgIndex参数表示了时间元素的索引元数据,真正获取Epoch Week的是timeToIndex参数。最后由getZValueBytes方法将上述这些参数进行封装。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.         val z3 = sfc(sft.getZ3Interval)  

3.         val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)  

4.         val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("Z3 index requires a valid date"))  

5.         val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)  

6.       

7.         getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)  

8.       }  

首先关于Z3参数,内部首先调用了org.locationtech.geomesa.utils.geotools.Conversions类当中的getZ3Interval方法,这里会对时间分割的单位进行设置,默认情况下是使用“week”作为时间分割的基本单位,不过为了适应更多的应用场景,用户在具体使用时也可以选择“day”、“month”、“year”作为时间分割的基本单位。和之前的“geomesa.z.splits”相类似,用户也可以在Java API当中设定“geomesa.z3.initerval”来对这个参数进行定义。

1.     def getZ3Interval: TimePeriod = userData[String](Z3_INTERVAL_KEY) match {  

2.           case None    => TimePeriod.Week  

3.           case Some(i) => TimePeriod.withName(i.toLowerCase)  

4.         }  

最后利用空间填充曲线的类对于这个时间间隔数据进行匹配和封装,在此处其实本质上调用的是Z3SFC类,最后返回的是一个Z3SFC对象,里面包含了经度、纬度、时间以及whole Period四个对象,其中经度(NomarlizedLon)、纬度(NormalizedLat)、时间(NormalizedTime)三个参数都各自封装了如下数据。

参数

含义

Precision

精确度(默认是21

Min

最小值

Max

最大值

Bins

二进制的值(默认是2097152,即21位二进制数的最大值)

Normalizer

极值标准化器(由Double转为Int

Denormalizer

反极值标准化器(由Int转为Double

maxIndex

索引的最大值(默认是2097152,即21位二进制数的最大值)

 

接下来的geomIndex参数主要是获取了一些地理空间参考相关的信息。

1.        val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)  

这个参数的数据来源于org.geotools.feature.type.FeatureTypeImpl类的getGeometryDescriptor方法,这个方法就封装了地理学相关的信息。

1.     public GeometryDescriptor getGeometryDescriptor() {  

2.             if (defaultGeometry == null) {  

3.                 for (PropertyDescriptor property : getDescriptors()) {  

4.                     if (property instanceof GeometryDescriptor) {  

5.                         defaultGeometry = (GeometryDescriptor) property;  

6.                         break;  

7.                     }  

8.                 }  

9.             }  

10.           return defaultGeometry;  

11.       }  

例如示例demo当中,这个参数就封装了一个CRS变量,这个变量跟一些地理学方面的知识有关,在此不再详述,这个变量内部的内容如下:

1.     GEOGCS["WGS 84",   

2.       DATUM["World Geodetic System 1984",   

3.         SPHEROID["WGS 84"6378137.0298.257223563, AUTHORITY["EPSG","7030"]],   

4.         AUTHORITY["EPSG","6326"]],   

5.       PRIMEM["Greenwich"0.0, AUTHORITY["EPSG","8901"]],   

6.       UNIT["degree"0.017453292519943295],   

7.       AXIS["Geodetic longitude", EAST],   

8.       AXIS["Geodetic latitude", NORTH],   

9.       AUTHORITY["EPSG","4326"]]  

而最终返回到geomIndex参数的数值则是将上述数据进行索引之后的一个数值。

接下来对于时间信息相关的dtgIndex,则调用了Conversions类当中的getDtgFeild方法。在这里,geomesa会对之前用户设定的默认日期值进行获取,最终获得一个类似于geomIndex的与时间相关的数值,返回给dtgIndex参数。

1.     def getDtgField: Option[String] = userData[String](DEFAULT_DATE_KEY)  

2.     def getDtgIndex: Option[Int] = getDtgField.map(sft.indexOf).filter(_ != -1)  

执行到timeToIndex参数,在此获取了设定的时间的分段间隔(Interval)。

1.        val timeToIndex = BinnedTime.timeToBinnedTime(sft.getZ3Interval)  

由于时间需要转换为Int类型,因此需要设定一个最小的时间单位。根据不同的分段间隔,geomesa选取了不同的时间单位。例如,如果大的分段间隔选取为Week,那么相对应的时间单位就是秒。具体的实现方法就是BinnedTime类当中的timeToBinnedTime方法内。

1.     def timeToBinnedTime(period: TimePeriod): TimeToBinnedTime = {  

2.         period match {  

3.           case TimePeriod.Day   => toDayAndMillis  

4.           case TimePeriod.Week  => toWeekAndSeconds  

5.           case TimePeriod.Month => toMonthAndSeconds  

6.           case TimePeriod.Year  => toYearAndMinutes  

7.         }  

8.       }  

最终封装这些参数的是getZValueBytes方法,这个方法最终会将这些参数转换为byte数组,执行下一步的操作。

1.        getZValueBytes(z3, geomIndex, dtgIndex, timeToIndex, lenient)  

3.   获取时空索引

上述的getZValueBytes方法内容如下,内部的操作比较多,主要包含两个部分:参数准备(第9行到第20行)、封装数据(第22行到29行),最后将各种数据封装成为一个byte数组,返回给系统作为rowkey,进行下一步与数据库的交互。

1.     private def getZValueBytes(z3: Z3SFC,  

2.                                  geomIndex: Int,  

3.                                  dtgIndex: Int,  

4.                                  timeToIndex: TimeToBinnedTime,  

5.                                  lenient: Boolean)  

6.                                 (prefix: Seq[Array[Byte]],  

7.                                  feature: SimpleFeature,  

8.                                  suffix: Array[Byte]): Seq[Array[Byte]] = {  

9.         val geom = feature.getAttribute(geomIndex).asInstanceOf[Point]  

10.       if (geom == null) {  

11.         throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")  

12.       }  

13.       val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]  

14.       val time = if (dtg == null) { 0 } else { dtg.getTime }  

15.       val BinnedTime(b, t) = timeToIndex(time)  

16.    

17.      // 创建Z填充曲线

18.      val z = try { z3.index(geom.getX, geom.getY, t, lenient).z } catch {  

19.        case NonFatal(e) => throw new IllegalArgumentException(s"Invalid z value from geometry/time: $geom,$dtg", e)  

20.      }  

21.     

22.       // 创建一个byte数组,将所有的信息都封装起来  

23.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

24.       var i = 0  

25.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

26.       ByteArrays.writeShort(b, bytes, i)  

27.       ByteArrays.writeLong(z, bytes, i + 2)  

28.       System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)  

29.       Seq(bytes)  

30.     }  

这一节我们重点介绍时空索引的生成机制,在这个方法里,对应的是第17行到19行代码。参数z就是我们最终需要的封装了时空数据的时空索引。而产生这个索引需要穿入三个参数,即经度(Double)、维度(Double)、时间(Long)的数据,调用org.locationtech.geomesa.curve.Z3SFC类当中的index方法,方法内容如下:

1.     override def index(x: Double, y: Double, t: Long, lenient: Boolean = false): Z3 = {  

2.         try {  

3.           require(x >= lon.min && x <= lon.max && y >= lat.min && y <= lat.max && t >= time.min && t <= time.max,  

4.             s"Value(s) out of bounds ([${lon.min},${lon.max}], [${lat.min},${lat.max}], [${time.min},${time.max}]): $x, $y, $t")  

5.           Z3(lon.normalize(x), lat.normalize(y), time.normalize(t))  

6.         } catch {  

7.           case _: IllegalArgumentException if lenient => lenientIndex(x, y, t)  

8.         }  

9.       }  

其中最重要的是第五行,创建了Z3对象,并将经纬度、时间的数据经过极值标准化操作后的数据进行封装,返回给getZValueBytes方法。

此处的极值标准化操作主要是通过org.locationtech.geomesa.curve.NormalizedDimension.BitNormalizedDimension类当中的normalize方法实现的,主要的目的是将经度、维度、时间都进行二分操作转换成为Int类型的数据,以利于下一步的整合操作。

1.     override def normalize(x: Double): Int =  

2.           if (x >= max) { maxIndex } else { math.floor((x - min) * normalizer).toInt }  

Z3对象传入参数之后,接着就会调用其apply方法。在这个方法内部,系统对于这三个数据进行了第二次的封装。

1.     def apply(x: Int, y:  Int, z: Int): Z3 = {  

2.         new Z3(split(x) | split(y) << 1 | split(z) << 2)  

3.       }  

在这个过程当中,每一个数据又经过了split方法的处理,其源码如下,实现的功能是在原本数据的二进制每一位的后面添加两个0,将原来数据的二进制位数扩大为原来的三倍。

例如,15的二进制形式是1111,经过这个方法处理以后,这个数据就会变成100100100100

1.     override def split(value: Long): Long = {  

2.         var x = value & MaxMask  

3.         x = (x | x << 32) & 0x1f00000000ffffL  

4.         x = (x | x << 16) & 0x1f0000ff0000ffL  

5.         x = (x | x << 8)  & 0x100f00f00f00f00fL  

6.         x = (x | x << 4)  & 0x10c30c30c30c30c3L  

7.         (x | x << 2)      & 0x1249249249249249L  

8.       } 

经过处理以后的三个数据在apply方法当中经过移位以及位运算,最终整合成为一个64位的数据,其中就将经度、纬度、时间的数据整合成为一个时空索引了,其每一位的数据可以看做“tyxtyx…”

但是这个时空索引是空间优先的,虽然在apply当中,时间对应的数据左移了两位,维度对应的数据左移了一位,在逻辑上时间对应的应该是第一位的。但是实际上在实际操作当中,时间所形成的数据长度会比经纬度形成的数据长度少一位,在第一位,时间往往填不满。因此这个索引依然是空间优先的,基于时间的检索和查询性能可能会受到一些影响。

4.   获取Fid

这个过程与Z2索引当中相应的过程是相同的,在此不再赘述。

5.   多个数据的封装

接下来继续分析getZValueBytes方法,在这个方法的最后几行当中,geomesa创建了一个byte数组,从第11行可以看出这个byte数组的容量是根据传入的参数来确定的,因此这个数组的容量也是不确定的。

1.     private def getZValueBytes(z3: Z3SFC,  

2.                                  geomIndex: Int,  

3.                                  dtgIndex: Int,  

4.                                  timeToIndex: TimeToBinnedTime,  

5.                                  lenient: Boolean)  

6.                                 (prefix: Seq[Array[Byte]],  

7.                                  feature: SimpleFeature,  

8.                                  suffix: Array[Byte]): Seq[Array[Byte]] = {  

9.         …  

10.       // 创建一个byte数组,将所有的信息都封装起来  

11.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

12.       var i = 0  

13.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

14.       ByteArrays.writeShort(b, bytes, i)  

15.       ByteArrays.writeLong(z, bytes, i + 2)  

16.       System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)  

17.       Seq(bytes)  

18.     }  

决定这个容量的参数主要有两个,其中prefix里面封装了数据分配到的分片标号,因为geomesa对于分片数量的默认设置是4,因此这个数据占用的位数是相对比较确定的,而suffix里面则封装了FeatureId的序列化结果,这个序列化方式利用的是UTF-8编码形式,但是这个参数的容量是会有变化的,因此用户在设计的时候需要注意FeatureId的长度,以免超出数据库对于key值长度的要求。

剩下的那个10个位置主要包含了两方面的内容,其中2个位置是分配给Epoch Week的,主要记录了该条记录与1970年历元时间的间隔里面包含了多少周。这个数据本来是Short类型,在第14行,geomesa将这个数据转换成了两个byte类型的数据。在这里调用了org.locationtech.geomesa.utils.index.ByteArrays类当中的writeShort方法,具体实现如下:

1.     def writeShort(short: Short, bytes: Array[Byte], offset: Int = 0): Unit = {  

2.         bytes(offset) = (short >> 8).asInstanceOf[Byte]  

3.         bytes(offset + 1) = short.asInstanceOf[Byte]  

4.       }  

在这里我们可以看到一个现象,在这个过程当中,geomesa仅仅是将short类型数据进行了八位和八位的切分,直接按照顺序转换成了byte类型的数据。例如,在提供的案例demo中,这里的short值其实是1987,转换为二进制就是0000 0111 1100 0011,如果直接利用geomesa的机制来转换就得到了0000 01111100 0011,转换为两个byte就是7-61。这样的话虽然保留了原有数据的顺序,但是没有很好地起到散列的作用,因为前8位因为位数比较高,而后8位明显会出现更大的随机性,如果将这两个byte数据交换顺序,这样的话,就能够实现数据的散列,更容易避免数据倾斜的问题。

其实类似的做法在HashMap的源码当中也可以看到,在HashMap底层,它会对传入的key值进行哈西操作,这样是为了让这个key值更加离散,防止过多的数据存入到一个entry当中。为了提升这种离散的效果,HashMap会对key值进行二次Hash,在这个过程当中,除了进行hash算法的操作,它还会将key的二进制数据的前半部分和后半部分进行交换,因为后半部分会有更大的随机性。这样的话就能够实现数据的离散化。

剩下8位的数据则是时空索引的位置。根据前文所述,时空索引的数据长度总共有64位,而一个字节总共有8位,因此如果转化为byte数组,就需要占用8个位置。具体实现在BytesArray类当中的writeLong方法中,可以看出,这个方法就是将时空索引通过位运算转化为byte类型的数据,然后再将这些数据存放进之前创建的byte数组当中。

1.     def writeLong(long: Long, bytes: Array[Byte], offset: Int = 0): Unit = {  

2.         bytes(offset    ) = ((long >> 56) & 0xff).asInstanceOf[Byte]  

3.         bytes(offset + 1) = ((long >> 48) & 0xff).asInstanceOf[Byte]  

4.         bytes(offset + 2) = ((long >> 40) & 0xff).asInstanceOf[Byte]  

5.         bytes(offset + 3) = ((long >> 32) & 0xff).asInstanceOf[Byte]  

6.         bytes(offset + 4) = ((long >> 24) & 0xff).asInstanceOf[Byte]  

7.         bytes(offset + 5) = ((long >> 16) & 0xff).asInstanceOf[Byte]  

8.         bytes(offset + 6) = ((long >> 8)  & 0xff).asInstanceOf[Byte]  

9.         bytes(offset + 7) =  (long        & 0xff).asInstanceOf[Byte]  

10.     }  

最终经过getZValueBytes方法以后,相关的数据就都封装进这个byte数组当中了,接下来geomesa就会依次为Key来创建跟数据库交互的请求,例如在于Hbase的交互当中,这个byte数组会作为rowKey封装进一个Put对象当中,最终将相关的数据存储进Hbase当中。而在Hbase当中显示出来的rowKey是一个长字符串,这个过程是通过org.apache.hadoop.hbase.util.Bytes类当中的toStringBinary方法。具体的测试代码如下:

1.     public static void main(String[] args) {  

2.             byte[] source = {27, -6158, -66, -122113, -2, -273, -114, -23, -69, -1116532836550515250};  

3.             String a = Bytes.toStringBinary(source);  

4.             System.out.println(a);  

5.         }  

测试结果如下,这个就是我们真正在Hbase当中看到的rowKey

1.     \x02\x07\xC3:\xBE\x86q\xFE\xE5\x03\x8E\xE9\xBB\x91A SA2342  

2.     Process finished with exit code 0  

第六章   XZ2索引

第一节   XZ2索引概述

第二节   XZ2索引的原理

1     数据预处理

2     获取几何体的空间范围(envelope

3     获取几何体的时空索引

4     将数据进行封装并进行序列化

第三节   XZ2索引的代码实现

XZ2索引的入口同样是org.locationtech.geomesa.index.index.BaseFeatureIndex类的writer方法,这个是与Z2索引相同的,但是在具体的方法调用上,就不会调用Z2的方法了,而是调用XZ2相关的方法。

1.     override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {  

2.         val sharing = sft.getTableSharingBytes  

3.         val shards = shardStrategy(sft)  

4.         val toIndexKey = keySpace.toIndexKeyBytes(sft)  

5.         mutator(sharing, shards, toIndexKey, createInsert)  

6.       }  

1     数据预处理

由于不存在时间维度,所以对于XZ2索引来说,数据预处理的过程相对来说比较简单。

数据预处理主要包含了sharing(主要针对DataStoreAccumulo的交互操作)、shard数值的生成,其中shard是分片的数值。这是一部分的需要预处理的数据。

1.         val sharing = sft.getTableSharingBytes  

2.         val shards = shardStrategy(sft)  

可以看出在toIndexKeyBytes方法当中,没有对于时间维度的相关操作,其他部分与Z2索引的构建过程是类似的。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.         val sfc: XZ2SFC = XZ2SFC(sft.getXZPrecision)  

3.         val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)  

4.         getXZValueBytes(sfc, geomIndex, lenient)  

5.       }  

2     获取几何体的空间范围(envelope

实际生成XZ2索引的方法是org.locationtech.geomesa.index.index.z2.XZ2IndexKeySpace类中的getXZValueBytes方法。其结构依然类似于Z2IndexKeySpace类中的同名方法,主要可以分为两个部分,一个是准备数据(第8行到第15行)、创建字节数组封装数据(第17行到24行)。

但是与Z3索引的构建过程有两个部分是不同的,一个是在第13行,获取一个envelope,也就是一个boundingbox,另一个是在第19行,在这里调用的不是Z2index方法,而是XZ2index方法。

1.     private def getXZValueBytes(sfc: XZ2SFC,  

2.                                   geomIndex: Int,  

3.                                   lenient: Boolean)  

4.                                  (prefix: Seq[Array[Byte]],  

5.                                   feature: SimpleFeature,  

6.                                   suffix: Array[Byte]): Seq[Array[Byte]] = {  

7.         val geom = feature.getAttribute(geomIndex).asInstanceOf[Geometry]  

8.         if (geom == null) {  

9.           throw new IllegalArgumentException(s”Null geometry in feature ${feature.getID}”)  

10.       }  

11.       val envelope = geom.getEnvelopeInternal  

12.     

13.       val xz = try { sfc.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY, lenient) } catch {  

14.         case NonFatal€ => throw new IllegalArgumentException(s”Invalid xz value from geometry: $geom”, e)  

15.       }  

16.     

17.       // 创建字节数组,封装所有数据  

18.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 8 + suffix.length)  

19.       var € = 0  

20.       prefix.foreach { p => System.arraycopy(p, 0, bytes, €, p.length); € += p.length }  

21.       ByteArrays.writeLong(xz, bytes, i)  

22.       System.arraycopy(suffix, 0, bytes, € + 8, suffix.length)  

23.       Seq(bytes)  

24.     }  

在创建envelope的过程与创建XZ3索引的过程是相同的,程序调用了org.locationtech.jts.geom.Geometry类的getEnvelopeInternal方法。在这里会先判断一下是否存在已经创建过envelope对象,如果没有该对象,程序会进行相关的计算。

1.     public Envelope getEnvelopeInternal() {  

2.         if (envelope == null) {  

3.           envelope = computeEnvelopeInternal();  

4.         }  

5.         return new Envelope(envelope);  

6.       }  

最终得到的Envelope对象里面主要封装了四个数据:经度的最小值(minX)、纬度的最小值(minY)、经度的最大值(maxX)、维度的最大值(maxY)。

3     获取几何体的时空索引

这个过程是GeoMesa建立XZ2索引最重要的过程

1.      val xz = try { sfc.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY, lenient) } catch {  

2.           case NonFatal€ => throw new IllegalArgumentException(s”Invalid xz value from geometry: $geom”, e)  

3.         }  

此处的index方法是XZ2index方法,进入这个方法,可以看到其内部实现的过程。这个方法与XZ3索引的构建方法不同,这个方法传入了5个参数,其中包含了envelope对象当中的经度、纬度的最大值和最小值,还包含了两个时间属性以及一个布尔类型的数据。

1.     def index(xmin: Double, ymin: Double, xmax: Double, ymax: Double, lenient: Boolean = false): Long = {  

2.         // 数据极值标准化

3.         val (nxmin, nymin, nxmax, nymax) = normalize(xmin, ymin, xmax, ymax, lenient)  

4.       

5.         // 计算序列编码的长度  

6.         val maxDim = math.max(nxmax – nxmin, nymax – nymin)  

7.       

8.         // 计算编码长度的最小值  

9.         val l1 = math.floor(math.log(maxDim) / XZSFC.LogPointFive).toInt  

10.     

11.       // 判断编码长度要取最大值还是最小值  

12.       val length = if (l1 >= g) { g } else {  

13.         val w2 = math.pow(0.5, l1 + 1)   

14.    

15.         // 计算空间实体切割了多少条轴线

16.         def predicate(min: Double, max: Double): Boolean = max <= (math.floor(min / w2) * w2) + (2 * w2)  

17.     

18.         if (predicate(nxmin, nxmax) && predicate(nymin, nymax)) l1 + 1 else l1  

19.       }  

20.     

21.       sequenceCode(nxmin, nymin, length)  

22.     } 

XZ3相类似,index方法内部大体也可以分为三个过程:空间数据的标准化处理、准备参数、生成序列化编码。接下来对这三个过程进行详细的介绍:

3.1    空间数据的标准化处理

由于XZ2索引针对的是空间数据,因此这个数据的标准化处理过程当中只涉及到经度、纬度的最大最小值,外带一个布尔类型的参数。最终返回的是封装了经过处理的四个Double类型的元组。

1.     private def normalize(xmin: Double,  

2.                             ymin: Double,  

3.                             xmax: Double,  

4.                             ymax: Double,  

5.                             lenient: Boolean): (Double, Double, Double, Double) = {  

6.         require(xmin <= xmax && ymin <= ymax, s”Bounds must be ordered: [$xmin $xmax] [$ymin $ymax]”)  

7.       

8.         try {  

9.           require(xmin >= xLo && xmax <= xHi && ymin >= yLo && ymax <= yHi,  

10.           s”Values out of bounds ([$xLo $xHi] [$yLo $yHi]): [$xmin $xmax] [$ymin $ymax]”)  

11.     

12.        val nxmin = (xmin – xLo) / xSize  

13.        val nymin = (ymin – yLo) / ySize  

14.        val nxmax = (xmax – xLo) / xSize  

15.        val nymax = (ymax – yLo) / ySize  

16.     

17.        (nxmin, nymin, nxmax, nymax)  

18.       } catch {  

19.         case _: IllegalArgumentException if lenient =>  

20.     

21.           val bxmin = if (xmin < xLo) { xLo } else if (xmin > xHi) { xHi } else { xmin }  

22.           val bymin = if (ymin < yLo) { yLo } else if (ymin > yHi) { yHi } else { ymin }  

23.           val bxmax = if (xmax < xLo) { xLo } else if (xmax > xHi) { xHi } else { xmax }  

24.           val bymax = if (ymax < yLo) { yLo } else if (ymax > yHi) { yHi } else { ymax }  

25.     

26.           val nxmin = (bxmin – xLo) / xSize  

27.           val nymin = (bymin – yLo) / ySize  

28.           val nxmax = (bxmax – xLo) / xSize  

29.           val nymax = (bymax – yLo) / ySize  

30.     

31.           (nxmin, nymin, nxmax, nymax)  

32.       }  

33.     }  

可以看出,这个方法的核心在第15行到第20行,就是将经度、维度的最大值和最小值,共4个数据,经过比值运算,转换成为01之间的相对于各自全局最大值和最小值的比值。最终这些比值数据组成的元组将回传给index方法。

3.2    准备参数

XZ3素银的构建过程类似,在进行数据封装之前,同样需要准备一些参数。这个过程具体代码如下:

1.         // 计算序列编码的长度  

2.         val maxDim = math.max(nxmax – nxmin, nymax – nymin)  

3.       

4.         // 计算编码长度的最小值  

5.         val l1 = math.floor(math.log(maxDim) / XZSFC.LogPointFive).toInt  

6.       

7.         // 判断编码长度要取最大值还是最小值  

8.         val length = if (l1 >= g) { g } else {  

9.           val w2 = math.pow(0.5, l1 + 1)   

10.    

11.         // 计算空间实体切割了多少条轴线

12.         def predicate(min: Double, max: Double): Boolean = max <= (math.floor(min / w2) * w2) + (2 * w2)  

13.     

14.         if (predicate(nxmin, nxmax) && predicate(nymin, nymax)) l1 + 1 else l1  

15.       }  

首先要计算编码序列的长度,这个过程分为两个过程:

首先是计算编码长度的最小值,在这一步需要计算数据差异最大的那个维度的差值,再根据这个差值来计算出编码长度的最小值。这个过程在论文当中的引理1中可以验证。

然后就是对于编码的具体长度进行设定。在这个过程当中,同样需要与默认精度(默认编码长度)进行对比,如果计算出的最短编码长度小于默认经度,就需要进行进一步的计算,最终判断编码长度是使用l1+1还是使用l1。如果计算出的最短编码长度大于默认精度,就选取默认精度作为此次编码的长度。这个过程在论文同样有相对应的阐述。

3.3    生成序列化编码

最后一步就是生成序列化的编码,其中需要传入的参数有三个,分别是三个维度上的最小值(经过极值标准化以后的数据)以及上面讨论过的编码的长度。

1.        sequenceCode(nxmin, nymin, length)  

方法的具体内容如下:

1.     private def normalize(xmin: Double,  

2.                             ymin: Double,  

3.                             xmax: Double,  

4.                             ymax: Double,  

5.                             lenient: Boolean): (Double, Double, Double, Double) = {  

6.         require(xmin <= xmax && ymin <= ymax, s"Bounds must be ordered: [$xmin $xmax] [$ymin $ymax]")  

7.       

8.         try {  

9.           require(xmin >= xLo && xmax <= xHi && ymin >= yLo && ymax <= yHi,  

10.           s"Values out of bounds ([$xLo $xHi] [$yLo $yHi]): [$xmin $xmax] [$ymin $ymax]")  

11.     

12.         val nxmin = (xmin - xLo) / xSize  

13.         val nymin = (ymin - yLo) / ySize  

14.         val nxmax = (xmax - xLo) / xSize  

15.         val nymax = (ymax - yLo) / ySize  

16.     

17.         (nxmin, nymin, nxmax, nymax)  

18.       } catch {  

19.         case _: IllegalArgumentException if lenient =>  

20.     

21.           val bxmin = if (xmin < xLo) { xLo } else if (xmin > xHi) { xHi } else { xmin }  

22.           val bymin = if (ymin < yLo) { yLo } else if (ymin > yHi) { yHi } else { ymin }  

23.           val bxmax = if (xmax < xLo) { xLo } else if (xmax > xHi) { xHi } else { xmax }  

24.           val bymax = if (ymax < yLo) { yLo } else if (ymax > yHi) { yHi } else { ymax }  

25.     

26.           val nxmin = (bxmin - xLo) / xSize  

27.           val nymin = (bymin - yLo) / ySize  

28.           val nxmax = (bxmax - xLo) / xSize  

29.           val nymax = (bymax - yLo) / ySize  

30.     

31.           (nxmin, nymin, nxmax, nymax)  

32.       }  

33.     }  

在这个方法中,我们可以清楚地看到论文当中的思想。这个方法主要分为三个部分:

首先是对于数据的初始化,主要是初始的经度、纬度的最大值和最小值以及序列化编码的初始化。

接下来就是对于数据的迭代处理。每一轮都会对于经度、纬度、时间进行二分,计算此时的经度中值、纬度中值,然后再根据这些标准来对传入的xy值进行判断,判断的标准主要是依据这三个数值定位的点所在的卦限。针对不同的象限,分别对数据进行不同的处理。内部计算的过程采用了论文当中的引理4当中对于C(s)的计算公式。

最终,经过加工的序列编码作为返回值回传给index方法,进一步回传给更上一层的方法中。

4     将数据进行封装并进行序列化

这个封装过程与Z2的过程相类似,在此不再赘述。

1.         // 创建字节数组,封装所有数据  

2.         val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 8 + suffix.length)  

3.         var € = 0  

4.         prefix.foreach { p => System.arraycopy(p, 0, bytes, €, p.length); € += p.length }  

5.         ByteArrays.writeLong(xz, bytes, i)  

6.         System.arraycopy(suffix, 0, bytes, € + 8, suffix.length)  

7.         Seq(bytes)  

8.       }  

第七章   XZ3索引

第一节   XZ3索引概述

在前述的Z3索引当中,时空索引仅仅是能够实现单个点及其时间点的索引构造。然而在真实的场景当中,往往需要进行创建索引的数据可能是一条线,一个多边形甚至是更加复杂的几何图形。

为了给这些复杂几何体创建唯一的时空索引,geomesa采用了XZ3的形式来进行索引的构建,基本思路是寻找能够包含这个几何元素的最小立方体。在实际的开发过程当中,这种形式可能会浪费很大的空间,可以采用凸包来进行索引。不过建立索引的目的本身是给几何元素一个唯一确定的标识索引,在实际情况当中,在正常的数据精度下,BoundingBox本身就已经很难重复,而凸包还需要经历一个计算的过程,因此具体这里使用BoundingBos和凸包哪个更优,这个问题还需要具体情况具体分析。

第二节   XZ3索引的原理

1.   数据预处理

在大体的构建索引的流程上,这个过程与Z3索引的构建在一些步骤上是很类似的。尤其是在数据的预处理上,基本的流程是相通的,包括获取分片、获取Epoch Week、获取Fid这几个步骤,两个索引在建构上是相同的,可以参看第三章的第二节,在此不再赘述。

2.   获取几何体的空间范围

3.   获取几何体的时空索引

4.   将数据进行封装并进行序列化

第三节   XZ3索引的代码实现

XZ3索引的入口同样是org.locationtech.geomesa.index.index.BaseFeatureIndex类的writer方法,这个是与Z3索引相同的,但是在具体的方法调用上,就不会调用Z3的方法了,而是调用XZ3相关的方法。

7.     override def writer(sft: SimpleFeatureType, ds: DS): F => Seq[W] = {  

8.         val sharing = sft.getTableSharingBytes  

9.         val shards = shardStrategy(sft)  

10.       val toIndexKey = keySpace.toIndexKeyBytes(sft)  

11.       mutator(sharing, shards, toIndexKey, createInsert)  

12.     }  

1     数据预处理

数据预处理主要包含了sharing(主要针对DataStoreAccumulo的交互操作)、shard数值的生成,其中shard是分片的数值。这是一部分的需要预处理的数据。

3.         val sharing = sft.getTableSharingBytes  

4.         val shards = shardStrategy(sft)  

另外一部分需要预处理的数据是Epoch Week,这方面的过程存在于org.locationtech.geomesa.index.index.z3.XZ3IndexKeySpace类中的toIndexKeyBytes方法。代码如下:

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.         val sfc = XZ3SFC(sft.getXZPrecision, sft.getZ3Interval)  

3.         val geomIndex = sft.indexOf(sft.getGeometryDescriptor.getLocalName)  

4.         val dtgIndex = sft.getDtgIndex.getOrElse(throw new IllegalStateException("XZ3 index requires a valid date"))  

5.         val timeToIndex: TimeToBinnedTime = BinnedTime.timeToBinnedTime(sft.getZ3Interval)  

6.         getZValueBytes(sfc, geomIndex, dtgIndex, timeToIndex, lenient)  

7.       }  

可以看出,此处调用的依然是Z3索引的时间间隔,因此这个生成Epoch Week的过程与构建Z3索引的相关过程是完全相同的。

经过这些过程以后,接下来就是针对XZ3索引的独有部分了。

2      获取几何体的空间范围(envelope

对于生成XZ3索引的仍然是org.locationtech.geomesa.index.index.z3.XZ3IndexKeySpace类中的getZValueBytes方法。其结构依然类似于Z3IndexKeySpace类中的同名方法,主要可以分为两个部分,一个是准备数据(第10行到第22行)、创建字节数组封装数据(第24行到31行)。

但是与Z3索引的构建过程有两个部分是不同的,一个是在第13行,获取一个envelope,也就是一个boundingbox,另一个是在第19行,在这里调用的不是Z3index方法,而是XZ3index方法。

1.     private def getZValueBytes(sfc: XZ3SFC,  

2.                                  geomIndex: Int,  

3.                                  dtgIndex: Int,  

4.                                  timeToIndex: TimeToBinnedTime,  

5.                                  lenient: Boolean)  

6.                                 (prefix: Seq[Array[Byte]],  

7.                                  feature: SimpleFeature,  

8.                                  suffix: Array[Byte]): Seq[Array[Byte]] = {  

9.         val geom = feature.getDefaultGeometry.asInstanceOf[Geometry]  

10.       if (geom == null) {  

11.         throw new IllegalArgumentException(s"Null geometry in feature ${feature.getID}")  

12.       }  

13.      val envelope = geom.getEnvelopeInternal  

14.         

15.       val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]  

16.       val time = if (dtg == null) { 0L } else { dtg.getTime }  

17.       val BinnedTime(b, t) = timeToIndex(time)  

18.       val xz = try {  

19.        sfc.index(envelope.getMinX, envelope.getMinY, t, envelope.getMaxX, envelope.getMaxY, t, lenient)  

20.       } catch {  

21.         case NonFatal(e) => throw new IllegalArgumentException(s"Invalid xz value from geometry/time: $geom,$dtg", e)  

22.       }  

23.     

24.       // 创建字节数组  

25.       val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

26.       var i = 0  

27.       prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

28.       ByteArrays.writeShort(b, bytes, i)  

29.       ByteArrays.writeLong(xz, bytes, i + 2)  

30.       System.arraycopy(suffix, 0, bytes, i + 10, suffix.length)  

31.       Seq(bytes)  

32.     }  

在创建envelope时,程序调用了org.locationtech.jts.geom.Geometry类的getEnvelopeInternal方法。在这里会先判断一下是否存在已经创建过envelope对象,如果没有该对象,程序会进行相关的计算。

7.     public Envelope getEnvelopeInternal() {  

8.         if (envelope == null) {  

9.           envelope = computeEnvelopeInternal();  

10.       }  

11.       return new Envelope(envelope);  

12.     }  

最终得到的Envelope对象里面主要封装了四个数据:经度的最小值(minX)、纬度的最小值(minY)、经度的最大值(maxX)、维度的最大值(maxY)。

3      获取几何体的时空索引

这个过程是geomesa构建XZ3索引最重要的过程。这个过程在getZIndexBytes对应的代码如下:

1.          sfc.index(envelope.getMinX, envelope.getMinY, t, envelope.getMaxX, envelope.getMaxY, t, lenient)  

此处的index方法是XZ3index方法,进入这个方法,可以看到其内部实现的过程。这个方法传入了7个参数,其中包含了envelope对象当中的经度、纬度的最大值和最小值,还包含了两个时间属性以及一个布尔类型的数据。

从这个方法可以看出,其实geomesa是能够支持将经度、纬度、时间的最大值和最小值,这个过程正好对应了index方法参数列表当中的6个变量,但是在原生的geomesa框架当中,传入的时间属性都是相同的,也就是说这种功能虽然它已经实现了,但是程序并没有放开。个人认为,此处是框架本身局限性的问题,不过对于用户来说,在进行二次开发的过程中,这个是一个可以优化的点。

1.     def index(xmin: Double, ymin: Double, zmin: Double, xmax: Double, ymax: Double, zmax: Double, lenient: Boolean = false): Long = {  

2.         val (nxmin, nymin, nzmin, nxmax, nymax, nzmax) = normalize(xmin, ymin, zmin, xmax, ymax, zmax, lenient)  

3.         val maxDim = math.max(math.max(nxmax - nxmin, nymax - nymin), nzmax - nzmin)  

4.         val l1 = math.floor(math.log(maxDim) / XZSFC.LogPointFive).toInt  

5.         val length = if (l1 >= g) { g } else {  

6.           val w2 = math.pow(0.5, l1 + 1)   

7.           def predicate(min: Double, max: Double): Boolean = max <= (math.floor(min / w2) * w2) + (2 * w2)  

8.           if (predicate(nxmin, nxmax) && predicate(nymin, nymax) && predicate(nzmin, nzmax)) l1 + 1 else l1  

9.         }  

10.     

11.       sequenceCode(nxmin, nymin, nzmin, length)  

12.     }  

如果对于XZ-ordering论文进行仔细分析,这个方法的很多元素会很熟悉。其内部大概可以分为三步:首先是对于时空数据的极值标准化操作,然后对几个参数进行了准备(maxDiml1length),最后就是将上述数据进行序列化的操作,生成最后的索引。

这个方法的内部则涉及到了很多XZ-Ordering论文的内容,在此一点一点来介绍。

3.1    时空数据的标准化处理

与其他索引构建的过程类似,经度、维度、时间的数据需要先经过极值标准化的过程。但是这个过程和其他索引不同,六个数据共同封装,最终返回一个元组。而且这个函数传入的是Double类型的数据,传出的元组数据也都是Double类型。

1.        val (nxmin, nymin, nzmin, nxmax, nymax, nzmax) = normalize(xmin, ymin, zmin, xmax, ymax, zmax, lenient)  

具体的normalize方法内容如下:

1.     private def normalize(xmin: Double,  

2.                             ymin: Double,  

3.                             zmin: Double,  

4.                             xmax: Double,  

5.                             ymax: Double,  

6.                             zmax: Double,  

7.                             lenient: Boolean): (Double, Double, Double, Double, Double, Double) = {  

8.         require(xmin <= xmax && ymin <= ymax && zmin <= zmax,  

9.           s"Bounds must be ordered: [$xmin $xmax] [$ymin $ymax] [$zmin $zmax]")  

10.     

11.       try {  

12.         require(xmin >= xLo && xmax <= xHi && ymin >= yLo && ymax <= yHi && zmin >= zLo && zmax <= zHi,  

13.           s"Values out of bounds ([$xLo $xHi] [$yLo $yHi] [$zLo $zHi]): [$xmin $xmax] [$ymin $ymax] [$zmin $zmax]")  

14.     

15.        val nxmin = (xmin - xLo) / xSize  

16.        val nymin = (ymin - yLo) / ySize  

17.        val nzmin = (zmin - zLo) / zSize  

18.        val nxmax = (xmax - xLo) / xSize  

19.        val nymax = (ymax - yLo) / ySize  

20.        val nzmax = (zmax - zLo) / zSize  

21.     

22.         (nxmin, nymin, nzmin, nxmax, nymax, nzmax)  

23.       } catch {  

24.         case _: IllegalArgumentException if lenient =>  

25.     

26.           val bxmin = if (xmin < xLo) { xLo } else if (xmin > xHi) { xHi } else { xmin }  

27.           val bymin = if (ymin < yLo) { yLo } else if (ymin > yHi) { yHi } else { ymin }  

28.           val bzmin = if (zmin < zLo) { zLo } else if (zmin > zHi) { zHi } else { zmin }  

29.           val bxmax = if (xmax < xLo) { xLo } else if (xmax > xHi) { xHi } else { xmax }  

30.           val bymax = if (ymax < yLo) { yLo } else if (ymax > yHi) { yHi } else { ymax }  

31.           val bzmax = if (zmax < zLo) { zLo } else if (zmax > zHi) { zHi } else { zmax }  

32.     

33.           val nxmin = (bxmin - xLo) / xSize  

34.           val nymin = (bymin - yLo) / ySize  

35.           val nzmin = (bzmin - zLo) / zSize  

36.           val nxmax = (bxmax - xLo) / xSize  

37.           val nymax = (bymax - yLo) / ySize  

38.           val nzmax = (bzmax - zLo) / zSize  

39.     

40.           (nxmin, nymin, nzmin, nxmax, nymax, nzmax)  

41.       }  

42.     }  

可以看出,这个方法的核心在第15行到第20行,就是将经度、维度、时间的最大值和最小值,共6个数据,经过比值运算,转换成为01之间的相对于各自全局最大值和最小值的比值。最终这些比值数据组成的元组将回传给index方法。

3.2    准备参数

接下来就是对于一些论文当中出现的参数进行准备。

1.         val maxDim = math.max(math.max(nxmax - nxmin, nymax - nymin), nzmax - nzmin)  

2.         val l1 = math.floor(math.log(maxDim) / XZSFC.LogPointFive).toInt  

3.         val length = if (l1 >= g) { g } else {  

4.           val w2 = math.pow(0.5, l1 + 1)   

5.           def predicate(min: Double, max: Double): Boolean = max <= (math.floor(min / w2) * w2) + (2 * w2)  

6.           if (predicate(nxmin, nxmax) && predicate(nymin, nymax) && predicate(nzmin, nzmax)) l1 + 1 else l1  

7.         }  

首先是maxDim参数,这里对前面极值标准化的各维度数据进行做差,算出其中最大值和最小值差别最大的一个数据,作为整个XZ索引对应的空间立方体的边长。

之后就是l1参数,这个参数出现在论文当中的3.1节,联系前文可以知道这里计算得出的是当前要生成的四分序列的最小值。

接下来就是最终序列长度的确定。这里涉及到了一个参数g,从论文中,我们可以得知,这个是整个空间当中,表达最细粒度空间需要的序列长度。从其他部分可以得知,此处的g默认为12。在这个方法当中,系统会对l1g的值进行判断,如果l1小于g,也就是说需要的精度是比默认精度低的,那就会进行下面的操作;如果l1大于等于g,也就是说需要的经度比默认精度高,或者默认的经度就可以满足需求,那么就会选择默认的g来作为序列编码的长度。

如果l1小于g,这时,就用到了论文当中的引理1的证明过程,当序列化编码取最大值l2时,其对应的网格间距就会比实体宽度的一半还要小。转化为公式就是:

这里的w2计算的就是一个元素此时的宽度。

接下来的predict方法就是确认此时有多少轴线被实体切割了。如果在三个维度上,在最小值和最大值两种情况下,包含的网格都是相同的,最终采用的长度就是l1+1,否则就是l1

3.3    生成序列化编码

最后一步就是生成序列化的编码,其中需要传入的参数有四个,分别是三个维度上的最小值(经过极值标准化以后的数据)以及上面讨论过的编码的长度。

1.        sequenceCode(nxmin, nymin, nzmin, length)  

这个方法的具体内容如下:

1.     private def sequenceCode(x: Double, y: Double, z: Double, length: Int): Long = {  

2.       var xmin = 0.0  

3.       ary max = 0.0  

4.       var zmin = 0.0  

5.       var xmax = 1.0  

6.       ary max = 1.0  

7.       var zmax = 1.0  

8.       

9.       var cs = 0L  

10.     

11.     var € = 0  

12.     while (€ < length) {  

13.       val xCenter = (xmin + xmax) / 2.0  

14.       val yCenter = (ymin + ymax) / 2.0  

15.       val zCenter = (zmin + zmax) / 2.0  

16.       (x < xCenter, y < yCenter, z < zCenter) match {  

17.         case (true,  true, true)   => cs += 1L                                             ; xmax = xCenter; ymax = yCenter; zmax = zCenter  

18.         case (false, true, true)   => cs += 1L + 1L * (math.pow(8, g – i).toLong – 1L) / 7L; xmin = xCenter; ymax = yCenter; zmax = zCenter  

19.         case (true,  false, true)  => cs += 1L + 2L * (math.pow(8, g – i).toLong – 1L) / 7L; xmax = xCenter; ymin = yCenter; zmax = zCenter  

20.         case (false, false, true)  => cs += 1L + 3L * (math.pow(8, g – i).toLong – 1L) / 7L; xmin = xCenter; ymin = yCenter; zmax = zCenter  

21.         case (true,  true, false)  => cs += 1L + 4L * (math.pow(8, g – i).toLong – 1L) / 7L; xmax = xCenter; ymax = yCenter; zmin = zCenter  

22.         case (false, true, false)  => cs += 1L + 5L * (math.pow(8, g – i).toLong – 1L) / 7L; xmin = xCenter; ymax = yCenter; zmin = zCenter  

23.         case (true,  false, false) => cs += 1L + 6L * (math.pow(8, g – i).toLong – 1L) / 7L; xmax = xCenter; ymin = yCenter; zmin = zCenter  

24.         case (false, false, false) => cs += 1L + 7L * (math.pow(8, g – i).toLong – 1L) / 7L; xmin = xCenter; ymin = yCenter; zmin = zCenter  

25.       }  

26.       € += 1  

27.     }  

28.     

29.     cs  

30.   }  

在这个方法中,我们可以清楚地看到论文当中的思想。这个方法主要分为三个部分:

首先是对于数据的初始化,主要是初始的经度、纬度、时间的最大值和最小值以及序列化编码的初始化。

接下来就是对于数据的迭代处理。每一轮都会对于经度、纬度、时间进行二分,计算此时的经度中值、纬度中值、时间中值,然后再根据这些标准来对传入的xyz值进行判断,判断的标准主要是依据这三个数值定位的点所在的卦限。针对不同的卦限,分别对数据进行不同的处理。内部计算的过程采用了论文当中的引理4当中对于C(s)的计算公式。不过值得注意的是,原有的公式里,因为只是二维的数据,所以采用了4作为指数运算的底数,在此处,因为涉及到的是三维运算,因此采用了8作为指数运算的底数。

最终,经过加工的序列编码作为返回值回传给index方法,进一步回传给更上一层的方法中。

4      将数据进行封装并进行序列化

最后对于数据进行封装的过程比较简单,与Z3索引相类似,此处不再赘述了。

1.         // 创建字节数组  

2.         val bytes = Array.ofDim[Byte](prefix.map(_.length).sum + 10 + suffix.length)  

3.         var € = 0  

4.         prefix.foreach { p => System.arraycopy(p, 0, bytes, €, p.length); € += p.length }  

5.         ByteArrays.writeShort(b, bytes, i)  

6.         ByteArrays.writeLong(xz, bytes, € + 2)  

7.         System.arraycopy(suffix, 0, bytes, € + 10, suffix.length)  

8.         Seq(bytes)  

9.       }  

第八章   其他索引

geomesa当中,除了前文介绍的索引意外,还可以建立一些其他类型的索引,例如ID索引,属性索引,日期索引等。由于这些索引建立机制相对来说比较简单,在此不再独立成章,在本章集中将这几种索引进行介绍。

第一节   ID索引

ID建立索引的方法主要是在org.locationtech.geomesa.index.index.id.IdIndexKeySpace类中的toIndexKeyBytes方法。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = getIdAsBytes  

在这个方法当中,直接调用了同类的getIdAsBytes方法。这个过程是比较简单的,直接将FeatureId按照UTF-8编码集来进行序列化,在前面加上前缀,包括sharingaccumulo才有用)、shardID Indexshard策略为NoShardStrategy),最终封装这些数据形成字节数组并返回。

1.     private def getIdAsBytes(prefix: Seq[Array[Byte]], feature: SimpleFeature, suffix: Array[Byte]): Seq[Array[Byte]] = {  

2.         val length = prefix.map(_.length).sum + suffix.length  

3.         val result = Array.ofDim[Byte](length)  

4.         var i = 0  

5.         prefix.foreach { p =>  

6.           System.arraycopy(p, 0, result, i, p.length)  

7.           i += p.length  

8.         }  

9.         System.arraycopy(suffix, 0, result, i, suffix.length)  

10.       Seq(result)  

11.     }  

第二节   属性索引

对于属性的索引相对来说比较复杂,其实现在org.locationtech.geomesa.index.index.attribute.AttributeIndexKeySpace类中的toIndexKeyBytes方法。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.         val indexedAttributes = SimpleFeatureTypes.getSecondaryIndexedAttributes(sft).map { d =>  

3.           val i = sft.indexOf(d.getName)  

4.           (i, AttributeIndexKey.indexToBytes(i), d.isList)  

5.         }  

6.         (prefix, feature, suffix) => {  

7.           val baseLength = prefix.map(_.length).sum + suffix.length + 3 // 2 for attributed i, 1 for null byte  

8.           indexedAttributes.flatMap { case (idx, idxBytes, list) =>  

9.             AttributeIndexKey.encodeForIndex(feature.getAttribute(idx), list).map { encoded =>  

10.             val value = encoded.getBytes(StandardCharsets.UTF_8)  

11.    

12.             // 创建byte数组,封装所有的二进制索引

13.             val bytes = Array.ofDim[Byte](baseLength + value.length)  

14.             var i = 0  

15.             prefix.foreach { p => System.arraycopy(p, 0, bytes, i, p.length); i += p.length }  

16.             bytes(i) = idxBytes(0)  

17.             i += 1  

18.             bytes(i) = idxBytes(1)  

19.             i += 1  

20.             System.arraycopy(value, 0, bytes, i, value.length)  

21.             i += value.length  

22.             bytes(i) = ByteArrays.ZeroByte  

23.             System.arraycopy(suffix, 0, bytes, i + 1, suffix.length)  

24.             bytes  

25.           }  

26.         }  

27.       }  

28.     }  

这个方法大体可以分为两个部分:

首先生成一个与属性相关的元组,其中比较重要的是前两个元素,这里对于属性名进行了索引,将属性名转化成为对应的int,然后再转化为字节数组。

第二步就是将数据封装到一个字节数组当中。由于属性索引中包含属性的内容,所以属性索引的长度是不确定的。所以在设定初始长度时只设定了前缀、后缀的位置,还预留了3个位置。然后就是对于属性值转换为字节数组,即value参数。最后将上述的这些数据进行组合,具体的顺序如下:

1.     val bytes = {  

2.         // shard  

3.         2,  

4.         // idxBytes  

5.         0,1,  

6.         // 属性的字节数组  

7.         -24,-65,103,-26,-104,-81,-28,-72,-128,-24,-66,-122,-27,-91,-101,-25,-119,-116,-24,-67,-90,  

8.         // 分割位  

9.         0  

10.       // FeatureId的字节数组  

11.       -23,-69,-111,65,32,83,50,51,52,50  

12.   }  

 

第三节   日期索引

对于日期的索引的实现在org.locationtech.geomesa.index.index.legacy.AttributeDateIndex.DateIndexKeySpace类中的toIndexKeyBytes方法当中。

1.     override def toIndexKeyBytes(sft: SimpleFeatureType, lenient: Boolean): ToIndexKeyBytes = {  

2.           val dtgIndex = sft.getDtgIndex.getOrElse(-1)  

3.           (prefix, feature, suffix) => {  

4.             val dtg = feature.getAttribute(dtgIndex).asInstanceOf[Date]  

5.             val p = ByteArrays.concat(prefix: _*)  

6.             Seq(ByteArrays.concat(p, timeToBytes(if (dtg == null) { 0L } else { dtg.getTime }), suffix))  

7.           }  

8.         }  

这个方法内部最关键的在第6行,系统调用了timeToBytes方法。TimeToBytes方法也在这个类当中,具体内容如下:

1.     private def timeToBytes(t: Long): Array[Byte] =  

2.       typeRegistry.encode(t).substring(0, 12).getBytes(StandardCharsets.UTF_8)  

可以看出,时间经过编码以后,系统只截取了前12位的数值,然后将截取的数值根据UTF-8编码集进行编码,转换成为字节数组并将这个字节数组返回给toIndexKeyBytes方法中。



 [l1]https://new.qq.com/omn/20190203/20190203B0PGKP.html

 [l2]此处配插图

共0条评论