[go: up one dir, main page]

CN104317738B - A Method of Incremental Computing Based on MapReduce - Google Patents

A Method of Incremental Computing Based on MapReduce Download PDF

Info

Publication number
CN104317738B
CN104317738B CN201410577293.XA CN201410577293A CN104317738B CN 104317738 B CN104317738 B CN 104317738B CN 201410577293 A CN201410577293 A CN 201410577293A CN 104317738 B CN104317738 B CN 104317738B
Authority
CN
China
Prior art keywords
data
processing
incremental
result
model
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Expired - Fee Related
Application number
CN201410577293.XA
Other languages
Chinese (zh)
Other versions
CN104317738A (en
Inventor
孙广中
刘惠民
周英华
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
University of Science and Technology of China USTC
Original Assignee
University of Science and Technology of China USTC
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by University of Science and Technology of China USTC filed Critical University of Science and Technology of China USTC
Priority to CN201410577293.XA priority Critical patent/CN104317738B/en
Publication of CN104317738A publication Critical patent/CN104317738A/en
Application granted granted Critical
Publication of CN104317738B publication Critical patent/CN104317738B/en
Expired - Fee Related legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Landscapes

  • Management, Administration, Business Operations System, And Electronic Commerce (AREA)

Abstract

本发明公开了一种基于MapReduce的增量计算方法,该方法包括:创建用于缓存不同的历史处理结果的增量处理模型,包括:缓存combiner结果的模型、缓存中间结果的模型与结果直接重用的模型;当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算。本发明公开的方法,通过选择适用于数据特性的模型进行计算,可以节省大量不必要的重复计算,从而提高了数据处理的效率。

The invention discloses a method for incremental calculation based on MapReduce. The method includes: creating an incremental processing model for caching different historical processing results, including: a model for caching combiner results, a model for caching intermediate results, and direct reuse of results model; when the input data is obtained, select the corresponding incremental processing model for data processing according to the data characteristics of the input data, and when the incremental data arrives, call the corresponding incremental processing model for data processing and cache Incremental data calculation is performed on historical processing results. The method disclosed by the invention can save a large amount of unnecessary repeated calculations by selecting a model suitable for data characteristics for calculation, thereby improving the efficiency of data processing.

Description

一种基于MapReduce的增量计算方法A Method of Incremental Computing Based on MapReduce

技术领域technical field

本发明涉及计算机技术领域,尤其涉及一种基于MapReduce的增量计算方法。The invention relates to the field of computer technology, in particular to a MapReduce-based incremental calculation method.

背景技术Background technique

随着信息时代的发展,越来越多的数据产生,数据的种类和规模正以前所未有的速度增长,如何更好的管理和利用大数据已经成为普遍关注的话题。数据规模的提升给数据存储、管理以及数据分析带来了极大的挑战,Google公司提出了MapReduce模型来处理大数据,微软也提出了一个相类似的模型Dryad。在Google公开分布式文件系统、MapReduce模型等技术中心思想的基础上发展出来了Hadoop大数据处理平台,随后学术界和企业界围绕这些模型框架系统提出了一系列改进或者提出一些新的模型框架和系统。With the development of the information age, more and more data are generated, and the types and scale of data are growing at an unprecedented rate. How to better manage and utilize big data has become a topic of general concern. The increase in data scale has brought great challenges to data storage, management, and data analysis. Google proposed the MapReduce model to process big data, and Microsoft also proposed a similar model, Dryad. Hadoop big data processing platform was developed on the basis of Google's open distributed file system, MapReduce model and other technical central ideas, and then the academic and business circles proposed a series of improvements around these model framework systems or proposed some new model frameworks and system.

在大数据处理中,有越来越多的应用和增量数据有关,即数据随着时间快速增加或有小的改变,为了处理这一类问题,需要设计系统只处理增加的数据而不需要对所有数据重新计算。目前,针对MapReduce计算模型,在其上改进的系统较为复杂,效率提升有限,大多针对某一类具体问题设计,缺乏通用性。In big data processing, more and more applications are related to incremental data, that is, data increases rapidly or changes slightly over time. In order to deal with this type of problem, it is necessary to design a system that only handles increased data without requiring Recalculate all data. At present, for the MapReduce computing model, the system improved on it is relatively complex, and the efficiency improvement is limited. Most of them are designed for a specific type of problem and lack versatility.

发明内容Contents of the invention

本发明的目的是提供一种基于MapReduce的增量计算方法,通过选择适用于数据特性的模型进行计算,可以节省大量不必要的重复计算,从而提高了数据处理的效率。The purpose of the present invention is to provide a MapReduce-based incremental calculation method, which can save a lot of unnecessary repeated calculations by selecting a model suitable for data characteristics for calculation, thereby improving the efficiency of data processing.

本发明的目的是通过以下技术方案实现的:The purpose of the present invention is achieved through the following technical solutions:

一种基于MapReduce的增量计算方法,该方法包括:A method for incremental computing based on MapReduce, the method comprising:

创建用于缓存不同的历史处理结果的增量处理模型,包括:缓存combiner结果的模型、缓存中间结果的模型与结果直接重用的模型;Create an incremental processing model for caching different historical processing results, including: a model for caching combiner results, a model for caching intermediate results, and a model for direct reuse of results;

当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算。When the input data is obtained, the corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, the corresponding incremental processing model is called for historical processing cached during data processing The results are computed on incremental data.

进一步的,所述根据所述输入数据的数据特性选择对应的增量处理模型包括:Further, the selecting the corresponding incremental processing model according to the data characteristics of the input data includes:

当获取到输入数据时,判断输入数据的数据特性;When the input data is obtained, determine the data characteristics of the input data;

若为统计类数据,则选择结果直接重用模型处理;If it is statistical data, the selection result can be directly reused for model processing;

若为时间序列数据,则选择缓存中间结果的模型处理;If it is time series data, choose to cache the model processing of intermediate results;

若不为上述两类数据,则使用缓存combiner结果的模型处理数据。If it is not the above two types of data, use the model that caches the combiner result to process the data.

进一步的,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:Further, when the input data is obtained, the corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, the corresponding incremental processing model is called for data processing Incremental data calculations for historical processing results cached in real time include:

若在获得输入数据时,选择缓存combiner结果的模型来处理,则对输入的数据依次进行map运算、combiner运算与reduce运算,并缓存所述combiner运算结果C1;其中,map运算结果作为combiner运算的输入数据,combiner运算结果C1作为reduce运算的输入数据;If the model that caches the combiner result is selected for processing when the input data is obtained, then map operation, combiner operation, and reduce operation are performed on the input data in sequence, and the combiner operation result C1 is cached; where the map operation result is used as the result of the combiner operation Input data, the combiner operation result C1 is used as the input data of the reduce operation;

当增量数据到来时,依次进行map运算与combiner运算,并缓存所述combiner运算结果C2;再调用之前缓存的combiner运算结果C1与当前的combiner运算结果C2作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, the map operation and the combiner operation are performed sequentially, and the combiner operation result C2 is cached; then the previously cached combiner operation result C1 and the current combiner operation result C2 are called as the input data of the reduce operation, and the reduce operation is performed .

进一步的,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:Further, when the input data is obtained, the corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, the corresponding incremental processing model is called for data processing Incremental data calculations for historical processing results cached in real time include:

若在获得输入数据时,选择缓存中间结果的模型来处理,则按照预定方式对所述输入数据进行划分后进行两个阶段的MapReduce运算;其中,划分后的数据分别依次进行map运算与reduce运算完成第一阶段MapReduce运算,获得所述按照预定方式划分的中间结果;将所述按照预定方式划分的中间结果作为map运算的输入数据,分别进行map运算,并缓存map运算结果M1,再将所述map运算结果M1作为reduce运算的输入数据,完成第二阶段MapReduce运算;If the model that caches the intermediate results is selected for processing when the input data is obtained, the input data is divided according to a predetermined method and then two stages of MapReduce operations are performed; wherein, the divided data are respectively subjected to map operations and reduce operations in sequence Complete the first stage MapReduce operation, obtain the intermediate result divided according to the predetermined method; use the intermediate result divided according to the predetermined method as the input data of the map operation, perform the map operation respectively, and cache the map operation result M1, and then use the The map operation result M1 is used as the input data of the reduce operation to complete the second stage MapReduce operation;

当增量数据到来时,将所述增量数据按照预定方式进行划分,并将划分后的数据块依次单独的进行两个阶段的MapReduce运算;其中,对于第一个处理的数据块,进行第二阶段MapReduce运算时,缓存其map运算结果M2,再调用之前缓存的map运算结果M1与当前的map运算结果M2作为reduce运算的输入数据,进行reduce运算;对于后续的数据块,在进行第二阶段MapReduce运算时缓存其map运算结果,并调用之前缓存的map运算结果M1、该数据块之前所有数据块进行第二阶段map运算结果,以及该数据块第二阶段map运算结果作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, divide the incremental data according to a predetermined method, and perform two-stage MapReduce operations on the divided data blocks separately; wherein, for the first processed data block, perform the second During the second-stage MapReduce operation, the map operation result M2 is cached, and the previously cached map operation result M1 and the current map operation result M2 are used as the input data of the reduce operation to perform the reduce operation; for subsequent data blocks, the second During the stage MapReduce operation, the map operation result is cached, and the previously cached map operation result M1, the second-stage map operation result of all data blocks before the data block, and the second-stage map operation result of the data block are used as the input of the reduce operation Data, perform reduce operation.

进一步的,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:Further, when the input data is obtained, the corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, the corresponding incremental processing model is called for data processing Incremental data calculations for historical processing results cached in real time include:

若在获得输入数据时,选择结果直接重用的模型,则进行MapReduce运算,获得计算结果R1并缓存;所述计算结果R1中包含该数据的运算结果和用于后续运算且可直接调用的信息;If when obtaining the input data, select a model for direct reuse of the result, then perform a MapReduce operation, obtain the calculation result R1 and cache it; the calculation result R1 includes the operation result of the data and information that can be used for subsequent operations and can be directly called;

当增量数据到来时,将该增量数据进行MapReduce运算,获得计算结果R2并缓存;将之前缓存的计算结果R1与当前结算结果R2作为合并merge运算的输入进行,进行整合处理,获得最终结果。When the incremental data arrives, perform MapReduce operations on the incremental data to obtain the calculation result R2 and cache it; use the previously cached calculation result R1 and the current settlement result R2 as inputs for the merge operation, and perform integration processing to obtain the final result .

由上述本发明提供的技术方案可以看出,根据不同的数据特性,选择不同的模型,通过缓存不同的历史数据,在启动增量计算时,根据获取到的历史计算结果,直接进行增量数据的计算,避免了不必要的重复计算,从而提高了数据处理的效率。It can be seen from the technical solution provided by the present invention above that, according to different data characteristics, different models are selected, and different historical data are cached. When incremental calculation is started, incremental data is directly processed according to the obtained historical calculation results. The calculation avoids unnecessary repeated calculation, thereby improving the efficiency of data processing.

附图说明Description of drawings

为了更清楚地说明本发明实施例的技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域的普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他附图。In order to more clearly illustrate the technical solutions of the embodiments of the present invention, the following will briefly introduce the accompanying drawings that need to be used in the description of the embodiments. Obviously, the accompanying drawings in the following description are only some embodiments of the present invention. For Those of ordinary skill in the art can also obtain other drawings based on these drawings on the premise of not paying creative work.

图1为本发明实施例提供的一种基于MapReduce的增量计算方法的流程图;Fig. 1 is a flow chart of a method for incremental calculation based on MapReduce provided by an embodiment of the present invention;

图2为本发明实施例提供的一种基于MapReduce的增量计算方法处理流程的示意图;Fig. 2 is a schematic diagram of a processing flow of a MapReduce-based incremental calculation method provided by an embodiment of the present invention;

图3为本发明实施例提供的一种缓存combiner结果的模型数据处理的示意图;FIG. 3 is a schematic diagram of model data processing of cache combiner results provided by an embodiment of the present invention;

图4为本发明实施例提供的一种缓存中间结果的模型数据处理的示意图;FIG. 4 is a schematic diagram of model data processing for caching intermediate results provided by an embodiment of the present invention;

图5为本发明实施例提供的一种结果直接重用的模型数据处理的示意图。FIG. 5 is a schematic diagram of model data processing for direct result reuse provided by an embodiment of the present invention.

具体实施方式detailed description

下面结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本发明一部分实施例,而不是全部的实施例。基于本发明的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明的保护范围。The technical solutions in the embodiments of the present invention will be clearly and completely described below in conjunction with the accompanying drawings in the embodiments of the present invention. Obviously, the described embodiments are only some of the embodiments of the present invention, not all of them. Based on the embodiments of the present invention, all other embodiments obtained by persons of ordinary skill in the art without making creative efforts belong to the protection scope of the present invention.

实施例Example

图1为本发明实施例提供的一种基于MapReduce的增量计算方法的流程图;如图1所示,该方法主要包括如下步骤:Fig. 1 is the flow chart of a kind of incremental calculation method based on MapReduce that the embodiment of the present invention provides; As shown in Fig. 1, this method mainly comprises the following steps:

步骤11、创建用于缓存不同的历史处理结果的增量处理模型,包括:缓存combiner结果的模型、缓存中间结果的模型与结果直接重用的模型;Step 11. Create an incremental processing model for caching different historical processing results, including: a model for caching combiner results, a model for caching intermediate results, and a model for direct reuse of results;

步骤12、当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算。Step 12. When the input data is obtained, select the corresponding incremental processing model for data processing according to the data characteristics of the input data, and when the incremental data arrives, call the corresponding incremental processing model for data processing cache Incremental data calculation is performed on historical processing results.

其中,所述根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理可以参照如下规则:当获取到输入数据时,判断输入数据的数据特性;若为统计类数据,则选择结果直接重用模型处理;若为时间序列数据(即数据可以按一定标准划分且相互运算不影响),则选择缓存中间结果的模型处理;若不为上述两类数据,则使用缓存combiner结果的模型处理数据。另外,由于缓存combiner结果的模型为通用模型,所以也可以根据实际情况或需求利用缓存combiner结果的模型来处理统计类数据与时间序列数据。Wherein, the selection of the corresponding incremental processing model according to the data characteristics of the input data for data processing may refer to the following rules: when the input data is obtained, judge the data characteristics of the input data; if it is statistical data, select the result Directly reuse model processing; if it is time series data (that is, the data can be divided according to certain standards and does not affect each other), then choose the model processing of caching intermediate results; if it is not the above two types of data, use the model processing of caching combiner results data. In addition, since the model for caching combiner results is a general model, the model for caching combiner results can also be used to process statistical data and time series data according to actual conditions or needs.

根据选择的增量处理模型的不同,本步骤可以有如下三种处理方式:According to the selected incremental processing model, this step can have the following three processing methods:

1)若在获得输入数据时,选择缓存combiner结果的模型来处理,则对输入的数据依次进行map(映射)运算、combine(本地归并)运算与reduce(归约)运算,并缓存所述combiner运算结果C1;其中,map运算结果作为combiner运算的输入数据,combiner运算结果C1作为reduce运算的输入数据;1) If the model that caches the combiner result is selected for processing when the input data is obtained, then the input data is sequentially performed with map (mapping) operation, combine (local merge) operation and reduce (reduce) operation, and caches the combiner The operation result C1; where, the map operation result is used as the input data of the combiner operation, and the combiner operation result C1 is used as the input data of the reduce operation;

当增量数据到来时,依次进行map运算与combiner运算,并缓存所述combiner运算结果C2;再调用之前缓存的combiner运算结果C1与当前的combiner运算结果C2作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, the map operation and the combiner operation are performed sequentially, and the combiner operation result C2 is cached; then the previously cached combiner operation result C1 and the current combiner operation result C2 are called as the input data of the reduce operation, and the reduce operation is performed .

2)若在获得输入数据时,选择缓存中间结果的模型来处理,则按照预定方式对所述输入数据进行划分后进行两个阶段的MapReduce运算;其中,划分后的数据分别依次进行map运算与reduce运算完成第一阶段MapReduce运算,获得所述按照预定方式划分的中间结果;将所述按照预定方式划分的中间结果作为map运算的输入数据,分别进行map运算,并缓存map运算结果M1,再将所述map运算结果M1作为reduce运算的输入数据,完成第二阶段MapReduce运算;2) If the model of caching intermediate results is selected for processing when the input data is obtained, the input data is divided according to a predetermined method and then two stages of MapReduce operations are performed; wherein, the divided data are respectively subjected to map operations and The reduce operation completes the first stage of the MapReduce operation, and obtains the intermediate result divided according to the predetermined method; uses the intermediate result divided according to the predetermined method as the input data of the map operation, respectively performs the map operation, and caches the map operation result M1, and then Using the map operation result M1 as the input data of the reduce operation to complete the second stage MapReduce operation;

