CN109726225A - A Storm-based Distributed Streaming Data Storage and Query Method - Google Patents
A Storm-based Distributed Streaming Data Storage and Query Method Download PDFInfo
- Publication number
- CN109726225A CN109726225A CN201910026601.2A CN201910026601A CN109726225A CN 109726225 A CN109726225 A CN 109726225A CN 201910026601 A CN201910026601 A CN 201910026601A CN 109726225 A CN109726225 A CN 109726225A
- Authority
- CN
- China
- Prior art keywords
- query
- data
- subquery
- server
- key
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Links
Classifications
-
- Y—GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
- Y02—TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
- Y02D—CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
- Y02D10/00—Energy efficient computing, e.g. low power processors, power management or thermal management
Landscapes
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
- Information Transfer Between Computers (AREA)
Abstract
本发明提供一种基于Storm的分布式流数据存储与查询方法,本发明基于Storm数据流式计算框架,CEPHFS作为数据底层存储系统下,通过对分布式流式数据的特征分析,对数据进行实时的分区与索引构建,将分区好的数据块压缩存入CEPHFS。查找操作时根据数据块的key与temporal两个维度的属性,将查询分解为对应的子查询,并通过bloomFilter方法只读取可能含有所需数据的文件,由predicate选择出符合条件的数据,提交子查询结果合并后进行aggregate操作,返回给用户。充分利用计算资源来提高数据存储与查询的效率。本发明具有应用场景广泛、低时延、负载均衡的特点,并且能够实现高速存储。
The present invention provides a method for storing and querying distributed stream data based on Storm. The present invention is based on the Storm data stream computing framework, and CEPHFS is used as the data underlying storage system to perform real-time data analysis by analyzing the characteristics of distributed stream data. The partition and index construction are performed, and the partitioned data blocks are compressed and stored in CEPHFS. During the search operation, according to the attributes of the key and temporal dimensions of the data block, the query is decomposed into corresponding sub-queries, and only the files that may contain the required data are read through the bloomFilter method, and the qualified data is selected by the predicate and submitted. After the sub-query results are merged, the aggregate operation is performed and returned to the user. Make full use of computing resources to improve the efficiency of data storage and query. The invention has the characteristics of wide application scenarios, low time delay, load balance, and can realize high-speed storage.
Description
技术领域technical field
本发明涉及数据处理技术领域,尤其是一种基于Storm的分布式流数据存储与查询方法。The invention relates to the technical field of data processing, in particular to a method for storing and querying distributed stream data based on Storm.
背景技术Background technique
随着网络技术的迅猛发展,社交网络和位置服务平台等所产生的实时流数据高速增长,在越来越多的领域出现了对海量流数据进行实时处理响应的要求,使得数据的高速插入和实时查找成为一个非常重要的数据处理能力,用户能够实时获得想要的历史数据和新数据。对于提供位置业务的平台如百度地图,高德地图等,每秒都瞬时产生了海量的位置信息和轨迹变化数据,为了能够满足用户的需求和提高公司效益,平台系统需要能够支持百万级流数据上的实时插入存储与低延时查询,例如客户需要获取当前时刻附近5km范围内所有车辆的GPS信息,或者是指定某辆车在过去1小时内的行驶轨迹。With the rapid development of network technology and the rapid growth of real-time streaming data generated by social networks and location-based service platforms, the requirement for real-time processing and response to massive streaming data has emerged in more and more fields. Real-time search has become a very important data processing capability, and users can obtain desired historical data and new data in real time. For platforms that provide location services, such as Baidu Maps, AutoNavi Maps, etc., massive amounts of location information and trajectory change data are instantaneously generated every second. In order to meet the needs of users and improve company benefits, the platform system needs to be able to support millions of streams. Real-time insertion storage and low-latency query on data. For example, customers need to obtain GPS information of all vehicles within 5km of the current moment, or specify the driving trajectory of a vehicle in the past 1 hour.
常用的key-value存储技术开源实现如HBase使用LSM-Tree来减少更新叶节点带来的时间开销,但是每次插入的新数据和历史数据需要进行更新,在time范围的查询时延过高;常用的时间序列数据库技术如阿里巴巴开源的Druid仅支持倒排索引,在key范围查询上比较低效。为了解决这一问题,必然要设计一个针对海量流数据能进行高速存储和实时查询的分布式数据库技术,key范围和时间范围上都支持高效的查询,这就要求新来数据和历史数据能有所分隔,在查询时尽量避免无关范围数据的遍历,提高查询效率,同时保证系统不同节点的负载均衡,来最大化资源的利用率。Common open-source implementations of key-value storage technologies, such as HBase, use LSM-Tree to reduce the time overhead caused by updating leaf nodes, but new data and historical data inserted each time need to be updated, and the query delay in the time range is too high; Commonly used time series database technologies such as Alibaba's open-source Druid only support inverted indexes, which are inefficient for key range queries. In order to solve this problem, it is necessary to design a distributed database technology that can perform high-speed storage and real-time query for massive stream data. Both key range and time range support efficient query, which requires new data and historical data to have Separated, try to avoid the traversal of irrelevant data when querying, improve query efficiency, and ensure the load balance of different nodes in the system to maximize resource utilization.
发明内容SUMMARY OF THE INVENTION
针对现有技术的不足,本发明提供一种基于Storm的分布式流数据存储与查询方法,本发明分析流数据在真实环境下会按照接近顺序到达和数据分布范围的稳定性特点,和现在数据库技术中无法满足key范围和时间范围上高效的问题,提供了一种海量流数据下的高效索引和时间域范围实时查询处理方法。本发明旨在通过对于即将到来的流数据进行范围划分,在不同机器节点并行索引后分别存储到分布式文件系统,查询时进行查询分解,并行执行子查询,过滤,聚合等操作后,合并结果返回。In view of the deficiencies of the prior art, the present invention provides a method for storing and querying distributed stream data based on Storm. The present invention analyzes the stability characteristics that stream data will arrive in close order and data distribution range in a real environment, and the current database The technology cannot meet the high-efficiency problem of key range and time range, and provides a real-time query processing method for efficient indexing and time domain range under massive flow data. The purpose of the invention is to divide the range of the upcoming stream data, store them in the distributed file system after parallel indexing on different machine nodes, perform query decomposition during query, execute sub-query, filter, aggregate and other operations in parallel, and merge the results. return.
本发明的技术方案为:一种基于Storm的分布式流数据存储与查询方法,本发明通过在接收分布式流数据时实时建立若干隔离范围的B+Tree索引,达到阈值后存储到分布式文件系统,并在查询时进行查询分解,并行处理不同范围下的子查询,保持负载均衡,完成后合并返回实时存储结果,实现高吞吐量的流数据插入和查询,具体包括以下步骤:The technical scheme of the present invention is as follows: a method for storing and querying distributed stream data based on Storm, the present invention establishes several B+Tree indexes with isolated ranges in real time when receiving distributed stream data, and stores them in distributed files after reaching a threshold value. system, and perform query decomposition when querying, process sub-queries in different scopes in parallel, maintain load balance, merge and return real-time storage results after completion, and realize high-throughput streaming data insertion and query, including the following steps:
S1)、接收源数据并分发给下游单元构建索引结构;S1), receive source data and distribute to downstream units to build an index structure;
S2)、将索引结构压缩为数据块并写入分布式文件存储系统CEPHFS;S2), compress the index structure into data blocks and write into the distributed file storage system CEPHFS;
S3)、基于查询条件和数据块信息将查询分解为若干独立的子查询;S3), decompose the query into several independent sub-queries based on query conditions and data block information;
S4)、通过访问分布式文件存储系统CEPHFS分发给下游独立的查询处理单元的子查询;S4), distribute to the sub-query of the downstream independent query processing unit by accessing the distributed file storage system CEPHFS;
S5)、接收返回的子查询结果并合并返回给用户。S5), receive the returned sub-query results and combine them and return them to the user.
进一步的,步骤S1)中,流数据存储系统接收的每个源数据为数据元祖,定义为d={dk,dt,dr},其中,dk是元祖的主键,dt是时间属性,dr是元祖的其他属性值,K和T定义了一个主键和时间域的二维空间D=(K,T);主键范围固定,时间范围不断增加,主键K区间表示为K(k-,k+),时间域T区间表示为T(t-,t+),根据两个区间建立唯一的矩形r≤K,T≥{(k,t)∈R|k∈K,t∈T}。Further, in step S1), each source data received by the streaming data storage system is a data tuple, which is defined as d={d k , d t , d r }, where d k is the primary key of the tuple, and d t is the time Attribute, d r is other attribute values of the tuple, K and T define a two-dimensional space of primary key and time domain D=(K, T); the primary key range is fixed, the time range is increasing, the primary key K interval is expressed as K(k -,k+), the time domain T interval is expressed as T(t-,t+), and a unique rectangle r≤K is established according to the two intervals, T≥{(k,t)∈R|k∈K,t∈T} .
进一步的,将矩形r≤K,T≥{(k,t)∈R|k∈K,t∈T}范围内的数据元组写入唯一对应的模板B+Tree中,key作为索引,内存中达到阈值chunkSize大小的模板B+Tree以chunk形式存储到分布式文件系统,chunk由key数组和数据数组组成,key数组存储顺序的key值,包括一个指向数据数组的偏移量。Further, write the data tuples within the range of the rectangle r≤K, T≥{(k,t)∈R|k∈K,t∈T} into the unique corresponding template B+Tree, the key is used as the index, the memory The template B+Tree that reaches the threshold chunkSize size is stored in the distributed file system in the form of chunks. The chunk consists of a key array and a data array. The key array stores the sequential key values, including an offset to the data array.
进一步的,基于二维空间D=(K,T),流数据存储系统的查询条件可以定义为一个三元组q={Kq,Tq,fq},Kq,Tq是在主键和时间域上的条件选择范围,查询区间切分为一个r≤K,T≥{(k,t)∈R|k∈Kq,t∈Tq},fq:t->{true,false}是用户自定义的条件过滤函数,用来判断是否满足用户的选择。Further, based on the two-dimensional space D=(K, T), the query condition of the streaming data storage system can be defined as a triple q={K q , T q , f q }, where K q , T q are in the primary key and the conditional selection range in the time domain, the query interval is divided into a r≤K, T≥{(k,t)∈R|k∈K q ,t∈T q },f q :t->{true, false} is a user-defined conditional filter function to determine whether the user's choice is satisfied.
进一步的,基于不同子查询服务器Subquery Server节点存储的文件块不同和缓存的模板B+Tree叶节点不同,实现查询分解调度的算法,计算出子查询服务器SubqueryServer对各个未处理的子查询优先级队列进行查询分配,直到未处理子查询集合为空,并将最近查询的叶节点数据写入缓存,实现查询分配的缓存局部性,数据块局部性和负载均衡;具体算法过程如下:Further, based on the different file blocks stored by different subquery server Subquery Server nodes and the different cached template B+Tree leaf nodes, the algorithm of query decomposition and scheduling is implemented, and the priority queue of each unprocessed subquery by the subquery server SubqueryServer is calculated. Query allocation is performed until the set of unprocessed subqueries is empty, and the recently queried leaf node data is written into the cache to achieve cache locality, data block locality and load balancing of query allocation. The specific algorithm process is as follows:
对S(qi)和进行洗牌,若S(qi)在前,则将两者拼接为新数组其中,下标小代表优先级高,将的元素包含优先级分别加入每个子查询服务器Subquery Server的子查询优先级队列中,所有的qi都处理完后,对子查询服务器Subquery Server的优先级队列依次取出优先级最高而且未处理过的qi进行分配,直到所有qi分配完毕,其中,S(qi)代表存有qi范围数据的子查询服务器Subquery Server数组,代表其余子查询服务器Subquery Server的数组,qi∈q代表一次查询分解后的子查询。For S(q i ) and Shuffle, if S(q i ) comes first, splicing the two into a new array Among them, the smaller the subscript represents the higher priority, the The element containing the priority is added to the subquery priority queue of each subquery server Subquery Server, after all qi are processed, the priority queue of the subquery server Subquery Server is taken out in turn with the highest priority and unprocessed. q i is allocated until all q i are allocated, where S(q i ) represents the Subquery Server array that stores the qi range data, Represents the array of other subquery servers Subquery Server, q i ∈ q represents the subquery after a query is decomposed.
进一步的,步骤S2)中,索引结构为树形索引结构,树形索引结构大小在超过指定的阈值后,通过Snappy算法将叶子节点中的数据元祖进行压缩,以数据块的形式写入到分布式文件存储系统CEPHFS中永久存储,并将数据块的元祖主键,时间域范围相关的元数据记录到元数据管理器metadata keeper;根据流数据key主键域会在一定的范围内进行变化,而时间域会不断增长的特性,将树形索引结构的非叶子节点部分进行保留为模板,以方便在下次构建时直接使用索引模板,避免像构建B+树一样进行节点的分裂,造成很大的时间开销。Further, in step S2), the index structure is a tree-shaped index structure, and after the size of the tree-shaped index structure exceeds a specified threshold, the data element ancestors in the leaf nodes are compressed by the Snappy algorithm, and are written into the distribution in the form of data blocks. It is permanently stored in CEPHFS, a file storage system, and records the metadata of the data block's ancestral primary key and the metadata related to the time domain to the metadata manager metadata keeper; The domain will continue to grow, and the non-leaf node part of the tree index structure is reserved as a template, so that the index template can be used directly in the next construction, avoiding the splitting of nodes like building a B+ tree, causing a lot of time overhead .
进一步的,步骤S3)中,基于查询条件和数据块信息将查询分解为若干独立的子查询,具体包括以下步骤:Further, in step S3), the query is decomposed into several independent sub-queries based on the query conditions and the data block information, which specifically includes the following steps:
S301)、查询调度器query dispatcher根据用户提供的查询条件中主键和时间域范围,读取元数据管理器(metadata keeper)中的数据块元数据信息进行对比,将查询区域划分为一系列二维的索引区域;S301), the query dispatcher, according to the primary key and the time domain range in the query condition provided by the user, reads the metadata information of the data block in the metadata manager (metadata keeper) for comparison, and divides the query area into a series of two-dimensional the index area;
S302)、基于用户提供的等值判断条件,通过布隆过滤器bloomFilter方法过滤掉一定不含有目标数据元祖的子查询区域;S302), based on the equivalent judgment condition provided by the user, filter out the sub-query area that must not contain the target data element ancestor by the bloom filter bloomFilter method;
S303)、只将可能含有目标数据元祖的子查询分发给下游独立的子查询服务器Subquery Server。S303), only distribute the subquery that may contain the target data element ancestor to the downstream independent subquery server Subquery Server.
进一步的,步骤S4)中,通过访问分布式文件存储系统CEPHFS分发给下游独立的查询处理单元的子查询,具体包括以下步骤:Further, in step S4), the sub-query distributed to the downstream independent query processing unit by accessing the distributed file storage system CEPHFS specifically includes the following steps:
S401)、子查询服务器Subquery Server并行读取分布式文件存储系统CephFs中与子查询对应的数据块,先读取数据块中索引结构的模板部分,得到叶节点对于所有叶节点的相对offset和分组压缩后的offset,计算得到可能包含目标key范围的一系列叶子节点offset;S401), the subquery server Subquery Server reads the data block corresponding to the subquery in the distributed file storage system CephFs in parallel, first reads the template part of the index structure in the data block, and obtains the relative offset and grouping of the leaf node to all leaf nodes The compressed offset is calculated to obtain a series of leaf node offsets that may include the target key range;
S402)、基于offset读取数据块文件中索引结构的叶节点部分,通过Snappy算法解压得到的叶节点分组数据块字节,反序列化为叶节点,并做时间范围和等值条件上的过滤;S402), read the leaf node part of the index structure in the data block file based on the offset, decompress the obtained leaf node group data block bytes through the Snappy algorithm, deserialize it into a leaf node, and filter on the time range and equivalent conditions ;
S403)、对过滤后的一系列数据元祖进行aggregate操作,序列化后发送到查询调度器query dispatcher。S403), perform an aggregate operation on the filtered series of data primitives, serialize them, and send them to the query dispatcher.
本发明的有益效果为:The beneficial effects of the present invention are:
1、本发明应用场景广泛,分布式流数据处理应用如通信公司监测分析网络流量,位置联网平台车流量轨迹变化,电商平台节假日实时成交指标等,实现海量数据的数据实时传输处理。1. The present invention has a wide range of application scenarios, such as distributed stream data processing applications such as communication companies monitoring and analyzing network traffic, location networking platform vehicle traffic trajectory changes, e-commerce platform holiday real-time transaction indicators, etc., to achieve real-time data transmission and processing of massive data.
2、本发明能够实现高速存储,本发明利用高效的数据划分方式将新到数据和历史数据分隔开,利用数据范围稳定性特点,通过保留索引模板构建B+Tree索引,避免树节点分裂带来的大量时间消耗。2. The present invention can realize high-speed storage. The present invention uses an efficient data division method to separate new data and historical data, and uses the characteristics of data range stability to build a B+Tree index by retaining an index template to avoid tree node splitting. A lot of time consuming to come.
3、本发明具有低时延的特点,对查询条件进行范围切分后,只访问元信息可能符合查询范围的文件,并行处理过滤,聚合等关键操作,并实现缓存局部性和文件局部性,提高查询效率。3. The present invention has the characteristics of low latency. After the range of query conditions is segmented, only files whose meta-information may meet the query range are accessed, and key operations such as filtering and aggregation are processed in parallel, and cache locality and file locality are realized. Improve query efficiency.
4、本发明负载均衡,通过设计的查询调度算法对分解的子查询进行分配到不同节点,充分利用系统资源。4. The load balancing of the present invention distributes the decomposed sub-queries to different nodes through the designed query scheduling algorithm, and makes full use of system resources.
附图说明Description of drawings
图1为本发明的流程示意图;Fig. 1 is the schematic flow chart of the present invention;
图2为本发明分布式流数据存储数据块的结构图;Fig. 2 is the structure diagram of distributed stream data storage data block of the present invention;
图3为本发明分布式流数据存储数据块中叶节点的内部结构图;Fig. 3 is the internal structure diagram of the leaf node in the distributed stream data storage data block of the present invention;
图4为本发明分布式流数据查询分解调度图。FIG. 4 is a diagram of decomposition and scheduling of distributed stream data query according to the present invention.
具体实施方式Detailed ways
下面结合附图对本发明的具体实施方式作进一步说明:The specific embodiments of the present invention will be further described below in conjunction with the accompanying drawings:
如图1所示,一种基于Storm的分布式流数据存储与查询方法,本发明通过在接收分布式流数据时实时建立若干隔离范围的B+Tree索引,达到阈值后存储到分布式文件系统,并在查询时进行查询分解,并行处理不同范围下的子查询,保持负载均衡,完成后合并返回实时存储结果,实现高吞吐量的流数据插入和查询,具体包括以下步骤:As shown in Figure 1 , a method for storing and querying distributed stream data based on Storm, the present invention establishes several B+Tree indexes with isolated ranges in real time when receiving distributed stream data, and stores them in a distributed file system after reaching a threshold value. , and perform query decomposition when querying, process sub-queries under different scopes in parallel, maintain load balance, merge and return real-time storage results after completion, and realize high-throughput streaming data insertion and query. The specific steps include the following steps:
S1)、接收源数据并分发给下游单元构建索引结构;S1), receive source data and distribute to downstream units to build an index structure;
其中,流数据存储系统接收的每个源数据称为数据元祖,并定义为d={dk,dt,dr},其中,dk是元祖的主键,dt是时间属性,dr是元祖的其他属性值,K和T定义了一个主键和时间域的二维空间D=(K,T);主键范围固定,时间范围不断增加,主键K区间表示为K(k-,k+),时间域T区间表示为T(t-,t+),根据两个区间建立唯一的矩形:Among them, each source data received by the streaming data storage system is called a data tuple, and is defined as d={d k , d t , d r }, where d k is the primary key of the tuple, d t is the time attribute, and d r are other attribute values of the tuple, K and T define a two-dimensional space of primary key and time domain D=(K, T); the primary key range is fixed, the time range is increasing, the primary key K interval is expressed as K(k-,k+) , the time domain T interval is represented as T(t-,t+), and a unique rectangle is established according to the two intervals:
r≤K,T≥{(k,t)∈R|k∈K,t∈T};r≤K, T≥{(k,t)∈R|k∈K,t∈T};
将矩形r≤K,T≥{(k,t)∈R|k∈K,t∈T}范围内的数据元组写入唯一对应的模板B+Tree中,key作为索引,当内存中达到阈值chunkSize大小的模板B+Tree以数据块datachunk形式存储到分布式文件系统,chunk由key数组和数据数组组成,key数组存储顺序的key值,包括一个指向数据数组的偏移量。Write the data tuple within the range of the rectangle r≤K,T≥{(k,t)∈R|k∈K,t∈T} into the unique corresponding template B+Tree, the key is used as the index, when the memory reaches The template B+Tree with the threshold chunkSize size is stored in the distributed file system in the form of data chunks. The chunk consists of a key array and a data array. The key array stores the sequential key values, including an offset to the data array.
基于二维空间D=(K,T),流数据存储系统的查询条件可以定义为一个三元组q={Kq,Tq,fq},Kq,Tq是在主键和时间域上的条件选择范围,查询区间切分为一个r≤K,T≥{(k,t)∈R|k∈Kq,t∈Tq},fq:t->{true,false}是用户自定义的条件过滤函数,用来判断是否满足用户的选择。Based on the two-dimensional space D=(K, T), the query condition of the streaming data storage system can be defined as a triple q={K q , T q , f q }, where K q , T q are in the primary key and time domain On the conditional selection range, the query interval is divided into a r≤K,T≥{(k,t)∈R|k∈K q ,t∈T q },f q :t->{true,false} is User-defined conditional filter function to determine whether the user's choice is satisfied.
基于不同子查询服务器Subquery Server节点存储的文件块不同和缓存的模板B+Tree叶节点不同,实现查询分解调度的算法,计算出子查询服务器Subquery Server对各个未处理的子查询优先级队列进行查询分配,直到未处理子查询集合为空,并将最近查询的叶节点数据写入缓存,实现查询分配的缓存局部性,数据块局部性和负载均衡;具体算法过程如下:Based on the different file blocks stored in the Subquery Server nodes of different subquery servers and the difference in the cached template B+Tree leaf nodes, the query decomposition and scheduling algorithm is implemented, and the subquery server Subquery Server is calculated to query each unprocessed subquery priority queue. Allocate until the set of unprocessed subqueries is empty, and write the recently queried leaf node data into the cache to achieve cache locality, data block locality and load balancing of query allocation; the specific algorithm process is as follows:
对S(qi)和进行洗牌,若S(qi)在前,则将两者拼接为新数组其中,下标小代表优先级高,将的元素包含优先级分别加入每个子查询服务器Subquery Server的子查询优先级队列中,所有的qi都处理完后,对子查询服务器Subquery Server的优先级队列依次取出优先级最高而且未处理过的qi进行分配,直到所有qi分配完毕,其中,S(qi)代表存有qi范围数据的子查询服务器(Subquery Server)数组,代表其余子查询服务器Subquery Server的数组,qi∈q代表一次查询分解后的子查询。For S(q i ) and Shuffle, if S(q i ) comes first, splicing the two into a new array Among them, the smaller the subscript represents the higher priority, the The element containing the priority is added to the subquery priority queue of each subquery server Subquery Server, after all qi are processed, the priority queue of the subquery server Subquery Server is taken out in turn, the highest priority and unprocessed ones are taken out. q i is allocated until all q i are allocated, where S(q i ) represents an array of Subquery Servers (Subquery Servers) that store qi range data, Represents the array of other subquery servers Subquery Server, q i ∈ q represents the subquery after a query is decomposed.
S2)、将索引结构压缩为数据块并写入分布式文件存储系统CEPHFS;其中,S2), compress the index structure into data blocks and write into the distributed file storage system CEPHFS; wherein,
索引结构为树形索引结构,树形索引结构大小在超过指定的阈值后,通过Snappy算法将叶子节点中的数据元祖进行压缩,以数据块的形式写入到分布式文件存储系统CEPHFS中永久存储,并将数据块的元祖主键,时间域范围相关的元数据记录到元数据管理器metadata keeper;根据流数据key主键域会在一定的范围内进行变化,而时间域会不断增长的特性,将树形索引结构的非叶子节点部分进行保留为模板,以方便在下次构建时直接使用索引模板,避免像构建B+树一样进行节点的分裂,造成很大的时间开销。The index structure is a tree index structure. After the size of the tree index structure exceeds the specified threshold, the data element ancestors in the leaf nodes are compressed by the Snappy algorithm, and written in the form of data blocks to the distributed file storage system CEPHFS for permanent storage. , and record the metadata related to the primary key of the data block and the time domain to the metadata manager metadata keeper; according to the stream data, the key primary key domain will change within a certain range, and the time domain will continue to grow. The non-leaf node part of the tree index structure is reserved as a template, so that the index template can be directly used in the next construction, and the node splitting is avoided like building a B+ tree, which causes a lot of time overhead.
S3)、基于查询条件和数据块信息将查询分解为若干独立的子查询,具体包括以下步骤:S3), decompose the query into several independent sub-queries based on the query conditions and data block information, which specifically includes the following steps:
S301)、查询调度器query dispatcher根据用户提供的查询条件中主键和时间域范围,读取元数据管理器metadata keeper中的数据块元数据信息进行对比,将查询区域划分为一系列二维的索引区域;S301), the query dispatcher, according to the primary key and the time domain range in the query conditions provided by the user, reads the data block metadata information in the metadata manager metadata keeper for comparison, and divides the query area into a series of two-dimensional indexes area;
S302)、基于用户提供的等值判断条件,通过布隆过滤器(bloomFilter)方法过滤掉一定不含有目标数据元祖的子查询区域;S302), based on the equivalent judgment condition provided by the user, filter out the sub-query area that must not contain the target data element ancestor by the bloom filter method;
S303)、只将可能含有目标数据元祖的子查询分发给下游独立的子查询服务器Subquery Server。S303), only distribute the subquery that may contain the target data element ancestor to the downstream independent subquery server Subquery Server.
S4)、通过访问分布式文件存储系统CEPHFS分发给下游独立的查询处理单元的子查询,具体包括以下步骤:S4), distribute the sub-query distributed to the downstream independent query processing unit by accessing the distributed file storage system CEPHFS, which specifically includes the following steps:
S401)、子查询服务器Subquery Server并行读取分布式文件存储系统CephFs中与子查询对应的数据块,先读取数据块中索引结构的模板部分,得到叶节点对于所有叶节点的相对offset和分组压缩后的offset,计算得到可能包含目标key范围的一系列叶子节点offset;S401), the subquery server Subquery Server reads the data block corresponding to the subquery in the distributed file storage system CephFs in parallel, first reads the template part of the index structure in the data block, and obtains the relative offset and grouping of the leaf node to all leaf nodes The compressed offset is calculated to obtain a series of leaf node offsets that may include the target key range;
S402)、基于offset读取数据块文件中索引结构的叶节点部分,通过Snappy算法解压得到的叶节点分组数据块字节,反序列化为叶节点,并做时间范围和等值条件上的过滤;S402), read the leaf node part of the index structure in the data block file based on the offset, decompress the obtained leaf node group data block bytes through the Snappy algorithm, deserialize it into a leaf node, and filter on the time range and equivalent conditions ;
S403)、对过滤后的一系列数据元祖进行aggregate操作,序列化后发送到查询调度器query dispatcher。S403), perform an aggregate operation on the filtered series of data primitives, serialize them, and send them to the query dispatcher.
S5)、接收返回的子查询结果并合并返回给用户。S5), receive the returned sub-query results and combine them and return them to the user.
如图2所示,流数据写入分布式文件系统的chunk文件内部结构。chunk包含了B+Tree模板部分和叶节点两部分。图中的template代表了B+Tree模板部分,leaf node代表叶节点部分,compress chunk代表叶节点分组压缩后的数据块。As shown in Figure 2, the stream data is written to the internal structure of the chunk file of the distributed file system. The chunk contains the B+Tree template part and the leaf node part. The template in the figure represents the B+Tree template part, the leaf node represents the leaf node part, and the compress chunk represents the data block grouped and compressed by the leaf node.
B+Tree模板部分包含B+Tree的根节点和内部节点部分,每个节点记录了key值,孩子节点等,最高一层内部节点还记录了一列叶节点在所有叶节点中的相对偏移量,一列叶节点分组压缩后在chunk的偏移量。The B+Tree template part includes the root node and the internal node part of the B+Tree. Each node records the key value, child nodes, etc. The highest internal node also records the relative offset of a list of leaf nodes in all leaf nodes. , the offset of a list of leaf nodes in the chunk after grouping compression.
叶节点包括key数组部分和数据数组部分,所有节点按从左到右的顺序进行连续存储。存储文件时,模板部分作为整体写入chunk,而叶节点以分组压缩后的形式写入chunk,每组叶节点个数N设定为20,提高压缩比例来处理空间存储的问题。Leaf nodes include the key array part and the data array part, and all nodes are stored consecutively from left to right. When storing files, the template part is written into the chunk as a whole, and the leaf nodes are written into the chunk in the form of group compression. The number of leaf nodes in each group is set to 20, and the compression ratio is increased to deal with the problem of space storage.
如图3所示,流数据存储结构chunk内的叶节点部分数据布局。数据布局由两部分组成,一个是key数组,一个是数据数组。图中的index array代表了key数组,data array代表了数据数组。Key数组存储顺序的key值,其中包括了一个指向数据数组的偏移量,搜索时通过在Key数组找到符合条件范围的Key值和偏移量,再到数据数组中拿到对应的数据元祖。As shown in Figure 3, the data layout of the leaf nodes in the stream data storage structure chunk. The data layout consists of two parts, one is the key array and the other is the data array. The index array in the figure represents the key array, and the data array represents the data array. The key value in the storage order of the Key array includes an offset pointing to the data array. When searching, the key value and offset that meet the conditions are found in the Key array, and then the corresponding data tuple is obtained from the data array.
如图4所示,处理查询分解调度的算法可以表示为一张图。图中的Pending Set代表了所有还未被分配的子查询,S(qi)代表每个子查询的最优分配Subquery Server,代表每个子查询的Subquery优先级数组,优选服务器队列PreferedServer Arrays代表子查询服务器(Subquery Server)对所有未处理子查询的优先级队列。Pending Set不为空时,每个Subquery Server根据文件系统存储在本地的数据范围和缓存中的数据范围,对Set中的子查询进行优先级排序,排序完成后对所有Subquery Server按照ID顺序,将优选服务器队列PreferedServer Arrays中未处理过的子查询进行分配,直到Pending Set所有子查询都进行处理为止。As shown in Figure 4, the algorithm for handling query decomposition scheduling can be represented as a graph. The Pending Set in the figure represents all the subqueries that have not been assigned, and S(q i ) represents the optimal assigned Subquery Server for each subquery, Subquery priority array representing each subquery, PreferredServer Arrays represents the priority queue of the Subquery Server for all unprocessed subqueries. When the Pending Set is not empty, each Subquery Server prioritizes the subqueries in the Set according to the local data range stored in the file system and the data range in the cache. The subqueries that have not been processed in the PreferredServer Arrays of the preferred server queue are allocated until all subqueries of the Pending Set are processed.
上述实施例和说明书中描述的只是说明本发明的原理和最佳实施例,在不脱离本发明精神和范围的前提下,本发明还会有各种变化和改进,这些变化和改进都落入要求保护的本发明范围内。What is described in the above-mentioned embodiments and specification is only to illustrate the principle and best embodiment of the present invention. Without departing from the spirit and scope of the present invention, the present invention will also have various changes and improvements, and these changes and improvements all fall within the scope of the present invention. within the scope of the claimed invention.
Claims (8)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910026601.2A CN109726225B (en) | 2019-01-11 | 2019-01-11 | A Storm-based distributed stream data storage and query method |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910026601.2A CN109726225B (en) | 2019-01-11 | 2019-01-11 | A Storm-based distributed stream data storage and query method |
Publications (2)
Publication Number | Publication Date |
---|---|
CN109726225A true CN109726225A (en) | 2019-05-07 |
CN109726225B CN109726225B (en) | 2023-08-01 |
Family
ID=66299136
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201910026601.2A Active CN109726225B (en) | 2019-01-11 | 2019-01-11 | A Storm-based distributed stream data storage and query method |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN109726225B (en) |
Cited By (8)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN110515990A (en) * | 2019-07-23 | 2019-11-29 | 华信永道(北京)科技股份有限公司 | Data query methods of exhibiting and inquiry display systems |
CN111241099A (en) * | 2020-01-09 | 2020-06-05 | 佛山科学技术学院 | Industrial big data storage method and device |
CN111310230A (en) * | 2020-02-10 | 2020-06-19 | 腾讯云计算(北京)有限责任公司 | Spatial data processing method, device, equipment and medium |
WO2020248150A1 (en) * | 2019-06-12 | 2020-12-17 | Alibaba Group Holding Limited | Method and system for answering multi-dimensional analytical queries under local differential privacy |
CN115563103A (en) * | 2022-09-15 | 2023-01-03 | 河南星环众志信息科技有限公司 | Multi-dimensional aggregation method, system, electronic device and storage medium |
CN116244313A (en) * | 2023-05-08 | 2023-06-09 | 北京四维纵横数据技术有限公司 | JSON data storage and access method, device, computer equipment and medium |
CN117076466A (en) * | 2023-10-18 | 2023-11-17 | 河北因朵科技有限公司 | Rapid data indexing method for large archive database |
CN117689451A (en) * | 2024-01-31 | 2024-03-12 | 浙江大学 | Flink-based stream vector search method, device and system |
Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN103678520A (en) * | 2013-11-29 | 2014-03-26 | 中国科学院计算技术研究所 | Multi-dimensional interval query method and system based on cloud computing |
US20140172867A1 (en) * | 2012-12-17 | 2014-06-19 | General Electric Company | Method for storage, querying, and analysis of time series data |
CN105589951A (en) * | 2015-12-18 | 2016-05-18 | 中国科学院计算机网络信息中心 | Distributed type storage method and parallel query method for mass remote-sensing image metadata |
CN107357659A (en) * | 2017-07-04 | 2017-11-17 | 东北大学 | Towards the group technology and querying method of Storm successive ranges inquiry GSLB |
-
2019
- 2019-01-11 CN CN201910026601.2A patent/CN109726225B/en active Active
Patent Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US20140172867A1 (en) * | 2012-12-17 | 2014-06-19 | General Electric Company | Method for storage, querying, and analysis of time series data |
CN103678520A (en) * | 2013-11-29 | 2014-03-26 | 中国科学院计算技术研究所 | Multi-dimensional interval query method and system based on cloud computing |
CN105589951A (en) * | 2015-12-18 | 2016-05-18 | 中国科学院计算机网络信息中心 | Distributed type storage method and parallel query method for mass remote-sensing image metadata |
CN107357659A (en) * | 2017-07-04 | 2017-11-17 | 东北大学 | Towards the group technology and querying method of Storm successive ranges inquiry GSLB |
Non-Patent Citations (1)
Title |
---|
朱东升等: "基于Hadoop平台的地铁NCC数据中心方案研究", 《计算机测量与控制》 * |
Cited By (13)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113811868A (en) * | 2019-06-12 | 2021-12-17 | 阿里巴巴集团控股有限公司 | Method and system for answering multidimensional analytical queries under local differential privacy |
WO2020248150A1 (en) * | 2019-06-12 | 2020-12-17 | Alibaba Group Holding Limited | Method and system for answering multi-dimensional analytical queries under local differential privacy |
CN110515990A (en) * | 2019-07-23 | 2019-11-29 | 华信永道(北京)科技股份有限公司 | Data query methods of exhibiting and inquiry display systems |
CN111241099A (en) * | 2020-01-09 | 2020-06-05 | 佛山科学技术学院 | Industrial big data storage method and device |
CN111310230B (en) * | 2020-02-10 | 2023-04-14 | 腾讯云计算(北京)有限责任公司 | Spatial data processing method, device, equipment and medium |
CN111310230A (en) * | 2020-02-10 | 2020-06-19 | 腾讯云计算(北京)有限责任公司 | Spatial data processing method, device, equipment and medium |
CN115563103A (en) * | 2022-09-15 | 2023-01-03 | 河南星环众志信息科技有限公司 | Multi-dimensional aggregation method, system, electronic device and storage medium |
CN115563103B (en) * | 2022-09-15 | 2023-12-08 | 河南星环众志信息科技有限公司 | Multi-dimensional aggregation method, system, electronic equipment and storage medium |
CN116244313A (en) * | 2023-05-08 | 2023-06-09 | 北京四维纵横数据技术有限公司 | JSON data storage and access method, device, computer equipment and medium |
CN117076466A (en) * | 2023-10-18 | 2023-11-17 | 河北因朵科技有限公司 | Rapid data indexing method for large archive database |
CN117076466B (en) * | 2023-10-18 | 2023-12-29 | 河北因朵科技有限公司 | Rapid data indexing method for large archive database |
CN117689451A (en) * | 2024-01-31 | 2024-03-12 | 浙江大学 | Flink-based stream vector search method, device and system |
CN117689451B (en) * | 2024-01-31 | 2024-04-26 | 浙江大学 | Flink-based stream vector search method, device and system |
Also Published As
Publication number | Publication date |
---|---|
CN109726225B (en) | 2023-08-01 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN109726225B (en) | A Storm-based distributed stream data storage and query method | |
CN107423368B (en) | Spatio-temporal data indexing method in non-relational database | |
Liao et al. | Multi-dimensional index on hadoop distributed file system | |
CN103020204B (en) | A kind of method and its system carrying out multi-dimensional interval query to distributed sequence list | |
CN103366015B (en) | A kind of OLAP data based on Hadoop stores and querying method | |
US9535956B2 (en) | Efficient set operation execution using a single group-by operation | |
Liu et al. | U-skyline: A new skyline query for uncertain databases | |
CN103246749B (en) | The matrix database system and its querying method that Based on Distributed calculates | |
CN102722531B (en) | Query method based on regional bitmap indexes in cloud environment | |
CN104850572A (en) | HBase non-primary key index building and inquiring method and system | |
US20150006509A1 (en) | Incremental maintenance of range-partitioned statistics for query optimization | |
CN103678520A (en) | Multi-dimensional interval query method and system based on cloud computing | |
CN110287391A (en) | Hadoop-based multi-level trajectory data storage method, storage medium and terminal | |
CN107220285A (en) | Towards the temporal index construction method of magnanimity track point data | |
WO2017161540A1 (en) | Data query method, data object storage method and data system | |
Lee et al. | Efficient processing of multiple continuous skyline queries over a data stream | |
US12026162B2 (en) | Data query method and apparatus, computing device, and storage medium | |
Gao et al. | Real-time social media retrieval with spatial, temporal and social constraints | |
CN110059149A (en) | Electronic map spatial key Querying Distributed directory system and method | |
Singh et al. | SWST: A disk based index for sliding window spatio-temporal data | |
He et al. | TMan: a high-performance trajectory data management system based on key-value stores | |
Elmeiligy et al. | An efficient parallel indexing structure for multi-dimensional big data using spark | |
CN110275885A (en) | Hadoop-based multi-level trajectory data storage device | |
Tian et al. | Tinba: Incremental partitioning for efficient trajectory analytics | |
Wang et al. | Waterwheel: Realtime indexing and temporal range query processing over massive data streams |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |