Application practice of JD Cloud ClickHouse and ES dual-engine design in retail selection

  • JDT Developer
  • 2022-01-08
  • IP归属:未知
  • 129 browse

    Background


    Nirvana Selection (Niepan Selection) is a strategic bigboss project in JDR, dedicated to building the underlying capabilities of commodities, forming the submission and delivery processes, enabling the online, rule-based and intelligent selection, and fully expressing will of related parties, such as marketing, category, operation/purchase and marketing through multi-party collaborative inventory.

    The diversified requirements of the business lead to the following technical difficulties and challenges in the initial stage of the project.

    clickhouse1.jpg


    Solutions to R&D Obstacles


    JDR has designed a set of technical solutions as shown below to address the above technical difficulties:

    clickhouse2.jpg

    The technical solution mainly falls into three modules in terms of data storage query:
    Module 1: The storage structure design module of ClickHouse and Elasticsearch;
    Module 2: The data push and verification module of ClickHouse;
    Module 3: The data push and verification module of Elasticsearch.


    I. Specific Technical Solution


    There are three major problems to be addressed: first, the incompatibility of rapid screening and fast multi-dimensional statistical query; second, low efficiency in importing massive commodity characteristic data; and third, high storage occupation of massive commodity characteristic data. The scenario relates to a rule-based commodity selection e-commerce platform with tens of billions of commodities, where all commodities have different characteristic data, the marketing activities are released continuously, and the stores can submit some commodities to these marketing activities to achieve the marketing purpose of the platform and sales purpose of the merchants. However, if a merchant wants to submit the commodities of his store accurately, he should select the target commodities following certain rules and expect that the commodities he submitted can achieve good effect. In order to meet this demand, he will calculate the characteristic data of the full amount of commodities. There may be approximately 10 billion pieces of full commodity data. A big and wide table can be obtained after filtering out obviously unqualified commodities in the early stage, which will include hundreds of millions of commodities and hundreds of tags.

    As a common practice, the data will be directly imported into the database, or imported after preprocessing, and then provided to the retrieval platform for query. However, each database has its focus, and almost no database can satisfy the query requirements of fast and simple screening and fast multidimensional statistics at the same time. In addition, the large number of screening tasks should be based on historical data which not only occupies much storage, but also affects the overall query efficiency.

    A method combining Elasticsearch with ClickHouse is proposed in the paper, along with the concept of snapshot table in terms of storage. Based on Spark’s offline data import and verification solutions for Elasticsearch and ClickHouse, and in the case that only two copies of full latest data are saved, this method greatly reduces the occupation of storage resources, enables both rapid screening and multidimensional statistical query, and can also rapidly import commodity characteristic data, greatly improving the efficiency of data update.

    The specific method is as follows:


    Here are the specific design solutions for the storage structure of ClickHouse and Elasticsearch:


    a. The ClickHouse + Elasticsearch solution was adopted for storage to enable both rapid screening (mainly based on Elasticsearch) and rapid multidimensional statistical query (mainly based on ClickHouse). Dual storage engines would often associate with the problem of data consistency. In this invention, the data consistency was ensured by verifying the data during the data import process of ClickHouse and Elasticsearch.

    b. There are two types of retrieval platforms in terms of time frame during data query. One type is the real-time query of new tasks. With regard to such tasks, a table was created for ClickHouse and Elasticsearch respectively in this invention. Specifically, for ClickHouse, a new table was generated that incorporated the distributed table and local table on each shard, where the distributed table was designed for data query, and the local table was designed as ReplicatedReplacingMergeTree engine for data import, and the historical data of the table was cleared after the latest data was successfully imported and verified every day. While for Elasticsearch, a set of indexes was generated every day based on the category information of the commodities. The reason for the use of “a set of indexes” instead of “an index” lies in that the latter would associate with extremely large data volume that may affect query efficiency; the data of the historical index was cleared after the latest data was successfully imported and verified every day.

    The other type is the real-time query of historical tasks, which is based on historical data. In the invention, the concept of snapshot table was designed for this query. Specifically, every morning, a traversal query was performed for the tasks created on the previous day to obtain the ID of a specific commodity, which, after being processed in the data warehouse, would be used to obtain the full commodity characteristic data of the task created on the previous day finally. The final data would not be updated, but was a snapshot of the tasks created on the previous day, which would be stored by building a snapshot table in ClickHouse and Elasticsearch respectively. Different from the storage of the latest data every day, the snapshot index in Elasticsearch involves a set of indexes generated based on the tasks in the snapshot data, and is no longer based on the category of the commodity, since all subsequent queries on the snapshot data are performed within the scope of a task. This design ensures effective query.

    c. There is also a real-time query for secondary selection in this scenario in addition to the queries in the above two time frames. Secondary selection refers to a secondary simple screening for target commodities on the basis of historical tasks. This selection scenario is special in that it involves a part of real-time tags. In the invention, the parent-child documents of Elasticsearch were employed for the storage design of these tags. Here is the specific plan: Every day, a set of indexes (the backflow table in the architecture diagram) was generated in Elasticsearch based on task information to store the commodity characteristics data of historical tasks. Unlike snapshots, the set of indexes was used to store the full historical data within the validity period every day, and followed a parent-child document structure. Every day, the historical index data was cleared after the offline data was successfully imported and verified; the offline task data was imported into the parent document of the index; and the tags in the child document would be updated in real time based on structured-streaming tasks. The reason for this design lies to the low efficiency in updating parent document which involves hundreds of tags, compared with the higher efficiency in updating child document which involves only a limited number of real-time tags.


    2. Data push and verification for ClickHouse


    The flow charge of the data push and verification architecture for ClickHouse is as follow:

    clickhouse3.jpg

    The specific implementation details are as follows:

    a. Process and integrate the data of commodities, users, traffic, etc. in the data warehouse every day to generate the characteristic data of the required commodities, and a wide table of characteristic data finally;

    b. Initiate the Spark task. First, read the wide table of data generated in the previous step, and process the data, which mainly includes: convert the types of tag fields in accordance with the field type of ClickHouse table in the configuration file, fill in null values (null values cannot be stored in ClickHouse table by default), perform special processing and format conversion for fields that need to be stored as Array, Nested and other structures;

    c. In the Spark task, get the metadata information for the table to be created in ClickHouse from the configuration file; check the ClickHouse cluster first for the latest table (both the distributed table and the local table), if yes, delete the data first (clear the miswritten data of the day, not the historical data); then check whether the current table structure is consistent with the configuration file, if not, delete or add fields to ensure that the table structure is consistent with the configuration file. Create table if no table exists. First, create a local table (ReplicatedReplacingMergeTree as the engine) on each node of the ClickHouse cluster according to the configuration file. Then, create a distributed table based on the local table. The design of ReplicatedReplacingMergeTree engine for local table serves two purposes: (i) ensure the consistency of data between replicas in each shard of ClickHouse with the capability of Zookeeper, so that the user only needs to import the data into one node in each shard; (ii) optimize the imported data by use of ReplacingMergeTree engine to ensure that there is no duplicate data on each node;

    d. After creating or updating the table in ClickHouse, allocate the data read by Spark by primary key field (hash the primary key, and then perform modulo on the number of ClickHouse shards) to ensure that the data on the shards follows the same rules. Then repartition the allocated data, control the data set to a parallelism acceptable to the cluster. Finally, start the data push program in parallel in multiple threads on the Driver side of Spark, write the data into the ClickHouse table in certain batches by leveraging the PreparedStatement of JDBC to reduce the request frequency of Spark to ClickHouse;

    e. Following the data push on all shards, Spark will verify the data in ClickHouse following established rules. In the present invention, the data was verified by category dimension, that is, the data size under all categories in ClickHouse was queried, and verified against the data size obtained by Spark from the warehouse. The following three situations may occur during the verification:

    First, the data size of ClickHouse was consistent with that in warehouse. In this case, the data of current category passed the verification;

    Second, the data size of ClickHouse was greater than that of warehouse. In this case, the local table of current shard should be optimized, including merging data, before data verification;

    Third, the data size of ClickHouse was smaller than that of warehouse. In this case, the data of the currently verified category in the local table of the current shard should be re-pushed before verification;
    The data was verified by dimension to help find problems during data import and ensure the timeliness and efficiency of data import and verification. After the overall data verification was passed, the Spark task would send the current time version back to the data retrieval platform and inform ClickHouse that the latest data was available, and then clear the historical data of the day before yesterday to reduce the overall storage resource usage.

    This solution also applies to data such as snapshot data and secondary selection, but their data clearing policies are different.


    3. Data push and verification of Elasticsearch.


    The flow charge of the data push and verification architecture for Elasticsearch is as follow:

    clickhouse4.jpg

    The specific implementation details are as follows:

    a. Process and integrate the data of commodities, users, traffic, etc. in the data warehouse every day to generate the characteristic data of the required commodities, and a wide table of characteristic data finally (This step is the same as the first step in the data push and verification for ClickHouse);

    b. Initiate the Spark task. First, read the wide table of data generated in the previous step, and process the data, which mainly includes: convert the types of tag fields in accordance with the field type of Elasticsearch index in the configuration file, fill in null values in the same way as that for ClickHouse import module (ensure the data consistency with ClickHouse), perform special processing and format conversion for fields that need to be stored as Array, Nested and other structures. In case that the pushed index is a parent-child document, the dataset should be reorganized to satisfy the data structure of the parent-child document;

    c. The Spark tasks performed dimensional statistics on the data of the data set to facilitate the creation of a set of indexes. In the present invention, the statistics on data set was performed by the dimension of commodity category to obtain the data size of each category. Then, a specified number of indexes were created in the Elasticsearch cluster based on the latest configuration information of the current Elasticsearch index. After that, each category was allocated to one of the indexes according to a certain algorithm logic to ensure that the commodity data of the same category must be included in the same index, and the data size in all indexes is as uniform as possible. This design aims to avoid excessive data size in one index that may affect query efficiency, and improve the overall query concurrency performance to a certain extent. In addition, the plan can be expanded horizontally according to the business volume, such as increasing the number of clusters.

    d. After obtaining Elasticsearch index attribution information for each category of the data set from the previous step, the Spark task performs data binning on the full data set, so that all the data in a data bin belongs to one Elasticsearch index. After data binning, the data will be written into Elasticsearch in batch according to the Bulkprocessor method of Elasticsearch with the following overall writing logic: the indexes of multiple clusters are written in parallel, and indexes of the same cluster are written in serial by taking the primary key of the data set as the _id of the Elasticsearch index, writing. This method can avoid the occurrence of multiple writing points in an Elasticsearch cluster that may lead to lots of rejections and thus result in low writing efficiency. Parallel writing helps improve writing efficiency, while taking primary key as _id helps avoid duplication of data writing;

    e. Following the data push for all indexes, Spark will verify the data in Elasticsearch following established rules. In the present invention, the data was verified by category dimension, that is, the data size under all categories in the indexes of Elasticsearch cluster was queried, and verified against the data size obtained by Spark from the warehouse. The following two situations may occur during the verification:

    First, the data size of Elasticsearch was consistent with that in warehouse. In this case, the data of current category passed the verification;

    Second, the data size of Elasticsearch was greater than that of warehouse. In this case, the data of the current category should be imported repeatedly before data verification;

    The data was verified by dimension to help find problems during data import and ensure the timeliness and efficiency of data import and verification. After the overall data verification was passed, the Spark task would send the current time version back to the data retrieval platform and inform Elasticsearch that the latest data was available, and then clear the historical data of the day before yesterday to reduce the overall storage resource usage.

    This solution also applies to data such as snapshot data and secondary selection, but their data clearing policies are different.


    II. Technical Solution Implementation Process


    Specific flow is shown as follows:

    clickhouse5.jpg

    III. Test Conclusion


    At present, there are more than 1 billion pieces of full commodity characteristics data, and a total of 490 tags. Every day, it takes 40 minutes (40 shards) to import data into ClickHouse offline, the time is reduced by 80% on average compared with that for former distributed table mode; it takes 2 hours to import data into Elasticsearch offline (single cluster, which can be expanded horizontally), the time is reduced by 60% on average compared with that for single-index writing mode (writing may fail occasionally). In addition to supporting simple screening, drilling up/down, top, window query, multi-dimensional sorting, and association aggregation, the online retrieval platform can attain a maximum of 300 in QPS, tp99 at millisecond level. Compared with common practice, the method in the plan reduces the occupation of storage resources by 60%-70%.


    Future Perspectives

     
    Since the launch of the project, this technical solution has supported a multi-scenario, multi-business and diversified selection process, and provided the underlying capabilities of data and indexing for the overall rule-based, online and intelligent selection. The solution can meet the practical needs of the current multi-party business, however, it can be further optimized and expanded in the following aspects:

    I. Dual-engine design. Although the current sophisticated selection platform can enable both rapid screening and complex computing in face of the massive data (1 billion+ entities, 600+ tags), it also has a fatal problem – data consistency. Currently, the data consistency is ensured by employing the same set of default values, and verifying data after updating. However, the time for verification every day may affect the overall data update timeliness to a certain extent. Therefore, it is an important aspect to be optimized in the future.

    II. At present, there are 600+ tags in the entire system, 99% of which are offline tags, and there are only a few real-time tags. However, a good selection platform must be based on a large number of real-time tags. The writing, updating, and indexing of real-time tags are very different from offline tags, which will bring great challenges to the data consistency mentioned above, and even affect the underlying architecture design of the entire data index.



    number of articles
    544
    reading volume
    3296738