当增量数据到来时,将所述增量数据按照预定方式进行划分,并将划分后的数据块依次单独的进行两个阶段的MapReduce运算;其中,对于第一个处理的数据块,进行第二阶段MapReduce运算时,缓存其map运算结果M2,再调用之前缓存的map运算结果M1与当前的map运算结果M2作为reduce运算的输入数据,进行reduce运算;对于后续的数据块,在进行第二阶段MapReduce运算时缓存其map运算结果,并调用之前缓存的map运算结果M1、该数据块之前所有数据块进行第二阶段map运算结果,以及该数据块第二阶段map运算结果作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, divide the incremental data according to a predetermined method, and perform two-stage MapReduce operations on the divided data blocks separately; wherein, for the first processed data block, perform the second During the second-stage MapReduce operation, the map operation result M2 is cached, and the previously cached map operation result M1 and the current map operation result M2 are used as the input data of the reduce operation to perform the reduce operation; for subsequent data blocks, the second During the stage MapReduce operation, the map operation result is cached, and the previously cached map operation result M1, the second-stage map operation result of all data blocks before the data block, and the second-stage map operation result of the data block are used as the input of the reduce operation Data, perform reduce operation.

3)若在获得输入数据时,选择结果直接重用的模型,则进行MapReduce运算,获得计算结果R1并缓存;所述计算结果R1中包含该数据的运算结果和用于后续运算且可直接调用的信息;3) If the model that directly reuses the result is selected when the input data is obtained, the MapReduce operation is performed, and the calculation result R1 is obtained and cached; the calculation result R1 includes the calculation result of the data and the data used for subsequent calculations that can be directly called information;

