CN118626496B - 数据集成方法、装置、服务器、介质及程序 - Google Patents
数据集成方法、装置、服务器、介质及程序 Download PDFInfo
- Publication number
- CN118626496B CN118626496B CN202411095117.2A CN202411095117A CN118626496B CN 118626496 B CN118626496 B CN 118626496B CN 202411095117 A CN202411095117 A CN 202411095117A CN 118626496 B CN118626496 B CN 118626496B
- Authority
- CN
- China
- Prior art keywords
- data
- database
- source
- target
- incremental
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 230000010354 integration Effects 0.000 title claims abstract description 196
- 238000000034 method Methods 0.000 title claims abstract description 139
- 230000008569 process Effects 0.000 claims abstract description 94
- 238000013507 mapping Methods 0.000 claims abstract description 83
- 238000013480 data collection Methods 0.000 claims abstract description 58
- 238000006243 chemical reaction Methods 0.000 claims abstract description 21
- 230000008859 change Effects 0.000 claims description 40
- 230000005477 standard model Effects 0.000 claims description 29
- 230000008676 import Effects 0.000 claims description 18
- 238000013506 data mapping Methods 0.000 claims description 17
- 238000001914 filtration Methods 0.000 claims description 13
- 238000003780 insertion Methods 0.000 claims description 11
- 230000037431 insertion Effects 0.000 claims description 11
- 238000003860 storage Methods 0.000 claims description 11
- 238000004422 calculation algorithm Methods 0.000 claims description 10
- 238000013499 data model Methods 0.000 claims description 10
- 238000012546 transfer Methods 0.000 claims description 7
- 238000001514 detection method Methods 0.000 claims description 6
- 230000006870 function Effects 0.000 claims description 6
- 238000012432 intermediate storage Methods 0.000 claims description 4
- 238000004590 computer program Methods 0.000 claims description 3
- 230000002776 aggregation Effects 0.000 claims 4
- 238000004220 aggregation Methods 0.000 claims 4
- 238000012545 processing Methods 0.000 description 27
- 238000013508 migration Methods 0.000 description 20
- 230000005012 migration Effects 0.000 description 20
- 238000007726 management method Methods 0.000 description 19
- 238000012544 monitoring process Methods 0.000 description 11
- 238000004458 analytical method Methods 0.000 description 9
- 238000013500 data storage Methods 0.000 description 7
- 230000007246 mechanism Effects 0.000 description 6
- 230000005540 biological transmission Effects 0.000 description 5
- 238000005457 optimization Methods 0.000 description 5
- 230000001360 synchronised effect Effects 0.000 description 5
- 238000010586 diagram Methods 0.000 description 4
- 230000009466 transformation Effects 0.000 description 4
- 230000005856 abnormality Effects 0.000 description 3
- 238000012217 deletion Methods 0.000 description 3
- 230000037430 deletion Effects 0.000 description 3
- 238000005516 engineering process Methods 0.000 description 3
- 238000011156 evaluation Methods 0.000 description 3
- 230000004044 response Effects 0.000 description 3
- 238000012216 screening Methods 0.000 description 3
- 239000013598 vector Substances 0.000 description 3
- 238000004364 calculation method Methods 0.000 description 2
- 239000003054 catalyst Substances 0.000 description 2
- 238000000605 extraction Methods 0.000 description 2
- 230000001960 triggered effect Effects 0.000 description 2
- 238000012935 Averaging Methods 0.000 description 1
- 239000008186 active pharmaceutical agent Substances 0.000 description 1
- 230000006399 behavior Effects 0.000 description 1
- 239000008280 blood Substances 0.000 description 1
- 210000004369 blood Anatomy 0.000 description 1
- 238000004140 cleaning Methods 0.000 description 1
- 238000010276 construction Methods 0.000 description 1
- 238000007405 data analysis Methods 0.000 description 1
- 238000013075 data extraction Methods 0.000 description 1
- 238000013136 deep learning model Methods 0.000 description 1
- 230000009977 dual effect Effects 0.000 description 1
- 238000013209 evaluation strategy Methods 0.000 description 1
- 230000003203 everyday effect Effects 0.000 description 1
- 230000003993 interaction Effects 0.000 description 1
- 230000001788 irregular Effects 0.000 description 1
- 238000010801 machine learning Methods 0.000 description 1
- 238000012423 maintenance Methods 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 230000000737 periodic effect Effects 0.000 description 1
- 230000002688 persistence Effects 0.000 description 1
- 238000003908 quality control method Methods 0.000 description 1
- 230000009467 reduction Effects 0.000 description 1
- 239000004575 stone Substances 0.000 description 1
- 238000013024 troubleshooting Methods 0.000 description 1
- 238000012800 visualization Methods 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/22—Indexing; Data structures therefor; Storage structures
- G06F16/2282—Tablespace storage structures; Management thereof
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/23—Updating
- G06F16/2358—Change logging, detection, and notification
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/242—Query formulation
- G06F16/2433—Query languages
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/27—Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/28—Databases characterised by their database models, e.g. relational or object models
- G06F16/284—Relational databases
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- Data Mining & Analysis (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Computing Systems (AREA)
- Mathematical Physics (AREA)
- Computational Linguistics (AREA)
- Software Systems (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本申请实施例提供一种数据集成方法、装置、服务器、介质及程序,其中方法包括:在配置源数据库的连接信息后,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间表,并从中间表拉取到目标表;对于全量数据采集,将全量数据采集到中间表,再根据数据集成语句,从中间表拉取到目标表;对于增量数据采集,根据增量更新字段,将增量数据采集到中间表,再利用带有增量更新子句的数据集成语句,从中间表拉取到目标表;数据从中间表拉取到目标表还进行数据类型转换和字段映射。本申请实施例可以提升数据集成过程中的效率。
Description
技术领域
本申请实施例涉及数据处理技术领域,具体涉及一种数据集成方法、装置、服务器、介质及程序。
背景技术
数据集成是将来自不同数据源的数据在逻辑上或物理上有机地集中,从而为企业提供全面的数据共享的过程。数据集成在数字化转型中扮演着核心和基础的角色,其中数字化转型是指企业利用数字技术改造传统业务流程、改善经营模式和提升客户体验的过程;在数字化转型的过程中,数据集成是实现数据驱动决策、自动化操作和提高数据处理效率的基石。因此,如何提供改进的技术方案,以提升数据集成过程中的效率,成为了本领域技术人员亟需解决的技术问题。
发明内容
有鉴于此,本申请实施例提供一种数据集成方法、装置、服务器、介质及程序,以提升数据集成过程中的效率。
为实现上述目的,本申请实施例提供如下技术方案。
第一方面,本申请实施例提供一种数据集成方法,包括:
在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;
在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为采用企业标准模型的数据库;
在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;
其中,对于全量数据采集,将源数据库的全量数据采集到中间库的中间表,再根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还进行数据类型转换和字段映射。
第二方面,本申请实施例提供一种数据集成装置,包括:
查询请求发送模块,用于在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;
全量数据集成模块,用于在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为采用企业标准模型的数据库;
增量数据集成模块,用于在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;
其中,对于全量数据采集,全量数据集成模块将源数据库的全量数据采集到中间库的中间表,再根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,增量数据集成模块根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还进行数据类型转换和字段映射。
第三方面,本申请实施例提供一种服务器,包括:至少一个存储器和至少一个处理器,所述存储器存储计算机可执行指令,所述处理器调用所述计算机可执行指令,以执行如上述第一方面所述的数据集成方法。
第四方面,本申请实施例提供一种存储介质,所述存储介质存储计算机可执行指令,所述计算机可执行指令被执行时,实现如上述第一方面所述的数据集成方法。
第五方面,本申请实施例提供一种计算机程序产品,包括计算机可执行指令,所述计算机可执行指令被执行时,实现如上述第一方面所述的数据集成方法。
本申请实施例可以通过全量数据采集和增量数据采集的流程,涵盖从源数据库到目标库的数据迁移路径,并且引入中间库作为临时存储,确保数据在迁移过程中的完整性和准确性,从而满足主动查询源数据库时的数据记录或数据更新对应的数据集成需求,提升数据集成过程中的效率。具体的,本申请实施例在配置源数据库的连接信息后,可以向源数据库发送主动查询请求,从而在查询源数据库存在数据记录时,可以将源数据库的全量数据拉取到中间库,然后再根据数据集成语句,将全量数据从中间库拉取到目标库,确保了目标库拥有源数据库的完整快照,可以适用于初始化或周期性的源数据库和目标库的完全同步;在源数据库发生数据更新时,将增量数据拉取到中间库,然后再利用带有增量更新子句的数据集成语句,将增量数据从中间库拉取到目标库,减少了网络负载和存储需求,同时确保目标库的数据最新状态。进一步的,在数据从中间库传输到目标库的过程中,进行数据类型转换和字段映射,可以通过目标库的企业标准模型解决不同源数据库间可能存在的数据形式不一致的问题,从而统一不同源数据库的数据模型。因此,本申请实施例可以有效地管理跨数据库的数据集成,无论是需要定期同步源数据库中的全量数据,还是更新源数据库中的增量数据,本申请实施例均可以实现数据完整、准确的集成,且统一不同源数据库的数据模型;并且,通过使用专门的SQL语句处理全量数据采集和增量数据采集,可以实现数据集成的资源优化使用。因此,本申请实施例可以提升数据集成过程中的效率,优化数据集成的性能。
附图说明
为了更清楚地说明本申请实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本申请的实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据提供的附图获得其他的附图。
图1为数据集成平台的架构图。
图2为基于源数据库的数据集成的流程图。
图3为基于源数据库的实时数据采集的流程图。
图4为基于源文件系统的数据集成的流程图。
图5为基于源网络API接口的数据集成的流程图。
图6为基于源MQ的数据集成的流程图。
图7为数据集成装置的框图。
具体实施方式
下面将结合本申请实施例中的附图,对本申请实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例。基于本申请中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本申请保护的范围。
作为可选实现,图1示例了数据集成平台的架构图,数据集成平台可以是实现数据集成的服务器平台,可以由多台服务器组成的服务器系统实现。参照图1,该数据集成平台可以包括数据采集系统110,至少用于从异构数据源采集数据,并集成到数据存储和元数据管理系统120中。
异构数据源是指数据来源不同的多个数据源,也就是说,异构数据源包括多个数据源,该多个数据源的数据来源多样化,比如数据在不同数据源的格式、结构、存储方式或访问方法并不相同。示例的,异构数据源可以包括:
源数据库(DB,Data Base),即本申请实施例可以从作为数据源的数据库采集数据,为便于说明,作为数据源的数据库称为源数据库,源数据库例如企业的MySQL数据库等;
源文件系统,即本申请实施例可以从作为数据源的文件系统采集数据,为便于说明,作为数据源的文件系统称为源文件系统;
源网络API(Application Programming Interface,应用程序编程接口)接口,即本申请实施例可以通过作为数据源的网络API接口进行网络访问,从而采集数据;为便于说明,作为数据源的网络API接口称为源网络API接口;
源MQ(Message Queue,消息队列),即本申请实施例可以从作为数据源的MQ采集数据,为便于说明,作为数据源的MQ称为源MQ。
下面分别从源数据库、源文件系统、源网络API接口、源MQ的数据集成进行说明。
作为可选实现,图2示例了基于源数据库的数据集成的流程图,如图2所示,该流程可以包括如下步骤。
步骤S210,对于全量数据采集,将源数据库的全量数据采集到中间库的中间表;对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表。
本申请实施例基于源数据库的数据集成涉及:从源数据库采集数据到中间库的中间表的第一阶段,以及,从中间库的中间表拉取数据到目标库的目标表的第二阶段。可选的,中间库为数据集成平台的中间存储系统对应的数据库,用于存储数据集成的中间数据,例如,结合图1所示,中间库可以是数据存储和元数据管理系统120中的ODS(OperationalData Store,操作数据存储)库;中间表可以为中间库中的表,例如,中间表可以是ODS库中的ODS表。结合图1所示,目标库可以为采用企业标准模型的数据库,例如目标库中数据的数据模型为企业标准模型;目标表为目标库中的表,目标表中的数据采用企业标准模型。
需要说明的是,在本申请实施例中,ODS库设计用来临时存储来自多个数据源的数据,充当数据集成的缓冲层。企业标准模型是指本申请实施例定义的标准化、归一化的数据模型,用于统一异构数据源的数据模型。
步骤S210主要用于实现基于源数据库的数据集成的第一阶段,即通过全量数据采集和增量数据采集的方式,实现从源数据库采集全量数据和增量数据到中间表(例如ODS表)。
在可选实现中,全量数据采集可以在数据集成的初期进行,或者当需要完全刷新ODS表的数据时执行,全量数据采集涉及将数据库中的所有数据复制到ODS表,例如,全量数据采集可以用于ODS表的初始化或定期的数据完全同步。
作为可选实现,对于全量数据采集,以数据集成工具采用DataX组件为例,本申请实施例可以采用DataX组件,通过配置适配源数据库的Reader插件(例如MySQL Reader插件)和中间库的Writer插件(例如Postgres Writer插件),以及,通过传递JSON(JavaScriptObject Notation,JavaScript对象表示法)配置的方式,将数据从源数据库采集到中间表。
需要说明的是,DataX组件是一个开源数据集成工具,主要用于在各种数据库、数据仓库和数据湖之间进行大批量数据迁移和同步,当然,本申请实施例也可支持使用其他类型的数据集成工具,而不限于DataX组件;DataX组件支持可定制的插件架构,可以通过不同的Reader(读)和Writer(写)插件来连接到各种数据源和中间库(例如ODS库)。
在DataX组件中,Reader插件负责从数据源读取数据,每种Reader插件支持一种特定类型的数据源,例如,MySQL Reader插件是DataX组件提供的一个插件,用于从MySQL数据库读取数据。Writer插件在DataX组件中负责将数据写入中间库,例如,以中间库为PostgreSQL数据库的形式(比如ODS库为PostgreSQL数据库),则Postgres Writer插件用于将数据写入PostgreSQL数据库。另外需要说明的是,JSON格式是一种轻量级的数据交换格式,易于用户阅读和编写,同时也易于机器解析和生成。
需要进一步说明的是,在本申请实施例中,JSON配置可以是JSON配置文件中的配置信息(比如配置项),表达了从数据源提取数据,并将数据写入中间库的处理方式。也就是说,JSON配置能够使得DataX工具了解并执行从数据源到中间库的数据迁移任务。示例的,JSON配置(比如JSON配置文件中的配置项)可以包括但不限于:
Reader配置信息,用于从源数据库读取数据的配置信息,例如数据源连接信息和元数据查询信息;其中,数据源连接信息用于DataX组件连接到指定的数据源,其内容可以包括数据源类型、服务器地址、端口号、用户名和密码等,从而在JSON配置的Reader配置信息中设置数据源连接信息与源数据库相应(即配置源数据库的连接信息),则可以确保DataX组件能够访问并读取源数据库中的数据;元数据查询信息用于指导DataX组件从数据源中读取数据,其内容可以包括执行的SQL查询语句,或数据源中指定要迁移的表和字段,从而在JSON配置的Reader配置信息中设置元数据查询信息,则可以定义DataX组件在所连接的源数据库读取数据的具体行为;
Writer配置信息,用于将数据写入中间库的配置信息,例如中间库连接信息、中间表和插入字段信息等;其中,中间库连接信息用于设置DataX组件所连接到中间库(例如ODS库),其内容包括库类型、服务器地址、端口号、用户名和密码等,从而在JSON配置的Writer配置信息设置中间库连接信息,可以确保DataX组件将从源数据库读取的数据写入中间库;中间表和插入字段信息用于指定数据在中间库中的存储位置和方式,例如,中间表的名称和字段信息,从而在JSON配置的Writer配置信息中设置中间表的名称、字段信息,可以确保数据在中间库的中间表(例如OSD库的ODS表)按预期进行存储;
Transformer(转换)配置信息,用于在数据从源数据库传输到中间表的过程中,指示需要进行的转换操作,其内容可以包括数据清洗、格式化或字段值计算等;
Config(设置)配置信息,例如设置任务数量信息、错误限制信息等;其中,任务数量信息定义DataX组件执行数据迁移时的并行处理能力,其可以是并行执行的任务数量,任务数量信息需要平衡数据处理速度和系统负载;错误限制信息用于设置在数据迁移过程中允许的最大错误数,其内容可以为允许的错误次数上限,通过设置错误限制信息对应的错误次数阈值(即错误次数上限),则一旦数据迁移过程中的错误次数超过该错误次数阈值,则数据迁移任务停止。
本申请实施例上述所述的数据迁移任务可以是数据从数据源(例如源数据库)到中间库(例如ODS库)的迁移任务。
在进一步的可选实现中,本申请实施例可以通过Channel(通道)的任务拆分技术,允许多任务同时进行,从而提升数据迁移任务的执行效率和性能。具体的,Channel(通道)可以是DataX工具等数据集成工具用于数据传输的数据传输通道,即Channel(通道)负责管理从源数据库到中间库的数据流,从而通过将数据迁移任务拆分为多个子任务,并且多个子任务划分到多个Channel上并行执行,可以实现多任务同时进行,从而提升数据迁移的效率和性能;例如,每个Channel可以运行在一个单独的线程或进程,从而数据迁移任务被划分到多个独立的Channel上执行,则多个Channel对应的多个线程或进程可以同时运行,每个线程或进程执行不同的Channel上划分的任务,可以实现多任务同时进行,从而提升数据迁移的效率和性能。
在可选实现中,增量数据采集可以是将数据记录已存在于中间表但数据内容发生变化的数据,从源数据库迁移到中间表;增量数据采集依赖于能够识别源数据库的数据记录变化机制,例如时间戳、日志文件或数据监测工具等;通过增量数据采集可以减少数据从源数据库到中间库的数据传输量,提高数据传输效率。
在可选实现中,对于增量数据采集,本申请实施例可以在DataX工具的Writer配置中(例如上述Writer配置信息),设置Write Mode(写入模式)的属性为update(更新),以指示当前任务是更新中间表中已经存在的记录而不是插入新记录,即对中间表中已存在的记录(记录即数据记录)进行数据更新;同时,在DataX工具的Writer配置(例如上述Writer配置信息)中,指定字段信息(例如业务主键信息),以表示中间表中属于增量数据的字段。其中,业务主键可以用于唯一标识表中的记录,在Writer配置中设置业务主键字段,则可以用于标识中间表中的每条记录的唯一性;进而,在执行数据更新任务时,DataX组件可以利用业务主键字段来确定中间表中需要更新的记录。也就是说,本申请实施例可以根据Writer配置信息中设置的更新的写入模式,以及属于增量数据的字段信息(比如业务主键字段),将源数据库的增量数据采集到中间库的中间表。
步骤S220,对于全量数据采集,根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;其中,数据从中间表拉取到目标表的过程进行数据类型转换和字段映射。
步骤S220主要用于实现基于源数据库的数据集成的第二阶段。作为可选实现,本申请实施例可以根据需求选择使用SQL组件或DataX组件进行中间表的数据到目标表的迁移。
可选的,本申请实施例可以生成和执行数据集成语句(例如SQL语句),以进行中间表的数据到目标表的迁移。在可选实现示例中,数据集成语句(例如SQL语句)可以从一个或多个中间表中提取数据,提取的数据通过指定的关联条件和筛选条件进行过滤和联接,然后插入到企业标准模型的目标表;目标表为采用企业标准模型的表,用于在统一数据模型和格式的情况下,存储经过整理和转换的数据,以支持企业的分析和业务决策。
示例的,数据集成语句(例如SQL语句)可以执行以下过程:
数据拉取过程,数据拉取可以通过SELECT(选择)子句实现,用于选择和拉取中间表的数据;例如,SELECT(选择)子句指定了拉取数据的字段、拉取数据的字段所来自的一个或多个中间表;
数据整合过程,数据整合可以通过JOIN(组合)子句实现,用于通过关联条件将多个中间表中拉取的数据进行合并;例如,JOIN子句可以基于业务主键或其他关联字段(比如客户ID或订单ID)连接两个中间表;
数据过滤过程,数据过滤可以通过WHERE子句(条件子句)实现,用于从数据整合过程合并的数据中选择满足筛选条件的记录,以实现数据过滤;例如,WHERE子句可以基于日期、状态、数值范围等多种筛选条件进行数据过滤;
数据插入过程,数据插入可以通过INSERT INTO子句(插入子句)实现,用于指定目标表及字段,即上述数据拉取、数据整合和数据过滤的数据被插入的目标表以及字段。
作为可选实现,对于全量数据采集,本申请实施例可以通过具有上述数据拉取过程、数据整合过程、数据过滤过程和数据插入过程的数据集成语句(例如SQL语句),从中间表拉取全量数据到企业标准模型的目标表。例如,对于全量数据采集,本申请实施例可以利用数据集成语句的上述数据拉取过程、数据整合过程和数据过滤过程的处理过程,从中间表得到待插入的全量数据;进而,待插入的全量数据经数据类型转换、字段映射之后,通过数据集成语句的上述数据插入过程,插入到目标表中。
在可选实现中,数据从中间表到目标表的迁移过程中可以进行自动化的数据类型转换和字段映射。可选的,本申请实施例可以利用数据资产平台的元数据信息以及页面配置的映射信息,自动进行数据的类型转换和字段映射。
其中,元数据信息是描述数据的数据,提供了有关数据源、内容、格式和结构等的详细信息;在数据迁移和集成过程中,元数据可以指示数据的来源、数据类型、以及数据的处理方式。页面配置的映射信息是指通过用户界面设置的规则,定义了将中间表的数据字段映射到目标表的相应字段的规则;该映射信息可以确保数据从中间表迁移到目标表时能够按照预定的方式正确对应。
在可选实现中,本申请实施例可以按照元数据信息指示的数据类型(即目标表中的数据对应的数据类型),自动将从中间表拉取的数据的类型转换为元数据信息指示的数据类型;根据页面配置的映射信息对应的规则,将从中间表拉取的数据的字段,映射为目标表的相应字段。示例的,以从中间表中拉取一个字段名为Date_of_Birth,类型为字符串的数据为例,如果目标数据库中相应的字段名为DOB,类型为日期,则可以根据元数据信息指示的数据类型将字符串格式转换为日期格式,根据映射信息(Date_of_Birth到DOB的映射),将转换类型后的数据存储在目标表的DOB字段中。
对于增量数据采集,当将数据从中间表迁移到目标表时,涉及将中间表中的记录插入目标表,而当目标表中存在与中间表相同业务主键的记录时,需要对该记录更新数据,而不是插入新记录;此时,本申请实施例可以利用带有增量更新子句的数据集成语句,将中间表中更新的数据(即增量数据),迁移到目标表中已存在的记录中。示例的,增量更新子句可以执行如下过程:
检测冲突过程,在尝试将中间表的记录插入目标表时,如果记录的业务主键已存在于目标表,则触发冲突;
执行更新过程,当冲突发生时,针对冲突记录,使用中间表中的增量数据替换目标表中冲突记录的旧数据;例如将冲突记录在中间表中的字段数据更新到目标表中对应的字段。
示例的,可以通过ON CONFLICT子句(冲突处理子句),自动检测业务主键的冲突问题;通过DO UPDATE SET子句(执行更新设置子句)指示更新操作为:将冲突记录在中间表中的字段的数据,更新到目标表中对应的字段。从而,当中间表和目标表存在相同业务主键的记录时,如果中间表中的记录发生数据变更,则相应的数据变更也会反映在目标表的相应记录中。也就是说,ON CONFLICT子句的冲突检测、以及DO UPDATE SET子句的更新操作,可以用于处理业务主键冲突,适用于增量数据到目标表的更新,从而保障目标表的数据一致性。
进而,在利用数据集成语句从中间库拉取数据到目标库时,对于增量数据,本申请实施例可以通过具有上述检测冲突过程、执行更新过程的增量更新子句,从中间表拉取增量数据到企业标准模型的目标表。例如,本申请实施例在利用数据集成语句从中间库拉取数据到目标库时,可以利用增量更新子句的上述检测冲突、执行更新的处理过程,得到待插入的增量数据,并且待插入的增量数据经过数据类型转换、字段映射之后,插入到目标表相应的冲突记录中。
在可选实现中,上述图2所示的全量数据采集和增量数据采集可以是非实时的数据采集,例如可以基于主动查询方式触发图2所示方法。在可选实现中,主动查询可以是通过Pull(拉取)全量数据和增量数据的方式,将数据从源数据库拉取到中间表再拉取到目标表。在可选实现中,本申请实施例可以在配置源数据库的连接信息之后,定时(例如每天或每小时)或受用户手动触发向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新(例如已有记录的数据发生更新);在源数据库存在数据记录时,数据采集系统可以通过全量数据采集(参照图2所示),将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;在源数据库存在数据更新时,数据采集系统可以通过增量数据采集(参照图2所示),将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表。
也就是说,在可选实现中,图2所示的方法可以在配置源数据库的连接信息后,向源数据库发送主动查询请求的情况下执行,从而通过全量数据采集将涵盖源数据库的数据记录的全量数据,从源数据库拉取到中间表再拉取到目标表;在源数据库存在更新数据时,通过增量数据采集将对应的增量数据,从源数据库拉取到中间表再拉取到目标表。上述过程可以是在配置源数据库的连接信息后,基于定时触发或手动触发实现。
本申请实施例可以在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为采用企业标准模型的数据库;在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;
其中,对于全量数据采集,将源数据库的全量数据采集到中间库的中间表,再根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还进行数据类型转换和字段映射。
可见,本申请实施例可以通过全量数据采集和增量数据采集的流程,涵盖从源数据库到目标库的数据迁移路径,并且引入中间库作为临时存储,确保数据在迁移过程中的完整性和准确性,从而满足主动查询源数据库时的数据记录或数据更新对应的数据集成需求,提升数据集成过程中的效率。具体的,本申请实施例在配置源数据库的连接信息后,可以向源数据库发送主动查询请求,从而在查询源数据库存在数据记录时,可以将源数据库的全量数据拉取到中间库,然后再根据数据集成语句,将全量数据从中间库拉取到目标库,确保了目标库拥有源数据库的完整快照,可以适用于初始化或周期性的源数据库和目标库的完全同步;在源数据库发生数据更新时,将增量数据拉取到中间库,然后再利用带有增量更新子句的数据集成语句,将增量数据从中间库拉取到目标库,减少了网络负载和存储需求,同时确保目标库的数据最新状态。进一步的,在数据从中间库传输到目标库的过程中,进行数据类型转换和字段映射,可以通过目标库的企业标准模型解决不同源数据库间可能存在的数据形式不一致的问题,从而统一不同源数据库的数据模型。因此,本申请实施例可以有效地管理跨数据库的数据集成,无论是需要定期同步源数据库中的全量数据,还是更新源数据库中的增量数据,本申请实施例均可以实现数据完整、准确的集成,且统一不同源数据库的数据模型;并且,通过使用专门的SQL语句处理全量数据采集和增量数据采集,可以实现数据集成的资源优化使用。因此,本申请实施例可以提升数据集成过程中的效率,优化数据集成的性能。
在进一步的可选实现中,本申请实施例还可支持对源数据库的数据进行实时采集,即捕捉源数据库的数据变更事件,并将数据变更事件所变更的数据同步到中间库或目标库,比如,源数据库的实时变更数据可以同步到中间库,或者同步到中间库再到目标库,或者,直接同步到目标库。需要说明的是,区别于上述全量数据采集和增量数据采集的主动查询触发方式,源数据库的数据实时采集可以是实时进行的,例如数据采集系统并不是定时或受用户手动触发来查询源数据库,而是持续监听源数据库的变更日志,从而源数据库的任何数据变化(插入、更新、删除)都会即时写入变更日志;进而,数据采集系统监听到变更日志存在的数据变更事件时,可以实时捕获源数据库的数据变更事件,并立即反映到中间库或目标库中。
在可选实现中,图3示出了基于源数据库的实时数据采集的流程图,如图3所示,该流程可以包括如下步骤。
步骤S310,实时监控源数据库的变更日志,以实时捕获源数据库的数据变更事件。
在可选实现中,本申请实施例可以采用Debezium平台执行步骤S310;其中,Debezium平台是一个开源的分布式平台,用于实时捕获并转发源数据库变更事件,Debezium平台可以监控源数据库的变更日志,并捕获变更日志对应的变更事件。从而,本申请实施例可以在源数据库中部署Debezium平台,以实时监听源数据库的变更日志,例如实时监听MySQL的Binlog(Binary Log,二进制日志)、PostgreSQL的WAL(Write AheadLogging,预写式日志)等;通过实时监听源数据库的变更日志,Debezium平台可以实时捕获源数据库的数据变更事件,例如数据插入、数据更新和数据删除等。
步骤S320,将数据变更事件格式化为消息,并实时发送到Kafka的指定主题。
在可选实现中,本申请实施例可以采用Kafka连接器(Connector)配合Debezium平台,以实现基于源数据库的实时数据采集。Debezium平台在实时捕获到源数据库的数据变更事件后,可以将数据变更事件实时发布到Kafka连接器。例如,Debezium捕获的数据变更事件可以被格式化为消息,并实时通过Kafka的Source Connector(源连接器)发送到Kafka集群中的一个或多个指定主题(Topic),从而Kafka作为中间消息队列,可以存储源数据库的这些数据变更事件对应的消息,以便Kafka的下游系统或下游服务可以实时访问和处理这些消息。
需要说明的是,Kafka的Source Connector(源连接器)是Kafka连接器的一部分,用于从外部系统中抓取数据并将这些数据推送到Kafka集群中;Source Connector起到数据输入的作用,将不同来源的数据流统一地转移到Kafka服务器上。Topic(主题)是Kafka的数据流的类别或者数据源的标识,每个主题包含一系列数据记录,Kafka根据主题来存储和管理数据。
步骤S330,通过Kafka的下游系统或下游服务消费指定主题的消息,且根据消费的消息,实时将消息对应的变更数据应用到中间库或目标库。
作为可选实现,Kafka的下游系统或下游服务可以使用Kafka的Sink Connector(汇聚连接器)从Kafka中订阅并消费指定主题的消息;消费的指定主题的消息可以包含源数据库对应的数据变更事件的类型和详细数据,例如数据变更事件变更的数据记录和变更类型(插入、更新、删除等);从而,Kafka的下游系统或下游服务根据消费到的消息内容,可以实时地将这些数据变更事件应用到中间库或目标库中,以保证中间库或目标库与源数据库之间的数据一致性和实时更新。
需要说明的是,Sink Connector(汇聚连接器)用于将数据从Kafka集群传输到外部系统(例如下游系统或下游服务),主要负责从Kafka的主题中消费消息,并将这些消息的内容推送到外部系统中去。
在进一步的可选实现中,本申请实施例可以基于Kubernetes的部署,实现对于源数据库的实时数据采集;例如,Debezium平台和Kafka可以在Kubernetes平台上部署,利用Kubernetes的容器化和自动化管理特性来增强可用性和伸缩性;其中,Kubernetes环境提供了自动化部署、扩展和管理容器化应用的能力。
在进一步的可选实现中,本申请实施例可以通过各种监控工具和日志记录功能,实时监控和记录数据采集系统的运行状态、性能和潜在的系统故障,从而便于及时发现问题、故障排查和系统优化。
作为可选实现,本申请实施例还可支持对源文件系统进行数据集成,源文件系统的数据可以集成到中间库,或者集成到中间库再到目标库,或者,直接集成到目标库。可选的,图4示例了基于源文件系统的数据集成的流程图,如图4所示,该流程可以包括如下步骤。
步骤S410,对于源文件系统的源文件的全量导入,通过用户配置选择需导入的源文件,以及配置源文件的数据页内区域与中间表或目标表的逻辑实体字段之间的映射关系;解析源文件的页内容,根据该映射关系生成SQL语句,并执行SQL语句,以清空中间表或目标表中的现有数据,并将源文件中的所有数据插入到中间表或目标表中。
本申请实施例对于源文件系统的数据集成主要分为全量导入和追加导入。步骤S410主要实现全量导入,支持将源文件系统的源文件全量导入中间表或目标表中。全量导入可以视为是对源文件系统的源文件进行批量导入,以源文件为Excel文件为例,则本申请实施例可以提供导入方案配置界面,从而用户可以在导入方案配置界面进行用户配置,用户配置可以包括:选择需要导入的Excel文件(即选择需导入的源文件)、以及配置Excel文件中的列与中间表或目标表中的逻辑实体字段(例如中间表或目标表中的字段)之间的映射关系。在以源文件为Excel文件为例的情况下,源文件的数据页内区域可以例如Excel文件中的列。
从而,本申请实施例提供的数据采集系统可以自动解析选择的Excel文件,读取Excel文件中的页内容(例如Excel文件中的Sheet页内容);进而数据采集系统可以根据用户配置的映射关系,生成相应的SQL语句。可选的,该SQL语句主要包括全量删除语句和全量插入语句,其中,全量删除语句用于删除(即清空)中间表或目标表中的现有数据,全量插入语句用于将选择的Excel文件的所有数据插入到中间表或目标表中;也就是说,本申请实施例可以通过全量删除来清空中间表或目标表中的现有数据,通过全量插入来将Excel文件的所有数据插入到中间表或目标表中,从而通过执行SQL语句,本申请实施例可以实现Excel文件的数据全量导入到中间表或目标表中。
步骤S420,对于源文件的追加导入,复用所述用户配置;检查中间表或目标表中已有的数据,将源文件的追加数据根据用户配置的映射关系追加到中间表或目标表中,以在中间表或目标表中已存在的记录更新数据。
追加导入适用于源文件的数据需要持续更新的场景,特别是当源文件的新数据不断产生,且需要保持中间表或目标表的数据为最新时。在追加导入下,本申请实施例可以复用全量导入对应的用户配置,包括源文件的文件结构和映射关系等;从而,以Excel文件为例,本申请实施例可以解析选择的Excel文件,读取Excel文件中需要追加的追加数据(例如需要更新的更新数据);然后,本申请实施例可以检查中间表或目标表中的已有数据,将追加数据根据配置的映射关系追加到中间表或目标表中,例如,基于映射关系指示的追加数据在Excel文件中的列所对应的中间表或目标表中的字段,将追加数据追加到中间表或目标表中相应的字段;其中,对于在中间表或目标表中已经存在的记录(可以基于业务主键字段识别),本申请实施例不会重新插入该记录,而是更新该记录的数据,以确保记录中的数据是最新的。
作为可选实现,本申请实施例还可支持对源网络API接口来源的数据进行数据集成,源网络API接口来源的数据可以集成到中间库,或者集成到中间库再到目标库,或者,直接集成到目标库。可选的,图5示例了基于源网络API接口的数据集成的流程图,如图5所示,该流程可以包括如下步骤。
步骤S510,配置源网络API接口,在基于配置的源网络API接口的数据预览阶段,定义源网络API接口的数据映射关系;并且,根据源网络API接口的数据映射关系,创建相应的中间表。
在可选实现中,本申请实施例可以提供API配置界面,以允许用户配置源网络API接口,例如,用户可以通过API配置界面输入API的URL(Uniform Resource Locator,统一资源定位符)和请求参数,请求参数比如API的访问密钥(Token)和加解密配置等。
本申请实施例进一步提供数据预览功能,允许用户基于配置的源网络API接口验证API调用是否成功,以及预览API返回的数据结构。在上述数据预览阶段(例如预览API返回的数据结构的阶段),本申请实施例可以支持定义源网络API接口的数据映射关系,定义源网络API接口的数据映射关系可以主要包括定义数据路径和配置主键字段;例如,用户可以定义数据路径(即定义源网络API接口返回的数据结构中被使用的部分)和主键字段,以确保API返回的数据能正确地映射到中间表的相应字段。
进而,基于用户定义的数据映射关系,本申请实施例可以在资产目录中创建对应的ODS表(中间表的一种示例),且ODS表的表名和字段与源网络API接口返回的数据结构相对应。
在可选实现中,本申请实施例可以解析源网络API接口的数据映射关系中的客户表信息,从而获取客户表的元数据,例如基于解析的客户表信息链接到源网络API接口的客户表,从而获取到客户表的元数据;进而,将获取的元数据与中间表的建表规则相结合,并检查是否已存在相应的中间表,这是为了确保了数据不会被重复创建,如果资产目录中已存在相应的中间表,则可直接使用;如果不存在,则需要进行下一步的中间表创建;也就是说,如果在资产目录中没有找到相应的中间表,则根据获取的元数据在资产目录中创建新的中间表,创建中间表时不仅要包括中间表的基本结构(比如字段名和类型),还需要包含完整的元数据信息,比如表描述和唯一约束,以确保中间表的完整性和数据的一致性。
步骤S520,调用源网络API接口,并处理源网络API接口的网络请求,以从源网络API接口获取数据路径对应的数据;根据从源网络API接口获取的数据,生成JSON脚本,以将获取的数据加载到创建的中间表中;其中,在数据加载到中间表的过程中,采用数据映射关系配置的主键字段,处理数据重复或数据冲突。
在可选实现中,用户可以通过配置cron表达式来设定一个定时任务(cron表达式用于在预定的时间自动执行任务),从而定时自动调用源网络API接口。在定时调用源网络API接口时,本申请实施例可以发送HTTP(HyperText Transfer Protocol,超文本传输协议)请求到源网络API接口的URL;进一步的,本申请实施例可以使用DataX组件扩展的Restful Reader插件来处理源网络API接口的网络请求,从而确保从源网络API接口接收的数据被正确解析和处理,比如,能够正确接收数据映射关系定义的数据路径中被使用的那部分数据。需要说明的是,使用DataX组件扩展的Restful Reader插件处理源网络API接口的网络请求,可以涉及处理源网络API接口的HTTP请求和响应。
需要说明的是,Restful Reader插件是DataX组件中的一个读取插件,用于从支持Restful API的数据源读取数据,此插件可以处理来自Restful服务的HTTP请求和响应,从而获取数据并转换成DataX组件能够处理的格式,用于数据同步过程。
基于从源网络API接口获取的数据,本申请实施例可以生成DataX组件的执行任务的JSON脚本,JSON脚本通过调度平台被执行,可以负责将获取的数据加载到创建的中间表。
在数据加载到中间表的过程中,本申请实施例可以基于数据映射关系定义的主键字段,采用主键冲突更新策略来处理可能出现的数据重复或冲突问题,避免数据冗余和错误。
在进一步的可选实现中,本申请实施例可以实时监控DataX组件的任务执行状态,并记录相关日志,以便于进行故障排查和性能优化,确保数据集成的稳定性和效率。
作为可选实现,本申请实施例还可支持对源MQ的数据进行数据集成,源MQ的数据可以集成到中间库,或者集成到中间库再到目标库,或者,直接集成到目标库。可选的,图6示例了基于源MQ的数据集成的流程图,如图6所示,该流程可以包括如下步骤。
步骤S610,配置MQ实例任务,并提供MQ实例任务的数据预览功能,以便查看从监听的MQ中读取的消息数据。
在可选实现中,本申请实施例可以提供可视化界面,以支持用户配置MQ实例任务;例如,通过可视化界面,用户可以输入MQ实例的配置信息,比如MQ的连接信息以及要监听的消息队列名称等,MQ的连接信息例如主机地址、端口、用户名、密码等;通过MQ实例的配置信息,本申请实施例可以生成MQ Source Connector(源连接器),MQ Source Connector(源连接器)可以用于从指定要监听的MQ中读取数据;从而,MQ Source Connector作为一个连接组件可以连接到MQ实例,即MQ Source Connector依赖于连接的MQ实例,来从指定要监听的MQ中获取数据。
进一步的,本申请实施例提供MQ实例任务的数据预览功能,允许用户在配置MQ实例任务后,查看从要监听的MQ中读取的消息数据。
进一步的,如果读取的消息数据是列表形式,则本申请实施例支持将列表转换为多条独立的消息,以便后续处理。作为可选实现示例,在前端界面,用户可以从接收到的MQ消息中选择需要处理的JSON数据路径,所选择的数据路径被后端系统持久化存储到目标库中;在结合使用Kafka的情况下,MQ发送的消息不仅包含数据,还指定了消息对应的Kafka主题(Topic),该主题可以由MQ Source Connector管理,从而消息可以直接推送到指定主题相应的Kafka消息队列中;进而,本申请实施例可以监听Kafka的指定主题,消费Kafka的指定主题对应的消息,比如本申请实施例可以使用选择的数据路径来识别和处理指定主题的消息;然后,本申请实施例在接收到每条消息数据后,如果消息数据包含列表形式的数据(例如一个数组),则可以遍历这个列表,把列表中的每个数据单独提取出来并格式化为JDBC(Java Database Connectivity,Java数据库)Sink Connector可接受的格式,格式化后的数据可以发送到另一个指定的Kafka主题,以便最终被送入中间库。
步骤S620,配置JDBC Sink Connector,以指定中间库的连接信息;以及,将从MQ中读取的消息数据写入中间库。
JDBC Sink Connector是一种用于将数据从Kafka流式传输到支持JDBC协议的数据库系统的工具,此连接器负责从Kafka的指定主题读取数据,并将其写入到配置的数据库中。从而,本申请实施例在配置MQ实例任务,并通过MQ的MQ Source Connector从指定MQ中读取数据时,从MQ读取的数据可以推送到Kafka消息队列中,进而本申请实施例可以通过JDBC Sink Connector从Kafka的指定主题对应的Kafka消息队列中读取数据,并写入中间库。
在可选实现中,本申请实施例可以配置JDBC Sink Connector来指定中间库的连接信息,比如库类型、主机地址、端口等;进一步的,本申请实施例还可支持通过下拉菜单的方式,从中间库中选择中间表,进而,根据所选择的中间表配置JDBC Sink Connector的表映射信息,该表映射信息用于将从MQ读取的消息数据映射到中间库的中间表的相应字段。
作为可选实现示例,用户在创建新的任务实例时可以进行多项选择,包括确定数据的路径、主键字段的路径,并选择一个OSD表,比如预先在资产目录中创建ODS表,用户可以通过搜索和选择来OSD表;用户完成上述配置并提交后,后台系统可以自动将这些配置信息持久化,比如保存到配置数据库中;进而,系统可以读取用户的配置信息,并且使用一个JDBC Sink Connector的模板(包含了一些固定的配置项和一些变量部分),从而通过读取的配置信息替换模板中的变量部分,生成最终的配置文件(例如JSON格式),生成的配置文件可以通过RESTful API传送到Kafka连接器的服务端,一旦配置文件上传并被Kafka连接器的服务端接受,则相应的JDBC Sink Connector实例被创建,可以开始执行数据同步任务。
进而,JDBC Sink Connector可以从Kafka的指定主题对应的Kafka消息队列中读取消息数据(如果是列表形式,还涉及到格式转换),按照配置的表映射信息,将消息数据正确地映射并写入到中间表的相应字段。
在进一步的可选实现中,本申请实施例可以提供任务监控界面,以便用户可以实时查看任务的执行状态、进度和日志信息,从而用户可以跟踪数据集成任务的性能和可能的执行问题。进一步的,通过后台轮询机制定期检查任务状态,可以及时发现并处理异常或错误;发现异常或错误问题时,本申请实施例可以通过预设的告警机制(比如邮件、即时通讯软件等方式)通知用户,确保问题能够得到快速响应和处理。
在进一步的可选实现中,结合图1所示,本申请实施实施例可以通过智能匹配系统130,以智能匹配与自动化映射方式,至少实现数据从数据源到中间库再到目标库的集成。数据的智能匹配和自动化集成可以涉及关系投影(RP,Relation Projection)、实体关系(ER,Entity Relationship)推荐、以及基于不同数据源(例如API、MQ等)的数据集成策略等。
在可选实现中,本申请实施例可以自动分析ERP数据库(源数据库的一种示例)中的表数据结构(例如订单表和用户表),并将表数据结构中的字段与企业标准模型中的对应实体自动匹配,例如,订单表的order_id字段与企业标准模型中订单实体的相应字段匹配。从而,基于表数据结构中的字段与企业标准模型中实体相应字段的映射和匹配关系,本申请实施例可以自动生成SQL语句,以在将数据从源数据库转移到ODS库之后,从ODS库转移到企业标准模型的目标库。也就是说,在基于源数据库的数据集成中,本申请实施例使用到了源数据库的表数据结构中的字段,与企业标准模型中实体相应字段的映射信息,基于该映射信息,本申请实施例可以使用前文描述的基于源数据库的数据集成的全量数据采集、增量数据采集方案,实现源数据库中的数据到目标库的集成。
在可选实现示例中,用户可以先配置ERP数据库(源数据库的一种形式)的连接信息,并导入ERP数据库关键的数据表,例如订单表和用户表的数据字典;从而,本申请实施例可以使用智能匹配算法分析ERP数据库关键的数据表中的字段,例如order_id、user_id、order_date等字段,并自动将这些字段与企业标准模型中的相应实体进行匹配,根据匹配结果,推荐源数据库的表数据结构中的字段,与企业标准模型中实体相应字段的映射信息,例如,将订单表映射到企业模型中的订单实体,用户表映射到用户实体等;进而,在确认上述映射信息后,本申请实施例可以在源数据库中的数据采集到中间库(例如OSD库)后,根据上述映射信息,自动生成将数据从中间库的中间表拉取到目标库的目标表的数据集成语句(例如SQL语句),并实现数据从中间表拉取到目标表的过程的字段映射;数据集成语句相应的处理过程可以参照前文相应部分的描述,此处不再赘述。
在进一步的可选实现中,对于涉及多个数据表的复杂映射情形,例如将发货单和签收单的数据集成到一个表中,则需要分析和确认这些表之间的实体关系(ER),从而本申请实施例可以进行ER匹配,例如将数据从源数据库拉取到中间库(ODS库)后,可以对拉取到中间库的数据进行ER匹配,得到推荐的ER关系,比如利用拉取到中间库的数据的字典和元数据(包括业务主键信息)来推荐可能的ER关系(示例的,如果订单表的user_id和用户表的user_id有关联,则可以推荐这两个表之间的一对多关系);进而,推荐的ER关系及其相似度信息可以被推送到数据资产平台和图谱引擎,从而用户可以在平台上确认ER关系,进而在用户确认ER关系后,本申请实施例可以基于ER关系优化数据模型、确定主表、并生成处理复杂数据关系的多对一的数据集成语句。
在可选实现示例中,ER关系及其相似度信息被推送到图谱引擎后,本申请实施例可以利用图谱引擎构建包含多个逻辑实体及其关系的复杂图谱,即图数据。需要说明的是,图谱引擎可以用于在Neo4j等图数据库中维护和操作数据关系;其中,Neo4j是一种图形数据库管理系统,使用节点和边来表示和存储数据,比如数据实体可以表示为节点,数据之间的关系可以表示为节点之间的连接边,从而使用图数据库可以直观地表示和维护实体之间的关系,例如订单和客户之间的关系;进一步的,使用图数据库可以容易地查询多张表之间的关系,例如,如通过一系列关联的节点快速找到关系链。
从而在构建图谱之后,确定多个逻辑实体之间的最短路径可以是图谱的一个应用,这在某些业务分析(如供应链分析、网络优化等)中具有价值,例如,可以快速找到两个实体间的最短交互路径,帮助优化流程或成本。进一步的,在生成用于数据集成的SQL语句时,确定的主表(主导数据结构的表)直接影响了数据处理和查询的效率,本申请实施例可以利用图谱,以基于实体间的关系权重和连接度来动态确定主表。
需要进一步解释的是,上述智能匹配算法可以从源数据库接收数据的元数据信息和语义内容作为输入,以便为后续的匹配和分析提供基础数据;智能匹配算法在进行匹配和分析时可以使用双重评估机制:
文本相似度得分,比如使用文本相似度算法计算数据之间的元数据和语义内容的相似度,以便评估字段名称、描述等文本信息的相似性,得到相似度值;
大模型执行匹配,比如使用机器算法(例如机器学习或深度学习模型)来分析和匹配数据之间的元数据和语义内容的相似度,得到相似度值;
从而,本申请实施例可以设置策略引擎,将上述双重评估机制计算得到的相似度值进行加权平均,得出一个综合评价得分,这一得分用于最终的映射信息推荐。
进一步的,在进行映射信息推荐时,本申请实施例可以根据用户配置的推荐排名信息,挑选出相似度最高的前N个表和字段,进而推荐出表和字段的映射信息,用于构建SQL语句。
进一步的,在进行ER关系推荐时,本申请实施例可以将多张客户表的数据拉取到ODS库;通过算法分析多张客户表的数据的业务主键等关键信息,执行数据匹配,并构建ER图谱,来表示多张客户表之间的关系;进而,可以输出多张客户表之间的关系得分,用于图谱构建和进一步的数据处理。
进一步的,本申请实施例可以整合映射信息和实体关系,例如,本申请实施例可以根据RP推荐的结果(映射信息)和ER推荐的结果(实体关系),构建复杂的SQL语句,以进行数据的最终集成。示例的,可以通过RP推荐得到的字段映射和ER关系推荐得到的表关系,来构建SQL语句。
在更进一步的可选实现中,智能匹配可以分为全量匹配和实时匹配;全量匹配可以是在数据采集前进行的步骤,主要用于生成源数据库中的表和目标表之间的映射信息,包括表的映射关系和字段的映射信息等。实时匹配针对全量匹配未能覆盖的表,即针对全量匹配未能覆盖的表,生成表之间的ER关系,从而支持复杂的多表关联语句的生成。
进而,本申请实施例可以整合全量匹配得到的映射信息,以及实时匹配得到的ER关系,生成本申请实施例的数据集成语句(例如SQL语句),且数据集成语句分为全量数据采集语句以及增量数据采集语句;其中,全量数据采集语句用于当查询源数据库存在数据记录时,将全量数据从源数据库采集到中间库之后,再迁移到目标库;增量数据采集语句可以是带有增量更新子句的数据集成语句,用于处理源数据库中的数据更新,将更新的增量数据通过中间库迁移到目标库。
在可选实现中,映射信息在SQL语句中可以涉及匹配INSERT INTO子句中指定的目标表及字段,用以确定数据如何从源数据库的表映射到目标库的表。ER关系在SQL语句中可以涉及匹配JOIN子句中的关联条件。
本申请实施例通过使用智能匹配算法来处理源数据库的文本数据,并结合机器模型等机器算法分析数据的语义内容,能够更好地理解和处理数据的语义层次,提高匹配的准确性和灵活性;相比于将数据向量化转换为一系列数值表示(涉及到特征提取和降维等处理),然后使用向量相似度计确定相似的实体向量的方式,本申请实施例不会忽略数据的语义信息,可以提高准确性;也就是说,向量相似度的计算依赖于准确的特征提取,可能会忽略数据的某些语义信息,导致不准确。进一步的,本申请实施例通过构建数据知识图谱,可以提升效率,减少重复工作。进一步的,使用加权平均和推荐排名策略来进行相似度评估,允许更细致和个性化的评价标准。
上述智能匹配与自动化数据集成方式,可以极大地提高数据集成的效率和准确性。通过自动识别和利用数据之间的关系,不仅减少了手动配置的工作量,还提高了数据集成的质量,使得企业能够更好地利用其数据资产,支持复杂的业务分析和决策过程,并且可以用于数据量大、结构复杂且需要频繁更新的企业数据环境。
在进一步的可选实现中,结合图1所示,数据集成平台还可以包括数据存储和元数据管理系统120、数据查询和应用系统140、基础设施和监控系统150。
示例的,数据存储和元数据管理系统120可以设置Catalog(数据目录)组件、ODS库、企业标准模型。其中,Catalog组件用于管理元数据,以支持数据治理和数据资产管理。ODS库、企业标准模型的介绍可以参照前文描述,此处不再展开。
数据查询和应用系统140用于数据查询和应用,例如,集成的数据可以应用于数据查询和应用系统,从而支持在线事务处理和在线分析处理等应用场景。示例的,数据查询和应用系统可以包括:
OLTP(On-Line Transaction Processing,在线事务处理)系统,用于处理在线事务请求,适用于需要快速响应的业务操作;
OLAP(On-Line Analytical Processing,在线分析处理)系统,用于复杂的分析查询,适合进行深入的数据分析;
Ad-hoc(临时)查询系统,用于支持对数据进行即席查询,适用于不定期的数据探索和分析需求。
基础设施和监控系统150用于为整个数据集成平台提供基础能力支持,包括但不限于提供:
开源容器编排平台,例如,提供Kubernetes,以用于自动化的应用程序容器部署、扩展和管理;
图谱血缘,用于跟踪和可视化数据从源(例如数据源)到目标(例如最终集成的数据)的流动路径,比如数据移动、转换和加工的详细记录等;
数据资产管理,涉及数据整个生命周期的管理,例如数据的收集、存储、维护、使用和归档的管理,数据资产管理涉及元数据管理(管理数据相关的元信息)、数据目录管理(提供易于搜索的数据资源库,使用户能够快速找到所需数据)、数据治理管理(管理数据安全、隐私保护、合法性等)、数据质量控制(监控和提高数据的准确性、可用性和一致性)等;
调度平台,用于在数据集成环境中负责管理和优化各种数据作业的执行,涉及任务调度(自动化安排任务执行的时间)、任务资源管理(合理分配任务的计算资源,确保任务的高效运行)、任务依赖管理(处理任务之间的依赖关系,确保按正确的顺序执行任务)、监控与报警(实时监控任务的作业状态,提供异常时快速响应并通知相关人员的机制)等。
本申请实施例提供的数据集成平台可以用于处理来自不同数据源的数据集成任务,包括数据库、Excel文件、API和MQ等;数据集成平台可以利用全链路调度和数据资产管理技术,以及智能匹配和评价策略,来优化数据集成过程。具体的,本申请实施例支持多种类型的数据源,包括数据库、Excel文件、通过API接入的数据以及来自消息队列的数据流;并且实现了从数据源到目标库的数据流动的自动化调度,确保数据在整个传输过程中的正确性和时效性;同时,本申请实施例可以处理全量数据采集和增量数据采集,有效管理数据负载和同步频率,并保障全量数据和增量数据同步的准确性,提升数据集成过程中的效率。
进一步的,本申请实施例可以利用智能匹配算法分析和推荐映射信息、ER关系,甚至整合映射关系和ER关系,来构建数据集成语句,以保障数据集成的效率和准确性,避免手动编写数据集成语句(例如SQL语句)所带来的技术门槛,降低数据集成的使用局限性。
作为可选实现,本申请实施例还提供一种数据集成装置,该数据集成装置可以视为是本申请实施例提供的数据集成方法相应的功能模块装置。下文描述内容可与上文描述内容相互对应参照。
可选的,图7示例性的示出了的数据集成装置的可选框图,如图7所示,该装置可以包括:
查询请求发送模块710,用于在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;
全量数据集成模块720,用于在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为采用企业标准模型的数据库;
增量数据集成模块730,用于在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;
其中,对于全量数据采集,全量数据集成模块将源数据库的全量数据采集到中间库的中间表,再根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,增量数据集成模块根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还进行数据类型转换和字段映射。
可选的,所述对于全量数据采集,将源数据库的全量数据采集到中间库的中间表包括:
通过配置适配源数据库的Reader插件和中间库的Writer插件,以及,通过传递JSON配置,将数据从源数据库采集到中间表;
其中,JSON配置至少包括:用于配置从源数据库读取数据的Reader配置信息,用于配置将数据写入中间库的Writer配置信息,用于在数据从数据源传输到中间表的过程中指示转换操作的Transformer配置信息;
可选的,所述对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表包括:
在所述Writer配置信息中设置写入模式的属性为更新,以指示对中间表中已存在的记录进行数据更新,以及,在所述Writer配置信息中,指定字段信息以表示中间表中属于增量数据的字段;
根据所述Writer配置信息中设置的更新的写入模式,以及属于增量数据的字段信息,将源数据库的增量数据采集到中间库的中间表。
可选的,所述根据数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表包括:
通过具有数据拉取过程、数据整合过程、数据过滤过程、以及数据插入过程的数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;
其中,数据拉取过程通过SELECT子句实现,用于选择和拉取中间表的数据;数据整合过程通过JOIN子句实现,用于通过关联条件将多个中间表中拉取的数据进行合并;数据过滤过程通过WHERE子句实现,用于从数据整合过程合并的数据中选择满足筛选条件的记录,以实现数据过滤;数据插入过程通过INSERT INTO子句实现,用于指定目标表及字段。
可选的,所述利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表包括:
在利用数据集成语句从中间库拉取数据到目标库时,通过具有检测冲突过程、执行更新过程的增量更新子句,从中间库的中间表拉取增量数据到目标库的目标表;
其中,检测冲突过程用于在尝试将中间表的记录插入目标表时,如果记录的业务主键已存在于目标表,则触发冲突;执行更新过程用于当冲突发生时,针对冲突记录,使用中间表中的增量数据替换目标表中冲突记录的旧数据。
在可选实现中,数据集成语句可以包括全量数据采集语句,带有增量更新子句的数据集成语句可以包括增量数据采集语句;其中,全量数据采集语句以及增量数据采集语句,根据全量匹配得到的映射信息,以及实时匹配得到的ER关系确定;全量匹配用于生成源数据库中的表和目标表之间的映射信息,实时匹配用于针对全量匹配未能覆盖的表,生成表之间的ER关系;
进一步的,所述映射信息匹配INSERT INTO子句中指定的目标表及字段;所述ER关系匹配JOIN子句中的关联条件。
可选的,所述数据集成语句至少根据映射信息生成,该映射信息为源数据库的表数据结构中的字段,与企业标准模型中实体相应字段的映射信息。
可选的,结合图7所示,该装置还可以包括智能匹配模块740,用于配置源数据库的连接信息,并导入源数据库关键的数据表;使用智能匹配算法分析源数据库关键的数据表中的字段,并将字段与企业标准模型中的相应实体进行匹配;根据匹配结果,推荐源数据库的表数据结构中的字段,与企业标准模型中实体相应字段的映射信息;
或者,将数据从源数据库拉取到中间库后,对拉取到中间库的数据进行ER匹配,得到推荐的ER关系;在推荐的ER关系被用户确认后,基于ER关系优化数据模型、确定主表、并生成多对一的数据集成语句;
或者,整合映射信息和ER关系,构建数据集成语句。
可选的,结合图7所示,该装置还可以包括实时数据采集模块750,用于实时监控源数据库的变更日志,以实时捕获源数据库的数据变更事件;将数据变更事件格式化为消息,并实时通过Kafka的源连接器发送到Kafka的指定主题;通过Kafka的下游系统或下游服务消费指定主题的消息,且根据消费的消息,实时将消息对应的变更数据应用到中间库或目标库;其中,Kafka的下游系统或下游服务使用Kafka的汇聚连接器从Kafka中订阅并消费指定主题的消息。
可选的,结合图7所示,该装置还可以包括:源文件系统数据集成模块760,和/或,API数据集成模块770,和/或,MQ数据集成模块780;
其中,源文件系统数据集成模块760执行基于源文件系统的数据集成;API数据集成模块770执行基于源网络API接口的数据集成;MQ数据集成模块780执行基于源MQ的数据集成。
可选的,所述执行基于源文件系统的数据集成包括:
对于源文件系统的源文件的全量导入,通过用户配置选择需导入的源文件,以及配置源文件的数据页内区域与中间表或目标表的逻辑实体字段之间的映射关系;解析源文件的页内容,根据该映射关系生成SQL语句,并执行SQL语句,以清空中间表或目标表中的现有数据,并将源文件中的所有数据插入到中间表或目标表中;
对于源文件的追加导入,复用所述用户配置;检查中间表或目标表中已有的数据,将源文件的追加数据根据用户配置的映射关系追加到中间表或目标表中,以在中间表或目标表中已存在的记录更新数据。
可选的,所述执行基于源网络API接口的数据集成包括:
配置源网络API接口,在基于配置的源网络API接口的数据预览阶段,定义源网络API接口的数据映射关系;并且,根据源网络API接口的数据映射关系,创建相应的中间表;其中,定义源网络API接口的数据映射关系包括定义数据路径和配置主键字段,定义数据路径为定义源网络API接口返回的数据结构中被使用的部分;
调用源网络API接口,并处理源网络API接口的网络请求,以从源网络API接口获取数据路径对应的数据;根据从源网络API接口获取的数据,生成JSON脚本,以将获取的数据加载到创建的中间表中;其中,在数据加载到中间表的过程中,采用数据映射关系配置的主键字段,处理数据重复或数据冲突。
可选的,所述执行基于源MQ的数据集成包括:
配置MQ实例任务,并提供MQ实例任务的数据预览功能,以便查看从监听的MQ中读取的消息数据;
配置JDBC汇聚连接器,以指定中间库的连接信息;以及,将从MQ中读取的消息数据写入中间库;
其中,MQ实例任务连接MQ源连接器,MQ源连接器用于从指定要监听的MQ中读取消息数据,并将读取的消息数据推送到指定主题相应的Kafka消息队列,并且如果消息数据包含列表形式的数据,则列表形式的数据被格式为JDBC汇聚连接器接受的格式,以便JDBC汇聚连接器从Kafka的指定主题对应的Kafka消息队列中读取数据,并写入中间库。
在进一步的可选实现中,本申请实施例还提供一种服务器,例如数据集成平台对应的服务器设备,包括至少一个存储器和至少一个处理器;其中,存储器存储计算机可执行指令,处理器调用所述计算机可执行指令,以执行如本申请实施例提供的数据集成方法。
在进一步的可选实现中,本申请实施例还提供一种存储介质,该存储介质存储计算机可执行指令,该计算机可执行指令被执行时,实现如本申请实施例提供的数据集成方法。
在进一步的可选实现中,本申请实施例还提供一种计算机程序产品,该计算机程序产品包括计算机可执行指令,该计算机可执行指令被执行时,实现如本申请实施例提供的数据集成方法。
上文描述了本申请实施例提供的多个实施例方案,各实施例方案介绍的各可选方式可在不冲突的情况下相互结合、交叉引用,从而延伸出多种可能的实施例方案,这些均可认为是本申请实施例披露、公开的实施例方案。
虽然本申请实施例披露如上,但本申请并非限定于此。任何本领域技术人员,在不脱离本申请的精神和范围内,均可作各种更动与修改,因此本申请的保护范围应当以权利要求所限定的范围为准。
Claims (10)
1.一种数据集成方法,其特征在于,以智能匹配与自动化映射方式,至少实现数据从数据源到中间库再到目标库的集成;智能匹配分为全量匹配和实时匹配;全量匹配在数据采集前进行,用于生成源数据库中的表和目标表之间的映射信息,包括表的映射关系和字段的映射信息;实时匹配针对全量匹配未能覆盖的表,生成表之间的ER关系,支持多表关联语句的生成;
所述方法包括:
整合全量匹配得到的映射信息,以及实时匹配得到的ER关系,生成数据集成语句,以在将数据从源数据库转移到中间库之后,从中间库转移到企业标准模型的目标库;所述数据集成语句分为全量数据采集语句以及增量数据采集语句,增量数据采集语句为带有增量更新子句的数据集成语句;
在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;
在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;全量数据采集涉及复制所有数据;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为数据集成平台的采用企业标准模型的数据库;
在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;增量数据采集将数据记录已存在于中间表但数据内容发生变化的数据,从源数据库迁移到中间表;
其中,对于全量数据采集,将源数据库的全量数据采集到中间库的中间表,再根据全量数据采集语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还利用数据资产平台的元数据信息以及页面配置的映射信息进行数据类型转换和字段映射。
2.根据权利要求1所述的方法,其特征在于,所述对于全量数据采集,将源数据库的全量数据采集到中间库的中间表包括:
通过配置适配源数据库的Reader插件和中间库的Writer插件,以及,通过传递JSON配置,将数据从源数据库采集到中间表;
其中,JSON配置至少包括:用于配置从源数据库读取数据的Reader配置信息,用于配置将数据写入中间库的Writer配置信息,用于在数据从数据源传输到中间表的过程中指示转换操作的Transformer配置信息;
所述对于增量数据采集,根据增量更新字段,将源数据库的增量数据采集到中间库的中间表包括:
在所述Writer配置信息中设置写入模式的属性为更新,以指示对中间表中已存在的记录进行数据更新,以及,在所述Writer配置信息中,指定字段信息以表示中间表中属于增量数据的字段;
根据所述Writer配置信息中设置的更新的写入模式,以及属于增量数据的字段信息,将源数据库的增量数据采集到中间库的中间表。
3.根据权利要求1所述的方法,其特征在于,所述根据全量数据采集语句,从中间库的中间表拉取全量数据到目标库的目标表包括:
通过具有数据拉取过程、数据整合过程、数据过滤过程、以及数据插入过程的数据集成语句,从中间库的中间表拉取全量数据到目标库的目标表;
其中,数据拉取过程通过SELECT子句实现,用于选择和拉取中间表的数据;数据整合过程通过JOIN子句实现,用于通过关联条件将多个中间表中拉取的数据进行合并;数据过滤过程通过WHERE子句实现,用于从数据整合过程合并的数据中选择满足筛选条件的记录,以实现数据过滤;数据插入过程通过INSERT INTO子句实现,用于指定目标表及字段;
所述利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表包括:
在利用数据集成语句从中间库拉取数据到目标库时,通过具有检测冲突过程、执行更新过程的增量更新子句,从中间库的中间表拉取增量数据到目标库的目标表;
其中,检测冲突过程用于在尝试将中间表的记录插入目标表时,如果记录的业务主键已存在于目标表,则触发冲突;执行更新过程用于当冲突发生时,针对冲突记录,使用中间表中的增量数据替换目标表中冲突记录的旧数据。
4.根据权利要求3所述的方法,其特征在于,所述全量匹配得到的映射信息匹配INSERTINTO子句中指定的目标表及字段;所述ER关系匹配JOIN子句中的关联条件。
5.根据权利要求1-3任一项所述的方法,其特征在于,所述方法还包括:
配置源数据库的连接信息,并导入源数据库关键的数据表;使用智能匹配算法分析源数据库关键的数据表中的字段,并将字段与企业标准模型中的相应实体进行匹配;根据匹配结果,推荐源数据库的表数据结构中的字段,与企业标准模型中实体相应字段的映射信息;
或者,将数据从源数据库拉取到中间库后,对拉取到中间库的数据进行ER匹配,得到推荐的ER关系;在推荐的ER关系被用户确认后,基于ER关系优化数据模型、确定主表、并生成多对一的数据集成语句。
6.根据权利要求1-3任一项所述的方法,其特征在于,还包括:
实时监控源数据库的变更日志,以实时捕获源数据库的数据变更事件;将数据变更事件格式化为消息,并实时通过Kafka的源连接器发送到Kafka的指定主题;通过Kafka的下游系统或下游服务消费指定主题的消息,且根据消费的消息,实时将消息对应的变更数据应用到中间库或目标库;其中,Kafka的下游系统或下游服务使用Kafka的汇聚连接器从Kafka中订阅并消费指定主题的消息;
和/或,执行基于源文件系统的数据集成,和/或,执行基于源网络API接口的数据集成,和/或,执行基于源MQ的数据集成;
其中,所述执行基于源文件系统的数据集成包括:
对于源文件系统的源文件的全量导入,通过用户配置选择需导入的源文件,以及配置源文件的数据页内区域与中间表或目标表的逻辑实体字段之间的映射关系;解析源文件的页内容,根据该映射关系生成SQL语句,并执行SQL语句,以清空中间表或目标表中的现有数据,并将源文件中的所有数据插入到中间表或目标表中;
对于源文件的追加导入,复用所述用户配置;检查中间表或目标表中已有的数据,将源文件的追加数据根据用户配置的映射关系,追加到中间表或目标表中,以在中间表或目标表中已存在的记录更新数据;
所述执行基于源网络API接口的数据集成包括:
配置源网络API接口,在基于配置的源网络API接口的数据预览阶段,定义源网络API接口的数据映射关系;并且,根据源网络API接口的数据映射关系,创建相应的中间表;其中,定义源网络API接口的数据映射关系包括定义数据路径和配置主键字段,定义数据路径为定义源网络API接口返回的数据结构中被使用的部分;
调用源网络API接口,并处理源网络API接口的网络请求,以从源网络API接口获取数据路径对应的数据;根据从源网络API接口获取的数据,生成JSON脚本,以将获取的数据加载到创建的中间表中;其中,在数据加载到中间表的过程中,采用数据映射关系配置的主键字段,处理数据重复或数据冲突;
所述执行基于源MQ的数据集成包括:
配置MQ实例任务,并提供MQ实例任务的数据预览功能,以便查看从监听的MQ中读取的消息数据;
配置JDBC汇聚连接器,以指定中间库的连接信息;以及,将从MQ中读取的消息数据写入中间库;
其中,MQ实例任务连接MQ源连接器,MQ源连接器用于从指定要监听的MQ中读取消息数据,并将读取的消息数据推送到指定主题相应的Kafka消息队列,并且如果消息数据包含列表形式的数据,则列表形式的数据被格式为JDBC汇聚连接器接受的格式,以便JDBC汇聚连接器从Kafka的指定主题对应的Kafka消息队列中读取数据,并写入中间库。
7.一种数据集成装置,其特征在于,以智能匹配与自动化映射方式,至少实现数据从数据源到中间库再到目标库的集成;智能匹配分为全量匹配和实时匹配;全量匹配在数据采集前进行,用于生成源数据库中的表和目标表之间的映射信息,包括表的映射关系和字段的映射信息;实时匹配针对全量匹配未能覆盖的表,生成表之间的ER关系,支持多表关联语句的生成;
所述装置用于,整合全量匹配得到的映射信息,以及实时匹配得到的ER关系,生成数据集成语句,以在将数据从源数据库转移到中间库之后,从中间库转移到企业标准模型的目标库;所述数据集成语句分为全量数据采集语句以及增量数据采集语句,增量数据采集语句为带有增量更新子句的数据集成语句;
所述装置包括:
查询请求发送模块,用于在配置源数据库的连接信息后,向源数据库发送主动查询请求,以查询源数据库是否存在数据记录或数据更新;
全量数据集成模块,用于在源数据库存在数据记录时,通过全量数据采集,将源数据库的全量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;全量数据采集涉及复制所有数据;所述中间库为数据集成平台的中间存储系统对应的数据库,目标库为数据集成平台的采用企业标准模型的数据库;
增量数据集成模块,用于在源数据库存在数据更新时,通过增量数据采集,将源数据库的增量数据拉取到中间库的中间表,并从中间表拉取到目标库的目标表;增量数据采集将数据记录已存在于中间表但数据内容发生变化的数据,从源数据库迁移到中间表;
其中,对于全量数据采集,全量数据集成模块将源数据库的全量数据采集到中间库的中间表,再根据全量数据采集语句,从中间库的中间表拉取全量数据到目标库的目标表;对于增量数据采集,增量数据集成模块根据增量更新字段,将源数据库的增量数据采集到中间库的中间表,再利用带有增量更新子句的数据集成语句,从中间库的中间表拉取增量数据到目标库的目标表;数据从中间表拉取到目标表的过程还利用数据资产平台的元数据信息以及页面配置的映射信息进行数据类型转换和字段映射。
8.一种服务器,其特征在于,包括:至少一个存储器和至少一个处理器,所述存储器存储计算机可执行指令,所述处理器调用所述计算机可执行指令,以执行如权利要求1-6任一项所述的数据集成方法。
9.一种存储介质,其特征在于,所述存储介质存储计算机可执行指令,所述计算机可执行指令被执行时,实现如权利要求1-6任一项所述的数据集成方法。
10.一种计算机程序产品,其特征在于,包括计算机可执行指令,所述计算机可执行指令被执行时,实现如权利要求1-6任一项所述的数据集成方法。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202411095117.2A CN118626496B (zh) | 2024-08-12 | 2024-08-12 | 数据集成方法、装置、服务器、介质及程序 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202411095117.2A CN118626496B (zh) | 2024-08-12 | 2024-08-12 | 数据集成方法、装置、服务器、介质及程序 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN118626496A CN118626496A (zh) | 2024-09-10 |
CN118626496B true CN118626496B (zh) | 2024-12-03 |
Family
ID=92596263
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202411095117.2A Active CN118626496B (zh) | 2024-08-12 | 2024-08-12 | 数据集成方法、装置、服务器、介质及程序 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN118626496B (zh) |
Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN108959611A (zh) * | 2018-07-16 | 2018-12-07 | 中国联合网络通信集团有限公司 | 数据割接方法及装置 |
CN111125260A (zh) * | 2020-01-21 | 2020-05-08 | 重庆文理学院 | 一种基于SQL Server的数据同步方法及系统 |
Family Cites Families (7)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112632035A (zh) * | 2020-12-24 | 2021-04-09 | 广州辰创科技发展有限公司 | 面向自主可控数据库迁移方法及存储介质 |
CN114036119A (zh) * | 2021-09-30 | 2022-02-11 | 河海大学 | 一种基于kettle和数据库日志的数据同步方法 |
CN114116681B (zh) * | 2022-01-21 | 2022-07-15 | 阿里巴巴(中国)有限公司 | 数据迁移方法及装置 |
CN114625732B (zh) * | 2022-02-09 | 2023-06-20 | 杭州未名信科科技有限公司 | 基于结构化查询语言sql的查询方法和系统 |
CN115525719A (zh) * | 2022-09-29 | 2022-12-27 | 山东亿云信息技术有限公司 | 一种异构数据增量同步及断点续传的方法 |
CN115774750A (zh) * | 2022-12-28 | 2023-03-10 | 精英数智科技股份有限公司 | 数据库入湖配置方法、系统、电子设备及存储介质 |
CN117874037A (zh) * | 2023-10-23 | 2024-04-12 | 云基智慧工程股份有限公司 | 基于DataX框架支持Kafka读写方法和系统 |
-
2024
- 2024-08-12 CN CN202411095117.2A patent/CN118626496B/zh active Active
Patent Citations (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN108959611A (zh) * | 2018-07-16 | 2018-12-07 | 中国联合网络通信集团有限公司 | 数据割接方法及装置 |
CN111125260A (zh) * | 2020-01-21 | 2020-05-08 | 重庆文理学院 | 一种基于SQL Server的数据同步方法及系统 |
Also Published As
Publication number | Publication date |
---|---|
CN118626496A (zh) | 2024-09-10 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
US11741089B1 (en) | Interactive location queries for raw machine data | |
US11663033B2 (en) | Design-time information based on run-time artifacts in a distributed computing cluster | |
US20200404045A1 (en) | Central repository of configuration files and two-way replication of search node configuration files | |
US10558651B2 (en) | Search point management | |
CN101971165B (zh) | 数据关系的图形表示 | |
US11487745B2 (en) | Workflow dependency management system | |
US11159397B2 (en) | Lower-tier application deployment for higher-tier system data monitoring | |
CN105339941B (zh) | 针对etl映射设计使用投影器和选择器组件类型 | |
US11822433B2 (en) | Qualification parameters for captain selection in a search head cluster | |
US11892976B2 (en) | Enhanced search performance using data model summaries stored in a remote data store | |
US12197394B1 (en) | Method and apparatus for efficient synchronization of search heads in a cluster using digests | |
WO2025001683A1 (zh) | 一种数据处理方法、任务调度方法、装置和存储介质 | |
US20130232158A1 (en) | Data subscription | |
US20110258007A1 (en) | Data subscription | |
US20220245091A1 (en) | Facilitating generation of data model summaries | |
US12039416B2 (en) | Facilitating machine learning using remote data | |
US20240036890A1 (en) | System and method of a modular framework for configuration and reuse of web components | |
CN118626496B (zh) | 数据集成方法、装置、服务器、介质及程序 | |
Dhanda | Big data storage and analysis | |
CN101968747A (zh) | 一种机群应用管理系统及其应用管理方法 | |
Zgolli et al. | Metadata in data lake ecosystems | |
US11829415B1 (en) | Mapping buckets and search peers to a bucket map identifier for searching | |
Konagala | Big data analytics using apache hive to analyze health data | |
Schuchardt et al. | Applying content management to automated provenance capture | |
dos Santos | Data distribution and access in a microservices architecture |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |