CN114675969B - Elastic scaling stream processing method and system based on self-adaptive load partition
- Publication number: CN114675969B (application CN202210313490.5A)
- Authority: CN (China)
- Prior art keywords: data, operator, time, load, stream processing
- Legal status: Active (the legal status is an assumption and is not a legal conclusion; Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed)
Classifications
- G06F9/505 — Allocation of resources, e.g. of the central processing unit [CPU], to service a request, the resource being a machine (e.g. CPUs, servers, terminals), considering the load
- G06F16/176 — File systems: support for shared access to files; file sharing support
- G06F9/5077 — Logical partitioning of resources; management or configuration of virtualized resources
Abstract
Description
Technical Field
The present invention belongs to the field of computer data processing, and in particular relates to an elastic scaling stream processing method and system based on adaptive load partitioning.
Background Art
Real-world data streams are often highly dynamic, which poses a serious challenge to the processing performance of distributed stream processing systems. Under high-rate input, insufficient processing capacity causes high processing latency and can even lead to input data loss; under low-rate input, a large share of the system's computing resources sits idle and resource utilization is low. Moreover, skew in the input data stream further aggravates uneven resource utilization across the system, degrading processing performance.
To address resource allocation during task execution in distributed stream processing systems, researchers have proposed a variety of resource configuration schemes. Static resource configuration is the simplest: resources are provisioned to satisfy the largest foreseeable load. Although simple to configure, it is difficult to predict the maximum workload that may occur in complex environments, and the allocation must be tuned manually for each environment. Furthermore, because the load peak does not occur during most of the processing time, a static configuration inevitably wastes a large amount of computing resources.
To eliminate this waste, a large number of dynamic resource allocation schemes have been proposed, falling mainly into rule-based and model-based methods. Rule-based methods generate resource configurations from predefined rules: they first collect the necessary performance metrics, apply them to expert-defined rules, diagnose the system's runtime symptoms according to the rules, and produce resource reconfiguration actions. Because the rules must be designed by domain experts, rule-based methods do not generalize across production environments. Model-based methods first build a system model of the distributed stream processing system, formulate an optimization problem relating system performance to resource configuration, solve that problem, and apply the computed optimum as the next resource allocation. Although model-based methods scale well on balanced data streams, it is difficult to build an accurate performance model for skewed streams, so their effectiveness degrades.
Summary of the Invention
A first objective of the present invention is to provide a reliable, effective, scientific and reasonable elastic scaling stream processing method based on adaptive load partitioning.
A second objective of the present invention is to provide a system implementing the elastic scaling stream processing method based on adaptive load partitioning.
The elastic scaling stream processing method based on adaptive load partitioning provided by the present invention comprises the following steps:
S1. Build a stream processing system from the Flink prototype using existing technology.
S2. On the stream processing system built in step S1, construct a DKG (Duplicated Key Group) model that distributes data to downstream operator instances and manages the computing state within those instances.
S3. Construct a metric collector model that collects and stores the performance metric data of the stream processing system.
S4. Share the performance metric data stored in step S3.
S5. Construct a discriminator model that computes an elastic scaling policy implementation factor and a load partitioning policy implementation factor.
S6. From the factors obtained in step S5, construct the corresponding elastic scaling policy and load partitioning policy.
S7. Construct a reconfiguration controller module that applies the policies obtained in step S6 to the stream processing system, thereby completing elastic scaling stream processing based on adaptive load partitioning.
Constructing the DKG model of step S2, which distributes data to downstream operator instances and manages their computing state, specifically comprises the following steps:
For each input record, compute the sending channel according to the load partitioning algorithm and send the record through the selected channel to the downstream instance.
After receiving an input record, the downstream instance checks whether it carries the configured tag:
If the tag is present, the record is forwarded directly to the corresponding downstream operator instance, so the physical partition is applied as the logical partition;
Otherwise, the logical partition is determined by hash partitioning and the record is sent to the downstream operator instance accordingly.
Finally, the state of the input data is managed as follows:
When a record arrives at a downstream operator instance, its key group value KG is computed as:
KG = murmurhash(hashcode(key)) % Pmax
where murmurhash() is the Murmur hash function, hashcode() is a multiplicative hash function, Pmax is the maximum parallelism supported by the stream processing system, key is the key of the input record, and % is the modulo operation.
Then the storage location SI corresponding to the KG value is computed as:
SI = (KG × Ninst) / NKG
where NKG is the maximum number of key groups supported by the stream processing system, Ninst is the number of instances, and / is division truncated to the integer part.
The state is then fetched from the local state backend according to the storage location SI:
If no state exists for the input record, a new state is created in the local state backend;
If the state exists, it is fetched directly.
The stateful computation is executed and the updated state is stored back to the state backend.
Finally, the partial results of the several operator instances are summed into a unified result, thereby guaranteeing the correctness of the computation.
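The key-group and storage-index computation above can be sketched as follows. This is an illustrative Python sketch, not the patent's implementation: `murmur_like` is a stand-in mixer for a real MurmurHash, and all names are assumptions.

```python
def murmur_like(h: int) -> int:
    # Stand-in for murmurhash(): any uniform 32-bit integer mixer works here.
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    return h

def key_group(key, p_max: int) -> int:
    """KG = murmurhash(hashcode(key)) % Pmax; hash() stands in for hashcode()."""
    return murmur_like(hash(key) & 0xFFFFFFFF) % p_max

def storage_index(kg: int, n_kg: int, n_inst: int) -> int:
    """SI = (KG * Ninst) / NKG with truncating division: maps a key group
    onto one of n_inst instance-local state slots."""
    return kg * n_inst // n_kg
```

Because the mapping depends only on the key and the fixed maximum parallelism, a key's group (and hence its state slot layout) is stable across rescaling.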
Constructing the metric collector model of step S3, which collects and stores the performance metric data of the stream processing system, specifically comprises the following steps:
Each operator instance initializes a local metric collector that stores four indicators: useful time, number of records processed, number of records output, and the time spent selecting output channels. At initialization, the collector reads the window length and the storage path for metric persistence from the configuration file.
Each time a record is processed, the useful processing time is measured: the nanosecond timestamp is recorded before deserialization, recorded again after processing and serialization complete, and the difference between the two timestamps is accumulated into the collector's useful-time indicator.
The processed-count, output-count, and channel-selection-time indicators are updated as follows: the processed count increases by 1 when a record is deserialized; the output count increases by 1 when a record is serialized and queued for output; the channel-selection time is measured as a time difference.
After each record, at the end of processing, the collector checks whether the difference between the current nanosecond timestamp and the window start exceeds the configured window length:
If it does not, nothing is done;
If it does, the metrics are computed and stored:
The metric computation derives the true processing rate and the true output rate as
Rtrue-proc = Nproc / Tuseful,  Rtrue-output = Noutput / Tuseful
where Rtrue-proc is the true processing rate, Nproc the number of records processed, Tuseful the useful time spent on data processing, Rtrue-output the true output rate, and Noutput the number of records output.
When the computation completes, the results are stored into a performance metric data file.
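The windowed rate computation above can be sketched as follows; this is a minimal Python sketch (class and field names are assumptions), returning rates in records per second of useful time.

```python
import time

class MetricCollector:
    """Per-instance collector for the indicators described above: useful time,
    processed count, output count, and channel-selection time."""
    def __init__(self, window_ns: int):
        self.window_ns = window_ns            # window length from configuration
        self.window_start = time.monotonic_ns()
        self.useful_ns = 0
        self.n_proc = 0
        self.n_output = 0
        self.partition_ns = 0

    def record(self, useful_ns: int, outputs: int, partition_ns: int):
        """Accumulate the measurements taken around one record."""
        self.useful_ns += useful_ns
        self.n_proc += 1
        self.n_output += outputs
        self.partition_ns += partition_ns

    def maybe_flush(self, now_ns=None):
        """At the end of a record, emit (Rtrue-proc, Rtrue-output) once the
        configured window has elapsed; otherwise do nothing."""
        now_ns = time.monotonic_ns() if now_ns is None else now_ns
        if now_ns - self.window_start < self.window_ns or self.useful_ns == 0:
            return None
        rates = (self.n_proc * 1e9 / self.useful_ns,      # Nproc / Tuseful
                 self.n_output * 1e9 / self.useful_ns)    # Noutput / Tuseful
        self.window_start = now_ns
        self.useful_ns = self.n_proc = self.n_output = self.partition_ns = 0
        return rates
```

Dividing by useful time rather than wall-clock time is what makes these "true" rates: backpressure and idle waiting do not dilute them.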
Sharing the performance metric data stored in step S3, as described in step S4, specifically comprises the following steps:
Real-time sharing of the performance metric data files is implemented with Samba, inotify and the mv utility.
Before the stream processing system starts, Samba is configured for folder sharing, and inotify is set to monitor the metric file storage path configured in the stream processing system.
When the stream processing system writes a performance metric data file, inotify yields the file's full path and triggers an mv operation that moves the file into the local shared folder.
Samba shares the performance metric data files across multiple host nodes through the SMB protocol, giving the system access to the stream processing system's performance metrics.
Constructing the discriminator model of step S5, which computes the elastic scaling policy implementation factor and the load partitioning policy implementation factor, specifically comprises the following steps:
The elastic scaling policy implementation factor is computed as follows:
Read the performance metric data from the Samba shared file system.
The discriminator reads every performance metric file and attaches the rate information of each operator instance to the task's topology.
After all performance metric data have been consolidated per operator, compute each operator's true input rate and average true processing rate, and take the ratio of the true input rate to the true processing rate, rounded up, as the optimal operator parallelism (i.e. the number of resources to allocate).
Compare the optimal operator parallelism with the current operator parallelism and compute the elastic scaling policy implementation factor:
If the total difference between the optimal and current parallelisms exceeds the set threshold, set the factor to the first set value, indicating that the elastic scaling policy must be executed;
If the total difference does not exceed the threshold, set the factor to the second set value, indicating that the current parallelism is kept unchanged.
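The scaling-factor decision above can be sketched as follows. This is an illustrative Python sketch under assumptions: each operator is a `(input_rate, avg_proc_rate, current_parallelism)` tuple, and the first and second set values are rendered as 1 and 0.

```python
import math

def scaling_factor(operators, threshold: int):
    """Optimal parallelism per operator = ceil(true input rate / average true
    processing rate); trigger rescaling when the summed |optimal - current|
    differences exceed the threshold. Returns (factor, optimal plan)."""
    total_diff = 0
    plan = []
    for r_in, r_proc, cur in operators:
        opt = math.ceil(r_in / r_proc)   # ratio rounded up
        plan.append(opt)
        total_diff += abs(opt - cur)
    return (1 if total_diff > threshold else 0), plan  # 1 = rescale, 0 = keep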
The load partitioning policy implementation factor is computed as follows:
From all performance metric data, select the metric data of the operators downstream of the logical partition, read the metrics of all their instances, and take the reciprocal of the observed processing rate in each metric file as the instance's observed processing time.
Take the maximum and minimum observed processing times and use their difference as the maximum waiting time under load imbalance.
Compute the queuing time under load balance with queuing theory: model each operator instance as a GI/G/1 queue and estimate its average queuing time with the formula
Tqueue = (ρ / (1 - ρ)) · ((ca² + cs²) / 2) · (1 / Rtrue-proc)
where Tqueue is the estimated average queuing time of the instance, ρ the utilization, ca the coefficient of variation of inter-arrival times, cs the coefficient of variation of service times, and Rtrue-proc the instance's true processing rate.
For each operator, compute the difference between the maximum waiting time under imbalance and the average queuing time under balance; this represents the extra waiting time caused by load imbalance.
Multiply each operator's average utilization by the extra waiting time to obtain the final extra-waiting-time decision value.
Measure the load partitioning time (i.e. the time spent selecting output channels) and multiply it by a set coefficient to obtain the final load-partitioning-time decision value.
Finally, compare the two decision values:
If the extra-waiting-time decision value is greater than the load-partitioning-time decision value, set the load partitioning policy implementation factor to the third set value, indicating that the load partitioning policy must be executed;
If the extra-waiting-time decision value is less than or equal to the load-partitioning-time decision value, set the factor to the fourth set value, indicating that the load partitioning policy need not be executed.
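The decision above can be sketched as follows. This is an illustrative Python sketch under assumptions: the balanced queuing time uses Kingman's GI/G/1 approximation with the mean observed service time standing in for 1/Rtrue-proc, and the third and fourth set values are rendered as 1 and 0.

```python
def partition_factor(obs_proc_rates, rho, ca, cs, partition_time, coeff):
    """Compare the utilisation-weighted extra waiting time caused by imbalance
    against the coefficient-scaled cost of load partitioning.
    obs_proc_rates: observed processing rates of one operator's instances."""
    obs_times = [1.0 / r for r in obs_proc_rates]     # observed processing times
    imbalance_wait = max(obs_times) - min(obs_times)  # max wait under imbalance
    mean_service = sum(obs_times) / len(obs_times)
    # Kingman approximation (requires rho < 1):
    t_queue = rho / (1.0 - rho) * (ca**2 + cs**2) / 2.0 * mean_service
    extra = rho * (imbalance_wait - t_queue)          # weighted extra wait
    return 1 if extra > coeff * partition_time else 0 # 1 = partition, 0 = skip
```

When the instances' rates are close, the imbalance wait shrinks below the balanced queuing time and the factor stays 0, so partitioning overhead is only paid when skew actually hurts.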
The elastic scaling policy of step S6 is specifically constructed as follows:
Obtain the performance metric data.
Read the directed acyclic graph representing the processing task and store it as an adjacency list.
Use a topological sorting algorithm to obtain the operator ordering of the task from the source operator (Source) to the sink operator (Sink).
Take the expected output rate configured at the source operator as the input rate of the first downstream operator, and compute each operator's parallelism P as
P = ⌈Rtrue-input / Rtrue-proc⌉
where Rtrue-input is the true input rate and Rtrue-proc the true processing rate.
Compute the true output rate as
Rtrue-output = Noutput / Tuseful
where Noutput is the number of records output by the operator instance and Tuseful the useful time spent on data processing, and use this true output rate as the true input rate of the downstream operator.
Iterate this computation from the source operator to the sink operator of the topology, deriving the parallelism of each operator in turn.
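The source-to-sink propagation above can be sketched as follows. This is an illustrative Python sketch under assumptions: `adj` is the adjacency-list DAG, and `out_ratio` stands in for each operator's selectivity (output records per processed record), which determines its true output rate.

```python
import math
from collections import deque

def plan_parallelism(adj, proc_rate, out_ratio, source_rate):
    """Walk the job DAG in topological order, propagating rates:
    each operator's P = ceil(R_in / R_proc), and its output rate
    (R_in * selectivity) becomes the downstream input rate."""
    indeg = {v: 0 for v in adj}
    for v in adj:
        for w in adj[v]:
            indeg[w] += 1
    rate = {v: 0.0 for v in adj}
    q = deque()
    for v, d in indeg.items():
        if d == 0:                       # source: use its expected output rate
            rate[v] = source_rate
            q.append(v)
    order = []
    while q:                             # Kahn's topological sort
        v = q.popleft()
        order.append(v)
        for w in adj[v]:
            rate[w] += rate[v] * out_ratio[v]   # downstream input rate
            indeg[w] -= 1
            if indeg[w] == 0:
                q.append(w)
    return {v: max(1, math.ceil(rate[v] / proc_rate[v])) for v in order}
```

Propagating the output rate instead of re-measuring each edge lets the plan cover operators whose input rate will only change after upstream operators are rescaled.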
The load partitioning policy of step S6 is specifically constructed as follows:
Obtain the performance metric data.
Create a fixed-length hash map that stores the frequency counts of the distinct input keys, holding only candidate hot keys; at the same time, create an array that stores the amount of data sent through each channel.
When a record arrives, check whether its key is already recorded in the hash map:
If it is, increase its frequency count by 1;
If it is not and the hash map has not reached its capacity, add the key to the hash map with a frequency count of 1;
If it is not and the hash map is at capacity, replace the key with the lowest frequency count by the current key and increase the recorded count by 1.
On every input, update the total input count; when the total reaches a set value, scale down all frequency counts in the hash map by the set ratio.
On every frequency update, determine the current hot keys as follows:
Compute the frequency of each key in the hash map relative to the total input volume as
Fkey = Nkey / Ntotal
where Fkey is the key's frequency, Nkey the count recorded for the key in the hash map, and Ntotal the total input volume.
Using the configured parameters, compute the hot-data threshold θhot, where P denotes the parallelism and θdef is a user-defined parameter for tuning the threshold.
Compare each key's frequency in the hash map with the hot-data threshold: keys whose frequency exceeds the threshold are considered hot.
Each input record is then routed:
If the record's key is hot, select, among all output channels, the channel that has output the least data so far;
If the record's key is not hot, select, between its two assigned channels, the one that has sent the least data.
After a channel is selected, update the amount of data sent through that channel.
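The hot-key-aware channel selection above can be sketched as follows. This is an illustrative Python sketch, not the patent's implementation: the threshold formula `theta_def / n_channels`, the two hashed candidate channels for cold keys, and all names are assumptions.

```python
class AdaptivePartitioner:
    """Bounded frequency map tracks candidate hot keys; hot keys go to the
    least loaded of all channels, cold keys to the less loaded of their two
    hashed candidate channels."""
    def __init__(self, n_channels, capacity, theta_def, decay_every, decay=0.5):
        self.sent = [0] * n_channels        # data volume sent per channel
        self.freq = {}                      # bounded hot-candidate map
        self.capacity = capacity
        self.theta = theta_def / n_channels # hot threshold (assumed formula)
        self.total = 0
        self.decay_every, self.decay = decay_every, decay

    def _count(self, key):
        self.total += 1
        if key in self.freq:
            self.freq[key] += 1
        elif len(self.freq) < self.capacity:
            self.freq[key] = 1
        else:                               # evict the current minimum, +1
            victim = min(self.freq, key=self.freq.get)
            self.freq[key] = self.freq.pop(victim) + 1
        if self.total % self.decay_every == 0:
            for k in self.freq:             # periodic proportional decay
                self.freq[k] *= self.decay

    def choose(self, key):
        self._count(key)
        hot = self.freq.get(key, 0) / self.total > self.theta
        if hot:
            cands = range(len(self.sent))   # hot: any channel is eligible
        else:
            a = hash(key) % len(self.sent)  # cold: two hashed candidates
            b = hash((key, 1)) % len(self.sent)
            cands = (a, b)
        ch = min(cands, key=lambda c: self.sent[c])
        self.sent[ch] += 1
        return ch
```

Restricting cold keys to two candidate channels keeps their state on a small, predictable set of instances, while hot keys are spread over every channel to level the load.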
The present invention also discloses a system implementing the elastic scaling stream processing method based on adaptive load partitioning, comprising a Flink system module, a DKG module, a metric collector module, a metric file sharing module, a discriminator module, an elastic scaling policy generation module, a load partitioning policy generation module and a reconfiguration control module. The Flink system module, DKG module, metric collector module, metric file sharing module and discriminator module are connected in series, in that order; the output of the discriminator module is connected to the inputs of both the elastic scaling policy generation module and the load partitioning policy generation module; the outputs of both policy generation modules are connected to the reconfiguration control module; and the output of the reconfiguration control module is connected back to the Flink system module. The Flink system module builds the stream processing system; the DKG module builds the DKG model, distributes data to downstream operator instances and manages their computing state; the metric collector module builds the metric collector model and collects and stores the system's metric data; the metric file sharing module shares the stored performance metric data files; the discriminator module builds the discriminator model and, from the shared performance metric data, computes the elastic scaling policy implementation factor and the load partitioning policy implementation factor; the elastic scaling policy generation module generates the corresponding elastic scaling policy from its factor; the load partitioning policy generation module generates the corresponding load partitioning policy from its factor; and the reconfiguration control module applies the obtained elastic scaling and load partitioning policies to the stream processing system, thereby completing elastic scaling stream processing based on adaptive load partitioning.
The elastic scaling stream processing method and system based on adaptive load partitioning provided by the present invention combine load partitioning with elastic scaling and further propose an adaptive load partitioning scheme. The invention therefore achieves low end-to-end processing latency and high throughput not only on balanced data streams but also on skewed data streams; in addition, it is reliable, effective, scientific and reasonable.
Brief Description of the Drawings
FIG. 1 is a flow chart of the method of the present invention.
FIG. 2 is a schematic diagram of the processing flow of the DKG model in the method of the present invention.
FIG. 3 is a schematic diagram of the processing flow of the metric collector model in the method of the present invention.
FIG. 4 is a schematic diagram of metric file sharing in the method of the present invention.
FIG. 5 is a schematic diagram of the frequency update in the load partitioning policy of the method of the present invention.
FIG. 6 is a functional block diagram of the system of the present invention.
Detailed Description
如图1所示为本发明的方法流程示意图:本发明提供的这种基于自适应负载分区的弹性缩放流处理方法,包括如下步骤:FIG1 is a schematic diagram of the method flow of the present invention: the elastic scaling flow processing method based on adaptive load partitioning provided by the present invention comprises the following steps:
S1.基于Flink原型,以现有技术构建流处理系统;S1. Build a stream processing system based on the Flink prototype using existing technologies;
S2.基于步骤S1构建的流处理系统,构建DKG(Duplicated Key Group,重复键组)模型用于将数据分配到下游算子实例,并管理实例中的计算状态;具体包括如下步骤(如图2所示):S2. Based on the stream processing system constructed in step S1, a DKG (Duplicated Key Group) model is constructed to distribute data to downstream operator instances and manage the computing status in the instances; specifically, the steps are as follows (as shown in FIG2 ):
For each input record, compute the output channel according to the load partitioning algorithm and send the record to the downstream instance through the selected channel.
When the downstream instance receives the record, it checks whether the record carries the configured tag:
If the tag is present, the record is forwarded directly to the corresponding downstream operator instance, so that the physical partitioning is applied as the logical partitioning;
Otherwise, logical partitioning is performed with the hash partitioning method, and the record is sent to the downstream operator instance determined by that result.
Finally, the state of the input data is managed as follows:
When a record arrives at a downstream operator instance, its KG value is computed as:
KG = murmurhash(hashcode(key)) % Pmax
where murmurhash() is the murmur hash function, hashcode() is a multiplicative hash function, Pmax is the maximum parallelism supported by the stream processing system, key is the key of the input record, and % is the modulo operation.
Then the storage location SI corresponding to the KG value is computed as:
SI = KG / (NKG / Ninst)
where NKG is the maximum number of key groups supported by the stream processing system, Ninst is the number of instances, and / is integer division (the fractional part is truncated).
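The two mappings above can be sketched as follows. This is a minimal illustration rather than the patent's implementation: `zlib.crc32` stands in for the murmur hash, `hash_code` is a Java-style multiplicative hash assumed as one reading of hashcode(), and the `storage_index` formula is a reconstruction consistent with the variable definitions above.

```python
import zlib

def hash_code(key: str) -> int:
    # Java-style multiplicative hash, an assumed reading of hashcode().
    h = 0
    for ch in key:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h

def key_group(key: str, p_max: int) -> int:
    # KG = murmurhash(hashcode(key)) % Pmax; crc32 stands in for murmur here.
    return zlib.crc32(hash_code(key).to_bytes(4, "little")) % p_max

def storage_index(kg: int, n_kg: int, n_inst: int) -> int:
    # SI = KG / (N_KG / N_inst) with truncating integer division
    # (reconstructed from the variable definitions above).
    return kg // (n_kg // n_inst)
```

With NKG = 128 key groups and 4 instances, key groups 0..31 map to storage location 0, 32..63 to location 1, and so on.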
The state is then fetched from the local state backend according to the storage location SI:
If no state exists for the input record, a new state is created in the local state backend;
If a state exists, it is fetched directly.
The stateful computation is executed, and the updated state is written back to the state backend.
Finally, the partial results of the several operator instances are summed into a unified result, which guarantees the correctness of the computation.
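A toy state backend illustrating the fetch-or-create logic and the final summation of per-instance partial results; the class and function names, and the use of an integer count as the state, are illustrative assumptions.

```python
class LocalStateBackend:
    """Toy keyed state backend indexed by storage location SI."""

    def __init__(self) -> None:
        self._states: dict[int, int] = {}

    def get_or_create(self, si: int) -> int:
        # Create a fresh state (count 0) if none exists, else return it.
        return self._states.setdefault(si, 0)

    def update(self, si: int, value: int) -> None:
        self._states[si] = value

def process(backend: LocalStateBackend, si: int) -> None:
    # A stateful computation: increment the per-location count.
    backend.update(si, backend.get_or_create(si) + 1)

def merge_results(partials: list[int]) -> int:
    # Partial results from the duplicated key groups are summed into one
    # unified result.
    return sum(partials)
```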
S3. Construct a metric collector model that collects and stores performance metric data of the stream processing system. As shown in FIG. 3, this specifically comprises the following steps:
Each operator instance initializes a local metric collector that stores an effective-time metric, a processed-record-count metric, an output-record-count metric, and a metric for the time spent selecting output channels. At initialization, the collector reads the window length and the storage path for metric persistence from the configuration file.
Each time a record is processed, the effective processing time is measured: the nanosecond timestamp is recorded before deserialization and again after processing and serialization complete; their difference is the effective time, which is accumulated into the effective-time metric. The effective time excludes time spent waiting on reads and writes during the computation, and therefore reflects the processing performance of the system more accurately.
The processed-record-count metric, the output-record-count metric, and the channel-selection time metric are then updated: the processed-record count is incremented by 1 after each record is deserialized; the output-record count is incremented by 1 when a serialized record is queued for output; the channel-selection time is measured as a timestamp difference.
After each record is processed, the collector checks whether the difference between the current nanosecond timestamp and the window's starting timestamp exceeds the configured window length:
If not, nothing further is done;
If it does, the metric computation and storage operations are executed:
The metric computation derives the true processing rate and the true output rate as:
Rtrue-proc = Nproc / Tuseful,  Rtrue-output = Noutput / Tuseful
where Rtrue-proc is the true processing rate, Nproc is the number of processed records, Tuseful is the effective time spent on data processing, Rtrue-output is the true output rate, and Noutput is the number of output records.
Once computed, the results are stored in a performance metric data file.
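The windowed rate computation can be sketched as below; the nanosecond bookkeeping and all names are illustrative assumptions, not the patent's code.

```python
def true_rates(n_proc: int, n_output: int, t_useful_ns: int) -> tuple[float, float]:
    # R_true-proc = N_proc / T_useful ; R_true-output = N_output / T_useful
    t_s = t_useful_ns / 1e9
    return n_proc / t_s, n_output / t_s

class MetricCollector:
    """Accumulates per-record metrics and flushes rates once per window."""

    def __init__(self, window_ns: int) -> None:
        self.window_ns = window_ns
        self.window_start = 0
        self.n_proc = self.n_output = self.t_useful_ns = 0

    def record(self, now_ns: int, useful_ns: int, emitted: int):
        self.n_proc += 1
        self.n_output += emitted
        self.t_useful_ns += useful_ns
        if now_ns - self.window_start >= self.window_ns:
            rates = true_rates(self.n_proc, self.n_output, self.t_useful_ns)
            # Reset the window; a real collector would persist `rates`
            # to the performance metric data file here.
            self.window_start = now_ns
            self.n_proc = self.n_output = self.t_useful_ns = 0
            return rates
        return None
```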
After the DKG model and the metric collector model are designed, the Flink source tree is compiled with Maven, and the resulting Flink system is deployed into the distributed environment.
S4. Share the performance metric data stored in step S3. As shown in FIG. 4, this specifically comprises the following steps:
Samba, inotify, and the mv tool are used to share the performance metric data files in real time.
Before the stream processing system starts, Samba is configured for folder sharing, and inotify is set to monitor the storage path configured for the performance metric files.
Whenever the stream processing system stores a performance metric data file, inotify emits the full path of the file and triggers an mv operation that moves the file into the local shared folder.
Samba shares the performance metric data files across the host nodes via the SMB protocol, so that the system can access the performance metrics of the stream processing system.
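The file-moving step of the pipeline can be sketched in Python as below. This is a polling stand-in for the inotify-triggered mv, and the `.metrics` file suffix is an assumed naming convention.

```python
import shutil
from pathlib import Path

def move_new_metric_files(watch_dir: Path, shared_dir: Path) -> list[Path]:
    """Move every metric file from the monitored storage path into the
    Samba-shared folder, mirroring the inotify + mv step."""
    moved = []
    for f in sorted(Path(watch_dir).glob("*.metrics")):
        dest = Path(shared_dir) / f.name
        shutil.move(str(f), str(dest))   # the "mv" of the pipeline
        moved.append(dest)
    return moved
```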
S5. Construct a discriminator model that computes the elastic scaling policy implementation factor and the load partitioning policy implementation factor. This specifically comprises the following steps:
The elastic scaling policy implementation factor is computed as follows:
Read the performance metric data from the Samba shared file system.
The discriminator reads every performance metric file and attaches the rate information of the operator instances in the file to the topology of the job.
After all performance metric data have been aggregated per operator, compute each operator's true input rate and average true processing rate; the optimal operator parallelism (i.e. the resource allocation) is the ratio of the true input rate to the true processing rate, rounded up.
Compare the optimal operator parallelism with the current operator parallelism and derive the elastic scaling policy implementation factor:
If the total difference between the optimal and current parallelisms exceeds the configured threshold, the factor is set to a first value, indicating that the elastic scaling policy must be executed;
Otherwise, the factor is set to a second value, indicating that the current operator parallelism is left unchanged.
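The decision above reduces to a ceiling division and a threshold test. A sketch under assumed names, with 1 and 0 standing in for the unspecified first and second set values:

```python
import math

def optimal_parallelism(true_input_rate: float, avg_true_proc_rate: float) -> int:
    # ceil(R_true-input / R_true-proc), i.e. the resource allocation count.
    return math.ceil(true_input_rate / avg_true_proc_rate)

def elastic_scaling_factor(optimal: list[int], current: list[int], threshold: int) -> int:
    # Total per-operator difference between optimal and current parallelism;
    # exceeding the threshold triggers the elastic scaling policy.
    total_diff = sum(abs(o - c) for o, c in zip(optimal, current))
    return 1 if total_diff > threshold else 0
```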
Because the load partitioning strategy is applied to every input record, its time overhead is large compared to the record processing time, and in some situations it is simply unnecessary. The load partitioning policy implementation factor is therefore computed as follows:
From all performance metric data, select the metrics of the operators downstream of the logical partitioning, read the metrics of all their instances, and take the reciprocal of the observed processing rate in each metric file as the instance's observed processing time.
Take the maximum and minimum observed processing times and use their difference as the maximum waiting time under load imbalance. Since the true processing times of the different instances of an operator are nearly identical, differences in observed processing time reflect differences in time spent waiting for input; and if processing performance degrades because of load imbalance, at least one instance runs at full load and has the smallest observed processing time. The maximum waiting time caused by load imbalance among the instances of an operator is therefore represented by the difference between the maximum and minimum observed processing times.
To exclude the queueing time that is present even in the balanced state, the balanced-state queueing time is estimated with queueing theory. In the ideal load-balanced scenario, all instances of an operator have the same input rate and the same true processing rate, so each instance is modeled as a GI/G/1 queue and its average queueing time is estimated as
Tqueue = ρ/(1-ρ) · (ca² + cs²)/2 · 1/Rtrue-proc
where Tqueue is the estimated average queueing time of the instance, ρ is the utilization, ca is the coefficient of variation of interarrival times, cs is the coefficient of variation of service times, and Rtrue-proc is the instance's true processing rate.
For each operator, compute the difference between the maximum waiting time under load imbalance and the average queueing time under balanced load; this represents the extra waiting time caused by load imbalance.
When the load is balanced and every instance's utilization is low, the waiting time computed above is inflated: because resource utilization is low, small errors between the instances' observed processing rates are amplified. The extra waiting time is therefore multiplied by the operator's average utilization to obtain the final extra-waiting-time decision value.
The time spent on load partitioning (i.e. on selecting output channels) is measured and multiplied by a configured coefficient to obtain the final load-partitioning-time decision value.
Finally, the two decision values are compared:
If the extra-waiting-time value is greater than the load-partitioning-time value, the load partitioning policy implementation factor is set to a third value, indicating that the load partitioning policy must be executed;
If it is less than or equal to the load-partitioning-time value, the factor is set to a fourth value, indicating that the load partitioning policy need not be executed.
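The steps above can be sketched as follows. The queueing-time estimate uses the Kingman approximation for a GI/G/1 queue with mean service time 1/Rtrue-proc, which is an assumed reconstruction of the patent's formula; 1 and 0 stand in for the unspecified third and fourth set values.

```python
def kingman_queue_time(rho: float, ca: float, cs: float, r_true_proc: float) -> float:
    # GI/G/1 (Kingman) approximation of mean queueing time:
    # T_queue ≈ rho/(1-rho) * (ca^2 + cs^2)/2 * (1 / R_true-proc)
    return rho / (1.0 - rho) * (ca ** 2 + cs ** 2) / 2.0 / r_true_proc

def load_partition_factor(obs_times: list[float], rho_avg: float, ca: float,
                          cs: float, r_true_proc: float,
                          partition_time: float, coeff: float) -> int:
    # Extra waiting caused by imbalance: spread of observed processing times
    # minus the balanced-state queueing time, scaled by average utilization.
    max_wait = max(obs_times) - min(obs_times)
    extra_wait = (max_wait - kingman_queue_time(rho_avg, ca, cs, r_true_proc)) * rho_avg
    return 1 if extra_wait > partition_time * coeff else 0
```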
S6. Based on the two implementation factors obtained in step S5, construct the corresponding elastic scaling strategy and load partitioning strategy.
In a concrete implementation, the elastic scaling strategy is constructed as follows:
Obtain the performance metric data.
Read the directed acyclic graph representing the processing job and store it as an adjacency list.
Use a topological sort to order the operators of the job from the source operator (Source) to the sink operator (Sink).
Take the expected output rate configured at the source operator as the input rate of the first downstream operator, and compute the operator's parallelism as P = ⌈Rtrue-input / Rtrue-proc⌉, where Rtrue-input is the true input rate and Rtrue-proc is the true processing rate.
Compute the true output rate Rtrue-output = Noutput / Tuseful, where Noutput is the number of records output by the operator instance and Tuseful is the effective time spent on data processing, and use the true output rate as the true input rate of the downstream operator.
Iterate this computation from the source operator to the sink operator of the topology, computing the parallelism of each operator in turn.
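The source-to-sink iteration can be sketched with Kahn's topological sort. The operator names, rates, and per-operator selectivities (output records per input record, used here to model the propagated true output rate) are illustrative assumptions.

```python
import math
from collections import deque

def topo_order(adj: dict[str, list[str]]) -> list[str]:
    # Kahn's algorithm over the adjacency-list DAG.
    indegree = {n: 0 for n in adj}
    for outs in adj.values():
        for m in outs:
            indegree[m] += 1
    queue = deque(n for n, d in indegree.items() if d == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in adj[n]:
            indegree[m] -= 1
            if indegree[m] == 0:
                queue.append(m)
    return order

def plan_parallelism(adj, source_rate, proc_rate, selectivity):
    """Walk the DAG from sources to sinks, setting each operator's
    parallelism to ceil(true input rate / true processing rate) and
    propagating the true output rate downstream."""
    input_rate = {n: 0.0 for n in adj}
    input_rate.update(source_rate)          # expected output rate of sources
    parallelism = {}
    for n in topo_order(adj):
        parallelism[n] = max(1, math.ceil(input_rate[n] / proc_rate[n]))
        for m in adj[n]:
            # n's true output rate becomes (part of) m's true input rate.
            input_rate[m] += input_rate[n] * selectivity[n]
    return parallelism
```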
The basic idea of load partitioning is that frequently occurring hot data are distributed across all downstream operator instances, while infrequent cold data are assigned to two fixed operator instances; the two candidate instances for infrequent data are generated by two hash functions. The load partitioning strategy is therefore constructed as follows (as shown in FIG. 5):
Obtain the performance metric data.
Create a fixed-capacity hash map that stores the frequencies of distinct input keys; the map holds only the keys that may be hot. At the same time, create an array that records the amount of data sent on each channel.
When a record arrives, check whether its key is already recorded in the hash map:
If it is, increment its frequency by 1;
If it is not and the map has not reached its capacity, insert the key with a frequency of 1;
If it is not and the map is full, replace the entry with the lowest frequency by the current key, recording its frequency as the evicted frequency plus 1.
Replacing low-frequency entries keeps the set of candidate hot keys up to date, so that a newly hot key does not need a long accumulation period before it is recognized as hot; low-frequency keys and unrecorded keys are both cold data and are treated identically by the partitioner. As shown in FIG. 5, the next input is "two", which is not stored in the hash map; the entry with the lowest frequency, "the", is located, and ("two", 2) replaces ("the", 1). The following input, "a", is present in the hash map, so its entry is incremented by 1.
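The map maintenance just described, together with the periodic frequency decay that the strategy generator applies as inputs accumulate, can be sketched as below; the class name and parameters are illustrative.

```python
class FrequencyMap:
    """Fixed-capacity key-frequency tracker with min-eviction and decay."""

    def __init__(self, capacity: int, decay_every: int, decay_factor: float = 0.5):
        self.capacity = capacity
        self.decay_every = decay_every      # inputs between decay passes
        self.decay_factor = decay_factor    # e.g. halve all frequencies
        self.freq: dict[str, float] = {}
        self.total = 0

    def record(self, key: str) -> None:
        self.total += 1
        if key in self.freq:
            self.freq[key] += 1
        elif len(self.freq) < self.capacity:
            self.freq[key] = 1
        else:
            # Replace the lowest-frequency entry; the newcomer inherits that
            # frequency plus one, so new hot keys ramp up quickly.
            victim = min(self.freq, key=self.freq.get)
            self.freq[key] = self.freq.pop(victim) + 1
        if self.total % self.decay_every == 0:
            for k in self.freq:
                self.freq[k] *= self.decay_factor
```

Replaying the FIG. 5 scenario: with a full map holding ("a", 2) and ("the", 1), recording "two" evicts "the" and inserts ("two", 2), and recording "a" then raises it to 3.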
Most load partitioning methods ignore the fact that the popularity of historical data fades over time, and are therefore insensitive to the current load distribution. The load partitioning strategy generator therefore decays historical frequencies as time advances: the input counter is updated on every record, and when it reaches a configured value, the frequencies of all entries in the hash map are scaled down by a configured ratio (for example, multiplied by 0.5).
At each frequency update, the current hot data are determined as follows:
Compute the frequency of each key in the hash map relative to the total input volume, Fkey = Nkey / Ntotal, where Fkey is the key's frequency, Nkey is the key's count in the hash map, and Ntotal is the total number of input records.
Using the configured parameters, compute the hot-data threshold θhot from the parallelism P and a user-defined adjustment parameter θdef.
Compare each key's frequency against the hot-data threshold: keys whose frequency exceeds θhot are considered hot data.
Each input record is then classified:
If the record is hot data, the channel with the smallest amount of data sent so far among all output channels is selected;
If the record is not hot data, the channel with the smaller amount of data sent among its two designated candidate channels is selected.
After a channel is selected, the amount of data sent on that channel is updated.
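The hot-data test and channel selection can be sketched as below. The threshold formula did not survive extraction, so θhot = θdef / P is an assumed placeholder; the two candidate channels for cold keys are passed in rather than derived from the two hash functions.

```python
def is_hot(freq: dict[str, float], key: str, total: int,
           parallelism: int, theta_def: float) -> bool:
    # F_key = N_key / N_total compared against the hot threshold;
    # theta_hot = theta_def / parallelism is an assumed placeholder.
    if key not in freq or total == 0:
        return False
    return freq[key] / total > theta_def / parallelism

def choose_channel(hot: bool, sent: list[int], candidates: tuple[int, int]) -> int:
    # Hot keys go to the least-loaded of all channels; cold keys to the
    # least-loaded of their two hash-derived candidate channels.
    pool = range(len(sent)) if hot else candidates
    channel = min(pool, key=lambda c: sent[c])
    sent[channel] += 1       # update the per-channel sent counter
    return channel
```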
S7. Construct a reconfiguration controller module that applies the strategies obtained in step S6 to the stream processing system, completing elastic scaling stream processing based on adaptive load partitioning.
As shown in FIG. 6, the system implementing the above elastic scaling stream processing method based on adaptive load partitioning comprises a Flink system module, a DKG module, a metric collector module, a metric file sharing module, a discriminator module, an elastic scaling strategy generation module, a load partitioning strategy generation module, and a reconfiguration control module. The Flink system module, DKG module, metric collector module, metric file sharing module, and discriminator module are connected in series in that order; the output of the discriminator module is connected to the inputs of both the elastic scaling strategy generation module and the load partitioning strategy generation module; the outputs of those two modules are both connected to the reconfiguration control module; and the output of the reconfiguration control module is connected back to the Flink system module.
The Flink system module builds the stream processing system. The DKG module builds the DKG model, distributes data to downstream operator instances, and manages the computation state within those instances. The metric collector module builds the metric collector model and collects and stores the metric data of the stream processing system. The metric file sharing module shares the stored performance metric data files. The discriminator module builds the discriminator model and computes the elastic scaling policy implementation factor and the load partitioning policy implementation factor from the shared performance metric data. The elastic scaling strategy generation module generates the corresponding elastic scaling strategy from its implementation factor, and the load partitioning strategy generation module likewise generates the corresponding load partitioning strategy from its implementation factor. The reconfiguration control module applies the resulting elastic scaling and load partitioning strategies to the stream processing system, completing elastic scaling stream processing based on adaptive load partitioning.
The elastic scaling stream processing method and system based on adaptive load partitioning provided by the present invention can be applied in fields such as online public opinion analysis and industrial sensor data monitoring.
In an online public opinion analysis scenario, for example counting the popularity of microblog posts, the invention not only provides basic keyword popularity statistics during low-interest events, but also keeps functioning normally during high-interest events without system crashes.
In an industrial sensor data monitoring scenario, the data collected by large numbers of industrial sensors are treated as stream data and processed and analyzed with the stream processing method of the invention. Stream processing of the industrial sensor data thus achieves low latency and high throughput, ensuring real-time processing and allowing production problems to be detected in time to avoid losses.
Claims (5)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202210313490.5A CN114675969B (en) | 2022-03-28 | 2022-03-28 | Elastic scaling stream processing method and system based on self-adaptive load partition |
Publications (2)
Publication Number | Publication Date |
---|---|
CN114675969A CN114675969A (en) | 2022-06-28 |
CN114675969B true CN114675969B (en) | 2024-08-20 |