当增量数据到来时,将该增量数据进行MapReduce运算,获得计算结果R2并缓存;将之前缓存的计算结果R1与当前结算结果R2作为合并merge运算的输入进行,进行整合处理,获得最终结果。When the incremental data arrives, perform MapReduce operations on the incremental data to obtain the calculation result R2 and cache it; use the previously cached calculation result R1 and the current settlement result R2 as inputs for the merge operation, and perform integration processing to obtain the final result .

另外,上述方案的整体处理流程还可参见附图2。In addition, the overall processing flow of the above solution can also refer to FIG. 2 .

本发明实施例所提供的技术方案与现有技术相比,具有以下有益效果:Compared with the prior art, the technical solutions provided by the embodiments of the present invention have the following beneficial effects:

1)基于MapReduce数据处理模型,扩展了增量处理功能,提升增量数据处理效率;1) Based on the MapReduce data processing model, the incremental processing function is expanded to improve the efficiency of incremental data processing;

2)充分减少增量数据处理过程中的不必要的运算,节省存储空间和计算时间;2) Sufficiently reduce unnecessary calculations in the process of incremental data processing, saving storage space and computing time;

3)融合三个增量计算模型供用户选择,以尽可能的根据数据情况提高计算效率。3) Integrate three incremental calculation models for users to choose, so as to improve calculation efficiency as much as possible according to the data situation.

为了便于理解本发明,下面结合附图3-5对所述缓存combiner结果的模型、缓存中间结果的模型与结果直接重用的模型进行数据处理的过程做详细的说明。In order to facilitate the understanding of the present invention, the data processing process of the cache combiner result model, the cache intermediate result model and the result direct reuse model will be described in detail below with reference to the accompanying drawings 3-5.

1、缓存combiner结果的模型。1. A model that caches combiner results.

图3为本发明实施例提供的选择缓存combiner结果的模型时数据处理的示意图。其中字母M代表map运算阶段,C代表combiner运算阶段,R代表reduce运算阶段。如图3所示,该方法主要包括如下步骤:FIG. 3 is a schematic diagram of data processing when selecting a model for caching combiner results provided by an embodiment of the present invention. The letter M represents the map operation stage, C represents the combiner operation stage, and R represents the reduce operation stage. As shown in Figure 3, the method mainly includes the following steps:

步骤一、当启动增量数据计算任务时,模型对初始数据进行map运算,得到<key-value>形式的结果,将结果作为combiner运算阶段的输入,combiner运算阶段对map阶段输出的<key-value>数据在各个机器上进行简单归并后将结果缓存,然后送人reduce运算阶段进行运算得到初始数据的运算结果。Step 1. When the incremental data calculation task is started, the model performs a map operation on the initial data to obtain a result in the form of <key-value>, and uses the result as the input of the combiner operation stage, and the combiner operation stage performs the <key-value> output from the map stage. The value> data is simply merged on each machine and the result is cached, and then sent to the reduce operation stage for operation to obtain the operation result of the initial data.

其中,map、combiner、reduce是模型MapReduce的运算过程,新的增量处理模型将MapReduce中combiner运算阶段的结果缓存以便于后面进行增量计算,combiner运算阶段的结果相比原始数据大大减少,可以删除原始数据以节省存储空间。Among them, map, combiner, and reduce are the operation process of the model MapReduce. The new incremental processing model caches the results of the combiner operation stage in MapReduce for subsequent incremental calculations. Compared with the original data, the results of the combiner operation stage are greatly reduced. Delete raw data to save storage space.

步骤二、利用步骤一缓存的combiner结果进行增量数据的计算。Step 2: Use the combined result cached in step 1 to calculate the incremental data.

