Disclosure of Invention
The technical problem to be solved by the invention is to provide a hierarchical storage fault-tolerant method for streaming data processing that addresses the defects of the prior art: to solve the I/O-operation and search-efficiency problems of the fault-tolerant subsystem, to develop a storage strategy for the fault-tolerant subsystem and the Kafka system as a whole, to realize the related optimization of streaming data storage and the strategy iteration of hierarchical storage in the Kafka system, and to further strengthen the advantages of low latency and high throughput.
In order to solve the technical problems, the technical scheme adopted by the invention is as follows:
a fault-tolerant processing system related to the method is based on a Kafka system and adopts a two-level storage structure mixing an HDD and an SSD, where the SSD is the performance layer and the HDD is the storage layer, and a corresponding hierarchical storage strategy is established through a hot data identification algorithm;
the hierarchical storage strategy comprises self-judgment hierarchical storage, Leader-Follower hierarchical storage and user-defined hierarchical storage; the self-judgment hierarchical storage classifies logs by their data heat, then stores the data in the performance-layer SSD (solid-state drive) and the storage-layer HDD (hard disk drive) respectively according to the data heat and the hardware environment, and performs data migration when the data heat changes; in the Leader-Follower hierarchical storage, the log.dirs configuration item in server.properties is assigned a path, and after this 'hard configuration' the Leader nodes are stored in the SSD; the user-defined hierarchical storage is user-oriented: if, in a special situation during the use of Kafka, certain data needs to be stored in the SSD or the HDD, the user designates and selects the storage location;
the hot data identification algorithm comprises an LFU algorithm and an LRU algorithm, which are respectively used to prevent frequent migration of data and to judge hot data; hot data information is first acquired in the periodic task of the ReplicaManager according to the LRU algorithm, the information is then extracted and processed and the remaining HotData data is updated, after which the Counter calculation in the LFU algorithm is performed on these data, the entries are sorted according to the new Counter value, Score sorting is then applied to the rear half of the Counter-sorted table (i.e., double sorting), and data migration is performed through the doubly processed sorted table; after migration, the Counter and lastDecreTime are updated again as in the LFU algorithm.
Further, the Counter calculation in the LFU algorithm comprises an increment calculation and a decay process;
a hot data storage model is built by taking the data heat into account, the model data structure comprising: Counter, Score, SSDorHDD, TopicPartition, LastReadTime, LastWriteTime, UsableSpace, and LastDecreTime;
the specific steps of performing the increment calculation on the Counter are as follows:
step 1.1: extracting LFUInitVal and LFULogFactor from the hot data storage model, where LFUInitVal is a preset initial value and LFULogFactor is a growth-difficulty parameter;
step 1.2: inputting a Counter value;
step 1.3: if the Counter is greater than or equal to the set boundary value 255, the Counter increment calculation ends directly;
step 1.4: setting a judgment value p: if the Counter is smaller than LFUInitVal, p is set to 0; otherwise p is set to the difference of the two parameters, namely Counter − LFUInitVal;
step 1.5: using a random number algorithm, negative feedback is applied to the Counter indirectly through p by means of formula (1); if the result is true, the Counter is incremented by one, and if it is false, step 1.5 is exited directly;
1.0/(p × LFULogFactor + 1) > r, where r is a random number in [0, 1)    (1)
step 1.6: outputting the Counter;
if certain data is not accessed for a long time, then even though its access frequency was extremely high in the previous working phase, its Counter needs to be decayed as the access interval grows; the specific steps of the decay processing are as follows:
step 2.1: extracting LFUDecayTime and lastDecreTime from the hot data storage model, where LFUDecayTime is the decay coefficient and lastDecreTime is the time of the last decay;
step 2.2: inputting a Counter value;
step 2.3: when the decay coefficient LFUDecayTime is not 0, the ratio of the elapsed time of the current round to the decay coefficient, i.e., (time − lastDecreTime)/LFUDecayTime, is used as the decay value of the Counter;
step 2.4: performing the decay processing on the Counter according to the decay value obtained from (time − lastDecreTime)/LFUDecayTime, noting that the Counter has a lower boundary of 0;
step 2.5: outputting the Counter.
The beneficial effects produced by adopting the above technical scheme are as follows: the hierarchical storage fault-tolerant method for streaming data processing provided by the invention addresses the I/O-operation and search-efficiency problems of the fault-tolerant subsystem, develops a storage strategy for the fault-tolerant subsystem and the Kafka system as a whole, realizes the related optimization of streaming data storage and the strategy iteration of hierarchical storage in the Kafka system, and further strengthens the advantages of low latency and high throughput. Self-judgment hierarchical storage gives the system the ability to classify data automatically and to select storage locations automatically. The Leader-Follower storage method stores all Leader files in Kafka on a high-speed storage medium such as an SSD, which improves the operating efficiency and fault-tolerance performance of the system; the method can further be extended to computing engines such as Spark, storing Checkpoint files of RDDs and the like on high-speed storage media such as SSDs. In the self-judgment hierarchical storage provided by the invention, a corresponding hot data identification algorithm is required to give the system the ability to classify data automatically: the invention uses the LRU algorithm to judge hot data and provides the system with data-classification weights through a negative-feedback effect, while the LFU algorithm calculates from the built-in attributes of the data to prevent the data from being migrated frequently. In large-scale distributed cluster computing, preventing frequent data migration through the LFU algorithm improves the operating efficiency of the system as a whole and reduces resource occupation, while the LRU algorithm allows data to be received rapidly, prevents data congestion, and reduces the actual operating cost of hardware storage.
Detailed Description
The following detailed description of embodiments of the present invention is provided in connection with the accompanying drawings and examples. The following examples are intended to illustrate the invention but are not intended to limit the scope of the invention.
The method is a hierarchical storage fault-tolerant method oriented to streaming data processing; the fault-tolerant processing system related to the method is based on a Kafka system and adopts a two-level storage structure mixing an HDD and an SSD, where the SSD is the performance layer, the HDD is the storage layer, and a corresponding hierarchical storage strategy is established through a hot data identification algorithm.
The fault-tolerant processing system is based on a Kafka system, and Kafka, serving as an intermediate cache system, has the advantages of high throughput, strong decoupling, dynamic expansion, low latency, high speed and the like. However, these advantages mainly rely on high-speed, high-performance I/O operations; even when the hardware conditions are satisfied as far as possible, the dilemma remains that high-efficiency hardware is expensive while cheap hardware is inefficient. Practical applications must consider cost, and an algorithm upgrade alone cannot produce a better change, so a hierarchical storage strategy can be considered to improve the efficiency of the fault-tolerant system and even of the Kafka system as a whole.
As can be seen from the mechanisms described in fig. 1, the main fault-tolerant mechanism in the Kafka fault-tolerant subsystem relies on the Leader: the Leader is used to ensure message security, recover server data and help the system reselect the Controller, and even if a Leader goes down, a new Leader is reselected from the ISR or the Replicas. The Leader is therefore the key point for improving the fault-tolerant processing subsystem; improving its I/O operation efficiency greatly improves the fault-tolerant efficiency and the overall operating efficiency of the system, and raises the system throughput.
Fig. 2 shows the data storage and Leader storage model for the hierarchical storage strategy.
A Topic in Kafka's practical application corresponds to a certain service and is more of a logical concept. A Topic may be divided into multiple Partitions, which are subdivisions of the log file; the system automatically updates the log file each time a Partition receives a message and adds an offset attribute to the message to mark its position in the log. During publishing, the message is stored in a disk location assigned by the Broker server, and messages are stored strictly by offset, which makes it convenient for different Consumers to read them.
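The relationship between Topic, Partition and offset can be seen with the standard Kafka Java client; the following is only an illustrative sketch, and the topic name "demo-topic" and the broker address are placeholders rather than values prescribed by the method.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class OffsetDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each message appended to a Partition's log is assigned a monotonically
            // increasing offset that marks its position within that log.
            RecordMetadata meta = producer
                    .send(new ProducerRecord<>("demo-topic", "key", "value"))
                    .get();
            System.out.printf("partition=%d, offset=%d%n",
                    meta.partition(), meta.offset());
        }
    }
}
```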
In large-scale distributed cluster computing, the system needs a highly efficient and safe hierarchical storage strategy; the self-judgment hierarchical system can meet this requirement and realizes the hierarchical storage strategy within a reasonable, cost-effective range. A hot data identification algorithm combining LFU (Least Frequently Used) and LRU (Least Recently Used) judges the hot data and provides the relevant data for self-judgment hierarchical storage; fig. 3 is a flow chart of the self-judgment hierarchical storage.
The technical point of this embodiment is high-speed I/O operation. After the above hierarchical storage algorithm runs, hot data is stored in the performance-layer SSD; the Leader files of Kafka (RDD and Checkpoint files in Spark) are identified by the Leader-Follower hierarchical storage policy and, once identified, are also stored in the performance-layer SSD, so that when a periodic check is required the SSD can provide fast reads for Checkpoint access. Even if errors such as timeouts or downtime occur, rapid recovery is still possible because the Leader files are stored in the SSD; if the hard disk loses the data of a certain partition, then since all Leader data are stored in the SSD, a new Leader node can be rapidly elected by Kafka's built-in election algorithm from the Follower files with high access frequency, so the system achieves efficient fault-tolerant processing and accelerates fault-tolerant recovery.
The user-defined hierarchical storage is user-oriented, and the user can select the storage location by himself, as shown in fig. 4, which is a flow chart of the user-defined hierarchical storage.
In improving the efficiency of the fault-tolerant processing system, a considerable part of the data is found to have an extremely high access frequency, such as Leader nodes and RDD partition data. Hot data refers to such data, and the heat of data is assessed from the following aspects: access frequency, access interval, I/O hit rate, data relevance, granularity, and even the size of the data itself. Judging and identifying the data according to these conditions facilitates the data migration involved in hierarchical storage. A general hot data storage model is shown in fig. 5, and the model data structure comprises: Counter, Score, SSDorHDD, TopicPartition, LastReadTime, LastWriteTime, UsableSpace, and LastDecreTime.
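A minimal Java sketch of how the hot data storage model of fig. 5 could be represented follows; the field types and the boolean interpretation of SSDorHDD are assumptions made for illustration only.

```java
/** Sketch of the hot data storage model of fig. 5 (field types are assumed). */
public class HotData {
    int counter;            // Counter: hit/migration count, bounded to 0-255
    double score;           // Score: LRU-side value used to order migrations
    boolean onSsd;          // SSDorHDD: true if the log currently resides on the SSD
    String topicPartition;  // TopicPartition the log belongs to
    long lastReadTime;      // LastReadTime: timestamp of the most recent read
    long lastWriteTime;     // LastWriteTime: timestamp of the most recent write
    long usableSpace;       // UsableSpace: remaining space of the storage device
    long lastDecreTime;     // LastDecreTime: timestamp of the most recent Counter decay
}
```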
The Counter in the above hot data storage model is used in the S-LFU and S-LRFU of the hot data identification algorithm and mainly records the number of hits of a log or the number of migrations of a partition; the Counter needs a defined limit, such as 0-255. The steps of performing the increment calculation on the Counter are as follows (a code sketch follows the steps):
step 1.1: extracting LFUInitVal and LFULogFactor from the hot data storage model in fig. 5, where LFUInitVal is a preset initial value and LFULogFactor is a growth-difficulty parameter;
step 1.2: inputting a Counter value;
step 1.3: if the Counter is greater than or equal to the set boundary value 255, the Counter increment calculation ends directly;
setting a boundary value allows part of the high-heat data to be checked and stored quickly, and also ensures that new high-heat data can quickly "fill the vacancy" once the heat of existing data drops, so that excessive occupation of SSD resources is avoided;
step 1.4: setting a judgment value p: if the Counter is smaller than LFUInitVal, p is set to 0; otherwise p is set to the difference of the two parameters, namely Counter − LFUInitVal;
step 1.5: using a random number algorithm, negative feedback is applied to the Counter indirectly through p by means of formula (1); if the result is true, the Counter is incremented by one, and if it is false, step 1.5 is exited directly;
1.0/(p × LFULogFactor + 1) > r, where r is a random number in [0, 1)    (1)
A random number strategy is used: the Counter is incremented by one only when the comparison in formula (1) holds. From the two formulas p = Counter − LFUInitVal and 1.0/(p × LFULogFactor + 1) it can be seen that the larger the Counter value, the harder it becomes to increase; this negative-feedback effect solves the problem of the Counter growing too fast and ensures that the Counter is reliable as a hit-rate parameter;
step 1.6: and outputting the Counter.
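Steps 1.1-1.6 can be summarized in a short Java sketch; the method name and the use of ThreadLocalRandom are implementation choices of this illustration, not part of the claimed method.

```java
import java.util.concurrent.ThreadLocalRandom;

public final class LfuCounter {
    static final int COUNTER_MAX = 255;  // boundary value from step 1.3

    /** Probabilistic Counter increment following steps 1.1-1.6 (illustrative sketch). */
    static int increment(int counter, int lfuInitVal, double lfuLogFactor) {
        if (counter >= COUNTER_MAX) {
            return counter;                            // step 1.3: already at the boundary
        }
        int p = Math.max(0, counter - lfuInitVal);     // step 1.4: judgment value p
        double r = ThreadLocalRandom.current().nextDouble();  // random number in [0, 1)
        if (r < 1.0 / (p * lfuLogFactor + 1)) {        // step 1.5: formula (1)
            counter++;                                 // larger p makes the increment rarer (negative feedback)
        }
        return counter;                                // step 1.6: output the Counter
    }
}
```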
Hot data considerations include the time interval in addition to the access frequency, so if certain data is not accessed for a long time, then even though its access frequency was extremely high in the previous operation stage, its Counter needs to be decayed as the access interval grows. The decay processing is expressed in the following steps (a code sketch follows the list):
step 2.1: extracting LFUDecayTime and lastDecreTime from the hot data storage model in fig. 5, where LFUDecayTime is the decay coefficient and lastDecreTime is the time of the last decay;
step 2.2: inputting a Counter value;
step 2.3: when the decay coefficient LFUDecayTime is not 0, the ratio of the elapsed time of the current round to the decay coefficient, i.e., (time − lastDecreTime)/LFUDecayTime, is used as the decay value of the Counter;
unlike the increment calculation, step 2.3 does not use a negative-feedback mechanism; the Counter is reduced directly according to the elapsed time and the magnification given by the decay coefficient (a preset value), which efficiently solves the problem of temporary data occupying too much space;
step 2.4: performing the decay processing on the Counter according to the decay value obtained from (time − lastDecreTime)/LFUDecayTime, noting that the Counter has a lower boundary of 0;
step 2.5: outputting the Counter.
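The decay of steps 2.1-2.5 can likewise be sketched in Java; the sketch assumes, as step 2.3 suggests, that the elapsed time is divided by the decay coefficient LFUDecayTime, and the millisecond time unit is an assumption of this illustration.

```java
public final class LfuDecay {
    /** Counter decay following steps 2.1-2.5 (illustrative sketch, millisecond timestamps assumed). */
    static int decay(int counter, long nowMillis, long lastDecreTime, long lfuDecayTime) {
        if (lfuDecayTime == 0) {
            return counter;                                            // step 2.3: decay disabled
        }
        long decayValue = (nowMillis - lastDecreTime) / lfuDecayTime;  // (time - lastDecreTime) / LFUDecayTime
        return (int) Math.max(0, counter - decayValue);                // step 2.4: Counter bounded below by 0
    }
}
```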
After the hot data discrimination criteria are determined, this embodiment establishes a hot data identification algorithm for Kafka based on page replacement strategies; the algorithm can be designed according to the access frequency and the access time interval.
The traditional LFU algorithm performs page replacement according to the access frequency of data and replaces the least frequently used data. In the two-level storage strategy currently used in Kafka, frequently accessed data can be placed in the SSD and infrequently accessed data in the HDD, which improves data search efficiency and data transmission efficiency. Fig. 6 shows the specific implementation process of the algorithm.
In the hot data identification model shown in fig. 5, the Counter is updated during read/write operations: the lastDecreTime and Counter values in the HotData are refreshed, the local replicas are sorted according to the updated Counter value, data with high access frequency is sent to the SSD and data with low access frequency to the HDD; if Counter values are equal and conflict, sorting falls back to the updated lastDecreTime value; and if the data is already on the target SSD or HDD, it is not migrated and only its values are updated.
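A compact Java sketch of the migration decision just described follows; HotData is the model class sketched above, the SSD capacity limit and the descending tie-break on lastDecreTime are assumptions, and the two migrateTo… helpers are hypothetical placeholders for the actual log-directory move.

```java
import java.util.Comparator;
import java.util.List;

public final class LfuMigration {
    /** Sort by Counter (high first), tie-break on lastDecreTime, then migrate. */
    static void migrate(List<HotData> entries, int ssdSlots) {
        entries.sort(Comparator
                .comparingInt((HotData h) -> h.counter).reversed()
                .thenComparing((HotData h) -> h.lastDecreTime, Comparator.reverseOrder()));

        for (int i = 0; i < entries.size(); i++) {
            HotData h = entries.get(i);
            boolean wantSsd = i < ssdSlots;     // most frequently accessed entries go to the SSD
            if (wantSsd && !h.onSsd) {
                migrateToSsd(h);                // hypothetical helper: move log files to the performance layer
            } else if (!wantSsd && h.onSsd) {
                migrateToHdd(h);                // hypothetical helper: move log files to the storage layer
            }
            // data already on the correct device is not migrated; only its values are updated
        }
    }

    static void migrateToSsd(HotData h) { h.onSsd = true; }
    static void migrateToHdd(HotData h) { h.onSsd = false; }
}
```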
The LRU algorithm is the least-recently-used algorithm: it assumes that data which has not been used for a long time will not be used in the future and replaces it directly. Using the Counter increment-calculation step and the decay-processing step, an algorithm suited to the Kafka system can be established: data in the SSD that has not been used for a long time is migrated to the HDD, and data in the HDD that has been used recently and whose usage count is increasing is migrated to the SSD. Fig. 7 shows the specific implementation process of the LRU algorithm.
In the Score calculation of the LRU method, the criterion for whether a TopicPartition exists is whether it can be found in the mapping to its brokerId; lastReadTime and lastWriteTime are the latest read and write times stored in the hot data and are obtained from the log records. The weight values of the Score method are derived from the actual operation of a Kafka system: the Leader is the key of the fault-tolerant processing system, so its weight is set to 4; with a corresponding negative-feedback algorithm it was found in actual operation that a write operation costs more time than a read operation, so the weight of a write operation is set to 2 and the weight of a read operation to 1, at which point the judgment speed of the algorithm and the operating efficiency of the system are highest.
After the Score is calculated, the data is updated directly: the HotData is updated and sorted; if the Scores are equal at this point (which is quite likely), sorting is performed according to UsableSpace, and data migration is carried out on the sorted result.
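The weighting just described (Leader 4, write 2, read 1) can be read in more than one way; the sketch below is one plausible interpretation in which more recent reads and writes raise the Score, and the recency terms, the isLeader flag and the method name are assumptions of this illustration rather than the fixed Score formula of the method.

```java
public final class ScoreCalc {
    /** One possible Score computation: recency-weighted reads/writes plus a Leader bonus. */
    static double score(HotData h, boolean isLeader, long nowMillis) {
        double readRecency  = 1.0 / (1.0 + (nowMillis - h.lastReadTime));   // decays as the read interval grows
        double writeRecency = 1.0 / (1.0 + (nowMillis - h.lastWriteTime));  // decays as the write interval grows
        return 1 * readRecency + 2 * writeRecency + (isLeader ? 4 : 0);     // weights 1 / 2 / 4 from the text
    }
}
```

Entries with equal Score would then be ordered by UsableSpace, as described above.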
The LFU and LRU algorithms can respectively prevent frequent data migration and judge hot data, but in real large-scale operation the two algorithms work at the same time: on the basis of the LFU, the number of data migrations is counted so that the migration frequency of the data is recorded. Fig. 8 shows the specific implementation process of combining the LFU and LRU algorithms. The combined algorithm first obtains the hot data information in the ReplicaManager's periodic tasks according to the LRU algorithm, then processes that information and updates the remaining HotData data. Counter calculation in the LFU algorithm is then performed on these data, the entries are sorted by the new Counter value, Score sorting is applied to the rear half of the Counter-sorted table (double sorting), and data migration is performed through the doubly processed sorted table. Because the Counter indicates how frequently the data has been migrated, the front half of the Counter-sorted table is migrated frequently and does not participate in the Score sorting. After migration, the Counter and lastDecreTime are updated again as in the LFU algorithm.
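The combined flow can be sketched as follows; it reuses the HotData class sketched above, and the way the doubly sorted table is mapped onto SSD slots (ssdSlots) is an assumption for illustration.

```java
import java.util.Comparator;
import java.util.List;

public final class CombinedIdentification {
    /** LFU + LRU double sorting: Counter sort, Score re-sort of the rear half, then migration. */
    static void run(List<HotData> hotData, long nowMillis, int ssdSlots) {
        // 1. Sort by the freshly updated Counter, highest first (LFU side).
        hotData.sort(Comparator.comparingInt((HotData h) -> h.counter).reversed());

        // 2. The front half migrates frequently and keeps its order; only the rear half
        //    is re-sorted by the LRU-side Score (double sorting).
        int mid = hotData.size() / 2;
        hotData.subList(mid, hotData.size())
               .sort(Comparator.comparingDouble((HotData h) -> h.score).reversed());

        // 3. Migrate according to the doubly processed table, then refresh lastDecreTime.
        for (int i = 0; i < hotData.size(); i++) {
            HotData h = hotData.get(i);
            boolean wantSsd = i < ssdSlots;
            if (wantSsd != h.onSsd) {
                h.onSsd = wantSsd;          // placeholder for the actual log migration
            }
            h.lastDecreTime = nowMillis;    // Counter/lastDecreTime updated after migration
        }
    }
}
```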
The page replacement algorithms implemented above serve the self-judgment hierarchical storage. For the system as a whole, and especially for large-scale cluster storage, fast machine judgment is still required, and on the basis of Kafka a differentiated storage manner is also needed; this embodiment therefore includes the following storage manners: self-judgment hierarchical storage, Leader-Follower hierarchical storage and user-defined hierarchical storage.
In this design the two-level storage structure is SSD + HDD, and the storage capacity of the SSD performance layer is limited, so the algorithm must serve large-scale streaming data cluster computation and improve the original Kafka system: the buffered data volume becomes larger, data congestion is avoided, the operation speed and fault-tolerant efficiency become higher, and larger-scale operation can be realized. Improving the performance layer further raises the efficiency of the improved system in fault-tolerant processing, recovery algorithms, and the like.
When the relevant environment is in good condition, because the data is identified according to its heat and this information is already bound to the data, the data can be classified quickly to realize hierarchical storage (fewer resources occupied, high classification speed). The self-judgment hierarchical storage classifies logs by their data heat, then stores the data in the performance-layer SSD and the storage-layer HDD respectively according to the data heat and the hardware environment, and performs data migration when the data heat changes (according to the usage frequency, interval time, and the like).
In hot data identification, the LFU prevents data from being migrated frequently and the LRU algorithm judges hot data; when actually used for self-judgment hierarchical storage the two algorithms work at the same time: the number of data migrations is counted on the basis of the LFU to record the migration frequency of the data, and the LRU algorithm then performs the related migration operations. The algorithm flow can be summarized as follows: first obtain the HotData information, including LastReadTime, LastWriteTime, Counter, TopicPartition and other attributes; update the Counter and Score values through the LFU and LRU algorithms, and update HotData again. After data migration, the Counter data of the data actually migrated is updated once more; in this update, data that is already in the SSD but is judged to be migrated to the SSD is ignored.
In the Kafka system, each partition can select one copy as the Leader (when there is more than one copy). The Leader node is very important: it is the hub for communication between Kafka producers and consumers, its utilization (the access frequency and heat of its data) is very high, and even if it is not used in a certain period it can become high-heat data at any time. The Leader node can therefore be stored directly in the performance layer, which greatly improves I/O performance and the overall speed of the system; storing the Leader in the SSD also greatly increases the recovery speed of the fault-tolerant recovery mechanism and thus the fault-tolerance performance. For the RDD data files mentioned above, the high-heat parts of the RDD can likewise be stored directly in the performance layer, and if such a system exists, fault-tolerant recovery such as coarse-grained transformation replay can be carried out rapidly.
The method assigns a path to the log.dirs configuration item in server.properties; after this "hard configuration", the Leader files are stored in the SSD. By placing the above files inside the SSD, a "fast response" of fault-tolerant processing can be achieved quickly and efficiently. When Kafka is used with a Spark system, even if part of an RDD subfile is lost, the lost file can, for example, be quickly recovered on the SSD by the Lineage fault-tolerant mechanism.
If, in a special situation during the use of Kafka, certain data needs to be stored in the SSD or the HDD, the user can specify and select the storage device, which improves the ease of use of the Kafka system; manual user-defined hierarchical storage is therefore important.
The log.dirs configuration item in server.properties specifies the path operation; in doing so, whether the performance layer has problems must be considered so as to increase the reliability of the performance layer. In the design, to improve the reliability of data storage, the method stores to the SSD when an SSD exists and to the HDD when it does not; if no storage device exists at all, an error is reported directly and handed to the user for error handling.
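The "SSD if present, otherwise HDD, otherwise error" selection can be sketched as below; the two directory paths are placeholders standing in for the log.dirs entries, and the class, method and exception names are choices of this illustration.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public final class LogDirSelector {
    /** Prefer the performance-layer directory, fall back to the storage layer, else report an error. */
    static Path chooseLogDir() {
        Path ssdDir = Paths.get("/ssd/kafka-logs");   // assumed performance-layer entry of log.dirs
        Path hddDir = Paths.get("/hdd/kafka-logs");   // assumed storage-layer entry of log.dirs
        if (Files.isDirectory(ssdDir)) {
            return ssdDir;                            // store to the SSD when it exists
        }
        if (Files.isDirectory(hddDir)) {
            return hddDir;                            // otherwise store to the HDD
        }
        // no storage device at all: report the error and hand it to the user
        throw new IllegalStateException("No available log directory; user error handling required");
    }
}
```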
The method of this embodiment is implemented in both a stand-alone environment and a cluster environment. The efficiency of the original Kafka system and of the improved system is compared for the fault-tolerant subsystem and for the overall system; the fault-tolerant subsystem comparison mainly relies on the Leader-Follower hierarchical storage strategy, and the overall system efficiency mainly relies on the self-judgment hierarchical storage strategy (particularly in the cluster-environment experiment). To ensure that the experimental comparison is reliable, the other configuration parameters are not changed, for example: the producer generates 10000 messages, the consumer accordingly consumes 10000 messages, the timeout waiting time is preset to 1000000 ms, the upper limit of system throughput is 20000, and the size of each message is set to 512 KB. During debugging, errors such as Worker timeout, Worker exception and Master exception can be set in the cluster environment, so that the efficiency of the source code and of the improved system can be compared and the experimental results obtained.
In the aspect of system implementation, the invention improves the storage structure and the storage strategy of the original Kafka system, thereby not only realizing the original aim of improving the fault-tolerant processing efficiency, but also improving the use efficiency of the entire Kafka system to a certain extent.
On the storage medium, Kafka's original data storage on the HDFS is improved into a two-layer storage structure of SSD + HDD. In terms of storage policy, three storage methods are used: the user-defined hierarchical storage lets the user define the storage location of a data file by modifying the server.properties file; the self-judgment hierarchical storage provides a greater possibility for the high efficiency of large-scale distributed cluster operation, storing frequently used files in the SSD through the log files and the hot data identification algorithm and thereby increasing the system's running speed; the Leader-Follower hierarchical storage judges whether a log is a Leader or a Follower by modifying the server.properties file and the log.dirs configuration item, and stores them in the SSD and the HDD respectively, which greatly improves the fault-tolerant processing performance and the overall efficiency of the system.
The implementation results are shown in figs. 9-14. As the throughput comparison in fig. 9 shows, both the LRU and LFU strategies reach more than 3 times the storage rate of the pure HDD, exceeding 1.5 times the rate of the plain two-level storage strategy; for the production rate shown in fig. 10, both LRU and LFU far exceed the basic two-level storage strategy. Fig. 11 shows consumer throughput: with copy coefficients of 3 and 6, the LRU and LFU throughput is the highest, 2.5 times that of the two-level storage strategy. Fig. 12 shows the Kafka consumer consumption rate: LRU is more stable, and when the copy coefficient is 1 LFU does not perform well because it mainly deals with the possibility of frequent copy migration, but in large-scale clusters LFU can serve as a key strategy of the system to prevent large-scale frequent migration of data. Figs. 13 and 14 show that in the stand-alone environment the Leader-Follower hierarchical storage strategy is almost equal to the other two improved strategies in consumption rate and production rate, and even partially exceeds the LFU algorithm (with a large copy coefficient and a high degree of parallelism), because in the stand-alone environment the communication hub Leader is stored entirely on the performance layer. The comparison of results shows that the Leader-Follower hierarchical storage strategy of the fault-tolerant system responds well: the subsystem speed is greatly increased and the performance improvement is obvious; producer performance reaches more than 1.2 times the original effect, the fault-tolerant subsystem reaches 1.5 times, and consumer performance is more than 2 times the original.
Finally, it should be noted that: the above embodiments are only used to illustrate the technical solution of the present invention, and not to limit the same; while the invention has been described in detail and with reference to the foregoing embodiments, it will be understood by those skilled in the art that: the technical solutions described in the foregoing embodiments may still be modified, or some or all of the technical features may be equivalently replaced; such modifications and substitutions do not depart from the spirit of the corresponding technical solutions and scope of the present invention as defined in the appended claims.