Disclosure of Invention
In order to overcome the defect of poor practicability of the existing big data calculation method, the invention provides an extensible big data calculation method based on message combination. The method comprises the steps of firstly establishing an abstract big data computing operation topological structure, then deploying big data computing operation, generating a computing task execution plan in the big data computing operation, scheduling a big data computing task, carrying out load balancing in the computing task execution process, and executing the big data computing task. When the operation is finished, or the execution condition is not met, or the operation is cancelled by the user, the operation is evacuated from the system, and the operation life is finished. Because the topological structure of the computing task and the computing task are divided, the topological structure defines the abstraction of the big data computing operation, the computing task represents the real execution process, and the message transmission is used as a control link between the abstract task and the computing task. And a master-slave structure mode is adopted, and a scheduler on the master node controls the execution of the calculation tasks on each slave cluster node according to the topological structure of the abstract tasks, so that the practicability is good.
The technical scheme adopted by the invention for solving the technical problems is as follows: an extensible big data calculation method based on message combination is characterized by comprising the following steps:
step one, establishing an abstract big data computing operation topological structure: big data computing jobs are first partitioned into a series of small tasks that are combined using a customized sequence, loop, condition selection, and parallel structure. The sequence, loop, condition selection and parallel structure are used to control the small task execution process. And establishing big data operation in a text file mode.
Step two, deployment of big data calculation operation: and the user deploys the computing task corresponding to the abstract task to the cluster node.
Step three, generating a calculation task execution plan in the calculation operation of the big data: the user's abstract computing tasks are submitted to the management node. And the management node checks the grammar and the semantics of the big data calculation definition file, forms an operation plan and stores the operation plan.
Step four, scheduling of big data calculation tasks: the big data computing job engine obtains the task execution plan and forms a message sending sequence. The message sending plan forms different stages according to concurrency, sequence and condition relation, and messages in the same stage are sent simultaneously. And after the reply message of the previous stage arrives, starting the message transmission of the next stage.
Step five, calculating load balance of the task execution process: and the task allocation module optimizes the message sending sequence according to the dependency relationship among the tasks and selects the optimal task execution node to start the calculation object. The management node can also perform load balancing according to the load of each working node so as to improve the performance. Each worker node maintains the state of the compute object assigned to it, which in turn is reflected in the message object of the management node.
And step six, executing a big data calculation task: after acquiring parameters required for executing each task, the job engine sends an execution request to the execution node. After receiving the request, the execution node starts to execute the method contained in the calculation object, simultaneously returns the state of the calculation object to the operation engine as R, and the operation engine records the state into the system sharing buffer area. Once the execution of the computing object is completed, the computing object returns the D state or the E state to the main control node through the cluster node, and the computing result is also returned to the main control node. And after the engine of the main control node obtains the calculation result of the calculation object, the state of the abstract object is changed, and the next request is sent.
Seventhly, calculating extinction of operation by big data: when the operation is finished, or the execution condition is not met, or the operation is cancelled by the user, the operation is evacuated from the system, and the operation life is finished.
The invention has the beneficial effects that: the method comprises the steps of firstly establishing an abstract big data computing operation topological structure, then deploying big data computing operation, generating a computing task execution plan in the big data computing operation, scheduling a big data computing task, carrying out load balancing in the computing task execution process, and executing the big data computing task. When the operation is finished, or the execution condition is not met, or the operation is cancelled by the user, the operation is evacuated from the system, and the operation life is finished. Because the topological structure of the computing task and the computing task are divided, the topological structure defines the abstraction of the big data computing operation, the computing task represents the real execution process, and the message transmission is used as a control link between the abstract task and the computing task. And a master-slave structure mode is adopted, and a scheduler on the master node controls the execution of the calculation tasks on each slave cluster node according to the topological structure of the abstract tasks, so that the practicability is good. Tests show that the cost of the IT infrastructure investment of an enterprise is reduced by 40%, and the manual operation cost of the enterprise is reduced by more than 30%.
The present invention will be described in detail below with reference to the accompanying drawings and specific embodiments.
Detailed Description
Reference is made to fig. 1-7. The extensible big data calculation method based on the message combination comprises the following specific steps:
1. definitions related to the invention.
Definition 1. message object: referring to the left part of fig. 1, the message object contains fixed attributes related to parameters, stagein, and stageout, and is an abstraction of a concrete computation. A message object is described as follows:
o=(m,parameter,stagein,stageout,code,state)
where m represents the name of a message used to control the execution of an executable component or module. The parameter is an input parameter to the message. Stagein is a description of the input data that is required when an executable component or module executes. Stageout is the resulting output description of an executable component or module. code and state are output parameters. code is the return value of the received executable component or module. state is the execution state of the message object, o.state ═ { W, R, D }, and when a message is ready to be sent, state is W; after sending a message to the executable component or module, the state is R; after the executable component or module finishes executing, the state is the final state D. If the executable component or module returns correctly, the state is Do, otherwise, the state is De.
Definition 2. computing the object: an executable component or module is called a computing object and may be a complete program or simply a command.
Referring to the right-hand portion of FIG. 1, the computing object is an instance of the mysort class that contains a static method named computer (), and in fact a computing object may contain 1 or k methods, which are determined by the logic of the program, and the state of the message object is the final state after all methods of the computing object have been executed. Of course, if any method in the calculation process is wrong, the final state of the calculation object is De.
After a calculation, the calculation object may generate an output that is used as input for other calculation objects or itself for entering the next calculation.
Define 3. parallel objects: multiple message objects may be triggered to enter a wait state. It is used to control multiple message objects to send messages in parallel, without itself sending messages to the execution component.
Define 4. jump object: jump objects are often used to control the loop execution of certain message objects.
Definition 5. condition object: conditional objects are special cases of concurrent objects. And simultaneously triggering a plurality of abstract objects or state changes of the messages, wherein the message objects meeting the conditions enter a waiting state, and the message objects not meeting the conditions directly enter a final state.
Define 6. the combined structure of the message: large data computing jobs contain a large number of computing objects and thus correspond to a large number of message objects. How to organize these messages is an important issue. Referring to table 1, in order to express the interactive process of the message, a combined structure of messages is defined by using an order, a loop, a condition selection and a parallel, and is used to control the message interaction order of the message objects.
Table 1 message assembly structure
Define 7. big data job: big data computing jobs are typically composed of a large number of executable components. And performing message interaction between the executable components according to a certain rule, and finally realizing the calculation requirement of a user. Therefore, the big data computing job can be abstracted into a set of message interaction, and the sending sequence of the messages is controlled by the combined structure of the messages. For each particular big data computing job, executable computing components may be deployed on the distributed cluster, and a combination of message objects may be deployed on the unified management node.
The combination of the deployed message objects on the unified management node may be described in the form of a text file, referred to as a job definition file. The job definition file includes a message object, a sequence object, a jump object, a branch object, and a parallel object, where the sequence object, the jump object, the branch object, and the parallel object are collectively referred to as a control object. The message objects form a definition file of big data calculation operation through the control object. In the definition file, each message object and control object is identified by a unique string.
The definition file of the big data job is deployed on one management node, and the computing objects are scattered on different working nodes. When the operation is executed, the scheduling engine analyzes the definition file, the message object sends the message to the corresponding calculation object according to the logic relation between the message object and the control object, and the control object determines the sending sequence of the message. When all message objects have sent the message, one calculation is finished. If the user needs to perform the operation again, the message sending can be started again.
2. The invention relates to an implementation structure.
(1) An example of a big data job definition file.
FIG. 2 depicts a graphically represented big job in which 6 message objects are circled with numbers, (p1, p2) is a parallel object, (f1, f2) is a conditional object, and g (3,2) is a jump object.
FIG. 3 is a definition file of the big data computing job, where MSG is the key of the message object, Para is the parallel object key, If is the key of the condition object, which contains an alert condition, which may be set externally or may be the return value expression of the previous object of the If object; and if the alarm condition is met, executing the first branch, otherwise executing the second branch. Goto is a key for jumping objects. Pairs "{ }" are keys representing combinations of split messages.
(2) The structure of the model is calculated.
The transition process of the state of the message object from waiting for W, running R to ending D is called one-time calculation. Fig. 4 shows the execution of a calculation: one calculation process is the change of the message object from the W state to the D state, i.e., (start, W) (s, R) (e, D) sequence. Entering a W state when the message object receives a start event; a message object in the state of W receives the s event, it sends a message call to the computation object, and its own state becomes R. And after receiving the return value of the calculation object, the message object is set to be in a D state. If the return value of the calculation object is within the range specified by the message object, the state is normal end, namely Do state, otherwise, the state is set to error end, namely De state.
A computation object may contain 1 or k methods, which are determined by the logic of the program, and the state of the message object is the final state after all methods of the computation object are executed. Of course, if any method in the calculation process is wrong, the final state of the calculation object is De.
After a calculation, the calculation object may generate an output that is used as input for other calculation objects or itself for entering the next calculation.
FIG. 5 is an example of a calculation of a strong connectivity map, in which the computational model is generally described. Each vertex in fig. 5 has a value. The lines between vertices A, B, C, D represent the edges of the graph and the numbers of the vertices represent the vertex values of the graph. The connectivity of the graph is checked by a policy that propagates the maximum of the vertices to each vertex of the graph. Vertex A, B, C, D represents 4 computing objects, deployed on different worker nodes. S represents A, B, C, D set of message objects located on a management node, the solid lines from S represent message objects in S sending messages to the computation object A, B, C, D, and the dashed lines represent the passing of vertex values when computing. In this computation model, 4 message objects of S are combined in a parallel manner, and 4 concurrent messages arrive at a computation object and represent a computation. The return message is omitted from the figure.
In each calculation process, S sends a message to each calculation object, the calculation objects receive the message, perform calculation and return the result to S, and when the return value of the vertex A, B, C, D is not changed any more, the calculation is finished and the algorithm is terminated. In fig. 5 (a), S sends a calculation message to the calculation object a, and after a is calculated, the maximum value is 6, changes its own value and returns a message; s also sends a message to B, C, D, and the calculation object represented by each vertex has performed a calculation, and its own state becomes fig. 5 (b). The dotted line in fig. 5 represents the input relationship of the calculation object. After 4 calculations, the vertex values represented in (c) and (d) of fig. 5 are no longer changed, and the calculation process is ended, resulting in a connected graph. The method is used for calculating the big data, and after programming is finished, namely after the calculation operation is determined, a user can dynamically expand the calculation scale by modifying the big data operation definition file, for example, more calculation nodes are added, the topological structure among tasks can be controlled, the existing calculation operation is repeatedly utilized, and the flexibility of the topological structure of the big data calculation and the independence of the calculation operation and the description are improved.
FIG. 6 is an overall structure of the computational model, including two types of work units: management node and a plurality of working nodes. The management node is responsible for scheduling of jobs and assignment of tasks. The whole system supports HDFS or runs other storage systems, can also be a local file system, and is mainly used for data persistence.
(3) Inputting/outputting data.
The input data includes a job definition file, a calculation object, and calculation data. The job definition file is written in a text mode and stored in a local file system of the main control node. The calculation object may be a bytecode in JAVA format, or other binary file, or an SQL command, which may be stored in the local file system of the calculation node or in the HDFS file system. The data contained by the computing objects is stored using a local file or distributed file system, but the computing objects on the worker nodes have access to the data.
And the output information of the calculation object is stored in a local disk of the working node or the HDFS. The log information contains the start and end times of the message object and the computation object, state change events, etc.
(4) Management node and worker node.
The management node coordinates the work among the working nodes by sending instructions to the working nodes, and specifically comprises ① starting a new calculation, ② terminating the calculation, ③ calculating the state feedback of the object, ④ inquiring the state and the like.
The working node stores the results of the calculation objects, status flags, and maintains message queues required for the current and next calculations. Each worker node consists of 3 threads:
① A compute thread is used to execute on compute objects in the worker node and it also maintains an outgoing message buffer when the buffer is full, it is either sent over the network to the communication thread or passed directly to the local message resolution thread.
② the communication thread is used to send and receive messages in the buffers and also to simply coordinate messages between the management node and the worker node when a message buffer receives a message it passes the message to the parser thread.
③ the message parser thread parses the message in the input message buffer and the sent message is placed into the accept message queue of the corresponding compute thread for the next execution.
(5) A computing object on a worker node.
Inside the compute () function of a compute object, external input data, configuration files, and the operation of a global object may be accessed. The global object is used for coordinating message and data sharing and statistics and summarization between the management node and the working node. And updating the object mapped locally by the working node at the beginning of each calculation, and informing the end state of the calculation object to the management node when the calculation is finished.
3. Implementation examples of the invention.
The big data calculation model based on message transmission is adopted to improve the K-means calculation process, and the algorithm steps are as follows:
(1) randomly extracting k points from the data to serve as initial clustering centers, and representing each cluster by the centers;
(2) dispersing the original data to m calculation nodes, calculating the distance from the m calculation nodes to the k clustering centers, and classifying the points to the nearest clustering center;
(3) and adjusting the clustering center. Moving the center of the cluster to the geometric center of the cluster;
(4) and (3) repeating the step 2) until the clustering center is not changed any more, and then the algorithm is converged.
Table 2 is the acceleration ratio for the modified K-means algorithm running on different numbers of compute nodes.
TABLE 2 acceleration ratio of the K-means algorithm
FIG. 7 is a process for implementing the K-means refinement algorithm, where task2 is a parallel object, task4 is a jump object, and the others are message objects.