其中,map、combiner阶段为正常的MapReduce模型的运算,但进行到reduce阶段时,需要将缓存的历史数据的combiner运算结果和新增数据的combiner运算结果一起作为reduce阶段的输入,经reduce阶段运算后,可得到历史数据和新增数据总的最终结果。Among them, the map and combiner stages are normal MapReduce model operations, but when the reduce stage is reached, the combiner operation results of the cached historical data and the combiner operation results of the newly added data need to be used as the input of the reduce stage, and the reduce stage is calculated After that, the final result of historical data and new data can be obtained.

进一步的,说明该模型的原理:Further, explain the principle of the model:

Hadoop(一个分布式系统基础架构)中MapReduce模型的map阶段以<key,value>的形式产生中间结果输入到combiner函数中对数据进行预处理,这里需要说明的是每个节点的combiner函数仅对该节点的map结果进行处理,各节点不相关。combiner处理后数据能得到压缩并且仍以<key,value>的形式输出到Reduce阶段进行整合运算,得到最终结果。一般map阶段以行为单位划分数据进行运算得到<key,value>,combiner阶段对map阶段结果进行预处理和压缩,combiner函数只对本节点map结果进行处理,所以历史输入数据的combiner阶段运算和新增数据的combiner运算过程及结果不相关且结果是确定的,历史数据和新增数据combiner阶段所有产生的结果输入到reduce阶段相当于对所有数据进行map、combiner运算以后再进行reduce运算,得到的最终结果和对全部数据重新计算结果一致。The map stage of the MapReduce model in Hadoop (a distributed system infrastructure) generates intermediate results in the form of <key, value> and inputs them into the combiner function to preprocess the data. What needs to be explained here is that the combiner function of each node is only for The map result of this node is processed, and each node is not related. The data processed by the combiner can be compressed and output to the Reduce stage in the form of <key, value> for integration and calculation to obtain the final result. Generally, the map stage divides the data in units of behaviors to obtain <key, value>. The combiner stage preprocesses and compresses the results of the map stage. The combiner function only processes the map results of this node, so the combiner stage operation and new addition of historical input data The data combiner operation process and results are irrelevant and the results are deterministic. All the results generated in the combiner stage of historical data and new data are input into the reduce stage, which is equivalent to performing map and combiner operations on all data and then performing reduce operations to obtain the final result. The results are consistent with recalculations for all data.

2、缓存中间结果的模型。2. A model that caches intermediate results.

图4为本发明实施例提供的选择缓存中间结果模型时数据处理的示意图。其中字母M代表map运算阶段,R代表reduce运算阶段。FIG. 4 is a schematic diagram of data processing when selecting a cached intermediate result model according to an embodiment of the present invention. The letter M represents the map operation stage, and R represents the reduce operation stage.

该模型对MapReduce进行扩展,扩展为两个连续的MapReduce运算。需要说明,该模型通常只能针对一类具体问题,这类问题能够根据内容划分数据并单独计算每个划分块的结果,而且每个划分块的结果可以整合在一起作为最终结果。例如,计算用户最后登陆时间,可以以天为数据划分单位,中间结果为每天用户最后的登陆时间,经过第二阶段mapreduce运算,可以得到最终用户的最后登陆时间。如图4所示,该方法主要包括如下步骤:This model extends MapReduce to two consecutive MapReduce operations. It should be noted that this model can usually only be used for a specific class of problems, which can divide data according to content and calculate the results of each division block separately, and the results of each division block can be integrated as the final result. For example, to calculate the last login time of a user, you can use days as the data division unit, and the intermediate result is the last login time of the user every day. After the second stage of mapreduce calculation, the last login time of the end user can be obtained. As shown in Figure 4, the method mainly includes the following steps:

步骤一、当启动数据计算任务时,第一个阶段,将原始输入数据(时间序列数据)按预设方式进行划分,比如按时间段划分为不同的数据块,对数据进行MapReduce运算后得到按时间段划分的中间结果,需要更改该阶段reduce阶段的输出使得中间结果形式如<key,value>。第二阶段,对中间结果进行map运算,将map运算得到的结果缓存,继续进行reduce运算得到最终结果。Step 1. When the data calculation task is started, in the first stage, the original input data (time series data) is divided according to the preset method, for example, divided into different data blocks according to the time period, and the data is obtained after MapReduce operation. For the intermediate results of time segment division, it is necessary to change the output of the reduce phase of this stage so that the intermediate results are in the form of <key, value>. In the second stage, the map operation is performed on the intermediate results, the result obtained by the map operation is cached, and the reduce operation is continued to obtain the final result.

其中,map运算和第二阶段的reduce是模型MapReduce的运算过程,第一阶段的reduce需要更改输出数据形式,另外,为了增量数据的处理,需要将第二阶段的map运算结果缓存以便于后面进行增量计算,第二阶段map过程的结果相比原始数据大大减少,可以删除原始数据以节省存储空间。Among them, the map operation and the reduce in the second stage are the operation process of the model MapReduce. The reduce in the first stage needs to change the output data format. In addition, in order to process the incremental data, the map operation results in the second stage need to be cached for later use. For incremental calculations, the results of the map process in the second stage are greatly reduced compared to the original data, and the original data can be deleted to save storage space.

步骤二、利用步骤一缓存的第二阶段map运算结果进行增量数据的计算。Step 2: Use the second-stage map operation results cached in step 1 to calculate incremental data.

