
WO2023168937A1 - Data processing method and apparatus, computer device, and readable medium


Info

Publication number: WO2023168937A1
Authority: WO (WIPO (PCT))
Prior art keywords: data, node, working, fragments, nodes
Application number: PCT/CN2022/123850
Other languages: French (fr), Chinese (zh)
Inventors: 张启明, 熊先奎, 蔡伟博, 杨树林, 虞红芳, 朱炫鹏, 姚海东
Original Assignees: 中兴通讯股份有限公司, 电子科技大学
Application filed by 中兴通讯股份有限公司 and 电子科技大学
Publication of WO2023168937A1

Classifications

    • G PHYSICS
    • G06 COMPUTING; CALCULATING OR COUNTING
    • G06F ELECTRIC DIGITAL DATA PROCESSING
    • G06F 9/00 Arrangements for program control, e.g. control units
    • G06F 9/06 Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F 9/46 Multiprogramming arrangements
    • G06F 9/50 Allocation of resources, e.g. of the central processing unit [CPU]
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 41/00 Arrangements for maintenance, administration or management of data switching networks, e.g. of packet switching networks
    • H04L 41/08 Configuration management of networks or network elements
    • H04L 41/0803 Configuration setting
    • H04L 41/0813 Configuration setting characterised by the conditions triggering a change of settings
    • H04L 41/082 Configuration setting characterised by the conditions triggering a change of settings the condition being updates or upgrades of network functionality
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 41/00 Arrangements for maintenance, administration or management of data switching networks, e.g. of packet switching networks
    • H04L 41/08 Configuration management of networks or network elements
    • H04L 41/0894 Policy-based network configuration management
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 43/00 Arrangements for monitoring or testing data switching networks
    • H04L 43/08 Monitoring or testing based on specific metrics, e.g. QoS, energy consumption or environmental parameters
    • H04L 43/0876 Network utilisation, e.g. volume of load or congestion level
    • H04L 43/0888 Throughput
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 67/00 Network arrangements or protocols for supporting network services or applications
    • H04L 67/01 Protocols
    • H04L 67/10 Protocols in which an application is distributed across nodes in the network
    • H04L 67/1001 Protocols in which an application is distributed across nodes in the network for accessing one among a plurality of replicated servers
    • H04L 67/1004 Server selection for load balancing
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 67/00 Network arrangements or protocols for supporting network services or applications
    • H04L 67/01 Protocols
    • H04L 67/10 Protocols in which an application is distributed across nodes in the network
    • H04L 67/1001 Protocols in which an application is distributed across nodes in the network for accessing one among a plurality of replicated servers
    • H04L 67/1004 Server selection for load balancing
    • H04L 67/1008 Server selection for load balancing based on parameters of servers, e.g. available memory or workload
    • H ELECTRICITY
    • H04 ELECTRIC COMMUNICATION TECHNIQUE
    • H04L TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L 67/00 Network arrangements or protocols for supporting network services or applications
    • H04L 67/01 Protocols
    • H04L 67/10 Protocols in which an application is distributed across nodes in the network
    • H04L 67/1097 Protocols in which an application is distributed across nodes in the network for distributed storage of data in networks, e.g. transport arrangements for network file system [NFS], storage area networks [SAN] or network attached storage [NAS]

Definitions

  • the present disclosure relates to the field of communication technology, and specifically relates to a data processing method, device, computer equipment and readable medium.
  • Distributed machine learning is a typical business in the computing network scenario. Its model aggregation and update process is a representative example of data aggregation and distribution in the computing network.
  • In related work, the transmission scheduling of the communication involved in the aggregation and update of distributed machine learning models is optimized.
  • This is a concrete and effective perspective from which to explore the optimization of communication primitives in the computing power network scenario.
  • Distributed machine learning transmission scheduling optimization work can be carried out from the perspective of overlapping computing and communication.
  • a priority-based transmission scheme has been proposed in related technologies, which mainly takes into account the contradiction between the gradient generation order in machine learning and the demand order of global gradients (parameters) in forward propagation, and considers giving higher priority to the front-layer model parameters.
  • this solution also considers the application of fine-grained transmission to improve the parallelism of uplink and downlink utilization.
  • The core idea is that a smaller granularity can effectively reduce the waiting delay of the merge operation during data aggregation. However, the specific design of this method is limited to splitting larger-granularity data blocks at a fixed value and does not consider how to handle data blocks whose granularity is too small.
  • Related work also notes that the parameter sizes of many layers in deep neural networks are actually very small, especially in CNNs (Convolutional Neural Networks). Considering the additional overhead of each transmission, transmitting parameters or gradients at layer granularity in this case makes the transmission efficiency very low.
  • Computing power networks usually require wide area networks to open up communication channels between cross-domain distributed computing power nodes.
  • However, the inherent heterogeneity, scarcity and dynamics of wide area network bandwidth lead to frequent bottleneck links, so the communication of the distributed services carried by the computing power network is significantly blocked and slowed down.
  • a solution using multiple central nodes cannot sense the dynamically changing network status, causing the communication tasks undertaken by each central node to not match its communication capabilities, resulting in low overall communication efficiency.
  • the present disclosure provides a data processing method, device, computer equipment and readable medium.
  • embodiments of the present disclosure provide a data processing method, which is applied to a scheduling node in a computing power network distributed cluster system.
  • The method includes: in response to receiving a policy update request message sent by a working node that carries the network throughput information of the working node, updating the global network throughput information according to the network throughput information, where the global network throughput information is used to record the network throughput between any two working nodes;
  • upon receiving the policy update request messages sent by all working nodes, determining data allocation information based on the global network throughput information, where the data allocation information is used to record the data allocation ratio of each working node;
  • and, in response to determining, based on a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, sending a policy response message carrying the data allocation information to each of the working nodes.
  • embodiments of the present disclosure also provide a data processing method applied to working nodes in a computing power network distributed cluster system.
  • The method includes: dividing the data to be processed into multiple data fragments, wherein the data amount of each data fragment is less than or equal to a second threshold; determining the destination receiving node of the data fragments to be allocated according to locally stored data allocation information, where the locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node; and sending a second notice message and a partial data message carrying the data fragments to be allocated to each of the destination receiving nodes.
  • Embodiments of the present disclosure also provide a data processing device applied to a scheduling node in a computing power network distributed cluster system, including a throughput information sensing module, a policy formulation module and a policy publishing module.
  • The throughput information sensing module is configured to, in response to receiving a policy update request message sent by a working node that carries the network throughput information of the working node, update the global network throughput information according to the network throughput information, where the global network throughput information is used to record the network throughput between any two working nodes.
  • The policy formulation module is configured to, upon receiving the policy update request messages sent by all working nodes, determine data allocation information based on the global network throughput information, where the data allocation information is used to record the data allocation ratio of each working node.
  • The policy publishing module is configured to, in response to determining, based on a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, send a policy response message carrying the data allocation information to each working node.
  • embodiments of the present disclosure also provide a data processing device applied to working nodes in a computing power network distributed cluster system, including a data segmentation module, a destination receiving node determination module and a data distribution module.
  • The data segmentation module is configured to divide the data to be processed into multiple data fragments, wherein the data amount of each data fragment is less than or equal to a second threshold. The destination receiving node determination module is configured to determine the destination receiving node of the data fragments to be allocated according to locally stored data allocation information, where the locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node. The data distribution module is configured to send a second notice message and a partial data message carrying the data fragments to be allocated to each destination receiving node.
  • An embodiment of the present disclosure also provides a computer device, including: one or more processors; and a storage device on which one or more programs are stored; when the one or more programs are executed by the one or more processors, the one or more processors implement the data processing method as described above.
  • embodiments of the present disclosure also provide a computer-readable medium on which a computer program is stored, wherein when the program is executed, the data processing method as described above is implemented.
  • Figure 1 is a schematic diagram of the system architecture of an embodiment of the present disclosure
  • Figure 2 is a schematic flowchart of a data processing method with a scheduling node as the execution subject provided by an embodiment of the present disclosure
  • Figure 3 is a schematic flowchart of determining data allocation information provided by an embodiment of the present disclosure
  • Figure 4 is a schematic flowchart of a data processing method with working nodes as execution subjects provided by an embodiment of the present disclosure
  • Figure 5 is a schematic diagram 1 of the process of determining the destination receiving node of the data fragments to be allocated according to an embodiment of the present disclosure
  • Figure 6 is a schematic diagram 2 of the process of determining the destination receiving node of the data fragments to be allocated according to an embodiment of the present disclosure
  • Figure 7 is a schematic flowchart of calculating network throughput between working nodes provided by an embodiment of the present disclosure
  • Figure 8 is a schematic flowchart of data merging provided by an embodiment of the present disclosure.
  • Figure 9 is a schematic flowchart of a data processing method with a working node as the execution subject in a specific example provided by the embodiment of the present disclosure.
  • Figure 10 is a schematic structural diagram of a data processing device (scheduling node) provided by an embodiment of the present disclosure
  • Figure 11 is a schematic structural diagram 1 of a data processing device (working node) provided by an embodiment of the present disclosure
  • Figure 12 is a schematic structural diagram 2 of a data processing device (working node) provided by an embodiment of the present disclosure
  • Figure 13 is a schematic structural diagram 3 of a data processing device (working node) provided by an embodiment of the present disclosure
  • Figure 14 is a schematic structural diagram 4 of a data processing device (working node) provided by an embodiment of the present disclosure.
  • Embodiments described herein may be described with reference to plan and/or cross-sectional illustrations, with the aid of idealized schematic illustrations of the present disclosure. Accordingly, example illustrations may be modified based on manufacturing techniques and/or tolerances. Therefore, the embodiments are not limited to those shown in the drawings but include modifications of configurations formed based on the manufacturing process. Accordingly, the regions illustrated in the figures are of a schematic nature and the shapes of the regions shown in the figures are illustrative of the specific shapes of regions of the element and are not intended to be limiting.
  • Embodiments of the present disclosure provide a data processing method, which is applied to a computing power network distributed cluster system.
  • the system includes a scheduling node and multiple working nodes.
  • The scheduling node is responsible for sensing network throughput information and for formulating and publishing policies based on the network throughput information; the working nodes are responsible for executing specific data transmission control based on the policy information.
  • Since the scheduling node needs to make decisions based on the throughput information of all working nodes, the scheduling node is deployed separately on a network device, while working nodes are deployed on all network devices that actually perform computing tasks.
  • The scheduling node forms a star-shaped logical network with all working nodes and exchanges scheduling information over the WAN to ensure that global decision-making and control proceed smoothly; the working nodes form a logically fully connected network among themselves and transmit data information over the WAN to meet the data requirements of the computing tasks.
  • In the embodiments of the present disclosure, a working node needs to send its locally calculated data on the one hand and receive the global data sent by other working nodes on the other hand; in addition, it also needs to aggregate and distribute the local data of other working nodes.
  • the scheduling node includes three functional divisions: network throughput information perception, policy formulation and policy release.
  • the network throughput information-aware partition is responsible for sensing network throughput information between worker nodes that perform distributed tasks.
  • the network throughput information is determined by the worker nodes during data transmission and notified to the scheduling node in the policy update request message.
  • the network throughput information aware partition maintains global network throughput information.
  • the global network throughput information is used to record the network throughput between any two working nodes.
  • When the scheduling node receives a policy update request message from a working node, the network throughput information aware partition uses the latest network throughput information received from that working node to update the value at the corresponding position in the global network throughput information.
  • the policy formulation partition is the core work area of the scheduling node.
  • The function of the policy release partition is to distribute the performance evaluation results to all working nodes.
  • The working nodes need to first send a policy update request message to the scheduling node; after the scheduling node has received the policy update request messages from all working nodes, the latest policy is written into a policy response message and sent to the working nodes.
  • the functions of the worker node mainly include policy local update and data transmission control.
  • When a working node needs to start a new round of data sending, it needs to send a policy update request message to the scheduling node to ask whether the local sending policy needs to be updated.
  • After receiving the policy response message from the scheduling node, the policy local update partition of the working node reads the policy in the policy response message and then re-divides the sizes of the data blocks to be sent to the different working nodes, so as to minimize the transmission delay and prepare for future data sending.
  • the data transmission control partition is responsible for optimizing the granularity of data fragmentation according to the current policy, assigning a sending priority to each fragment according to the internal relationship of the data, and then sending and receiving data.
  • The data transmission control partition is responsible for fragmenting the data and assigning a priority to each fragment, and then sends local data messages of different sizes (each carrying one or more data fragments) to different working nodes according to the established strategy.
  • the data transmission control partition is responsible for receiving the global data messages merged by the working nodes and performing local subsequent processing.
  • the data transmission control partition is responsible for receiving local partial data from other worker nodes and distributing global data to other worker nodes, where the global data is usually merged from the received local data.
  • Embodiments of the present disclosure provide a data processing method, which is applied to scheduling nodes in a computing power network distributed cluster system. As shown in Figure 2, the data processing method includes steps S21 to S23.
  • Step S21 In response to receiving the policy update request message sent by the working node and carrying the network throughput information of the working node, update the global network throughput information according to the network throughput information.
  • Global network throughput information (THRUPUT_TABLE) is used to record the network throughput between any two working nodes.
  • the network throughput information between working nodes can reflect the communication performance and network status of the working nodes.
  • The policy update request message sent by each working node to the scheduling node carries the network throughput information of that working node, and the scheduling node updates the corresponding positions of the global network throughput information based on the network throughput information of each working node.
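  • As an illustration only, the following sketch shows one way the scheduling node could maintain THRUPUT_TABLE and apply an update carried by a policy update request message; the message representation (a dictionary of measured (source, destination) pairs) is a hypothetical choice, not something specified by the present disclosure.

```python
import numpy as np

class ThroughputTable:
    """Global network throughput information (THRUPUT_TABLE) as an n*n matrix."""

    def __init__(self, n: int):
        self.n = n
        self.table = np.zeros((n, n))  # table[i][j]: network throughput from node i to node j

    def apply_update(self, measurements: dict[tuple[int, int], float]) -> None:
        # Overwrite only the positions reported in this policy update request
        # with the latest throughput values measured by the working node.
        for (src, dst), value in measurements.items():
            self.table[src][dst] = value
```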
  • Step S22 Upon receiving policy update request messages sent by all working nodes, determine data allocation information based on global network throughput information.
  • Data allocation information (PROPORTION_LIST) is used to record the data allocation ratio of each working node.
  • the data allocation ratio of a working node is the proportion of the data size that the working node is responsible for aggregating and distributing to the total data size.
  • Step S23 In response to determining that the preset policy update conditions are met based on the preset first threshold, data allocation information and historical data allocation information, send a policy response message carrying data allocation information to each working node.
  • The data allocation information (PROPORTION_LIST) is the data processing strategy of the entire computing power network, and the historical data allocation information (DICISION_LOG) is used to record the historical data allocation ratio of each working node.
  • The scheduling node determines whether the preset policy update condition is met based on the preset first threshold (DISTRIBUTION_THRESHOLD), the data allocation information (PROPORTION_LIST) and the historical data allocation information (DICISION_LOG).
  • DISTRIBUTION_THRESHOLD: preset first threshold
  • PROPORTION_LIST: data allocation information
  • DICISION_LOG: historical data allocation information
  • A data processing method provided by an embodiment of the present disclosure includes: in response to receiving a policy update request message sent by a working node that carries the network throughput information of the working node, updating the global network throughput information according to the network throughput information; upon receiving the policy update request messages sent by all working nodes, determining the data allocation information based on the global network throughput information; and, in response to determining, based on a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, sending a policy response message carrying the data allocation information to each working node.
  • The disclosed embodiments can significantly accelerate the data aggregation and distribution process of distributed applications carried by WAN-based computing power networks, thereby effectively reducing the communication time of data aggregation and distribution and improving communication efficiency. Based on network throughput sensing and a node performance evaluation algorithm, data volumes of different sizes are dynamically allocated to different aggregation nodes, which can effectively avoid bottleneck links and make better use of high-bandwidth links, thereby achieving efficient use of the available WAN bandwidth resources.
  • In some embodiments, the global network throughput information (THRUPUT_TABLE) is an n*n matrix whose element a_ij is the network throughput from node i to node j, 1 ≤ i ≤ n, 1 ≤ j ≤ n, where n is the total number of working nodes in the distributed cluster system; the matrix can be expressed as the square array whose i-th row is (a_i1, a_i2, ..., a_in).
  • determining data allocation information based on global network throughput information includes steps S221 to S223.
  • Step S221 Calculate the sum R_i of the elements in each row of the matrix and the sum C_j of the elements in each column of the matrix respectively.
  • That is, R_1 = a_11 + a_12 + ... + a_1n, R_2 = a_21 + a_22 + ... + a_2n, ..., R_n = a_n1 + a_n2 + ... + a_nn, and C_1 = a_11 + a_21 + ... + a_n1, C_2 = a_12 + a_22 + ... + a_n2, ..., C_n = a_1n + a_2n + ... + a_nn.
  • Step S222 Determine the effective throughput of each working node based on the row sums R_i and the column sums C_j of the matrix.
  • The effective throughput of working node i is MIN(R_i, C_i); for example, the effective throughput of working node 1 is MIN(R_1, C_1), and the effective throughput of working node 2 is MIN(R_2, C_2).
  • Step S223 Determine data allocation information based on the effective throughput of each working node and the minimum value of the effective throughput of each working node.
  • The data allocation information (PROPORTION_LIST) is a set of data allocation ratios b_i of the working nodes.
  • b_i represents the proportion of the data scale that working node i should be responsible for aggregating and distributing relative to the total data scale.
  • The data allocation information (PROPORTION_LIST) can be expressed as: [b_1, b_2, b_3, ..., b_n].
  • The data allocation ratio b_i of working node i can be calculated by the following formula (1): b_i = ROUND(SUM_C[i] / MIN(SUM_C[j]))
  • where MIN(SUM_C[j]) represents the minimum of the effective throughputs of all working nodes,
  • ROUND represents rounding to the nearest integer,
  • and SUM_C[i] represents the effective throughput of working node i.
  • In this way, the data allocation information (PROPORTION_LIST) can be obtained.
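  • A minimal sketch of steps S221 to S223, assuming formula (1) is read as b_i = ROUND(SUM_C[i] / MIN(SUM_C[j])); the function name and the example matrix are illustrative only.

```python
import numpy as np

def make_proportion_list(thruput_table: np.ndarray) -> np.ndarray:
    """Derive PROPORTION_LIST from THRUPUT_TABLE (steps S221 to S223)."""
    R = thruput_table.sum(axis=1)        # R_i: sum of the elements in row i
    C = thruput_table.sum(axis=0)        # C_j: sum of the elements in column j
    effective = np.minimum(R, C)         # effective throughput SUM_C[i] = MIN(R_i, C_i)
    # Formula (1), as read here: each node's ratio relative to the weakest node, rounded.
    return np.rint(effective / effective.min()).astype(int)

# Example with three working nodes (units arbitrary):
table = np.array([[0.0, 10.0, 20.0],
                  [10.0, 0.0, 30.0],
                  [20.0, 30.0, 0.0]])
print(make_proportion_list(table))  # -> [1 1 2]
```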
  • In some embodiments, satisfying the preset policy update condition includes: the change rate (CHANGE_RATE) of the data allocation ratio of any working node in the data allocation information (PROPORTION_LIST) is greater than the preset first threshold (DISTRIBUTION_THRESHOLD).
  • The change rate (CHANGE_RATE) of the data allocation ratio of a working node is calculated from the data allocation ratio of the working node in the data allocation information (PROPORTION_LIST[i]) and the data allocation ratio of the working node in the historical data allocation information (DICISION_LOG[i]).
  • The change rate CHANGE_RATE[i] of the data allocation ratio of working node i is compared with the first threshold (DISTRIBUTION_THRESHOLD) set for triggering policy release; as long as some CHANGE_RATE[i] is greater than the first threshold, a policy update and release is triggered.
  • The change rate CHANGE_RATE[i] of the data allocation ratio of working node i can be calculated by the following formula (2): CHANGE_RATE[i] = |PROPORTION_LIST[i] - DICISION_LOG[i]| / DICISION_LOG[i]
  • where DICISION_LOG[i] represents the historical data allocation ratio of working node i, and PROPORTION_LIST[i] represents the current data allocation ratio of working node i.
  • In some embodiments, the data processing method may further include the following step: in response to determining, based on the first threshold (DISTRIBUTION_THRESHOLD), the data allocation information (PROPORTION_LIST) and the historical data allocation information (DICISION_LOG), that the preset policy update condition is met, updating the historical data allocation information (DICISION_LOG) according to the data allocation information (PROPORTION_LIST) for the working nodes whose change rate (CHANGE_RATE) is greater than the first threshold. That is to say, when it is determined that a policy update is required, the scheduling node locally updates its historical data allocation information for the working nodes with large changes in communication performance.
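  • A sketch of the policy update check, assuming formula (2) is the relative change |PROPORTION_LIST[i] - DICISION_LOG[i]| / DICISION_LOG[i] and that the DICISION_LOG entries are non-zero; function and variable names are illustrative.

```python
def should_publish_policy(proportion_list, dicision_log, distribution_threshold):
    """Return True if any node's change rate exceeds DISTRIBUTION_THRESHOLD.

    For nodes whose change rate exceeds the threshold, DICISION_LOG is
    refreshed with the newly determined allocation ratio, as described above.
    """
    publish = False
    for i, (new, old) in enumerate(zip(proportion_list, dicision_log)):
        change_rate = abs(new - old) / old          # formula (2), as read here
        if change_rate > distribution_threshold:
            publish = True
            dicision_log[i] = new                   # update history only for this node
    return publish
```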
  • Embodiments of the present disclosure also provide a data processing method, which is applied to working nodes in a computing power network distributed cluster system. As shown in Figure 4, the data processing method includes steps S41 to S43.
  • Step S41 Divide the data to be processed into multiple data fragments, where the data amount of each data fragment is less than or equal to the second threshold.
  • the working node divides the data generated by the calculation based on the second threshold (SLICE_SIZE) to obtain data fragments.
  • the second threshold (SLICE_SIZE) can be determined based on the total amount of data to be processed by the working node and the total number of working nodes.
  • where M is the total amount of data to be processed by working node i (the working node performing the data segmentation), N is the total number of working nodes in the computing power network distributed cluster system, and the formula also involves a coefficient, for example 1.2×10^5.
  • In the case where the data to be processed is data with an internal structure, the internal structural units of the data to be processed whose data amount is greater than 1/4 of the second threshold are divided into multiple data blocks, where the data amount of each data block is less than or equal to 1/4 of the second threshold, and the data blocks are then merged in sequence, according to their generation order, to generate the data fragments.
  • In the case where the data to be processed is data without an internal structure, the data to be processed is directly divided into multiple data fragments according to the second threshold.
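  • An illustrative sketch of the segmentation rule described above for data with an internal structure (for example, per-layer gradients): units larger than SLICE_SIZE/4 are first cut into blocks of at most SLICE_SIZE/4, and the blocks are then packed in generation order into fragments of at most SLICE_SIZE; the helper name and the (name, size) representation are assumptions.

```python
import math

def make_fragments(structured_units, slice_size):
    """Split structured data into fragments whose size does not exceed slice_size.

    structured_units: list of (name, size) pairs in generation order.
    """
    quarter = slice_size / 4
    blocks = []
    for name, size in structured_units:
        if size > quarter:
            # Cut an oversized unit into blocks of at most slice_size / 4.
            n_blocks = math.ceil(size / quarter)
            for k in range(n_blocks):
                blocks.append((f"{name}#{k}", min(quarter, size - k * quarter)))
        else:
            blocks.append((name, size))

    # Merge blocks in generation order into fragments of at most slice_size.
    fragments, current, current_size = [], [], 0.0
    for name, size in blocks:
        if current and current_size + size > slice_size:
            fragments.append(current)
            current, current_size = [], 0.0
        current.append(name)
        current_size += size
    if current:
        fragments.append(current)
    return fragments
```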
  • Step S42 Determine the destination receiving node of the data fragments to be allocated based on locally stored data allocation information.
  • the locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node.
  • The scheduling node publishes a new policy to each working node, that is, the current data allocation information (PROPORTION_LIST) is sent to each working node through a policy response message, and each working node stores the data allocation information locally. In this step, each working node determines the destination receiving node of the data fragments to be allocated based on the locally stored data allocation information (PROPORTION_LIST).
  • Step S43 Send the second notice message and the partial data message carrying the data fragments to be allocated to each destination receiving node.
  • each working node sends all the data fragments that need to be allocated to each destination receiving node to achieve data distribution.
  • The sending thread determines the destination receiving node of each data fragment. If the destination receiving node is the working node itself, it sends the second notice message and the local data message to the service thread of the working node in sequence; if the destination receiving node is another working node, it sends the second notice message and the local data message to that destination receiving node in sequence.
  • The data processing method provided by the embodiments of the present disclosure takes advantage of the partial independence within distributed application data: larger data blocks are segmented before transmission so that the transmission granularity is optimized, and each data fragment can undergo subsequent data operations independently once it has been collected by a convergence node, thereby alleviating the problem of all data operations being blocked for a long time and accelerating the acquisition and utilization of data by the application.
  • In some embodiments, the data processing method may also include step S42': assigning a priority to each data fragment and generating a priority list, in which the data fragments are arranged from high priority to low priority.
  • The working node can assign priorities to the data fragments according to the order in which they are generated (the order can also be determined according to specific needs): the data fragment generated first has the lowest priority, and the data fragment generated last has the highest priority.
  • the worker node maintains a priority queue, and the data fragments marked with priority labels are pushed into the priority queue and enter the waiting state. By adding different priority labels to different data fragments, more important data can be sent first in the sending queue, further accelerating the acquisition and utilization of important data by applications.
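  • A small sketch of the priority queue described above, where later-generated fragments receive higher priority labels and the highest-priority fragment is popped for sending first (the counter only breaks ties in generation order); class and variable names are illustrative.

```python
import heapq
import itertools

class FragmentQueue:
    """Priority queue of data fragments; the highest-priority fragment is popped first."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()

    def push(self, fragment, priority: int) -> None:
        # heapq is a min-heap, so the priority is negated to pop the largest first.
        heapq.heappush(self._heap, (-priority, next(self._order), fragment))

    def pop(self):
        _, _, fragment = heapq.heappop(self._heap)
        return fragment

q = FragmentQueue()
for priority, fragment in enumerate(["frag_0", "frag_1", "frag_2"]):
    q.push(fragment, priority)      # later-generated fragments get higher priority
print(q.pop())                      # "frag_2" is sent first
```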
  • Steps S41 to S43 and step S42' are processed by the sending thread of the working node. After all the data fragments to be distributed are distributed, the sending thread ends.
  • determining the destination receiving node of the data fragments to be allocated based on locally stored data allocation information includes steps S421 to S424.
  • Step S421 Calculate the sum of data allocation proportions of the working nodes based on the locally stored data allocation information.
  • The sending thread of the working node calculates the sum of the elements in the data allocation information (PROPORTION_LIST), that is, the sum of the data allocation ratios of all working nodes, which is denoted SUM_P.
  • Step S422 Determine whether the sum of the data allocation ratios of the working nodes is greater than or equal to the number of data shards. If so, execute step S423; otherwise, execute step S424.
  • That is, the relationship between SUM_P and the number S of local data shards is determined: if SUM_P ≥ S, there are relatively few data shards and only some working nodes need to be selected to receive data; if SUM_P < S, there are relatively many data shards and data is distributed to all working nodes.
  • Step S423 Determine the working node whose data distribution ratio is greater than or equal to 1 in the data distribution information as the destination receiving node.
  • Step S424 determine all working nodes as destination receiving nodes.
  • In the case where the sum of the data allocation ratios of the working nodes (SUM_P) is greater than or equal to the number of data fragments (S), the number of data fragments to be received by each destination receiving node is a first number, where the first number is the difference between the data allocation ratio of the destination receiving node (PROPORTION_LIST[i]) and its data fragmentation adjustment amount (REDUCTION).
  • The data fragmentation adjustment amount (REDUCTION) is determined based on the sum of the data allocation ratios of the working nodes (SUM_P), the number of data fragments (S) and the data allocation ratio of each destination receiving node.
  • Specifically, the difference P_S between the sum of the data allocation ratios of the working nodes (SUM_P) and the number of data fragments (S) is calculated; the sum of the data allocation ratios of the destination receiving nodes (Σ_i PROPORTION_LIST[i], where each destination receiving node is a working node with a data allocation ratio greater than or equal to 1) is denoted SUM_MORE_1; then the data fragmentation adjustment amount (REDUCTION) of each destination receiving node is calculated from P_S, the node's data allocation ratio (PROPORTION_LIST[i]) and SUM_MORE_1.
  • The data fragmentation adjustment amount REDUCTION[i] of destination receiving node i can be calculated according to formula (3), which combines P_S, the data allocation ratio PROPORTION_LIST[i] of the destination receiving node and SUM_MORE_1.
  • In the case where the sum of the data allocation ratios of the working nodes (SUM_P) is less than the number of data fragments (S), the number of data fragments to be received by each destination receiving node is a second number, which is determined based on the data allocation ratio of that destination receiving node. It should be noted that, when all working nodes are used as destination receiving nodes and the data fragments to be allocated cannot all be allocated in one pass, the allocation can be repeated according to the data allocation ratio of each working node in the data allocation information (PROPORTION_LIST), so that the data fragments that have not yet been allocated continue to be allocated until all data fragments to be allocated are allocated.
  • where PROPORTION_LIST[:j] = PROPORTION_LIST[0] + PROPORTION_LIST[1] + ... + PROPORTION_LIST[j].
  • The sending thread uses a circular traversal method: in the order in which the data fragments to be allocated are generated, it allocates data fragments to the elements of LOOKUP_LIST (that is, to all working nodes), where the number of data fragments allocated to a working node in each pass equals that working node's data allocation ratio, until all data fragments to be allocated are allocated.
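  • A sketch of steps S421 to S424 combined with the allocation rules above. Formula (3) is not reproduced in the text, so the REDUCTION value is assumed here to distribute the excess P_S proportionally among the destination receiving nodes; the sketch also assumes every data allocation ratio is at least 1, as produced by formula (1).

```python
def plan_allocation(proportion_list, num_fragments):
    """Return {node_index: number_of_fragments_to_receive} (steps S421 to S424)."""
    sum_p = sum(proportion_list)
    if sum_p >= num_fragments:
        # Few fragments: only nodes whose allocation ratio is >= 1 receive data.
        dests = [i for i, p in enumerate(proportion_list) if p >= 1]
        p_s = sum_p - num_fragments
        sum_more_1 = sum(proportion_list[i] for i in dests)
        plan = {}
        for i in dests:
            reduction = round(p_s * proportion_list[i] / sum_more_1)   # assumed reading of formula (3)
            plan[i] = max(int(proportion_list[i]) - reduction, 0)      # the "first number"
        return plan
    # Many fragments: every node receives data; repeat proportional passes
    # until all fragments to be allocated have been allocated.
    plan = {i: 0 for i in range(len(proportion_list))}
    remaining = num_fragments
    while remaining > 0:
        for i, p in enumerate(proportion_list):
            take = min(int(p), remaining)   # ratios are assumed >= 1, so each pass makes progress
            plan[i] += take
            remaining -= take
            if remaining == 0:
                break
    return plan
```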
  • determining the destination receiving node of the data fragments to be allocated based on locally stored data allocation information includes steps S421' and S422'.
  • Step S421' Calculate the load of each working node based on the total amount of data to be processed by the working node and the locally stored data allocation information.
  • The load LOAD_i of working node i is calculated from M, the total amount of data to be processed by working node i, and PROPORTION_LIST[i], the data allocation ratio of working node i, where i ∈ [1, n].
  • Step S422' determine the destination receiving node of the data fragments to be allocated based on the load of each working node and the data volume of the data fragments to be allocated.
  • The difference between the load of working node i and the data volume of the data fragment currently to be allocated can be calculated according to the following formula (4): DIFF[i] = LOAD_i - PARAS_INDEX
  • where INDEX represents the identifier of the data fragment currently to be allocated,
  • PARAS_INDEX is the data amount of data fragment INDEX,
  • and LOAD_i is the load of working node i.
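  • One possible reading of the load-guided variant (steps S421' and S422'), shown as a sketch. Two assumptions go beyond the text: LOAD_i is taken to be node i's share of the total data volume M, i.e. M * PROPORTION_LIST[i] / SUM_P, and each fragment is assigned to the node with the largest remaining headroom DIFF[i] = LOAD_i - PARAS_INDEX.

```python
def assign_by_load(fragment_sizes, proportion_list, total_data):
    """Assign each fragment (in generation order) to a destination node by load headroom."""
    sum_p = sum(proportion_list)
    load = [total_data * p / sum_p for p in proportion_list]   # assumed LOAD_i
    assignment = []
    for size in fragment_sizes:
        diffs = [l - size for l in load]                       # formula (4), as read here
        dest = max(range(len(load)), key=lambda i: diffs[i])   # most remaining headroom
        assignment.append(dest)
        load[dest] -= size                                     # that node now has less headroom left
    return assignment
```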
  • When determining the destination receiving node of each local data fragment, the working node adopts a cyclic traversal method guided by the idea of maximizing bandwidth under minimum competition, which ensures that the data generation speed does not greatly exceed the data transmission speed, so that data can be distributed as early as possible.
  • Alternatively, for simplicity, a naive round-robin arrangement method can be used (determine the number of data shards each working node is responsible for based on the total number of shards, and then select each node in turn, in a simple sequential loop, as the destination receiving node of the current fragment), or a sequential destination-node arrangement method can be used (determine the number of data shards each working node is responsible for based on the total number of shards, and then allocate that many data shards to each working node in the order of the working nodes).
  • The disclosed embodiments propose a data fragment allocation scheme based on maximizing bandwidth under minimum competition, which periodically allocates a proportional amount of data fragments to each working node. This avoids the early blocking of nodes with low communication capability that occurs under the naive round-robin arrangement, and also avoids the problem that the bandwidth resources of the working nodes cannot be utilized simultaneously under the sequential destination-node arrangement, so that the overall utilization of the network is higher and the transmission process is completed faster and earlier.
  • In some embodiments, in addition to the sending thread, a receiving thread and a service thread are also created.
  • the sending thread, receiving thread and service thread can be executed in parallel.
  • the data processing method may also include steps S71 to S72.
  • Step S71 In response to receiving the first notice message and the global data message sent by other working nodes, obtain the global data carried in the global data message.
  • The global data is the data obtained after the other working node performs data processing (aggregation and merging) on the data fragments distributed to it.
  • the receiving thread of the working node receives the first notice message and the global data message in sequence, records the reception time t1 of the first notice message and the reception time t2 of the global data message, and obtains the global data in the global data message.
  • the working node performs aggregation and merging processing on the distributed data fragments (local data) to obtain global data, and sends the global data to other working nodes through global data messages.
  • Step S72 Calculate the network throughput between the working node and other working nodes based on the data volume of the global data, the receiving time of the first notice message and the receiving time of the global data message.
  • The receiving thread of the working node determines the network throughput between working node i and the working node j that sent the global data message according to the following formula (5): throughput = S_ij / (t2 - t1)
  • where S_ij is the data amount of the global data sent by working node j to working node i.
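  • A sketch of the throughput measurement based on formula (5): the reception time of the notice message marks the start of the transfer, the reception time of the data message marks its end, and repeated samples for the same peer are accumulated and averaged (as in step 6 below). The class and method names are illustrative.

```python
import time

class ThroughputMeter:
    """Measure per-peer network throughput from notice/data message reception times."""

    def __init__(self):
        self._t1 = {}         # peer -> reception time of the notice message (start of transfer)
        self._samples = {}    # peer -> list of throughput samples

    def on_notice(self, peer: int) -> None:
        self._t1[peer] = time.monotonic()

    def on_data(self, peer: int, num_bytes: int) -> None:
        t2 = time.monotonic()
        sample = num_bytes / (t2 - self._t1[peer])   # formula (5): S / (t2 - t1)
        self._samples.setdefault(peer, []).append(sample)

    def average(self, peer: int) -> float:
        samples = self._samples[peer]
        return sum(samples) / len(samples)           # accumulate and average per peer
```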
  • the measurement of network throughput between nodes is determined by locating the start and end time of data transmission combined with the amount of data contained in the data message.
  • the starting time of data transmission is determined by the time when the working node receiving the message receives the notice message
  • the completion time of data transmission is determined by the time when the working node receiving the message receives the data message.
  • This arrangement is adopted for the convenience of timing at the upper layer.
  • A feasible and more accurate solution is to perform the timing directly at the transport layer; taking TCP (Transmission Control Protocol) as an example, the end of the transfer can be located from the timestamp of the ACK (acknowledgement) confirming that the entire data stream has been transmitted.
  • However, the implementation of this solution is more complex and difficult, since it involves modifying the underlying base library, which makes compatibility difficult to achieve and is inconvenient for system integration.
  • the data processing method further includes the following step: in response to receiving all data fragments distributed to this working node, processing all data fragments.
  • The receiving thread of the working node determines whether it has received all the data fragments assigned to the working node. If all data fragments have been received, all data fragments are processed, that is, the subsequent operations on the local data are performed, and then the receiving thread ends; if not all have been received, step S71 continues to be executed.
  • In some embodiments, the data processing method may further include the following steps: in response to receiving the second notice message and the partial data message, obtaining the data fragments carried in the partial data message.
  • the data fragments carried in the partial data message are partial data, which are distributed data fragments.
  • the network throughput between the corresponding working nodes is calculated.
  • the specific implementation method of calculating the network throughput by the service thread of the working node is similar to the specific implementation method of calculating the network throughput by the receiving thread.
  • The difference is that the data amount S_ij' of the local data is used instead of the data amount S_ij of the global data, the reception time t1' of the second notice message replaces the reception time t1 of the first notice message, and the reception time t2' of the partial data message replaces the reception time t2 of the global data message.
  • the data processing method further includes steps S81 to S82.
  • Step S81 For the same data fragment, after receiving the data fragments sent by all working nodes, perform data merging on the data fragments to obtain global data of the data fragments.
  • When the service thread of the working node has received the same data fragment from all working nodes, it performs data aggregation or other data merging processing on that fragment to obtain the global data of the data fragment.
  • After receiving the data, the service thread of the working node usually needs to perform a data merging operation to merge the same data shards from all working nodes. It should be noted that this merging operation is optional rather than mandatory.
  • For example, to achieve an AllGather (global collection) effect, a working node can, after receiving the local data of other working nodes, directly distribute it to each working node in turn without any merging operation and thus complete the broadcast; in this case the working node only plays a central collection-and-forwarding role.
  • Step S82 Send the first notice message and the global data message to all working nodes.
  • the global data message carries the global data of the data fragments.
  • The service thread of the working node determines whether all the data fragments that this node is responsible for have been processed. If all have been processed, the service thread ends; otherwise, it continues to process the data fragments this working node is responsible for according to steps S81 to S82.
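  • A sketch of the service thread's merge step (S81 and S82): partial copies of the same fragment are collected from all working nodes, merged once the last copy arrives, and the merged global data is then handed back to the caller for broadcasting. The merge function (summation here) and the message handling are illustrative assumptions.

```python
from collections import defaultdict

class FragmentMerger:
    """Collect the same data fragment from all workers and merge it into global data."""

    def __init__(self, num_workers: int, merge=lambda parts: sum(parts)):
        self.num_workers = num_workers
        self.merge = merge                 # e.g. summation of gradients; merging is optional per the text
        self._parts = defaultdict(dict)    # fragment_id -> {worker_id: partial data}

    def on_partial_data(self, fragment_id, worker_id, data):
        self._parts[fragment_id][worker_id] = data
        if len(self._parts[fragment_id]) == self.num_workers:
            # All copies received: merge and return the global data of this fragment.
            # The caller then sends the first notice message and the global data
            # message to all working nodes (step S82).
            return self.merge(self._parts[fragment_id].values())
        return None
```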
  • the data processing process of the working node will be described in detail below with reference to Figure 9 and a specific example. As shown in Figure 9, the working node performs data processing process steps 1 to 10.
  • Step 1 After the working node is ready, fragment the data and add priority labels, and send a policy update request message to the scheduling node to request policy update.
  • Step 2 The working node waits to receive the policy response message from the scheduling node. When receiving a policy response message that is not empty, it performs the policy update and then goes to step 3; if the policy response message is empty, it directly executes step 3.
  • Step 3 The working nodes create sending threads, receiving threads and service threads respectively.
  • the sending thread is responsible for sending local data to other nodes for collection respectively.
  • the receiving thread is responsible for receiving global data for the next round of calculation.
  • The service thread is responsible for receiving and merging the local data sent by other working nodes to obtain global data, and then distributing the global data to the other working nodes.
  • The sending thread performs steps 4-5 (the steps in the left column of Figure 9), the receiving thread performs steps 6-7 (the steps in the middle column of Figure 9), and the service thread performs steps 8-10 (the steps in the right column of Figure 9); the sending thread, receiving thread and service thread can execute simultaneously.
  • Step 4 The sending thread selects a data fragment and determines its destination receiving node, and then sends the second notice message and the partial data message (carrying the selected data fragment) to the destination receiving node.
  • Step 5 The sending thread determines whether all data fragments have been sent. If so, the sending thread ends, otherwise go to step 4.
  • Step 6 The receiving thread waits to receive the first notice message and global data message from other working nodes. If received in sequence, calculate the throughput. Among them, the throughput between the same nodes needs to be accumulated and averaged, and then go to step 7.
  • Step 7 The receiving thread determines whether all data fragments have been received. If all data fragments are received, subsequent operations on local data are performed, and then the receiving thread ends. Otherwise, go to step 6.
  • Step 8 The service thread waits to receive the second notice message and the local data message from the worker node. If received in sequence, calculate the throughput (the specific process is the same as step 6), and then go to step 9.
  • Step 9 The service thread determines whether it has received the current data fragment from all working nodes. If so, it completes the data merging of that data fragment and then sends, to all other working nodes in sequence, the first notice message and the global data message carrying the merged data (that is, the global data).
  • Step 10 The service thread determines whether all the data fragments responsible for this working node have been processed. If so, the service thread ends, otherwise go to step 8.
  • The disclosed embodiments are based on the premise that the network does not change within a short period of time, and use the network status of the previous iteration to plan the strategy of the next iteration. Therefore, by default, a working node queries for a policy update at the beginning of each round of data transmission. Depending on the specific usage scenario, if the network status of the environment is relatively stable, a policy update query can instead be performed only after a certain number of rounds.
  • The disclosed embodiments are based on load balancing of multi-center communication tasks and by default treat every working node as a central node. In fact, if the actual scenario does not require this, only some working nodes can be selected as central nodes to participate in data fragment aggregation; it is then sufficient not to open a service thread on the working nodes that are not central nodes and to mark those nodes at the scheduling node.
  • The embodiments of the present disclosure solve the problem of the different delays caused by sending large-granularity and small-granularity data separately, by simultaneously segmenting large-granularity data and merging small-granularity data, and use a transmission priority strategy to ensure that more important data is transmitted as early as possible, thereby achieving effective overlap of computation and communication.
  • The amount of data sent to each convergence node (i.e., destination receiving node) is dynamically adjusted according to the network throughput, thereby achieving load balancing of the convergence tasks with respect to the real-time network throughput in the more common dynamic-network and multi-convergence-node scenarios.
  • This load balancing avoids the creation of bottleneck links, improves network utilization, and makes data aggregation and distribution in dynamic networks more efficient.
  • the network throughput monitoring mechanism determines the uplink and downlink throughput between nodes in the network during the data transmission process, providing a basis for policy formulation.
  • the strategy formulation optimization algorithm evaluates the communication performance of working nodes based on network uplink and downlink throughput, and dynamically allocates the data it is responsible for to each node to achieve load balancing of aggregation and distribution tasks.
  • The optimized-granularity priority data transmission mechanism reduces the long blocking time of data merging at larger granularities through granularity refinement, avoids the overhead caused by transmitting individual pieces of data of too small a granularity by merging them, and, based on the transmission priority, sends important data earlier, enabling subsequent work to start earlier and accelerating the overall process.
  • the fragmentation allocation method that maximizes bandwidth under minimal competition can further effectively improve the utilization of network bandwidth resources and accelerate the completion of transmission.
  • The embodiments of the present disclosure optimize transmission scheduling according to the characteristics of the wide area network: because WAN bandwidth is scarce, transmission efficiency needs to be optimized through transmission scheduling; because WAN bandwidth is heterogeneous, it is feasible to avoid bottleneck links through appropriate transmission scheduling optimization; and because WAN bandwidth is dynamic, the network status needs to be perceived in real time through network awareness and the transmission strategy adjusted in a timely manner.
  • the solution of the disclosed embodiment completely covers the three characteristics of the wide area network and realizes highly targeted transmission scheduling optimization.
  • Embodiments of the present disclosure can perceive the current network status by measuring the current network and, on the premise that the network status remains basically stable over a short period of time, use the current network status to determine the data allocation ratio of each central node and apply the result of this decision for a period of time in the future. Data aggregation and distribution of distributed services realized with the solutions of the embodiments of the present disclosure therefore has a significantly iterative character, which allows enough time for network sensing and for the decision results to be applied, so that dynamic decision-making can achieve the desired effect.
  • embodiments of the present disclosure also provide a data processing device.
  • the data processing device is applied to a scheduling node in a computing power network distributed cluster system and includes a throughput information sensing module 101 , policy formulation module 102 and policy release module 103.
  • the throughput information sensing module 101 is configured to, in response to receiving a policy update request message carrying the network throughput information of the working node sent by the working node, update the global network throughput information according to the network throughput information, the Global network throughput information records the network throughput between any two worker nodes.
  • The policy formulation module 102 is configured to, upon receiving the policy update request messages sent by all working nodes, determine data allocation information according to the global network throughput information, where the data allocation information is used to record the data allocation ratio of each working node.
  • The policy issuing module 103 is configured to, in response to determining that the preset policy update condition is met based on the preset first threshold, the data allocation information and the historical data allocation information, send a policy response message carrying the data allocation information to each of the working nodes.
  • In some embodiments, the global network throughput information is an n*n matrix whose element a_ij is the network throughput from node i to node j, 1 ≤ i ≤ n, 1 ≤ j ≤ n, where n is the total number of working nodes in the distributed cluster system. The policy formulation module 102 is configured to calculate the sum R_i of the elements in each row of the matrix and the sum C_j of the elements in each column of the matrix; determine the effective throughput of each working node based on the row sums R_i and the column sums C_j; and determine the data allocation information based on the effective throughput of each working node and the minimum of the effective throughputs of the working nodes.
  • satisfying the preset policy update condition includes: the change rate of the data allocation ratio of any one of the working nodes in the data allocation information is greater than a preset first threshold; wherein, the working node The change rate of the data allocation ratio is calculated based on the data allocation ratio of the working node in the data allocation information and the data allocation ratio of the working node in the historical data allocation information.
  • In some embodiments, the throughput information sensing module 101 is further configured to, after the policy formulation module 102 determines the data allocation information according to the global network throughput information, and in response to determining that the preset policy update condition is met based on the first threshold, the data allocation information and the historical data allocation information, update the historical data allocation information according to the data allocation information for the working nodes whose change rate is greater than the first threshold.
  • Embodiments of the present disclosure further provide another data processing device.
  • The data processing device is applied to a working node in a computing power network distributed cluster system and includes a data segmentation module 201, a destination receiving node determination module 202 and a data distribution module 203.
  • The data segmentation module 201 is configured to divide the data to be processed into multiple data fragments, where the data amount of each data fragment is less than or equal to a second threshold.
  • The destination receiving node determination module 202 is configured to determine the destination receiving node of the data fragments to be allocated according to locally stored data allocation information.
  • The locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node.
  • The data distribution module 203 is configured to send a second notice message and a partial data message carrying the data fragments to be allocated to each of the destination receiving nodes.
  • The data segmentation module 201 is configured to, when the data to be processed is data with an internal structure, divide the internal structures in the data to be processed whose data amount is greater than 1/4 of the second threshold into multiple data blocks, where the data amount of each data block is less than or equal to 1/4 of the second threshold, and merge the data blocks in their generation order to generate data fragments; or, when the data to be processed is data without an internal structure, divide the data to be processed into multiple data fragments according to the second threshold.
  • The data segmentation module 201 is configured to determine the second threshold according to the total amount of data to be processed by the working node and the total number of working nodes.
  • The data processing apparatus further includes a priority module 204.
  • The priority module 204 is configured to assign a priority to each of the data fragments and generate a priority queue.
  • The priority queue contains the data fragments arranged from high priority to low priority.
  • The destination receiving node determination module 202 is configured to calculate the sum of the data allocation ratios of the working nodes according to the locally stored data allocation information; in response to the sum of the data allocation ratios of the working nodes being greater than or equal to the number of the data fragments, determine the working nodes whose data allocation ratio in the data allocation information is greater than or equal to 1 as the destination receiving nodes; and in response to the sum of the data allocation ratios of the working nodes being less than the number of the data fragments, determine all the working nodes as the destination receiving nodes.
  • The data distribution module 203 is configured such that, when the sum of the data allocation ratios of the working nodes is greater than or equal to the number of the data fragments, the number of data fragments to be received by each destination receiving node is a first number.
  • The first number is the difference between the data allocation ratio of the destination receiving node and a data fragmentation adjustment amount.
  • The data fragmentation adjustment amount is determined according to the sum of the data allocation ratios of the working nodes, the number of the data fragments and the data allocation ratio of the destination receiving node.
  • When the sum of the data allocation ratios of the working nodes is less than the number of the data fragments, the number of data fragments to be received by each destination receiving node is a second number, and the second number is determined according to the data allocation ratio of each destination receiving node.
  • The destination receiving node determination module 202 is configured to calculate the load of each working node according to the total amount of data to be processed by each working node and the locally stored data allocation information, and to determine the destination receiving node of the data fragments to be allocated according to the load of each working node and the data volume of the data fragments to be allocated.
  • The data processing apparatus further includes a network throughput calculation module 205.
  • The network throughput calculation module 205 is configured to, in response to receiving a first notice message and a global data message sent by another working node, obtain the global data carried in the global data message, where the global data is the data obtained after data processing of the data fragments distributed to that other working node, and to calculate the network throughput between this working node and that other working node according to the data volume of the global data, the reception time of the first notice message and the reception time of the global data message.
  • The data processing device also includes a data processing module 206.
  • The data processing module 206 is configured to, in response to receiving all data fragments distributed to this working node, process all the data fragments distributed to this working node.
  • The network throughput calculation module 205 is further configured to, in response to receiving a second notice message and a partial data message, obtain the data fragments carried in the partial data message, and to calculate the network throughput between this working node and the corresponding working node according to the data volume of the data fragments, the reception time of the second notice message and the reception time of the partial data message.
  • The data processing module 206 is further configured to, for the same data fragment, after receiving the data fragments sent by all working nodes, perform data merging on the data fragments to obtain the global data of the data fragment, and to send a first notice message and a global data message to all working nodes, where the global data message carries the global data of the data fragment.
  • The disclosed embodiments are suitable for the data aggregation and distribution scenarios of various large-scale or large-volume distributed services carried by WAN-based computing power networks. In terms of overall applicability, the solutions of the embodiments of the present disclosure are particularly suitable for the global model aggregation of distributed machine learning and federated learning.
  • The model aggregation and data collection processes of machine-learning-based control modules in smart factory, Internet of Vehicles and Cloud VR scenarios can all use the solutions of the embodiments of the present disclosure, for example the collection of large amounts of sensor data in smart factories, edge servers in the Internet of Vehicles obtaining information from different roadside devices (such as traffic lights, cameras and intelligent signs) for unified scheduling, and cloud nodes in Cloud VR scenarios obtaining intermediate data from each working node.
  • Embodiments of the present disclosure also provide a computer device.
  • The computer device includes one or more processors and a storage device on which one or more programs are stored.
  • When the one or more programs are executed by the one or more processors, the one or more processors implement the data processing method provided in the foregoing embodiments.
  • Embodiments of the present disclosure also provide a computer-readable medium on which a computer program is stored, wherein when the computer program is executed, the data processing method as provided in the foregoing embodiments is implemented.
  • Such software may be distributed on computer-readable media, which may include computer storage media (or non-transitory media) and communication media (or transitory media).
  • Computer storage media includes volatile and nonvolatile, removable and non-removable media implemented in any method or technology for the storage of information such as computer readable instructions, data structures, program modules or other data.
  • Computer storage media includes, but is not limited to, RAM, ROM, EEPROM, flash memory or other memory technology, CD-ROM, Digital Versatile Disk (DVD) or other optical disk storage, magnetic cassettes, magnetic tape, magnetic disk storage or other magnetic storage devices, or any other medium that can be used to store the desired information and that can be accessed by a computer.
  • Communication media typically embodies computer readable instructions, data structures, program modules or other data in a modulated data signal such as a carrier wave or other transport mechanism, and may include any information delivery media.
  • The disclosed embodiments can significantly accelerate the data aggregation and distribution process of distributed applications carried by WAN-based computing power networks, thereby effectively reducing the communication time of data aggregation and distribution and improving communication efficiency.
  • Based on network throughput sensing technology, a node performance evaluation algorithm dynamically allocates data volumes of different sizes to different aggregation nodes, which can effectively avoid bottleneck links and make more effective use of high-bandwidth links, thereby achieving efficient use of the available WAN bandwidth resources.
  • Example embodiments have been disclosed herein, and although specific terms are employed, they are used and should be interpreted in a generic and descriptive sense only and not for purposes of limitation. In some instances, as would be apparent to those skilled in the art, features, characteristics and/or elements described in connection with a particular embodiment may be used alone or in combination with features, characteristics and/or elements described in connection with other embodiments, unless expressly stated otherwise. Accordingly, it will be understood by those skilled in the art that various changes in form and details may be made without departing from the scope of the present disclosure as set forth in the appended claims.

Landscapes

  • Engineering & Computer Science (AREA)
  • Computer Networks & Wireless Communication (AREA)
  • Signal Processing (AREA)
  • Software Systems (AREA)
  • General Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Physics & Mathematics (AREA)
  • Environmental & Geological Engineering (AREA)
  • Computer Hardware Design (AREA)
  • Data Exchanges In Wide-Area Networks (AREA)

Abstract

Provided are a data processing method, a data processing apparatus, a computer device, and a readable medium. The data processing method comprises: in response to receiving a policy update request message sent by a working node and carrying network throughput information of the working node, updating global network throughput information according to the network throughput information (S21); when policy update request messages sent by all working nodes are received, determining data allocation information according to the global network throughput information (S22); and in response to determining that a preset policy update condition is satisfied according to a preset first threshold, the data allocation information, and historical data allocation information, sending a policy response message carrying data allocation information to each working node (S23).

Description

Data processing method and apparatus, computer device, and readable medium

Cross-reference to related application

This application claims priority to Chinese patent application CN202210232264.4, titled "Data processing method, apparatus, computer device and readable medium" and filed on March 9, 2022, the entire contents of which are incorporated herein by reference.

Technical field

The present disclosure relates to the field of communication technology, and in particular to a data processing method, a data processing apparatus, a computer device and a readable medium.

Background

Distributed machine learning is a typical service in computing power network scenarios, and its model aggregation and update process is a representative instance of data aggregation and distribution in a computing power network. Focusing on the transmission scheduling optimization of communication during the aggregation and update of distributed machine learning models is therefore a concrete and effective perspective for exploring the optimization of communication primitives in computing power network scenarios. Transmission scheduling optimization for distributed machine learning can be approached from the perspective of overlapping computation and communication. A priority-based transmission scheme has been proposed in the related art. It mainly considers the contradiction between the order in which gradients are generated in machine learning and the order in which the global gradients (parameters) are needed in forward propagation, and gives higher priority to the parameters of the front layers of the model so that they are obtained by each node earlier and forward propagation can start earlier. The scheme also applies fine-grained transmission to improve the parallelism of uplink and downlink utilization; the core idea is that a smaller granularity can effectively reduce the waiting delay of merge operations during data aggregation. However, the specific design is limited to fixed-value segmentation of larger-granularity data blocks and does not consider how to handle data blocks whose granularity is too small. The related art has also noted that the parameter sizes of many layers in deep neural networks are in fact very small, especially in CNN (Convolutional Neural Networks) models; considering the additional overhead of each transmission, transmitting parameters or gradients at layer granularity in this case makes transmission efficiency very low.

Regarding load balancing of data aggregation and distribution tasks under multiple central nodes, the related art mainly addresses the load balancing of multiple servers under the distributed machine learning PS (parameter server) architecture: the performance of each server is determined through statistical and predictive methods, and a subset of servers is then selected and assigned a certain amount of parameters with reference to their performance. Considering the randomness of dynamic changes in the network, such predictions often do not match reality, so the network information that has been gathered cannot be used well. In addition, this scheme does not discuss fine-grained transmission or the merging of overly fine granularities in depth.

A computing power network usually relies on a wide area network (WAN) to open up communication channels between computing power nodes distributed across domains. The inherent heterogeneity, scarcity and dynamics of WAN bandwidth lead to the frequent appearance of bottleneck links, so the communication process of the distributed services carried by the computing power network is significantly blocked and slowed down. In the related art, schemes that use multiple central nodes cannot perceive the dynamically changing network state, so the communication tasks undertaken by each central node do not match its communication capability, resulting in low overall communication efficiency.

Summary of the invention

The present disclosure provides a data processing method, apparatus, computer device and readable medium.

In a first aspect, embodiments of the present disclosure provide a data processing method applied to a scheduling node in a computing power network distributed cluster system. The method includes: in response to receiving a policy update request message sent by a working node and carrying the network throughput information of the working node, updating global network throughput information according to the network throughput information, where the global network throughput information is used to record the network throughput between any two working nodes; in a case where policy update request messages sent by all working nodes have been received, determining data allocation information according to the global network throughput information, where the data allocation information is used to record the data allocation ratio of each working node; and in response to determining, according to a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, sending a policy response message carrying the data allocation information to each working node.

In another aspect, embodiments of the present disclosure further provide a data processing method applied to a working node in a computing power network distributed cluster system. The method includes: dividing data to be processed into a plurality of data fragments, where the data amount of each data fragment is less than or equal to a second threshold; determining a destination receiving node of the data fragments to be allocated according to locally stored data allocation information, where the locally stored data allocation information is the data allocation information carried in a policy response message sent by a scheduling node; and sending a second notice message and a partial data message carrying the data fragments to be allocated to each destination receiving node.

In another aspect, embodiments of the present disclosure further provide a data processing apparatus applied to a scheduling node in a computing power network distributed cluster system, including a throughput information sensing module, a policy formulation module and a policy issuing module. The throughput information sensing module is configured to, in response to receiving a policy update request message sent by a working node and carrying the network throughput information of the working node, update global network throughput information according to the network throughput information, where the global network throughput information is used to record the network throughput between any two working nodes. The policy formulation module is configured to, in a case where policy update request messages sent by all working nodes have been received, determine data allocation information according to the global network throughput information, where the data allocation information is used to record the data allocation ratio of each working node. The policy issuing module is configured to, in response to determining, according to a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, send a policy response message carrying the data allocation information to each working node.

In another aspect, embodiments of the present disclosure further provide a data processing apparatus applied to a working node in a computing power network distributed cluster system, including a data segmentation module, a destination receiving node determination module and a data distribution module. The data segmentation module is configured to divide data to be processed into a plurality of data fragments, where the data amount of each data fragment is less than or equal to a second threshold. The destination receiving node determination module is configured to determine a destination receiving node of the data fragments to be allocated according to locally stored data allocation information, where the locally stored data allocation information is the data allocation information carried in a policy response message sent by a scheduling node. The data distribution module is configured to send a second notice message and a partial data message carrying the data fragments to be allocated to each destination receiving node.

In another aspect, embodiments of the present disclosure further provide a computer device, including one or more processors and a storage device on which one or more programs are stored; when the one or more programs are executed by the one or more processors, the one or more processors implement the data processing method described above.

In another aspect, embodiments of the present disclosure further provide a computer-readable medium on which a computer program is stored, where the data processing method described above is implemented when the program is executed.

Description of the drawings

Figure 1 is a schematic diagram of the system architecture of an embodiment of the present disclosure;

Figure 2 is a schematic flowchart of a data processing method, with a scheduling node as the execution subject, provided by an embodiment of the present disclosure;

Figure 3 is a schematic flowchart of determining data allocation information provided by an embodiment of the present disclosure;

Figure 4 is a schematic flowchart of a data processing method, with a working node as the execution subject, provided by an embodiment of the present disclosure;

Figure 5 is a first schematic diagram of the process of determining the destination receiving node of the data fragments to be allocated provided by an embodiment of the present disclosure;

Figure 6 is a second schematic diagram of the process of determining the destination receiving node of the data fragments to be allocated provided by an embodiment of the present disclosure;

Figure 7 is a schematic flowchart of calculating the network throughput between working nodes provided by an embodiment of the present disclosure;

Figure 8 is a schematic flowchart of data merging provided by an embodiment of the present disclosure;

Figure 9 is a schematic flowchart of a data processing method, with a working node as the execution subject, in a specific example provided by an embodiment of the present disclosure;

Figure 10 is a schematic structural diagram of a data processing apparatus (scheduling node) provided by an embodiment of the present disclosure;

Figure 11 is a first schematic structural diagram of a data processing apparatus (working node) provided by an embodiment of the present disclosure;

Figure 12 is a second schematic structural diagram of a data processing apparatus (working node) provided by an embodiment of the present disclosure;

Figure 13 is a third schematic structural diagram of a data processing apparatus (working node) provided by an embodiment of the present disclosure;

Figure 14 is a fourth schematic structural diagram of a data processing apparatus (working node) provided by an embodiment of the present disclosure.

Detailed description

Example embodiments will be described more fully below with reference to the accompanying drawings, but the example embodiments may be embodied in different forms and should not be construed as limited to the embodiments set forth herein. Rather, these embodiments are provided so that the present disclosure will be thorough and complete and will fully convey the scope of the disclosure to those skilled in the art.

As used herein, the term "and/or" includes any and all combinations of one or more of the associated listed items.

The terminology used herein is for the purpose of describing particular embodiments only and is not intended to limit the present disclosure. As used herein, the singular forms "a", "an" and "the" are intended to include the plural forms as well, unless the context clearly indicates otherwise. It will also be understood that when the terms "comprising" and/or "made of" are used in this specification, they specify the presence of the stated features, integers, steps, operations, elements and/or components, but do not preclude the presence or addition of one or more other features, integers, steps, operations, elements, components and/or groups thereof.

Embodiments described herein may be described with reference to plan views and/or cross-sectional views by way of idealized schematic illustrations of the present disclosure. Accordingly, the example illustrations may be modified in accordance with manufacturing techniques and/or tolerances. Therefore, the embodiments are not limited to those shown in the drawings but include modifications of configurations formed on the basis of the manufacturing process. Accordingly, the regions illustrated in the figures are schematic in nature, and the shapes of the regions shown in the figures illustrate the specific shapes of regions of elements and are not intended to be limiting.

Unless otherwise defined, all terms (including technical and scientific terms) used herein have the same meaning as commonly understood by one of ordinary skill in the art. It will also be understood that terms such as those defined in commonly used dictionaries should be construed as having meanings consistent with their meanings in the context of the related art and the present disclosure, and will not be construed as having idealized or excessively formal meanings unless expressly so defined herein.

Embodiments of the present disclosure provide a data processing method applied to a computing power network distributed cluster system. As shown in Figure 1, the system includes a scheduling node and a plurality of working nodes. The scheduling node is responsible for sensing network throughput information and for formulating and publishing policies according to the network throughput information; the working nodes are responsible for performing specific data transmission control according to the policy information. In terms of system deployment, since the scheduling node needs to make decisions based on the throughput information of all the working nodes, the scheduling node is deployed alone on one network device, while working nodes are deployed on all network devices that actually perform computing tasks. The scheduling node and all working nodes form a star-shaped logical network that exchanges scheduling information over the wide area network to ensure that global decision-making and control proceed smoothly; the working nodes form a logically fully connected network among themselves that transmits data over the wide area network to meet the data requirements of the computing tasks. In the embodiments of the present disclosure, a working node needs, on the one hand, to send its calculated local data and, on the other hand, to receive global data sent by other working nodes; in addition, it needs to aggregate and distribute the local data of other working nodes.

The scheduling node internally includes three functional partitions: network throughput information sensing, policy formulation and policy issuing. The network throughput information sensing partition is responsible for sensing the network throughput information between the working nodes that execute distributed tasks; the network throughput information is determined by the working nodes during data transmission and reported to the scheduling node in policy update request messages. The network throughput information sensing partition maintains global network throughput information, which records the network throughput between any two working nodes; when the scheduling node receives a policy update request message from a working node, the network throughput information sensing partition updates the value at the corresponding position in the global network throughput information with the latest network throughput information received from that working node. The policy formulation partition is the core working area of the scheduling node: based on the network throughput information provided by the current global network throughput information, it quantitatively evaluates the communication performance of all working nodes according to a performance evaluation algorithm and then compares the evaluation result with the previous round of evaluation; if the performance of a working node changes significantly, a policy reset is triggered and a new policy needs to be published. The role of the policy issuing partition is to distribute the performance evaluation result to all working nodes. In a distributed service, to keep the policies of all working nodes synchronized, the working nodes first send policy update request messages to the scheduling node; after the scheduling node has received the policy update request messages from all working nodes, it writes the latest policy into policy response messages and sends them to the working nodes.

The functions of a working node mainly include local policy update and data transmission control. Before a working node starts a new round of data sending, it first sends a policy update request message to the scheduling node to ask whether the local sending policy needs to be updated. After receiving the policy response message from the scheduling node, the local policy update partition of the working node reads the policy in the policy response message and then re-divides the sizes of the data blocks to be sent to different working nodes so as to minimize the transmission delay, in preparation for subsequent data transmission control. The data transmission control partition is responsible for slicing data at an optimized granularity according to the current policy, assigning a sending priority to each fragment according to the internal relationships of the data, and then sending and receiving data. Accordingly, a sending thread, a receiving thread and a service thread are created on each working node to perform these functions. For the sending thread of a working node, the data transmission control partition is responsible for fragmenting the data and assigning a priority to each fragment, and then sending partial data messages (each carrying one or more data fragments) of different sizes to different working nodes according to the established policy. For the receiving thread, the data transmission control partition is responsible for receiving the global data messages merged by the working nodes and performing subsequent local processing. For the service thread of a working node, the data transmission control partition is responsible for receiving local data from other working nodes and distributing global data to other working nodes, where the global data is usually obtained by merging the received local data.

Embodiments of the present disclosure provide a data processing method applied to a scheduling node in a computing power network distributed cluster system. As shown in Figure 2, the data processing method includes steps S21 to S23.

Step S21: in response to receiving a policy update request message sent by a working node and carrying the network throughput information of the working node, update global network throughput information according to the network throughput information.

The global network throughput information (THRUPUT_TABLE) is used to record the network throughput between any two working nodes; the network throughput information between working nodes reflects the communication performance of the working nodes and the network state. In this step, the policy update request message sent by each working node to the scheduling node carries the network throughput information of that working node, and the scheduling node updates the corresponding position of the global network throughput information based on the network throughput information of each working node.

Step S22: in a case where policy update request messages sent by all working nodes have been received, determine data allocation information according to the global network throughput information.

The data allocation information (ROPORTION_LIST) is used to record the data allocation ratio of each working node; the data allocation ratio of a working node is the proportion of the data that the working node is responsible for aggregating and distributing to the total amount of data.

Step S23: in response to determining, according to a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, send a policy response message carrying the data allocation information to each working node.

The data allocation information (ROPORTION_LIST) is the data processing policy of the entire computing power network, and the historical data allocation information (DICISION_LOG) is used to record the historical data allocation ratio of each working node. In this step, the scheduling node judges whether the preset policy update condition is met according to the preset first threshold (DISTRIBUTION_THRESHOLD), the data allocation information (ROPORTION_LIST) and the historical data allocation information (DICISION_LOG). When it is determined that a policy update is required, the scheduling node publishes a new policy to each working node, the new policy being the data allocation information (ROPORTION_LIST) obtained in step S22. It should be noted that when it is determined that no policy update is required, the scheduling node returns an empty policy response message to each working node.

The data processing method provided by the embodiments of the present disclosure includes: in response to receiving a policy update request message sent by a working node and carrying the network throughput information of the working node, updating global network throughput information according to the network throughput information; in a case where policy update request messages sent by all working nodes have been received, determining data allocation information according to the global network throughput information; and in response to determining, according to a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, sending a policy response message carrying the data allocation information to each working node. The embodiments of the present disclosure can significantly accelerate the data aggregation and distribution process of distributed applications carried by a WAN-based computing power network, thereby effectively reducing the communication time of data aggregation and distribution and improving communication efficiency. Based on network throughput sensing and a node performance evaluation algorithm, data volumes of different sizes are dynamically allocated to different aggregation nodes, which can effectively avoid bottleneck links and make better use of high-bandwidth links, thereby achieving efficient utilization of the available WAN bandwidth resources.

In some embodiments, the global network throughput information (THRUPUT_TABLE) is an n*n matrix whose element a_ij is the network throughput from node i to node j, with 1 ≤ i ≤ n, 1 ≤ j ≤ n, where n is the total number of working nodes in the distributed cluster system. The matrix can be expressed as:

THRUPUT_TABLE =
  | a_11  a_12  ...  a_1n |
  | a_21  a_22  ...  a_2n |
  | ...   ...   ...  ...  |
  | a_n1  a_n2  ...  a_nn |

where a_ij (i = j) = 0 indicates that node i has no network throughput to itself, and a_ij (i ≠ j) is initially random.
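For illustration, a minimal sketch (Python; the class and method names are ours, not part of the disclosure) of how the scheduling node might maintain THRUPUT_TABLE is given below. It assumes each working node reports the throughput it measured from its peers to itself; the direction of measurement is an assumption made for the sketch.

```python
import numpy as np

class ThroughputTable:
    """Global network throughput information: an n*n matrix whose entry (i, j)
    is the most recently reported throughput from working node i to node j."""

    def __init__(self, n: int):
        rng = np.random.default_rng()
        self.table = rng.random((n, n))     # a_ij (i != j) starts with arbitrary values
        np.fill_diagonal(self.table, 0.0)   # a_ij (i == j) = 0: a node has no throughput to itself

    def update(self, reporter: int, measured: dict[int, float]) -> None:
        """Apply the measurements carried in the reporter's policy update request.

        `measured` maps a peer node id to the throughput observed on the path
        from that peer to the reporting node (the reporter measures on receipt).
        """
        for peer, throughput in measured.items():
            if peer != reporter:
                self.table[peer, reporter] = throughput
```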

As shown in Figure 3, determining the data allocation information according to the global network throughput information includes steps S221 to S223.

Step S221: calculate the sum R_i of the elements of each row of the matrix and the sum C_j of the elements of each column of the matrix.

In this step, the sum of the elements of each row of the matrix is calculated as R_i = a_i1 + a_i2 + ... + a_in, and the sum of the elements of each column is calculated as C_j = a_1j + a_2j + ... + a_nj. For example, R_1 = a_11 + a_12 + ... + a_1n and R_2 = a_21 + a_22 + ... + a_2n; C_1 = a_11 + a_21 + ... + a_n1 and C_2 = a_12 + a_22 + ... + a_n2.

Step S222: determine the effective throughput of each working node according to the row sums R_i and the column sums C_j of the matrix.

The effective throughput of working node i is SUM_C[i], where SUM_C[i] = MIN(R_i, C_j) with j = i. That is, the minimum of R_i and C_j (i = j) is taken as the effective throughput SUM_C[i] of working node i. For example, the effective throughput of working node 1 is MIN(R_1, C_1), and the effective throughput of working node 2 is MIN(R_2, C_2).

Step S223: determine the data allocation information according to the effective throughput of each working node and the minimum value of the effective throughputs of the working nodes.

The data allocation information (ROPORTION_LIST) is the set of data allocation ratios b_i of the working nodes, where b_i represents the proportion of the data that working node i should be responsible for aggregating and distributing to the total amount of data. Accordingly, the data allocation information (ROPORTION_LIST) can be expressed as [b_1 b_2 b_3 ... b_n].

The data allocation ratio b_i of working node i can be calculated by the following formula (1):

b_i = ROUND(SUM_C[i] / MIN(SUM_C[j]))  (1)

where MIN(SUM_C[j]) represents the minimum value of the effective throughputs of the working nodes, ROUND represents rounding to the nearest integer, and SUM_C[i] represents the effective throughput of working node i.

After the data allocation ratio of each working node is obtained according to formula (1), the data allocation information (ROPORTION_LIST) is obtained.
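As a minimal sketch of steps S221 to S223 and formula (1) (Python; the function name is illustrative and not taken from the disclosure):

```python
import numpy as np

def compute_proportion_list(thruput_table: np.ndarray) -> list[int]:
    """Derive the data allocation ratios from the n*n global throughput matrix."""
    row_sums = thruput_table.sum(axis=1)            # R_i: sum of row i
    col_sums = thruput_table.sum(axis=0)            # C_j: sum of column j
    effective = np.minimum(row_sums, col_sums)      # SUM_C[i] = MIN(R_i, C_i)
    baseline = effective.min()                      # MIN(SUM_C[j]); assumed > 0 here
    # Formula (1): each node's ratio is its effective throughput normalized by the
    # weakest node's effective throughput, rounded to the nearest integer, so the
    # weakest node always receives a ratio of 1.
    return [int(round(e / baseline)) for e in effective]
```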

In some embodiments, satisfying the preset policy update condition includes: the change rate (CHANGE_RATE) of the data allocation ratio of any one working node in the data allocation information (ROPORTION_LIST) being greater than the preset first threshold (DISTRIBUTION_THRESHOLD). The change rate (CHANGE_RATE) of the data allocation ratio of a working node is calculated from the data allocation ratio of the working node in the data allocation information (PROPORTION_LIST[i]) and the data allocation ratio of the working node in the historical data allocation information (DICISION_LOG[i]).

That is, the change rate CHANGE_RATE[i] of the data allocation ratio of working node i is compared, in traversal order, with the first threshold (DISTRIBUTION_THRESHOLD) set for triggering policy publication; as long as one CHANGE_RATE[i] value is greater than the first threshold, policy update and publication are triggered.

The change rate CHANGE_RATE[i] of the data allocation ratio of working node i can be calculated by the following formula (2):

CHANGE_RATE[i] = ABS(PROPORTION_LIST[i] - DICISION_LOG[i]) / DICISION_LOG[i]  (2)

where DICISION_LOG[i] represents the historical data allocation information of working node i (that is, the historical data allocation ratio of working node i), and PROPORTION_LIST[i] represents the data allocation ratio of working node i.

In some embodiments, after the data allocation information is determined according to the global network throughput information (that is, after step S22), the data processing method may further include the following step: in response to determining, according to the first threshold (DISTRIBUTION_THRESHOLD), the data allocation information (ROPORTION_LIST) and the historical data allocation information (DICISION_LOG), that the preset policy update condition is met, updating the historical data allocation information (DICISION_LOG) according to the data allocation information (ROPORTION_LIST) for each working node whose change rate (CHANGE_RATE) is greater than the first threshold. That is, when it is determined that a policy update is required, the scheduling node locally updates the historical data allocation information of the working nodes whose communication performance has changed significantly.
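A sketch of the policy update check, assuming the change rate of formula (2) is the relative change of the allocation ratio (Python; helper names are illustrative):

```python
def nodes_exceeding_threshold(proportion_list: list[int],
                              decision_log: list[int],
                              distribution_threshold: float) -> list[int]:
    """Return the working nodes whose allocation ratio changed by more than
    DISTRIBUTION_THRESHOLD since the last published decision."""
    changed = []
    for i, (new, old) in enumerate(zip(proportion_list, decision_log)):
        change_rate = abs(new - old) / old          # CHANGE_RATE[i] per formula (2), old > 0
        if change_rate > distribution_threshold:
            changed.append(i)
    return changed

# If the returned list is non-empty, the preset policy update condition is met:
# the scheduling node sends ROPORTION_LIST in the policy response messages and
# sets DICISION_LOG[i] = PROPORTION_LIST[i] for each node i in the list;
# otherwise it replies with empty policy response messages.
```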

Embodiments of the present disclosure further provide a data processing method applied to a working node in a computing power network distributed cluster system. As shown in Figure 4, the data processing method includes steps S41 to S43.

Step S41: divide the data to be processed into a plurality of data fragments, where the data amount of each data fragment is less than or equal to a second threshold.

The working node divides the data produced by its computation based on the second threshold (SLICE_SIZE) to obtain data fragments. The second threshold (SLICE_SIZE) can be determined according to the total amount of data to be processed by the working node and the total number of working nodes. In some embodiments, SLICE_SIZE is calculated from M, N and a coefficient α, where M is the total amount of data to be processed by working node i (working node i being the working node that performs the data segmentation), N is the total number of working nodes in the computing power network distributed cluster system, and α is a coefficient with α = 1.2×10^5.

In this step, different data segmentation schemes are used for different data types. When the data to be processed has an internal structure, the internal structures in the data to be processed whose data amount is greater than 1/4 of the second threshold are divided into a plurality of data blocks, where the data amount of each data block is less than or equal to 1/4 of the second threshold, and the data blocks are then merged in their generation order to generate data fragments. When the data to be processed has no internal structure, the data to be processed is divided into a plurality of data fragments according to the second threshold.
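For illustration, a sketch of the two segmentation branches (Python). Treating the data as byte strings and closing a fragment as soon as the next block would push it past SLICE_SIZE are assumptions of the sketch, not requirements stated above:

```python
def split_unstructured(data: bytes, slice_size: int) -> list[bytes]:
    """Data without internal structure: cut into fragments of at most SLICE_SIZE."""
    return [data[i:i + slice_size] for i in range(0, len(data), slice_size)]

def merge_structured_blocks(blocks: list[bytes], slice_size: int) -> list[bytes]:
    """Data with internal structure: the blocks (each at most SLICE_SIZE / 4 after the
    oversized structures have been cut) are merged in generation order, and a fragment
    is closed once adding the next block would exceed SLICE_SIZE."""
    fragments: list[bytes] = []
    current = b""
    for block in blocks:
        if current and len(current) + len(block) > slice_size:
            fragments.append(current)
            current = b""
        current += block
    if current:
        fragments.append(current)
    return fragments
```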

Step S42: determine the destination receiving node of the data fragments to be allocated according to locally stored data allocation information, where the locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node.

When a policy update occurs, the scheduling node publishes the new policy to each working node, that is, it sends the current data allocation information (ROPORTION_LIST) to each working node in a policy response message, and each working node stores the data allocation information (ROPORTION_LIST) locally. In this step, each working node determines the destination receiving node of the data fragments to be allocated according to the locally stored data allocation information (ROPORTION_LIST); the specific implementation will be described in detail later.

Step S43: send a second notice message and a partial data message carrying the data fragments to be allocated to each destination receiving node.

In this step, each working node sends all the data fragments that need to be allocated to the corresponding destination receiving nodes, thereby realizing data distribution. For each data fragment to be allocated, the sending thread determines the destination receiving node of the data fragment; if the destination receiving node is the node itself, the sending thread sends the second notice message and the partial data message in sequence to the service thread of this working node, and if the destination receiving node is another working node, the sending thread sends the second notice message and the partial data message in sequence to that destination receiving node.
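A sketch of the per-fragment send loop of the sending thread (Python; `send` and `service_thread` are placeholders for the actual transport and for this node's service thread, not APIs from the text):

```python
def distribute_fragments(assignments: dict[int, list[bytes]], self_id: int,
                         send, service_thread) -> None:
    """For every destination, send a second notice message followed by the partial
    data message for each fragment assigned to it; fragments whose destination is
    this node go directly to the local service thread."""
    for dest, fragments in assignments.items():
        for fragment in fragments:
            if dest == self_id:
                service_thread.on_notice(len(fragment))
                service_thread.on_partial_data(fragment)
            else:
                send(dest, ("SECOND_NOTICE", len(fragment)))
                send(dest, ("PARTIAL_DATA", fragment))
```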

The data processing method provided by the embodiments of the present disclosure takes into account the degree of independence within distributed application data and achieves finer-grained transmission by segmenting larger data blocks before data transmission, so that subsequent data operations can be performed independently on each optimally sized data fragment as soon as it has been fully received at an aggregation node. This alleviates the problem of all data operations being blocked for a long time and accelerates the acquisition and utilization of data by applications.

In some embodiments, after the data to be processed is divided into a plurality of data fragments (that is, after step S41), the data processing method may further include the following step. Step S42': assign a priority to each data fragment and generate a priority queue, where the priority queue contains the data fragments arranged from high priority to low priority.

In this step, a priority label is added to each data fragment. The working node may assign priorities to the data fragments according to the order in which they are generated (this may also be decided according to specific needs): the data fragment generated first has the lowest priority and the data fragment generated last has the highest priority. The working node maintains a priority queue, and the data fragments marked with priority labels are pushed into the priority queue and enter a waiting state. By adding different priority labels to different data fragments, more important data is sent first from the sending queue, which further accelerates the acquisition and utilization of important data by applications.
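A sketch of the priority tagging described above, using Python's heapq module as the priority queue (illustrative only):

```python
import heapq

def build_priority_queue(fragments: list[bytes]) -> list[tuple[int, int, bytes]]:
    """Tag each fragment with a priority based on generation order and push it into
    a priority queue: the fragment generated first gets the lowest priority and the
    fragment generated last the highest."""
    queue: list[tuple[int, int, bytes]] = []
    for order, fragment in enumerate(fragments):
        priority = order + 1
        # heapq is a min-heap, so the negated priority makes high-priority items pop first.
        heapq.heappush(queue, (-priority, order, fragment))
    return queue

# The sending thread then pops fragments from highest to lowest priority:
# _, _, fragment = heapq.heappop(queue)
```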

Steps S41 to S43 and step S42' are processed by the sending thread of the working node; after all the data fragments to be allocated have been distributed, the sending thread ends.

Two specific implementations for determining the destination receiving node of the data fragments to be allocated are described below with reference to Figure 5 and Figure 6 respectively.

In some embodiments, as shown in Figure 5, determining the destination receiving node of the data fragments to be allocated according to the locally stored data allocation information (that is, step S42) includes steps S421 to S424.

Step S421: calculate the sum of the data allocation ratios of the working nodes according to the locally stored data allocation information.

In this step, the sending thread of the working node calculates the sum of the elements in the data allocation information (ROPORTION_LIST), that is, the sum of the data allocation ratios of the working nodes, denoted SUM_P.

Step S422: judge whether the sum of the data allocation ratios of the working nodes is greater than or equal to the number of data fragments; if so, perform step S423; otherwise, perform step S424.

In this step, the relationship between SUM_P and the number S of local data fragments is judged. If SUM_P ≥ S, there are relatively few data fragments and some of the working nodes can be selected to receive the distributed data; if SUM_P < S, there are relatively many data fragments and the data is distributed to all working nodes.

Step S423: determine the working nodes whose data allocation ratio in the data allocation information is greater than or equal to 1 as the destination receiving nodes.

In the case of SUM_P ≥ S, the working nodes whose data allocation ratio is greater than or equal to 1 are determined according to the data allocation information (ROPORTION_LIST), and these working nodes are taken as the destination receiving nodes.

Step S424: determine all working nodes as the destination receiving nodes.

In the case of SUM_P < S, all working nodes are taken as the destination receiving nodes.

在一些实施例中,在工作节点的数据分配比例之和(SUM_P)大于或等于数据分片的数量(S)的情况下,各目的接收节点待接收的数据分片的数量为第一数量,第一数量为各目的接收节点的数据分配比例(PROPORTION_LIST[i])与数据分片调整量(REDUCTION)的差值,数据分片调整量(REDUCTION)根据工作节点的数据分配比例之和(SUM_P)、数据分片的数量(S)和各目的接收节点的数据分配比例确定。In some embodiments, when the sum of data allocation proportions of working nodes (SUM_P) is greater than or equal to the number of data fragments (S), the number of data fragments to be received by each destination receiving node is the first number, The first amount is the difference between the data allocation proportion of each destination receiving node (PROPORTION_LIST[i]) and the data fragmentation adjustment amount (REDUCTION). The data fragmentation adjustment amount (REDUCTION) is based on the sum of the data allocation proportions of the working nodes (SUM_P ), the number of data fragments (S) and the data distribution ratio of each destination receiving node are determined.

在一些实施例中，计算各工作节点数据分配比例之和(SUM_P)与数据分片的数量(S)的差值(P_S)，并计算差值(P_S)与各目的接收节点的数据分配比例之和(∑_i PROPORTION_LIST[i])的和，用SUM_MORE_1表示，其中，各目的接收节点即为数据分配比例大于或等于1的工作节点。根据差值(P_S)、各目的接收节点的数据分配比例(PROPORTION_LIST[i])和SUM_MORE_1，分别计算各目的接收节点的数据分片调整量(REDUCTION)。各目的接收节点待接收的数据分片的第一数量为各目的接收节点的数据分配比例(PROPORTION_LIST[i])与各目的接收节点的数据分片调整量(REDUCTION[i])的差值，即目的接收节点i待接收的数据分片的第一数量=PROPORTION_LIST[i]-REDUCTION[i]。In some embodiments, the difference (P_S) between the sum of the data allocation proportions of the working nodes (SUM_P) and the number of data fragments (S) is calculated, and the sum of this difference (P_S) and the data allocation proportions of the destination receiving nodes (∑_i PROPORTION_LIST[i]) is calculated and denoted SUM_MORE_1, where the destination receiving nodes are the working nodes whose data allocation proportion is greater than or equal to 1. The data fragment adjustment amount (REDUCTION) of each destination receiving node is then calculated from the difference (P_S), that node's data allocation proportion (PROPORTION_LIST[i]) and SUM_MORE_1. The first quantity of data fragments to be received by destination receiving node i is the difference between its data allocation proportion and its adjustment amount, that is, the first quantity for node i = PROPORTION_LIST[i]-REDUCTION[i].

其中,目的接收节点i的数据分片调整量(REDUCTION[i])可以根据以下公式(3)计算:Among them, the data fragmentation adjustment amount (REDUCTION[i]) of the destination receiving node i can be calculated according to the following formula (3):

REDUCTION[i]=CEIL(P_S*PROPORTION_LIST[i]/SUM_MORE_1)  (3)REDUCTION[i]=CEIL(P_S*PROPORTION_LIST[i]/SUM_MORE_1) (3)
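As an illustrative reading of formula (3) and the surrounding description, the sketch below computes the first quantity for each destination receiving node in the SUM_P≥S branch. It assumes integer allocation proportions and uses math.ceil for CEIL; the function name and the example values are hypothetical and not part of the disclosure.

```python
import math

def shards_per_destination(proportion_list, num_shards):
    """Sketch of the SUM_P >= S branch described above (formula (3)).

    proportion_list[i] is worker i's data allocation proportion; num_shards
    is the number S of local shards to distribute. Returns a dict mapping
    destination index -> first quantity of shards it should receive.
    """
    sum_p = sum(proportion_list)
    assert sum_p >= num_shards, "this branch assumes SUM_P >= S"

    p_s = sum_p - num_shards
    # Destinations are the workers whose proportion is >= 1.
    destinations = [i for i, p in enumerate(proportion_list) if p >= 1]
    sum_more_1 = p_s + sum(proportion_list[i] for i in destinations)

    counts = {}
    for i in destinations:
        # REDUCTION[i] = CEIL(P_S * PROPORTION_LIST[i] / SUM_MORE_1), formula (3)
        reduction = math.ceil(p_s * proportion_list[i] / sum_more_1)
        counts[i] = proportion_list[i] - reduction
    # Note: counts follow formula (3) verbatim, so due to the ceiling they
    # may sum to slightly less than num_shards.
    return counts

# Example: 4 workers, 6 shards to hand out.
print(shards_per_destination([3, 1, 4, 0], 6))
```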

在一些实施例中，在工作节点的数据分配比例之和(SUM_P)小于数据分片的数量(S)的情况下，各目的接收节点待接收的数据分片的数量为第二数量，第二数量根据各目的接收节点的数据分配比例确定。需要说明的是，在将全部工作节点都作为目的接收节点也无法将全部待分配的数据分片完全分配完成的情况下，可以再次按照数据分配信息(PROPORTION_LIST)中各工作节点的数据分配比例对尚未分配的数据分片继续进行分配，直到全部待分配的数据分片均分配完成为止。In some embodiments, when the sum of the data allocation proportions of the working nodes (SUM_P) is less than the number of data fragments (S), the number of data fragments to be received by each destination receiving node is a second quantity, which is determined according to that node's data allocation proportion. It should be noted that, if taking all working nodes as destination receiving nodes still cannot allocate all of the data fragments to be distributed, the remaining unallocated data fragments can be allocated again according to the data allocation proportions of the working nodes in the data allocation information (PROPORTION_LIST), until all data fragments to be distributed have been allocated.

示例性的，可以设置目的接收节点查询表LOOKUP_LIST[INDEX_i]=i，其中，INDEX_i∈[BEGIN_i, END_i]，i∈(1,n)。For example, a destination receiving node lookup table LOOKUP_LIST[INDEX_i]=i can be set, where INDEX_i∈[BEGIN_i, END_i] and i∈(1,n).

BEGIN={0,PROPORTION_LIST[0],PROPORTION_LIST[:1],…,PROPORTION_LIST[:i-1],…,PROPORTION_LIST[:n-1]};BEGIN={0,PROPORTION_LIST[0],PROPORTION_LIST[:1],…,PROPORTION_LIST[:i-1],…,PROPORTION_LIST[:n-1]};

END={PROPORTION_LIST[0]-1,PROPORTION_LIST[:1]-1,…,PROPORTION_LIST[:i]-1,…,PROPORTION_LIST[:n]-1}。END={PROPORTION_LIST[0]-1,PROPORTION_LIST[:1]-1,…,PROPORTION_LIST[:i]-1,…,PROPORTION_LIST[:n]-1}.

其中,PROPORTION_LIST[:j]=PROPORTION_LIST[0]+PROPORTION_LIST[1]+…+PROPORTION_LIST[j]。Among them, PROPORTION_LIST[:j]=PROPORTION_LIST[0]+PROPORTION_LIST[1]+…+PROPORTION_LIST[j].

发送线程以循环遍历的方式，按照待分配的数据分片产生的顺序，为LOOKUP_LIST中的元素(即为全部工作节点)依次分配相应待分配的数据分片(所分配的数据分片的数量即为当前工作节点的数据分配比例值)，直到所有待分配的数据分片都分配完毕。The sending thread traverses LOOKUP_LIST cyclically and, in the order in which the data fragments to be distributed are generated, assigns the fragments in turn to the elements of LOOKUP_LIST (that is, to all working nodes), with the number of fragments assigned to a working node equal to that node's data allocation proportion, until all data fragments to be distributed have been assigned.
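The cyclic traversal over LOOKUP_LIST can be sketched as follows; this is an illustration only, assuming integer allocation proportions, and the helper names and example values are hypothetical.

```python
def build_lookup_list(proportion_list):
    """Build LOOKUP_LIST as described above: worker i owns the slot range
    [BEGIN_i, END_i], whose width equals its allocation proportion."""
    lookup = []
    for worker, proportion in enumerate(proportion_list):
        lookup.extend([worker] * proportion)  # assumes integer proportions
    return lookup

def assign_shards_round_robin(shard_ids, proportion_list):
    """Sketch of the SUM_P < S branch: cycle over LOOKUP_LIST in shard
    generation order until every shard has a destination worker."""
    lookup = build_lookup_list(proportion_list)
    assignment = {}
    for slot, shard in enumerate(shard_ids):
        assignment[shard] = lookup[slot % len(lookup)]
    return assignment

# Example: 3 workers with proportions [2, 1, 3] and 10 shards; each full
# cycle over the lookup table gives worker i exactly proportion_list[i] shards.
print(assign_shards_round_robin(list(range(10)), [2, 1, 3]))
```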

在一些实施例中,如图6所示,所述根据本地存储的数据分配信息确定待分配的数据分片的目的接收节点(即步骤S42),包括步骤S421’和S422’。In some embodiments, as shown in Figure 6, determining the destination receiving node of the data fragments to be allocated based on locally stored data allocation information (ie, step S42) includes steps S421' and S422'.

步骤S421’,根据各工作节点待处理数据的数据总量和本地存储的数据分配信息,计算各工作节点的负载。Step S421', calculate the load of each working node based on the total amount of data to be processed by each working node and the locally stored data allocation information.

在本步骤中，可以根据以下公式计算工作节点i的负载：In this step, the load of working node i can be calculated according to the following formula:

Figure PCTCN2022123850-appb-000007

其中，M为工作节点i待处理数据的数据总量，PROPORTION_LIST[i]为工作节点i的数据分配比例，i∈(1,n)。Here, M is the total amount of data to be processed by working node i, PROPORTION_LIST[i] is the data allocation proportion of working node i, and i∈(1,n).

步骤S422’,根据各工作节点的负载和待分配的数据分片的数据量,确定待分配的数据分片的目的接收节点。Step S422', determine the destination receiving node of the data fragments to be allocated based on the load of each working node and the data volume of the data fragments to be allocated.

在本步骤中，分别计算各工作节点的负载与当前待分配的数据分片的数据量的差值，并确定各个差值的最大值MAX(LOAD_i-PARAS_INDEX)，将该最大值对应的工作节点作为待分配的数据分片的目的接收节点。In this step, the difference between each working node's load and the data volume of the data fragment currently to be distributed is calculated, the maximum of these differences, MAX(LOAD_i-PARAS_INDEX), is determined, and the working node corresponding to that maximum is taken as the destination receiving node of the data fragment to be distributed.

可以根据以下公式(4)计算工作节点i的负载与当前待分配数据分片的数据量的差值:The difference between the load of working node i and the current data volume of the data shards to be allocated can be calculated according to the following formula (4):

差值_i = LOAD_i - PARAS_INDEX  (4)  Difference_i = LOAD_i - PARAS_INDEX  (4)

其中，INDEX表示当前待分配的数据分片的标识，PARAS_INDEX为数据分片INDEX的数据量，LOAD_i为工作节点i的负载。Here, INDEX denotes the identifier of the data fragment currently to be distributed, PARAS_INDEX is the data volume of data fragment INDEX, and LOAD_i is the load of working node i.
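The load-based selection of steps S421'/S422' might be sketched as below. The exact load formula is given only by the image placeholder above and is not reproduced here, so the sketch takes the per-node loads as inputs; it additionally assumes that a node's remaining load is decreased by the fragment size after each assignment, which the text implies but does not state explicitly. All names and numbers are illustrative.

```python
def assign_by_load(shard_sizes, loads):
    """Sketch: for each shard (in generation order), send it to the worker
    maximizing LOAD_i - PARAS_INDEX, as in formula (4).

    Assumption (not spelled out in the text): the chosen worker's load is
    reduced by the shard size so that later shards spread across workers.
    """
    loads = list(loads)  # remaining capacity per worker
    assignment = {}
    for index, size in enumerate(shard_sizes):
        # difference_i = LOAD_i - PARAS_INDEX
        best = max(range(len(loads)), key=lambda i: loads[i] - size)
        assignment[index] = best
        loads[best] -= size
    return assignment

# Example: 5 shards of size 8, 3 workers with initial loads 40 / 25 / 10.
print(assign_by_load([8, 8, 8, 8, 8], [40, 25, 10]))
```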

本公开实施例中，工作节点在确定本地每个数据分片的目的接收节点时采用最小竞争下带宽最大化思想指导下的循环遍历确定法，可以保证在数据的生成速度不是远大于数据的发送速度的情况下，使数据尽早被分发出去。但如果实际中数据的生成速度远大于数据的发送速度，事实上为了简化也可以采用简单的朴素循环安排方法(先基于总分片数确定每个工作节点能负责的分片数，再以单次顺序循环方式依次选各节点为当前分片的目的接收节点，当某个节点的累计安排分片数达到了其能负责的分片数，则在下次循环过程中跳过该节点)，以及目标节点依次安排方法(先基于数据分片总数确定每个工作节点能负责的数据分片的数量，然后按照工作节点顺序为每个工作节点分配足量的数据分片)。In the embodiments of the present disclosure, when determining the destination receiving node of each local data fragment, the working node uses a cyclic-traversal method guided by the idea of maximizing bandwidth under minimum competition, which ensures that, provided the data generation speed is not far greater than the sending speed, the data is distributed as early as possible. If in practice data is generated far faster than it can be sent, a simple naive round-robin arrangement may be used instead for simplicity (first determine, from the total number of fragments, how many fragments each working node can be responsible for, then select the nodes in turn as the destination receiving node of the current fragment in a single sequential cycle, skipping a node in subsequent cycles once its cumulative number of assigned fragments reaches the number it can be responsible for), as may a sequential target-node arrangement (first determine, from the total number of data fragments, how many fragments each working node can be responsible for, and then allocate the full quota of fragments to each working node in node order).

本公开实施例提出了基于最小竞争下带宽最大化的数据分片分配方案,周期性地为各工作节点分配成比例的数据分片量,可以避免朴素循环安排方法下的低通信能力节点前期阻塞的问题,也可以避免目的接收节点依次安排方法下各工作节点带宽资源不能被同时利用的问题,从而使网络整体的利用率更高,传输过程能更快更早完成。The disclosed embodiment proposes a data fragmentation allocation scheme based on maximizing bandwidth under minimum competition, and periodically allocates a proportional amount of data fragmentation to each working node, which can avoid early blocking of nodes with low communication capabilities under the naive round-robin arrangement method. It can also avoid the problem that the bandwidth resources of each working node cannot be utilized at the same time under the sequential arrangement method of the destination receiving node, so that the overall utilization rate of the network is higher and the transmission process can be completed faster and earlier.

针对每个工作节点而言,除了创建发送线程之外,还创建接收线程和服务线程,发送线程、接收线程和服务线程可以并行执行。For each worker node, in addition to creating a sending thread, a receiving thread and a service thread are also created. The sending thread, receiving thread and service thread can be executed in parallel.

以下对接收线程的处理过程进行详细说明。如图7所示,所述数据处理方法还可以包括步骤S71~S72。The processing process of the receiving thread is described in detail below. As shown in Figure 7, the data processing method may also include steps S71 to S72.

步骤S71，响应于接收到其他工作节点发送的第一预告消息和全局数据消息，获取全局数据消息中携带的全局数据，全局数据为对分发给所述其他工作节点的数据分片进行数据处理后得到的数据。Step S71: In response to receiving a first notice message and a global data message sent by another working node, obtain the global data carried in the global data message, where the global data is the data obtained by performing data processing on the data fragments distributed to that other working node.

在本步骤中,工作节点的接收线程依次接收第一预告消息和全局数据消息,记录第一预告消息的接收时间t1和全局数据消息的接收时间t2,并获取全局数据消息中的全局数据。需要说明的是,工作节点针对分发下来的数据分片(局部数据)进行聚合等归并处理得到 全局数据,并将全局数据通过全局数据消息发送给其他工作节点。In this step, the receiving thread of the working node receives the first notice message and the global data message in sequence, records the reception time t1 of the first notice message and the reception time t2 of the global data message, and obtains the global data in the global data message. It should be noted that the working node performs aggregation and merging processing on the distributed data fragments (local data) to obtain global data, and sends the global data to other working nodes through global data messages.

步骤S72,根据全局数据的数据量、第一预告消息的接收时间和全局数据消息的接收时间,计算工作节点与其他工作节点之间的网络吞吐量。Step S72: Calculate the network throughput between the working node and other working nodes based on the data volume of the global data, the receiving time of the first notice message and the receiving time of the global data message.

工作节点的接收线程根据以下公式(5)计算工作节点i与发送全局数据消息的工作节点j之间的网络吞吐量：The receiving thread of the working node calculates the network throughput between working node i and the working node j that sent the global data message according to the following formula (5):

Figure PCTCN2022123850-appb-000008

其中，S_ij为工作节点j发送给工作节点i的全局数据的数据量。Here, S_ij is the data volume of the global data sent by working node j to working node i.

需要说明的是,相同节点之间的网络吞吐量需要累积计算平均值。It should be noted that the network throughput between the same nodes needs to be accumulated and averaged.
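Formula (5) itself appears only as an image placeholder above; a natural reading, consistent with the surrounding text, is that each throughput sample equals the global data volume S_ij divided by the interval t2-t1 between the notice message and the data message. The sketch below uses that assumed reading and shows the cumulative averaging mentioned above; the class and variable names are hypothetical.

```python
from collections import defaultdict

class ThroughputMonitor:
    """Sketch of the receiving-thread measurement: the interval between the
    notice message (t1) and the data message (t2), together with the payload
    size, gives one throughput sample; samples for the same peer are averaged.

    Assumes formula (5) is S_ij / (t2 - t1), which the image placeholder hides.
    """

    def __init__(self):
        self._sum = defaultdict(float)
        self._count = defaultdict(int)

    def record(self, peer, payload_bytes, t1, t2):
        sample = payload_bytes / (t2 - t1)
        self._sum[peer] += sample
        self._count[peer] += 1

    def average(self, peer):
        return self._sum[peer] / self._count[peer]

# Example: two transfers from worker j, averaged before reporting upstream.
mon = ThroughputMonitor()
mon.record("worker_j", payload_bytes=8_000_000, t1=0.0, t2=2.0)
mon.record("worker_j", payload_bytes=6_000_000, t1=5.0, t2=6.0)
print(mon.average("worker_j"))  # (4e6 + 6e6) / 2
```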

在本公开实施例中，节点间网络吞吐量的测量是通过定位数据传输的起止时间再结合数据消息所含数据量来确定的。其中，数据传输的起始时间由接收消息的工作节点收到预告消息的时间确定，数据传输的完成时间由接收消息的工作节点收到数据消息的时间确定，这样的安排是为了上层计时操作的方便。一种可行的更优方案是直接基于传输层的协议来计时，以TCP(Transmission Control Protocol，传输控制协议)为例，可以基于整个数据流传输完成的ACK(响应)确认来定位时间戳时间计时。但该方案实现过程会更复杂，难度会更大，涉及底层基础库的修改，兼容性较难达成，也不方便系统集成。In the embodiments of the present disclosure, the network throughput between nodes is measured by locating the start and end times of a data transfer and combining them with the amount of data carried in the data message. The start time of the transfer is taken as the time at which the receiving working node receives the notice message, and the completion time is taken as the time at which it receives the data message; this arrangement makes timing convenient for the upper layer. A feasible and better alternative is to time the transfer directly at the transport layer; taking TCP (Transmission Control Protocol) as an example, the timestamp could be anchored to the ACK acknowledgement that confirms completion of the whole data stream. However, that approach is more complex and difficult to implement, involves modifying underlying base libraries, makes compatibility harder to achieve, and is less convenient for system integration.

在一些实施例中，所述数据处理方法还包括以下步骤：响应于接收到分发给本工作节点的全部数据分片，对全部数据分片进行处理。在本步骤中，工作节点的接收线程判断是否接收到分配给该工作节点的所有数据分片，若全部收到，则对全部数据分片进行处理，即执行本地数据的后续操作，然后接收线程结束；若未全部收到，则执行步骤S71。In some embodiments, the data processing method further includes the following step: in response to receiving all data fragments distributed to this working node, processing all of the data fragments. In this step, the receiving thread of the working node determines whether all data fragments assigned to the working node have been received. If they have all been received, all data fragments are processed, that is, the subsequent operations on the local data are performed, and the receiving thread then ends; if not all have been received, step S71 is executed.

以下对服务线程的处理过程进行详细说明。The following describes the processing process of the service thread in detail.

所述数据处理方法还可以包括以下步骤：响应于接收到第二预告消息和局部数据消息，获取局部数据消息中携带的数据分片。局部数据消息中携带的数据分片即为局部数据，是分发下来的数据分片。根据数据分片的数据量S_ij′、第二预告消息的接收时间t1'和局部数据消息的接收时间t2'，计算相应工作节点之间的网络吞吐量。工作节点的服务线程计算网络吞吐量的具体实现方式与接收线程计算网络吞吐量的具体实现方式类似，区别在于：利用局部数据的数据量S_ij′代替全局数据的数据量S_ij，利用第二预告消息的接收时间t1'代替第一预告消息的接收时间t1，利用局部数据消息的接收时间t2'代替全局数据消息的接收时间t2。The data processing method may further include the following steps: in response to receiving a second notice message and a partial data message, obtaining the data fragment carried in the partial data message. The data fragment carried in the partial data message is the partial (local) data, that is, a distributed data fragment. The network throughput between the corresponding working nodes is then calculated from the data volume S_ij′ of the data fragment, the reception time t1′ of the second notice message and the reception time t2′ of the partial data message. The way the service thread of the working node calculates network throughput is similar to the way the receiving thread does, except that the data volume S_ij′ of the partial data replaces the data volume S_ij of the global data, the reception time t1′ of the second notice message replaces the reception time t1 of the first notice message, and the reception time t2′ of the partial data message replaces the reception time t2 of the global data message.

在一些实施例中,如图8所示,所述数据处理方法还包括步骤S81~S82。In some embodiments, as shown in Figure 8, the data processing method further includes steps S81 to S82.

步骤S81,针对同一数据分片,在接收到所有工作节点发送的所述数据分片的情况下,对所述数据分片进行数据归并得到所述数据分片的全局数据。Step S81: For the same data fragment, after receiving the data fragments sent by all working nodes, perform data merging on the data fragments to obtain global data of the data fragments.

在本步骤中,工作节点的服务线程在接收到所有工作节点发送的同一数据分片的情况下,针对该数据分片进行数据聚合等数据归并处理,得到该数据分片的全局数据。In this step, when the service thread of the working node receives the same data fragment sent by all working nodes, it performs data aggregation and other data merging processing on the data fragment to obtain the global data of the data fragment.

工作节点的服务线程在收到数据后通常需要执行数据归并操作，对来自所有工作节点的同一数据分片进行合并，需要说明的是，该归并操作是可选的不是必须的，为了实现AllGather(全局收集)的效果，各工作节点在收到其它工作节点的局部数据后完全可以不做任何操作地依次直接分发给各工作节点，完成广播，该工作节点只起到一个中心汇总的作用。After receiving data, the service thread of a working node usually needs to perform a data merging operation, combining the same data fragment received from all working nodes. It should be noted that this merging operation is optional rather than mandatory: to achieve an AllGather (global gather) effect, each working node may, after receiving the partial data of the other working nodes, simply forward it in turn to each working node without any processing, thereby completing the broadcast; in that case the working node acts only as a central aggregation point.

步骤S82,向所有工作节点发送第一预告消息和全局数据消息,全局数据消息中携带 有所述数据分片的全局数据。Step S82: Send the first notice message and the global data message to all working nodes. The global data message carries the global data of the data fragments.

工作节点的服务线程判断是否已处理完本节点负责的全部数据分片，若全部处理完成，则服务线程结束；若未全部处理完成，则继续按照步骤S81~S82的方式对本工作节点负责的数据分片进行处理。The service thread of the working node determines whether all of the data fragments this node is responsible for have been processed. If they have all been processed, the service thread ends; if not, the data fragments this working node is responsible for continue to be processed in the manner of steps S81 to S82.

为清楚说明本公开实施例的技术方案，以下结合图9和一具体实例对工作节点进行数据处理的过程进行详细说明。如图9所示，工作节点进行数据处理的过程包括步骤1~10。In order to clearly illustrate the technical solutions of the embodiments of the present disclosure, the data processing procedure of a working node is described in detail below with reference to Figure 9 and a specific example. As shown in Figure 9, the data processing procedure of the working node includes steps 1 to 10.

步骤1:工作节点准备就绪后,对数据做分片处理并添加优先级标签,发送策略更新请求消息到调度节点请求策略更新。Step 1: After the working node is ready, fragment the data and add priority labels, and send a policy update request message to the scheduling node to request policy update.

步骤2:工作节点等待接收来自调度节点的策略响应消息,当收到不为空的策略响应消息时,执行策略更新,然后转至步骤3;若策略响应消息为空,则直接执行步骤3。Step 2: The working node waits to receive the policy response message from the scheduling node. When receiving a policy response message that is not empty, it performs the policy update and then goes to step 3; if the policy response message is empty, it directly executes step 3.

步骤3：工作节点分别创建发送线程、接收线程和服务线程，发送线程负责将局部数据发送到其他节点分别汇聚，接收线程负责接收全局数据进行下一轮计算，服务线程负责接收并归并其他工作节点发送的局部数据从而得到全局数据，然后将全局数据分发到其他工作节点。发送线程执行步骤4-5(即图9中左侧一列的步骤)，接收线程执行步骤6-7(即图9中中间一列的步骤)，服务线程执行步骤8-10(即图9中右侧一列的步骤)，发送线程、接收线程和服务线程可以同步执行。Step 3: The working node creates a sending thread, a receiving thread and a service thread. The sending thread is responsible for sending local data to the other nodes for aggregation, the receiving thread is responsible for receiving global data for the next round of computation, and the service thread is responsible for receiving and merging the local data sent by the other working nodes to obtain global data and then distributing that global data to the other working nodes. The sending thread performs steps 4-5 (the left column in Figure 9), the receiving thread performs steps 6-7 (the middle column in Figure 9), and the service thread performs steps 8-10 (the right column in Figure 9); the sending, receiving and service threads can run concurrently (a sketch of this three-thread structure is given after step 10 below).

步骤4:发送线程选定一个数据分片并确定其目的接收节点,然后向目的接收节点发送第二预告消息和局部数据消息(携带选定的数据分片)。Step 4: The sending thread selects a data fragment and determines its destination receiving node, and then sends the second notice message and the partial data message (carrying the selected data fragment) to the destination receiving node.

步骤5:发送线程判断是否已发送全部数据分片,若是,则发送线程结束,否则转至步骤4。Step 5: The sending thread determines whether all data fragments have been sent. If so, the sending thread ends, otherwise go to step 4.

步骤6:接收线程等待接收来自其他工作节点的第一预告消息和全局数据消息,在依次收到的情况下,计算吞吐量,其中,相同节点之间的吞吐需要累积求平均,然后转至步骤7。Step 6: The receiving thread waits to receive the first notice message and global data message from other working nodes. If received in sequence, calculate the throughput. Among them, the throughput between the same nodes needs to be accumulated and averaged, and then go to step 7.

步骤7:接收线程判断是否收到所有数据分片,若全部收到,则执行本地数据的后续操作,然后接收线程结束,否则转至步骤6。Step 7: The receiving thread determines whether all data fragments have been received. If all data fragments are received, subsequent operations on local data are performed, and then the receiving thread ends. Otherwise, go to step 6.

步骤8:服务线程等待接收来自工作节点的第二预告消息和局部数据消息,在依次收到的情况下,计算吞吐量(具体过程与步骤6相同),然后转至步骤9。Step 8: The service thread waits to receive the second notice message and the local data message from the worker node. If received in sequence, calculate the throughput (the specific process is the same as step 6), and then go to step 9.

步骤9：服务线程判断是否收到所有工作节点发送的当前数据分片，若是，则完成对该数据分片的数据归并，然后依次向所有其它工作节点发送第一预告消息和含有该数据分片归并后数据(即全局数据)的全局数据消息，并转至步骤10；否则，转至步骤8。Step 9: The service thread determines whether the current data fragment has been received from all working nodes. If so, it completes the data merging for that fragment and then sends, in turn to all other working nodes, a first notice message and a global data message containing the merged data of that fragment (that is, the global data), and goes to step 10; otherwise, it goes to step 8.

步骤10:服务线程判断是否已处理完本工作节点负责的全部数据分片,若是,则服务线程结束,否则转至步骤8。Step 10: The service thread determines whether all the data fragments responsible for this working node have been processed. If so, the service thread ends, otherwise go to step 8.
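As referenced in step 3, the three-thread structure of a working node can be outlined as follows; the loop bodies are stubs standing in for steps 4-10, and all names are illustrative rather than part of the disclosure.

```python
import threading

def send_loop():
    """Steps 4-5: pick each data fragment, determine its destination receiving
    node, and send the second notice message plus the partial data message."""
    pass

def recv_loop():
    """Steps 6-7: receive first notice and global data messages, update the
    throughput samples, and finish once all global data has arrived."""
    pass

def service_loop():
    """Steps 8-10: receive partial data from every working node, merge each
    fragment, and broadcast the merged (global) data with a first notice."""
    pass

# The three threads run concurrently on each working node, as in Figure 9.
threads = [threading.Thread(target=fn) for fn in (send_loop, recv_loop, service_loop)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```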

本公开实施例建立在网络在较短的时间内不会发生变化的前提下，使用上一次迭代的网络状态来规划下一次的迭代策略，所以默认在每轮数据传输开始时工作节点都要进行策略更新的询问。考虑具体的使用场景，如果所处环境网络状态较为稳定，也可以隔一定数量的轮次后再执行一次策略更新询问。The embodiments of the present disclosure are based on the premise that the network does not change over a short period of time, and the network state of the previous iteration is used to plan the strategy of the next iteration; therefore, by default, each working node issues a policy update inquiry at the beginning of every round of data transmission. Depending on the specific usage scenario, if the network in the current environment is relatively stable, the policy update inquiry may instead be performed only once every certain number of rounds.

本公开实施例基于多中心通信任务负载均衡，默认将每个工作节点都作为了中心节点对待，事实上如果现实场景不需要，可以只选定部分工作节点作为中心节点参与数据分片聚合，无需给非中心节点的工作节点开辟服务线程并在调度节点对该节点做出标定即可。The embodiments of the present disclosure are based on load balancing of multi-center communication tasks and by default treat every working node as a central node. In practice, if the real scenario does not require this, only some working nodes may be selected as central nodes to participate in data fragment aggregation; it is then sufficient not to open a service thread on the non-central working nodes and to mark those nodes accordingly at the scheduling node.

本公开实施例通过同时切分大粒度以及合并过小粒度数据的方式解决大小粒度数据单独发送造成的不同时延问题，并使用传输优先级策略，保证更重要数据的尽早传输，实现有效的计算与通信重叠。此外，还根据网络吞吐量动态调整发送到每个汇聚节点(即目的接收节点)上的数据量大小，从而在更普遍的动态网络、多汇聚节点场景中，实现针对实时网络吞吐的汇聚任务的负载均衡，避免了瓶颈链路的产生，提高了网络的利用率，使得动态网络的汇聚和分发更加高效。The embodiments of the present disclosure resolve the mismatched delays caused by sending large-granularity and overly small-granularity data separately, by splitting large-granularity data and merging overly small-granularity data at the same time, and use a transmission priority strategy to ensure that more important data is transmitted as early as possible, achieving effective overlap of computation and communication. In addition, the amount of data sent to each aggregation node (i.e., destination receiving node) is dynamically adjusted according to the network throughput, so that in the more general dynamic-network, multi-aggregation-node scenario the aggregation tasks are load-balanced with respect to real-time network throughput, bottleneck links are avoided, network utilization is improved, and aggregation and distribution over a dynamic network become more efficient.

本公开实施例的关键体现在网络吞吐量监测、基于网络信息的策略制定优化算法、优化粒度优先级数据传输机制以及最小竞争下带宽最大化的分片分配方法等。网络吞吐量监测机制在数据传输过程中确定网络中各节点间的上下行吞吐量，为策略制定提供基础。策略制定优化算法基于网络上下行吞吐量评估工作节点的通信性能，并动态地为每个节点分配其所负责的数据，实现汇聚和分发任务的负载均衡。优化粒度优先级数据传输机制通过粒度细化减少较大粒度下数据归并中的漫长阻塞时间，通过合并多个过小粒度数据规避单个过小粒度数据传输时造成的时延，并以传输优先级方式更早发送重要数据，促成后期工作更早开始，加速全局过程。最小竞争下带宽最大化的分片分配方法能进一步有效提高网络带宽资源的利用率，加速传输完成。The key aspects of the embodiments of the present disclosure are network throughput monitoring, a policy formulation and optimization algorithm based on network information, an optimized granularity-and-priority data transmission mechanism, and a fragment allocation method that maximizes bandwidth under minimum competition. The network throughput monitoring mechanism determines the uplink and downlink throughput between nodes during data transmission, providing the basis for policy formulation. The policy formulation and optimization algorithm evaluates the communication performance of the working nodes based on the uplink and downlink throughput and dynamically assigns to each node the data it is responsible for, achieving load balancing of the aggregation and distribution tasks. The optimized granularity-and-priority data transmission mechanism reduces the long blocking time of data merging at large granularities by refining the granularity, avoids the delay caused by transmitting a single overly small piece of data by merging multiple overly small pieces, and sends important data earlier by means of transmission priorities, allowing later work to start earlier and accelerating the overall process. The fragment allocation method that maximizes bandwidth under minimum competition further improves the utilization of network bandwidth resources and accelerates completion of the transmission.

本公开实施例针对广域网的特点进行了传输调度优化。由于广域网带宽的稀缺性，才有了通过传输调度优化传输效率的必要性；由于广域网带宽的异构性，才有了通过合适的传输调度优化来规避瓶颈链路的可行性；由于广域网的带宽的动态性，才有了通过网络感知实时感知网络状态并及时调整传输策略的需求。本公开实施例的方案完全覆盖了广域网的三个特征，实现了强针对性的传输调度优化。本公开实施例能够通过对当前网络的测量来感知当前网络状态，并以网络状态在一个较短的时间内状态保持基本稳定为前提，用当前网络状态来决策各中心节点的数据分配比例，并将该决策结果用在未来一段时间内。因此，使用本公开实施例方案实现分布式业务的数据汇聚和分发具有显著的迭代性特征，能够为执行网络感知和决策结果得到应用提供足够的时间，从而使动态决策达到理想的效果。The embodiments of the present disclosure optimize transmission scheduling for the characteristics of the wide area network. It is the scarcity of WAN bandwidth that makes it necessary to optimize transmission efficiency through transmission scheduling; it is the heterogeneity of WAN bandwidth that makes it feasible to avoid bottleneck links through suitable transmission scheduling optimization; and it is the dynamic nature of WAN bandwidth that creates the need to sense the network state in real time and adjust the transmission strategy in time. The solution of the embodiments of the present disclosure fully covers these three WAN characteristics and achieves strongly targeted transmission scheduling optimization. The embodiments of the present disclosure can perceive the current network state by measuring the current network and, on the premise that the network state remains basically stable over a short period of time, use the current network state to decide the data allocation proportion of each central node and apply that decision over a period of time in the future. Therefore, using the solution of the embodiments of the present disclosure to implement data aggregation and distribution for distributed services has a markedly iterative character, leaving enough time for network sensing to be performed and for the decision results to be applied, so that dynamic decision-making achieves the desired effect.

基于相同的技术构思,本公开实施例还提供一种数据处理装置,如图10所示,所述数据处理装置应用于算力网络分布式集群系统中的调度节点,包括吞吐量信息感知模块101、策略制定模块102和策略发布模块103。Based on the same technical concept, embodiments of the present disclosure also provide a data processing device. As shown in Figure 10, the data processing device is applied to a scheduling node in a computing power network distributed cluster system and includes a throughput information sensing module 101 , policy formulation module 102 and policy release module 103.

吞吐量信息感知模块101用于,响应于接收到工作节点发送的携带有所述工作节点的网络吞吐量信息的策略更新请求消息,根据所述网络吞吐量信息更新全局网络吞吐量信息,所述全局网络吞吐量信息用于记录任意两个工作节点之间的网络吞吐量。The throughput information sensing module 101 is configured to, in response to receiving a policy update request message carrying the network throughput information of the working node sent by the working node, update the global network throughput information according to the network throughput information, the Global network throughput information records the network throughput between any two worker nodes.

策略制定模块102用于，在接收到全部工作节点发送的策略更新请求消息的情况下，根据所述全局网络吞吐量信息确定数据分配信息，所述数据分配信息用于记录各所述工作节点的数据分配比例。The policy formulation module 102 is configured to, upon receiving policy update request messages sent by all working nodes, determine data allocation information according to the global network throughput information, the data allocation information being used to record the data allocation proportion of each working node.

策略发布模块103用于，响应于根据预设的第一阈值、所述数据分配信息和历史数据分配信息确定出满足预设策略更新条件，向各所述工作节点发送携带有所述数据分配信息的策略响应消息。The policy publishing module 103 is configured to, in response to determining, according to a preset first threshold, the data allocation information and historical data allocation information, that a preset policy update condition is met, send a policy response message carrying the data allocation information to each working node.

在一些实施例中，所述全局网络吞吐量信息为n*n的矩阵，所述矩阵的元素a_ij为节点i到节点j的网络吞吐量，1≤i≤n，1≤j≤n，n为所述分布式集群系统中工作节点的总数；所述策略制定模块102用于，分别计算所述矩阵中各行元素之和R_i以及所述矩阵中各列元素之和C_j；根据所述矩阵中各行元素之和R_i以及各列元素之和C_j，确定各所述工作节点的有效吞吐量；根据各所述工作节点的有效吞吐量和各所述工作节点的有效吞吐量的最小值，确定数据分配信息。In some embodiments, the global network throughput information is an n*n matrix whose element a_ij is the network throughput from node i to node j, with 1≤i≤n, 1≤j≤n, and n being the total number of working nodes in the distributed cluster system. The policy formulation module 102 is configured to calculate the sum R_i of the elements in each row of the matrix and the sum C_j of the elements in each column of the matrix; determine the effective throughput of each working node according to the row sums R_i and the column sums C_j; and determine the data allocation information according to the effective throughput of each working node and the minimum of the effective throughputs of the working nodes.
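The disclosure does not spell out here how the effective throughput is derived from R_i and C_j, nor how the proportions follow from it; the sketch below is therefore only one plausible reading, assuming effective_i = min(R_i, C_i) (a node is limited by the slower of its outgoing and incoming sums) and a proportion equal to each node's effective throughput divided by the smallest one, rounded. The function name and numbers are hypothetical.

```python
def allocation_proportions(throughput_matrix):
    """Illustrative sketch only: compute row/column sums of the n*n throughput
    matrix, an assumed effective throughput per node, and allocation
    proportions relative to the slowest node (which gets 1 unit)."""
    n = len(throughput_matrix)
    row_sums = [sum(throughput_matrix[i][j] for j in range(n)) for i in range(n)]
    col_sums = [sum(throughput_matrix[i][j] for i in range(n)) for j in range(n)]
    effective = [min(r, c) for r, c in zip(row_sums, col_sums)]  # assumption
    floor = min(effective)
    return [max(1, round(e / floor)) for e in effective]

# Example: 3 nodes; a_ij is the measured throughput from node i to node j.
matrix = [
    [0, 100, 80],
    [90, 0, 60],
    [30, 20, 0],
]
print(allocation_proportions(matrix))  # e.g. [2, 2, 1]
```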

在一些实施例中，所述满足预设策略更新条件，包括：所述数据分配信息中任意一个所述工作节点的数据分配比例的变化率大于预设的第一阈值；其中，所述工作节点的数据分配比例的变化率根据所述数据分配信息中所述工作节点的数据分配比例和所述历史数据分配信息中所述工作节点的数据分配比例计算得到。In some embodiments, meeting the preset policy update condition includes: the rate of change of the data allocation proportion of any one of the working nodes in the data allocation information being greater than the preset first threshold, where the rate of change of a working node's data allocation proportion is calculated from that working node's data allocation proportion in the data allocation information and its data allocation proportion in the historical data allocation information.

在一些实施例中，吞吐量信息感知模块101还用于，在策略制定模块102根据所述全局网络吞吐量信息确定数据分配信息之后，响应于根据所述第一阈值、所述数据分配信息和历史数据分配信息确定出满足预设策略更新条件，针对所述变化率大于所述第一阈值的工作节点，根据所述数据分配信息更新所述历史数据分配信息。In some embodiments, the throughput information sensing module 101 is further configured to, after the policy formulation module 102 determines the data allocation information according to the global network throughput information, and in response to determining, according to the first threshold, the data allocation information and the historical data allocation information, that the preset policy update condition is met, update the historical data allocation information according to the data allocation information for the working nodes whose rate of change is greater than the first threshold.

基于相同的构思，本公开实施例还提供一种数据处理装置，如图11所示，所述数据处理装置应用于算力网络分布式集群系统中的工作节点，包括数据切分模块201、目的接收节点确定模块202和数据分发模块203。Based on the same concept, embodiments of the present disclosure further provide a data processing apparatus. As shown in Figure 11, the data processing apparatus is applied to a working node in a computing power network distributed cluster system and includes a data segmentation module 201, a destination receiving node determination module 202 and a data distribution module 203.

数据切分模块201用于,将待处理数据切分为多个数据分片,其中,各所述数据分片的数据量小于或等于第二阈值。The data segmentation module 201 is configured to segment the data to be processed into multiple data fragments, where the data amount of each data fragment is less than or equal to the second threshold.

目的接收节点确定模块202用于,根据本地存储的数据分配信息确定待分配的数据分片的目的接收节点,所述本地存储的数据分配信息为调度节点发送的策略响应消息中携带的数据分配信息。The destination receiving node determination module 202 is configured to determine the destination receiving node of the data fragments to be distributed according to the locally stored data allocation information. The locally stored data allocation information is the data allocation information carried in the policy response message sent by the scheduling node. .

数据分发模块203用于,向各所述目的接收节点发送第二预告消息和携带有所述待分配的数据分片的局部数据消息。The data distribution module 203 is configured to send a second notice message and a partial data message carrying the data fragments to be allocated to each of the destination receiving nodes.

在一些实施例中，数据切分模块201用于，在所述待处理数据为具有内部结构的数据的情况下，将待处理数据中数据量大于1/4第二阈值的内部结构切分为多个数据块，其中每个数据块的数据量小于或等于1/4第二阈值；根据各所述数据块的生成顺序，依次将各所述数据块合并生成数据分片；或者，在所述待处理数据为不具有内部结构的数据的情况下，根据所述第二阈值将待处理数据切分为多个数据分片。In some embodiments, the data segmentation module 201 is configured to, when the data to be processed is data with an internal structure, split any internal structure whose data volume is greater than 1/4 of the second threshold into multiple data blocks, each with a data volume less than or equal to 1/4 of the second threshold, and merge the data blocks in their generation order to form data fragments; or, when the data to be processed is data without an internal structure, split the data to be processed into multiple data fragments according to the second threshold.
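The splitting-and-merging rule for structured data described above can be sketched as follows; sizes and names are illustrative, and representing the "internal structures" as (name, size) pairs is an assumption made only for the example.

```python
def make_shards(structures, second_threshold):
    """Sketch of the rule above: any internal structure larger than
    threshold/4 is cut into blocks of at most threshold/4, then blocks are
    merged, in generation order, into shards never exceeding the threshold."""
    block_limit = second_threshold / 4

    # 1. Cut oversized structures into blocks of at most block_limit.
    blocks = []
    for name, size in structures:
        while size > block_limit:
            blocks.append((name, block_limit))
            size -= block_limit
        if size > 0:
            blocks.append((name, size))

    # 2. Merge consecutive blocks into shards no larger than the threshold.
    shards, current, current_size = [], [], 0
    for block in blocks:
        if current_size + block[1] > second_threshold:
            shards.append(current)
            current, current_size = [], 0
        current.append(block)
        current_size += block[1]
    if current:
        shards.append(current)
    return shards

# Example: three model tensors, second threshold of 100 units.
print(make_shards([("w1", 260), ("w2", 10), ("w3", 40)], 100))
```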

在一些实施例中,数据切分模块201用于,根据工作节点待处理数据的数据总量和工作节点的总数量确定所述第二阈值。In some embodiments, the data segmentation module 201 is configured to determine the second threshold based on the total amount of data to be processed by the working node and the total number of working nodes.

在一些实施例中，如图12所示，所述数据处理装置还包括优先级模块204，优先级模块204用于，为各所述数据分片分配优先级，并生成优先级队列，所述优先级队列包括由高到低排列的各所述数据分片。In some embodiments, as shown in Figure 12, the data processing apparatus further includes a priority module 204, which is configured to assign a priority to each data fragment and generate a priority queue containing the data fragments arranged from high to low priority.

在一些实施例中，目的接收节点确定模块202用于，根据本地存储的数据分配信息，计算工作节点的数据分配比例之和；响应于所述工作节点的数据分配比例之和大于或等于所述数据分片的数量，确定所述数据分配信息中数据分配比例大于或等于1的工作节点为目的接收节点；响应于所述工作节点的数据分配比例之和小于所述数据分片的数量，确定全部所述工作节点为目的接收节点。In some embodiments, the destination receiving node determination module 202 is configured to calculate the sum of the data allocation proportions of the working nodes according to the locally stored data allocation information; in response to the sum being greater than or equal to the number of data fragments, determine the working nodes whose data allocation proportion in the data allocation information is greater than or equal to 1 as the destination receiving nodes; and in response to the sum being less than the number of data fragments, determine all of the working nodes as the destination receiving nodes.

在一些实施例中，数据分发模块203用于，在所述工作节点的数据分配比例之和大于或等于所述数据分片的数量的情况下，各所述目的接收节点待接收的数据分片的数量为第一数量，所述第一数量为各所述目的接收节点的数据分配比例与数据分片调整量的差值，所述数据分片调整量根据所述工作节点的数据分配比例之和、所述数据分片的数量和各目的接收节点的数据分配比例确定；在所述工作节点的数据分配比例之和小于所述数据分片的数量的情况下，各所述目的接收节点待接收的数据分片的数量为第二数量，所述第二数量根据各所述目的接收节点的数据分配比例确定。In some embodiments, the data distribution module 203 is configured such that, when the sum of the data allocation proportions of the working nodes is greater than or equal to the number of data fragments, the number of data fragments to be received by each destination receiving node is a first quantity, the first quantity being the difference between that destination receiving node's data allocation proportion and a data fragment adjustment amount, which is determined from the sum of the data allocation proportions of the working nodes, the number of data fragments and the data allocation proportion of each destination receiving node; and such that, when the sum of the data allocation proportions of the working nodes is less than the number of data fragments, the number of data fragments to be received by each destination receiving node is a second quantity, which is determined according to that destination receiving node's data allocation proportion.

在一些实施例中,目的接收节点确定模块202用于,根据各所述工作节点待处理数据的数据总量和本地存储的数据分配信息,计算各所述工作节点的负载;根据各所述工作节点的负载和待分配的数据分片的数据量,确定待分配的数据分片的目的接收节点。In some embodiments, the destination receiving node determination module 202 is configured to calculate the load of each working node according to the total amount of data to be processed by each working node and the locally stored data allocation information; The load of the node and the data volume of the data fragments to be allocated determine the destination receiving node of the data fragments to be allocated.

在一些实施例中，如图13所示，数据处理装置还包括网络吞吐量计算模块205，网络吞吐量计算模块205用于，响应于接收到其他工作节点发送的第一预告消息和全局数据消息，获取所述全局数据消息中携带的全局数据，所述全局数据为对分发给所述其他工作节点的数据分片进行数据处理后得到的数据；根据所述全局数据的数据量、所述第一预告消息的接收时间和所述全局数据消息的接收时间，计算所述工作节点与所述其他工作节点之间的网络吞吐量。In some embodiments, as shown in Figure 13, the data processing apparatus further includes a network throughput calculation module 205, which is configured to, in response to receiving a first notice message and a global data message sent by another working node, obtain the global data carried in the global data message, the global data being the data obtained by performing data processing on the data fragments distributed to that other working node, and to calculate the network throughput between this working node and the other working node according to the data volume of the global data, the reception time of the first notice message and the reception time of the global data message.

在一些实施例中，如图14所示，所述数据处理装置还包括数据处理模块206，数据处理模块206用于，响应于接收到分发给本工作节点的全部数据分片，对所述全部数据分片进行处理。In some embodiments, as shown in Figure 14, the data processing apparatus further includes a data processing module 206, which is configured to, in response to receiving all data fragments distributed to this working node, process all of the data fragments.

在一些实施例中，网络吞吐量计算模块205还用于，响应于接收到第二预告消息和局部数据消息，获取所述局部数据消息中携带的数据分片；根据所述数据分片的数据量、所述第二预告消息的接收时间和所述局部数据消息的接收时间，计算相应工作节点之间的网络吞吐量。In some embodiments, the network throughput calculation module 205 is further configured to, in response to receiving a second notice message and a partial data message, obtain the data fragment carried in the partial data message, and to calculate the network throughput between the corresponding working nodes according to the data volume of the data fragment, the reception time of the second notice message and the reception time of the partial data message.

在一些实施例中，数据处理模块206还用于，针对同一数据分片，在接收到所有工作节点发送的所述数据分片的情况下，对所述数据分片进行数据归并得到所述数据分片的全局数据；向所有工作节点发送第一预告消息和全局数据消息，所述全局数据消息中携带有所述数据分片的全局数据。In some embodiments, the data processing module 206 is further configured to, for the same data fragment, after that data fragment has been received from all working nodes, perform data merging on the data fragment to obtain its global data, and to send a first notice message and a global data message carrying the global data of the data fragment to all working nodes.

本公开实施例适用于基于广域网的算力网络所承载的各种大规模或大数据量分布式业务中的数据汇聚和分发场景。从对方案的全面利用角度来看，本公开实施例的方案尤其适用于分布式机器学习和联邦学习的全局模型汇聚。在技术应用场景上，智慧工厂、车联网和Cloud VR场景下的基于机器学习算法的控制模块的模型汇聚过程以及数据收集过程均可使用本公开实施例的方案，如智慧工厂中大量传感器数据的汇总、车联网中边缘服务器需要获取路侧不同设备(如红绿灯、摄像头、智能化标志标识等)的信息来进行统一调度，以及Cloud VR场景下云节点获取各个工作节点的中间数据等。The embodiments of the present disclosure are applicable to data aggregation and distribution scenarios in various large-scale or data-intensive distributed services carried by a WAN-based computing power network. From the perspective of making full use of the solution, the solution of the embodiments of the present disclosure is particularly suitable for global model aggregation in distributed machine learning and federated learning. In terms of technical application scenarios, the model aggregation and data collection processes of machine-learning-based control modules in smart factories, the Internet of Vehicles and Cloud VR can all use the solution of the embodiments of the present disclosure, for example the aggregation of large amounts of sensor data in a smart factory, an edge server in the Internet of Vehicles obtaining information from different roadside devices (such as traffic lights, cameras and intelligent signs) for unified scheduling, and a cloud node in a Cloud VR scenario obtaining intermediate data from each working node.

本公开实施例还提供了一种计算机设备，该计算机设备包括：一个或多个处理器以及存储装置；其中，存储装置上存储有一个或多个程序，当上述一个或多个程序被上述一个或多个处理器执行时，使得上述一个或多个处理器实现如前述各实施例所提供的数据处理方法。Embodiments of the present disclosure further provide a computer device, including one or more processors and a storage device, where one or more programs are stored on the storage device; when the one or more programs are executed by the one or more processors, the one or more processors implement the data processing method provided in the foregoing embodiments.

本公开实施例还提供了一种计算机可读介质,其上存储有计算机程序,其中,该计算机程序被执行时实现如前述各实施例所提供的数据处理方法。Embodiments of the present disclosure also provide a computer-readable medium on which a computer program is stored, wherein when the computer program is executed, the data processing method as provided in the foregoing embodiments is implemented.

本领域普通技术人员可以理解,上文中所公开方法中的全部或某些步骤、装置中的功能模块/单元可以被实施为软件、固件、硬件及其适当的组合。在硬件实施方式中,在以上描述中提及的功能模块/单元之间的划分不一定对应于物理组件的划分;例如,一个物理组件可以具有多个功能,或者一个功能或步骤可以由若干物理组件合作执行。某些物理组件或所有物理组件可以被实施为由处理器,如中央处理器、数字信号处理器或微处理器执行的软件,或者被实施为硬件,或者被实施为集成电路,如专用集成电路。这样的软件可以分布在计算机可读介质上,计算机可读介质可以包括计算机存储介质(或非暂时性介质)和通信介质(或暂时性介质)。如本领域普通技术人员公知的,术语计算机存储介质包括在用于存储信息(诸如计算机可读指令、数据结构、程序模块或其他数据)的任何方法或技术中实施的易失性和非易失性、可移除和不可移除介质。计算机存储介质包括但不限于RAM、ROM、EEPROM、闪存或其他存储器技术、CD-ROM、数字多功能盘(DVD)或其他光盘存储、磁盒、磁带、磁盘存储或其他磁存储装置、或者可以用于存储期望的信息 并且可以被计算机访问的任何其他的介质。此外,本领域普通技术人员公知的是,通信介质通常包含计算机可读指令、数据结构、程序模块或者诸如载波或其他传输机制之类的调制数据信号中的其他数据,并且可包括任何信息递送介质。Those of ordinary skill in the art can understand that all or some steps in the methods disclosed above and functional modules/units in the devices can be implemented as software, firmware, hardware, and appropriate combinations thereof. In hardware implementations, the division between functional modules/units mentioned in the above description does not necessarily correspond to the division of physical components; for example, one physical component may have multiple functions, or one function or step may consist of several physical components. Components execute cooperatively. Some or all of the physical components may be implemented as software executed by a processor, such as a central processing unit, a digital signal processor, or a microprocessor, or as hardware, or as an integrated circuit, such as an application specific integrated circuit . Such software may be distributed on computer-readable media, which may include computer storage media (or non-transitory media) and communication media (or transitory media). As is known to those of ordinary skill in the art, the term computer storage media includes volatile and nonvolatile media implemented in any method or technology for storage of information such as computer readable instructions, data structures, program modules or other data. removable, removable and non-removable media. Computer storage media includes, but is not limited to, RAM, ROM, EEPROM, flash memory or other memory technology, CD-ROM, Digital Versatile Disk (DVD) or other optical disk storage, magnetic cassettes, tapes, disk storage or other magnetic storage devices, or may Any other medium used to store the desired information and that can be accessed by a computer. Additionally, it is known to those of ordinary skill in the art that communication media typically embodies computer readable instructions, data structures, program modules or other data in a modulated data signal such as a carrier wave or other transport mechanism, and may include any information delivery media .

本公开实施例能够显著加速基于广域网的算力网络所承载的分布式应用的数据汇聚和分发过程,从而有效减少数据汇聚和分发的通信时间,提高通信效率;基于网络吞吐量感知技术,通过节点性能评估算法,动态地为不同汇聚节点分配不同规模的数据量,能够有效规避瓶颈链路,更有效利用高带宽资源链路,从而实现对当前广域网带宽资源的高效利用。The disclosed embodiments can significantly accelerate the data aggregation and distribution process of distributed applications carried by WAN-based computing power networks, thereby effectively reducing the communication time for data aggregation and distribution and improving communication efficiency; based on network throughput sensing technology, through nodes The performance evaluation algorithm dynamically allocates data volumes of different sizes to different aggregation nodes, which can effectively avoid bottleneck links and more effectively utilize high-bandwidth resource links, thereby achieving efficient use of current WAN bandwidth resources.

本文已经公开了示例实施例,并且虽然采用了具体术语,但它们仅用于并仅应当被解释为一般说明性含义,并且不用于限制的目的。在一些实例中,对本领域技术人员显而易见的是,除非另外明确指出,否则可单独使用与特定实施例相结合描述的特征、特性和/或元素,或可与其他实施例相结合描述的特征、特性和/或元件组合使用。因此,本领域技术人员将理解,在不脱离由所附的权利要求阐明的本公开的范围的情况下,可进行各种形式和细节上的改变。Example embodiments have been disclosed herein, and although specific terms are employed, they are used and should be interpreted in a general illustrative sense only and not for purpose of limitation. In some instances, it will be apparent to those skilled in the art that features, characteristics and/or elements described in connection with a particular embodiment may be used alone, or may be used in conjunction with other embodiments, unless expressly stated otherwise. Features and/or components used in combination. Accordingly, it will be understood by those skilled in the art that various changes in form and details may be made without departing from the scope of the present disclosure as set forth in the appended claims.

Claims (19)

一种数据处理方法,应用于算力网络分布式集群系统中的调度节点,所述方法包括:A data processing method, applied to scheduling nodes in a computing power network distributed cluster system, the method includes: 响应于接收到工作节点发送的携带有所述工作节点的网络吞吐量信息的策略更新请求消息,根据所述网络吞吐量信息更新全局网络吞吐量信息,所述全局网络吞吐量信息用于记录任意两个工作节点之间的网络吞吐量;In response to receiving the policy update request message carrying the network throughput information of the working node sent by the working node, the global network throughput information is updated according to the network throughput information, and the global network throughput information is used to record any Network throughput between two worker nodes; 在接收到全部工作节点发送的策略更新请求消息的情况下,根据所述全局网络吞吐量信息确定数据分配信息,所述数据分配信息用于记录各所述工作节点的数据分配比例;以及When policy update request messages sent by all working nodes are received, data allocation information is determined based on the global network throughput information, and the data allocation information is used to record the data allocation ratio of each working node; and 响应于根据预设的第一阈值、所述数据分配信息和历史数据分配信息确定出满足预设策略更新条件,向各所述工作节点发送携带有所述数据分配信息的策略响应消息。In response to determining that the preset policy update condition is met based on the preset first threshold, the data allocation information and historical data allocation information, a policy response message carrying the data allocation information is sent to each of the working nodes. 如权利要求1所述的方法,其中,所述全局网络吞吐量信息为n*n的矩阵,所述矩阵的元素a ij为节点i到节点j的网络吞吐量,1≤i≤n,1≤j≤n,n为所述分布式集群系统中工作节点的总数; The method according to claim 1, wherein the global network throughput information is an n*n matrix, and the element a ij of the matrix is the network throughput from node i to node j, 1≤i≤n,1 ≤j≤n, n is the total number of working nodes in the distributed cluster system; 所述根据所述全局网络吞吐量信息确定数据分配信息,包括:Determining data allocation information based on the global network throughput information includes: 分别计算所述矩阵中各行元素之和R i以及所述矩阵中各列元素之和C jCalculate the sum of the row elements R i in the matrix and the sum C j of the column elements in the matrix respectively; 根据所述矩阵中各行元素之和R i以及各列元素之和C j,确定各所述工作节点的有效吞吐量; Determine the effective throughput of each working node according to the sum of the row elements R i and the sum of the column elements C j in the matrix; 根据各所述工作节点的有效吞吐量和各所述工作节点的有效吞吐量的最小值,确定数据分配信息。Data allocation information is determined according to the effective throughput of each working node and the minimum value of the effective throughput of each working node. 如权利要求1所述的方法,其中,所述满足预设策略更新条件,包括:The method of claim 1, wherein satisfying the preset policy update conditions includes: 所述数据分配信息中任意一个所述工作节点的数据分配比例的变化率大于预设的第一阈值;其中,所述工作节点的数据分配比例的变化率根据所述数据分配信息中所述工作节点的数据分配比例和所述历史数据分配信息中所述工作节点的数据分配比例计算得到。The rate of change of the data allocation ratio of any one of the working nodes in the data allocation information is greater than the preset first threshold; wherein, the rate of change of the data allocation ratio of the working node is based on the work node described in the data allocation information. The data distribution ratio of the node is calculated from the data distribution ratio of the working node in the historical data distribution information. 
如权利要求3所述的方法,其中,在根据所述全局网络吞吐量信息确定数据分配信息之后,所述方法还包括:The method of claim 3, wherein after determining the data allocation information according to the global network throughput information, the method further includes: 响应于根据所述第一阈值、所述数据分配信息和历史数据分配信息确定出满足预设策略更新条件,针对所述变化率大于所述第一阈值的工作节点,根据所述数据分配信息更新所述历史数据分配信息。In response to determining that the preset policy update condition is met based on the first threshold, the data allocation information and historical data allocation information, for the working node whose change rate is greater than the first threshold, update according to the data allocation information The historical data distribution information. 一种数据处理方法,应用于算力网络分布式集群系统中的工作节点,所述方法包括:A data processing method, applied to working nodes in a computing power network distributed cluster system, the method includes: 将待处理数据切分为多个数据分片,其中,各所述数据分片的数据量小于或等于第二阈值;Divide the data to be processed into multiple data fragments, wherein the data amount of each data fragment is less than or equal to the second threshold; 根据本地存储的数据分配信息确定待分配的数据分片的目的接收节点,所述本地存储的数据分配信息为调度节点发送的策略响应消息中携带的数据分配信息;以及Determine the destination receiving node for the data fragments to be allocated based on locally stored data allocation information, which is the data allocation information carried in the policy response message sent by the scheduling node; and 向各所述目的接收节点发送第二预告消息和携带有所述待分配的数据分片的局部数据消息。Send a second notice message and a partial data message carrying the data fragments to be allocated to each of the destination receiving nodes. 如权利要求5所述的方法,其中,所述将待处理数据切分为多个数据分片,包括:The method of claim 5, wherein dividing the data to be processed into multiple data fragments includes: 在所述待处理数据为具有内部结构的数据的情况下,将待处理数据中数据量大于1/4第二阈值的内部结构切分为多个数据块,其中每个数据块的数据量小于或等于1/4第二阈值;根据各所述数据块的生成顺序,依次将各所述数据块合并生成数据分片;或者,在所述待处理数据为不具有内部结构的数据的情况下,根据所述第二阈值将待处理数据切分为多个数据分片。In the case where the data to be processed is data with an internal structure, the internal structure in the data to be processed with a data amount greater than 1/4 of the second threshold is divided into multiple data blocks, wherein the data amount of each data block is less than Or equal to 1/4 of the second threshold; according to the generation order of each data block, each of the data blocks is sequentially merged to generate data fragments; or, in the case where the data to be processed is data without an internal structure , divide the data to be processed into multiple data fragments according to the second threshold. 如权利要求5所述的方法,其中,根据工作节点待处理数据的数据总量和工作节点的总数量确定所述第二阈值。The method of claim 5, wherein the second threshold is determined based on a total amount of data to be processed by a working node and a total number of working nodes. 如权利要求5所述的方法,其中,在将待处理数据切分为多个数据分片之后,还包括:The method of claim 5, wherein after dividing the data to be processed into multiple data fragments, it further includes: 为各所述数据分片分配优先级,并生成优先级对列,所述优先级对列包括由高到低排列的各所述数据分片。A priority is assigned to each of the data fragments, and a priority list is generated, where the priority list includes each of the data fragments arranged from high to low. 
如权利要求5所述的方法,其中,所述根据本地存储的数据分配信息确定待分配的数据分片的目的接收节点,包括:The method of claim 5, wherein determining the destination receiving node of the data fragments to be allocated based on locally stored data allocation information includes: 根据本地存储的数据分配信息,计算工作节点的数据分配比例之和;Calculate the sum of data allocation proportions of working nodes based on locally stored data allocation information; 响应于所述工作节点的数据分配比例之和大于或等于所述数据分片的数量,确定所述数据分配信息中数据分配比例大于或等于1的工作节点为目的接收节点;以及In response to the sum of the data allocation proportions of the working nodes being greater than or equal to the number of data shards, determining the working node with a data allocation proportion greater than or equal to 1 in the data allocation information as the destination receiving node; and 响应于所述工作节点的数据分配比例之和小于所述数据分片的数量,确定全部所述工作节点为目的接收节点。In response to the sum of the data allocation proportions of the working nodes being less than the number of data fragments, all the working nodes are determined to be destination receiving nodes. 如权利要求9所述的方法,其中,在所述工作节点的数据分配比例之和大于或等于所述数据分片的数量的情况下,各所述目的接收节点待接收的数据分片的数量为第一数量,所述第一数量为各所述目的接收节点的数据分配比例与数据分片调整量的差值,所述数据分片调整量根据所述工作节点的数据分配比例之和、所述数据分片 的数量和各目的接收节点的数据分配比例确定;The method of claim 9, wherein when the sum of the data allocation ratios of the working nodes is greater than or equal to the number of data fragments, the number of data fragments to be received by each destination receiving node is the first quantity, and the first quantity is the difference between the data allocation proportion of each destination receiving node and the data fragmentation adjustment amount. The data fragmentation adjustment amount is based on the sum of the data allocation proportions of the working nodes, The number of data fragments and the data distribution ratio of each destination receiving node are determined; 在所述工作节点的数据分配比例之和小于所述数据分片的数量的情况下,各所述目的接收节点待接收的数据分片的数量为第二数量,所述第二数量根据各所述目的接收节点的数据分配比例确定。When the sum of the data allocation proportions of the working nodes is less than the number of data fragments, the number of data fragments to be received by each destination receiving node is a second number, and the second number is based on the number of data fragments. The data distribution ratio of the destination receiving node is determined. 如权利要求5所述的方法,其中,所述根据本地存储的数据分配信息确定待分配的数据分片的目的接收节点,包括:The method of claim 5, wherein determining the destination receiving node of the data fragments to be allocated based on locally stored data allocation information includes: 根据各所述工作节点待处理数据的数据总量和本地存储的数据分配信息,计算各所述工作节点的负载;Calculate the load of each working node according to the total amount of data to be processed by each working node and the locally stored data allocation information; 根据各所述工作节点的负载和待分配的数据分片的数据量,确定待分配的数据分片的目的接收节点。According to the load of each working node and the data volume of the data fragments to be allocated, the destination receiving node of the data fragments to be allocated is determined. 如权利要求5所述的方法,其中,所述方法还包括:The method of claim 5, further comprising: 响应于接收到其他工作节点发送的第一预告消息和全局数据消息,获取所述全局数据消息中携带的全局数据,所述全局数据为对分发给所述其他工作节点的数据分片进行数据处理后得到的数据;In response to receiving the first notice message and the global data message sent by other working nodes, obtain the global data carried in the global data message, where the global data is data processing of the data fragments distributed to the other working nodes. 
The data obtained later; 根据所述全局数据的数据量、所述第一预告消息的接收时间和所述全局数据消息的接收时间,计算所述工作节点与所述其他工作节点之间的网络吞吐量。Calculate the network throughput between the working node and the other working nodes according to the data amount of the global data, the receiving time of the first preview message and the receiving time of the global data message. 如权利要求12所述的方法,其中,所述方法还包括:The method of claim 12, wherein the method further includes: 响应于接收到分发给本工作节点的全部数据分片,对所述全部数据分片进行处理。In response to receiving all data fragments distributed to this working node, all data fragments are processed. 如权利要求5所述的方法,其中,所述方法还包括:The method of claim 5, further comprising: 响应于接收到第二预告消息和局部数据消息,获取所述局部数据消息中携带的数据分片;In response to receiving the second preview message and the partial data message, obtain the data fragments carried in the partial data message; 根据所述数据分片的数据量、所述第二预告消息的和所述局部数据消息的接收时间,计算相应工作节点之间的网络吞吐量。The network throughput between corresponding working nodes is calculated according to the data amount of the data fragment, the reception time of the second preview message and the partial data message. 如权利要求14所述的方法,其中,所述方法还包括:The method of claim 14, wherein the method further includes: 针对同一数据分片,在接收到所有工作节点发送的所述数据分片的情况下,对所述数据分片进行数据归并得到所述数据分片的全局数据;For the same data fragment, after receiving the data fragments sent by all working nodes, perform data merging on the data fragments to obtain the global data of the data fragments; 向所有工作节点发送第一预告消息和全局数据消息,所述全局数据消息中携带有所述数据分片的全局数据。Send a first notice message and a global data message to all working nodes, where the global data message carries the global data of the data fragments. 一种数据处理装置,应用于算力网络分布式集群系统中的调度节点,包括吞吐量信息感知模块、策略制定模块和策略发布模块;A data processing device applied to a scheduling node in a computing power network distributed cluster system, including a throughput information sensing module, a policy formulation module and a policy release module; 所述吞吐量信息感知模块被配置为,响应于接收到工作节点发送的携带有所述工 作节点的网络吞吐量信息的策略更新请求消息,根据所述网络吞吐量信息更新全局网络吞吐量信息,所述全局网络吞吐量信息用于记录任意两个工作节点之间的网络吞吐量;The throughput information sensing module is configured to, in response to receiving a policy update request message carrying the network throughput information of the working node sent by the working node, update the global network throughput information according to the network throughput information, The global network throughput information is used to record the network throughput between any two working nodes; 所述策略制定模块被配置为,在接收到全部工作节点发送的策略更新请求消息的情况下,根据所述全局网络吞吐量信息确定数据分配信息,所述数据分配信息用于记录各所述工作节点的数据分配比例;The policy formulation module is configured to, upon receiving policy update request messages sent by all working nodes, determine data allocation information based on the global network throughput information, and the data allocation information is used to record each of the work nodes. Data allocation ratio of nodes; 所述策略发布模块被配置为,响应于根据预设的第一阈值、所述数据分配信息和历史数据分配信息确定出满足预设策略更新条件,向各所述工作节点发送携带有所述数据分配信息的策略响应消息。The policy publishing module is configured to, in response to determining that the preset policy update condition is met based on the preset first threshold, the data allocation information and historical data allocation information, send the data carrying the data to each of the working nodes. Policy response message for allocation information. 
17. A data processing apparatus, applied to a working node in a computing power network distributed cluster system, comprising a data segmentation module, a destination receiving node determination module, and a data distribution module, wherein the data segmentation module is configured to segment data to be processed into a plurality of data fragments, the data amount of each data fragment being less than or equal to a second threshold; the destination receiving node determination module is configured to determine the destination receiving node of the data fragments to be allocated according to locally stored data allocation information, the locally stored data allocation information being the data allocation information carried in a policy response message sent by a scheduling node; and the data distribution module is configured to send a second notice message and a partial data message carrying the data fragments to be allocated to each destination receiving node.
18. A computer device, comprising: one or more processors; and a storage apparatus having one or more programs stored thereon, wherein, when the one or more programs are executed by the one or more processors, the one or more processors are caused to implement the data processing method of any one of claims 1 to 15.
19. A computer-readable medium having a computer program stored thereon, wherein, when the program is executed, the data processing method of any one of claims 1 to 15 is implemented.
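As a rough illustration of the working-node apparatus in claim 17, the sketch below splits local data into fragments bounded by the second threshold and sends each fragment to its destination as a second notice message followed by a partial data message. The helper callables (pick_destination, send_notice, send_partial_data) are hypothetical placeholders, not interfaces defined by the application.

from typing import Callable, Iterable, List

def split_into_fragments(data: bytes, second_threshold: int) -> List[bytes]:
    # Each fragment's size stays at or below the second threshold.
    return [data[i:i + second_threshold]
            for i in range(0, len(data), second_threshold)]

def distribute(fragments: Iterable[bytes],
               pick_destination: Callable[[bytes], str],
               send_notice: Callable[[str], None],
               send_partial_data: Callable[[str, bytes], None]) -> None:
    for fragment in fragments:
        dest = pick_destination(fragment)    # chosen from the stored allocation policy
        send_notice(dest)                    # second notice message
        send_partial_data(dest, fragment)    # partial data message carrying the fragment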
PCT/CN2022/123850 2022-03-09 2022-10-08 Data processing method and apparatus, computer device, and readable medium WO2023168937A1 (en)

Applications Claiming Priority (2)

Application Number Priority Date Filing Date Title
CN202210232264.4A CN116781703A (en) 2022-03-09 2022-03-09 Data processing method, device, computer equipment and readable medium
CN202210232264.4 2022-03-09

Publications (1)

Publication Number Publication Date
WO2023168937A1 true WO2023168937A1 (en) 2023-09-14

Family

ID=87937087

Family Applications (1)

Application Number Title Priority Date Filing Date
PCT/CN2022/123850 WO2023168937A1 (en) 2022-03-09 2022-10-08 Data processing method and apparatus, computer device, and readable medium

Country Status (2)

Country Link
CN (1) CN116781703A (en)
WO (1) WO2023168937A1 (en)

Patent Citations (8)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN102073546A (en) * 2010-12-13 2011-05-25 北京航空航天大学 Task-dynamic dispatching method under distributed computation mode in cloud computing environment
US20170085447A1 (en) * 2015-09-21 2017-03-23 Splunk Inc. Adaptive control of data collection requests sent to external data sources
CN109308221A (en) * 2018-08-02 2019-02-05 南京邮电大学 A Nginx dynamic load balancing method based on WebSocket long connection
CN110022373A (en) * 2019-04-17 2019-07-16 北京达佳互联信息技术有限公司 Method for distributing business, device, server and storage medium
US20210006459A1 (en) * 2019-07-02 2021-01-07 Northeastern University Network and Method for Servicing a Computation Request
CN113918270A (en) * 2020-07-08 2022-01-11 电科云(北京)科技有限公司 Cloud resource scheduling method and system based on Kubernetes
CN112463390A (en) * 2020-12-11 2021-03-09 厦门市美亚柏科信息股份有限公司 Distributed task scheduling method and device, terminal equipment and storage medium
CN113486042A (en) * 2021-08-11 2021-10-08 腾讯科技(上海)有限公司 Data processing method and device, computer readable medium and electronic equipment

Cited By (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN118450342A (en) * 2024-07-05 2024-08-06 深圳博瑞天下科技有限公司 A method and device for processing SMS node coordination under high throughput
CN118450342B (en) * 2024-07-05 2024-10-29 深圳博瑞天下科技有限公司 Method and device for processing short message node overall under high throughput

Also Published As

Publication number Publication date
CN116781703A (en) 2023-09-19

Similar Documents

Publication Publication Date Title
CN108509276B (en) Video task dynamic migration method in edge computing environment
CN111246586B (en) A method and system for allocating smart grid resources based on genetic algorithm
CN111654712B (en) A Dynamic Adaptive Streaming Multicast Method for Mobile Edge Computing Scenarios
CN112737823A (en) Resource slice allocation method and device and computer equipment
CN110381541A (en) A kind of smart grid slice distribution method and device based on intensified learning
WO2019072162A1 (en) Virtual network mapping method, device and storage medium
US10521258B2 (en) Managing test services in a distributed production service environment
US9141436B2 (en) Apparatus and method for partition scheduling for a processor with cores
WO2020042612A1 (en) Method and device for storing and reading a message, server, and storage medium
CN103209419B (en) The method of the dynamic spectrum access of a kind of Users' Need-oriented and lifting network performance
US9817698B2 (en) Scheduling execution requests to allow partial results
WO2021259246A1 (en) Resource scheduling method and apparatus, electronic device, and computer-readable storage medium
CN118690873A (en) Heterogeneous method and system for federated learning client resources for edge intelligence
CN112148381A (en) Software definition-based edge computing priority unloading decision method and system
CN112162789A (en) Edge calculation random unloading decision method and system based on software definition
CN117118926A (en) Scheduling method and device for network resources
WO2023168937A1 (en) Data processing method and apparatus, computer device, and readable medium
CN106502790A (en) A kind of task distribution optimization method based on data distribution
CN119697028A (en) Network resource configuration method, device, system, computer equipment, readable storage medium and program product
CN112423041B (en) Video stream processing method and system based on QoS constraints under distributed computing platform
CN113891466A (en) Online scheduling system and method for UDL task in edge wireless network
US10986036B1 (en) Method and apparatus for orchestrating resources in multi-access edge computing (MEC) network
CN112162837B (en) Edge calculation scheduling method and system based on software definition
US20250111295A1 (en) Internet of vehicles platform expansion and contraction method and system, and storage medium
CN118708344A (en) Resource scheduling method, device, computer equipment and readable storage medium

Legal Events

Date Code Title Description
121 Ep: the epo has been informed by wipo that ep was designated in this application

Ref document number: 22930561

Country of ref document: EP

Kind code of ref document: A1

NENP Non-entry into the national phase

Ref country code: DE

122 Ep: pct application non-entry in european phase

Ref document number: 22930561

Country of ref document: EP

Kind code of ref document: A1