-- Kafka source: raw vehicle GPS records, JSON-encoded.
CREATE TABLE kafka_source_vehicle_gps_test (
    vehiclegpsid STRING,
    vehiclenum STRING,
    longitude DOUBLE,
    latitude DOUBLE,
    -- 13-digit epoch timestamp (milliseconds); the queries divide it by 1000 before FROM_UNIXTIME
    recordtime BIGINT,
    -- Processing-time attribute, declared with the fixed form `xxx AS PROCTIME()`.
    -- Required by the lookup joins (FOR SYSTEM_TIME AS OF k.proctime) and by the
    -- processing-time TUMBLE window of the statistics query.
    proctime AS PROCTIME()
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.11',
    'connector.topic' = '71e0ef60-c039-485b-bd3e-e6fa2e5dbcc6',
    -- start from the offsets committed for the consumer group configured below
    'connector.startup-mode' = 'group-offsets',
    -- placeholder: replace KAFKA_IP with the real broker addresses
    'connector.properties.bootstrap.servers' = 'KAFKA_IP:9092,KAFKA_IP:9092,...',
    'connector.properties.group.id' = 'test-g1',
    'update-mode' = 'append',
    'format.type' = 'json',
    -- derive the JSON format schema from the table schema above
    'format.derive-schema' = 'true'
);

-- MySQL dimension table, queried through lookup joins (FOR SYSTEM_TIME AS OF).
-- plate_number is the join key matched against the Kafka source's vehiclenum.
CREATE TABLE mysql_dim_gov_chemical_vehicle_base (
    id BIGINT,
    plate_number STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://mysql.just.icity.internal:3306/xxx',
    -- name of the physical table in MySQL (placeholder)
    'connector.table' = 'table_name',
    -- NOTE(review): legacy Connector/J 5.x driver class; Connector/J 8.x uses
    -- 'com.mysql.cj.jdbc.Driver' -- confirm which driver jar is deployed
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'xxx',
    'connector.password' = 'xxx',
    -- optional: maximum number of cached rows; beyond this the oldest rows are evicted
    'connector.lookup.cache.max-rows' = '5000',
    -- optional: maximum time-to-live of a cached row; older rows expire.
    -- connector.lookup.cache.max-rows and connector.lookup.cache.ttl must be set together
    'connector.lookup.cache.ttl' = '60s',
    -- optional: maximum number of retries when a lookup against the database fails
    'connector.lookup.max-retries' = '5'
);

-- JUST sink for the windowed statistics: one row per grid cell per window.
CREATE TABLE just_stat_result (
    -- row key: grid_id concatenated with the window start as produced by the timestamp2MS UDF
    pid STRING,
    grid_id STRING,
    -- spatial columns of the JUST table are declared as STRING in Flink and written as WKT
    mbr STRING,
    -- Flink TIMESTAMP(3) maps to the JUST TIMESTAMP type
    start_time TIMESTAMP(3),
    end_time TIMESTAMP(3),
    car_num BIGINT
) WITH (
    'connector.type' = 'geomesa',
    -- sft-name: name of the JUST table
    'connector.sft-name' = 'just_realtime_stat_result',
    'connector.datastore.hbase.zookeepers' = 'zk1:2181,zk2:2181,zk3:2181',
    -- catalog: <JUST user name>_<JUST database name>
    'connector.datastore.hbase.catalog' = 'JustProject_test',
    -- optional (mandatory when the table has spatial columns): spatial columns, format: field1,field2,field3
    'connector.spatial.fields' = 'mbr',
    -- primary-key column
    'connector.primary-key.field' = 'pid',
    -- optional: interval between flushes of buffered writes, default 3s
    'connector.write.buffer-flush.interval' = '10s'
);