当新增数据到来时,按步骤一中的预设方式进行划分数据(例如,步骤一中按照时间段划分,此时也按照时间段划分),这里可以限制为新数据划分后将每个时间段数据单独作为新的输入添加进增量模型。对于增量数据,按步骤一的方式进行处理,在进行第二阶段的reduce过程时,将步骤一运算后缓存的第二阶段的map运算结果和步骤二中第二阶段map运算的结果一起作为reduce阶段的输入,进行reduce运算后得到最新结果。这里使用的缓存内容是所有运算过程中全部的缓存内容,即随着数据的增加缓存的每一次运算的中间结果。另外,步骤二中中间结果的map运算结果也要被缓存以供以后增量数据的计算。When the new data arrives, divide the data according to the preset method in step 1 (for example, in step 1, it is divided according to the time period, and at this time it is also divided according to the time period). Segment data are added individually as new inputs to the incremental model. For incremental data, it is processed according to the method of step 1. When performing the reduce process of the second stage, the result of the map operation of the second stage cached after the operation of step 1 and the result of the map operation of the second stage of step 2 are used together as The input of the reduce stage, and the latest result is obtained after the reduce operation. The cache content used here is all the cache content in all operations, that is, the intermediate result of each operation cached with the increase of data. In addition, the map operation result of the intermediate result in step 2 is also cached for future calculation of incremental data.

进一步的,说明该模型的原理:Further, explain the principle of the model:

模型将数据划分后计算,参数对应于每个划分块的中间结果,再对中间结果进行map操作,缓存map的运算结果。当有新的数据来临时,可以将新数据按划分逐步添加到系统中运算。新数据产生的中间结果经map后产生的运算结果和历史数据中间结果的map结果不相关,两个运算相互不影响且对reduce阶段运算不影响。reduce阶段整合所有map阶段的结果得到最新计算结果和对所有中间结果进行Mapreduce处理后的计算结果一致。The model calculates after dividing the data, and the parameters correspond to the intermediate results of each division block, and then performs map operation on the intermediate results, and caches the operation results of the map. When new data comes, the new data can be gradually added to the system by division for calculation. The operation result generated by the map of the intermediate result generated by the new data is irrelevant to the map result of the intermediate result of the historical data. The two operations do not affect each other and have no effect on the operation of the reduce stage. The reduce phase integrates the results of all the map phases to obtain the latest calculation results, which are consistent with the calculation results after Mapreduce processing of all intermediate results.

3、结果直接重用的模型。3. Models that result in direct reuse.

图5为本发明实施例提供的选择结果直接重用的模型时数据处理的示意图。其中字母M代表map阶段运算,R代表reduce阶段运算。需要说明,该模型通常应用与统计类数据,即在数据运算结果能够和历史运算结果易于整合的情况下。比如统计时间序列上的平均值,运算得到的新增数据的平均值在知道新增数据个数和历史数据个数的情况下可以很容易的和历史运算结果结合计算得到最新的所有数据的平均值。如图5所示,该方法主要包括如下步骤:FIG. 5 is a schematic diagram of data processing when selecting a model for direct reuse of results provided by an embodiment of the present invention. The letter M represents the map phase operation, and R represents the reduce phase operation. It should be noted that this model is usually applied to statistical data, that is, when the data operation results can be easily integrated with the historical operation results. For example, the average value on the statistical time series, the average value of the newly added data obtained by calculation can be easily combined with the historical operation results to calculate the latest average of all data when the number of new data and the number of historical data are known value. As shown in Figure 5, the method mainly includes the following steps:

步骤一、当启动数据计算任务时,模型对初始数据进行正常MapReduce运算,得到初始数据的计算结果并缓存,该缓存内容包括该数据的运算结果和用于后续运算可直接调用的信息,比如数据个数,数据最大最小值等。Step 1. When the data calculation task is started, the model performs normal MapReduce calculations on the initial data to obtain the calculation results of the initial data and cache them. The cache content includes the calculation results of the data and information that can be directly called for subsequent calculations, such as data Number, data maximum and minimum values, etc.

步骤二、利用步骤一缓存的数据进行增量数据的计算。当新的增量数据需要计算时,按照正常的MapReduce模型的进行计算,得到新数据的结果,然后通过用户控制的merge(合并)运算对新的计算结果和步骤一缓存结果进行整合,得到整体数据的最终结果。Step 2: Use the cached data in Step 1 to calculate incremental data. When the new incremental data needs to be calculated, the calculation is performed according to the normal MapReduce model to obtain the result of the new data, and then the new calculation result and the cached result of step 1 are integrated through the user-controlled merge operation to obtain the overall The end result of the data.

通过一些例子可以说明模型的可行性,比如统计最大值、平均值、最小值等,只需知道最新数据的计算结果和历史结果加上其他一些数据相关信息(如数据数量等)就可以得到所有数据的最终结果。Some examples can illustrate the feasibility of the model, such as statistical maximum, average, minimum, etc., only need to know the calculation results and historical results of the latest data plus some other data-related information (such as the number of data, etc.) to get all The end result of the data.

本发明实施例上述方案,根据不同的数据特性,选择不同的模型,通过缓存不同的历史数据,在启动增量计算时,根据获取到的历史计算结果,直接进行增量数据的计算,避免了不必要的重复计算,从而提高了数据处理的效率。According to the above scheme of the embodiment of the present invention, different models are selected according to different data characteristics, and different historical data are cached. When the incremental calculation is started, the calculation of the incremental data is directly performed according to the obtained historical calculation results, avoiding the Unnecessary repeated calculations, thereby improving the efficiency of data processing.

以上所述,仅为本发明较佳的具体实施方式,但本发明的保护范围并不局限于此,任何熟悉本技术领域的技术人员在本发明披露的技术范围内,可轻易想到的变化或替换,都应涵盖在本发明的保护范围之内。因此,本发明的保护范围应该以权利要求书的保护范围为准。The above is only a preferred embodiment of the present invention, but the scope of protection of the present invention is not limited thereto. Any person familiar with the technical field can easily conceive of changes or changes within the technical scope disclosed in the present invention. Replacement should be covered within the protection scope of the present invention. Therefore, the protection scope of the present invention should be determined by the protection scope of the claims.

