CN113672452A - Method and system for monitoring operation of data acquisition task
- Publication number
- CN113672452A (application number CN202110839412.4A)
- Authority
- CN
- China
- Prior art keywords
- data acquisition
- task
- node
- acquisition task
- monitoring
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Withdrawn
Classifications
- G06F11/302: Monitoring arrangements specially adapted to the computing system or computing system component being monitored where the computing system component is a software system
- G06F11/3051: Monitoring arrangements for monitoring the configuration of the computing system or of the computing system component, e.g. monitoring the presence of processing resources, peripherals, I/O links, software programs
- G06F16/23: Updating
- G06F9/4881: Scheduling strategies for dispatcher, e.g. round robin, multi-level priority queues
Abstract
The invention provides a method and a system for monitoring the operation of a data acquisition task. The method comprises the following steps: configuring data acquisition tasks to generate a directed acyclic graph representing the relationships among the data acquisition tasks, and issuing the directed acyclic graph to a master node; the master node parses the directed acyclic graph, obtains configuration information and sends the configuration information to a service node; the service node runs the data acquisition task according to the configuration information and reports the running result to the master node; if the task starts successfully, the service node adds the data acquisition task to a monitoring queue; a monitoring thread in the service node takes data acquisition tasks from the monitoring queue and monitors them; if a data acquisition task has stopped, it is reported to the master node; and the master node changes the state of the data acquisition task and outputs and displays the state. The data acquisition task is thus monitored while it runs, and if it stops during operation the corresponding information is reported so that the user can check it.
Description
Technical Field
The invention relates to the technical field of data acquisition task monitoring, in particular to a method and a system for monitoring the operation of a data acquisition task.
Background
In the big data era, Flume is an excellent data acquisition tool used in many scenarios. Its basic data acquisition unit is the Agent, and one Agent comprises a Source, a Sink and a Channel. The Source can collect data from various data sources such as log files, network ports and Kafka, and packages the data into Events; an Event can be processed by a series of interceptors before being written into the Channel. After the data has been successfully written into the Channel, the Sink actively pulls the data from the Channel and writes it into various big data components such as HDFS, HBase, Hive and ES.
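The Source, Channel and Sink flow described above can be pictured with a toy model. This is only a sketch in Python, not Flume's Java API; the names `Event`, `add_host_header`, `run_source` and `run_sink` are hypothetical, and a plain list stands in for the downstream store.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Event:
    body: str
    headers: dict = field(default_factory=dict)


def add_host_header(event):
    # Hypothetical interceptor: tag each event with the host it came from.
    event.headers["host"] = "host-a"
    return event


def run_source(lines, interceptors, channel):
    # Source: wrap raw data in Events, run the interceptors, write to the Channel.
    for line in lines:
        event = Event(body=line)
        for intercept in interceptors:
            event = intercept(event)
        channel.append(event)


def run_sink(channel, store):
    # Sink: actively pull Events from the Channel and write them downstream.
    while channel:
        store.append(channel.popleft())


channel = deque()
store = []  # stands in for HDFS, HBase, Hive, ES, ...
run_source(["line 1", "line 2"], [add_host_header], channel)
run_sink(channel, store)
```

The Channel acts as a buffer between the two sides, which is why the Sink pulls from it instead of being called directly by the Source.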
The Agent is a Java process that runs on multiple machines to collect the log information on those machines; Agents are then chained with other Agents to aggregate and distribute the logs. However, in this scenario the logs may reside on multiple hosts, so the Flume Agents (data acquisition tasks) are likewise deployed and run across multiple hosts, and monitoring an Agent requires logging in to the corresponding host and starting the monitoring there. If there are many hosts, this process becomes very cumbersome and incurs a relatively large operation and maintenance cost.
DolphinScheduler (the task scheduling system) is a distributed, easily extensible, visual DAG workflow task scheduling system and a top-level Apache project. DolphinScheduler has two major functions: 1. scheduling, monitoring and managing various types of tasks such as Shell and MR on a timed basis in the order given by a DAG (directed acyclic graph), which reduces the operation and maintenance cost of big data development; 2. configuring the DAG and the task information in the DAG by visual drag-and-drop, which makes it convenient and easy to use.
The existing DolphinScheduler does not support scheduling Flume-type tasks, and the process of a Flume task cannot be monitored after the task has started; one has to log in to the corresponding host to check the state of the process, which creates a large workload when there are many Flume Agents (data acquisition tasks).
Disclosure of Invention
The invention addresses the following problem: the existing DolphinScheduler does not support scheduling Flume-type tasks, and the process of a Flume task cannot be monitored after the task has started; one has to log in to the corresponding host to check the state of the process, which means a large workload when there are many Flume Agents.
The technical scheme of the invention is as follows:
In one aspect, the invention provides a method for monitoring the operation of a data acquisition task, comprising the following steps:
configuring data acquisition tasks to generate a directed acyclic graph representing the relationships among the data acquisition tasks, and issuing the directed acyclic graph to a master node;
the master node parses the directed acyclic graph, obtains configuration information, and sends the configuration information to a service node;
the service node runs the data acquisition task according to the configuration information and reports the running result to the master node;
if the data acquisition task starts successfully, the service node adds it to a monitoring queue; a monitoring thread in the service node takes data acquisition tasks from the monitoring queue and monitors them;
if the data acquisition task is running normally, it is put back into the monitoring queue; if the data acquisition task has stopped running, it is reported to the master node; and the master node changes the state of the data acquisition task and outputs and displays the state.
In other words, a directed acyclic graph of the relationships among the data acquisition tasks is configured, and a run command is submitted and issued to the master node; the master node splits the directed acyclic graph and issues tasks to the service nodes; each service node runs its data acquisition task and reports the running result to the master node; and the running status of the task is displayed.
A scheduling mode for data acquisition tasks is thereby added. The data acquisition task is run and, once it has started successfully, is added to the corresponding monitoring queue, where a monitoring thread checks its state: if the check succeeds, the task is put back into the monitoring queue; if the check fails, the task is reported to the master node, which updates the task state after receiving the report.
Further, the step in which the master node parses the directed acyclic graph, obtains the configuration information and sends it to the service node comprises:
the master node parses the directed acyclic graph to obtain the first task node;
and sends the configuration information of that task node to the service node. That is, the master node splits the directed acyclic graph and issues tasks to the service nodes.
Further, the step in which the service node runs the data acquisition task according to the configuration information and reports the running result to the master node comprises:
the service node assembles the configuration information to generate a configuration file; the configuration information comprises node information and data source information;
and the service node copies the configuration file to the corresponding host, starts the data acquisition task and reports the running result to the master node.
Further, the step of starting the data acquisition task and reporting the running result to the master node comprises:
starting the data acquisition task; if it starts successfully, recording the state as success and the process PID as the PID of the running process; otherwise, recording the state as failure and the process PID as 0;
and reporting the recorded task state information to the master node, the task state information being either state success with the PID of the running process, or state failure with PID 0.
Further, the step in which the master node changes the state of the data acquisition task comprises:
the master node receives the task state information and stores it in a database;
the master node makes a judgment according to the received task state information;
if the received state is success, the master node judges whether the task has a subsequent node; if so, it returns to the step in which the master node parses the directed acyclic graph, now to obtain the next task node; if not, it sets the state of the directed acyclic graph to run succeeded;
and if the received state is failure, it sets the state of the directed acyclic graph to run failed.
The master node thus decides whether to execute the next data acquisition task according to the state of the current task, and updates the recorded running status of the task.
Further, the steps in which the service node adds a successfully started data acquisition task to the monitoring queue and the monitoring thread in the service node takes data acquisition tasks from the monitoring queue and monitors them comprise:
after the data acquisition task has started successfully, putting the task information of the data acquisition task into the monitoring queue; the task information comprises the host name, the IP address and the PID of the process of the data acquisition task;
starting a timer thread that periodically takes task information out of the monitoring queue, connects to the host running the data acquisition task, and checks whether the PID of the process exists;
and if the PID exists, putting the task information back into the monitoring queue; if it does not exist, putting the task information into a reporting queue.
The monitoring thread obtains the information to be monitored from the monitoring queue and checks the state of the data acquisition task: a task that is still running is put back into the monitoring queue, and a task that has stopped running is put into the reporting queue.
Further, the steps in which a stopped data acquisition task is reported to the master node and the master node changes the state of the data acquisition task and outputs and displays it comprise:
starting a reporting thread that takes task information out of the reporting queue and sends it to the master node;
and the master node updates the task information in the database and displays it on the interface.
The reporting thread reports the task information in the reporting queue to the master node, the master node updates the task information in the database, and the information is finally displayed on the UI (user interface).
Further, in order to avoid losing monitoring of a data acquisition task, the method further comprises:
monitoring the state of the service nodes;
when the master node's scan finds that the process of a service node has been lost, the master node transfers the data acquisition tasks being monitored on that service node to other service nodes for processing.
Further, the step in which the master node, on finding that the process of a service node has been lost, transfers the data acquisition tasks being monitored on that service node to other service nodes comprises:
when the master node's scan finds that the process of the service node has been lost, collecting the data acquisition task processes that the service node was monitoring;
the master node sends the task information of the collected data acquisition tasks to the other service nodes of the cluster in turn, according to a polling method;
and each receiving service node adds the task information sent by the master node to its monitoring queue.
In this way the running state of a running data acquisition task can be monitored in real time, and the data stream of the acquired data can be monitored and processed. The system also has a failover function: when one service node fails, the master node transfers the tasks monitored by that service node to other service nodes.
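The failover step can be sketched as follows. The sketch reads the "polling method" as round-robin assignment, which is an interpretation of the translated text and is not confirmed by the source; the function name `reassign`, the worker names and the task fields are hypothetical.

```python
from itertools import cycle


def reassign(orphaned_tasks, live_nodes):
    """Spread the tasks of a lost service node over the remaining nodes in turn."""
    assignment = {node: [] for node in live_nodes}
    # cycle() repeats the live nodes endlessly; zip() stops when tasks run out.
    for task, node in zip(orphaned_tasks, cycle(live_nodes)):
        assignment[node].append(task)
    return assignment


# Task information that the lost service node had in its monitoring queue.
orphaned = [
    {"host": "host-a", "pid": 101},
    {"host": "host-b", "pid": 202},
    {"host": "host-c", "pid": 303},
]
plan = reassign(orphaned, ["worker-2", "worker-3"])
```

Each receiving service node would then append its share of `plan` to its own monitoring queue, so the reassigned tasks are checked on the next monitoring pass.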
In another aspect, the invention also provides a system for monitoring the operation of a data acquisition task, comprising a UI and API interaction module, a master node cluster and a service node cluster;
the UI and API interaction module is used for configuring the data acquisition tasks, generating a directed acyclic graph representing the relationships among the data acquisition tasks, and issuing the directed acyclic graph to the master node cluster;
the master node cluster is used for parsing the directed acyclic graph, obtaining configuration information and issuing the configuration information to the service nodes; it is also used for changing the state of a data acquisition task that has stopped running and outputting and displaying the state;
the service node cluster is used for running the data acquisition task according to the configuration information and reporting the running state to the master node cluster, and for adding the data acquisition task to a monitoring queue once it has started successfully;
each service node is provided with a monitoring thread module for taking data acquisition tasks from the monitoring queue and monitoring them; when a data acquisition task is running normally, it is put back into the monitoring queue; when a data acquisition task has stopped running, it is reported to the master node cluster.
Further, each master node comprises an analysis module and a task issuing module;
the analysis module is used for parsing the directed acyclic graph to obtain the first task node;
and the task issuing module is used for issuing the configuration information of the task node to the service node. The analysis module splits the directed acyclic graph, and the task issuing module issues the tasks to the service nodes one by one.
Further, each service node is also provided with a processing module and a task starting module;
the processing module is used for assembling the configuration information to generate a configuration file and copying the configuration file to the corresponding host; the configuration information comprises node information and data source information;
the task starting module is used for starting the data acquisition task and reporting the running result to the master node; specifically, it starts the data acquisition task and, if the task starts successfully, records the state as success and the process PID as the PID of the running process, and otherwise records the state as failure and the process PID as 0; it then reports the recorded task state information to the master node, the task state information being either state success with the PID of the running process, or state failure with PID 0.
Further, the master node comprises a task state information receiving module, a state judgment module, a node judgment module and an updating module;
the task state information receiving module is used for receiving the task state information and storing it in the database;
the state judgment module is used for making a judgment according to the received task state information;
the node judgment module is used for judging, when the state judgment module determines that the state is success, whether the task has a subsequent node; if so, it outputs information to the analysis module so that the next task node is obtained; if not, it outputs information to the updating module;
the updating module is used for setting the state of the directed acyclic graph to run succeeded when the node judgment module determines that there is no next task node, and for setting the state of the directed acyclic graph to run failed when the state judgment module determines that the state is failure.
The master node thus decides whether to execute the next data acquisition task according to the state of the current task, and updates the recorded running status of the task.
Further, the task starting module is also used for putting the task information of the data acquisition task into the monitoring queue once the data acquisition task has started successfully; the task information comprises the host name, the IP address and the PID of the process of the data acquisition task;
the monitoring thread module is used for periodically taking task information out of the monitoring queue, connecting to the host running the data acquisition task and checking whether the PID of the process exists; if it exists, the task information is put back into the monitoring queue, and if it does not exist, the task information is put into the reporting queue.
Further, the service node is also provided with a reporting thread module, which is used for taking task information out of the reporting queue and sending it to the updating module of the master node;
and the updating module is used for updating the task information in the database and displaying it on the interface.
The reporting thread module reports the task information in the reporting queue to the master node, the master node updates the task information in the database, and the information is finally displayed on the UI (user interface).
Further, in order to avoid losing monitoring of a data acquisition task, each master node is also provided with a process scanning module and a transfer processing module;
the process scanning module is used for monitoring the state of the service nodes by scanning the processes of the service nodes;
the transfer processing module is used for transferring the data acquisition tasks being monitored on a service node to other service nodes for processing when the master node's scan finds that the process of that service node has been lost; specifically, when the scan finds that the process of the service node has been lost, it collects the data acquisition task processes that the service node was monitoring and sends the task information of the collected data acquisition tasks to the other service nodes of the cluster in turn, according to a polling method;
and the task starting module is used for adding the task information sent by the transfer processing module to the monitoring queue.
As can be seen from the above technical scheme, the invention has the following advantages: after the data acquisition task has started, it is monitored, and if it stops during operation the corresponding information is reported so that the user can check it conveniently. The scheme also provides failover: when a service node of the task scheduling system stops abnormally, the data acquisition tasks being monitored on that service node can be transferred to other service nodes, which avoids the situation in which a data acquisition task can no longer be monitored.
In addition, the invention has reliable design principle, simple structure and very wide application prospect.
Therefore, compared with the prior art, the invention has prominent substantive features and remarkable progress, and the beneficial effects of the implementation are also obvious.
Drawings
In order to illustrate the embodiments of the invention or the technical solutions in the prior art more clearly, the drawings used in the description of the embodiments or the prior art are briefly described below; it is obvious that those skilled in the art can obtain other drawings from these drawings without creative effort.
FIG. 1 is a schematic flow diagram of a method of one embodiment of the invention.
Fig. 2 is a schematic flow diagram of a method of another embodiment of the invention.
FIG. 3 is a schematic block diagram of a system of one embodiment of the present invention.
In the figure, 10-UI and API interaction module, 201-analysis module, 202-task issuing module, 203-task state information receiving module, 204-state judgment module, 205-node judgment module, 206-updating module, 301-monitoring thread module, 302-processing module, 303-task starting module and 304-reporting thread module.
Detailed Description
In order to make those skilled in the art better understand the technical solution of the present invention, the technical solution in the embodiment of the present invention will be clearly and completely described below with reference to the drawings in the embodiment of the present invention, and it is obvious that the described embodiment is only a part of the embodiment of the present invention, and not all embodiments. All other embodiments, which can be derived by a person skilled in the art from the embodiments given herein without making any creative effort, shall fall within the protection scope of the present invention.
Flume Agent: Flume is a distributed, reliable and highly available system for collecting, aggregating and transmitting massive volumes of logs; the Flume Agent is defined as the data acquisition task in this application.
DolphinScheduler: a distributed, easily extensible, visual DAG workflow task scheduling system and a top-level Apache project. It is defined as the task scheduling system in this application.
HDFS (Hadoop Distributed File System): a distributed file system.
Kafka: a distributed message queue.
DAG: directed acyclic graph, used to represent the scheduling order among tasks.
Hive: a Hadoop-based data warehouse tool used for data extraction, transformation and loading; it provides a mechanism for storing, querying and analyzing large-scale data stored in Hadoop.
As shown in fig. 1, an embodiment of the present invention provides a method for monitoring operation of a data acquisition task, including the following steps:
step 11: configuring data acquisition tasks to generate a directed acyclic graph representing the relationships among the data acquisition tasks, and issuing the directed acyclic graph to a master node;
step 12: the master node parses the directed acyclic graph, obtains configuration information and sends the configuration information to a service node;
step 13: the service node runs the data acquisition task according to the configuration information and reports the running result to the master node; if the data acquisition task starts successfully, step 14 is executed, otherwise step 18 is executed;
step 14: the service node adds the data acquisition task to a monitoring queue;
step 15: a monitoring thread in the service node takes a data acquisition task from the monitoring queue and monitors it; if its state is normal, step 16 is executed, otherwise step 17 is executed;
step 16: the data acquisition task is put back into the monitoring queue; step 18 is executed;
step 17: the data acquisition task is reported to the master node;
step 18: the master node changes the state of the data acquisition task and outputs and displays the state.
In other words, a directed acyclic graph of the relationships among the data acquisition tasks is configured, and a run command is submitted and issued to the master node; the master node splits the directed acyclic graph and issues tasks to the service nodes; each service node runs its data acquisition task and reports the running result to the master node; and the running status of the task is displayed.
A scheduling mode for data acquisition tasks is thereby added. The data acquisition task is run and, once it has started successfully, is added to the corresponding monitoring queue, where a monitoring thread checks its state: if the check succeeds, the task is put back into the monitoring queue; if the check fails, the task is reported to the master node, which updates the task state after receiving the report.
As shown in FIG. 2, in some embodiments, in step 12, the step in which the master node parses the directed acyclic graph, obtains the configuration information and sends it to the service node comprises:
step 121: the master node parses the directed acyclic graph to obtain the first task node;
step 122: the master node sends the configuration information of that task node to the service node. That is, the master node splits the directed acyclic graph and issues tasks to the service nodes.
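Steps 121 and 122 can be illustrated with a small sketch. It assumes the directed acyclic graph is held as a mapping from each task to its downstream tasks and that a "first task node" is one with no upstream dependency; the source does not specify either point. The task names and the `configs` structure are hypothetical.

```python
def first_task_nodes(dag):
    """Return the nodes that have no upstream dependency.

    `dag` maps each task name to the list of tasks that run after it.
    """
    has_upstream = {child for children in dag.values() for child in children}
    return [node for node in dag if node not in has_upstream]


# Hypothetical DAG: collect_logs -> parse_logs -> load_to_hdfs
dag = {
    "collect_logs": ["parse_logs"],
    "parse_logs": ["load_to_hdfs"],
    "load_to_hdfs": [],
}
# Hypothetical per-task configuration information kept by the master node.
configs = {"collect_logs": {"host": "host-a", "source": "/var/log/app.log"}}

first = first_task_nodes(dag)
# Pair each first node with its configuration, ready to send to a service node.
to_dispatch = [(node, configs.get(node, {})) for node in first]
```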
In some embodiments, in step 13, the step in which the service node runs the data acquisition task according to the configuration information and reports the running result to the master node comprises:
step 131: the service node assembles the configuration information to generate a configuration file; the configuration information comprises node information and data source information;
step 132: the service node copies the configuration file to the corresponding host, starts the data acquisition task and reports the running result to the master node.
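As an illustration of step 131, the sketch below renders node and data source information into a Flume-style properties file. The source does not give the file layout; the keys follow the common Flume agent configuration pattern (`exec` source, `memory` channel, `logger` sink) and should be checked against the Flume User Guide for the version in use. The function name and the shape of `config_info` are hypothetical, and copying the file to the target host is left out.

```python
def assemble_flume_config(agent, log_path):
    """Render node and data-source information into Flume-style properties text."""
    lines = [
        f"{agent}.sources = r1",
        f"{agent}.channels = c1",
        f"{agent}.sinks = k1",
        # Source: follow a log file on the host.
        f"{agent}.sources.r1.type = exec",
        f"{agent}.sources.r1.command = tail -F {log_path}",
        f"{agent}.sources.r1.channels = c1",
        # Channel: in-memory buffer between source and sink.
        f"{agent}.channels.c1.type = memory",
        # Sink: log the events (a real task would write to HDFS, Kafka, ...).
        f"{agent}.sinks.k1.type = logger",
        f"{agent}.sinks.k1.channel = c1",
    ]
    return "\n".join(lines) + "\n"


# Hypothetical configuration information received from the master node.
config_info = {
    "node": {"host": "host-a", "agent": "a1"},
    "data_source": {"path": "/var/log/app.log"},
}
text = assemble_flume_config(
    config_info["node"]["agent"], config_info["data_source"]["path"]
)
```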
It should be noted that, in step 132, the step of starting the data acquisition task and reporting the running result to the master node comprises:
step 1321: starting the data acquisition task and judging whether it started successfully; if so, executing step 1322, otherwise executing step 1323;
step 1322: recording the state as success and the process PID as the PID of the running process;
step 1323: recording the state as failure and the process PID as 0;
step 1324: reporting the task state information recorded in step 1322 or step 1323 to the master node, the task state information being either state success with the PID of the running process, or state failure with PID 0.
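Steps 1321 to 1324 amount to building a small state record. A minimal local sketch follows, assuming the task is launched as a child process on the same machine (the method described here starts it on the target host) and using hypothetical state labels `SUCCESS` and `FAILURE`.

```python
import subprocess
import sys


def start_task(command):
    """Start the task process and return (state record, process handle)."""
    try:
        proc = subprocess.Popen(command)
    except OSError:
        # Step 1323: the process could not be started, so state failure, PID 0.
        return {"state": "FAILURE", "pid": 0}, None
    # Step 1322: state success, PID of the running process.
    return {"state": "SUCCESS", "pid": proc.pid}, proc


# A short-lived Python process stands in for the real agent launch command.
ok_record, proc = start_task([sys.executable, "-c", "pass"])
proc.wait()
# A launcher path that does not exist triggers the failure branch.
bad_record, _ = start_task(["/nonexistent/flume-launcher"])
```

The record returned here is what step 1324 would send to the master node; PID 0 serves as the marker for a task that never started.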
In some embodiments, in step 18, the master node changing the state of the data acquisition task comprises:
step 181: the master node receives the task state information and stores it in a database;
step 182: the master node makes a judgment according to the received task state information; if the received state is success, step 183 is executed; if the received state is failure, step 185 is executed;
step 183: judging whether the task has a subsequent node; if so, executing step 121 to obtain the next task node; if not, executing step 184;
step 184: setting the state of the directed acyclic graph to run succeeded;
step 185: setting the state of the directed acyclic graph to run failed.
The master node thus decides whether to execute the next data acquisition task according to the state of the current task, and updates the recorded running status of the task.
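The decision logic of steps 181 to 185 can be sketched as a single function. A dictionary stands in for the database, and the state labels (`RUNNING`, `RUN_SUCCESS`, `RUN_FAILED`) and the function name `handle_report` are hypothetical.

```python
def handle_report(dag, dag_state, task, report, db):
    """Apply steps 181-185 for one task report; return the next tasks to run."""
    db[task] = report                      # step 181: persist the report
    if report["state"] != "SUCCESS":       # step 182, failure branch -> step 185
        dag_state["state"] = "RUN_FAILED"
        return []
    successors = dag.get(task, [])         # step 183: any subsequent node?
    if successors:
        return successors                  # back to step 121 for the next node
    dag_state["state"] = "RUN_SUCCESS"     # step 184: last node finished
    return []


dag = {"collect_logs": ["parse_logs"], "parse_logs": []}
dag_state, db = {"state": "RUNNING"}, {}
next_tasks = handle_report(
    dag, dag_state, "collect_logs", {"state": "SUCCESS", "pid": 4321}, db
)
```

Here `next_tasks` holds `parse_logs`, which the master node would issue next; the graph state is changed only when the last node succeeds or any node fails.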
In some embodiments, step 14 specifically comprises: after the data acquisition task has started successfully, putting the task information of the data acquisition task into the monitoring queue; the task information comprises the host name, the IP address and the PID of the process of the data acquisition task; the PID is the identifier of the process.
Step 15 specifically comprises: starting a timer thread that periodically takes task information out of the monitoring queue, connects to the host running the data acquisition task, and checks whether the PID of the process exists;
if it exists, step 16 specifically comprises: putting the task information of the data acquisition task back into the monitoring queue;
if it does not exist, step 17 specifically comprises:
step 171: putting the task information of the data acquisition task into a reporting queue;
step 172: starting a reporting thread that takes the task information out of the reporting queue and sends it to the master node.
The monitoring thread obtains the information to be monitored from the monitoring queue and checks the state of the data acquisition task: a task that is still running is put back into the monitoring queue, and a task that has stopped running is put into the reporting queue.
In step 18, the step in which the master node changes the state of the data acquisition task and outputs and displays it includes: the main node updates the task information in the database and displays the task information on the interface.
The reporting thread reports the task information in the reporting queue to the main node; the main node updates the task information in the database, and the information is finally displayed on the UI (user interface).
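The interplay of the monitoring queue and the reporting queue in steps 14 to 18 can be sketched as below. The sketch is single-threaded for clarity and rests on assumptions: the task dictionaries, the `pid_exists` callback (which stands in for the remote PID check on the task's host), and the `send_to_master` callback are hypothetical names, and in the described method a timing thread and a reporting thread would call these functions periodically.

```python
import queue


def monitor_once(monitor_queue, report_queue, pid_exists):
    """Check every task currently in the monitoring queue exactly once."""
    for _ in range(monitor_queue.qsize()):
        task = monitor_queue.get()
        if pid_exists(task["host"], task["pid"]):
            monitor_queue.put(task)  # still running: back to the monitoring queue
        else:
            report_queue.put(task)   # stopped: hand over to the reporting thread


def report_all(report_queue, send_to_master):
    """Drain the reporting queue and send each stopped task to the master node."""
    while True:
        try:
            task = report_queue.get_nowait()
        except queue.Empty:
            return
        send_to_master(task)
```

Keeping the two queues separate means that a slow or failing report to the master node does not delay the next round of PID checks.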
In some embodiments, to avoid loss of monitoring of the data collection task, the method further comprises:
step 19: monitoring the state of the service node; when the main node detects by scanning that the process of a service node is lost, the main node transfers the data acquisition task processes being monitored on that service node to other service nodes for processing.
It should be noted that the step in which the master node, upon detecting that the process of a service node is lost, transfers the data acquisition task processes being monitored on that service node to other service nodes includes:
step 191: when the master node detects by scanning that the process of the service node is lost, counting the data acquisition task processes monitored by that service node;
step 192: the main node sends the task information of the counted data acquisition task to other service nodes of the cluster according to a polling method;
step 193: the service node adds the task information sent by the main node to a monitoring queue; step 15 is performed.
Therefore, the running state of each running data acquisition task can be monitored in real time, and the data stream of the acquired data can be monitored and processed. The system also has a failover function: when one service node fails, the main node transfers the tasks monitored by that service node to other service nodes.
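The redistribution in steps 191 to 193 can be sketched as follows, assuming that the "polling method" refers to round-robin assignment; the text does not define it further. The function name `reassign_tasks` and the list-based inputs are hypothetical.

```python
from itertools import cycle


def reassign_tasks(orphaned_tasks, surviving_workers):
    """Distribute the tasks of a lost worker over the survivors in round-robin order."""
    if not surviving_workers:
        raise ValueError("no surviving service node to take over monitoring")
    assignment = {worker: [] for worker in surviving_workers}
    # zip stops when the orphaned tasks run out; cycle repeats the workers.
    for task, worker in zip(orphaned_tasks, cycle(surviving_workers)):
        assignment[worker].append(task)
    return assignment
```

Each surviving service node would then add its share of the task information to its own monitoring queue and continue with step 15.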
As can be seen from the above embodiments, the method of the present application mainly includes:
First, task running
1. Configuring the Flume tasks on the UI interface, and issuing a DAG of the relationship among the Flume tasks to the main node;
2. the master node (Master) splits the DAG and issues tasks to the service nodes;
3. the service node (Worker) runs the Flume Agent and reports the running result to the main node; the main node determines whether to execute the next Flume Agent according to the specific task condition, and the result is finally displayed in the UI interface.
Second, Flume task monitoring
1. If the Flume Agent runs successfully, the Agent information is sent to a monitoring queue;
2. the monitoring thread of the service node takes the Flume Agent out of the monitoring queue and checks its running state; if the Agent is not running, it is reported to the main node; if the Agent is running, it is returned to the monitoring queue;
3. the master node changes the state of the Flume Agent and displays the changed state.
Third, service node monitoring fault recovery
In the operating environment, if the process of a service node goes down, the main node transfers the Flume Agent processes being monitored on that service node to other service nodes, so that the Flume Agents monitored by the failed service node do not lose monitoring.
The specific process is as follows:
(1) the API issues a starting command and specifies a DAG to be executed;
(2) the master node (Master) cluster parses the DAG to obtain the first node;
(3) node information and data source information are sent to a service node;
(4) the service node (Worker) receives the start command of the main node and begins the start-up process;
(5) a Flume configuration is generated according to the configuration information and copied to the corresponding host, and the Flume Agent is started; if the Agent starts successfully, the state is recorded as successful and the PID is recorded as the running PID; if the Agent fails to start, the state is recorded as failed and the process PID is 0; then step (6) is executed;
(6) the task state is reported to the main node;
(7) the main node receives the task state and stores the state and the process PID into a database; if the task state is successful, step (8) is executed; if the task state is failed, step (10) is executed;
(8) whether the task has a subsequent node is judged; if so, step (2) is executed to obtain the next node; if not, step (9) is executed;
(9) the state of the DAG is set to successful operation, and the process ends;
(10) the state of the DAG is set to operation failure, and the process ends.
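Step (5) above, generating a Flume configuration from node information and data source information, can be sketched as follows. The sketch assumes a single source, channel, and sink per agent; the component names `src`, `ch`, and `snk` and both function names are hypothetical. The property layout and the `flume-ng agent` options `--name` and `--conf-file` follow the standard Flume agent configuration format.

```python
def render_flume_config(agent, source, channel, sink):
    """Render a one-source, one-channel, one-sink Flume properties file as text."""
    lines = [
        f"{agent}.sources = src",
        f"{agent}.channels = ch",
        f"{agent}.sinks = snk",
    ]
    for key, value in source.items():
        lines.append(f"{agent}.sources.src.{key} = {value}")
    for key, value in channel.items():
        lines.append(f"{agent}.channels.ch.{key} = {value}")
    for key, value in sink.items():
        lines.append(f"{agent}.sinks.snk.{key} = {value}")
    # Bind the source and the sink to the channel.
    lines.append(f"{agent}.sources.src.channels = ch")
    lines.append(f"{agent}.sinks.snk.channel = ch")
    return "\n".join(lines) + "\n"


def flume_command(agent, conf_file):
    """Build the command line that starts the agent from the rendered file."""
    return ["flume-ng", "agent", "--name", agent, "--conf-file", conf_file]
```

The rendered text would be written to a file, copied to the target host, and the command started there; the PID of the started process is what step (5) records.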
When the process on one service node stops because of an exception, the node that the service node registered in Zookeeper is lost. A main node in the main node cluster detects by scanning that the process of the service node is lost and starts the failover process for that service node. During the transfer, the number and information of all Flume Agent processes monitored by the stopped service node are counted; the main node sends the monitored information to other service nodes according to a polling method, and those nodes add the information sent by the main node to their Agent information queues to resume monitoring.
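The detection step can be sketched by comparing two successive scans of the registered service nodes. This is a simplified illustration: the function name and its inputs are hypothetical, and in a real deployment the list of currently registered nodes would be read from Zookeeper, where an ephemeral node disappears once its owner's session ends. The registration path is not given in the text and is not assumed here.

```python
def collect_orphaned_tasks(known_workers, registered_now, monitored_by):
    """Find workers missing from the latest scan and the tasks they monitored.

    monitored_by maps a worker name to the task information it was monitoring.
    Returns (lost_workers, orphaned_tasks).
    """
    lost = sorted(set(known_workers) - set(registered_now))
    orphaned = []
    for worker in lost:
        orphaned.extend(monitored_by.get(worker, []))
    return lost, orphaned
```

The orphaned task list is the input that the main node then distributes to the remaining service nodes.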
As shown in fig. 3, the technical solution of the present invention further provides an operation monitoring system for a data acquisition task, which includes a UI and API interaction module 10, a master node cluster, and a service node cluster;
the UI and API interaction module 10 is used for configuring data acquisition tasks, generating a directed acyclic graph representing the relationship among the data acquisition tasks and issuing the directed acyclic graph to the main node cluster;
the main node cluster is used for analyzing the directed acyclic graph, acquiring configuration information and issuing the configuration information to the service nodes; the main node cluster is also used for changing the state of a data acquisition task that has stopped running and outputting and displaying the state;
the service node cluster is used for operating the data acquisition task according to the configuration information and reporting the operating state to the main node cluster; when the data acquisition task is successfully operated, adding the data acquisition task into a monitoring queue; the main node cluster comprises a plurality of main nodes, and the service node cluster comprises a plurality of service nodes.
Each service node is provided with a monitoring thread module 301 for acquiring a data acquisition task of a monitoring queue to monitor; when the data acquisition task runs normally, the data acquisition task is put back to the monitoring queue; and when the operation of the data acquisition task is stopped, reporting the data acquisition task to the main node cluster.
In some embodiments, each master node includes a parsing module 201 and a task issuing module 202;
the analysis module 201 is configured to analyze the directed acyclic graph to obtain a first task node;
and the task issuing module 202 is configured to issue the configuration information of the task node to the service node. The analysis module splits the directed acyclic graph, and the task issuing module issues the tasks to the service nodes one by one.
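How the analysis module might derive an issuing order from the directed acyclic graph can be sketched with Kahn's algorithm for topological sorting. The text does not say which algorithm is used, so this is a stand-in technique; the function name and the edge-list representation are assumptions.

```python
from collections import deque


def topological_order(edges, nodes):
    """Return the task nodes in an order that respects every edge (Kahn's algorithm)."""
    indegree = {node: 0 for node in nodes}
    successors = {node: [] for node in nodes}
    for upstream, downstream in edges:
        successors[upstream].append(downstream)
        indegree[downstream] += 1
    # Nodes without predecessors are the first task nodes.
    ready = deque(node for node in nodes if indegree[node] == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)
    if len(order) != len(nodes):
        raise ValueError("the task graph contains a cycle")
    return order
```

The first element of the returned order corresponds to the first task node; the task issuing module would issue each later node only after its predecessors have reported success.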
In some embodiments, the service node is further provided with a processing module 302 and a task starting module 303;
a processing module 302, configured to assemble the configuration information to generate a configuration file, and copy the configuration file to a corresponding host; the configuration information comprises node information and data source information;
the task starting module 303 is used for starting a data acquisition task and reporting the operation result to the main node; specifically, it starts the data acquisition task and, if the operation is successful, records the state as successful with the PID of the process being the running PID; otherwise, it records the state as failed with the PID of the process being 0; it then reports the recorded task state information to the main node, wherein the task state information is either a successful state with the running PID of the process, or a failed state with a PID of 0.
In some embodiments, the master node includes a task state information receiving module 203, a state determining module 204, a node determining module 205, and an updating module 206;
the task state information receiving module 203 is used for receiving the task state information by the main node and storing the task state information into a database;
a state judgment module 204, configured to perform judgment according to the received task state information;
a node determining module 205, configured to determine whether a subsequent node exists in the task if the state determining module determines that the state is successful, and if so, output information to the parsing module to obtain a next task node; if not, outputting information to an updating module;
an updating module 206, configured to set the state of the directed acyclic graph to successful operation if the node determining module determines that there is no next task node, or to set the state of the directed acyclic graph to operation failure when the state judgment module judges that the state is failed.
The main node thus determines, according to the state of each task, whether to execute the next data acquisition task, and updates the running state accordingly.
In some embodiments, the task starting module 303 is further configured to, if the data acquisition task is successfully executed, place task information of the data acquisition task into a monitoring queue; the task information comprises a host name, an IP and a PID of a process of the data acquisition task;
the monitoring thread module 301 is configured to take out task information from the monitoring queue at regular intervals, connect to the host on which the data acquisition task runs, and check whether the PID of the process exists; if the PID exists, the task information of the data acquisition task is put back into the monitoring queue, and if the PID does not exist, the task information of the data acquisition task is put into the reporting queue.
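The task information record and the PID check can be sketched as below. The sketch checks a PID on the local host only and is POSIX-specific; how the monitoring thread connects to a remote host is not specified in the text and is left out. The names `TaskInfo` and `pid_alive` are hypothetical.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskInfo:
    hostname: str
    ip: str
    pid: int


def pid_alive(pid):
    """Return True if a process with this PID exists on the local POSIX host."""
    if pid <= 0:
        return False     # PID 0 marks a task that failed to start
    try:
        os.kill(pid, 0)  # signal 0 only checks for existence, nothing is sent
    except ProcessLookupError:
        return False
    except PermissionError:
        return True      # the process exists but belongs to another user
    return True
```

This check should not be used unchanged on Windows, where `os.kill` has different semantics for signal values.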
In some embodiments, the service node is further provided with a reporting thread module 304, configured to take out task information from the reporting queue and send the task information to the updating module of the main node;
and the updating module is used for updating the task information into the database and displaying the task information on the interface.
The reporting thread module reports the task information in the reporting queue to the main node; the main node updates the task information in the database, and the information is finally displayed on the UI (user interface).
In some embodiments, in order to avoid the loss of monitoring of the data acquisition task, each master node is further provided with a process scanning module and a transfer processing module;
the process scanning module is used for monitoring the state of the service node by scanning the process of the service node;
the transfer processing module is used for transferring the data acquisition task processes being monitored on a service node to other service nodes for processing when the main node detects by scanning that the process of that service node is lost; specifically, it counts the data acquisition task processes monitored by the service node when the process of the service node is found to be lost, and sends the task information of the counted data acquisition tasks to other service nodes of the cluster according to a polling method;
and the task starting module is used for adding the task information sent by the transfer processing module into the monitoring queue.
Although the present invention has been described in detail with reference to the drawings and in connection with the preferred embodiments, the present invention is not limited thereto. Those skilled in the art can make various equivalent modifications or substitutions to the embodiments of the present invention without departing from its spirit and scope, and any such modifications or substitutions that a person skilled in the art can readily conceive within the technical scope disclosed herein fall within the scope of the present invention. Therefore, the protection scope of the present invention shall be subject to the protection scope of the appended claims.
Claims (10)
1. A method for monitoring the operation of a data acquisition task is characterized by comprising the following steps:
configuring data acquisition tasks to generate a directed acyclic graph representing the relationship among the data acquisition tasks, and issuing the directed acyclic graph to a main node;
the master node analyzes the directed acyclic graph, acquires configuration information and sends the configuration information to the service node;
the service node runs the data acquisition task according to the configuration information and reports the running result to the main node;
if the data acquisition task is successfully operated, the service node adds the data acquisition task to a monitoring queue; a monitoring thread in a service node acquires a data acquisition task of a monitoring queue for monitoring;
if the data acquisition task runs normally, the data acquisition task is put back to the monitoring queue; if the data acquisition task stops running, reporting the data acquisition task to the main node; and the main node changes the state of the data acquisition task and outputs and displays the state.
2. The method for monitoring the operation of the data acquisition task according to claim 1, wherein the step of the master node analyzing the directed acyclic graph, acquiring the configuration information and issuing the configuration information to the service node comprises:
the main node analyzes the directed acyclic graph to obtain a first task node;
and sending the configuration information of the task node to the service node.
3. The method for monitoring the operation of the data acquisition task according to claim 2, wherein the step of the service node operating the data acquisition task according to the configuration information and reporting the operation result to the master node comprises:
the service node assembles the configuration information to generate a configuration file; the configuration information comprises node information and data source information;
and the service node copies the configuration file to the corresponding host, starts a data acquisition task and reports the operation result to the host node.
4. The method for monitoring the operation of the data acquisition task according to claim 1, wherein the step of starting the data acquisition task and reporting the operation result to the master node comprises:
starting a data acquisition task; if the operation is successful, recording the state as successful, with the PID of the process being the running PID; otherwise, recording the state as failed, with the PID of the process being 0;
and reporting the recorded task state information to the main node, wherein the task state information is either a successful state with the running PID of the process, or a failed state with a PID of 0.
5. The method for monitoring the operation of the data acquisition task according to claim 4, wherein the step of the main node changing the state of the data acquisition task comprises the following steps:
the main node receives the task state information and stores the task state information into a database;
the main node judges according to the received task state information;
if the received state is successful, judging whether the task has a subsequent node, if so, executing the following steps: the main node analyzes the directed acyclic graph to obtain a next task node; if not, setting the state of the directed acyclic graph as successful operation;
and if the state is failed, setting the state of the directed acyclic graph as operation failure.
6. The method for monitoring the operation of the data acquisition task according to claim 4, wherein if the data acquisition task is successfully operated, the service node adds the data acquisition task to a monitoring queue; the step of monitoring the data acquisition task of the monitoring queue acquired by the monitoring thread in the service node comprises the following steps:
after the data acquisition task is successfully operated, putting task information of the data acquisition task into a monitoring queue; the task information comprises a host name, an IP and a PID of a process of the data acquisition task;
starting a timing thread, taking out task information from the monitoring queue at regular time, connecting a host computer for running the data acquisition task, and checking whether the PID of the process exists or not;
and if the PID exists, the task information of the data acquisition task is put back into the monitoring queue, and if the PID does not exist, the task information of the data acquisition task is put into the reporting queue.
7. The method for monitoring the operation of the data acquisition task according to claim 6, wherein if the operation of the data acquisition task is stopped, the data acquisition task is reported to the master node; the step that the main node changes the state of the data acquisition task and outputs and displays the state comprises the following steps:
starting a reporting thread, taking out task information from a reporting queue and sending the task information to a main node;
and the main node updates the task information into the database and displays the task information on the interface.
8. The method for monitoring the operation of a data collection task of claim 1, further comprising:
monitoring the state of the service node;
when the main node scans that the process of the service node is lost, the main node transfers the data acquisition task process which is monitored on the service node to other service nodes for processing.
9. The method for monitoring the operation of the data collection task according to claim 8, wherein the step of the master node transferring the monitored data collection task process on the service node to another service node when the master node scans that the service node process is lost comprises:
when the main node scans that the process of the service node is lost, counting the process of a data acquisition task monitored by the service node;
the main node sends the task information of the counted data acquisition task to other service nodes of the cluster according to a polling method;
and the service node adds the task information sent by the main node into the monitoring queue.
10. A running monitoring system of a data acquisition task is characterized by comprising a UI and API interaction module, a main node cluster and a service node cluster;
the UI and API interaction module is used for configuring the data acquisition tasks, generating a directed acyclic graph representing the relationship among the data acquisition tasks and issuing the directed acyclic graph to the main node cluster;
the main node cluster is used for analyzing the directed acyclic graph, acquiring configuration information and issuing the configuration information to the service nodes; the main node cluster is also used for changing the state of a data acquisition task that has stopped running and outputting and displaying the state;
the service node cluster is used for operating the data acquisition task according to the configuration information and reporting the operating state to the main node cluster; when the data acquisition task is successfully operated, adding the data acquisition task into a monitoring queue;
each service node is provided with a monitoring thread module for acquiring a data acquisition task of a monitoring queue to monitor; when the data acquisition task runs normally, the data acquisition task is put back to the monitoring queue; and when the operation of the data acquisition task is stopped, reporting the data acquisition task to the main node cluster.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110839412.4A CN113672452A (en) | 2021-07-23 | 2021-07-23 | Method and system for monitoring operation of data acquisition task |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110839412.4A CN113672452A (en) | 2021-07-23 | 2021-07-23 | Method and system for monitoring operation of data acquisition task |
Publications (1)
Publication Number | Publication Date |
---|---|
CN113672452A true CN113672452A (en) | 2021-11-19 |
Family
ID=78540035
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202110839412.4A Withdrawn CN113672452A (en) | 2021-07-23 | 2021-07-23 | Method and system for monitoring operation of data acquisition task |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN113672452A (en) |
Cited By (3)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN114443292A (en) * | 2022-01-14 | 2022-05-06 | 苏州浪潮智能科技有限公司 | Client management method, device and system based on workflow task scheduling system |
CN114443292B (en) * | 2022-01-14 | 2023-11-14 | 苏州浪潮智能科技有限公司 | Client management method, device and system based on workflow task scheduling system |
CN116016117A (en) * | 2022-12-27 | 2023-04-25 | 山西合力创新科技股份有限公司 | Network equipment operation and maintenance data acquisition method and system, electronic equipment and storage medium |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN109714192B (en) | Monitoring method and system for monitoring cloud platform | |
CN109495308B (en) | Automatic operation and maintenance system based on management information system | |
CN111506412B (en) | Airflow-based distributed asynchronous task construction and scheduling system and method | |
CN105357038B (en) | Monitor the method and system of cluster virtual machine | |
KR101683321B1 (en) | Monitoring of distributed applications | |
Zheng et al. | Co-analysis of RAS log and job log on Blue Gene/P | |
US20100223446A1 (en) | Contextual tracing | |
Wu et al. | Zeno: Diagnosing performance problems with temporal provenance | |
CN110895484A (en) | Task scheduling method and device | |
CN110895487B (en) | Distributed Task Scheduling System | |
US7493527B2 (en) | Method for logging diagnostic information | |
CN101719852B (en) | Method and device for monitoring performance of middleware | |
JP2001188765A (en) | Technique for referring to fault information showing plural related fault under distributed computing environment | |
WO2011060642A1 (en) | Automatic test system for distributed comprehensive service and method thereof | |
CN110895488B (en) | Task scheduling method and device | |
CN108243012A (en) | Billing application processing system, method and device in online billing system OCS | |
Demirbaga et al. | Autodiagn: An automated real-time diagnosis framework for big data systems | |
CN113672452A (en) | Method and system for monitoring operation of data acquisition task | |
KR20180037342A (en) | Application software error monitoring, statistics management service and solution method. | |
CN113824601A (en) | Electric power marketing monitored control system based on service log | |
CN110895486B (en) | Distributed Task Scheduling System | |
CN116737560B (en) | Smart training system based on intelligent guidance and control | |
CN118689862A (en) | Heterogeneous database data migration method, device, equipment and storage medium | |
CN118585297A (en) | A task execution method and related equipment | |
CN116610443A (en) | Log collection method, device and readable storage medium |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
WW01 | Invention patent application withdrawn after publication | ||
Application publication date: 20211119 |