-- JUST sink for the raw GPS points, keyed by vehiclegpsid.
CREATE TABLE just_sink_vehicle_gps_test01 (
    vehiclegpsid STRING,
    vehiclenum STRING,
    -- spatial columns of the JUST table are declared as STRING in Flink and written as WKT;
    -- backquoted because POSITION is a reserved keyword in Flink SQL
    `position` STRING,
    -- Flink TIMESTAMP(3) maps to the JUST TIMESTAMP type
    recordtime TIMESTAMP(3),
    -- 0 = vehicle found in the MySQL dimension table, 1 = not found (as computed by the insert query)
    flag INT
) WITH (
    'connector.type' = 'geomesa',
    -- sft-name: name of the JUST table
    'connector.sft-name' = 'gps_point_test',
    'connector.datastore.hbase.zookeepers' = 'zk1:2181,zk2:2181,zk3:2181',
    -- catalog: <JUST user name>_<JUST database name>
    'connector.datastore.hbase.catalog' = 'JustProject_test',
    -- optional (mandatory when the table has spatial columns): spatial columns, format: field1,field2,field3
    'connector.spatial.fields' = 'position',
    -- primary-key column
    'connector.primary-key.field' = 'vehiclegpsid',
    -- optional: interval between flushes of buffered writes, default 3s
    'connector.write.buffer-flush.interval' = '10s'
);

-- Processing logic: for each 5-minute processing-time tumbling window and grid cell,
-- count the distinct vehicles that exist in the MySQL dimension table and whose
-- GPS point lies inside the bounding box below.
INSERT INTO just_stat_result
SELECT
    -- row key: grid id + window start converted by the timestamp2MS UDF
    CONCAT(grid_id, CAST(timestamp2MS(TUMBLE_START(proctime, INTERVAL '5' MINUTE)) AS STRING)) AS pid,
    grid_id,
    -- presumably the grid cell's bounding rectangle rendered as WKT -- confirm against st_grid2Wkt
    st_grid2Wkt(grid_id) AS mbr,
    -- window start, returned as TIMESTAMP(3)
    TUMBLE_START(proctime, INTERVAL '5' MINUTE) AS start_time,
    -- window end, returned as TIMESTAMP(3)
    TUMBLE_END(proctime, INTERVAL '5' MINUTE) AS end_time,
    -- DISTINCT also absorbs duplicate join rows if plate_number is not unique in the dimension table
    COUNT(DISTINCT vehiclenum) AS car_num
FROM (
    SELECT
        st_pointInGrid(k.longitude, k.latitude) AS grid_id,
        k.vehiclenum,
        k.proctime AS proctime
    FROM kafka_source_vehicle_gps_test AS k
    -- FOR SYSTEM_TIME AS OF k.proctime: lookup join against the dimension table (fixed syntax)
    INNER JOIN mysql_dim_gov_chemical_vehicle_base FOR SYSTEM_TIME AS OF k.proctime AS m
        ON k.vehiclenum = m.plate_number
    -- keep only points inside the bounding box (bounds inclusive)
    WHERE k.longitude >= 120.2049002800000039 AND k.longitude <= 121.9726016000000044
      AND k.latitude >= 31.6194071399999999 AND k.latitude <= 32.7087668699999981
) t
WHERE grid_id IS NOT NULL
-- aggregate per 5-minute processing-time window and grid cell
GROUP BY TUMBLE(proctime, INTERVAL '5' MINUTE), grid_id;

-- Processing logic: write every GPS point to JUST, flagging whether the vehicle
-- exists in the MySQL dimension table.
INSERT INTO just_sink_vehicle_gps_test01
SELECT
    k.vehiclegpsid,
    k.vehiclenum,
    -- build a point and emit it as WKT
    st_asText(st_makePoint(k.longitude, k.latitude)) AS `position`,
    -- recordtime is a 13-digit (millisecond) epoch timestamp while FROM_UNIXTIME expects a
    -- 10-digit (second) timestamp, hence the division by 1000.
    -- NOTE(review): the sub-second part is discarded here although the sink column is TIMESTAMP(3)
    TO_TIMESTAMP(FROM_UNIXTIME(k.recordtime / 1000)) AS recordtime,
    -- 0 = vehicle found in the dimension table, 1 = not found
    CASE WHEN m.plate_number IS NOT NULL THEN 0 ELSE 1 END AS flag
FROM kafka_source_vehicle_gps_test AS k
-- FOR SYSTEM_TIME AS OF k.proctime: lookup join against the dimension table (fixed syntax);
-- LEFT JOIN keeps points whose vehicle has no match
LEFT JOIN mysql_dim_gov_chemical_vehicle_base FOR SYSTEM_TIME AS OF k.proctime AS m
    ON k.vehiclenum = m.plate_number;