Claims (4)

1.一种基于MapReduce的增量计算方法,其特征在于,该方法包括:1. A method for incremental calculation based on MapReduce, characterized in that the method comprises: 创建用于缓存不同的历史处理结果的增量处理模型,包括:缓存combiner结果的模型、缓存中间结果的模型与结果直接重用的模型;Create an incremental processing model for caching different historical processing results, including: a model for caching combiner results, a model for caching intermediate results, and a model for direct reuse of results; 当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算;When the input data is obtained, the corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, the corresponding incremental processing model is called for historical processing cached during data processing The results are calculated for incremental data; 其中,所述根据所述输入数据的数据特性选择对应的增量处理模型包括:Wherein, the selection of the corresponding incremental processing model according to the data characteristics of the input data includes: 当获取到输入数据时,判断输入数据的数据特性;When the input data is obtained, determine the data characteristics of the input data; 若为统计类数据,则选择结果直接重用模型处理;If it is statistical data, the selection result can be directly reused for model processing; 若为时间序列数据,则选择缓存中间结果的模型处理;If it is time series data, choose to cache the model processing of intermediate results; 若不为上述两类数据,则使用缓存combiner结果的模型处理数据。If it is not the above two types of data, use the model that caches the combiner result to process the data. 2.根据权利要求1所述的方法,其特征在于,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:2. The method according to claim 1, wherein when the input data is obtained, the corresponding incremental processing model is selected according to the data characteristics of the input data for data processing, and when the incremental data arrives, Invoking the corresponding incremental processing model for data processing and performing incremental data calculation on cached historical processing results includes: 若在获得输入数据时,选择缓存combiner结果的模型来处理,则对输入的数据依次进行map运算、combiner运算与reduce运算,并缓存所述combiner运算结果C1;其中,map运算结果作为combiner运算的输入数据,combiner运算结果C1作为reduce运算的输入数据;If the model that caches the combiner result is selected for processing when the input data is obtained, then map operation, combiner operation, and reduce operation are performed on the input data in sequence, and the combiner operation result C1 is cached; where the map operation result is used as the result of the combiner operation Input data, the combiner operation result C1 is used as the input data of the reduce operation; 当增量数据到来时,依次进行map运算与combiner运算,并缓存所述combiner运算结果C2;再调用之前缓存的combiner运算结果C1与当前的combiner运算结果C2作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, the map operation and the combiner operation are performed sequentially, and the combiner operation result C2 is cached; then the previously cached combiner operation result C1 and the current combiner operation result C2 are called as the input data of the reduce operation, and the reduce operation is performed . 3.根据权利要求1所述的方法,其特征在于,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:3. The method according to claim 1, wherein when the input data is obtained, a corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, Invoking the corresponding incremental processing model for data processing and performing incremental data calculation on cached historical processing results includes: 若在获得输入数据时,选择缓存中间结果的模型来处理,则按照预定方式对所述输入数据进行划分后进行两个阶段的MapReduce运算;其中,划分后的数据分别依次进行map运算与reduce运算完成第一阶段MapReduce运算,获得所述按照预定方式划分的中间结果;将所述按照预定方式划分的中间结果作为map运算的输入数据,分别进行map运算,并缓存map运算结果M1,再将所述map运算结果M1作为reduce运算的输入数据,完成第二阶段MapReduce运算;If the model that caches the intermediate results is selected for processing when the input data is obtained, the input data is divided according to a predetermined method and then two stages of MapReduce operations are performed; wherein, the divided data are respectively subjected to map operations and reduce operations in sequence Complete the first stage MapReduce operation, obtain the intermediate result divided according to the predetermined method; use the intermediate result divided according to the predetermined method as the input data of the map operation, perform the map operation respectively, and cache the map operation result M1, and then use the The map operation result M1 is used as the input data of the reduce operation to complete the second stage MapReduce operation; 当增量数据到来时,将所述增量数据按照预定方式进行划分,并将划分后的数据块依次单独的进行两个阶段的MapReduce运算;其中,对于第一个处理的数据块,进行第二阶段MapReduce运算时,缓存其map运算结果M2,再调用之前缓存的map运算结果M1与当前的map运算结果M2作为reduce运算的输入数据,进行reduce运算;对于后续的数据块,在进行第二阶段MapReduce运算时缓存其map运算结果,并调用之前缓存的map运算结果M1、该数据块之前所有数据块进行第二阶段map运算结果,以及该数据块第二阶段map运算结果作为reduce运算的输入数据,进行reduce运算。When the incremental data arrives, divide the incremental data according to a predetermined method, and perform two-stage MapReduce operations on the divided data blocks separately; wherein, for the first processed data block, perform the second During the second-stage MapReduce operation, the map operation result M2 is cached, and the previously cached map operation result M1 and the current map operation result M2 are used as the input data of the reduce operation to perform the reduce operation; for subsequent data blocks, the second During the stage MapReduce operation, the map operation result is cached, and the previously cached map operation result M1, the second-stage map operation result of all data blocks before the data block, and the second-stage map operation result of the data block are used as the input of the reduce operation Data, perform reduce operation. 4.根据权利要求1所述的方法,其特征在于,所述当获得输入数据时,根据所述输入数据的数据特性选择对应的增量处理模型进行数据处理,并在增量数据到来时,调用所述对应的增量处理模型进行数据处理时缓存的历史处理结果进行增量数据的计算包括:4. The method according to claim 1, wherein when the input data is obtained, a corresponding incremental processing model is selected for data processing according to the data characteristics of the input data, and when the incremental data arrives, Invoking the corresponding incremental processing model for data processing and performing incremental data calculation on cached historical processing results includes: 若在获得输入数据时,选择结果直接重用的模型,则进行MapReduce运算,获得计算结果R1并缓存;所述计算结果R1中包含该数据的运算结果和用于后续运算且可直接调用的信息;If when obtaining the input data, select a model for direct reuse of the result, then perform a MapReduce operation, obtain the calculation result R1 and cache it; the calculation result R1 includes the operation result of the data and information that can be used for subsequent operations and can be directly called; 当增量数据到来时,将该增量数据进行MapReduce运算,获得计算结果R2并缓存;将之前缓存的计算结果R1与当前计算结果R2作为合并merge运算的输入,进行整合处理,获得最终结果。When the incremental data arrives, the incremental data is processed by MapReduce to obtain the calculation result R2 and cached; the previously cached calculation result R1 and the current calculation result R2 are used as the input of the merge operation for integration processing to obtain the final result.
CN201410577293.XA 2014-10-24 2014-10-24 A Method of Incremental Computing Based on MapReduce Expired - Fee Related CN104317738B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN201410577293.XA CN104317738B (en) 2014-10-24 2014-10-24 A Method of Incremental Computing Based on MapReduce

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN201410577293.XA CN104317738B (en) 2014-10-24 2014-10-24 A Method of Incremental Computing Based on MapReduce

