开发者社区 > 博文 > 【JUSTQL Reference】实时数据接入示例
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

【JUSTQL Reference】实时数据接入示例

  • 京东城市JUST团队
  • 2021-01-26
  • IP归属:未知
  • 37080浏览

1.1 实时数据接入示例

JUST引擎提供利用任务管理模块接入实时数据的方式,目前支持kafka数据源, 存储支持JUSTKafkaJDBC等,完整步骤顺序为:建source表、建sink表、将source表中数据插入sink表,共涉及DDLDMLDQL三个部分。下面对这三个部分做顺序介绍。

1.1.1 DDL

1.1.1.1 语法

1.   CREATE TABLE <tableName> (    

2.       fieldName1 fieldType1,    

3.       fieldName2 fieldType2,    

4.       ...    

5.   WITH (    

6.       'key1' = 'value1',    

7.       'key2' = 'value2',    

8.       ...    

9.   )  

1.1.1.1.1 支持的数据类型

SQL

Java Type

说明

备注

STRING

java.lang.String

字符串

 

INT

java.lang.Integer

整型

 

BIGINT

java.lang.Long

长整型

 

FLOAT

java.lang.Float

单精度浮点型

 

DOUBLE

java.lang.Double

双精度浮点型

 

BOOLEAN

java.lang.Boolean

布尔型

 

DATE

java.sql.Date

日期类型

 

TIME

java.sql.Time

时间类型

 

TIMESTAMP

java.sql.Timestamp

时间戳

 

DECIMAL

java.math.BigDecimal

固定有效位数和小数位数的数据类型

 

ROW<fieldname fieldtype, ...>

POJO

对象类型

e.g. ROW<myField VARCHAR, myOtherField INT>

ARRAY

java.util.List/数组

单列集合/数组

e.g. ARRAY

MAP<fieldtype, fieldtype>

java.util.Map

双列K/V集合

e.g. MAP<VARCHAR, INT>

1.1.1.1.2 WITH参数

With参数包含connectorformatschemaupdate mode几部分。详情点击此处查看

1.1.1.2 示例

kafka内数据为:

4.9 kafka示例文件数据

kafka source的创表语句为:

1.   CREATE TABLE kafka_source (  

2.      taxiId STRING,  

3.      ts TIMESTAMP(3),  

4.      btime BIGINT,  

5.      lng DOUBLE,   

6.      lat DOUBLE,  

7.      lng2 DOUBLE,   

8.      lat2 DOUBLE,  

9.      -- 这里ROW的第一个字段一定是经度,第二个字段一定是纬度  

10.     linestring_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>,  

11.     polygon_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>,  

12.     mbr_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>  

13. WITH (  

14.     'connector.type' = 'kafka',  

15.     'connector.version' = '0.11',  

16.     'connector.topic' = 'test_udf_data',  

17.     'connector.startup-mode' = 'group-offsets',  

18.     'connector.properties.bootstrap.servers' = 'KAFKA_IP:2181, KAFKA_IP:2181,...',   

19.     'connector.properties.group.id' = 'g1',  

20.     'update-mode' = 'append',  

21.     'format.type' = 'json',  

22.     'format.derive-schema' = 'true'  

23. )   

JUST sink建表示例如下:

1.   CREATE TABLE just_sink (  

2.     taxiId STRING,  

3.     -- FlinkTIMESTAMP(3)对应JUST中的TIMESTAMP类型  

4.     ts2 TIMESTAMP(3),  

5.     btime_ms BIGINT,  

6.     btime_s BIGINT,  

7.     -- JUST中对应的空间字段定义在Flink中全部用STRING类型表示,用WKT的格式输出  

8.     point2 STRING,  

9.     linestring2 STRING,  

10.   polygon2 STRING,  

11.   mbr2 STRING  

12. WITH (  

13.   'connector.type' = 'geomesa',  

14.   -- sft-name的值为:JUST的表名称  

15.   'connector.sft-name' = 'test-table',  

16.    'connector.datastore.hbase.zookeepers' = 'ZK_IP:2181,ZK_IP:2181,...',  

17.   -- catalog的值为:JUST的用户名_JUST的库名称  

18.   'connector.datastore.hbase.catalog' = 'test01_testdb',  

19.   -- 可选配置(若有空间字段必须配置)空间字段格式:字段1,字段2,字段3  

20.   'connector.spatial.fields' = 'point2,linestring2,polygon2,mbr',   

21.   -- 主键字段  

22.   'connector.primary-key.field' = ' taxiId',  

23.   -- 可选配置缓冲刷写数据的时间间隔,默认值3s  

24.   'connector.write.buffer-flush.interval' = '10s'  

25. )    

注:更多信息,参见kafka-connector

1.1.2 DML

1.1.2.1 语法

1.   INSERT INTO <tableName>  

2.       [ (columnName[ , columnName]*) ]   

3.       query   

1.1.2.2 示例

以从kafka数据源写入JUST为例,注意:INSERT INTO之前,必须先创建对应存储介质中的表,一个完整的SQL包含三部分:source ddlsink ddlinset into

1.   INSERT INTO just_sink  

2.   SELECT  

3.     taxiId,  

4.     -- 13位的时间时间戳转成TIMESTAMPFROM_UNIXTIME接收10位的时间戳  

5.     TO_TIMESTAMP(FROM_UNIXTIME(btime/1000)) as ts2,  

6.     timestamp2MS(ts) as btime_ms,  

7.     UNIX_TIMESTAMP(DATE_FORMAT(ts,'yyyy-MM-dd HH:mm:ss')) as btime_s,  

8.     -- 创建point,以WKT的格式输出  

9.     st_asText(st_makePoint(lng,lat)) as point2,  

10.   -- 创建linestring,以WKT的格式输出  

11.   st_asText(st_makeLine(linestring_point_data)) as linestring2,  

12.   -- 创建polygon,以WKT的格式输出  

13.   st_asText(st_makePolygon(st_makeLine(polygon_point_data))) as polygon2,  

14.   -- 创建mbr,以WKT的格式输出  

15.   st_asText(st_makeBBOX(lng,lat, lng2, lat2)) as mbr2  

16. FROM kafka_source;   

1.1.3 DQL

1.1.3.1 语法

详情可参考此处

1.  SELECT  

2.    [ (columnName[ , columnName]*) ]   

3.  FROM   

4.    <tableName> | subQuery  

5.  WHERE booleanExpression ]  

6.  GROUP BY { groupItem [, groupItem ]* } ]  

7.  HAVING booleanExpression ]  

8.  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]  

1.1.3.2 示例

以从查询kafka中的数据为例

1.   SELECT  

2.     taxiId,  

3.     -- 13位的时间时间戳转成TIMESTAMPFROM_UNIXTIME接收10位的时间戳  

4.     TO_TIMESTAMP(FROM_UNIXTIME(btime/1000)) as ts2,  

5.     timestamp2MS(ts) as btime_ms,  

6.     UNIX_TIMESTAMP(DATE_FORMAT(ts,'yyyy-MM-dd HH:mm:ss')) as btime_s,  

7.     -- 创建point,以WKT的格式输出  

8.     st_asText(st_makePoint(lng,lat)) as point2,  

9.     -- 创建linestring,以WKT的格式输出  

10.   st_asText(st_makeLine(linestring_point_data)) as linestring2,  

11.   -- 创建polygon,以WKT的格式输出  

12.   st_asText(st_makePolygon(st_makeLine(polygon_point_data))) as polygon2,  

13.   -- 创建mbr,以WKT的格式输出  

14.   st_asText(st_makeBBOX(lng,lat, lng2, lat2)) as mbr2  

15. FROM kafka_source;  

共0条评论