1.1 实时数据接入示例
JUST引擎提供利用任务管理模块接入实时数据的方式,目前支持kafka数据源, 存储支持JUST、Kafka、JDBC等,完整步骤顺序为:建source表、建sink表、将source表中数据插入sink表,共涉及DDL、DML、DQL三个部分。下面对这三个部分做顺序介绍。
1.1.1
DDL
1.1.1.1 语法
1.
2.
3.
4.
5.
6.
7.
8.
9.
1.1.1.1.1 支持的数据类型
SQL |
Java Type |
说明 |
备注 |
STRING |
java.lang.String |
字符串 |
|
INT |
java.lang.Integer |
整型 |
|
BIGINT |
java.lang.Long |
长整型 |
|
FLOAT |
java.lang.Float |
单精度浮点型 |
|
DOUBLE |
java.lang.Double |
双精度浮点型 |
|
BOOLEAN |
java.lang.Boolean |
布尔型 |
|
DATE |
java.sql.Date |
日期类型 |
|
TIME |
java.sql.Time |
时间类型 |
|
TIMESTAMP |
java.sql.Timestamp |
时间戳 |
|
DECIMAL |
java.math.BigDecimal |
固定有效位数和小数位数的数据类型 |
|
ROW<fieldname fieldtype, ...> |
POJO |
对象类型 |
e.g. ROW<myField VARCHAR, myOtherField INT> |
ARRAY |
java.util.List/数组 |
单列集合/数组 |
e.g. ARRAY |
MAP<fieldtype, fieldtype> |
java.util.Map |
双列K/V集合 |
e.g. MAP<VARCHAR, INT> |
1.1.1.1.2 WITH参数
With参数包含connector、format、schema和update mode几部分。详情点击此处查看
1.1.1.2 示例
设kafka内数据为:
则kafka source的创表语句为:
1. CREATE TABLE kafka_source (
2. taxiId STRING,
3. ts TIMESTAMP(3),
4. btime BIGINT,
5. lng DOUBLE,
6. lat DOUBLE,
7. lng2 DOUBLE,
8. lat2 DOUBLE,
9. -- 这里ROW的第一个字段一定是经度,第二个字段一定是纬度
10. linestring_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>,
11. polygon_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>,
12. mbr_point_data ARRAY<ROW<lng DOUBLE,lat DOUBLE>>
13. ) WITH (
14. 'connector.type' = 'kafka',
15. 'connector.version' = '0.11',
16. 'connector.topic' = 'test_udf_data',
17. 'connector.startup-mode' = 'group-offsets',
18. 'connector.properties.bootstrap.servers' = 'KAFKA_IP:2181, KAFKA_IP:2181,...',
19. 'connector.properties.group.id' = 'g1',
20. 'update-mode' = 'append',
21. 'format.type' = 'json',
22. 'format.derive-schema' = 'true'
23. )
JUST sink建表示例如下:
1. CREATE TABLE just_sink (
2. taxiId STRING,
3. -- Flink中TIMESTAMP(3)对应JUST中的TIMESTAMP类型
4. ts2 TIMESTAMP(3),
5. btime_ms BIGINT,
6. btime_s BIGINT,
7. -- JUST中对应的空间字段定义在Flink中全部用STRING类型表示,用WKT的格式输出
8. point2 STRING,
9. linestring2 STRING,
10. polygon2 STRING,
11. mbr2 STRING
12. ) WITH (
13. 'connector.type' = 'geomesa',
14. -- sft-name的值为:JUST的表名称
15. 'connector.sft-name' = 'test-table',
16. 'connector.datastore.hbase.zookeepers' = 'ZK_IP:2181,ZK_IP:2181,...',
17. -- catalog的值为:JUST的用户名_JUST的库名称
18. 'connector.datastore.hbase.catalog' = 'test01_testdb',
19. -- 可选配置(若有空间字段必须配置), 空间字段, 格式:字段1,字段2,字段3
20. 'connector.spatial.fields' = 'point2,linestring2,polygon2,mbr',
21. -- 主键字段
22. 'connector.primary-key.field' = ' taxiId',
23. -- 可选配置, 缓冲刷写数据的时间间隔,默认值3s
24. 'connector.write.buffer-flush.interval' = '10s'
25. )
注:更多信息,参见kafka-connector。
1.1.2
DML
1.1.2.1 语法
1.1.2.2 示例
以从kafka数据源写入JUST为例,注意:INSERT INTO之前,必须先创建对应存储介质中的表,一个完整的SQL包含三部分:source ddl、sink ddl、inset into。
1. INSERT INTO just_sink
2. SELECT
3. taxiId,
4. -- 将13位的时间时间戳转成TIMESTAMP,FROM_UNIXTIME接收10位的时间戳
5. TO_TIMESTAMP(FROM_UNIXTIME(btime/1000)) as ts2,
6. timestamp2MS(ts) as btime_ms,
7. UNIX_TIMESTAMP(DATE_FORMAT(ts,'yyyy-MM-dd HH:mm:ss')) as btime_s,
8. -- 创建point,以WKT的格式输出
9. st_asText(st_makePoint(lng,lat)) as point2,
10. -- 创建linestring,以WKT的格式输出
11. st_asText(st_makeLine(linestring_point_data)) as linestring2,
12. -- 创建polygon,以WKT的格式输出
13. st_asText(st_makePolygon(st_makeLine(polygon_point_data))) as polygon2,
14. -- 创建mbr,以WKT的格式输出
15. st_asText(st_makeBBOX(lng,lat, lng2, lat2)) as mbr2
16. FROM kafka_source;
1.1.3 DQL
1.1.3.1 语法
详情可参考此处。
2. [ (columnName[ , columnName]*) ]
3. FROM
4. <tableName> | subQuery
5. [ WHERE booleanExpression ]
6. [ GROUP BY { groupItem [, groupItem ]* } ]
7. [ HAVING booleanExpression ]
8. [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
1.1.3.2 示例
以从查询kafka中的数据为例
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.