Publications (2)

Publication Number Publication Date
CN104317738A CN104317738A (en) 2015-01-28
CN104317738B true CN104317738B (en) 2017-07-25

Family

ID=52372973

Family Applications (1)

Application Number Title Priority Date Filing Date
CN201410577293.XA Expired - Fee Related CN104317738B (en) 2014-10-24 2014-10-24 A Method of Incremental Computing Based on MapReduce

Country Status (1)

Country Link
CN (1) CN104317738B (en)

Families Citing this family (7)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN106407636B (en) * 2015-07-31 2020-02-14 腾讯科技(深圳)有限公司 Integration result statistical method and device
CN106933882B (en) * 2015-12-31 2020-09-29 华为技术有限公司 Big data increment calculation method and device
CN106909495B (en) * 2016-06-03 2020-07-03 阿里巴巴集团控股有限公司 Data window statistical method, device and system
CN108846636A (en) * 2018-06-01 2018-11-20 北京字节跳动网络技术有限公司 Data dispatching method, device, computer readable storage medium
CN109951556A (en) * 2019-03-27 2019-06-28 联想(北京)有限公司 A kind of Spark task processing method and system
CN111858706A (en) * 2020-07-02 2020-10-30 中国建设银行股份有限公司 Data processing method and device
CN117214589A (en) * 2023-11-08 2023-12-12 天津德科智控股份有限公司 EPS system time domain response field test method

Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN103440244A (en) * 2013-07-12 2013-12-11 广东电子工业研究院有限公司 A big data storage optimization method

Family Cites Families (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9424271B2 (en) * 2012-08-30 2016-08-23 International Business Machines Corporation Atomic incremental load for map-reduce systems on append-only file systems

Patent Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN103440244A (en) * 2013-07-12 2013-12-11 广东电子工业研究院有限公司 A big data storage optimization method

Also Published As

Publication number Publication date
CN104317738A (en) 2015-01-28

Similar Documents

Publication Publication Date Title
CN104317738B (en) A Method of Incremental Computing Based on MapReduce
WO2022262183A1 (en) Federated computing processing method and apparatus, electronic device, and storage medium
CN109597965B (en) Data processing method, system, terminal and medium based on deep neural network
CN107277615B (en) Live broadcast stylization processing method and device, computing device and storage medium
WO2017090475A1 (en) Information processing system, function creation method, and function creation program
CN114023342A (en) Voice conversion method and device, storage medium and electronic equipment
CN111104214B (en) Workflow application method and device
JP2022068327A (en) Node grouping method, apparatus therefor, and electronic device therefor
CN107589983A (en) Virtual machine creation method and its device in a kind of cloud computing system
CN112561081B (en) Conversion method and device of deep learning model, electronic equipment and storage medium
CN113888524A (en) Defect detection model training method, apparatus, device and readable storage medium
CN115631273A (en) Big data duplicate removal method, device, equipment and medium
CN107392316A (en) Network training method, device, computing device and computer-readable storage medium
CN114385816A (en) Conversation flow mining method and device, electronic equipment and computer storage medium
CN116758204B (en) Rendering processing method and related device based on line renderer
CN113935069B (en) Data verification method, device and equipment based on block chain and storage medium
CN110119935A (en) A method and device for tracking the application process of a scientific and technological project
CN116452861A (en) Target model training method and device and electronic equipment
EP3901760A1 (en) Data processing method and apparatus, electronic device, storage medium and program product
CN114048863A (en) Data processing method, data processing device, electronic equipment and storage medium
US20250022085A1 (en) Quantum computing and blockchain enabled learning management system
CN109828894A (en) Acquisition method, device, storage medium and the electronic equipment of device status data
CN117422612A (en) Image processing method, device, electronic equipment and readable storage medium
JP2022093331A (en) Point group data processing method, apparatus, electronic device, storage medium, and program
CN118338116A (en) Image acquisition method and device in pulse welding process

Legal Events

Date Code Title Description
C06 Publication
PB01 Publication
C10 Entry into substantive examination
SE01 Entry into force of request for substantive examination
GR01 Patent grant
GR01 Patent grant
CF01 Termination of patent right due to non-payment of annual fee

Granted publication date: 20170725

CF01 Termination of patent right due to non-payment of annual fee