
WO2013153027A1 - Method and system for streaming processing in a map and reduce distributed computing platform - Google Patents

Info

Publication number
WO2013153027A1
Authority
WO
WIPO (PCT)
Prior art keywords
state
data
queue
node
input
Prior art date
Application number
PCT/EP2013/057300
Other languages
French (fr)
Inventor
Andreu Urruela Planas
Ken GUNNAR ZANGELIN
José Gregorio ESCALADA SARDINA
Original Assignee
Telefónica, S.A.
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Telefónica, S.A.
Publication of WO2013153027A1

Classifications

    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/54Interprogram communication
    • G06F9/544Buffers; Shared memory; Pipes
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/24Querying
    • G06F16/245Query processing
    • G06F16/2453Query optimisation
    • G06F16/24532Query optimisation of parallel queries
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F2209/00Indexing scheme relating to G06F9/00
    • G06F2209/54Indexing scheme relating to G06F9/54
    • G06F2209/548Queue

Definitions

  • the present invention generally relates, in a first aspect, to a method for streaming processing in a map and reduce distributed computing platform, and more specifically to a method for optimizing the performance of processing continuous data flows in distributed computing platforms.
  • a second aspect of the invention relates to a system arranged for implementing the method of the first aspect.
  • Apache Hadoop is great for simple batch processing tasks with Terabytes and Petabytes of data involved, but companies and institutions are confronting more complex tasks: graph computing, interactive analysis, complex joins, Atomicity, Consistency, Isolation and Durability (ACID) requirements, real-time requirements or the need for continuous incremental updates (all of them keeping the need of processing the volume of relevant information generated).
  • MR is still at the core of Big Data processing, and it is being evolved to solve the new arising problems.
  • the MR paradigm in 2004 and its open-source implementation with Apache Hadoop were the right solution.
  • the distributed space of key-values proved to be useful to store and retrieve information in a cost effective way.
  • This path is providing new solutions based on MR (Google's Percolator, Yahoo's S4) and a number of MR evolutions to improve response time.
  • MapReduce is a paradigm evolved from functional programming and applied to distributed systems. It was presented in 2004 by Google [2]. It is meant for processing problems whose solution can be expressed in commutative and associative functions.
  • MR offers an abstraction for processing large datasets on a set of machines configured in a cluster. With this abstraction, the platform can easily solve the synchronization problem, thus freeing the developer from thinking about that issue.
  • map takes as an argument a function f (that takes a single argument) and applies it to all elements in a list (the top part of Figure 1), returning a list of results.
  • the second step, fold, accumulates a new result by iterating through the elements in the result list. It takes three parameters: a base value, a list, and a function. Typically, map and fold are used in combination.
  • the output of one function is the input of the next one (as functional programming avoids state and mutable data, all the computation must progress by passing results from one function to the next one), and this type of functions can be cascaded until finishing the job.
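  • As an illustration of the map and fold combination described above, the following minimal Python sketch applies a per-element function and then accumulates the results. The functions f and g and the sample list are assumptions chosen for the example; Figure 1 does not fix them.

```python
from functools import reduce

def f(x):
    # Per-element function applied by map (assumed example: squaring).
    return x * x

def g(acc, x):
    # Two-argument function applied by fold (assumed example: addition).
    return acc + x

values = [1, 2, 3, 4]
mapped = list(map(f, values))   # map applies f to every element of the list
total = reduce(g, mapped, 0)    # fold over the mapped list with base value 0
```

  • Here functools.reduce plays the role of fold: it receives the function, the list and the base value, and the output of map is passed directly as its input.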
  • in the map type of function, a user-specified computation is applied over all input records in a dataset.
  • the task can be split among any number of instances (the mappers), each of them working on a subset of the input data, and can be distributed among any number of machines. These operations occur in parallel. Every key-value pair in the input data is processed, and they can produce none, one or multiple key-value pairs, with the same or different information. They yield intermediate output that is then dumped to the reduce functions.
  • the function of the reduce phase is to aggregate the results disseminated in the map phase. In order to do so, all the results from all the mappers are sorted by the key element of the key-value pair, and the operation is distributed among a number of instances (the reducers, also running in parallel among the available machines). The platform guarantees that all the key-value pairs with the same key are presented to the same reducer. This phase can therefore aggregate the information emitted in the map phase.
  • the job to be processed can be divided in any number of implementations of these two-phase cycles.
  • the platform provides the framework to execute these operations distributed in parallel in a number of CPUs.
  • the only point of synchronization is at the output of the map phase, where all the key-values must be available to be sorted and redistributed.
  • the developer has only to care about the implementation (according to the limitations of the paradigm) of the map and reduce functions, and the platform hides the complexity of data distribution and synchronization.
  • the developer can access the combined resources (CPU, disk, memory) of the whole cluster, in a transparent way.
  • the utility of the paradigm arises when dealing with big data problems, where a single machine does not have enough memory to handle all the data, or its local disk would not be big and fast enough to cope with it.
  • A simple word count algorithm in MapReduce is shown in Figure 2. This algorithm counts the number of occurrences of every word in a text collection.
  • Input key-value pairs take the form of (docid, doc) pairs stored on the distributed file system, where the former is a unique identifier for the document, and the latter is the content of the document.
  • the mapper takes an input key-value pair, tokenizes the document, and emits an intermediate key-value pair for every word.
  • the key would be a string (the word itself) while the value is the count of the occurrences of the word (an integer).
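  • The word count described above can be sketched in Python as follows. This is a single-process illustration, not the algorithm of Figure 2 verbatim: whitespace tokenization, the function names and the in-memory grouping that stands in for the sort and shuffle step are all assumptions.

```python
from collections import defaultdict

def mapper(docid, doc):
    # Emit an intermediate (word, 1) pair for every token in the document.
    for word in doc.split():
        yield word, 1

def reducer(word, counts):
    # Sum all partial counts received for one word.
    return word, sum(counts)

def run_word_count(documents):
    # Group intermediate pairs by key; this stands in for the platform's
    # sort/shuffle step between the map and reduce phases.
    groups = defaultdict(list)
    for docid, doc in documents.items():
        for word, count in mapper(docid, doc):
            groups[word].append(count)
    return dict(reducer(w, c) for w, c in groups.items())

docs = {"d1": "a b a", "d2": "b c"}
result = run_word_count(docs)
```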
  • Twister: Original implementations [4] focus on performing single step MapReduce (computations that involve only one application of MapReduce) with better fault tolerance, and therefore store most of the data outputs to some form of file system throughout the computation.
  • the repetitive application of MR creates new map/reduce tasks in each iteration loading or accessing any static data repetitively. This strategy introduces considerable performance overheads for many iterative applications.
  • Twister is an enhanced MapReduce runtime that uses messaging infrastructure for communication and data transfers, and supports long running map/reduce tasks. Intermediate data are kept in memory.
  • MapReduce Online: Another factor with a direct impact on latency is the need to synchronize at the beginning of the reduce operation. It cannot start until all the map tasks have finished, so the response time is very sensitive to "stragglers". Furthermore, as the entire output of each task is materialized to a local (or distributed) file, it is not possible to return the results of a query until the complete job has finished. In this evolution, the proposed architecture pipelines data between operators, while preserving the programming interfaces and fault tolerance models of the original framework.
  • Hash-Based MapReduce [6]: MapReduce implementations require the entire data set to be loaded into the cluster before running analytical queries, thereby incurring long latencies and making them unsuitable for producing incremental results. In order to support incremental analytics, blocking operations and computational and I/O bottlenecks should be avoided. In this solution, sort-merge implementation is replaced with a purely hash-based framework, which is designed to address the problems of the sort-merge.
  • the model is implemented on top of a MapReduce platform (Hadoop). But a black-box emulation is not enough, because it results in excess data movement and space usage. So modifications were made in the Hadoop implementation to support the full CPB model and to optimize the treatment of the state:
  • Dryad: The computation is structured as a directed graph: programs are graph vertices, while the channels are graph edges.
  • a Dryad job is a graph generator which can synthesize any directed acyclic graph. These graphs can even change during execution, in response to important events in the computation. Dryad is quite expressive. It completely subsumes other computation frameworks, such as Google's map-reduce, or the relational algebra. Moreover, Dryad handles job creation and management, resource management, job monitoring and visualization, fault tolerance, re-execution, scheduling, and accounting.
  • Percolator: After presenting map&reduce in order to create the web index for its search service, Google realized that building the complete index from scratch produced a large latency: they had to crawl the entire web, and then process it with a hundred map&reduce cycles. Reprocessing the entire web discarded the work done in earlier runs and made latency proportional to the size of the repository, rather than the size of an update. They built Percolator, a system for incrementally processing updates to a large dataset, and on top of it the indexing system Caffeine.
  • Percolator provides an interface to access the Bigtable distributed storage system [9]. Bigtable provides lookup and update operations, and transactions with atomic read-modify-write operations on individual rows. Percolator maintains the gist of Bigtable's interface: data is organized into Bigtable's rows and columns, with Percolator metadata stored alongside in special columns. But Percolator provides the features that Bigtable does not: multi-row transactions and the framework to run complex operations in the form of observers, pieces of code that are invoked by the system whenever a user-specified column changes. Percolator applications are structured as a series of observers; each observer completes a task and creates more work for downstream observers by writing to the table.
  • System S Streams (InfoSphere Streams):
  • System S (also called InfoSphere Streams) is a large-scale, distributed data stream processing middleware. In essence an enhanced CEP, but offers:
  • the InfoSphere Streams architecture represents a significant change in computing system organization and capability. Even though it has some similarity to Complex Event Processing (CEP) systems, it is built to support higher data rates and a broader spectrum of input data modalities. It also provides infrastructure support to address the needs for scalability and dynamic adaptability, like scheduling, load balancing, and high availability.
  • InfoSphere Streams continuous applications are composed of individual operators, which interconnect and operate on multiple data streams.
  • Data streams can come from outside the system or be produced internally as part of an application.
  • SPADE provides a toolkit of type-generic, built-in stream processing operators (it also provides an intermediate language for flexible decomposition of parallel and distributed data-flow graphs, and a set of stream adapters to ingest and publish data).
  • Applications developed and optimized with SPADE run natively on the Stream Processing Core. Data from input data streams representing a myriad of data types and modalities flow into the system. The layout of the operations performed on that streaming data is determined by high-level system components that translate user requirements into running applications.
  • S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data. S4 was initially developed to personalize search advertising products at Yahoo!, which operate at a rate of thousands of events per second.
  • PEs Processing Elements
  • the architecture resembles the Actors [2] model, providing semantics of encapsulation and location transparency, thus allowing applications to be massively concurrent while exposing a simple programming interface to application developers.
  • This design choice also makes it relatively easy to reason about correctness due to the general absence of side-effects.
  • S4 is logically a message-passing system: computational units, called Processing Elements (PEs), send and receive messages (called Events).
  • the S4 framework defines an API which every PE must implement, and provides facilities for instantiating PEs and for transporting Events.
  • Events in S4 are arbitrary Java Objects that can be passed between PEs. Adapters convert external data sources into Events that S4 can process. Attributes of events can be accessed via getters in PEs. This is the exclusive mode of communication between PEs.
  • Every PE consumes exactly those events which correspond to the value on which it is keyed. It may produce output events.
  • a PE is instantiated for each value of the key attribute. This instantiation is performed by the platform. For example, in an imaginary word counting application, WordCountPE is instantiated for each word in the input. When a new word is seen in an event, S4 creates a new instance of the PE corresponding to that word.
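  • The keyed instantiation described above can be illustrated with a short Python sketch. It is not the S4 Java API: the classes Platform and WordCountPE, the process method and the dictionary-based events are hypothetical names used only to show one PE instance being created per key value.

```python
class WordCountPE:
    # Hypothetical processing element keyed on one word; not the S4 Java API.
    def __init__(self, key):
        self.key = key
        self.count = 0

    def process(self, event):
        # Consume one event for this key and produce an output event.
        self.count += 1
        return {"word": self.key, "count": self.count}

class Platform:
    # Creates one PE instance per distinct value of the key attribute,
    # the first time that value is seen in an event.
    def __init__(self, pe_class, key_attr):
        self.pe_class = pe_class
        self.key_attr = key_attr
        self.instances = {}

    def dispatch(self, event):
        key = event[self.key_attr]
        if key not in self.instances:
            self.instances[key] = self.pe_class(key)
        return self.instances[key].process(event)

platform = Platform(WordCountPE, "word")
outputs = [platform.dispatch({"word": w}) for w in ["map", "reduce", "map"]]
```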
  • PEs are available for standard tasks such as count, aggregate, join, and so on. Many tasks can be accomplished using standard PEs, which require no additional coding. The task is defined using a configuration file. Custom PEs can easily be programmed using the S4 software development tools.
  • Problems with existing solutions
  • Data stream management systems focus on near-real-time processing of continuously arriving data, so they should be close to stream processing.
  • parallel database strategies and solutions are used at the core of stream processing platforms, like Percolator.
  • MapReduce based solutions are implemented on the Apache Hadoop platform, so, although all of them achieve a better response and a lower latency, they still have to deal with an architecture designed to work with very large batch processing problems. And there is no implementation in the Apache Hadoop architecture to support a distributed state, accessible and updatable by all jobs. Proposed solutions that offer a state must rely on a static object and/or an external storage to provide it.
  • MapReduce Online allows jobs to run continuously, but again state must be a structure internal to each task, and must fit completely into memory.
  • Stateful Block Processing is the existing solution closest to the proposed invention.
  • State is provided with BIPtables, with random access by indexing. This is an efficient solution when the update affects a very small portion of the state.
  • CEP oriented solutions
  • Dryad, although more flexible, uses the same strategy as standard map&reduce as regards the stream processing requirements.
  • the proposed solutions [11] to reduce latency automatically identify redundant computation; they cache prior results to avoid re-executing stages or to merge computations with new input. But this mechanism is out of the programmer's control, so it is not possible to retrieve and work on this "pseudo-state". Dryad does not offer primitives for state management.
  • Google Percolator is the right solution for the problem of continuously updating the web index, but may be wrong for the typical problems that a Telco operator would face: continuously updating user profile based on Call Data Records (CDRs), mobile network information, navigation logs, etc.
  • the problem domain is not as large as generating a web index, so the large cluster needed by Percolator may not be economically efficient for Telco type problems.
  • Percolator has been shown to be less efficient than rebuilding the whole index from scratch when the update rate is significantly high. This is because querying Bigtable to retrieve the state info for every key is much slower than reading the whole state as a dataset. This inefficiency is not an issue with the 1 trillion web pages on the web (and so the index repository), because every update will affect only a very small fraction of them. But as already stated, the Big Data problems of a Telco company may involve revisiting a large portion of the state in every execution wave, so this solution may be very inefficient.
  • the present invention provides, in a first aspect, a method for streaming processing in a map and reduce distributed computing platform, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity, the method using information regarding state associated to at least reduce operations.
  • the method of the first aspect of the invention comprises generating said state as a result of a reduce operation performed by a node, in the form of an output queue, and using said state as an input queue of a subsequent reduce operation performed by said node, said output and input queue forming a single queue which is updated after the reduce operation processing.
  • a second aspect of the present invention generally comprises a distributed computing platform system for map and reduce streaming processing, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity and configured for performing at least reduce operations using information regarding state associated thereto.
  • in the system of the second aspect of the present invention, at least one of said nodes comprises at least one input state queue and at least one output state queue, said at least one node is configured for generating said state as a result of a reduce operation performed by said at least one node and providing said result to said at least one output state queue, and said input and output state queues are interconnected forming a single queue such that there is a feedback from said output state queue to said input state queue.
  • Other embodiments of the second aspect of the invention are described according to appended claims 14 to 18, and in a subsequent section related to the detailed description of several embodiments.
  • Brief Description of the Drawings
  • Figure 1 shows an example of a functional programming diagram, with map (f) and fold (g) functions.
  • Figure 2 shows an example of the word count algorithm implementation in MapReduce.
  • Figure 3 shows the logical flow of data management used in batch processing mode.
  • Figure 4 shows the data distribution diagram used in batch processing mode.
  • Figure 5 shows the logical data flow at the output of a distributed operation. Information is sent immediately to the destination node worker, instead of being stored locally as in the Hadoop implementation.
  • Figure 6 shows an example of the key-values consolidation.
  • Figure 7 shows an example of the local data file structure.
  • Figure 8 shows an example of the reading of key-values with associated hash group HG2 on a dataset with three files in the node worker local disk.
  • Figure 9 shows an example of the implementation of multiple inputs/outputs reduce operation.
  • Figure 10 shows the logical schema of the operation scheduler and the management of data in the queues (blocks).
  • Figure 11 shows the data flow in stream processing mode.
  • Figure 12 shows the data flow in stream processing mode for one node.
  • Figure 13 shows the data flow in stream processing mode for multiple node workers.
  • Figure 14 shows the BlockManager strategy, according to an embodiment of the present invention.
  • Figure 15 shows the BlockManager in the node architecture.
  • Figure 16 shows the streaming state model with 2 inputs reduce operation, according to an embodiment of the present invention.
  • Figure 17 shows an example of the interpretation of a stateful reduce operation as a CEP system.
  • Figure 18 represents the state and block distribution in buckets, according to an embodiment of the present invention.
  • Figure 19 illustrates how input data blocks are accumulated in buckets according to the keys of the data they contain.
  • Figure 20 represents an example of the BlockBreak module.
  • the invention proposes an extension of the MapReduce technology that efficiently reduces latency, implements stream processing of continuous data flows, and allows developing solutions with a state that can be updated at the rate of arriving data.
  • the invention also includes a new module for managing the blocks of key-value pairs. This module keeps in memory the next needed blocks to be executed, and guarantees that idle blocks will be backed up to disk.
  • Stream processing operations can be scheduled on data queues, and they are automatically executed when there are input data available.
      • Queues can be controlled to specify a maximum latency.
      • Queues can be controlled to specify if input data must be deleted after processing, or it must be kept (for example, a queue with static information to be used in a JOIN type reduce operation).
  • This state can be continuously updated with the input data.
      • In order to efficiently update the state, it is organized as a partition of the key ranges, so it is only processed when there is data to be processed.
      • Hash group ranges assigned to buckets are dynamically distributed in order to keep bucket size always under control.
      • Distribution of data into buckets can include a time stamp, which may provide a more efficient data handling, and allows forgetting and deleting unused state entries.
      • State can be shared among any number of reduce operations.
      • State (and any other dynamic queue) can be "frozen" in a static database, to be processed later on in batch mode.
  • the invention consists in a system and method to implement a new stream processing capability in medium sized big-data platforms working with the MapReduce paradigm.
  • This new system allows the development of stateful solutions, where state can be updated continuously and efficiently as input data arrives.
  • the new system is built on a data management method that allows a very efficient and low latency implementation of MapReduce operations in batch processing mode.
  • the new system absorbs and extends this model into a new stream processing oriented platform.
  • the scenario is a platform implementing the MapReduce paradigm. Any operation (map, reduce) is distributed among the nodes in the cluster. Each node (or node worker) executes a number of instances of these operations. Each of these instances implies a sequence of sub-operations:
  • Input data is read. If needed (for reduce operations), a sort on the key is performed.
  • the first three steps are performed distributed across all the node workers in the platform.
  • it is guaranteed that data and processing are always local to the node worker, and distribution of the data is performed in memory.
  • the invention relies on how sub-operations 1), 3) and 4) are performed and on how datasets, consisting of groups of key-value pairs, are created, distributed and read.
  • the logical sequences of data handling and distribution are depicted in Figure 3 and explained in the following datasets management section.
  • Output data is distributed to the nodes of the platform:
  • key-value pairs are generated and emitted to a specific dataset (in the proposed invention, an operation can have multiple output datasets, so different key-value pairs types can be produced from the same operation).
  • Key-value pairs are distributed to their storage node without passing through the local disk of the emitting node, as this sub-operation is performed in memory and data is sent through the platform network layer. This process is performed without the need to communicate with a central controller, as there is a fixed strategy for data distribution in the platform, and every node worker has this strategy.
  • a hash function is computed on the key.
  • the space of possible results of the hash function is partitioned in order to manage a more practical number of groups.
  • After the hash on the key and the partitioning functions, the key-value pair has an associated number that will be used to identify which node worker it is going to be sent to, and will also determine how it will be stored there. This number is called the hash group.
  • a standard function can be used to compute the hash and to perform the partitioning of the data, globally optimized and associated to the type of the key, or it can be user provided. This gives the developer the freedom to control the data (and so the task) distribution for specific problems.
  • the platform (and every node worker) also knows which node worker the data belongs to (and so, which node must execute the operations on the data with this same hash group).
  • the data is thus sent to the predefined node worker (or node workers) (Figure 5).
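  • The hash group computation and routing described above can be sketched as follows. The number of hash groups, the CRC32 hash, the modulo assignment of groups to node workers and the worker names are all assumptions made for illustration; the text only requires a fixed strategy known to every node worker.

```python
import zlib

NUM_HASH_GROUPS = 64                                  # assumed size of the partitioned hash space
NODE_WORKERS = ["worker-0", "worker-1", "worker-2"]   # hypothetical cluster

def hash_group(key):
    # Deterministic hash on the key, then partition into a fixed number of groups.
    return zlib.crc32(key.encode("utf-8")) % NUM_HASH_GROUPS

def owner(group):
    # Fixed assignment known to every node worker, so no central
    # controller has to be consulted when emitting a key-value pair.
    return NODE_WORKERS[group % len(NODE_WORKERS)]

def route(key, value):
    # Returns the destination node worker, the hash group and the pair itself.
    group = hash_group(key)
    return owner(group), group, (key, value)
```

  • Because the strategy is a pure function of the key, every emitting node computes the same destination for a given key, which is what lets all pairs with a common key meet at the same node worker.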
  • the output is managed in memory, and then sent through the network, without being written down to disk. This is one of the core aspects of the invention.
  • By contrast, in the Hadoop implementation the output of the operation is written down to local disk. Then it is sorted, shuffled and copied to the distributed file system, where it is merged. This extra data movement has a big impact on performance.
  • At each node, data generated anywhere in the platform is collected and stored locally:
  • key-values produced at all the operation instances running in the other node workers (and itself), with a hash group assigned to that node worker are received by a dedicated process (Consolidator).
  • For every hash group, the Consolidator keeps key-values in memory until either the source operations running in the node workers have finished or a chunk large enough to be dumped to a file has been collected (e.g., 1 GByte).
  • All the key-values must be available in memory because they have to be sorted first and then written to the disk file as a whole. Then it is possible to dump them to a disk file, so a local file holding all the key-values accumulated to be processed (or replicated) at the node worker is created. If there are more key-value pairs pending, they will be accumulated in the next file.
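  • A minimal sketch of the Consolidator behaviour described above is given below. The class layout, the byte accounting based on string lengths, the single threshold shared by all hash groups and the write_file callback (which stands in for the actual disk write) are assumptions.

```python
from collections import defaultdict

class Consolidator:
    # Hypothetical sketch: buffers key-values per hash group and dumps sorted chunks.
    def __init__(self, max_chunk_bytes, write_file):
        self.max_chunk_bytes = max_chunk_bytes
        self.write_file = write_file        # callback standing in for disk I/O
        self.buffers = defaultdict(list)    # hash group -> list of (key, value)
        self.buffered_bytes = 0

    def receive(self, group, key, value):
        # Keep the pair in memory; dump a file once the chunk is large enough.
        self.buffers[group].append((key, value))
        self.buffered_bytes += len(key) + len(value)
        if self.buffered_bytes >= self.max_chunk_bytes:
            self.flush()

    def flush(self):
        # Also called explicitly when the source operations have finished.
        if not self.buffers:
            return
        chunk = {g: sorted(kvs) for g, kvs in sorted(self.buffers.items())}
        self.write_file(chunk)              # the whole sorted chunk is written at once
        self.buffers.clear()
        self.buffered_bytes = 0

files = []
c = Consolidator(max_chunk_bytes=8, write_file=files.append)
c.receive(2, "b", "1")
c.receive(1, "z", "22")
c.receive(2, "a", "333")   # threshold reached: first file is dumped
c.receive(1, "k", "v")
c.flush()                  # source operations finished: dump the remainder
```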
  • a compacting command is provided that can read back all the data-set files generated at each node worker, and reorganize the hash groups in a new set of disk files. Fragmentation of the hash groups among a large number of disk files can have a negative impact on performance, so this operation is very useful when the same dataset will be used as input to different operations.
  • the key-value pair distribution strategy, together with this possibility of compacting hash groups, is the basis for allowing appends to the dataset while keeping good performance.
  • the structure of the disk files holding the datasets has been designed to offer the best performance for the input data reading sub-operation.
  • the structure of a local data file is the same for all the node workers and for all the key-value datasets (Figure 7).
  • the file has three parts:
  • the files are intended to be 1 GB in size (configurable). If the dataset key-values to be processed by the node worker are larger than the file size, its section of the dataset will be held in more than one file. If there are not enough key-values to fill the 1 GB file, it can be smaller.
  • the node worker retrieves its designated hash groups. Knowing the size of the data associated to every hash group, and the memory size available for the operation, it reads the maximum size of key-values in a single chunk, thus reducing the number of I/O operations and maximizing throughput. In each file, the chunk corresponding to the requested key-values is large enough to allow an efficient implementation of the reading function.
  • Each sort operation is very efficient since it is performed with the key-values from each hash group, and is performed in memory after reading the hash group.
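  • One way to picture the chunked reading described above is a planner that packs consecutive hash groups into batches that fit the available memory. The function name, the input layout and the greedy packing rule are assumptions; the source only states that the largest possible chunk is read at once.

```python
def plan_reads(group_sizes, memory_budget):
    # group_sizes: list of (hash_group, size_in_bytes) in on-disk order (assumed).
    # Returns batches of consecutive hash groups, each fitting the memory budget,
    # so that every batch can be read with a single large I/O operation.
    batches, current, used = [], [], 0
    for group, size in group_sizes:
        if current and used + size > memory_budget:
            batches.append(current)
            current, used = [], 0
        current.append(group)
        used += size
    if current:
        batches.append(current)
    return batches

batches = plan_reads([(0, 40), (1, 30), (2, 50), (3, 10)], memory_budget=80)
```

  • In this sketch a hash group larger than the budget still gets a batch of its own; how the platform handles that case is not stated in the text.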
  • This distribution strategy guarantees that all the key-value pairs with a common key, from all the datasets in the platform, are stored in the local disk of the same machine.
  • Another innovation in this MapReduce implementation is to allow any map or reduce operation to have several input or output datasets (although multiple-input map operations do not make much sense).
  • This implementation of JOINs is as efficient as a standard reduce operation and has no size restrictions (unlike map-side or reduce-side JOINs in Hadoop).
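  • A reduce operation with two inputs used as a JOIN can be sketched as follows. The grouping is simulated in memory here, whereas in the platform it follows from the shared hash group distribution; the function names, the sample records and the inner-join semantics are assumptions.

```python
from collections import defaultdict

def join_reduce(key, users, calls):
    # Called once per key with the records of each input dataset for that key.
    return [(key, u, c) for u in users for c in calls]

def run_two_input_reduce(input_a, input_b, reduce_fn):
    # Group both inputs by key, then call the reduce function once per key
    # with the values of each input kept separate.
    grouped = defaultdict(lambda: ([], []))
    for k, v in input_a:
        grouped[k][0].append(v)
    for k, v in input_b:
        grouped[k][1].append(v)
    out = []
    for k in sorted(grouped):
        out.extend(reduce_fn(k, *grouped[k]))
    return out

users = [("u1", "Ann"), ("u2", "Bob")]
calls = [("u1", "call-17"), ("u1", "call-42"), ("u3", "call-99")]
joined = run_two_input_reduce(users, calls, join_reduce)
```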
  • Based on the described data management infrastructure and the very low latency obtained, a new way of processing MapReduce operations has been designed, stream processing, which can be as close to real-time as desired. Data is handled in queues, rather than datasets, and when new data arrives at a queue, any number of operations can be executed to process it. Output data is then emitted to other queues, and new operations downstream are executed.
  • Operations are scheduled with a command that defines the operation to be executed and the input and output queues processed:
  • this command schedules a stream operation "parse" (this name identifies the operation together with its set of input and output queues), which will execute a map operation "page.parse_graphs" that reads key-value pairs from input queue "graph.txt" and emits to queue "graph".
  • Whether input data queue is cleared or not after processing, is determined by a configuration option; by default the desired behaviour is to clean the queue, but in some cases it may be useful to have "static" queues, like catalogs or dictionaries.
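  • The queue-driven scheduling described above can be illustrated with an in-memory Python sketch. It does not reproduce the platform's command syntax: the StreamScheduler class, its methods and the clear_input flag are hypothetical, and parse_graphs is only a stand-in for the map operation "page.parse_graphs" with an assumed "source destination" line format.

```python
from collections import defaultdict, deque

class StreamScheduler:
    # Hypothetical in-memory sketch; not the platform's command syntax.
    def __init__(self):
        self.queues = defaultdict(deque)
        self.operations = []   # (name, function, input queue, output queue, clear_input)

    def schedule(self, name, fn, in_queue, out_queue, clear_input=True):
        self.operations.append((name, fn, in_queue, out_queue, clear_input))

    def push(self, queue, item):
        self.queues[queue].append(item)

    def run_once(self):
        # Execute every scheduled operation whose input queue holds data.
        for name, fn, in_q, out_q, clear_input in self.operations:
            items = list(self.queues[in_q])
            if not items:
                continue
            for item in items:
                self.queues[out_q].extend(fn(item))
            if clear_input:
                self.queues[in_q].clear()   # default behaviour: clean the queue

def parse_graphs(line):
    # Stand-in for the map operation "page.parse_graphs" (assumed line format "a b").
    src, dst = line.split()
    yield src, dst

s = StreamScheduler()
s.schedule("parse", parse_graphs, "graph.txt", "graph")
s.push("graph.txt", "n1 n2")
s.run_once()
```

  • Passing clear_input=False models a "static" queue such as a catalog: its content stays available for later runs instead of being removed after processing.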
  • In Figure 11 a typical streaming flow is presented. To take into account all the other machines running in the cluster, a more detailed description is presented in Figure 12 and Figure 13. The distribution of key-value pairs through the platform is the same as presented in the previous sections.
  • a mechanism for subscription to any queue in the platform allows clients to receive content and updates from any queue in the platform.
  • the ideal scenario would be to keep all data blocks in memory, and the reference to this memory is passed between operations. If there is enough memory, there is no requirement to download the data blocks to the local disk.
  • BlockManager module In Big Data, usually there will not be enough memory to handle all the data to be processed. So a new BlockManager module has been included in the system. This module optimizes and minimizes the disk usage, guaranteeing that all the data blocks needed to execute the next set of tasks, will be available in memory.
  • the BlockManager (Figure 15) keeps a list of the blocks in the node worker. This list is sorted according to the information kept in the task scheduling; the blocks that will be needed by operations to be executed earlier are kept first. The last blocks in the list are scheduled to be downloaded to the local disk, with a low priority.
  • the BlockManager guarantees that all the needed blocks are available in memory. If the block was copied to disk, and its memory freed, the BlockManager asks the Disk Manager to retrieve it. If there is not enough memory to copy it, already downloaded blocks will be scanned to free their associated memory, if not yet done.
  • MapReduce technology was oriented to solve problems in batch processing mode: to collect a huge amount of data, process them from scratch, and to obtain a result. But in many cases, data does not come from a static source, but rather they are a sample from a constantly changing source. In this model, the process must be rerun completely, with new and accumulated data, at a convenient rate (once a month, once a week). Or the developer must incorporate the results of the previous batch by hand, when the job has completely finished, and the results have been exported to external storage. In either case, they are inefficient solutions, with a very long latency.
  • the stream processing method can be easily extended to incorporate a state that can be treated as a special kind of queue.
  • the output state is a queue produced by a reduce operation that has 2 inputs, one of which is the output from a previous cycle (Figure 16).
  • reduce operations with multiple inputs allow an easy implementation of JOINs. This same interpretation can be extended to update the state, because the reduce operation will automatically be called with the state information and the input data records with the same key.
  • This operation with internal state could be interpreted as a CEP system, with a node for each of the keys in the data ( Figure 17). Each of these nodes must keep the value associated to its key.
  • this strategy would have scalability problems as the state (the number of possible keys) grows.
  • One of the problems of implementing a state in a MapReduce like platform is the need to reevaluate the whole state with every wave of new input data.
  • One extreme solution would be the strategy of rereading the state as an input (this one has other problems, mainly the very long latency, because result state must be completely computed by all node workers before being emitted as output and available as input).
  • a similar solution to data distribution by hash-group, described before for the data managing across the platform, can be extended to organize state information.
  • the state is split into a number of key hash-group ranges, or Buckets (the range of hash-groups is already split among the node workers).
  • input data blocks usually contain a mix of key-value pairs corresponding to several state buckets (Figure 19) (though all of them must correspond to the portion of the state stored in the current node worker).
  • the simplest solution is to send the input data block to all the buckets with at least a common key hash-group, where they will be automatically processed, through the reduce operation; outer key-value pairs will be ignored. This is proven to be faster if the data blocks are kept in memory, for reference. But this strategy implies that, for each bucket, a lot (depending on the block creation and the number of buckets) of useless information is maintained: key-values that are not going to be processed in that bucket. As the number of buckets grows, the size of local key-values (as opposed to key-values that will be processed in another bucket) is reduced, for the same memory limit, so the performance impact increases.
  • state is split into as many buckets as processors are defined in the platform (this parameter is common to every node worker). As they may grow, either because there are new keys, or because there is more information associated to the keys, a mechanism has been included to split large buckets into new ones, keeping their size under control, and thus allowing efficiency in update operations.
  • new incoming data and its update operation is stopped; new buckets are created, and the hash-group associated to the bucket is split.
  • the invention has been implemented in a Big-Data platform, a MapReduce framework (in experimental prototype status). With this framework, different problems relevant to the Telco business and other typical problems have been programmed and evaluated. A few examples of problems implemented:
  • SNA Social Network Analysis
  • Top hits Analysis of web navigation logs, in order to detect the most active hits per category, and recommend them according to the user profile.
  • OSN Navigation Traffic
  • the invention is meant to solve, in a very efficient way, the problem of processing a continuous data flow, having results up-to-date, and without the need to accumulate and reprocess all data, as would be required in batch oriented MapReduce platforms.
  • Data flow is processed in stream mode, keeping latency as low as required, because latency scales down with the number of machines in the cluster.
  • a client can subscribe to any queue for receiving its content and updates.
  • the internal state is continuously updated as new data is processed, without the need to store and reprocess all the accumulated data, as would be required in a batch oriented MapReduce platform.
  • the state size can grow to the limit of the available global disk in the platform.
  • Memory and disk coordinated management guarantees that all memory is used efficiently to keep in memory the data blocks most likely to be used. Operations are faster if input data are available in memory and output data may be kept also in memory, without going through disk.
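The idea of treating state as a queue that is both an input and the output of a reduce operation can be illustrated with a minimal single-process sketch. The function names (`update_state`, `count_events`) and the use of a plain dictionary as the state queue are assumptions made for illustration only; they are not part of the described platform, and distribution across node workers and buckets is omitted.

```python
from collections import defaultdict

def update_state(state, input_queue, reduce_fn):
    """Run one cycle: join the state and the new input by key.

    `state` maps each key to its current value (the state queue).
    `input_queue` is an iterable of (key, value) pairs that just arrived.
    `reduce_fn(key, previous_value, new_values)` returns the updated value.
    """
    grouped = defaultdict(list)
    for key, value in input_queue:
        grouped[key].append(value)

    new_state = dict(state)  # keys without new input keep their value
    for key, values in grouped.items():
        new_state[key] = reduce_fn(key, state.get(key), values)
    return new_state

def count_events(key, previous, values):
    """Hypothetical reduce function: count the events seen per key."""
    return (previous or 0) + len(values)

# Two waves of input; the output of the first cycle is the state input of the second.
state = {}
state = update_state(state, [("alice", "call"), ("bob", "sms")], count_events)
state = update_state(state, [("alice", "sms")], count_events)
```

Only the keys that appear in a wave of input are passed to the reduce function, so the accumulated data never has to be reprocessed from scratch.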

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Software Systems (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Computational Linguistics (AREA)
  • Data Mining & Analysis (AREA)
  • Databases & Information Systems (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
  • Image Processing (AREA)

Abstract

The method comprising said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity, the method using information regarding state associated to at least reduce operations, the method comprises generating said state as a result of a reduce operation performed by a node, in the form of an output queue, and using said state as an input queue of a subsequent reduce operation performed by said node, said output and input queue forming a single queue which is updated after the reduce operation processing. The system is arranged for implementing the method of the present invention.

Description

Method and system for streaming processing in a map and reduce distributed computing platform
Field of the art
The present invention generally relates, in a first aspect, to a method for streaming processing in a map and reduce distributed computing platform, and more specifically to a method for optimizing the performance of processing continuous data flow in a distributed computing platform.
A second aspect of the invention relates to a system arranged for implementing the method of the first aspect.
By means of state, it will be understood any information of interest that has to be maintained and evolve over time being continuously updated (e.g. a user profile for recommendations). This state will contain all the relevant information for each element to be modeled.
Prior State of the Art
The huge (and growing) amount of data generated and available nowadays has driven the development of paradigms, platforms and applications to process and create value of all this information. This process has been boosted by the appearance of the MapReduce (or map&reduce, MR) paradigm and its implementation in an open source project (Apache Hadoop) and an ecosystem of products oriented to Big Data solutions. In this ecosystem, NoSQL solutions can also be found.
But more and more, "Big" does not only mean "huge amount", but also stronger requirements in other dimensions of the solution space. Apache Hadoop is great for simple batch processing tasks with Terabytes and Petabytes of data involved, but companies and institutions are confronting more complex tasks: graph computing, interactive analysis, complex joins, Atomicity, Consistency, Isolation and Durability (ACID) requirements, real-time requirements or the need for continuous incremental updates (all of them keeping the need of processing the volume of relevant information generated).
These requirements are being covered with different strategies: evolutions of the MapReduce paradigm and its implementation, specialized solutions for graph processing, Complex Event Processing (CEP) solutions, and even a trend of innovation in SQL [1]. In 2004, Google presented the MapReduce paradigm [2], a programming model for expressing distributed computations on massive datasets and an execution framework for large-scale data processing. It allows distributing very large data problems on a cluster of machines.
Theory on distributed and parallel processing had been at the core of computer science for a long time (and MR is based on functional programming principles), but the new proposal arrived just in time to solve a growing problem of dealing with the new sources of data and extracting information and value out of them.
Long before, there was already a need to work with large datasets [3]. RDBMS have also been evolving to larger and larger systems. CEP systems have had their niche for some years, and recently, more and more are scaling up to deal with Big Data problems.
But MR is still at the core of Big Data processing, and it is being evolved to solve the new arising problems. At the beginning there was the need to process huge amounts of data in a distributed way to have solutions in a reasonable time, and the MR paradigm in 2004 and its open-source implementation with Apache Hadoop were the right solution. Then the distributed space of key-values proved to be useful to store and retrieve information in a cost effective way. This was the source of BigTable and Dynamo solutions, and a number of products (Cassandra, MongoDB...). And now the focus is moving to real-time processing. This path is providing new solutions based on MR (Google's Percolator, Yahoo's S4) and a number of MR evolutions to improve response time.
The proposed solution is also an evolution of MR, so this technical background will present in more detail this technology.
MapReduce and Apache Hadoop implementation
MapReduce (MR) is a paradigm evolved from functional programming and applied to distributed systems. It was presented in 2004 by Google [2]. It is meant for processing problems whose solution can be expressed in commutative and associative functions.
In essence, MR offers an abstraction for processing large datasets on a set of machines, configured in a cluster. With this abstraction, the platform can easily solve the synchronization problem, freeing the developer thus of thinking about that issue.
All data of these datasets is stored, processed and distributed in the form of key-value pairs, where both the key and the value can be of any data type. From the field of functional programming, it is proved that any problem whose solution can be expressed in terms of commutative and associative functions, can be expressed in two types of functions: map (named also map in the MR paradigm) and fold (named reduce in the MR paradigm). Any job must be expressed as a sequence of these functions. These functions have a restriction: they operate on some input data, and produce a result without side effects, i.e. without modifying neither the input data nor any global state. This restriction is the key point to allow an easy parallelization.
Given a list of elements, map takes as an argument a function f (that takes a single argument) and applies it to all elements in a list (the top part of Figure 1), returning a list of results. The second step, fold, accumulates a new result by iterating through the elements in the result list. It takes three parameters: a base value, a list, and a function. Typically, map and fold are used in combination. The output of one function is the input of the next one (as functional programming avoids state and mutable data, all the computation must progress by passing results from one function to the next one), and this type of functions can be cascaded until finishing the job.
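As an illustration of the two primitives, the following minimal sketch uses Python's built-in `map` and `functools.reduce` (Python's name for fold). The functions `square` and `add` and the sample list are assumptions chosen only for the example.

```python
from functools import reduce

def square(x):
    """Single-argument function f applied by map to every element."""
    return x * x

def add(accumulator, x):
    """Two-argument function used by fold to accumulate a result."""
    return accumulator + x

values = [1, 2, 3, 4]

# map: apply f to all elements, returning a list of results.
squared = list(map(square, values))

# fold: takes a function, a list and a base value (0 here).
total = reduce(add, squared, 0)
```

Neither function modifies its input or any global state, which is what allows the map step to be split across independent workers.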
In the map type of function, a user-specified computation is applied over all input records in a dataset. As the result depends only on the input data, the task can be split among any number of instances (the mappers), each of them working on a subset of the input data, and can be distributed among any number of machines. These operations occur in parallel. Every key-value pair in the input data is processed, and they can produce none, one or multiple key-value pairs, with the same or different information. They yield intermediate output that is then dumped to the reduce functions.
The reduce phase has the function to aggregate the results disseminated in the map phase. In order to do so, all the results from all the mappers are sorted by the key element of the key-value pair, and the operation is distributed among a number of instances (the reducers, also running in parallel among the available machines). The platform guarantees that all the key-value pairs with the same key are presented to the same reducer. This phase has so the possibility to aggregate the information emitted in the map phase.
The job to be processed can be divided in any number of implementations of these two-phase cycles.
The platform provides the framework to execute these operations distributed in parallel in a number of CPUs. The only point of synchronization is at the output of the map phase, where all the key-values must be available to be sorted and redistributed. This way, the developer has only to care about the implementation (according to the limitations of the paradigm) of the map and reduce functions, and the platform hides the complexity of data distribution and synchronization. Basically, the developer can access the combined resources (CPU, disk, memory) of the whole cluster, in a transparent way. The utility of the paradigm arises when dealing with big data problems, where a single machine does not have enough memory to handle all the data or its local disk would not be big and fast enough to cope with all the data.
The entire process can be presented in a simple, typical example: word frequency computing in a large set of documents. A simple word count algorithm in MapReduce is shown in Figure 2. This algorithm counts the number of occurrences of every word in a text collection. Input key-value pairs take the form of (docid, doc) pairs stored on the distributed file system, where the former is a unique identifier for the document, and the latter is the content of the document. The mapper takes an input key-value pair, tokenizes the document, and emits an intermediate key-value pair for every word. The key would be a string (the word itself) while the value is the count of the occurrences of the word (an integer). In an initial approximation, it will be a "1" (denoting that we've seen the word once). The MapReduce execution framework guarantees that all values associated with the same key are brought together in the reducer. Therefore, the reducer simply needs to sum up all counts (ones) associated with each word, and to emit final key-value pairs with the word as the key, and the count as the value.
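The word count described above can be sketched in a single process as follows. This is not the algorithm of Figure 2 itself but a minimal approximation: the `shuffle` function stands in for the grouping that the execution framework performs, and the whitespace tokenizer, lowercasing and sample documents are assumptions.

```python
from collections import defaultdict

def mapper(docid, doc):
    """Tokenize the document and emit (word, 1) for every word."""
    for word in doc.split():
        yield word.lower(), 1

def shuffle(pairs):
    """Group intermediate values by key, as the framework would do."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reducer(word, counts):
    """Sum all the counts (ones) associated with a word."""
    return word, sum(counts)

def word_count(documents):
    intermediate = []
    for docid, doc in documents.items():
        intermediate.extend(mapper(docid, doc))
    grouped = shuffle(intermediate)
    return dict(reducer(word, counts) for word, counts in sorted(grouped.items()))

docs = {"d1": "the quick fox", "d2": "the lazy dog the end"}
counts = word_count(docs)
```

In a real cluster the mapper calls would run on different machines and the grouped keys would be partitioned among several reducers; the logic of each function stays the same.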
This paradigm has had a number of different implementations: the one already presented by Google (patent US 7,650,331), the open source project Apache Hadoop, which is the most prominent and widely used implementation, and a number of implementations of the same concept: Sector/Sphere (written in Erlang). Microsoft has also developed a framework for parallel computing, Dryad, which is a kind of superset of MapReduce.
These implementations have been developed to solve a number of problems (task scheduling, scalability, fault tolerance...). One such problem is how to ensure that every task will have the input data available as soon as it is needed, without making network and disk input/output the system bottleneck (a difficulty inherent in big-data problems).
Most of these implementations (Google, Hadoop, Sphere, Dryad...) rely on a distributed file-system for data management. Data files are split in large chunks (e.g. 64MB), and these chunks are stored and replicated to a number of data nodes. Tables keep track of how data files are split and where the replica for each chunk resides. When scheduling a task, the distributed file system can be queried to determine the node that has the required data to fulfill the task. The node that has the data (or one nearby) is selected to execute the operation, reducing network traffic.
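The chunking and locality-aware scheduling described above can be sketched as follows. The round-robin replica placement, the replication factor of 3 and the function names are assumptions for illustration; real distributed file systems use more elaborate placement policies.

```python
CHUNK_SIZE = 64 * 1024 * 1024  # 64 MB, as in the example above

def split_into_chunks(file_size, chunk_size=CHUNK_SIZE):
    """Return a list of (offset, length) pairs, one per chunk."""
    return [(offset, min(chunk_size, file_size - offset))
            for offset in range(0, file_size, chunk_size)]

def place_replicas(num_chunks, nodes, replication=3):
    """Build the table chunk index -> nodes holding a replica (round-robin)."""
    return {i: [nodes[(i + r) % len(nodes)] for r in range(replication)]
            for i in range(num_chunks)}

def pick_node(chunk_index, table, busy):
    """Prefer an idle node that already holds the chunk, avoiding a network copy."""
    replicas = table[chunk_index]
    for node in replicas:
        if node not in busy:
            return node
    return replicas[0]  # every replica holder is busy: queue on the first one

chunks = split_into_chunks(150 * 1024 * 1024)
table = place_replicas(len(chunks), ["n1", "n2", "n3", "n4"])
chosen = pick_node(0, table, busy={"n1"})
```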
The main problem of this model is the increased latency. Data can be distributed and processed in a very large number of machines, and synchronization is provided by the platform in a transparent way to the developer. But this ease of use has a price: no reduce operation can start until all the map operations have finished and their results are placed on the distributed file-system. These limitations increase the response time, and this response time limits the type of solutions where a "standard" MR solution can be applied. This is the main reason for a number of evolutions to the model, focused on improving response time. The main modification proposals are:
Twister: Original implementations [4] focus on performing single step MapReduce (computations that involve only one application of MapReduce) with better fault tolerance, and therefore store most of the data outputs to some form of file system throughout the computation. The repetitive application of MR creates new map/reduce tasks in each iteration loading or accessing any static data repetitively. This strategy introduces considerable performance overheads for many iterative applications. Twister is an enhanced MapReduce runtime that uses messaging infrastructure for communication and data transfers, and supports long running map/reduce tasks. Intermediate data are kept in memory.
MapReduce Online [5]: Another factor with a direct impact on latency is the need to synchronize at the beginning of the reduce operation. It cannot start until all the map tasks have finished, so the time-response is very sensitive to the "stragglers". Furthermore, as the entire output of each task is materialized to a local (or distributed) file, it is not possible to return the results of a query until the complete job has finished. In this evolution, the proposed architecture pipelines data between operators, while preserving the programming interfaces and fault tolerance models of the original framework.
Hash-Based MapReduce [6]: MapReduce implementations require the entire data set to be loaded into the cluster before running analytical queries, thereby incurring long latencies and making them unsuitable for producing incremental results. In order to support incremental analytics, blocking operations and computational and I/O bottlenecks should be avoided. In this solution, sort-merge implementation is replaced with a purely hash-based framework, which is designed to address the problems of the sort-merge.
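The difference between a blocking sort-merge and a hash-based approach can be illustrated with a minimal sketch of incremental hash aggregation. This is not the framework of [6]; the class name `HashAggregator` and its methods are assumptions used only to show why a hash table allows partial results before all input has arrived.

```python
class HashAggregator:
    """Aggregate values per key in a hash table as records arrive."""

    def __init__(self, combine):
        self.combine = combine  # associative function merging two values
        self.table = {}

    def add(self, key, value):
        # No sorting is needed: each record updates its key in place.
        if key in self.table:
            self.table[key] = self.combine(self.table[key], value)
        else:
            self.table[key] = value

    def snapshot(self):
        """Return the incremental result computed so far."""
        return dict(self.table)

aggregator = HashAggregator(lambda a, b: a + b)
aggregator.add("a", 1)
aggregator.add("b", 2)
partial = aggregator.snapshot()   # available before the input is complete
aggregator.add("a", 3)
final = aggregator.snapshot()
```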
Stateful Bulk Processing [7]: This is the implementation of an architecture for continuous bulk processing (CBP). The core is a flexible, groupwise processing operator that takes state as an explicit input. Unifying stateful programming with a data-parallel operator affords opportunities for minimizing the movement of data in the underlying processing system. This new operator is called translate. It is run repeatedly, allowing users to easily store and retrieve state as new data inputs arrive.
The model is implemented on top of a MapReduce platform (Hadoop). But a black-box emulation is not enough, because it results in excess data movement and space usage. So modifications were made in the Hadoop implementation to support the full CBP model and to optimize the treatment of the state:
• multiple inputs and outputs
• incremental shuffling for loopback flows
• random access to the state with an index
• multicast and broadcast routing
There are also other solutions that, though inspired by MR, are a complete break with this model. They also address the problem of long latency, and more and more are oriented to real-time analytics:
Dryad: The computation is structured as a directed graph: programs are graph vertices, while the channels are graph edges. A Dryad job is a graph generator which can synthesize any directed acyclic graph. These graphs can even change during execution, in response to important events in the computation. Dryad is quite expressive. It completely subsumes other computation frameworks, such as Google's map-reduce, or the relational algebra. Moreover, Dryad handles job creation and management, resource management, job monitoring and visualization, fault tolerance, re-execution, scheduling, and accounting.
Percolator [8]: After presenting map&reduce in order to create the web index for its search service, Google realized that building the complete index from scratch produced a large latency: they had to crawl the entire web, and then process it with a hundred map&reduce cycles. Reprocessing the entire web discarded the work done in earlier runs and made latency proportional to the size of the repository, rather than the size of an update. They built Percolator, a system for incrementally processing updates to a large dataset, and on top of it the indexing system Caffeine.
Percolator provides an interface to access the Bigtable distributed storage system [9]. Bigtable provides lookup and update operations, and transactions with atomic read-modify-write operations on individual rows. Percolator maintains the gist of Bigtable's interface: data is organized into Bigtable's rows and columns, with Percolator metadata stored alongside in special columns. But Percolator provides the features that Bigtable does not: multi-row transactions and the framework to run complex operations in the form of observers, pieces of code that are invoked by the system whenever a user-specified column changes. Percolator applications are structured as a series of observers; each observer completes a task and creates more work for downstream observers by writing to the table.
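The observer mechanism can be illustrated with a small in-memory sketch. It is not Percolator's API: the class `ObservedTable`, its methods and the two sample observers are hypothetical, and multi-row transactions and the Bigtable storage are left out entirely.

```python
from collections import deque

class ObservedTable:
    """Rows and columns in memory, with callbacks run when a column changes."""

    def __init__(self):
        self.rows = {}        # row key -> {column: value}
        self.observers = {}   # column -> list of callbacks
        self.pending = deque()

    def observe(self, column, callback):
        self.observers.setdefault(column, []).append(callback)

    def write(self, row, column, value):
        self.rows.setdefault(row, {})[column] = value
        if column in self.observers:
            self.pending.append((row, column))  # notify observers later

    def run(self):
        # Each observer may write to the table, creating work downstream.
        while self.pending:
            row, column = self.pending.popleft()
            for callback in self.observers[column]:
                callback(self, row, self.rows[row][column])

def extract_links(table, row, content):
    links = [word for word in content.split() if word.startswith("http")]
    table.write(row, "links", links)

def count_links(table, row, links):
    table.write(row, "link_count", len(links))

table = ObservedTable()
table.observe("raw", extract_links)
table.observe("links", count_links)
table.write("page1", "raw", "see http://a.example and http://b.example")
table.run()
```

Writing one raw page triggers only the observers registered on the changed columns, so the amount of work is proportional to the update rather than to the whole table.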
System S Streams (InfoSphere Streams): System S (also called InfoSphere Streams) is a large-scale, distributed data stream processing middleware. It is in essence an enhanced CEP system, but it offers:
• Scale-out
• Support for complex data types
• Security and general industrial-strength
The InfoSphere Streams architecture represents a significant change in computing system organization and capability. Even though it has some similarity to Complex Event Processing (CEP) systems, it is built to support higher data rates and a broader spectrum of input data modalities. It also provides infrastructure support to address the needs for scalability and dynamic adaptability, like scheduling, load balancing, and high availability.
In InfoSphere Streams continuous applications are composed of individual operators, which interconnect and operate on multiple data streams. Data streams can come from outside the system or be produced internally as part of an application.
In this framework, SPADE [10] provides a toolkit of type-generic, built-in stream processing operators (it also provides an intermediate language for flexible decomposition of parallel and distributed data-flow graphs, and a set of stream adapters to ingest and publish data). Applications developed and optimized with SPADE run natively on the Stream Processing Core. Data from input data streams representing a myriad of data types and modalities flow into the system. The layout of the operations performed on that streaming data is determined by high-level system components that translate user requirements into running applications.
S4: S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data. S4 was initially developed to personalize search advertising products at Yahoo!, which operate at a rate of thousands of events per second.
In S4, keyed data events are routed with affinity to Processing Elements (PEs), which consume the events and do one or both of the following
• emit one or more events which may be consumed by other PEs,
• publish results, possibly to an external data store or consumer.
The architecture resembles the Actors [2] model, providing semantics of encapsulation and location transparency, thus allowing applications to be massively concurrent while exposing a simple programming interface to application developers. This design choice also makes it relatively easy to reason about correctness due to the general absence of side-effects.
S4 is logically a message-passing system: computational units, called Processing Elements (PEs), send and receive messages (called Events). The S4 framework defines an API which every PE must implement, and provides facilities for instantiating PEs and for transporting Events.
Events in S4 are arbitrary Java Objects that can be passed between PEs. Adapters convert external data sources into Events that S4 can process. Attributes of events can be accessed via getters in PEs. This is the exclusive mode of communication between PEs.
Events are dispatched in named streams. These streams are identified by a string-valued stream name.
Processing Elements (PEs) are the basic computational units in S4. They consume events and can in turn emit new events and update their state. Each instance of a PE is uniquely identified by four components:
• its functionality as defined by a PE class and associated configuration,
• the named stream that it consumes,
• the keyed attribute in those events, and
• the value of the keyed attribute in events which it consumes.
Every PE consumes exactly those events which correspond to the value on which it is keyed. It may produce output events. Note that a PE is instantiated for each value of the key attribute. This instantiation is performed by the platform. For example, in an imaginary word counting application, WordCountPE is instantiated for each word in the input. When a new word is seen in an event, S4 creates a new instance of the PE corresponding to that word.
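The per-key instantiation of Processing Elements can be sketched as follows. S4 itself is a Java platform and this is not its API: the `Dispatcher` class, the dictionary-based events and the `process` method are assumptions used only to show how one PE instance is created for each value of the keyed attribute.

```python
class WordCountPE:
    """One instance per word; keeps the count for that word as local state."""

    def __init__(self, key):
        self.key = key
        self.count = 0

    def process(self, event):
        self.count += 1
        return {"word": self.key, "count": self.count}  # emitted output event

class Dispatcher:
    """Route each event to the PE instance keyed on the event's attribute value."""

    def __init__(self, pe_class, keyed_attribute):
        self.pe_class = pe_class
        self.keyed_attribute = keyed_attribute
        self.instances = {}

    def dispatch(self, event):
        key = event[self.keyed_attribute]
        pe = self.instances.get(key)
        if pe is None:
            # First event with this key value: the platform creates a new PE.
            pe = self.instances[key] = self.pe_class(key)
        return pe.process(event)

dispatcher = Dispatcher(WordCountPE, "word")
outputs = [dispatcher.dispatch({"word": w}) for w in "to be or not to be".split()]
```

Because every PE holds its state in memory, the number of instances grows with the number of distinct key values.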
Several PEs are available for standard tasks such as count, aggregate, join, and so on. Many tasks can be accomplished using standard PEs which require no additional coding. The task is defined using a configuration file. Custom PEs can easily be programmed using the S4 software development tools.
Problems with existing solutions
As shown, the original MapReduce paradigm and its open source implementation (Apache Hadoop) are very well suited for batch processing, taking a very large amount of data as input, processing them and generating an output. The long latency problem of these solutions is not a serious obstacle, as long as they are used in the scenarios they were designed for. But when a fast response is needed, and when solutions must maintain an updated state with the new data, this model starts to become inefficient. It is also up against the lack of a model to support an internal state: the only solution available in this technology is to process input data, to emit state as any other output data, and, when all the batch processing is finished, to reread state as input.
As these problems of the standard solution are already assumed and detailed in most of the references of the existing solutions (as all of them present solutions to these problems), this document will go no further with them. Analysis will concentrate on the new proposed solutions.
There are quite a number of proposed solutions to solve the problem of real-time or streaming processing in the Big Data scenario. To simplify the analysis, they will be grouped by the underlying technology:
• Traditional RDBMS
• Systems based on MapReduce and Apache Hadoop implementation
• Complex Event Processing oriented solutions
• Other paradigms
Traditional RDBMS
Data stream management systems focus on near-real-time processing of continuously arriving data, so they should be close to stream processing. In fact, parallel databases strategies and solutions are used at the core of stream processing platforms, like Percolator.
But if no other intelligence is built around the database, for this solution to be efficient, the records must reside in memory, which limits (or makes very expensive, or impossible) the availability for very large data problems. As they have been designed to give fast answers to queries made by a human operator, database systems tend to emphasize latency over throughput, so usually they are not able to work with the required data input rate. This limit is also affected by the ACID requirement.
MapReduce based solutions
Most MapReduce based solutions are implemented on the Apache Hadoop platform, so, although all of them achieve a better response and a lower latency, they still have to deal with an architecture designed to work with very large batch processing problems. And there is no implementation in the Apache Hadoop architecture to support a distributed state, accessible and updatable to all jobs. Proposed solutions that offer a state must rely on a static object and/or an external storage to provide it.
Twister optimizes iterations in MapReduce jobs, allowing access to a static state. Map and reduce tasks may persist across iterations, amortizing the cost of loading the static state. However, the state cannot change during iteration. Another limitation is that state must fit into memory, as it is handled as static data and it is not managed by the distributed platform.
MapReduce Online allows jobs to run continuously, but again state must be a structure internal to each task, and must fit completely into memory.
Stateful Bulk Processing is the existing solution closest to the proposed invention. State is provided with BIPtables, with random access by indexing. This is an efficient solution when the update affects a very small portion of the state.
CEP oriented solutions
CEP systems (System S, S4) focus on digesting and processing input events as they arrive. There is no explicit concept of state, and though it can be implemented as local storage in memory associated to the processing nodes, it does not scale: the whole 'state' must fit into memory, as there is no support for distributing state among machines in the cluster. The main problem of CEP oriented solutions (System S, S4) is the need for the whole state to fit into memory.
Other
Dryad, although more flexible, utilizes the same strategy as standard map&reduce as regards the stream processing requirements. The proposed solution [11] to reduce latency is to automatically identify redundant computation; it caches prior results to avoid re-executing stages or to merge computations with new input. But this solution is out of the programmer's control, so it is not possible to retrieve and work on this "pseudo-state". Dryad does not offer primitives for state management.
A solution that applies a different approach to MapReduce and does not suffer from the "in-memory state" limitation is Google's Percolator.
In the same way as MapReduce was designed to build Google's web index, this platform has been designed to update it when a very small part of the web has been crawled. The main focus is the consistency of the state (the index). It is even more resource consuming than the original MapReduce (although the incremental way of working and the reduction in latency make the extra resources worthwhile).
Google Percolator is the right solution for the problem of continuously updating the web index, but may be wrong for the typical problems that a Telco operator would face: continuously updating user profile based on Call Data Records (CDRs), mobile network information, navigation logs, etc. On one hand, the problem domain is not as large as generating a web index, so the large cluster needed by Percolator may not be economically efficient for Telco type problems.
On the other hand, Percolator has been shown to be less efficient than rebuilding the whole index from scratch when the update rate is significantly high. This is because querying Bigtable to retrieve the state information for every key is much slower than reading the whole state as a dataset. This inefficiency is not an issue with the 1 trillion web pages on the web (and so in the index repository), because every update will affect only a very small fraction of them. But as already stated, the Big Data problems of a Telco company may involve a large portion of the state being revisited in every execution wave, so this solution may be very inefficient.
Description of the Invention
It is necessary to offer an alternative to the state of the art which covers the gaps found therein, particularly related to the lack of proposals which really allow the optimization for performing the processing of continuous data flow in a distributed computing platform.
To that end, the present invention provides, in a first aspect, a method for streaming processing in a map and reduce distributed computing platform, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity, the method using information regarding state associated to at least reduce operations.
Contrary to the known proposals, the method of the first aspect of the invention comprises generating said state as a result of a reduce operation performed by a node, in the form of an output queue, and using said state as an input queue of a subsequent reduce operation performed by said node, said output and input queue forming a single queue which is updated after the reduce operation processing.
Other embodiments of the method of the first aspect of the invention are described according to appended claims 2 to 12, and in a subsequent section related to the detailed description of several embodiments.
A second aspect of the present invention generally comprises a distributed computing platform system for map and reduce streaming processing, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity and configured for performing at least reduce operations using information regarding state associated thereto.
Contrary to the known proposals, the system of the second aspect of the present invention is characterised in that at least one of said nodes comprises at least one input state queue and at least one output state queue, in that said at least one node is configured for generating said state as a result of a reduce operation performed by said at least one node and providing said result to said at least one output state queue, and in that said input and output state queues are interconnected forming a single queue such that there is a feedback from said output state queue to said input state queue.
Other embodiments of the second aspect of the invention are described according to appended claims 14 to 18, and in a subsequent section related to the detailed description of several embodiments.
Brief Description of the Drawings
The previous and other advantages and features will be more fully understood from the following detailed description of embodiments, with reference to the attached drawings, which must be considered in an illustrative and non-limiting manner, in which:
Figure 1 shows an example of a functional programming diagram, with map (f) and fold (g) functions.
Figure 2 shows an example of the word count algorithm implementation in MapReduce.
Figure 3 shows the logical flow of data management used in batch processing mode.
Figure 4 shows the data distribution diagram used in batch processing mode.
Figure 5 shows the logical data flow at the output of a distributed operation. Information is sent immediately to the destination node worker, instead of being stored locally as in the Hadoop implementation.
Figure 6 shows an example of the key-values consolidation.
Figure 7 shows an example of the local data file structure.
Figure 8 shows an example of the reading of key-values with associated hash group HG2 on a dataset with three files in the node worker local disk.
Figure 9 shows an example of the implementation of multiple inputs/outputs reduce operation.
Figure 10 shows the logical schema of the operation scheduler and the management of data in the queues (blocks).
Figure 11 shows the data flow in stream processing mode.
Figure 12 shows the data flow in stream processing mode for one node.
Figure 13 shows the data flow in stream processing mode for multiple node workers.
Figure 14 shows the BlockManager strategy, according to an embodiment of the present invention.
Figure 15 shows the BlockManager in the node architecture.
Figure 16 shows the streaming state model with a 2-input reduce operation, according to an embodiment of the present invention.
Figure 17 shows an example of the interpretation of a stateful reduce operation as a CEP system.
Figure 18 represents the state and block distribution in buckets, according to an embodiment of the present invention.
Figure 19 illustrates how input data blocks are accumulated in buckets according to the keys of the data they contain.
Figure 20 represents an example of the BlockBreak module.
Detailed Description of Several Embodiments
The invention proposes an extension of the MapReduce technology that efficiently reduces latency, implements stream processing of a continuous data flow, and allows developing solutions with a state that can be updated at the rate of arriving data.
The invention also includes a new module for managing the blocks of key-value pairs. This module keeps in memory the next needed blocks to be executed, and guarantees that idle blocks will be backed up to disk.
The main objectives addressed by the invention are:
• Stream processing: operations can be scheduled on data queues, and they are automatically executed when there are input data available.
o Queues can be controlled to specify a maximum latency.
o Queues can be controlled to specify whether input data must be deleted after processing or must be kept (for example, a queue with static information to be used in a JOIN type reduce operation).
o Very efficient low latency with a Block Manager module that optimizes resources in order to keep active data blocks in memory.
• Includes the concept of state in the MapReduce platform.
o This state can be continuously updated with the input data.
o In order to efficiently update the state, it is organized as a partition of the key ranges, so it is only processed when there is data to be processed.
o Hash group ranges assigned to buckets are dynamically distributed in order to keep bucket size always under control.
o Distribution of data into buckets can include a time stamp, which may provide more efficient data handling, and allows forgetting and deleting unused state entries.
o State can be shared among any number of reduce operations.
o State (and any other dynamic queue) can be "frozen" in a static database, to be processed later on in batch mode.
The invention consists in a system and method to implement a new stream processing capability in medium sized big-data platforms working with the MapReduce paradigm. This new system allows the development of stateful solutions, where state can be updated continuously and efficiently as input data arrives.
The new system is based on a data managing method that allows a very efficient and low latency implementation of MapReduce operations in batch processing mode. The new system absorbs and extends this model into a new stream processing oriented platform.
In order to explain the innovative features of the new platform, the basic data management method will be presented, though it is already the subject of a pending patent application.
Operations management in batch processing mode:
The scenario is a platform implementing the MapReduce paradigm. Any operation (map, reduce) is distributed among the nodes in the cluster. Each node (or node worker) executes a number of instances of these operations. Each of these instances implies a sequence of sub-operations:
1. Input data is read. If needed (for reduce operations), a sort on the key is performed.
2. The body of the operation itself is run. This is the piece of code written by the developer that forms part of a particular solution.
3. Output data is distributed to the nodes of the platform.
4. In each node, data generated everywhere in the platform is collected and stored locally.
The first three steps are performed in a distributed way across all the node workers in the platform. With the proposed invention, it is guaranteed that data and processing are always local to the node worker, and distribution of the data is performed in memory.
The invention relies on how sub-operations 1), 3) and 4) are performed and on how datasets, consisting of groups of key-value pairs, are created, distributed and read. The logical sequences of data handling and distribution are depicted in Figure 3 and explained in the following datasets management section.
Datasets management in batch processing mode
Output data is distributed to the nodes of the platform:
At the output of an operation (either map or reduce), key-value pairs are generated and emitted to a specific dataset (in the proposed invention, an operation can have multiple output datasets, so different key-value pairs types can be produced from the same operation).
Key-value pairs are distributed to their storage node without passing by the local disk of the emitting node, as this sub-operation is performed in memory and data is sent through the platform network layer. This process is performed without need to communicate with a central controller, as there is a fixed strategy for data distribution in the platform, and every node worker has this strategy.
The schema is shown in Figure 4. When an instance of an operation (parse, map or reduce) running in a node worker (#2, for example) generates and emits a key-value pair (dotted line), it is distributed through the platform network layer to the final storage node (or nodes, if redundancy is working). The distribution criterion is based on the key field.
A hash function is computed on the key. The space of possible results of the hash function is partitioned in order to manage a more practical number of groups. After the hash on the key and partitioning functions, the key-value pair has an associated number, that will be used to identify to which node worker it is going to be sent, and also will determine how it will be stored there. This number is called the hash group.
A standard function, globally optimized and associated to the type of the key, can be used to compute the hash and to perform the partitioning of the data, or the function can be user provided. This gives the developer the freedom to control the data (and so the task) distribution for specific problems.
Knowing the hash group of the key, the platform (and every node worker) also knows which node worker the data belongs to (and so, which node must execute the operations on the data with this same hash group). The data is thus sent to the predefined node worker (or node workers) (Figure 5). The output is managed in memory, and then sent through the network, without being written down to disk. This is one of the core aspects of the invention. In other solutions ([2]), the output of the operation is written down to local disk. Then it is sorted, shuffled and copied to the distributed file system, where it is merged. This extra data movement has a big impact on performance.
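The routing described above can be illustrated with a minimal Python sketch. It is not the platform's implementation: the choice of MD5 as hash function, the contiguous-range assignment of hash groups to node workers, and the names hash_group, owner_node and route are all assumptions made for illustration.

```python
import hashlib

NUM_HASH_GROUPS = 65536  # typical number of hash groups mentioned in the text


def hash_group(key: str, num_groups: int = NUM_HASH_GROUPS) -> int:
    """Hash the key, then partition the hash space into num_groups groups."""
    digest = hashlib.md5(key.encode("utf-8")).digest()  # assumed hash function
    return int.from_bytes(digest[:8], "big") % num_groups


def owner_node(group: int, num_nodes: int, num_groups: int = NUM_HASH_GROUPS) -> int:
    """Assumed fixed strategy: each node worker owns a contiguous range of groups."""
    return group * num_nodes // num_groups


def route(pairs, num_nodes):
    """Group emitted key-value pairs by destination node worker, in memory."""
    outbox = {node: [] for node in range(num_nodes)}
    for key, value in pairs:
        outbox[owner_node(hash_group(key), num_nodes)].append((key, value))
    return outbox
```

Because every node worker can evaluate owner_node locally, no central controller has to be consulted before a pair is sent.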
In contrast, the distribution of data in memory has the drawback that, if there is a machine failure, the whole operation has to be rerun from scratch. So the new solution is not very well suited to environments where machine failure may be frequent (very large clusters, with thousands of machines).
In each node, data generated from everywhere is collected and stored locally:
In each node (Figure 6), key-values produced at all the operation instances running in the other node workers (and in the node itself), with a hash group assigned to that node worker, are received by a dedicated process (Consolidator). For every hash group, key-values are kept in memory until either the source operations running in the node workers have finished, or a chunk large enough to be dumped to a file has been collected (e.g., 1 GB).
All the key-values must be available in memory because they have to be written to the disk file as a whole, and have to be sorted before. Then it is possible to dump them to a disk file, so a local file holding all the key-values accumulated to be processed (or replicated) at the node worker is created. If there are more key-value pairs pending, they will be accumulated in the next file.
Since this file size limitation (e.g., 1 GB) can produce dataset fragmentation, a compacting command is provided that can read back all the dataset files generated at each node worker, and reorganize the hash groups in a new set of disk files. Fragmentation of the hash groups among a large number of disk files can have a negative impact on performance, so this operation is very useful when the same dataset will be used as input to different operations. The key-value pair distribution strategy, together with this possibility of compacting hash groups, is the basis for allowing appends to the dataset while keeping a good performance.
The pre-known (although flexible and modifiable) distribution of hash group values among the node workers guarantees the locality of operations and its corresponding input data. This simplifies the task of distribution and it boosts performance. This is another core aspect of the invention.
Input data is read:
The structure of the disk files holding the datasets has been designed to offer the best performance for the input data reading sub-operation.
The structure of a local data file is the same for all the node workers and for all the key-value datasets (Figure 7). The file has three parts:
• A header with information about the file contents and its type and coding
• One structure per hash group identifier (typically 65536, but this number can be changed in the platform when dealing with very large datasets). Each structure has:
o The offset to the file section where the key-values which correspond to the selected hash group are stored
o The size of the set of key-value pairs belonging to the hash group (this information could be computed from the offset of the next hash group, but the extra size cost simplifies the management).
• All the sequence of key-values that corresponds to the hash groups represented in the file.
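The three-part layout can be sketched in Python as follows. The magic tag, the field widths and the function names are hypothetical, since the text does not specify the binary encoding; only the structure (header, one offset/size entry per hash group, then the key-value data) follows the description.

```python
import struct

MAGIC = b"KVF1"                # hypothetical header tag
ENTRY = struct.Struct("<QQ")   # assumed encoding: (offset, size) per hash group


def build_file(groups: dict, num_groups: int) -> bytes:
    """groups maps a hash group id to its already serialised key-value bytes."""
    header = MAGIC + struct.pack("<I", num_groups)
    offset = len(header) + ENTRY.size * num_groups  # data starts after the table
    table, body = [], []
    for g in range(num_groups):          # space is reserved for every hash group
        payload = groups.get(g, b"")
        table.append(ENTRY.pack(offset, len(payload)))
        body.append(payload)
        offset += len(payload)
    return header + b"".join(table) + b"".join(body)


def read_group(blob: bytes, group: int) -> bytes:
    """Fetch one hash group with a single table lookup and one contiguous read."""
    header_size = len(MAGIC) + 4
    offset, size = ENTRY.unpack_from(blob, header_size + ENTRY.size * group)
    return blob[offset:offset + size]
```

Storing the size next to the offset, as the text notes, lets a reader fetch a hash group without inspecting the following table entry.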
The files are intended to be 1 GB in size (configurable). If the dataset key-values to be processed by the node worker are larger than the file size, its section of the dataset will be held in more than one file. If there are not enough key-values to fill the 1 GB file, it can be smaller.
As presented in Figure 7, for all the dataset files in every node worker, space is reserved to represent all the possible hash groups (from 0 to 65535). The distribution strategy guarantees that in every node worker, only key-values having specific hash groups will arrive and be stored, and so only a subset of this space will be used (a different subset in different node workers). This represents a small penalty, but simplifies data handling and offers a simple way to work with duplicated files across the cluster nodes, and so it gives some fault tolerance.
When the key-values have to be read in order to be processed, the node worker retrieves its designated hash groups. Knowing the size of the data associated to every hash group, and the memory size available for the operation, it reads the maximum size of key-values in a single chunk, thus reducing the number of I/O operations and maximizing throughput. In each file, the chunk corresponding to the requested key-values is large enough to allow an efficient implementation of the reading function.
In Figure 8, the key-values of a dataset have been stored in three files in the node worker local disk. In order to retrieve the data corresponding to a set of hash groups (from hash group N to hash group N+k, for a total size of 1 GB) the platform has to read from the three files. As one hash group is distributed among more and more disk files, the reading function may be less and less efficient (many readings of small size). The compacting functionality presented previously performs a defragmentation of the hash group distribution, and allows recovering the reading efficiency.
If needed (if the operation to be run is a Reduce), a local sort is run on every hash group. Each sort operation is very efficient since it is performed with the key-values from each hash group, and is performed in memory after reading the hash group.
With this architecture, there is no restriction to the number of input datasets an operation can have. In particular, this property simplifies and speeds up operations like JOINs, very frequent in database oriented, big-data applications. This structure is key to the data management in the platform. This functionality is fully described in next section, as it constitutes one of the main claims of the invention.
Multiple inputs, multiple outputs operations:
In the previous section, a new data management strategy for MapReduce key-value datasets has been presented, in which key-value pairs are distributed by hash group according to a fixed strategy.
This distribution strategy guarantees that all the key-value pairs with a common key, from all the datasets in the platform, are stored in the local disk of the same machine.
Another innovation in this MapReduce implementation is to allow any map or reduce operation to have several input or output datasets (although multiple-input map operations do not make much sense).
An immediate consequence of having all the key-value pairs with a common key, from all the datasets in the platform, stored in the local disk of the same machine, is that it is very simple and efficient to implement multiple input reduce operations (Figure 9). All the key-value pairs from all the datasets in the platform, with a common key, are kept in the same machine, so reduce operations will always have local access to any required dataset.
Obviously, the only requirement is that all input datasets must have the same data type as key, so the key can be compared among them.
JOIN operations implementation:
As a result of both previous innovations, reduce operations with several input datasets will have the datasets automatically partitioned and sorted in the same way, so the basic condition to perform a JOIN type operation is automatically met in the platform. This is possible because the dataset distribution strategy has put all the key-values with a common key, from its input datasets, in the local disk of the same machine. So there is no conflict (as there would be in a platform based on a distributed file system, like Apache Hadoop and HDFS) in assigning reducers to machines where local data lives.
This implementation of JOINs is as efficient as a standard reduce operation and has no size restrictions (unlike map-side or reduce-side JOINs in Hadoop).
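A minimal Python sketch of a multiple-input reduce used as a JOIN is given below. It assumes that the co-located datasets are small in-memory lists of (key, value) pairs; the names join_reduce and inner_join are illustrative only and do not belong to the platform.

```python
from itertools import groupby
from operator import itemgetter


def join_reduce(inputs, reduce_fn):
    """Call reduce_fn(key, values_per_input) once per key across all inputs."""
    tagged = [(key, idx, value)
              for idx, dataset in enumerate(inputs)
              for key, value in dataset]
    tagged.sort(key=itemgetter(0))  # local, in-memory sort on the key
    for key, rows in groupby(tagged, key=itemgetter(0)):
        per_input = [[] for _ in inputs]  # one list of values per input dataset
        for _, idx, value in rows:
            per_input[idx].append(value)
        yield from reduce_fn(key, per_input)


def inner_join(key, per_input):
    """Example reduce body: emit every pairing of left and right values."""
    left, right = per_input
    for l_value in left:
        for r_value in right:
            yield key, (l_value, r_value)
```

For example, joining a dataset of users with a dataset of call records on the user identifier only requires passing both lists to join_reduce with inner_join as the reduce body; keys present in just one input produce no output.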
Operations management in stream processing mode:
Based on the described data management infrastructure and the very low latency obtained, a new way of processing MapReduce operations has been designed, stream processing, which can be as close to real-time as desired. Data is handled in queues, rather than datasets, and when new data arrives to a queue, any number of operations can be executed to process it. Output data is then emitted to other queues, and new operations downstream are executed.
Operations are scheduled with a command that defines the operation to be executed and the input and output queues processed:
add_stream_operation parse page.parse_graph graph.txt graph
For example, this command schedules a stream operation "parse" (this name identifies the operation together with its set of input and output queues), which will execute a map operation "page.parse_graphs" that reads key-value pairs from input queue "graph.txt" and emits to queue "graph".
A number of properties can be defined in this stream_operation to control its scheduling:
• max_latency
• min_input_queue_size
• min_time_between_schedulings
• max_number_parallel_operations
• max_output_queue_size
• any other kind of restriction to control the execution of the operation.
There is a streaming task scheduler that, whenever the conditions set by these properties are met, prepares the operation to be executed. Once all input data is consumed, the operation ends and the task scheduler continues to evaluate the relevant parameters. A logical schema of the operation scheduler and the management of the data in the queues (blocks) is presented in Figure 10.
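As an illustration, one possible reading of how such properties could drive the scheduler is sketched below in Python. The exact semantics of each property are not defined in the text, so the rule used here (run when the queue is large enough or its oldest data has waited too long, subject to the parallelism and spacing limits) is an assumption, as are the default values and the names StreamOperation and should_schedule.

```python
from dataclasses import dataclass


@dataclass
class StreamOperation:
    name: str
    max_latency: float = 5.0                 # assumed unit: seconds
    min_input_queue_size: int = 1000         # assumed unit: key-value pairs
    min_time_between_schedulings: float = 0.0
    max_number_parallel_operations: int = 1
    last_run: float = float("-inf")          # time of the previous scheduling
    running: int = 0                         # instances currently executing


def should_schedule(op, queue_size, oldest_age, now):
    """Assumed trigger rule; the real scheduler may combine properties differently."""
    if queue_size == 0:
        return False                         # nothing to process
    if op.running >= op.max_number_parallel_operations:
        return False
    if now - op.last_run < op.min_time_between_schedulings:
        return False
    # Enough input has accumulated, or the oldest input has waited too long.
    return queue_size >= op.min_input_queue_size or oldest_age >= op.max_latency
```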
Whether input data queue is cleared or not after processing, is determined by a configuration option; by default the desired behaviour is to clean the queue, but in some cases it may be useful to have "static" queues, like catalogs or dictionaries.
In Figure 11, a typical streaming flow is presented. To take into account all the other machines running in the cluster, a more detailed description is presented in Figure 12 and Figure 13. The distribution of key-value pairs through the platform is the same as presented in the previous sections.
A subscription mechanism allows clients to receive content and updates from any queue in the platform.
Latency improvement. Block Management
In the new stream oriented processing, as there is a continuous data flow in the operations chain, the ideal scenario would be to keep all data blocks in memory, and the reference to this memory is passed between operations. If there is enough memory, there is no requirement to download the data blocks to the local disk.
In Big Data, usually there will not be enough memory to handle all the data to be processed. So a new BlockManager module has been included in the system. This module optimizes and minimizes the disk usage, guaranteeing that all the data blocks needed to execute the next set of tasks, will be available in memory.
If there is enough memory in the system, data blocks are kept in memory without the need of being paged to disk. When the system experiences memory contention due to lack of memory, the BlockManager will page to disk the least useful data blocks. When these blocks are required again, the BlockManager will page them in from disk. In the best case scenario, data blocks don't need to be copied to disk, and in the worst case, these copies are optimized and minimized (Figure 14).
The BlockManager (Figure 15) keeps a list of the blocks in the node worker. This list is sorted according to the information kept in the task scheduler: the blocks that will be needed by operations to be executed earlier are kept first. The last blocks in the list are scheduled to be written to the local disk, with a low priority. When a task is ready to be run, the BlockManager guarantees that all the needed blocks are available in memory. If a block was copied to disk and its memory freed, the BlockManager asks the Disk Manager to retrieve it. If there is not enough memory to copy it, blocks already written to disk will be scanned to free their associated memory, if not yet done.
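The paging policy can be approximated with the toy Python model below. It is only a sketch under simplifying assumptions: a dictionary stands in for the local disk, paging is synchronous rather than a low-priority background activity, and the next_use value is a hypothetical stand-in for the position of a block in the task schedule.

```python
class BlockManager:
    """Toy model: keeps the soonest-needed blocks in memory, pages out the rest."""

    def __init__(self, memory_limit):
        self.memory_limit = memory_limit
        self.in_memory = {}   # block id -> (size, data)
        self.on_disk = {}     # stand-in for the local disk
        self.next_use = {}    # block id -> position in the task schedule

    def used(self):
        return sum(size for size, _ in self.in_memory.values())

    def add(self, block_id, data, next_use):
        self.in_memory[block_id] = (len(data), data)
        self.next_use[block_id] = next_use
        self._evict(keep={block_id})

    def _evict(self, keep):
        # Page out the blocks needed latest until memory fits the limit.
        victims = sorted((b for b in self.in_memory if b not in keep),
                         key=lambda b: self.next_use[b], reverse=True)
        for block_id in victims:
            if self.used() <= self.memory_limit:
                break
            self.on_disk[block_id] = self.in_memory.pop(block_id)[1]

    def get(self, block_id):
        if block_id not in self.in_memory:
            data = self.on_disk[block_id]  # page in; the disk copy is kept
            self.in_memory[block_id] = (len(data), data)
            self._evict(keep={block_id})
        return self.in_memory[block_id][1]
```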
Stream processing with state
MapReduce technology was oriented to solve problems in batch processing mode: to collect a huge amount of data, process them from scratch, and to obtain a result. But in many cases, data does not come from a static source, but rather they are a sample from a constantly changing source. In this model, the process must be rerun completely, with new and accumulated data, at a convenient rate (once a month, once a week...). Or the developer must incorporate the results of the previous batch by hand, when the job has completely finished, and the results have been exported to external storage. In either case, they are inefficient solutions, with a very long latency.
Many of the solutions relevant to Telco businesses could be represented as an internal state that is continuously adapted with new data. Based on the same principle for managing key-value pairs in the distributed platform, through hash group distribution, the stream processing method can be easily extended to incorporate a state that can be treated as a special kind of queue. The output state is a queue produced by a reduce operation that has two inputs, one of which is its own output from the previous cycle (Figure 16). As presented before, reduce operations with multiple inputs allow an easy implementation of JOINs. This same interpretation can be extended to update the state, because the reduce operation will automatically be called with the state information and the input data records with the same key.
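The feedback between output state and input state can be sketched in Python as follows. The state is modelled as a plain dictionary and the update function count_calls is a made-up example; in the platform the state would be a distributed queue rather than a single in-memory object.

```python
def stateful_reduce(state, input_block, update):
    """One cycle: join the state queue with new input by key; return the new state."""
    incoming = {}
    for key, value in input_block:
        incoming.setdefault(key, []).append(value)
    new_state = {}
    for key in sorted(set(state) | set(incoming)):
        # The reduce body sees the previous state entry and the new records
        # that share its key, exactly as in a two-input JOIN.
        new_state[key] = update(state.get(key), incoming.get(key, []))
    return new_state


def count_calls(previous, values):
    """Hypothetical update: number of records seen so far for this key."""
    return (previous or 0) + len(values)


state = {}
for block in ([("u1", "cdr"), ("u2", "cdr")], [("u1", "cdr")]):
    state = stateful_reduce(state, block, count_calls)  # output fed back as input
```

After the two cycles, the state holds one counter per user key, updated without reprocessing the first block.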
This operation with internal state could be interpreted as a CEP system, with a node for each of the keys in the data (Figure 17). Each of these nodes must keep the value associated to its key. As already presented about CEP systems, this strategy would have scalability problems as the state (the number of possible keys) grows. One of the problems of implementing a state in a MapReduce like platform is the need to reevaluate the whole state with every wave of new input data. One extreme solution would be the strategy of rereading the state as an input (this one has other problems, mainly the very long latency, because result state must be completely computed by all node workers before being emitted as output and available as input). In the other extreme, there would be the solution of accessing the individual entry to be updated with every input key-value pair (this kind of solution would be the one used in Percolator system [8], or in CBP model [7]), through the use of some kind of indexing. This second option is more efficient when the state portion to be updated is a small fraction of the total state size. If not so, the overhead of index computing can be larger than updating the whole state.
For the majority of typical Telco problems, where state can be a user activity related description, a significant portion of users' state can be affected quite frequently (every 5 or 10 minutes). So a new intermediate solution has been designed to work optimally when a large amount of the state is updated, and to avoid inefficiency when a small set is updated. This is the scenario where the proposed innovation is focused.
Moreover, this innovative interpretation of the state as a single queue treated simultaneously as input and as output, together with the BlockManager activity, offers an optimal performance, as most blocks of the state will probably always be kept in memory (up to the maximum memory size available), improving the overall latency, as there is no need to stop the platform to consolidate state on disk (this will be done by the BlockManager when the required disk operations allow it).
A solution similar to the data distribution by hash group, described before for managing data across the platform, can be extended to organize state information. In every node worker, the state is split into a number of ranges of key hash groups, or Buckets (the range of hash groups is already split among the node workers).
When an input data block arrives from a queue, it is assigned to the key's corresponding state Bucket (Figure 18), and is accumulated there until processed. In order to perform an efficient state update, it is convenient that the size of the data to be processed be at least similar to the size of the Bucket, so input data blocks are accumulated until the assigned memory has been filled.
But input data blocks usually contain a mix of key-value pairs corresponding to several state buckets (Figure 19) (though all of them must correspond to the portion of the state stored in the current node worker). The simplest solution is to send the input data block to all the buckets with which it shares at least one key hash group, where it will be automatically processed through the reduce operation; key-value pairs outside the bucket will be ignored. This has proven to be faster when the data blocks are kept in memory, by reference. But this strategy implies that in each bucket a lot of useless information is maintained (how much depends on the block creation and the number of buckets): key-values that are not going to be processed in that bucket. As the number of buckets grows, the size of local key-values (as opposed to key-values that will be processed in another bucket) is reduced, for the same memory limit, so the performance impact increases.
When this situation is detected (by evaluating the relative size of local key-values to the total memory occupied by data blocks), a new module enters into play that breaks up and compacts the data blocks into groups of key-values whose hash group corresponds to just one bucket (Figure 20). In this way, at the cost of splitting the data block and holding an extra copy in memory, input data in the buckets is compacted, and operations can be completed and the memory freed.
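The detection and break-up steps can be sketched in Python as shown below. The representation of a block as a list of (hash group, key, value) tuples, the description of buckets by the first hash group of each range, and the function names are assumptions made for the example.

```python
from bisect import bisect_right


def bucket_of(group, bucket_starts):
    """bucket_starts is the sorted list of the first hash group of each bucket."""
    return bisect_right(bucket_starts, group) - 1


def local_ratio(block, bucket, bucket_starts):
    """Fraction of a block's pairs that the given bucket will actually process."""
    local = sum(1 for group, _, _ in block
                if bucket_of(group, bucket_starts) == bucket)
    return local / len(block) if block else 0.0


def block_break(block, bucket_starts):
    """Split a mixed block into one compact sub-block per bucket."""
    parts = {}
    for group, key, value in block:
        parts.setdefault(bucket_of(group, bucket_starts), []).append(
            (group, key, value))
    return parts
```

A low local_ratio for a bucket indicates that most of the block's memory is of no use to that bucket, which is the situation in which breaking the block pays off.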
Initially, state is split into as many buckets as processors are defined in the platform (this parameter is common to every node worker). As buckets may grow, either because there are new keys, or because there is more information associated to the keys, a mechanism has been included to split large buckets into new ones, keeping their size under control and thus keeping update operations efficient. When a bucket has grown and must be split, new incoming data and its update operation are stopped; new buckets are created, and the hash group range associated to the bucket is split.
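A minimal Python sketch of splitting one bucket is given below. It assumes that a bucket is identified by a half-open range [lo, hi) of hash groups and that its entries are (hash group, key, value) tuples; the function name and the equal-width split are illustrative choices, not taken from the platform.

```python
def split_bucket(bucket_range, entries, parts=2):
    """Split a bucket covering hash groups [lo, hi) into equal-width sub-ranges."""
    lo, hi = bucket_range
    step = -(-(hi - lo) // parts)  # ceiling division
    ranges = [(start, min(start + step, hi)) for start in range(lo, hi, step)]
    buckets = {r: [] for r in ranges}
    for group, key, value in entries:
        # Each entry moves to the sub-range that contains its hash group.
        buckets[ranges[(group - lo) // step]].append((group, key, value))
    return buckets
```

In this sketch the bucket's entries are only redistributed after the split is decided, which mirrors the requirement that incoming data and update operations for that bucket are paused while the split takes place.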
There are different strategies to perform bucket splitting. If there is little variation in the size distribution of buckets and all the buckets are close to capacity, a general split operation is performed on all buckets (a fourfold split). All the state related operations are stopped. If just one bucket is too large, then just that bucket is split. Although just one bucket needs to be stopped, management is more complicated, and it is not so efficient.
Embodiments of the Invention:
The invention has been implemented in a Big-Data platform, a MapReduce framework (in experimental prototype status). With this framework, different problems relevant to the Telco business and other typical problems have been programmed and evaluated. A few examples of problems implemented:
• Social Network Analysis (SNA): Analysis of CDRs to detect user communities.
• Top hits: Analysis of web navigation logs, in order to detect the most active hits per category, and recommend them according to the user profile.
• Navigation Traffic (OSN): Analysis of navigation patterns of mobile users, classified by type of URL, type of device, etc.
• Mobility: Analysis of CDRs to detect mobility patterns.
• Page Rank with incremental web crawling
• Processing and distribution of stream data flows
• Last known position based on mobile network handshake information
Advantages of the Invention
The invention is meant to solve, in a very efficient way, the problem of processing a continuous data flow while keeping results up to date, without the need to accumulate and reprocess all data, as would be required in batch-oriented MapReduce platforms.
This solution offers the best performance for these special types of applications, which cannot be solved efficiently with the MapReduce solutions available in the market.
The main advantages of the invention are:
1. Data flow is processed in stream mode, keeping latency as low as required, because latency scales down with the number of machines in the cluster.
2. The developer has control over the granularity and latency of the obtained results.
3. A client can subscribe to any queue to receive its content and updates.
4. The internal state is continuously updated as new data is processed, without the need to store and reprocess all the accumulated data, as would be required in a batch-oriented MapReduce platform.
5. The state size can grow to the limit of the available global disk in the platform.
6. Coordinated memory and disk management guarantees that all memory is used efficiently to keep in memory the data blocks most likely to be used.
7. Operations are faster if input data are available in memory and output data may also be kept in memory, without going through disk.
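The continuous state update named in advantage 4 can be illustrated with a minimal sketch, assuming a per-key state that each reduce call receives and returns. `StateQueue` and `reduce_counts` are hypothetical names chosen for this example; the sketch runs on one node and leaves out queues, blocks and distribution.

```python
from collections import defaultdict


def reduce_counts(key, new_values, previous_state):
    """Example reduce: fold the new values for one key into its previous state."""
    return (previous_state or 0) + sum(new_values)


class StateQueue:
    """State kept as one queue: the output of a reduce is the input of the next."""

    def __init__(self, reduce_fn):
        self.reduce_fn = reduce_fn
        self.state = {}  # key -> state produced by the last reduce for that key

    def process(self, batch):
        """Apply the reduce to a batch of (key, value) pairs and update the state."""
        grouped = defaultdict(list)
        for key, value in batch:
            grouped[key].append(value)
        for key, values in grouped.items():
            self.state[key] = self.reduce_fn(key, values, self.state.get(key))
        return self.state


# Usage: two incremental batches give the same state as one batch with all data.
incremental = StateQueue(reduce_counts)
incremental.process([("a", 1), ("b", 2)])
incremental.process([("a", 3)])

batch = StateQueue(reduce_counts)
batch.process([("a", 1), ("b", 2), ("a", 3)])
```

Only the keys present in a new batch are touched, so earlier input does not need to be stored or read again.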
ACRONYMS
ACID Atomicity, Consistency, Isolation, Durability
CBP Continuous Bulk Processing
CDR Call Data Record
CEP Complex Event Processing
DBMS Database Management System
GFS Google File System
HDFS Apache Hadoop Distributed File System
MR Map-Reduce
NoSQL "No SQL" or "Not only SQL"
RDBMS Relational Database Management System
SNA Social Network Analysis
SQL Structured Query Language
URL Uniform Resource Locator
REFERENCES
[1] Brian Babcock et al., "Models and Issues in Data Stream Systems", Proceedings of the 21st ACM SIGMOD-SIGACT-SIGART symposium on Principles of database systems, New York, 2002.
[2] Jeffrey Dean and Sanjay Ghemawat. "MapReduce: Simplified data processing on large clusters". In Proceedings of the 6th Symposium on Operating System Design and Implementation (OSDI 2004), pages 137-150, San Francisco, California, 2004.
[3] Cortes et al., "Hancock: A Language for Extracting Signatures from Data Streams", Proc. Sixth International Conference on Knowledge Discovery and Data Mining, Boston, 2000, pp. 9-17.
[4] Yunhong Gu, Robert Grossman, "Sector and Sphere: The Design and Implementation of a High Performance Data Cloud", 2008
[5] T. Condie, N. Conway, P. Alvaro and J. M. Hellerstein, "MapReduce online", in 7th NSDI (2010).
[6] Boduo Li et al., "A Platform for Scalable One-Pass Analytics using MapReduce", in SIGMOD 2011, Athens
[7] Dionysios Logothetis et al., "Stateful Bulk Processing for Incremental Analytics", SoCC'10, Indianapolis, USA, 2010.
[8] Daniel Peng, Frank Dabek, "Large-scale Incremental Processing Using Distributed Transactions and Notifications", 2010
[9] F. Chang et al, "Bigtable: A distributed storage system for structured data", in 7th OSDI (Nov. 2006), pp 205-218
[10] Bugra Gedik et al, "SPADE: The System S Declarative Stream Processing Engine", in SIGMOD 2008, Vancouver
[11] L. Popa et al., "DryadInc: reusing work in large-scale computations", in HotCloud Workshop, 2009

Claims

1. - A method for streaming processing in a map and reduce distributed computing platform, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity, the method using information regarding state associated to at least reduce operations, characterized in that the method comprises generating said state as a result of a reduce operation performed by a node, in the form of an output queue, and using said state as an input queue of a subsequent reduce operation performed by said node, said output and input queue forming a single queue which is updated after the reduce operation processing.
2.- The method of claim 1, comprising generating a plurality of states as a result of a corresponding plurality of reduce operations performed by a plurality of nodes, in the form of queue data, and using said states as inputs of subsequent reduce operations performed by the respective nodes.
3. - The method of claim 1 or 2, comprising storing at least part of the blocks forming said state in a local memory of the respective node, and accessing and retrieving said state blocks from at least said local memory for said subsequent reduce operations.
4. - The method of claim 3, comprising storing all of the blocks forming said state in the local memory of the respective node, and accessing and retrieving said state blocks from only said local memory for said subsequent reduce operations.
5.- The method of claim 3, comprising storing in a local disk of the respective node the rest of the blocks forming said state which have not been stored in the local memory, and accessing and retrieving said state blocks from both the local memory and the local disk, for said subsequent reduce operations.
6. - The method of any of the previous claims, comprising sharing said node or at least one of said plurality of nodes its respective state with other nodes, said other nodes performing reduce operations using said shared state as an input therefor.
7. - The method of any of the previous claims, comprising performing said stream processing by scheduling operations on data queues, and automatically executing them when there are input data available on the respective node.
8.- The method of claim 7, comprising controlling the data queues to specify a maximum latency.
9.- The method of claim 7 or 8, comprising controlling the data queues to specify if input data, including state, must be deleted after processing, or it must be kept for a posterior use.
10. - The method of any of the previous claims, comprising organizing said state as a partition of the key ranges.
11. - The method of any of the previous claims, comprising splitting the state into several buckets.
12.- The method of claim 11 when depending on claim 9, comprising assigning a time stamp to each of said buckets, and deleting unused state entries according to the time stamp associated to the bucket comprising part of the state.
13. - A distributed computing platform system for map and reduce streaming processing, said distributed computing platform comprising at least one cluster with a plurality of nodes with computing capacity and configured for performing at least reduce operations using information regarding state associated thereto, characterized in that at least one of said nodes comprises at least one input state queue and at least one output state queue, in that said at least one node is configured for generating said state as a result of a reduce operation performed by said at least one node and providing said result to said at least one state output queue, and in that said input and output state queues are interconnected forming a single queue such that there is a feedback from said output state queue to said input state queue.
14. - The system of claim 13, comprising a plurality of nodes, each comprising at least one input state queue and at least one output state queue, and configured for generating said state as a result of a reduce operation and providing said result to said at least one state output queue, wherein the input and output state queues of every node are interconnected forming a single queue such that there is a feedback from said output state queue to said input state queue.
15. - The system of claim 13 or 14, wherein said at least one node or at least part of said plurality of nodes comprise at least one local memory for storing/retrieving at least part of the blocks forming the state of the respective node.
16. - The system of claim 15, wherein said at least one node or at least part of said plurality of nodes comprise at least one local disk for storing/retrieving the rest of the blocks forming the state which have not been stored in the local memory.
17.- The system of claim 15 or 16, comprising a Block Manager module that optimizes resources by handling the storing/retrieving of state blocks in said local memory and/or said local disk, in order to keep active state data blocks in the local memory.
18.- The system of any of claims 13 to 17, wherein said at least one node or at least part of said plurality of nodes implements the method of any of claims 1 to 12.
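For illustration only, the behaviour attributed to the Block Manager module in claims 15 to 17 (state blocks held in local memory, the rest on local disk, active blocks kept in memory) can be sketched as follows. The least-recently-used eviction policy, the `BlockManager` class shown here and the use of a dictionary in place of the local disk are assumptions made for this example and are not taken from the claims.

```python
from collections import OrderedDict


class BlockManager:
    """Keeps recently used state blocks in memory and moves the rest to disk."""

    def __init__(self, memory_capacity: int):
        if memory_capacity < 1:
            raise ValueError("memory_capacity must be at least 1")
        self.memory_capacity = memory_capacity
        self.memory = OrderedDict()  # block_id -> block, least recently used first
        self.disk = {}               # stand-in for the local disk (assumption)

    def _evict(self):
        """Move least recently used blocks to disk until memory fits the capacity."""
        while len(self.memory) > self.memory_capacity:
            old_id, old_block = self.memory.popitem(last=False)
            self.disk[old_id] = old_block

    def put(self, block_id, block):
        """Store or overwrite a block; it becomes the most recently used one."""
        self.disk.pop(block_id, None)
        self.memory[block_id] = block
        self.memory.move_to_end(block_id)
        self._evict()

    def get(self, block_id):
        """Return a block, loading it from disk into memory if needed."""
        if block_id not in self.memory:
            self.memory[block_id] = self.disk.pop(block_id)  # KeyError if unknown
        self.memory.move_to_end(block_id)
        block = self.memory[block_id]
        self._evict()
        return block
```

With a capacity of two blocks, storing a third block moves the oldest one to disk, and reading it back later brings it into memory again while another block is moved out.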
PCT/EP2013/057300 2012-04-12 2013-04-08 Method and system for streaming processing in a map and reduce distributed computing platform WO2013153027A1 (en)

Applications Claiming Priority (2)

Application Number Priority Date Filing Date Title
ES201230550A ES2425447B1 (en) 2012-04-12 2012-04-12 METHOD AND PROCESSING SYSTEM IN CONTINUOUS FLOW IN A DISTRIBUTED MAPPING AND REDUCTION CALCULATION PLATFORM
ESP201230550 2012-04-12

Publications (1)

Publication Number Publication Date
WO2013153027A1 (en) 2013-10-17

Family

ID=48139915

Family Applications (1)

Application Number Title Priority Date Filing Date
PCT/EP2013/057300 WO2013153027A1 (en) 2012-04-12 2013-04-08 Method and system for streaming processing in a map and reduce distributed computing platform

Country Status (2)

Country Link
ES (1) ES2425447B1 (en)
WO (1) WO2013153027A1 (en)

Cited By (8)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9075670B1 (en) 2014-08-01 2015-07-07 Pivotal Software, Inc. Stream processing with context data affinity
US9300712B2 (en) 2014-08-01 2016-03-29 Pivotal Software, Inc. Stream processing with context data affinity
US9473550B2 (en) 2014-05-13 2016-10-18 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9779266B2 (en) 2014-03-04 2017-10-03 International Business Machines Corporation Generation of analysis reports using trusted and public distributed file systems
US10083224B2 (en) 2016-03-15 2018-09-25 International Business Machines Corporation Providing global metadata in a cluster computing environment
US10102029B2 (en) 2015-06-30 2018-10-16 International Business Machines Corporation Extending a map-reduce framework to improve efficiency of multi-cycle map-reduce jobs
CN110378550A (en) * 2019-06-03 2019-10-25 东南大学 The processing method of the extensive food data of multi-source based on distributed structure/architecture
CN112948432A (en) * 2019-12-11 2021-06-11 中国电信股份有限公司 Data processing method and data processing device

Families Citing this family (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN109815064B (en) * 2019-01-04 2023-10-27 平安科技(深圳)有限公司 Node isolation method, node isolation device, node equipment and computer readable storage medium

Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US7650331B1 (en) 2004-06-18 2010-01-19 Google Inc. System and method for efficient large-scale data processing

Non-Patent Citations (15)

* Cited by examiner, † Cited by third party
Title
ARASU ARVIND ET AL: "Stream: The Stanford data stream management system", 1 January 2004 (2004-01-01), XP055071477, Retrieved from the Internet <URL:http://ilpubs.stanford.edu:8090/641/1/2004-20.pdf> [retrieved on 20130716] *
BODUO LI ET AL.: "A Platform for Scalable One-Pass Analytics using MapReduce", SIGMOD, 2011
BRIAN BABCOCK ET AL.: "Models and Issues in Data Stream Systems", PROCEEDINGS OF THE 21ST ACM SIGMOD-SIGACT-SIGART SYMPOSIUM ON PRINCIPLES OF DATABASE SYSTEMS, 2002
BUGRA GEDIK ET AL.: "SPADE: The System S Declarative Stream Processing Engine", SIGMOD, 2008
CONDIE, T.; CONWAY, N.; ALVARO, P.; HELLERSTIEN, J. M.: "MapReduce online", 7TH NSDI, 2010
CORTES ET AL.: "Hancock: A Language for Extracting Signatures from Data Streams", PROC. SIXTH INTERNATIONAL CONFERENCE ON KNOWLEDGE DISCOVERY AND DATA MINING, 2000, pages 9 - 17
DANIEL PENG; FRANK DABEK, LARGE-SCALE INCREMENTAL PROCESSING USING DISTRIBUTED TRANSACTIONS AND NOTIFICATIONS, 2010
DIONYSIOS LOGOTHETIS ET AL: "Stateful bulk processing for incremental analytics", PROCEEDINGS OF THE 1ST ACM SYMPOSIUM ON CLOUD COMPUTING, SOCC '10, 1 January 2010 (2010-01-01), New York, New York, USA, pages 51, XP055071340, ISBN: 978-1-45-030036-0, DOI: 10.1145/1807128.1807138 *
DYONISIOS LOGOTHETIS: "Stateful Bulk Processing for Incremental Analytics", SOCC'10, 2010
F. CHANG ET AL.: "Bigtable: A distributed storage system for structured data", 7TH OSDI, November 2006 (2006-11-01), pages 205 - 218
JEFFREY DEAN; SANJAY GHEMAWAT: "MapReduce: Simplified data processing on large clusters", PROCEEDINGS OF THE 6TH SYMPOSIUM ON OPERATING SYSTEM DESIGN AND IMPLEMENTATION, 2004, pages 137 - 150
L. POPA ET AL.: "Dryadlnc: reusing work in large-scale computations", HOTCLOUD WORKSHOP, 2009
MADDEN S ET AL: "Fjording the stream: an architecture for queries over streaming sensor data", PROCEEDINGS 18TH. INTERNATIONAL CONFERENCE ON DATA ENGINEERING. (ICDE'2002). SAN JOSE, CA, FEB. 26 - MARCH 1, 2002; [INTERNATIONAL CONFERENCE ON DATA ENGINEERING. (ICDE)], LOS ALAMITOS, CA : IEEE COMP. SOC, US, vol. CONF. 18, 26 February 2002 (2002-02-26), pages 555 - 566, XP010588275, ISBN: 978-0-7695-1531-1, DOI: 10.1109/ICDE.2002.994774 *
PRAMOD BHATOTIA ET AL: "Incoop", CLOUD COMPUTING, ACM, 2 PENN PLAZA, SUITE 701 NEW YORK NY 10121-0701 USA, 26 October 2011 (2011-10-26), pages 1 - 14, XP058005041, ISBN: 978-1-4503-0976-9, DOI: 10.1145/2038916.2038923 *
YUNHONG GU; ROBERT GROSSMAN, SECTOR AND SPHERE: THE DESIGN AND IMPLEMENTATION OF A HIGH PERFORMANCE DATA CLOUD, 2008

Cited By (14)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9779266B2 (en) 2014-03-04 2017-10-03 International Business Machines Corporation Generation of analysis reports using trusted and public distributed file systems
US9971810B2 (en) 2014-05-13 2018-05-15 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9473550B2 (en) 2014-05-13 2016-10-18 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9614740B2 (en) 2014-05-13 2017-04-04 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9680718B2 (en) 2014-05-13 2017-06-13 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9986002B2 (en) 2014-05-13 2018-05-29 International Business Machines Corporation Multifusion of a stream operator in a streaming application
US9300712B2 (en) 2014-08-01 2016-03-29 Pivotal Software, Inc. Stream processing with context data affinity
US9075670B1 (en) 2014-08-01 2015-07-07 Pivotal Software, Inc. Stream processing with context data affinity
US10102029B2 (en) 2015-06-30 2018-10-16 International Business Machines Corporation Extending a map-reduce framework to improve efficiency of multi-cycle map-reduce jobs
US10083224B2 (en) 2016-03-15 2018-09-25 International Business Machines Corporation Providing global metadata in a cluster computing environment
US10083221B2 (en) 2016-03-15 2018-09-25 International Business Machines Corporation Providing global metadata in a cluster computing environment
CN110378550A (en) * 2019-06-03 2019-10-25 东南大学 The processing method of the extensive food data of multi-source based on distributed structure/architecture
CN112948432A (en) * 2019-12-11 2021-06-11 中国电信股份有限公司 Data processing method and data processing device
CN112948432B (en) * 2019-12-11 2023-10-13 天翼云科技有限公司 Data processing method and data processing device

Also Published As

Publication number Publication date
ES2425447A2 (en) 2013-10-15
ES2425447B1 (en) 2015-02-13
ES2425447R1 (en) 2014-04-23

Legal Events

Date Code Title Description
121 Ep: the epo has been informed by wipo that ep was designated in this application (Ref document number: 13717002; Country of ref document: EP; Kind code of ref document: A1)
NENP Non-entry into the national phase (Ref country code: DE)
122 Ep: pct application non-entry in european phase (Ref document number: 13717002; Country of ref document: EP; Kind code of ref document: A1)