开发者社区 > 博文 > 【JUSTQL Reference】实时join维表&窗口统计示例
分享
  • 打开微信扫码分享

  • 点击前往QQ分享

  • 点击前往微博分享

  • 点击复制链接

【JUSTQL Reference】实时join维表&窗口统计示例

  • 京东城市JUST团队
  • 2021-01-26
  • IP归属:未知
  • 66560浏览

1.1 实时join维表&窗口统计示例

1.   -- 定义kafka source  

2.   CREATE TABLE kafka_source_vehicle_gps_test (  

3.       vehiclegpsid    STRING,  

4.       vehiclenum      STRING,  

5.       longitude       DOUBLE,  

6.       latitude        DOUBLE,  

7.       recordtime      BIGINT,  

8.       -- 如果有join处理的话需要声明处理时间字段固定写法:xxx AS PROCTIME()  

9.       proctime AS PROCTIME()  

10. WITH (  

11.     'connector.type' = 'kafka',  

12.     'connector.version' = '0.11',  

13.     'connector.topic' = '71e0ef60-c039-485b-bd3e-e6fa2e5dbcc6',  

14.     'connector.startup-mode' = 'group-offsets',  

15.     'connector.properties.bootstrap.servers' = 'KAFKA_IP:9092,KAFKA_IP:9092,...',  

16.     'connector.properties.group.id' = 'test-g1',  

17.     'update-mode' = 'append',  

18.     'format.type' = 'json',  

19.     'format.derive-schema' = 'true'  

20. );  

21.   

22.   

23. -- 定义mysql维表  

24. CREATE TABLE mysql_dim_gov_chemical_vehicle_base (  

25.     id              BIGINT,  

26.     plate_number    STRING  

27. WITH (  

28.     'connector.type' = 'jdbc',  

29.     'connector.url' = 'jdbc:mysql://mysql.just.icity.internal:3306/xxx',  

30.     -- mysql中表名称  

31.     'connector.table' = 'table_name',  

32.     'connector.driver' = 'com.mysql.jdbc.Driver',  

33.     'connector.username' = 'xxx',  

34.     'connector.password' = 'xxx',  

35.     -- 可选配置缓存的最大行数超过此值时将删除最早的行  

36.     'connector.lookup.cache.max-rows' = '5000',  

37.     -- 可选配置缓存的最长生存时间超过此值时最旧的行将过期; connector.lookup.cache.max-rows  connector.lookup.cache.ttl必须同时出现  

38.     'connector.lookup.cache.ttl' = '60s',  

39.     -- 可选配置连接database失败最大重试次数  

40.     'connector.lookup.max-retries' = '5'  

41. );  

42.   

43.   

44. -- 定义JUST统计结果表  

45. CREATE TABLE just_stat_result (  

46.     pid           STRING,  

47.     grid_id       STRING,  

48.     -- JUST中对应的空间字段定义在Flink中全部用STRING类型表示,用WKT的格式输出  

49.     mbr           STRING,  

50.     -- FlinkTIMESTAMP(3)对应JUST中的TIMESTAMP类型  

51.     start_time    TIMESTAMP(3),  

52.     end_time      TIMESTAMP(3),  

53.     car_num       BIGINT  

54. WITH (  

55.     'connector.type' = 'geomesa',  

56.     -- sft-name的值为:JUST的表名称  

57.     'connector.sft-name' = 'just_realtime_stat_result',  

58.     'connector.datastore.hbase.zookeepers' = 'zk1:2181,zk2:2181,zk3:2181',  

59.     -- catalog的值为:JUST的用户名_JUST的库名称  

60.     'connector.datastore.hbase.catalog' = 'JustProject_test',  

61.     -- 可选配置(若有空间字段必须配置)空间字段格式:字段1,字段2,字段3  

62.     'connector.spatial.fields' = 'mbr',  

63.     -- 主键字段  

64.     'connector.primary-key.field' = 'pid',  

65.     -- 可选配置缓冲刷写数据的时间间隔,默认值3s  

66.     'connector.write.buffer-flush.interval' = '10s'  

67. );  

68.   

69.   

70. -- 定义JUST sink  

71. CREATE TABLE just_sink_vehicle_gps_test01 (  

72.     vehiclegpsid    STRING,  

73.     vehiclenum      STRING,  

74.     -- JUST中对应的空间字段定义在Flink中全部用STRING类型表示,用WKT的格式输出  

75.    `position`       STRING,  

76.     -- FlinkTIMESTAMP(3)对应JUST中的TIMESTAMP类型  

77.     recordtime      TIMESTAMP(3),  

78.     flag            INT  

79. WITH (  

80.     'connector.type' = 'geomesa',  

81.     -- sft-name的值为:JUST的表名称  

82.     'connector.sft-name' = 'gps_point_test',  

83.     'connector.datastore.hbase.zookeepers' = 'zk1:2181,zk2:2181,zk3:2181',  

84.     -- catalog的值为:JUST的用户名_JUST的库名称  

85.     'connector.datastore.hbase.catalog' = 'JustProject_test',  

86.     -- 可选配置(若有空间字段必须配置)空间字段格式:字段1,字段2,字段3  

87.     'connector.spatial.fields' = 'position',  

88.     -- 主键字段  

89.     'connector.primary-key.field' = 'vehiclegpsid',  

90.     -- 可选配置缓冲刷写数据的时间间隔,默认值3s  

91.     'connector.write.buffer-flush.interval' = '10s'  

92. );  

93.   

94.   

95. -- 处理逻辑  

96. INSERT INTO just_stat_result  

97. SELECT  

98.     CONCAT(grid_id,CAST(timestamp2MS(TUMBLE_START(proctime, INTERVAL '5' MINUTE)) AS STRING)) as pid,  

99.     grid_id,  

100.        st_grid2Wkt(grid_id) as mbr,  

101.        -- 窗口的开始时间返回TIMESTAMP(3)类型  

102.        TUMBLE_START(proctime, INTERVAL '5' MINUTEas start_time,  

103.        -- 窗口的结束时间返回TIMESTAMP(3)类型  

104.        TUMBLE_END(proctime, INTERVAL '5' MINUTEas end_time,  

105.        COUNT(DISTINCT vehiclenum) as car_num  

106.    FROM (  

107.            SELECT  

108.                st_pointInGrid(longitude, latitude) as grid_id,  

109.                vehiclenum,  

110.                k.proctime as proctime  

111.            FROM kafka_source_vehicle_gps_test as k  

112.            -- FOR SYSTEM_TIME AS OF 系统快照固定写法  

113.            INNER JOIN mysql_dim_gov_chemical_vehicle_base FOR SYSTEM_TIME AS OF k.proctime as m  

114.            ON k.vehiclenum = m.plate_number   

115.            WHERE longitude >= 120.2049002800000039 and longitude <= 121.9726016000000044 and latitude >= 31.6194071399999999 and latitude <= 32.7087668699999981  

116.    ) t WHERE grid_id is not null  

117.    -- 按照5分钟窗口做聚合统计  

118.    GROUP BY TUMBLE(proctime, INTERVAL '5' MINUTE), grid_id;  

119.      

120.      

121.    -- 处理逻辑  

122.    INSERT INTO just_sink_vehicle_gps_test01  

123.    SELECT  

124.      vehiclegpsid,  

125.      vehiclenum,  

126.      -- 创建point,以WKT的格式输出  

127.      st_asText(st_makePoint(longitude,latitude)) as `position`,  

128.      -- 13位的时间时间戳转成TIMESTAMPFROM_UNIXTIME接收10位的时间戳  

129.      TO_TIMESTAMP(FROM_UNIXTIME(recordtime/1000)) as recordtime,  

130.      IF(m.plate_number IS NOT NULL, 0, 1) as flag  

131.    FROM kafka_source_vehicle_gps_test as k  

132.    -- FOR SYSTEM_TIME AS OF 系统快照固定写法  

133.    LEFT JOIN mysql_dim_gov_chemical_vehicle_base FOR SYSTEM_TIME AS OF k.proctime as m  

134.    ON k.vehiclenum = m.plate_number;  

 

共0条评论