CN109359139A - Data synchronization method, system, electronic device and computer readable storage medium - Google Patents

Info

Publication number
CN109359139A
Authority
CN
China
Prior art keywords
data
filtering
memory
processing flow
database
Legal status
Pending
Application number
CN201811244243.4A
Other languages
Chinese (zh)
Inventor
张坤
Current Assignee
Lazas Network Technology Shanghai Co Ltd
Original Assignee
Lazas Network Technology Shanghai Co Ltd
Application filed by Lazas Network Technology Shanghai Co Ltd
Priority to CN201811244243.4A
Publication of CN109359139A
Legal status: Pending

Classifications

    • G: PHYSICS
    • G06: COMPUTING OR CALCULATING; COUNTING
    • G06F: ELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00: Arrangements for program control, e.g. control units
    • G06F9/06: Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46: Multiprogramming arrangements
    • G06F9/50: Allocation of resources, e.g. of the central processing unit [CPU]
    • G06F9/5061: Partitioning or combining of resources

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

An embodiment of the disclosure provides a data synchronization method, a system, an electronic device, and a computer-readable storage medium. The method comprises: sending data in a database to a message-storage processing flow; consuming and parsing the data in the message-storage processing flow to obtain parsed data; and filtering the parsed data according to a preset filtering rule to obtain filtered data, and storing the filtered data in a memory. With this scheme, data can be imported from a relational database into a non-relational database and written in real time, while remaining traceable for backtracking.

Description

Data synchronization method, system, electronic device, and computer-readable storage medium

Technical Field

The present disclosure relates to the field of communication technologies, and in particular to a data synchronization method, a system, an electronic device, and a computer-readable storage medium.

Background

At present, data is imported from relational databases into non-relational databases with the open-source tool Sqoop, which performs batch offline computation through the MapReduce (map-reduce) programming model. Sqoop is well suited to offline data-processing tasks.

However, data migration involves a large volume of data, so the MapReduce jobs run for a long time and the synchronized data lags far behind real time.

Summary of the Invention

Embodiments of the present disclosure provide a data synchronization method and system, an electronic device, and a computer-readable storage medium.

In a first aspect, an embodiment of the present disclosure provides a data synchronization method.

Specifically, the data synchronization method includes:

sending data in a database to a processing flow of distributed publish-subscribe messages;

consuming and parsing the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data; and

filtering the parsed data according to a preset filtering rule to obtain filtered data, and storing the filtered data in a memory.

With reference to the first aspect, in a first implementation of the first aspect, before the data in the database is sent to the processing flow of the distributed publish-subscribe messages, the method includes initializing configuration information, where the configuration information includes: the database connection configuration, the configuration of the distributed publish-subscribe messaging, the parameter information for data processing, the preset filtering rules, and the structure information of the tables in the memory.

With reference to the first aspect and its first implementation, in a second implementation of the first aspect, the data in the processing flow of the distributed publish-subscribe messages is data in a preset format.

With reference to the first aspect and its first and second implementations, in a third implementation of the first aspect, after the filtered data is stored in the memory, the method further includes storing the function used to consume the data in the processing flow of the distributed publish-subscribe messages.

In a second aspect, an embodiment of the present disclosure provides a data backtracking method.

Specifically, the data backtracking method includes:

setting parameters of the data table to be backtracked, and obtaining the corresponding offset according to the parameters;

consuming and parsing, according to the corresponding offset, the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data; and

filtering the parsed data according to a preset filtering rule to obtain filtered data, and storing the filtered data in a memory.

In a third aspect, an embodiment of the present disclosure provides a data synchronization system.

Specifically, the data synchronization system includes:

a sending module, configured to send data in a database to a processing flow of distributed publish-subscribe messages;

a parsing module, configured to consume and parse the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data;

a filtering module, configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and

a storage module, configured to store the filtered data in a memory.

With reference to the third aspect, in a first implementation of the third aspect, the data synchronization system further includes an initialization module configured to initialize configuration information, where the configuration information includes: the database connection configuration, the configuration of the distributed publish-subscribe messaging, the parameter information for data processing, the preset filtering rules, and the structure information of the tables in the memory.

With reference to the third aspect and its first implementation, in a second implementation of the third aspect, the data in the processing flow of the distributed publish-subscribe messages is data in a preset format.

With reference to the third aspect and its first and second implementations, in a third implementation of the third aspect, the storage module is configured to store the function used to consume the data in the processing flow of the distributed publish-subscribe messages.

In a fourth aspect, an embodiment of the present disclosure provides a data backtracking system.

Specifically, the data backtracking system includes:

a setting module, configured to set parameters of the data table to be backtracked;

a function acquisition module, configured to obtain the corresponding offset according to the parameters;

a parsing module, configured to consume and parse the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data;

a filtering module, configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and

a storage module, configured to store the filtered data in a memory.

In a fifth aspect, an embodiment of the present disclosure provides an electronic device including a memory and a processor. The memory is configured to store one or more computer instructions that support the data synchronization system in performing the methods of the first and second aspects above, and the processor is configured to execute the computer instructions stored in the memory. The data synchronization system may further include a communication interface through which it communicates with other devices or a communication network.

In a sixth aspect, an embodiment of the present disclosure provides a computer-readable storage medium for storing computer instructions used by the data synchronization system, including the computer instructions involved in performing the methods of the first and second aspects above.

The technical solutions provided by the embodiments of the present disclosure may have the following beneficial effects:

By sending the data in the database to the processing flow of distributed publish-subscribe messages, parsing and filtering it there, and storing the filtered data in the corresponding memory, the above technical solution synchronizes the data in the data tables in real time.

It should be understood that the foregoing general description and the following detailed description are exemplary and explanatory only, and do not limit the present disclosure.

Brief Description of the Drawings

Other features, objects, and advantages of the present disclosure will become more apparent from the following detailed description of non-limiting embodiments, read in conjunction with the accompanying drawings, in which:

FIG. 1 shows a flowchart of a data synchronization method according to an embodiment of the present disclosure;

FIG. 2 shows a flowchart of a data backtracking method according to an embodiment of the present disclosure;

FIG. 3 shows a structural block diagram of a data synchronization system according to an embodiment of the present disclosure;

FIG. 4 shows a structural block diagram of a data backtracking system according to an embodiment of the present disclosure;

FIG. 5 shows a structural block diagram of an electronic device according to an embodiment of the present disclosure; and

FIG. 6 is a schematic structural diagram of a computer system suitable for implementing the data synchronization or data backtracking method according to an embodiment of the present disclosure.

Detailed Description

Exemplary embodiments of the present disclosure are described in detail below with reference to the accompanying drawings so that those skilled in the art can readily implement them. For clarity, parts unrelated to describing the exemplary embodiments are omitted from the drawings.

In the present disclosure, terms such as "comprising" or "having" are intended to indicate the presence of the features, numbers, steps, acts, components, parts, or combinations thereof disclosed in this specification, and are not intended to exclude the possibility that one or more other features, numbers, steps, acts, components, parts, or combinations thereof exist or are added.

It should also be noted that, where no conflict arises, the embodiments of the present disclosure and the features in those embodiments may be combined with one another. The present disclosure is described in detail below with reference to the accompanying drawings and in conjunction with the embodiments.

The technical solution provided by the embodiments of the present disclosure parses the binary log (BinLog) of the relational database management system MySQL, sends the result to the processing flow of the distributed publish-subscribe messaging system Kafka, and processes it there. Using the filtering methods loaded by a rule engine, it obtains the data that meets the conditions and stores that data in a table corresponding to the MySQL database. The rule engine filters the parsed data according to custom methods and returns the filtering result. Kafka is a high-throughput distributed publish-subscribe messaging system and serves here as the message-storage middleware.

FIG. 1 shows a flowchart of a data synchronization method according to an embodiment of the present disclosure. As shown in FIG. 1, the data synchronization method includes the following steps S110-S130:

In step S110, the data in the database is sent to the Kafka processing flow;

In step S120, the data in the Kafka processing flow is consumed and parsed to obtain parsed data;

In step S130, the parsed data is filtered according to a preset filtering rule to obtain filtered data, and the filtered data is stored in a memory.
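
Steps S110-S130 can be sketched as follows. This is a minimal, single-process sketch under stated assumptions: a Python `deque` stands in for the Kafka topic, a dict keyed by primary key stands in for the target table, and the function names `send_to_topic`, `consume_and_parse`, and `filter_and_store` are hypothetical, not part of any library or of the disclosure.

```python
import json
from collections import deque
from typing import Callable, Deque, Dict, List

# Hypothetical in-memory stand-ins: a deque plays the Kafka topic,
# and a dict keyed by primary key plays the target table.
Row = Dict[str, object]


def send_to_topic(topic: Deque[str], rows: List[Row], table: str) -> None:
    """S110: serialize each database row as a JSON message and append it to the topic."""
    for row in rows:
        topic.append(json.dumps({"table": table, "data": row}))


def consume_and_parse(topic: Deque[str], batch_size: int) -> List[Row]:
    """S120: consume up to batch_size messages and parse each one back into a map."""
    parsed: List[Row] = []
    while topic and len(parsed) < batch_size:
        message = json.loads(topic.popleft())
        parsed.append(message["data"])
    return parsed


def filter_and_store(rows: List[Row], rule: Callable[[Row], bool],
                     table: Dict[object, Row], key: str) -> None:
    """S130: keep only rows accepted by the rule and write them by primary key."""
    for row in rows:
        if rule(row):
            table[row[key]] = row


if __name__ == "__main__":
    topic: Deque[str] = deque()
    table: Dict[object, Row] = {}
    send_to_topic(topic, [{"id": 1, "status": "paid"},
                          {"id": 2, "status": "draft"}], "orders")
    while topic:  # consume batch by batch until the topic is drained
        batch = consume_and_parse(topic, batch_size=1)
        filter_and_store(batch, lambda r: r["status"] == "paid", table, key="id")
    print(table)  # {1: {'id': 1, 'status': 'paid'}}
```

Consuming in small batches mirrors the point that Kafka data is consumed batch by batch; a real deployment would use a Kafka consumer and a Kudu or HBase client in place of these stand-ins.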

As mentioned above, the prior art handles the data in the database as follows: the open-source tool Sqoop runs a scheduled job in the early morning every day that imports the data in batches from the relational database into the non-relational database, after which a data warehouse is built and users query the data with interactive SQL (Structured Query Language). Because the volume of data to be processed is large and the jobs take a long time to execute, the data is far from real time.

To address this shortcoming, this embodiment provides a data synchronization method that sends the data in the database to the Kafka processing flow, parses and filters it, and stores the filtered data in the corresponding memory, thereby synchronizing the data in the data tables in real time.

Before the data in the database is sent to the Kafka processing flow, the binary log (BinLog) must be parsed.
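
In practice the BinLog is parsed by a tool such as DRC or Canal; the sketch below only illustrates the hand-off that follows, packaging one parsed row change as a keyed Kafka message. The message fields loosely mirror a Canal-style flat message, and `to_message` and `choose_partition` are hypothetical helpers. Keying by table and primary key is an assumption made here because Kafka preserves ordering only within a partition, so all changes to one row should land in the same partition.

```python
import json
import zlib
from typing import Dict, Tuple

Row = Dict[str, object]


def to_message(database: str, table: str, event_type: str,
               row: Row, pk: str) -> Tuple[bytes, bytes]:
    """Turn one parsed binlog row change into a (key, value) pair for a Kafka message.

    The value loosely mirrors a Canal-style flat message (database, table, type, data);
    the exact field set is an assumption for illustration.
    """
    key = f"{database}.{table}:{row[pk]}".encode("utf-8")
    value = json.dumps(
        {"database": database, "table": table, "type": event_type,
         "isDdl": False, "data": [row]},
        sort_keys=True,
    ).encode("utf-8")
    return key, value


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Route all changes of one row to the same partition so they stay in order.

    Simplified stand-in: Kafka's default Java partitioner hashes keys with murmur2, not CRC32.
    """
    return zlib.crc32(key) % num_partitions


if __name__ == "__main__":
    key, value = to_message("shop", "orders", "INSERT", {"id": 7, "amount": 30}, pk="id")
    print(key, choose_partition(key, num_partitions=4))
```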

The data in the Kafka processing flow is then consumed and parsed. Kafka, acting as the message-storage middleware, cannot take in all the data sent from the database at once, so the data already in Kafka must be consumed before the next batch is received. The parsed data may take the form Map<K,V>, which makes it convenient to store in the corresponding memory. All data written to the Kafka processing flow is in a preset format (for example, JSON) produced by DRC (Data Replication Center) or by Canal, an incremental log parsing system for databases. JSON stands for JavaScript Object Notation.
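
A minimal sketch of the parsing step, assuming Canal-style flat JSON messages with `database`, `table`, `type`, `isDdl`, and `data` fields (the exact schema depends on the DRC or Canal configuration). Each row becomes a Python dict, the counterpart of Map<K,V>; the `_op` and `_table` tags and the function names are hypothetical.

```python
import json
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def parse_message(raw: str) -> List[Row]:
    """Parse one Canal-style flat JSON message into a list of column -> value maps.

    Assumed message shape: {"database", "table", "type", "isDdl", "data": [rows]}.
    Each row is tagged with the operation type and source table for later filtering.
    """
    msg = json.loads(raw)
    if msg.get("isDdl"):
        return []  # schema changes carry no row data to synchronize
    source = f'{msg["database"]}.{msg["table"]}'
    return [dict(row, _op=msg["type"], _table=source) for row in msg.get("data") or []]


def parse_batch(raws: Iterable[str]) -> List[Row]:
    """Parse one consumed batch; the next batch is fetched only after this one is handled."""
    rows: List[Row] = []
    for raw in raws:
        rows.extend(parse_message(raw))
    return rows
```

In Canal's flat format, column values typically arrive as strings, so a conversion step driven by the table structure in the configuration would usually follow parsing.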

The preset filtering rules can be defined as custom functions; the rule engine loads these custom functions and uses them to filter the data, producing the filtered data, which is likewise in Map<K,V> format.
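
One way to picture a rule engine that loads custom functions is a registry of filter factories built from configuration entries. The registry, the rule names `equals` and `not_null`, and the `{"name": ..., "args": ...}` entry format are all illustrative assumptions, not the disclosure's rule syntax.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]
Rule = Callable[[Row], bool]

# Hypothetical registry of user-defined filter factories; a real rule engine might
# load these from configuration or plug-ins instead of a module-level dict.
RULES: Dict[str, Callable[..., Rule]] = {}


def register(name: str) -> Callable[[Callable[..., Rule]], Callable[..., Rule]]:
    """Decorator that makes a filter factory available to the rule engine under `name`."""
    def decorator(factory: Callable[..., Rule]) -> Callable[..., Rule]:
        RULES[name] = factory
        return factory
    return decorator


@register("equals")
def equals(column: str, value: Any) -> Rule:
    return lambda row: row.get(column) == value


@register("not_null")
def not_null(column: str) -> Rule:
    return lambda row: row.get(column) is not None


def load_rules(specs: List[Dict[str, Any]]) -> List[Rule]:
    """Build filter functions from entries like {"name": "equals", "args": {...}}."""
    return [RULES[spec["name"]](**spec.get("args", {})) for spec in specs]


def apply_rules(rows: List[Row], rules: List[Rule]) -> List[Row]:
    """Keep a row only if every loaded rule accepts it; rows remain plain maps."""
    return [row for row in rows if all(rule(row) for rule in rules)]
```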

Storing the filtered data in the memory includes storing it in a Kudu table or an HBase table.
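
Kudu tables are addressed by primary key and HBase rows by a row key with `family:qualifier` cells. The sketch below, which uses an in-memory dict instead of a real Kudu or HBase client, shows one hypothetical mapping from a filtered Map<K,V> row to an HBase-style write; the `|` key separator and the `cf` column family are assumptions.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
Cells = Dict[bytes, bytes]


def to_hbase_put(row: Row, pk_columns: List[str], family: str = "cf") -> Tuple[bytes, Cells]:
    """Build an HBase-style (row key, {b"family:qualifier": value}) pair from a filtered row.

    The row key joins the primary-key columns with "|"; null columns are skipped.
    """
    row_key = "|".join(str(row[c]) for c in pk_columns).encode("utf-8")
    cells = {
        f"{family}:{col}".encode("utf-8"): str(val).encode("utf-8")
        for col, val in row.items()
        if col not in pk_columns and val is not None
    }
    return row_key, cells


class InMemoryTable:
    """Stand-in for a Kudu or HBase table where every write is an upsert by key."""

    def __init__(self) -> None:
        self.rows: Dict[bytes, Cells] = {}

    def upsert(self, row_key: bytes, cells: Cells) -> None:
        # Writing the same change twice leaves the same final state, so replays are safe.
        self.rows.setdefault(row_key, {}).update(cells)
```

Treating every write as an upsert means that rows re-read during backtracking overwrite earlier copies instead of duplicating them.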

In an optional implementation of this embodiment, before the data in the database is sent to the Kafka processing flow, the method includes initializing configuration information, where the configuration information includes: the database connection configuration, the Kafka configuration, the parameter information for data processing, the preset filtering rules, and the structure information of the tables in the memory. The configuration information is stored in a database. The parameter information for data processing is converted into the classes needed during processing, and these classes are broadcast so that every Executor can obtain the broadcast metadata when running its tasks.
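
The five configuration items could be kept in a database table and loaded into one immutable object before processing starts. In this sketch, `sqlite3` stands in for the configuration database, and the table name `sync_config`, the section names, and `SyncConfig` are hypothetical. The broadcast step is only described in a comment, since it requires a running Spark cluster.

```python
import json
import sqlite3
from dataclasses import dataclass
from typing import Any, Dict, List

SECTIONS = ("db_connection", "kafka", "processing", "filter_rules", "table_schema")


@dataclass(frozen=True)
class SyncConfig:
    db_connection: Dict[str, Any]       # database connection configuration
    kafka: Dict[str, Any]               # Kafka configuration
    processing: Dict[str, Any]          # parameter information for data processing
    filter_rules: List[Dict[str, Any]]  # preset filtering rules
    table_schema: Dict[str, Any]        # structure of the target table


def init_schema(conn: sqlite3.Connection) -> None:
    conn.execute("CREATE TABLE IF NOT EXISTS sync_config (job TEXT, section TEXT, body TEXT)")


def save_section(conn: sqlite3.Connection, job: str, section: str, body: Any) -> None:
    conn.execute("INSERT INTO sync_config VALUES (?, ?, ?)", (job, section, json.dumps(body)))


def load_config(conn: sqlite3.Connection, job: str) -> SyncConfig:
    """Read the five configuration sections of one sync job into an immutable object."""
    rows = conn.execute(
        "SELECT section, body FROM sync_config WHERE job = ?", (job,)
    ).fetchall()
    sections = {section: json.loads(body) for section, body in rows}
    missing = [s for s in SECTIONS if s not in sections]
    if missing:
        raise ValueError(f"missing config sections: {missing}")
    return SyncConfig(**{s: sections[s] for s in SECTIONS})

# In PySpark, the loaded object could then be shared with executors via
# shared = spark_context.broadcast(config) and read on each executor as shared.value.
```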

In an optional implementation of this embodiment, after the filtered data is stored in the memory, the method further includes storing the function used to consume the data in the Kafka processing flow. This function is offset: taking a specified reference as the starting point, it yields a new reference displaced by a given offset. In Kafka terms, the offset records the position in each partition up to which the data has been consumed.
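
A minimal sketch of storing consumer offsets, assuming a `consumer_offsets` table in `sqlite3` (the table, column, and function names are hypothetical). Offsets are appended with a timestamp rather than overwritten, so the history can later be used for backtracking, and they are saved only after the filtered rows have been stored.

```python
import sqlite3
from typing import Optional


def init_offset_store(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS consumer_offsets "
        "(topic TEXT, part INTEGER, next_offset INTEGER, committed_at INTEGER)"
    )


def save_offset(conn: sqlite3.Connection, topic: str, part: int,
                next_offset: int, committed_at: int) -> None:
    """Record the next offset to read; call this only after the filtered rows are stored."""
    conn.execute(
        "INSERT INTO consumer_offsets VALUES (?, ?, ?, ?)",
        (topic, part, next_offset, committed_at),
    )
    conn.commit()


def resume_offset(conn: sqlite3.Connection, topic: str, part: int) -> Optional[int]:
    """Return the most recently saved offset for a partition, or None if nothing was saved."""
    row = conn.execute(
        "SELECT next_offset FROM consumer_offsets WHERE topic = ? AND part = ? "
        "ORDER BY committed_at DESC, rowid DESC LIMIT 1",
        (topic, part),
    ).fetchone()
    return row[0] if row else None
```

Saving the offset after the write gives at-least-once delivery: if the job fails between the write and the save, the same batch is read again, which is harmless when the target table is written with upserts by primary key.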

FIG. 2 shows a flowchart of a data backtracking method according to an embodiment of the present disclosure. As shown in FIG. 2, the data backtracking method includes the following steps S210-S230:

In step S210, the parameters of the data table to be backtracked are set, and the corresponding offset is obtained according to the parameters (for example, through the function offset);

In step S220, the data in the Kafka processing flow is consumed and parsed according to the corresponding offset to obtain parsed data;

In step S230, the parsed data is filtered according to a preset filtering rule to obtain filtered data, and the filtered data is stored in a memory.

In this embodiment, the saved offsets are used to backtrack the data that needs to be queried and verified, which ensures the accuracy and consistency of the data.

The backtracking parameters include the name of the data table to be backtracked, the time period to backtrack, and the type of table in which the backtracked data is stored. These parameters are supplied as external input.

Obtaining the corresponding offset according to the parameters includes finding the data table by the name of the table to be backtracked and then finding the corresponding offsets for the backtracking time period. Specifically, the parameters are parsed by time period to determine whether to backtrack by day or by hour, and the configuration information of the table to be backtracked is obtained.
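
The offset lookup by time period can be sketched as follows, assuming offset checkpoints of the form `(committed_at, next_offset)` sorted by time, such as a saved history of consumer offsets. The window is widened to whole hours or days according to the requested granularity and then mapped onto a half-open offset interval. `BacktrackParams`, `align`, and `offset_range` are hypothetical names; timestamps are epoch seconds in UTC.

```python
import bisect
from dataclasses import dataclass
from typing import List, Tuple

GRANULARITY_SECONDS = {"day": 86_400, "hour": 3_600}


@dataclass(frozen=True)
class BacktrackParams:
    table: str        # name of the data table to backtrack
    start: int        # window start, epoch seconds (UTC)
    end: int          # window end, epoch seconds (UTC), exclusive
    granularity: str  # "day" or "hour"
    target_type: str  # type of table that receives the data, e.g. "kudu" or "hbase"


def align(params: BacktrackParams) -> Tuple[int, int]:
    """Widen the window outward to whole days or hours, per the requested granularity."""
    step = GRANULARITY_SECONDS[params.granularity]
    start = params.start - params.start % step
    end = -(-params.end // step) * step  # round up to the next boundary
    return start, end


def offset_range(checkpoints: List[Tuple[int, int]], start: int, end: int) -> Tuple[int, int]:
    """Map [start, end) onto [from_offset, until_offset).

    `checkpoints` holds (committed_at, next_offset) pairs sorted by time and must not be empty.
    """
    if not checkpoints:
        raise ValueError("no offset checkpoints recorded")
    times = [t for t, _ in checkpoints]
    i = bisect.bisect_right(times, start) - 1  # last checkpoint at or before start
    j = bisect.bisect_left(times, end)         # first checkpoint at or after end
    # With no earlier checkpoint, assume reading from the start of the partition.
    from_offset = checkpoints[i][1] if i >= 0 else 0
    until_offset = checkpoints[min(j, len(checkpoints) - 1)][1]
    return from_offset, until_offset
```

Because the interval is widened to the nearest checkpoints, it may cover a few more messages than the window strictly needs; re-processing them is harmless if the target table is written with upserts. Kafka's Java consumer also offers `offsetsForTimes`, which looks up offsets by message timestamp directly and could replace the checkpoint history.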

The parameter information for data processing is converted into the classes needed during processing, and these classes are broadcast so that every Executor can obtain the broadcast metadata when running its tasks.

Consuming and parsing the data in the Kafka processing flow according to the corresponding offset to obtain parsed data specifically means reading the data in Kafka through the Spark-Kafka API, where the data read is the data in the interval specified by the offsets.
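
For a bounded read with Spark's Kafka source, the offset interval is passed as JSON in the `startingOffsets` and `endingOffsets` options. The helper below, a hypothetical name, only builds those option strings, so it runs without Spark; the commented lines show how they would be passed to `spark.read`. The interval is assumed to be half-open, from the starting offset up to but not including the ending offset, and the ranges should list every partition of the topic.

```python
import json
from typing import Dict, Tuple


def kafka_offset_options(topic: str, ranges: Dict[int, Tuple[int, int]]) -> Dict[str, str]:
    """Build Spark Kafka-source options for a bounded batch read of one topic.

    `ranges` maps partition -> (from_offset, until_offset); it should list every
    partition of the topic, since Spark expects specific offsets for all of them.
    """
    starting = {topic: {str(p): start for p, (start, _) in ranges.items()}}
    ending = {topic: {str(p): end for p, (_, end) in ranges.items()}}
    return {
        "subscribe": topic,
        "startingOffsets": json.dumps(starting),
        "endingOffsets": json.dumps(ending),
    }


# With PySpark installed (not needed to run this sketch), the options would be used as:
# df = (spark.read.format("kafka")
#       .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical address
#       .options(**kafka_offset_options("orders", {0: (100, 250)}))
#       .load())
```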

The data in the Kafka processing flow is then consumed and parsed. Kafka, acting as the message-storage middleware, cannot take in all the data sent from the database at once, so the data already in Kafka must be consumed before the next batch is received. The parsed data may take the form Map<K,V>, which makes it convenient to store in the corresponding memory. All data written to the Kafka processing flow is JSON data produced by DRC (Data Replication Center) or by the Canal incremental log parsing system.

The preset filtering rules can be defined as custom functions; the rule engine loads these custom functions and uses them to filter the data, producing the filtered data, which is likewise in Map<K,V> format.

Storing the filtered data in the memory includes storing it in a Kudu table or an HBase table.

The following are system embodiments of the present disclosure, which can be used to perform the method embodiments of the present disclosure.

FIG. 3 shows a structural block diagram of a data synchronization system according to an embodiment of the present disclosure. The system can be implemented as part or all of an electronic device through software, hardware, or a combination of the two. As shown in FIG. 3, the data synchronization system includes:

a sending module 302, configured to send the data in the database to the Kafka processing flow;

a parsing module 303, configured to consume and parse the data in the Kafka processing flow to obtain parsed data;

a filtering module 304, configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and

a storage module 305, configured to store the filtered data in the memory.
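
Modules 302-305 can be pictured as small components wired in the order of FIG. 3. In this in-memory sketch, all class names are hypothetical, a list stands in for the Kafka topic, and a dict stands in for the Kudu or HBase table; the initialization module 301 is omitted.

```python
import json
from typing import Callable, Dict, List

Row = Dict[str, object]


class SendingModule:  # 302
    def __init__(self) -> None:
        self.topic: List[str] = []  # stand-in for the Kafka topic

    def send(self, rows: List[Row]) -> None:
        self.topic.extend(json.dumps(row) for row in rows)


class ParsingModule:  # 303
    def consume(self, topic: List[str]) -> List[Row]:
        batch = list(topic)
        topic.clear()  # the consumed messages are no longer pending
        return [json.loads(message) for message in batch]


class FilteringModule:  # 304
    def __init__(self, rule: Callable[[Row], bool]) -> None:
        self.rule = rule

    def filter(self, rows: List[Row]) -> List[Row]:
        return [row for row in rows if self.rule(row)]


class StorageModule:  # 305
    def __init__(self, key: str) -> None:
        self.key = key
        self.table: Dict[object, Row] = {}  # stand-in for a Kudu or HBase table

    def store(self, rows: List[Row]) -> None:
        for row in rows:
            self.table[row[self.key]] = row


class DataSyncSystem:
    """Wires modules 302-305 together in the order of FIG. 3."""

    def __init__(self, sender: SendingModule, parser: ParsingModule,
                 filterer: FilteringModule, storage: StorageModule) -> None:
        self.sender, self.parser = sender, parser
        self.filterer, self.storage = filterer, storage

    def run(self, rows: List[Row]) -> None:
        self.sender.send(rows)
        parsed = self.parser.consume(self.sender.topic)
        self.storage.store(self.filterer.filter(parsed))
```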

As mentioned above, the prior art handles the data in the database as follows: the open-source tool Sqoop runs a scheduled job in the early morning every day that imports the data in batches from the relational database into the non-relational database, after which a data warehouse is built and users query the data with interactive SQL (Structured Query Language). Because the volume of data to be processed is large and the jobs take a long time to execute, the data is far from real time.

To address this shortcoming, this embodiment provides a data synchronization system that sends the data in the database to the Kafka processing flow, parses and filters it, and stores the filtered data in the corresponding memory, thereby synchronizing the data in the data tables in real time.

Before the data in the database is sent to the Kafka processing flow, the binary log (BinLog) must be parsed.

Kafka, acting as the message-storage middleware, cannot take in all the data sent from the database at once, so the data already in Kafka must be consumed before the next batch is received. The parsed data may take the form Map<K,V>, which makes it convenient to store in the corresponding memory. All data written to the Kafka processing flow is JSON data produced by DRC (Data Replication Center) or by the Canal incremental log parsing system.

The preset filtering rules can be defined as custom functions; the rule engine loads these custom functions and uses them to filter the data, producing the filtered data, which is likewise in Map<K,V> format.

The storage module 305 is configured to store the filtered data in a Kudu table or an HBase table.

In an optional implementation of this embodiment, the data synchronization system further includes an initialization module 301 configured to initialize configuration information, where the configuration information includes: the database connection configuration, the Kafka configuration, the parameter information for data processing, the preset filtering rules, and the structure information of the tables in the memory. The configuration information is stored in a database. The parameter information for data processing is converted into the classes needed during processing, and these classes are broadcast so that every Executor can obtain the broadcast metadata when running its tasks.

In an optional implementation of this embodiment, the storage module 305 is configured to store the function used to consume the data in the Kafka processing flow. This function is offset: taking a specified reference as the starting point, it yields a new reference displaced by a given offset.

FIG. 4 shows a structural block diagram of a data backtracking system according to an embodiment of the present disclosure. The system can be implemented as part or all of an electronic device through software, hardware, or a combination of the two. As shown in FIG. 4, the data backtracking system includes:

a setting module 401, configured to set the parameters of the data table to be backtracked;

a function acquisition module 402, configured to obtain the corresponding offset according to the parameters;

a parsing module 403, configured to consume and parse the data in the Kafka processing flow to obtain parsed data;

a filtering module 404, configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and

a storage module 405, configured to store the filtered data in the memory.

For the specific limitations of the data backtracking system, refer to the limitations of the data backtracking method above; they are not repeated here.

本公开还公开了一种电子设备,图5示出根据本公开一实施方式的电子设备的结构框图,如图5所示,所述电子设备500包括存储器501和处理器502;其中,The present disclosure also discloses an electronic device. FIG. 5 shows a structural block diagram of the electronic device according to an embodiment of the present disclosure. As shown in FIG. 5 , the electronic device 500 includes a memory 501 and a processor 502; wherein,

所述存储器501用于存储一条或多条计算机指令,其中,所述一条或多条计算机指令被所述处理器502执行以实现上述任一方法步骤。The memory 501 is used to store one or more computer instructions, wherein the one or more computer instructions are executed by the processor 502 to implement any of the above method steps.

图6适于用来实现根据本公开实施方式的数据同步方法的计算机系统的结构示意图。FIG. 6 is a schematic structural diagram of a computer system suitable for implementing the data synchronization method according to an embodiment of the present disclosure.

如图6所示,计算机系统600包括中央处理单元(CPU)601,其可以根据存储在只读存储器(ROM)602中的程序或者从存储部分608加载到随机访问存储器(RAM)603中的程序而执行上述实施方式中的各种处理。在RAM603中,还存储有系统600操作所需的各种程序和数据。CPU601、ROM602以及RAM603通过总线604彼此相连。输入/输出(I/O)接口605也连接至总线604。As shown in FIG. 6, a computer system 600 includes a central processing unit (CPU) 601, which can be loaded into a random access memory (RAM) 603 according to a program stored in a read only memory (ROM) 602 or a program from a storage section 608 Instead, various processes in the above-described embodiments are executed. In the RAM 603, various programs and data necessary for the operation of the system 600 are also stored. The CPU 601 , the ROM 602 and the RAM 603 are connected to each other through a bus 604 . An input/output (I/O) interface 605 is also connected to bus 604 .

The following components are connected to the I/O interface 605:

- an input section 606 including a keyboard, a mouse, and the like;
- an output section 607 including a cathode ray tube (CRT), a liquid crystal display (LCD), a speaker, and the like;
- a storage section 608 including a hard disk and the like;
- a communication section 609 including a network interface card such as a LAN card or a modem.

The communication section 609 performs communication processing via a network such as the Internet. A drive 610 is also connected to the I/O interface 605 as needed. A removable medium 611 is mounted on the drive 610 as needed, so that a computer program read from it can be installed into the storage section 608. Examples of the removable medium 611 include a magnetic disk, an optical disk, a magneto-optical disk, or a semiconductor memory.

In particular, according to embodiments of the present disclosure, the methods described above may be implemented as computer software programs. For example, an embodiment of the present disclosure includes a computer program product. This product comprises a computer program tangibly embodied on a machine-readable medium, and the program contains program code for performing the data synchronization method. In such an embodiment, the computer program may be downloaded and installed from a network via the communication section 609, and/or installed from the removable medium 611.

The flowcharts and block diagrams in the figures illustrate the architecture, functionality, and operation of possible implementations of systems, methods, and computer program products according to various embodiments of the present disclosure. In this regard, each block in a flowchart or block diagram may represent a module, a program segment, or a portion of code. Each such block contains one or more executable instructions for implementing the specified logical function.

It should also be noted that, in some alternative implementations, the functions noted in the blocks may occur in an order different from that shown in the figures. For example, two blocks shown in succession may in fact be executed substantially concurrently, or sometimes in the reverse order, depending on the functionality involved.

Each block of the block diagrams and/or flowcharts, and each combination of such blocks, can be implemented in either of two ways:

- by a dedicated hardware-based system that performs the specified functions or operations;
- by a combination of dedicated hardware and computer instructions.
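As one illustration of two blocks executing substantially concurrently, the sketch below runs two stages in separate threads connected by a bounded queue. The first is a parsing stage, and the second is a filtering-and-storage stage. The stage boundaries, the message format, and all names are assumptions for illustration, not the structure shown in the figures.

```python
import json
import queue
import threading

def parse_stage(raw_messages, out_q):
    """Hypothetical block 1: parse raw messages and hand them downstream."""
    for payload in raw_messages:
        out_q.put(json.loads(payload))
    out_q.put(None)  # sentinel: no more messages

def filter_store_stage(in_q, allowed_tables, memory):
    """Hypothetical block 2: filter parsed records and store the survivors."""
    while (record := in_q.get()) is not None:
        if record["table"] in allowed_tables:
            memory[record["id"]] = record

raw = ['{"table": "orders", "id": 1}',
       '{"table": "users", "id": 2}',
       '{"table": "orders", "id": 3}']
q = queue.Queue(maxsize=2)  # small bound so the two stages can overlap in time
memory = {}
threads = [
    threading.Thread(target=parse_stage, args=(raw, q)),
    threading.Thread(target=filter_store_stage, args=(q, {"orders"}, memory)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(memory))  # [1, 3]
```

The bounded queue gives back-pressure: if filtering falls behind, parsing blocks instead of growing an unbounded buffer.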

The units or modules described in the embodiments of the present disclosure may be implemented in software or in hardware. The described units or modules may also be provided in a processor. In some cases, the names of these units or modules do not constitute a limitation on the units or modules themselves.

In another aspect, the present disclosure further provides a computer-readable storage medium. This medium may be the computer-readable storage medium included in the system described in the foregoing embodiments. Alternatively, it may exist separately without being assembled into a device. The computer-readable storage medium stores one or more programs, which are used by one or more processors to perform the methods described in the present disclosure.

The above description covers only preferred embodiments of the present disclosure and an explanation of the technical principles employed. Those skilled in the art should understand that the scope of the invention involved in the present disclosure is not limited to technical solutions formed by the specific combination of the above technical features. Without departing from the inventive concept, the scope also covers other technical solutions formed by any combination of the above technical features or their equivalents. One example is a technical solution formed by replacing the above features with technical features that have similar functions and are disclosed in the present disclosure (but not limited thereto).

Claims (10)

1. A data synchronization method, comprising:
sending data in a database to a processing flow of distributed publish-subscribe messages;
consuming and parsing the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data; and
filtering the parsed data according to a preset filtering rule to obtain filtered data, and storing the filtered data in a memory.
2. The method of claim 1, further comprising, before sending the data in the database to the processing flow of the distributed publish-subscribe messages:
initializing configuration information;
wherein the configuration information comprises: connection configuration of the database, configuration information of the distributed publish-subscribe messages, parameter information for data processing, the preset filtering rule, and structure information of a table in the memory.
3. The method of claim 1, further comprising, after storing the filtered data in the memory:
storing a function for consuming the data in the processing flow of the distributed publish-subscribe messages.
4. A data backtracking method, comprising:
setting parameters of a backtracking data table, and acquiring a corresponding offset according to the parameters;
consuming and parsing, according to the corresponding offset, data in a processing flow of distributed publish-subscribe messages to obtain parsed data; and
filtering the parsed data according to a preset filtering rule to obtain filtered data, and storing the filtered data in a memory.
5. A data synchronization system, comprising:
a sending module configured to send data in a database to a processing flow of distributed publish-subscribe messages;
a parsing module configured to consume and parse the data in the processing flow of the distributed publish-subscribe messages to obtain parsed data;
a filtering module configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and
a storage module configured to store the filtered data in a memory.
6. The system of claim 5, further comprising:
an initialization module configured to initialize configuration information;
wherein the configuration information comprises: connection configuration of the database, configuration information of the distributed publish-subscribe messages, parameter information for data processing, the preset filtering rule, and structure information of a table in the memory.
7. The system of claim 5, wherein the storage module is further configured to store a function for consuming the data in the processing flow of the distributed publish-subscribe messages.
8. A data backtracking system, comprising:
a setting module configured to set parameters of a backtracking data table;
a function obtaining module configured to obtain a corresponding offset according to the parameters;
a parsing module configured to consume data in a processing flow of distributed publish-subscribe messages and parse the data to obtain parsed data;
a filtering module configured to filter the parsed data according to a preset filtering rule to obtain filtered data; and
a storage module configured to store the filtered data in a memory.
9. An electronic device, comprising a memory and a processor, wherein
the memory is configured to store one or more computer instructions, which are executed by the processor to implement the method steps of any one of claims 1-4.
10. A computer-readable storage medium storing computer instructions, wherein the computer instructions, when executed by a processor, implement the method steps of any one of claims 1-4.
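Claims 1 and 2 together describe an end-to-end flow with four steps:

1. Initialize the configuration.
2. Publish database data to a publish-subscribe processing flow.
3. Consume and parse that data.
4. Filter the parsed data and store it in memory.

The Python sketch below strings these steps together under heavy simplification. An in-memory SQLite database stands in for the source database, and a `queue.Queue` stands in for the distributed publish-subscribe system. A snapshot of whole tables replaces real change capture. The `SyncConfig` fields and all function names are hypothetical.

```python
import json
import queue
import sqlite3
from dataclasses import dataclass, field

@dataclass
class SyncConfig:
    """Hypothetical configuration covering the five items listed in claim 2."""
    db_path: str = ":memory:"    # database connection configuration
    topic: str = "db-changes"    # publish-subscribe configuration (topic name)
    batch_size: int = 100        # data-processing parameter: max messages per poll
    allowed_tables: set = field(default_factory=lambda: {"orders"})  # preset filtering rule
    memory_columns: dict = field(  # structure of each table kept in memory
        default_factory=lambda: {"orders": ("id", "amount")})

def send(conn, cfg, bus):
    """Publish every row of every table as a JSON message on cfg.topic."""
    tables = [r[0] for r in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name")]
    for table in tables:
        cur = conn.execute(f'SELECT * FROM "{table}" ORDER BY rowid')
        cols = [d[0] for d in cur.description]
        for row in cur:
            message = {"table": table, "row": dict(zip(cols, row))}
            bus.put((cfg.topic, json.dumps(message)))

def consume_and_parse(bus, cfg):
    """Poll up to cfg.batch_size messages from the topic and decode them."""
    parsed = []
    while not bus.empty() and len(parsed) < cfg.batch_size:
        topic, payload = bus.get()
        if topic == cfg.topic:
            parsed.append(json.loads(payload))
    return parsed

def filter_and_store(parsed, cfg, memory):
    """Keep allowed tables, project rows onto the memory table structure, key by "id"."""
    for msg in parsed:
        if msg["table"] not in cfg.allowed_tables:
            continue
        row = {c: msg["row"][c] for c in cfg.memory_columns[msg["table"]]}
        memory.setdefault(msg["table"], {})[row["id"]] = row  # assumes an "id" column

# Usage: initialize configuration first (claim 2), then run the steps of claim 1.
cfg = SyncConfig()
conn = sqlite3.connect(cfg.db_path)
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount INTEGER, note TEXT)")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)", [(1, 30, "a"), (2, 5, "b")])
conn.execute("INSERT INTO users VALUES (7, 'alice')")
bus = queue.Queue()  # in-process stand-in for a distributed publish-subscribe broker
memory = {}
send(conn, cfg, bus)
filter_and_store(consume_and_parse(bus, cfg), cfg, memory)
print(memory)  # {'orders': {1: {'id': 1, 'amount': 30}, 2: {'id': 2, 'amount': 5}}}
```

Replacing the queue with a real broker client would affect only `send` and `consume_and_parse`, because the filtering and storage logic does not depend on the transport.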
CN201811244243.4A 2018-10-24 2018-10-24 Data synchronization method, system, electronic device and computer readable storage medium Pending CN109359139A (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN201811244243.4A CN109359139A (en) 2018-10-24 2018-10-24 Data synchronization method, system, electronic device and computer readable storage medium

Publications (1)

Publication Number Publication Date
CN109359139A (en) 2019-02-19

Family

ID=65346611

Family Applications (1)

Application Number Title Priority Date Filing Date
CN201811244243.4A Pending CN109359139A (en) 2018-10-24 2018-10-24 Data synchronization method, system, electronic device and computer readable storage medium

Country Status (1)

Country Link
CN (1) CN109359139A (en)

Citations (5)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN106713470A (en) * 2016-12-29 2017-05-24 北京奇艺世纪科技有限公司 Distributed cache updating method and cache updating system
CN107122497A (en) * 2017-05-25 2017-09-01 北京微影时代科技有限公司 Data processing method, device, electronic equipment and computer-readable recording medium
CN107609008A (en) * 2017-07-26 2018-01-19 郑州云海信息技术有限公司 A kind of data importing device and method from relevant database to Kafka based on Apache Sqoop
CN107885881A (en) * 2017-11-29 2018-04-06 顺丰科技有限公司 Business datum real-time report, acquisition methods, device, equipment and its storage medium
CN108365985A (en) * 2018-02-07 2018-08-03 深圳壹账通智能科技有限公司 A kind of cluster management method, device, terminal device and storage medium

Cited By (8)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN111797158A (en) * 2019-04-08 2020-10-20 北京沃东天骏信息技术有限公司 Data synchronization system, method and computer readable storage medium
CN111797158B (en) * 2019-04-08 2024-04-05 北京沃东天骏信息技术有限公司 Data synchronization system, method and computer readable storage medium
CN110515944A (en) * 2019-08-16 2019-11-29 出门问问(苏州)信息科技有限公司 Date storage method, storage medium and electronic equipment based on distributed data base
CN110968592A (en) * 2019-12-06 2020-04-07 深圳前海环融联易信息科技服务有限公司 Metadata acquisition method and device, computer equipment and computer-readable storage medium
CN110968592B (en) * 2019-12-06 2023-11-21 深圳前海环融联易信息科技服务有限公司 Metadata acquisition method, metadata acquisition device, computer equipment and computer readable storage medium
CN111597270A (en) * 2020-05-22 2020-08-28 深圳前海微众银行股份有限公司 Data synchronization method, device, device and computer storage medium
CN111666344A (en) * 2020-06-19 2020-09-15 中信银行股份有限公司 Heterogeneous data synchronization method and device
CN112052295A (en) * 2020-08-06 2020-12-08 中信银行股份有限公司 Data synchronization method and device, electronic equipment and readable storage medium

Similar Documents

Publication Publication Date Title
CN109359139A (en) Data synchronization method, system, electronic device and computer readable storage medium
JP7200259B2 (en) Data retention handling for data object stores
CN107451109B (en) Report generation method and system
JP6509127B2 (en) Variable-duration window for continuous data stream
EP3259668B1 (en) System and method for generating an effective test data set for testing big data applications
CN109063196B (en) Data processing method and device, electronic equipment and computer readable storage medium
CN110196888A (en) Data-updating method, device, system and medium based on Hadoop
US20140156683A1 (en) Integrating event processing with map-reduce
CN110019350A (en) Data query method and apparatus based on configuration information
CN106874247B (en) Report generation method and device
CN111666326A (en) ETL scheduling method and device
CN109840298B (en) Multi-information source collection method and system for large-scale network data
US10394805B2 (en) Database management for mobile devices
CN105677849A (en) Data updating method and device
CN105900093A (en) A method for updating a data table of a KeyValue database and a device for updating table data
CN109241099B (en) Data query method and terminal equipment
CN115774750A (en) Database lake entering configuration method and system, electronic equipment and storage medium
CN113190517A (en) Data integration method and device, electronic equipment and computer readable medium
CN112231292B (en) File processing method, device, storage medium and computer equipment
CN115878596B (en) Data processing methods, apparatus, equipment and storage media
CN113204560A (en) Data processing method, device and equipment and readable storage medium
CN110795494A (en) Automatic testing method and device for synchronous and asynchronous cache data
CN115599871A (en) Lake and bin integrated data processing system and method
CN105930354A (en) Storage model conversion method and device
US12306821B2 (en) System and techniques for traversing a dependency graph or tree structure in one step

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
RJ01 Rejection of invention patent application after publication (application publication date: 20190219)