
CN110362390B - Distributed data integration job scheduling method and device - Google Patents

Distributed data integration job scheduling method and device

Info

Publication number
CN110362390B
CN110362390B (application CN201910489422.2A)
Authority
CN
China
Prior art keywords
job
scheduling
information
module
resource
Prior art date
Legal status
Active
Application number
CN201910489422.2A
Other languages
Chinese (zh)
Other versions
CN110362390A (en)
Inventor
李建元
刘飞黄
王超群
刘兴田
贾建涛
温晓岳
Current Assignee
Yinjiang Technology Co.,Ltd.
Original Assignee
Enjoyor Co Ltd
Priority date
Filing date
Publication date
Application filed by Enjoyor Co Ltd filed Critical Enjoyor Co Ltd
Priority to CN201910489422.2A priority Critical patent/CN110362390B/en
Publication of CN110362390A publication Critical patent/CN110362390A/en
Application granted granted Critical
Publication of CN110362390B publication Critical patent/CN110362390B/en
Active legal-status Critical Current
Anticipated expiration legal-status Critical

Classifications

    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/27Replication, distribution or synchronisation of data between databases or within a distributed database system; Distributed database system architectures therefor
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/48Program initiating; Program switching, e.g. by interrupt
    • G06F9/4806Task transfer initiation or dispatching
    • G06F9/4843Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
    • G06F9/485Task life-cycle, e.g. stopping, restarting, resuming execution
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/48Program initiating; Program switching, e.g. by interrupt
    • G06F9/4806Task transfer initiation or dispatching
    • G06F9/4843Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
    • G06F9/4881Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Software Systems (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Databases & Information Systems (AREA)
  • Computing Systems (AREA)
  • Data Mining & Analysis (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

The invention relates to a distributed data integration job scheduling method and device. Addressing the special scenarios that data integration may face, a job scheduling device is responsible for delivering data integration jobs to a job running device; the job running device receives the scheduled task and starts job execution, feeds the job's running-status information back to the job management module, feeds worker-node computing resources back to the resource scheduling module, and feeds loss-of-connection or fault information back to the job preloading module. The invention combines the following features: (1) high availability and fault tolerance with weak consistency; (2) low latency for quasi-real-time job scheduling; (3) multi-tenant concurrency control for cloud service applications; (4) computing resource isolation and parallel scheduling of multiple jobs; (5) a priority scheduling mechanism.

Description

Distributed data integration job scheduling method and device
Technical Field
The invention relates to the technical field of big data infrastructure, and in particular to a distributed data integration job scheduling method and device.
Background
With the evolution of the digital economy, service digitization has developed rapidly across industries and has gradually become a new center of gravity. However, service digitization also produces a large number of data silos, which have become a common pain point for delivering digital services. Industries therefore urgently need data integration to break down and prevent data silos and to consolidate and manage data resources, so that the value of the relationships among data can be effectively exploited.
Data integration often involves scheduling thousands of jobs, including job types such as data exchange and data preprocessing, and the design of a scheduling system must consider a variety of complex scenarios. For example, some scenarios involve a large number of jobs that must be processed in parallel; some require quasi-real-time operation, so priority scheduling must be considered; some jobs occupy more computing resources, or occupy them for longer, so resource isolation must be considered to avoid affecting other jobs; some jobs may encounter failures such as downtime of data sources or targets, network interruptions, or downtime of running nodes, so a fault-tolerance mechanism is needed; and in multi-tenant scenarios, concurrency control must be handled.
The prior art fails to meet these complex data integration job scheduling requirements. For example, the traditional LTS scheduling system isolates jobs by thread: if the execution thread of one job exhausts all the memory of the current process, every job in that process fails. It lacks the capability to schedule data integration jobs and is better suited to lightweight tasks. Chinese patent CN201610800080 discloses a distributed task scheduling system and method that addresses the large amount of code and heavy development burden in parallel-computing program development; it is essentially scheduling for a single large-scale distributed computing job and does not consider parallel scheduling of multiple tasks. Chinese patent CN201610197298 discloses a task scheduling method, apparatus and system that provides multi-channel, multi-task distributed scheduling and solves the starvation of other jobs caused by a single task occupying the scheduler for too long, but it does not consider low-latency access to job metadata or how to guarantee job metadata consistency. Chinese patent CN201410748604 discloses a distributed task scheduling system and method that ensures the reliability of the system itself, supports independent or associated tasks, and supports task rollback and distribution; however, it is not suitable for complex data integration scenarios, where task rollback is not a key concern and data integration tasks need not be associated with one another, and it does not consider the high risk of node downtime under large-scale, complex data integration workloads, the need for high availability, or how to reduce latency as much as possible and guarantee the consistency of job metadata when quasi-real-time tasks are present.
Disclosure of Invention
Aiming at the special scenarios that data integration may face, the invention is characterized in that a job scheduling device is responsible for delivering data integration jobs to a job running device; the job running device receives the scheduled tasks, starts job execution, sends job running-status information to the job management module, feeds worker-node computing resources back to the resource scheduling module, and feeds loss-of-connection or fault information back to the job preloading module. The invention combines the following features: (1) high availability and fault tolerance with weak consistency; (2) low latency for quasi-real-time job scheduling; (3) multi-tenant concurrency control for cloud service applications; (4) computing resource isolation and parallel scheduling of multiple jobs; (5) a priority scheduling mechanism.
The invention achieves the aim through the following technical scheme: a distributed data integration job scheduling method comprises the following steps:
(1) the job scheduling device issues the data integration job to the job running device, wherein the job scheduling device comprises a job management module, a job preloading module and a resource scheduling module: (1.1) the job management module receives, caches and stores job-related meta-information and performs concurrency control;
(1.2) the job preloading module acquires the jobs to be processed from the job management module and determines the scheduling priority sequence;
(1.3) the resource scheduling module completes resource allocation and scheduling distribution by acquiring the job preloading information and the computing-resource information of the job running device;
(2) the job running device receives the scheduled task and starts job execution, feeds job running-status information back to the job management module, feeds worker-node computing resources back to the resource scheduling module, and feeds loss-of-connection or fault information back to the job preloading module.
Preferably, the job management module includes an information receiving unit, an information caching unit, a persistent storage unit, and a concurrency control unit, wherein the specific operations are as follows:
(i) the information receiving unit receives job submissions, job meta-information modifications and scheduling policy updates; receives information on jobs without allocated resources fed back by the resource scheduling module and updates the job status; and receives job status information fed back by the job running device and updates the job status;
(ii) the information caching unit locally caches job meta-information and status information and supports frequent real-time queries;
(iii) the persistent storage unit persists the job metadata according to the cached content and maintains data consistency between the cache layer and the storage layer;
(iv) the concurrency control unit assigns a read-write lock to access to each job resource.
Preferably, in step (iii), the data consistency between the cache layer and the storage layer is maintained by the following methods:
(a) using fault-tolerant storage: updated data is written to a local file and written to the storage layer once the network returns to normal;
(b) using a circuit breaker: when the fault-tolerance mechanism has been triggered and reaches a preset threshold, the breaker opens, the service is degraded, and no new tasks are scheduled (a minimal sketch of this follows the list);
(c) encapsulating the job status interface of the job running device: each time the job scheduling node starts, the job running status is retrieved and audited to ensure that the job status in the job running device is consistent with the status in the metadata storage layer.
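For illustration, the following is a minimal sketch of the circuit-breaker behaviour described in item (b), assuming a simple failure counter; the class name, threshold handling and reset policy are assumptions for the example rather than details given by the invention.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal circuit breaker: once the fault-tolerance path has been triggered
 * a preset number of times, the breaker opens and the scheduler stops
 * dispatching new tasks (degraded service) until it is reset.
 */
public class SchedulingCircuitBreaker {
    private final int threshold;                       // preset trip threshold
    private final AtomicInteger faults = new AtomicInteger();
    private volatile boolean open = false;             // open = degraded, no new tasks

    public SchedulingCircuitBreaker(int threshold) {
        this.threshold = threshold;
    }

    /** Called whenever the fault-tolerant storage path (item (a)) is triggered. */
    public void recordFault() {
        if (faults.incrementAndGet() >= threshold) {
            open = true;                               // disconnect the breaker
        }
    }

    /** Called after the network and storage layer have recovered. */
    public void reset() {
        faults.set(0);
        open = false;
    }

    /** The scheduler checks this before dispatching a new task. */
    public boolean allowNewTask() {
        return !open;
    }
}
```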
Preferably, the job preloading module includes a real-time query unit, a job preloading unit, and a fault processing unit, wherein the specific operations are as follows:
(I) the real-time query unit queries the job metadata cache in real time to acquire the unlocked job to be scheduled;
(II) the job preloading unit adds the unlocked jobs to be scheduled into a bounded ordered queue and sorts them by job scheduling time and job priority;
(III) the fault processing unit receives fault information from the job running device and performs fault-tolerance processing; fault-tolerance processing means that when a worker node goes down or the network is disconnected for a long time while a job is running, the job running device notifies the job preloading module, and the job preloading module connects to the worker node to determine whether it is unreachable; if so, the job is put directly back into the queue, and the lost job is eventually scheduled to another available node; if the unreachable node later recovers, the job running device kills the job process directly, ensuring that the same job never runs on two nodes simultaneously under one job running device. A sketch of this handling follows the list.
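A minimal sketch of the fault-tolerance handling in item (III) is given below. The `WorkerNode` interface and the queue type are placeholders invented for illustration; the invention specifies the behaviour, not these names.

```java
import java.util.Queue;

/**
 * Fault handler of the job preloading module (illustrative): probe the
 * reported node and requeue the job if the node is unreachable, so the
 * lost job is eventually scheduled onto another available node.
 */
public class FaultHandler {

    /** Placeholder abstraction of a worker node reported as failed. */
    public interface WorkerNode {
        boolean isReachable();                          // e.g. a connection or heartbeat probe
    }

    private final Queue<String> boundedOrderedQueue;    // jobs waiting to be scheduled

    public FaultHandler(Queue<String> boundedOrderedQueue) {
        this.boundedOrderedQueue = boundedOrderedQueue;
    }

    /** Invoked when the job running device reports a node failure for a job. */
    public void onNodeFailure(String jobId, WorkerNode node) {
        if (!node.isReachable()) {
            // Node confirmed unreachable: put the job back into the queue so it
            // is rescheduled to another available node.
            boundedOrderedQueue.offer(jobId);
            // If the lost node later rejoins, the job running device kills the
            // orphaned process so the same job never runs on two nodes at once.
        }
    }
}
```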
Preferably, a bounded ordered blocking queue is used to load all jobs to be scheduled. "Bounded" means that an upper limit on the number of jobs is guaranteed, and the upper-limit parameter can be derived from scenario evaluation; "ordered" means that jobs with earlier trigger times and higher priority are placed nearer the front of the queue and scheduled first. During job stop-and-delete operations, removing a specified job to be scheduled from the queue is supported. A producer-consumer model is adopted, and a thread blocking-wakeup approach is used to reduce the CPU burden; a sketch of such a queue follows.
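The following is a minimal sketch of such a bounded ordered blocking queue, built on the JDK's lock/condition primitives; the `PendingJob` fields (`triggerTime`, `priority`) and the class names are illustrative assumptions rather than identifiers used by the invention.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** A job waiting to be scheduled (illustrative fields). */
class PendingJob {
    final String jobId;
    final long triggerTime;   // earlier trigger time -> scheduled first
    final int priority;       // higher priority -> scheduled first
    PendingJob(String jobId, long triggerTime, int priority) {
        this.jobId = jobId;
        this.triggerTime = triggerTime;
        this.priority = priority;
    }
}

/**
 * Bounded ordered blocking queue: bounded by an upper limit chosen from
 * scenario evaluation, ordered by trigger time then priority, with
 * producer-consumer blocking/wakeup so waiting threads do not burn CPU.
 */
public class BoundedOrderedJobQueue {
    private final PriorityQueue<PendingJob> queue;
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedOrderedJobQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new PriorityQueue<>(
                Comparator.comparingLong((PendingJob j) -> j.triggerTime)
                          .thenComparing(Comparator.comparingInt((PendingJob j) -> j.priority)
                                                   .reversed()));
    }

    /** Producer side: the preloading unit adds an unlocked pending job. */
    public void put(PendingJob job) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                notFull.await();              // block instead of busy-waiting
            }
            queue.offer(job);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: the resource scheduling module takes the head job. */
    public PendingJob take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            PendingJob job = queue.poll();
            notFull.signal();
            return job;
        } finally {
            lock.unlock();
        }
    }

    /** Remove a specified job when it is stopped or deleted. */
    public boolean remove(String jobId) {
        lock.lock();
        try {
            boolean removed = queue.removeIf(j -> j.jobId.equals(jobId));
            if (removed) notFull.signal();
            return removed;
        } finally {
            lock.unlock();
        }
    }
}
```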
Preferably, the resource scheduling module includes a resource obtaining unit, a resource allocating unit, and a scheduling distributing unit, wherein the specific operations are as follows:
1) the resource acquisition unit obtains the computing resources of the job running cluster and caches them in memory;
2) the resource allocation unit obtains all jobs from the bounded ordered queue and allocates computing resources to each job in order of job priority;
3) the scheduling distribution unit designates an executor for each scheduled job and sends the job meta-information, the allocated computing resources and the executor configuration to the job running cluster (see the sketch after this list).
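A minimal sketch of the allocate-then-dispatch loop of steps 1)-3), reusing the `PendingJob` and `BoundedOrderedJobQueue` types from the earlier queue sketch; the resource fields (`cpuCores`, `memoryMb`), the `Dispatcher` interface, the first-fit strategy and the executor name are assumptions made for the example.

```java
import java.util.List;

/** Cached view of a worker node's free computing resources (illustrative). */
class NodeResources {
    final String nodeId;
    double cpuCores;
    long memoryMb;
    NodeResources(String nodeId, double cpuCores, long memoryMb) {
        this.nodeId = nodeId;
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** Placeholder for the unit that ships job meta-info, resources and executor config. */
interface Dispatcher {
    void dispatch(PendingJob job, NodeResources node, String executorConfig);
}

public class ResourceScheduler {
    private final BoundedOrderedJobQueue queue;          // jobs in priority order
    private final List<NodeResources> cachedResources;   // refreshed by the resource acquisition unit
    private final Dispatcher dispatcher;

    public ResourceScheduler(BoundedOrderedJobQueue queue,
                             List<NodeResources> cachedResources,
                             Dispatcher dispatcher) {
        this.queue = queue;
        this.cachedResources = cachedResources;
        this.dispatcher = dispatcher;
    }

    /** Take the highest-priority job and allocate resources first-fit. */
    public void scheduleOnce(double cpuPerJob, long memPerJob) throws InterruptedException {
        PendingJob job = queue.take();                   // blocks until a job is available
        for (NodeResources node : cachedResources) {
            if (node.cpuCores >= cpuPerJob && node.memoryMb >= memPerJob) {
                node.cpuCores -= cpuPerJob;              // reserve the allocation
                node.memoryMb -= memPerJob;
                dispatcher.dispatch(job, node, "container-executor"); // assumed config
                return;
            }
        }
        queue.put(job);   // no free resources: job stays pending and is reported as unallocated
    }
}
```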
Preferably, in step (2), the job running device comprises a master control node and worker nodes; the master control node is responsible for management and coordination, and the worker nodes are responsible for executing the data integration jobs. The master control node receives the job meta-information, job resource allocation information and job executor information distributed by the resource scheduling module and starts job execution. The agent program on a worker node collects the job's status information and sends it to the master control node, which forwards it to the job management module; the agent collects the worker node's computing resources and sends them to the master control node, which forwards them to the resource scheduling module; the agent sends heartbeat information to the master control node, and the master control node sends loss-of-connection or fault information to the job preloading module. The executor on a worker node has a retry mechanism: once the data flow source or target goes down or loses connection, the executor retries on a timer to ensure that operation continues normally after the source and target recover.
Preferably, the job running device performs distributed system resource management based on a Mesos cluster: the master control node provides low-latency local metadata management using RAM plus WAL logs, adopts the Paxos algorithm to keep the job state of a large number of worker nodes synchronized, and pushes specific physical resources to the job scheduling system based on its unified cluster physical-resource management interface and a specific resource-sharing policy. The job running cluster provides both multi-language driver packages and JSON RPC for the job scheduling system to register and to obtain specific callback events. The agent program is responsible for collecting worker-node resources, running specific scheduled tasks through the executor, and returning the executor's execution results and task status to the master control node, which then forwards them to the job scheduling device.
A distributed data integration job scheduling apparatus comprises a job scheduling device and a job running device that exchange information with each other. The job scheduling device comprises a job management module, a job preloading module and a resource scheduling module: the job management module receives, caches and stores job-related meta-information and performs concurrency control; the job preloading module acquires the jobs to be processed from the job management module and determines the scheduling priority order; the resource scheduling module completes resource allocation and scheduling distribution by acquiring the job preloading information and the computing-resource information of the job running device. The job running device comprises a master control node and worker nodes; the master control node is responsible for management and coordination, and the worker nodes are responsible for executing the data integration jobs.
Preferably, both the job scheduling device and the job running device are registered in ZooKeeper. The job scheduling device adopts an active-standby mode: once the active device goes down, ZooKeeper elects the standby device, which takes over the job scheduling work. The master control node in the job running device likewise adopts an active-standby mode: once the master control node goes down, ZooKeeper elects the standby master control node to take over the management and coordination work.
Preferably, the job scheduling device audits and maintains its state based on job status information fed back by the job running device, and the job metadata database maintains consistency based on the job metadata cache of the job scheduling device. When the job scheduling device fails, the standby job scheduling device, upon taking over, interacts with the job metadata database to rebuild the job metadata cache mechanism, and audits and maintains the cached metadata by receiving job status feedback from the job running device, so that the cached metadata is kept consistent across the distributed system.
The invention has the following beneficial effects: (1) in terms of the CAP theorem, the method satisfies the two properties of high availability and partition (fault) tolerance, and adopts mechanisms to keep the job metadata as consistent as possible; (2) multi-tenant concurrency control is realized based on distributed read-write locks, and multi-tenant data integration services are provided in a cloud-service form; (3) because data integration jobs must be scheduled frequently, the job metadata uses a caching mechanism, which effectively reduces the latency and interruption risks caused by frequent metadata access.
Drawings
FIG. 1 is a schematic flow diagram of the apparatus of the present invention;
FIG. 2 is a schematic diagram of the high availability mechanism of the apparatus of the present invention;
FIG. 3 is a schematic flow diagram of the method of the present invention;
FIG. 4 is a schematic flow diagram of a job management module of the present invention;
FIG. 5 is a schematic representation of the operation of the job management module of the present invention;
FIG. 6 is a process flow diagram of a job preloading module of the present invention;
FIG. 7 is a schematic flow diagram of a resource scheduling module of the present invention;
FIG. 8 is a schematic view of the operation flow of the job running device of the present invention.
Detailed Description
The invention will be further described with reference to specific examples, but the scope of the invention is not limited thereto:
Embodiment: as shown in FIG. 1, a distributed data integration job scheduling apparatus is composed of a job scheduling device and a job running device, which exchange information with each other. The job scheduling device comprises a job management module, a job preloading module and a resource scheduling module: the job management module receives, caches and stores job-related meta-information and performs concurrency control; the job preloading module acquires the jobs to be processed from the job management module and determines the scheduling priority order; the resource scheduling module completes resource allocation and scheduling distribution by acquiring the job preloading information and the computing-resource information of the job running device. The job running device comprises a master control node and worker nodes; the master control node is responsible for management and coordination, and the worker nodes are responsible for executing the data integration jobs.
As shown in FIG. 2, both the job scheduling device and the job running device are registered in ZooKeeper. The job scheduling device adopts an active-standby mode: once the active device goes down, ZooKeeper elects the standby device, which takes over the job scheduling work. The master control node in the job running device likewise adopts an active-standby mode: once the master control node goes down, ZooKeeper elects the standby master control node to take over the management and coordination work, as in the sketch below.
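A minimal sketch of such an active-standby election is given below, assuming Apache Curator's LeaderLatch recipe on top of ZooKeeper; the connection string, latch path and takeover actions are illustrative assumptions, since the invention does not prescribe a specific client library.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Active-standby election for the job scheduling device (illustrative):
 * every scheduler instance registers in ZooKeeper, and when the active
 * instance goes down a standby instance is elected to take over.
 */
public class SchedulerLeaderElection {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk1:2181,zk2:2181,zk3:2181",             // assumed ZooKeeper ensemble
                new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/scheduler/leader");
        latch.addListener(new LeaderLatchListener() {
            @Override public void isLeader() {
                // Now the active scheduler: rebuild the job metadata cache from
                // the metadata database and start dispatching jobs.
                System.out.println("Elected as active scheduler, taking over job scheduling");
            }
            @Override public void notLeader() {
                // Demoted to standby: stop dispatching new jobs.
                System.out.println("Running as standby scheduler");
            }
        });
        latch.start();                                    // join the election
        Thread.currentThread().join();                    // keep the process alive
    }
}
```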
The job scheduling device audits and maintains its state based on job status information fed back by the job running device, and the job metadata database maintains consistency based on the job metadata cache of the job scheduling device. When the job scheduling device fails, the standby job scheduling device, upon taking over, interacts with the job metadata database to rebuild the job metadata cache mechanism, and audits and maintains the cached metadata by receiving job status feedback from the job running device, so that the cached metadata is kept consistent across the distributed system; weak consistency is thereby ensured.
As shown in FIG. 3, a distributed data integration job scheduling method includes the following steps:
s100: the job scheduling device issues the data integration job to the job running device, and the job scheduling device consists of a job management module, a job preloading module and a resource scheduling module, and specifically comprises the following parts:
s101: and the operation management module receives, caches and stores the operation related meta information and performs concurrency control. The job management module consists of an information receiving unit, a storage processing unit and a concurrency control unit. As shown in fig. 4, the specific operations are as follows:
(1) the information receiving unit S101-1 is responsible for receiving job submissions, job meta-information modifications and scheduling policy updates; receiving information on jobs without allocated resources fed back by the resource scheduling module and updating the job status; and receiving job status information fed back by the job running device and updating the job status;
(2) the information caching unit S101-2 is responsible for locally caching job meta-information and status information and supporting frequent real-time queries;
(3) the persistent storage unit S101-3 persists the job metadata according to the cached content and maintains data consistency between the cache layer and the storage layer;
(4) the concurrency control unit S101-4 is responsible for assigning a read-write lock to access to each job resource.
Specifically, as shown in FIG. 5, the job management module of the present invention is responsible for receiving jobs and maintaining the job state machine; the main job states may include: not started, waiting to be scheduled, suspended, running, stopped, abnormal, and finished. The job management module provides job-state operation interfaces, such as interfaces for stopping a running job, suspending a job, scheduling a job, stopping a job normally, and stopping a job abnormally. It maintains various job scheduling policies, such as repeated jobs, timed jobs, Cron jobs, and one-off jobs. A history storage module is maintained internally to record all scheduling history. Read-write locks are added to operations on the cache layer and the metadata persistence layer to realize concurrency control: if a concurrent thread performs a write operation, the lock is upgraded to an exclusive lock and other threads cannot acquire it; conversely, if the concurrent thread performs a read operation, the lock is a shared lock and other threads may acquire it concurrently (a sketch of this per-job locking follows this paragraph). The job management module adds a cache layer above the job metadata storage layer to support frequent metadata queries and calls, and hence the frequent access and frequent scheduling required by quasi-real-time data integration tasks; at the implementation level, the cache layer is abstracted into an SPI interface, supporting cache implementations such as Caffeine, JDK, Guava and Redis. The persistence layer is likewise abstracted into an SPI interface, supporting relational databases, MongoDB and the like. Because instability such as network faults may cause inconsistency when job data is written to the persistence layer, the job management module adopts "triple insurance" at the implementation level to keep the metadata as consistent as possible: (1) fault-tolerant storage writes updated data to a local database/file and writes it to the storage layer once the network returns to normal; (2) a circuit breaker: when the fault-tolerance mechanism has been triggered a certain threshold number of times, the breaker opens, the service is degraded, and no new tasks are scheduled; (3) the job status interface of the job running system is encapsulated, and each time the job scheduling node starts, the job running status is retrieved and audited to ensure that the job status in the job running system is consistent with the status in the metadata storage layer.
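A minimal sketch of this per-job read-write locking, using the JDK's `ReentrantReadWriteLock`; the lock registry and method names are illustrative, not identifiers from the invention.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

/**
 * Concurrency control unit (illustrative): one read-write lock per job
 * resource. Concurrent reads share the lock, while a write holds it
 * exclusively, so multi-tenant access cannot corrupt the cached metadata.
 */
public class JobConcurrencyControl {
    private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks =
            new ConcurrentHashMap<>();

    private ReentrantReadWriteLock lockFor(String jobId) {
        return locks.computeIfAbsent(jobId, id -> new ReentrantReadWriteLock());
    }

    /** Shared access: many readers of one job's metadata may proceed together. */
    public <T> T read(String jobId, Supplier<T> query) {
        ReentrantReadWriteLock lock = lockFor(jobId);
        lock.readLock().lock();
        try {
            return query.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Exclusive access: a writer blocks all other readers and writers of that job. */
    public void write(String jobId, Runnable update) {
        ReentrantReadWriteLock lock = lockFor(jobId);
        lock.writeLock().lock();
        try {
            update.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```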
S102: as shown in FIG. 6, the job preloading module acquires the jobs to be processed from the job management module and determines the scheduling priority order; the job preloading module consists of a real-time query unit, a job preloading unit and a fault processing unit. The specific operations are as follows:
(1) the real-time query unit S102-1 is responsible for querying job metadata cache in real time and acquiring unlocked jobs to be scheduled;
(2) the job preloading unit S102-2 is responsible for adding the unlocked job to be scheduled into the bounded ordered queue and sorting the job according to the job scheduling time and the job priority;
(3) the fault processing unit S102-3 is responsible for receiving fault information from the job running cluster and performing fault-tolerance processing.
Specifically, a bounded ordered blocking queue is built to load all jobs to be scheduled. "Bounded" means that an upper limit on the number of jobs is guaranteed, and the upper-limit parameter can be derived from scenario evaluation; "ordered" means that jobs with earlier trigger times and higher priority levels are placed near the front of the queue and scheduled first. During job stop-and-delete operations, removing a specified job to be scheduled from the queue is supported. With the producer-consumer model, a thread blocking-wakeup approach is used to reduce the CPU burden (see the queue sketch given earlier).
Fault-tolerance processing means that when a worker node goes down or the network is disconnected for a long time while a job is running, the job running device notifies the job preloading module; the job preloading module connects to the worker node to determine whether it is unreachable, and if so, the job is put back into the queue so that the lost job is eventually scheduled to another available node. If the unreachable node later recovers, the job running device kills the job process directly, ensuring that the same job never runs on two nodes simultaneously under one job running device.
S103: the resource scheduling module completes resource allocation and scheduling distribution by acquiring the job preloading information and the computing-resource information of the job running device. The resource scheduling module consists of a resource acquisition unit, a resource allocation unit and a scheduling distribution unit, as shown in FIG. 7:
(1) the resource obtaining unit S103-1 is responsible for obtaining the computing resources of the job running cluster and caching the computing resources in the memory;
(2) the resource allocation unit S103-2 is responsible for acquiring all the jobs from the bounded ordered queue and allocating computing resources to each job according to the job priority order;
(3) the scheduling and distributing unit S103-3 is responsible for assigning an executor to the scheduled job and sending the job meta-information, the allocated computing resource and the executor configuration to the job running cluster.
The executor may be implemented as a Linux container executor, a Docker executor, or another executor; these container executors achieve isolation of computing resources. A sketch of a Docker-based executor follows.
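A minimal sketch of a Docker-based executor launched by the agent, assuming the data integration job is packaged as a container image and started with the CPU and memory limits allocated by the scheduler; the image name, flag choices and class name are illustrative assumptions.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Docker-based executor (illustrative): each data integration job runs in
 * its own container so that its CPU and memory usage cannot disturb other
 * jobs on the same worker node.
 */
public class DockerExecutor {

    /** Launch the job container with the resources allocated by the scheduler. */
    public int run(String jobId, String image, double cpuCores, long memoryMb)
            throws IOException, InterruptedException {
        List<String> cmd = new ArrayList<>(List.of(
                "docker", "run", "--rm",
                "--name", "job-" + jobId,
                "--cpus", String.valueOf(cpuCores),     // CPU isolation
                "--memory", memoryMb + "m",             // memory isolation
                image));
        Process process = new ProcessBuilder(cmd)
                .inheritIO()                             // stream job logs to the agent
                .start();
        return process.waitFor();                        // exit code reported to the master node
    }
}
```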
S200: the job running device receives the scheduled task and starts job execution, sends job running-status information to the job management module, feeds worker-node computing resources back to the resource scheduling module, and feeds loss-of-connection or fault information back to the job preloading module.
The job running device has a master control node and worker nodes: the master control node is responsible for management and coordination, and the worker nodes are responsible for executing the data integration jobs. The master control node receives the job meta-information, job resource allocation information and job executor information distributed by the resource scheduling module and starts job execution. The agent program on a worker node collects the job's status information and sends it to the master control node, which forwards it to the job management module; the agent collects the worker node's computing resources and sends them to the master control node, which forwards them to the resource scheduling module; the agent sends heartbeat information to the master control node, and the master control node sends loss-of-connection or fault information to the job preloading module. The executor on a worker node has a retry mechanism: once the data flow source or target goes down or loses connection, the executor retries on a timer so that operation continues normally after the source and target recover, as in the sketch below.
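A minimal sketch of the executor's timed retry, assuming a fixed retry interval and a generic transfer step; the interval, the `TransferStep` interface and the exception handling are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;

/**
 * Timed retry inside the executor (illustrative): while the data flow source
 * or target is down or unreachable, keep retrying on a fixed timer so the
 * flow resumes automatically once the source and target recover.
 */
public class RetryingTransfer {

    /** Placeholder for one attempt to move data from the source to the target. */
    public interface TransferStep {
        void transferOnce() throws Exception;    // throws while source/target is unavailable
    }

    private final long retryIntervalSeconds;

    public RetryingTransfer(long retryIntervalSeconds) {
        this.retryIntervalSeconds = retryIntervalSeconds;
    }

    public void runUntilSuccess(TransferStep step) throws InterruptedException {
        while (true) {
            try {
                step.transferOnce();
                return;                            // source and target are healthy again
            } catch (Exception downOrDisconnected) {
                // Data source or target down, or connection lost: wait, then retry.
                TimeUnit.SECONDS.sleep(retryIntervalSeconds);
            }
        }
    }
}
```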
As shown in FIG. 8, the job running device may perform distributed system resource management based on a Mesos cluster: the master control node provides low-latency local metadata management using RAM plus WAL logs, maintains job state synchronization for a large number of worker nodes using the Paxos algorithm, and pushes specific physical resources to the job scheduling system based on its unified cluster physical-resource management interface and a specific resource-sharing policy. The job running cluster provides both multi-language driver packages and JSON RPC for the job scheduling system to register and to obtain specific callback events; the agent program is responsible for collecting worker-node resources, running specific scheduled tasks through the executor, and returning the executor's execution results and task status to the master control node, which then forwards them to the job scheduling device.
While the invention has been described in connection with specific embodiments thereof, it will be understood by those skilled in the art that various changes in form and details may be made therein without departing from the spirit and scope of the invention as defined by the appended claims.

Claims (8)

1. A distributed data integration job scheduling method, characterized by comprising the following steps:
(1) a job scheduling device issues a data integration job to a job running device, wherein the job scheduling device comprises a job management module, a job preloading module and a resource scheduling module:
(1.1) the job management module receives, caches and stores job-related meta-information and performs concurrency control; the job management module comprises an information receiving unit, an information caching unit, a persistent storage unit and a concurrency control unit, whose specific operations are as follows:
(1.1.1) the information receiving unit receives job submissions, job meta-information modifications and scheduling policy updates; receives information on jobs without allocated resources fed back by the resource scheduling module and updates the job status; and receives job status information fed back by the job running device and updates the job status;
(1.1.2) the information caching unit locally caches the job meta-information and status information to support frequent real-time queries;
(1.1.3) the persistent storage unit persists the job metadata according to the cached content and maintains data consistency between the cache layer and the storage layer, wherein the data consistency between the cache layer and the storage layer is maintained as follows:
(a) using fault-tolerant storage, updated data is written to a local file and written to the storage layer after the network returns to normal;
(b) using a circuit breaker, when the fault-tolerance mechanism has been triggered and reaches a preset threshold, the breaker opens, the service is degraded, and no new tasks are scheduled;
(c) the job status interface of the job running device is encapsulated, and each time the job scheduling node starts, the job running status is obtained and audited to ensure that the job status in the job running device is consistent with the status in the metadata storage layer;
(1.1.4) the concurrency control unit assigns a read-write lock to access to each job resource;
(1.2) the job preloading module obtains the jobs to be processed from the job management module and determines a bounded ordered queue and the scheduling priority order; the bounded ordered queue is used to load all jobs to be scheduled, wherein "bounded" means that an upper limit on the number of jobs is guaranteed, the upper-limit parameter being given through scenario evaluation, and "ordered" means that jobs with earlier trigger times and higher priority are placed nearer the front of the queue and scheduled first; during job stop-and-delete operations, removal of a specified job to be scheduled from the queue is supported; a producer-consumer model is adopted and a thread blocking-wakeup approach is used to reduce the CPU burden;
(1.3) the resource scheduling module completes resource allocation and scheduling distribution by obtaining the job preloading information and the computing-resource information of the job running device;
(2) the job running device receives the scheduled task and starts job execution, feeds job running-status information back to the job management module, feeds worker-node computing resources back to the resource scheduling module, and feeds loss-of-connection or fault information back to the job preloading module.

2. The distributed data integration job scheduling method according to claim 1, characterized in that the job preloading module comprises a real-time query unit, a job preloading unit and a fault processing unit, and the specific operations of step (1.2) are as follows:
(I) the real-time query unit queries the job metadata cache in real time and obtains the unlocked jobs to be scheduled;
(II) the job preloading unit adds the unlocked jobs to be scheduled into the bounded ordered queue and sorts them by job scheduling time and job priority;
(III) the fault processing unit receives fault information from the job running device and performs fault-tolerance processing, wherein fault-tolerance processing means that when a worker node goes down or the network is disconnected for a long time while a job is running, the job running device notifies the job preloading module, which connects to the worker node to determine whether it is unreachable; if confirmed unreachable, the job is put directly back into the queue, and the lost job is eventually scheduled to another available node; if the unreachable node later recovers, the job running device kills the job process directly, ensuring that the same job never runs on two nodes simultaneously under one job running device.

3. The distributed data integration job scheduling method according to claim 1, characterized in that the resource scheduling module comprises a resource acquisition unit, a resource allocation unit and a scheduling distribution unit, and the specific operations of step (1.3) are as follows:
1) the resource acquisition unit obtains the computing resources of the job running cluster and caches them in memory;
2) the resource allocation unit obtains all jobs from the bounded ordered queue and allocates computing resources to each job in order of job priority;
3) the scheduling distribution unit designates an executor for each scheduled job and sends the job meta-information, the allocated computing resources and the executor configuration to the job running cluster.

4. The distributed data integration job scheduling method according to claim 1, characterized in that, in step (2), the job running device comprises a master control node and worker nodes, the master control node being responsible for management and coordination and the worker nodes being responsible for executing the data integration jobs; the master control node receives the job meta-information, job resource allocation information and job executor information distributed by the resource scheduling module and starts job execution; the agent program on a worker node collects the job's status information and sends it to the master control node, which forwards it to the job management module; the agent program on the worker node collects the worker node's computing resources and sends them to the master control node, which forwards them to the resource scheduling module; the agent program on the worker node sends heartbeat information to the master control node, and the master control node sends loss-of-connection or fault information to the job preloading module; the executor on the worker node has a retry mechanism, and once the data flow source or target goes down or loses connection, it retries on a timer to ensure that operation can continue normally after the data source and data target recover.

5. The distributed data integration job scheduling method according to claim 1, characterized in that the job running device performs distributed system resource management based on a Mesos cluster; the master control node provides low-latency local metadata management using RAM plus WAL logs, adopts the Paxos algorithm to maintain job state synchronization of the worker nodes, and pushes specific physical resources to the job scheduling system based on its unified cluster physical-resource management interface and a specific resource-sharing policy; the job running cluster provides both multi-language driver packages and JSON RPC for the job scheduling system to register and to obtain specific callback events; the agent program is responsible for collecting worker-node resources, runs specific scheduled tasks through the executor, and returns the executor's execution results and task status to the master control node, which then forwards them to the job scheduling device.

6. A distributed data integration job scheduling apparatus applying the method of claim 1, characterized by comprising a job scheduling device and a job running device that exchange information with each other; the job scheduling device comprises a job management module, a job preloading module and a resource scheduling module; the job management module is used to receive, cache and store job-related meta-information and perform concurrency control; the job preloading module is used to obtain the jobs to be processed from the job management module and determine the scheduling priority order; the resource scheduling module is used to complete resource allocation and scheduling distribution by obtaining the job preloading information and the computing-resource information of the job running device; the job running device comprises a master control node and worker nodes, the master control node being responsible for management and coordination and the worker nodes being responsible for executing the data integration jobs.

7. The distributed data integration job scheduling apparatus according to claim 6, characterized in that both the job scheduling device and the job running device are registered in ZooKeeper; the job scheduling device adopts an active-standby mode, and once the active device goes down, ZooKeeper elects the standby device, which takes over the job scheduling work; the master control node in the job running device adopts an active-standby mode, and once the master control node goes down, ZooKeeper elects the standby master control node to take over the management and coordination work.

8. The distributed data integration job scheduling apparatus according to claim 7, characterized in that the job scheduling device audits and maintains its state based on job status information fed back by the job running device, and the job metadata database maintains consistency based on the job metadata cache of the job scheduling device; when the job scheduling device fails, the standby job scheduling device, once it takes over, interacts with the job metadata database, rebuilds the job metadata cache mechanism, and audits and maintains the cached metadata by receiving job status feedback from the job running device so that it remains consistent across the distributed system.
CN201910489422.2A 2019-06-06 2019-06-06 Distributed data integration job scheduling method and device Active CN110362390B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN201910489422.2A CN110362390B (en) 2019-06-06 2019-06-06 Distributed data integration job scheduling method and device

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN201910489422.2A CN110362390B (en) 2019-06-06 2019-06-06 Distributed data integration job scheduling method and device

Publications (2)

Publication Number Publication Date
CN110362390A CN110362390A (en) 2019-10-22
CN110362390B true CN110362390B (en) 2021-09-07

Family

ID=68215696

Family Applications (1)

Application Number Title Priority Date Filing Date
CN201910489422.2A Active CN110362390B (en) 2019-06-06 2019-06-06 Distributed data integration job scheduling method and device

Country Status (1)

Country Link
CN (1) CN110362390B (en)

Families Citing this family (14)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN111045802B (en) * 2019-11-22 2024-01-26 中国联合网络通信集团有限公司 Redis cluster component scheduling system and method, platform equipment
CN111124806B (en) * 2019-11-25 2023-09-05 山东鲁软数字科技有限公司 Method and system for monitoring equipment state in real time based on distributed scheduling task
CN111338770A (en) * 2020-02-12 2020-06-26 咪咕文化科技有限公司 A task scheduling method, server and computer-readable storage medium
CN111580990A (en) * 2020-05-08 2020-08-25 中国建设银行股份有限公司 Task scheduling method, scheduling node, centralized configuration server and system
CN113961318A (en) * 2020-07-20 2022-01-21 百度在线网络技术(北京)有限公司 Distributed scheduling method, device, equipment and storage medium
CN114265672A (en) * 2020-09-16 2022-04-01 广州天维信息技术股份有限公司 Parallel scheduling scheme and system based on multi-tenant distributed performance system
CN112200534A (en) * 2020-09-24 2021-01-08 中国建设银行股份有限公司 Method and device for managing time events
CN112328383A (en) * 2020-11-19 2021-02-05 湖南智慧畅行交通科技有限公司 Priority-based job concurrency control and scheduling algorithm
CN112131318B (en) * 2020-11-30 2021-03-16 北京优炫软件股份有限公司 Pre-written log record ordering system in database cluster
CN112527488A (en) * 2020-12-21 2021-03-19 浙江百应科技有限公司 Distributed high-availability task scheduling method and system
CN112835717B (en) * 2021-02-05 2024-06-28 远光软件股份有限公司 Integrated application processing method and device for clusters
CN113778676B (en) * 2021-09-02 2023-05-23 山东派盟网络科技有限公司 Task scheduling system, method, computer device and storage medium
CN113986507A (en) * 2021-11-01 2022-01-28 佛山技研智联科技有限公司 Job scheduling method and device, computer equipment and storage medium
CN114118043A (en) * 2021-11-17 2022-03-01 粤港澳国际供应链(广州)有限公司 A method of dynamically generating PDF

Citations (7)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US6490572B2 (en) * 1998-05-15 2002-12-03 International Business Machines Corporation Optimization prediction for industrial processes
CN101309208A (en) * 2008-06-21 2008-11-19 华中科技大学 A Job Scheduling System Based on Reliability Cost for Grid Environment
CN101599026A (en) * 2009-07-09 2009-12-09 浪潮电子信息产业股份有限公司 A Cluster Job Scheduling System with Elastic Architecture
CN104317650A (en) * 2014-10-10 2015-01-28 北京工业大学 Map/Reduce type mass data processing platform-orientated job scheduling method
CN104462370A (en) * 2014-12-09 2015-03-25 北京百度网讯科技有限公司 Distributed task scheduling system and method
US9141433B2 (en) * 2009-12-18 2015-09-22 International Business Machines Corporation Automated cloud workload management in a map-reduce environment
CN109327509A (en) * 2018-09-11 2019-02-12 武汉魅瞳科技有限公司 A kind of distributive type Computational frame of the lower coupling of master/slave framework

Patent Citations (7)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US6490572B2 (en) * 1998-05-15 2002-12-03 International Business Machines Corporation Optimization prediction for industrial processes
CN101309208A (en) * 2008-06-21 2008-11-19 华中科技大学 A Job Scheduling System Based on Reliability Cost for Grid Environment
CN101599026A (en) * 2009-07-09 2009-12-09 浪潮电子信息产业股份有限公司 A Cluster Job Scheduling System with Elastic Architecture
US9141433B2 (en) * 2009-12-18 2015-09-22 International Business Machines Corporation Automated cloud workload management in a map-reduce environment
CN104317650A (en) * 2014-10-10 2015-01-28 北京工业大学 Map/Reduce type mass data processing platform-orientated job scheduling method
CN104462370A (en) * 2014-12-09 2015-03-25 北京百度网讯科技有限公司 Distributed task scheduling system and method
CN109327509A (en) * 2018-09-11 2019-02-12 武汉魅瞳科技有限公司 A kind of distributive type Computational frame of the lower coupling of master/slave framework

Also Published As

Publication number Publication date
CN110362390A (en) 2019-10-22

Similar Documents

Publication Publication Date Title
CN110362390B (en) Distributed data integration job scheduling method and device
US10817478B2 (en) System and method for supporting persistent store versioning and integrity in a distributed data grid
Lin et al. Towards a non-2pc transaction management in distributed database systems
CN104793988B (en) The implementation method and device of integration across database distributed transaction
US8799248B2 (en) Real-time transaction scheduling in a distributed database
KR102013005B1 (en) Managing partitions in a scalable environment
US9548912B2 (en) System and method for supporting smart buffer management in a distributed data grid
US11334422B2 (en) System and method for data redistribution in a database
CN110019469B (en) Distributed database data processing method and device, storage medium and electronic device
US20090172142A1 (en) System and method for adding a standby computer into clustered computer system
EP2673711A1 (en) Method and system for reducing write latency for database logging utilizing multiple storage devices
CN114064414A (en) High-availability cluster state monitoring method and system
EP4229516B1 (en) System and method for rapid fault detection and repair in a shared nothing distributed database
US11550820B2 (en) System and method for partition-scoped snapshot creation in a distributed data computing environment
US9703634B2 (en) Data recovery for a compute node in a heterogeneous database system
US11003550B2 (en) Methods and systems of operating a database management system DBMS in a strong consistency mode
US11537574B2 (en) Autonomous database defragmentation
EP4404059A1 (en) Unified resource management architecture for workload schedulers
US9424147B2 (en) System and method for supporting memory allocation control with push-back in a distributed data grid
CN117331751A (en) Multi-node backup system and method for database
CN113342511A (en) Distributed task management system and method
CN117931302B (en) Parameter file saving and loading method, device, equipment and storage medium
WO2025010735A1 (en) Hybrid database implementations
WO2025010725A1 (en) Hybrid database implementations
WO2025010728A1 (en) Hybrid database implementations

Legal Events

Date Code Title Description
PB01 Publication
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination
GR01 Patent grant
GR01 Patent grant
CP01 Change in the name or title of a patent holder

Address after: 310012 1st floor, building 1, 223 Yile Road, Hangzhou City, Zhejiang Province

Patentee after: Yinjiang Technology Co.,Ltd.

Address before: 310012 1st floor, building 1, 223 Yile Road, Hangzhou City, Zhejiang Province

Patentee before: ENJOYOR Co.,Ltd.

CP01 Change in the name or title of a patent holder
EE01 Entry into force of recordation of patent licensing contract

Application publication date: 20191022

Assignee: HANGZHOU ENJOYOR SMART CITY TECHNOLOGY GROUP CO.,LTD.

Assignor: Yinjiang Technology Co.,Ltd.

Contract record no.: X2024980042648

Denomination of invention: A distributed data integration job scheduling method and device

Granted publication date: 20210907

License type: Common License

Record date: 20250102

EE01 Entry into force of recordation of patent licensing contract