您好!
欢迎来到京东云开发者社区
登录
首页
博文
课程
大赛
工具
用户中心
开源
首页
博文
课程
大赛
工具
开源
更多
用户中心
开发者社区
>
博文
>
clickhouse的操作实践
分享
打开微信扫码分享
点击前往QQ分享
点击前往微博分享
点击复制链接
clickhouse的操作实践
自猿其说Tech
2021-08-03
IP归属:未知
116120浏览
ClickHouse
### 1 导言 通过本文可以大大的节省你独自摸索使用clickhouse的时间,快速的上手开发;clickhouse使用过程中可能遇到的易出问题点在本文大部分都有包含。Clickhouse满足大部分Mysql语法,对于新接触的人比较友好,如果想flink+clickhouse的方式使用clikhouse,我们将clickhouse作为sink的一个模块已进行了封装,有需要的可以一起沟通交流。 ### 2 应用场景 ClickHouse是一款面向大数据场景下的OLAP数据库,相比于传统的基于Hadoop生态圈的OLAP大数据分析系统,ClickHouse具有极致的查询性能、轻量级的架构设计及维护简单等优势。目前社区活跃度高,业界应用实践日趋广泛。 ### 3 技术选型 对于数据指标的多维度分析场景,相较于OLTP(联机事务)系统,业界把此类面向BI的系统统称为OLAP(联机分析)系统。伴随着计算机软件技术的发展、从单机工具的少量数据分析(如Excel),到中等规模数据通过分析型关系数据库构建(如微软的SSAS)的OLAP,再到今日的大数据时代,海量数据的实时OLAP分析引擎,技术上的推陈出新,大体上可以将它们从架构模式上划分为两大类: 1. MPP架构。MPP架构特点是服务将接收到的查询请求发送到每个计算节点,待计算节点计算完成后,通过一个节点将最终结果汇总在一起得到最终结果。典型实现如Presto、Impala、SparkSQL、Drill、Doris等。MPP架构的特点是支持灵活的数据模型,要达到较高性能对内存开销大。 1. 预计算系统。预计算的核心思想是利用空间换时间,通过深入业务理解,将需要查询的数据指标和维度组合进行预处理,将计算好的结果存入数据库并建立对应索引,实现查询加速。典型实现如Kylin、Druid。预计算系统特点是性能较高,但灵活性较差,一般对数据模型调整会涉及到历史数据的重跑,维护困难。 ![](//img1.jcloudcs.com/developer.jdcloud.com/4ecf0546-529e-4e7b-8d4a-d1cdfc6b226e20210803163224.jpg) 从上表可知,目前业界还没有一个OLAP引擎能够同时兼顾性能和灵活性的要求,京东能源管理平台在做技术选型的时候,综合考虑了模型的灵活性、部署的难易程度、开发成本、可维护性以及是否适合云端部署等因素,最终决定使用基于MPP架构的ClickHouse作为我们的OLAP引擎。 ### 4 Clickhouse创建 京东智联云平台(x.devops.jdcloud.com)的Clickhouse只提供了数据存储,没有提供数据库及数据表的操作界面; 所有的操作可通过登录任一堡垒机的方式,使用curl的方式操作(curl 'ck数据库地址:8213/?user=数据库用户名&password=数据库密码&query' -d '操作命令')或者通过clickhouse client进行连接后操作,因此所有的操作需谨慎操作。 操作命令举例:SHOW DATABASES(TABLES) ### 5 Clickhouse 表引擎 MergeTree:该系列的引擎被设计用于插入极大量的数据到一张表当中。数据可以以数据片段的形式一个接着一个的快速写入,数据片段在后台按照一定的规则进行合并。主要特点:存储的额数据按主键排序;支持分区;支持副本;支持采样; 只有 MergeTree 系列里的表可支持副本,副本是表级别的,不是整个服务器级的。所以,服务器里可以同时有复制表和非复制表;副本不依赖分片;每个分片有它自己的独立副本。 ReplacingMergeTree:可以针对相同主键的数据进行去重,它能够在合并分区时删除重复的数据。常使用这种引擎实现真正存储数据, 数据去重会在后台一个不确定的时间进行;**OPTIMIZE 可手动发起合并,但是非常耗时危险,严重情况会卡住后续ddl** 创建ReplicatedReplacingMergeTree CK数据库表: ```sql CREATE TABLE judgement_online.judgement_da_model on CLUSTER default ( deal_id Int64 COMMENT '主键编码', deal_dept String DEFAULT '' COMMENT '操作部门' , deal_user_pin String, deal_begin_time DateTime, deal_type String, deal_time DateTime, deal_type_name String, ts DateTime ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/judgement_online.judgement_da_model','{replica}') PARTITION BY toYYYYMM(ts) ORDER BY deal_id SAMPLE BY deal_id SETTINGS index_granularity = 8192 ``` judgement_da_model为创建的数据库表名 ENGINE配置汇总的{shard} 和{replica}占位符不需要手动指定,在创建数据表的时候会拉取ck配置配置文件中的丢应的值自动填充。 on CLUSTER default :在创建ck 数据库表的时候强制指定集群,否则会出现查询时有时没有的情况。 PARTITION BY:分区键,在将新数据插入表中时,每个分区的数据按照目录存储为单独的数据片段,在插入后的10~15分钟内,同一个分区的数据片段将合并为一个整体的数据片段。 所以如果分区键设置不合理,将不调数据的不同状态插入到不同的分区,是无法合并的,会出现查询的时候返回多条的情况。 ORDER BY: 排序键(按**主键**理解即可,默认情况主键(primary key)和排序键相同) judgement_online为创建的数据库,创建命令: ```sql CREATE DATABASE judgement_online ON CLUSTER default ENGINE = Ordinary; ``` **Distributed引擎** 不会自己存储任何数据,但允许在多个服务器上进行分布式查询处理,相当于**视图**。 定义形式:Distributed(cluster_name,database_name,table_name[,sharding_key]) 各个参数的含义分别如下: **cluster_name:**集群名称,与集群配置中的自定义名称相对应。 **database_name:**数据库名称 **table_name:**表名称 **sharding_key:**可选的,用于分片的key值,在数据写入的过程中,分布式表会依据分片key的规则,将数据分布到各个节点的本地表。 为已创建的ReplicatedReplacingMergeTree创建视图 ```sql create table judgement_online.judgement_da on CLUSTER default as judgement_online.judgement_da_model ENGINE = Distributed(default, judgement_online, judgement_da_model, intHash64(deal_id)); ``` ### 6 查询去重 ClickHouse没有UPDATE语法;如果想对数据进行更新的话,只能是在原数据基础之上进行特定字段的插入(更新); 创建本地表 ```sql CREATE TABLE judgement_pre.waybill_code_model on CLUSTER default( waybill_code String COMMENT '数据主键', waybill_type String COMMENT '运单类型', collection_type String COMMENT '收款类型', waybill_state String COMMENT '运单状态', dt date COMMENT '数据dt' )ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/judgement_pre.waybill_code_model','{replica}') PARTITION BY toYYYYMMDD(dt) ORDER BY waybill_code SAMPLE BY waybill_code SETTINGS index_granularity = 8192; ``` 创建分布式表: ```sql create table judgement_pre.waybill_code on CLUSTER default as judgement_pre.waybill_code_model ENGINE = Distributed(default, judgement_pre, waybill_code_model, sipHash64(waybill_code)); ``` 通过本地表插入数据: ```sql insert into judgement_pre.waybill_code_model values('JDL001','ZY','0','-450','2021-07-20 09:12:34'); insert into judgement_pre.waybill_code_model values('JDL001','ZY','0','10','2021-07-18 11:56:34'); insert into judgement_pre.waybill_code_model values('JDL002','WD','1','-460','2021-07-21 09:12:34'); ``` 查询插入的数据,结果如下,你会发现对于运单JDL001数据重复了,正常意义上我们是需要最新状态的数据,出现这种情况的原因是引擎ReplicatedReplacingMergeTree的数据合并是在后台通过一个线程通过定义的主键进行的,时间不确定( 10-15 分钟); ![](//img1.jcloudcs.com/developer.jdcloud.com/2bd98bfe-29f4-4c02-8e09-44e07520d99b20210803163613.png) 会在数据去重可通过关键字final (), #### 6.1 查询去重方法一 可通过关键字optimize来进行数据去重,但是不建议使用,刚才已经说过了,数据的合并在后台有一个单独的线程来在一定的时间内完成的,optimize关键字相当于我们手动启动了整个数据表的合并 #### 6.2 查询去重方法二 还可通过关键字final来进行,当final被指定,CK会在返回结果之前合并数据,尽管后台数据合并的过程随着CK的优化从单线程优化成多线程(合并的时候同分区的数据根据order by的字段合并,不同分区的数据是没法合并的),但这部分时间消耗还是存在的 ![](//img1.jcloudcs.com/developer.jdcloud.com/4f365906-acbf-4407-8514-607921ab4b6a20210803163637.png) 再次执行了数据查询,但是却没有查询到数据(刚才插入的数据没有做任何删除操作);唯一的区别是登录了不同的节点来查询的;这就是本地表的一个问题,当我们通过程序插入的数据会被分散到不同的节点上,当我们在查询的时候,会以其中的一个节点作为master来查询本数据节点的数据,其他数据节点的是查询不到的,因此会少一部分数据。 ![](//img1.jcloudcs.com/developer.jdcloud.com/a3940417-67fa-4326-a0a4-89423a27a66b20210803163700.png) 这种情况可通过创建的分布式表judgement_pre.waybill_code来进行数据的插入和查询,分布式表在创建的时候已经指定了数据的分布策略:按照运单号做hash(sipHash64(waybill_code)),相同主键的数据会落到相同的分片,后续merge的时候将多分数据合并;如果插入的是底层表的话,会出现相同主键的数据落到不同的分片上;数据合并只会合并同一分片上的数据; 通过分布式表查询的时候,是以其中一个数据节点作为master,向所有的数据节点发送数据请求命令的,所以不会出现本地表查询不到数据的情况。 通过分布式表插入数据: ```sql insert into judgement_pre.waybill_code values('JDL003','ZY','0','-450','2021-07-20 09:12:34'); insert into judgement_pre.waybill_code values('JDL004','ZY','0','10','2021-07-18 11:56:34'); insert into judgement_pre.waybill_code values('JDL004','WD','1','-460','2021-07-21 09:12:34'); ``` 通过不同的节点查询数据并不会出现刚才通过本地表查询数据的时候,少数据的情况。 ![](//img1.jcloudcs.com/developer.jdcloud.com/c311d909-4ba9-4435-964a-feaf5a22007f20210803163736.png) ![](//img1.jcloudcs.com/developer.jdcloud.com/36aaa761-4ae7-4fca-bf58-7f0d31fedae220210803163747.png) #### 6.3 查询去重方法三 可通过group by +argMax的方式也可达到使用final去重的效果 ```sql select * from ( select waybill_code, argMax(waybill_type,dt),argMax(collection_type,dt),argMax(waybill_state,dt) from judgement_pre.waybill_code group by waybill_code )a where a.wabill_state='' ``` 具体使用方法二还是方法三,看具体使用场景:如果你的数据是离线导入的,没有重复数据,可直接查询,不用final和argMax+group by的方式; 在本人实现生产系统(两张400万的数据表做join)的查询DDL的时候,我通过比对方法二和方法三,当where 过滤条件里面的涉及的字段的值是固定不变的时候(比如运单的创建时间,查询创建时间在特定时间范围内的运单,因为创建时间是不变的,所以可以通过where过滤大部分数据),where和group by 可放在同一层,性能是方法三优于方法二;当where过滤条件里面的涉及的字段的值会改变的时候(比如运单状态,当运单状态是变化的时候,就没法先过滤;假如运单1的状态变化是10->20->30,如果只要运单状态是20的,先过滤状态是20的话,运单1会被卡出来,但是如果先group by之后,运单1的最新状态是30,通过状态为20过滤的时候,运单1会被过滤掉),必须得是先group by,后where,性能是方法二优于方法三。 你在查询的时候两种方式的性能可以比对一下,选择性能优的一种。 ### 7 操作卡顿 当执行一些DELETE/UPDATE/ALTER等后台异步进行的语句进行大批量删除/更新操作的时候,既有可能会卡住。 可通过如下语句判断操作是否完成。 ```sql create table judgement_online.judgement_da on CLUSTER default as judgement_online.judgement_da_model ENGINE = Distributed(default, judgement_online, judgement_da_model, intHash64(deal_id)); ``` 然后通过执行如下语句kill掉卡住的执行语句 ```sql KILL MUTATION WHERE database = 'default' AND table = 'table' AND mutation_id = 'mutation_id ' ``` ### 8 查看日志 使用批量插入的方式是异步的,且clickhouse不支持事务,所以数据插入成功与否无法感知,可通过如下方式查询一些日志: ```sql select * from system.query_log where query like 'INSERT INTO 数据库.表名%' order by event_time desc limit 100; ``` ### 9 数据删除 #### 9.1通过删除分区的方式删除数据: 分区是在创建表的时候就指定了按照什么样的格式(PARTITION BY toYYYYMM(ts) ,按照日期的格式) 查询分区ddl: ```sql select * from system.parts p where database='数据库' and table='表名' limit 1; ``` ![](//img1.jcloudcs.com/developer.jdcloud.com/8a875b17-8c7a-462e-9d18-26358b5b95e820210803163941.png) 删除分区ddl: ```sql ALTER TABLE 数据库.表名 on CLUSTER default DROP partition '20210512'; ``` ![](//img1.jcloudcs.com/developer.jdcloud.com/94109d7d-3a2b-4493-a59c-67103901d9b920210803164007.png) 按照分区删除数据的时候,不能通过Distributed表删除数据(DB::Exception: Table engine Distributed doesn't support partitioning);在删除的时候必须写on CLUSTER default,**数据是通过分布式存储的;删除命令是在其中一个数据存储节点作为master发起,因此会将master之外的分区数据删除,作为master的节点上的指定分区是无法被删除的,除非切换到其他节点,再执行一遍删除ddl**;标粗内容属于我个人理解。 #### 9.2 通过特定条件删除数据: ```sql ALTER TABLE 数据库.表名 on CLUSTER default DELETE WHERE ? ; ``` ### 10 分布式查询执行 集群设置中的服务器大多是独立的。你可以在一个集群中的一个或多个服务器上创建一个 Distributed 表。Distributed 表本身并不存储数据,它只为集群的多个节点上的所有本地表提供一个«视图(view)»。当从 Distributed 表中进行 SELECT 时,它会重写该查询,根据负载平衡设置来选择远程节点,并将查询发送给节点。Distributed 表请求远程服务器处理查询,直到可以合并来自不同服务器的中间结果的阶段。然后它接收中间结果并进行合并。分布式表会尝试将尽可能多的工作分配给远程服务器,并且不会通过网络发送太多的中间数据。 当 IN 或 JOIN 子句中包含子查询并且每个子查询都使用分布式表时,事情会变得更加复杂。我们有不同的策略来执行这些查询。 分布式查询执行没有全局查询计划。每个节点都有针对自己的工作部分的本地查询计划。我们仅有简单的一次性分布式查询执行:将查询发送给远程节点,然后合并结果。但是对于具有高基数的 GROUP BY 或具有大量临时数据的 JOIN 这样困难的查询的来说,这是不可行的:在这种情况下,我们需要在服务器之间«改组»数据(we need to “reshuffle” data between servers),这需要额外的协调。ClickHouse 不支持这类查询执行,我们需要在这方面进行努力。 **如下sql,将第一行的deal_user_region_name更新成任一非join表字段,查询结果则会出现很大差异** ```sql SELECT '全国' AS deal_user_region_name1, count(deal_id) AS handler_num FROM ( SELECT id, joint_duty_type FROM judgement_online.biz_judgement final WHERE summary_type != 2 AND proc_inst_id IS NOT NULL AND proc_inst_id <> '' ) judgement_data JOIN ( SELECT id AS deal_id, judgement_id, deal_user_region as deal_user_region_name FROM judgement_online.biz_judgement_deal final WHERE deal_type NOT IN (11, 500) AND role_id IN (27, 28, 29, 30, 31, 33) AND deal_end_time > formatDateTime(now(), '%Y-%m-%d 00:00:00') ) deal_data ON judgement_data.id = deal_data.judgement_id WHERE (deal_user_region_name = 'BLOC_20201215000003' AND joint_duty_type IN (0,1)) OR deal_user_region_name != 'BLOC_20201215000003' ``` #### 附录1 jar引入 ```xml <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.2.4</version> </dependency> ``` #### 附录2 获取ClickHouseConnection ```java ClickHouseDataSource dataSource = new ClickHouseDataSource( SystemConfi .getProp("ck_connection_url"),new ClickHouseProperties()); ClickHouseConnectionImpl) dataSource.getConnection(SystemConfig.getProp("ck_connection_username"), SystemConfig.getProp("ck_connectoord")))); ``` ------------ 自猿其说Tech-JDL京东物流技术发展部 作者:客户服务技术部 柳栋栋
原创文章,需联系作者,授权转载
上一篇:点亮B端产品技能树——业务调研
下一篇:Flink入门到实践
相关文章
京东mPaaS平台之Android组件化系统私有化部署改造实践!
别困惑,不是你的错!90%的开发者把Clubhouse看成了Clickhouse!
JUST技术: 使用 ClickHouse实现时序数据管理
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
本文将从Optional所解决的问题开始,逐层解剖,由浅入深,文中会出现Optioanl方法之间的对比,实践,误用情况分析,优缺点等。与大家一起,对这项Java8中的新特性,进行理解和深入。
01
Taro小程序跨端开发入门实战
为了让小程序开发更简单,更高效,我们采用 Taro 作为首选框架,我们将使用 Taro 的实践经验整理了出来,主要内容围绕着什么是 Taro,为什么用 Taro,以及 Taro 如何使用(正确使用的姿势),还有 Taro 背后的一些设计思想来进行展开,让大家能够对 Taro 有个完整的认识。
01
Flutter For Web实践
Flutter For Web 已经发布一年多时间,它的发布意味着我们可以真正地使用一套代码、一套资源部署整个大前端系统(包括:iOS、Android、Web)。渠道研发组经过一段时间的探索,使用Flutter For Web技术开发了移动端可视化编程平台—Flutter乐高,在这里希望和大家分享下使用Flutter For Web实践过程和踩坑实践
01
配运基础数据缓存瘦身实践
在基础数据的常规能力当中,数据的存取是最基础也是最重要的能力,为了整体提高数据的读取能力,缓存技术在基础数据的场景中得到了广泛的使用,下面会重点展示一下配运组近期针对数据缓存做的瘦身实践。
自猿其说Tech
文章数
426
阅读量
2149964
作者其他文章
01
深入JDK中的Optional
01
Taro小程序跨端开发入门实战
01
Flutter For Web实践
01
配运基础数据缓存瘦身实践
添加企业微信
获取1V1专业服务
扫码关注
京东云开发者公众号