[go: up one dir, main page]

CN108170527B - Remote multi-activity distributed message consumption method and device - Google Patents

Remote multi-activity distributed message consumption method and device Download PDF

Info

Publication number
CN108170527B
CN108170527B CN201711350825.6A CN201711350825A CN108170527B CN 108170527 B CN108170527 B CN 108170527B CN 201711350825 A CN201711350825 A CN 201711350825A CN 108170527 B CN108170527 B CN 108170527B
Authority
CN
China
Prior art keywords
consumption
service cluster
service
cluster
clusters
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN201711350825.6A
Other languages
Chinese (zh)
Other versions
CN108170527A (en
Inventor
冯浩
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
Beijing QIYI Century Science and Technology Co Ltd
Original Assignee
Beijing QIYI Century Science and Technology Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by Beijing QIYI Century Science and Technology Co Ltd filed Critical Beijing QIYI Century Science and Technology Co Ltd
Priority to CN201711350825.6A priority Critical patent/CN108170527B/en
Publication of CN108170527A publication Critical patent/CN108170527A/en
Application granted granted Critical
Publication of CN108170527B publication Critical patent/CN108170527B/en
Active legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Images

Classifications

    • HELECTRICITY
    • H04ELECTRIC COMMUNICATION TECHNIQUE
    • H04LTRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L67/00Network arrangements or protocols for supporting network services or applications
    • H04L67/01Protocols
    • H04L67/10Protocols in which an application is distributed across nodes in the network
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/50Allocation of resources, e.g. of the central processing unit [CPU]
    • G06F9/5061Partitioning or combining of resources
    • GPHYSICS
    • G06COMPUTING; CALCULATING OR COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F9/00Arrangements for program control, e.g. control units
    • G06F9/06Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F9/46Multiprogramming arrangements
    • G06F9/50Allocation of resources, e.g. of the central processing unit [CPU]
    • G06F9/5083Techniques for rebalancing the load in a distributed system
    • HELECTRICITY
    • H04ELECTRIC COMMUNICATION TECHNIQUE
    • H04LTRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
    • H04L51/00User-to-user messaging in packet-switching networks, transmitted according to store-and-forward or real-time protocols, e.g. e-mail

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Computer Networks & Wireless Communication (AREA)
  • Signal Processing (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Management, Administration, Business Operations System, And Electronic Commerce (AREA)
  • Hardware Redundancy (AREA)

Abstract

The embodiment of the invention provides a consumption method and a device of distributed messages of multiple activities in different places, wherein the method comprises the following steps: monitoring whether at least two service clusters of a server side are available; if the situation that the at least two service clusters are available is monitored, selecting any one of the at least two service clusters as a main service cluster; and connecting the selected main service cluster for consumption. In the embodiment of the invention, the availability of all service clusters at the server end is monitored, a main service cluster is selected from the available service clusters, and then the connection is established with the main service cluster and consumption is started. That is to say, in this embodiment, the consumption end selects the main service cluster from the available service clusters, so that the high availability of the service clusters is improved, and after the consumption end is switched to a new service cluster, consumption is started according to the reset consumption progress, thereby greatly reducing the repeated consumption of messages.

Description

Remote multi-activity distributed message consumption method and device
Technical Field
The invention relates to the technical field of computers, in particular to a distributed message consumption method and device for multiple activities in different places.
Background
Kafka is a distributed, partitionable, replicable, distributed messaging system that operates in a cluster and may be composed of one or more services. The distributed message system is used as an important component in the distributed system, mainly solves the problems of application coupling, asynchronous messages, traffic cutting and the like, and realizes a high-performance, high-availability, scalable and final consistency framework. Is an indispensable middleware of a large-scale distributed system. The Kafka cluster is a typical representative thereof. The existing Kafka clusters are separately deployed for operation and maintenance, because the number of Kafka clusters is large, the Kafka clusters are served to consumers through Data Centers (DC), once a problem occurs in a certain data center, all Kafka clusters connected with the data center cannot be served to the consumers, the consumers need to perform Kafka cluster switching, and after the consumers switch to a new Kafka cluster, the consumers need to re-consume, a large number of repeated messages are generated, and thus the satisfaction degree of the consumers is reduced.
Therefore, how to achieve high availability of services across data centers and reduce the generated repeated messages in the process of switching clusters by consumers is a technical problem to be solved at present.
Disclosure of Invention
The technical problem to be solved by the embodiments of the present invention is to provide a distributed message consumption method for multiple different places and multiple services, so as to solve the problem that in the prior art, high availability of a service across data centers cannot be realized, which causes a large amount of repeated messages to be generated in a cluster switching process by a consumer, and reduces satisfaction.
Correspondingly, the embodiment of the invention also provides a distributed message consumption device for multiple activities at different places, which is used for ensuring the realization and the application of the method.
In order to solve the problems, the invention is realized by the following technical scheme:
a first aspect provides a method of consumption of a placeshifting multi-alive distributed message, the method comprising:
the consumption end monitors whether at least two service clusters in the service end are available;
if the consumption end monitors that the at least two service clusters are available, one of the at least two service clusters is selected as a main service cluster;
and the consumption end is connected with the selected main service cluster for consumption.
Optionally, the manner of selecting any one of the at least two service clusters as the main service cluster is as follows:
and the consumption end selects one service cluster with higher priority in the at least two service clusters as a main service cluster for consumption.
Optionally, the method further includes:
after the consumption is started, the consumption end continuously monitors whether the main service cluster is available;
when the consumption end monitors that the main service cluster is unavailable, reselecting a service cluster with a high suboptimal priority from the at least two service clusters as a consumed main service cluster;
and the consumption end is connected with the reselected main service cluster for consumption.
Optionally, the method further includes:
calculating a consumption starting offset value initialized by the consumption end before the consumption end is connected to the reselected main service cluster for consumption;
the consumption end resets the consumption progress of the consumption end according to the consumption starting offset value;
and the consumption end is connected to the reselected main service cluster for consumption according to the reset consumption progress.
Optionally, the consumption starting offset value is calculated according to the following formula:
offset A is equal to offset B-Lag-adjustment factor offset A is less than or equal to 0,
wherein, the offset a is a consumption starting offset value of a topic message on the service cluster a;
the offset B is a current consumption offset value of the topic on the service cluster B;
the bag is a Lag value consumed by the topic message on the service cluster B;
the adjustment factor is an adjustment constant.
A second aspect provides an off-site multi-alive distributed message consuming apparatus, the apparatus comprising:
the monitoring module is used for monitoring whether at least two service clusters in the server side are available;
the first selection module is used for selecting any one of the at least two service clusters as a main service cluster when the monitoring module monitors that the at least two service clusters are available;
and the first consumption module is used for connecting the main service cluster selected by the first selection module for consumption.
Optionally, the first selecting module is specifically configured to select, when the monitoring module monitors that the at least two service clusters are available, one service cluster with a higher priority level in the at least two service clusters as a consuming master service cluster.
Optionally, the apparatus further comprises: a third selection module and a third consumption module, wherein,
the monitoring module is further configured to continue to monitor whether the main service cluster is available after consumption starts;
the third selecting module is configured to reselect a service cluster with a high suboptimal level from the at least two service clusters as a consuming main service cluster when the monitoring module monitors that the main service cluster is unavailable;
and the third consuming module is used for connecting the main service cluster reselected by the third selecting module for consumption.
Optionally, the apparatus further comprises:
the calculating module is used for calculating the consumption starting offset value initialized by the consumption end before the consumption of the third consumption module;
the setting module is used for resetting the consumption progress of the consumption end according to the consumption starting offset value;
and the third consuming module is further used for connecting to the reselected main service cluster for consumption according to the reset consumption progress.
Optionally, the calculating module calculates the consumption starting offset value according to the following formula:
offset A is equal to offset B-Lag-adjustment factor offset A is less than or equal to 0,
wherein, the offset a is a consumption starting offset value of a topic message on the service cluster a;
the offset B is a current consumption offset value of the topic on the service cluster B;
the bag is a Lag value consumed by the topic message on the service cluster B;
the adjustment factor is an adjustment constant.
Compared with the prior art, the embodiment of the invention has the following advantages:
in the embodiment of the invention, the consumption end selects the main service cluster from the available service clusters, thereby improving the high availability of the service clusters. When the consumption end is connected with the main service cluster and begins to consume, the consumption end begins to monitor the availability of the main service cluster, if the main service cluster is monitored to be unavailable, the main service cluster is reselected, and a consumption starting offset value initialized by the consumption end is calculated; then, resetting the consumption progress of the consumption end according to the consumption starting offset value; and connecting with the reselected main service cluster, and finally consuming in the connected main service cluster according to the reset consumption progress. The embodiment not only improves the high availability of the service cluster, but also starts to consume according to the reset consumption progress after the consumption end is switched to a new service cluster, thereby greatly reducing the repeated consumption of the message.
It is to be understood that both the foregoing general description and the following detailed description are exemplary and explanatory only and are not restrictive of the application.
Drawings
FIG. 1 is a flow chart of a method for consuming a distributed message of multiple removals in a network according to an embodiment of the present invention;
FIG. 2 is another flow chart of a method for consuming a distributed message of multiple removals in a network according to an embodiment of the present invention;
FIG. 3 is another flow chart of a method for consuming a distributed message of multiple removals in a network according to an embodiment of the present invention;
FIG. 4 is a schematic structural diagram of a remote multi-live distributed message consuming apparatus according to an embodiment of the present invention;
FIG. 5 is another structural diagram of a remote multi-live distributed message consuming apparatus according to an embodiment of the present invention;
FIG. 6 is another structural diagram of a remote multi-live distributed message consuming apparatus according to an embodiment of the present invention;
FIG. 7 is another structural diagram of a remote multi-live distributed message consuming apparatus according to an embodiment of the present invention;
fig. 8 is a schematic structural diagram of a distributed message consumption system with multiple different locations according to an embodiment of the present invention.
Detailed Description
In order to make the aforementioned objects, features and advantages of the present invention comprehensible, embodiments accompanied with figures are described in further detail below.
Referring to fig. 1, a flowchart of a method for consuming multiple alive messages in different places according to an embodiment of the present invention is shown, where the method includes:
step 101: the consumption end monitors whether at least two service clusters of the service end are available;
in this step, each consuming end in the consuming end cluster (or each message receiving end in the message receiving end cluster) first monitors whether all the service clusters of the service end are available (i.e. whether the service clusters are alive), and then selects and marks the main service cluster according to the monitoring result. The consumption end can only connect one main service cluster in each consumption process, and the connected main service cluster is selected by the consumption end.
In this embodiment, the consumption end may monitor whether the service cluster is available through an encapsulated Software Development Kit (SDK), and a specific monitoring process thereof is well known to those skilled in the art and will not be described herein again.
The service cluster in this embodiment may be a Kafka cluster.
Step 102: if the situation that the at least two service clusters are available is monitored, the consumption end selects one of the at least two service clusters as a main service cluster;
in this step, if the consumer terminal monitors that one or more service clusters or all service clusters of the server terminal are available, any service cluster can be selected from the available service clusters as a master cluster, that is, the selected available service cluster is marked as a master service cluster.
Of course, a service cluster with a higher cluster priority may also be used as the main service cluster according to the selected service, specifically:
in one mode, if the consumer side stores the priorities of the available service clusters, then the priorities of the available service clusters are compared, and the service cluster with the high priority is selected as the main priority;
in another case, the consumer side obtains the priorities of the at least two service clusters from the health check center; then, the service cluster with high priority can be used as the main priority.
In this embodiment, the priority of the service cluster is artificially defined, that is, the priority of the fixed cluster is set, assuming that the priorities of the service cluster a and the service cluster B are P1 and P2, respectively, and setting P1> P2, when both the service cluster a and the service cluster B are available, the service cluster a may be selected as the master cluster; when service cluster a is unavailable (e.g., fails), service cluster B is selected as the master cluster. And then, when the service cluster A is found to be available from the fault state, switching to the service cluster A again.
Step 103: and the consumption end is connected with the selected main service cluster for consumption.
And the consumption end establishes connection with the main service cluster and starts to consume.
In the embodiment of the invention, the availability of all service clusters at the server end is monitored, a main service cluster is selected from the available service clusters, and then the connection is established with the main service cluster and consumption is started. That is, in this embodiment, the consumption end selects the master service cluster from the available service clusters, thereby improving the high availability of the service clusters.
Referring to fig. 2, another flowchart of a method for consuming a distributed message of multiple remote locations according to an embodiment of the present invention is shown, where the difference in the embodiment is that a consuming side only monitors that one service cluster is available, and the method includes:
step 201: the consumption end monitors whether at least two service clusters of the service end are available;
the step is the same as step 101, which is described in detail above and will not be described herein again.
Step 202: if only one service cluster in the at least two service clusters is available, the consumption end takes the available service cluster as a main service cluster;
in this step, if only one service cluster is available at the monitoring server of the consumption end, the available service cluster is selected as the main service cluster, that is, the service cluster is marked as the main service cluster.
Step 203: and the consumption end is connected with the selected main service cluster for consumption.
And the consumption end establishes connection with the selected main service cluster and starts to consume.
Optionally, in another embodiment, on the basis of the above embodiment, the method may further include: and if none of the at least two service clusters is available, stopping consuming to any one of the two service clusters.
In the embodiment of the invention, the consumption end selects one of the monitored available service clusters as the main service cluster, and then the main service cluster is connected with the main service cluster to start consumption, so that the high availability of the service cluster is improved.
Referring to fig. 3, another flowchart of a consumption method for a distributed message of multiple different locations and multiple activities provided in an embodiment of the present invention is shown, where the difference between the embodiment and the above embodiment is that after a consuming end starts consuming, availability of a main service cluster connected to the consuming end is monitored, and when the main service cluster is unavailable, a new service cluster is switched to for consumption, and for convenience of description, the embodiment takes the embodiment of fig. 1 as an example, but is not limited thereto, and the specific process is as follows:
step 301: the consumption end monitors whether at least two service clusters of the service end are available;
step 302: if the situation that the at least two service clusters are available is monitored, the consumption end selects any one of the at least two service clusters as a main service cluster;
step 303: the consumption end is connected with the selected main service cluster for consumption;
the steps 301 to 303 are the same as the steps 101 to 103, and the specific implementation process is described above, which is not described herein again.
Step 304: after starting consumption, the consumption end monitors whether the main service cluster is available;
after the consumption end starts, the consumption end continues to monitor the availability of the main service cluster, i.e. the survivability of the service cluster. The mode of periodic monitoring is the same as the mode of monitoring whether the service cluster is available, which is described in detail above and is not described herein again.
Step 305: when the monitoring shows that the main service cluster is unavailable, the consuming end reselects a service cluster with a high suboptimal level from the at least two service clusters as a consuming main service cluster;
and when the consumption end monitors that the main service cluster is unavailable, selecting the service cluster with high suboptimal level as a new main service cluster.
Step 306: and the consumption end is connected with the reselected main service cluster for consumption.
And the consumption end establishes connection with the reselected main service cluster and starts to consume.
Optionally, in another embodiment, on the basis of the above embodiment, the method may further include:
calculating a consumption starting offset value of the consumption end before the consumption end is connected to the reselected main service cluster for consumption;
the consumption end resets the consumption progress of the consumption end according to the consumption starting offset value;
and the consumption end is connected with the selected main service cluster for consumption according to the reset consumption progress.
In this embodiment, the consuming end calculates the consuming start Offset value of the consuming end, and may calculate the consuming start Offset value consumed by each consuming end on the corresponding service cluster by using Offset synchronization (Offset sync) application, where a specific calculation formula of the consuming start Offset value is as follows: offset A is equal to offset B-Lag-adjustment factor, and offset A is less than or equal to 0.
Wherein, offset a is a consumption starting offset value of a certain topic (topic) information on the service cluster a;
the offset B is a current consumption offset value of the topic information on the service cluster B;
the Lag is a Lag value consumed by the topic on the service cluster B, and the calculation formula is as follows: log size-Offset, where the Offset value can be obtained from Zookeeper, Offset is a consumed Offset value marked as having been consumed for a single partition of a certain topic (topic) in a traffic cluster (such as kafka cluster); logsize is the total number of messages for a certain topic individual partition in a service cluster (such as a kafka cluster). Wherein, Partition is a Partition contained by Topic, and Topic usually has 0-8 partitions for 9 partitions.
The adjustment factor is an adjustment value, i.e., an adjustment constant, set to further improve accuracy. Can be flexibly set according to consumption speed and switching time. The calculation formula is as follows: QPS Time interval, wherein QPS is the Query Per Second (Query Per Second) rate at which the consumer is monitored. Resetting the consumption progress of the consumption end according to the consumption starting offset value;
that is, the consumption end resets the consumption progress according to the consumption starting offset value, so that the consumption is performed according to the reset consumption progress on the switching to the main service cluster, and the repeated consumption of the message is reduced to the greatest extent.
In the embodiment of the invention, when a consumption end is connected with a main service cluster and begins to consume, the consumption end begins to monitor the availability of the main service cluster, and if the main service cluster is monitored to be unavailable, the main service cluster is reselected and the consumption starting offset value initialized by the consumption end is calculated; then, resetting the consumption progress of the consumption end according to the consumption starting offset value; and connecting with the reselected main service cluster, and finally consuming in the connected main service cluster according to the reset consumption progress. The embodiment not only improves the high availability of the service cluster, but also starts to consume according to the reset consumption progress after the consumption end is switched to a new service cluster, thereby greatly reducing the repeated consumption of the message.
It should be noted that, for simplicity of description, the method embodiments are described as a series of acts or combination of acts, but those skilled in the art will recognize that the present invention is not limited by the illustrated order of acts, as some steps may occur in other orders or concurrently in accordance with the embodiments of the present invention. Further, those skilled in the art will appreciate that the embodiments described in the specification are presently preferred and that no particular act is required to implement the invention.
Referring to fig. 4, a schematic structural diagram of a remote multi-alive distributed message consuming apparatus according to an embodiment of the present invention is shown, where the apparatus includes: a monitoring module 41, a first selection module 42 and a first consumption module 43, wherein,
a monitoring module 41, configured to monitor whether at least two service clusters of the server are available;
a first selecting module 42, configured to select any one of the at least two service clusters as a main service cluster when the monitoring module 41 monitors that the at least two service clusters are both available;
the first selecting module 42 is specifically configured to select, when the monitoring module 41 monitors that the at least two service clusters are available, one service cluster with a higher priority level in the at least two service clusters as a consuming master service cluster.
A first consuming module 43, configured to connect the main service cluster selected by the first selecting module 42 for consumption.
In the embodiment of the invention, the availability of all service clusters at the server end is monitored, a main service cluster is selected from the available service clusters, and then the connection is established with the main service cluster and consumption is started. That is, in this embodiment, the consumption end selects the master service cluster from the available service clusters, thereby improving the high availability of the service clusters.
Optionally, in another embodiment, on the basis of the above embodiment, the apparatus may further include: a second selection module 51 and a second consumption module 52, which are schematically shown in fig. 5, wherein,
a second selecting module 51, configured to, when the monitoring module 41 monitors that only one service cluster of the at least two service clusters is available, use the available service cluster as a main service cluster;
a second consuming module 52, configured to connect the main service cluster selected by the second selecting module 51 for consumption.
Optionally, in another embodiment, on the basis of the above embodiment, the apparatus may further include: the sending module (not shown) is stopped, wherein,
and the sending stopping module is used for stopping consuming any one of the two service clusters when the monitoring module monitors that none of the two service clusters is available.
Optionally, in another embodiment, on the basis of the above embodiment, the apparatus may further include: a third selection module 61 and a third consuming module 62, which are schematically shown in fig. 6, wherein,
the monitoring module 41 is further configured to continue to monitor whether the main service cluster is available after the consumption starts;
a third selecting module 61, configured to reselect a service cluster with a higher suboptimal level from the at least two service clusters as a consuming main service cluster when the monitoring module 41 monitors that the main service cluster is unavailable;
a third consuming module 62, configured to connect the main service cluster reselected by the third selecting module 61 for consumption.
Optionally, in another embodiment, on the basis of the above embodiment, the apparatus may further include: a calculation module 71 and a setting module 72, the structure of which is schematically shown in FIG. 7,
a calculating module 71, configured to calculate a consumption starting offset value initialized by the consuming end before the third consuming module 62 consumes;
the calculating module 71 calculates the consumption starting offset value according to the following formula:
offset A is equal to offset B-Lag-adjustment factor, and offset A is less than or equal to 0
Wherein, offset a is a consumption starting offset value of a certain topic (topic) information on the service cluster a;
the offset B is a current consumption offset value of the topic information on the service cluster B;
the Lag is a Lag value consumed by the topic on the service cluster B, and the calculation formula is as follows: log size-Offset, where the Offset value can be obtained from Zookeeper, Offset is a consumed Offset value marked as having been consumed for a single partition of a certain topic (topic) in a traffic cluster (such as kafka cluster); logsize is the total number of messages for a certain topic individual partition in a service cluster (such as a kafka cluster). Wherein, Partition is a Partition contained by Topic, and Topic usually has 0-8 partitions for 9 partitions.
The adjustment factor is an adjustment value set to further improve accuracy. Can be flexibly set according to consumption speed and switching time. The calculation formula is as follows: QPS Time interval, wherein QPS is the Query Per Second (Query Per Second) rate at which the consumer is monitored.
A setting module 72, configured to reset the consumption progress of the consuming end according to the consumption starting offset value;
the third consuming module 62 is further configured to connect to the reselected main service cluster for consumption according to the reset consumption schedule.
Optionally, the device may be integrated on each consumption cluster of the consumption end, or may be deployed independently, which is not limited in this embodiment.
The implementation process of the functions and actions of each module in the device is detailed in the implementation process of the corresponding step in the method, and is not described herein again.
In the embodiment of the invention, when monitoring that the main service cluster is unavailable, the consumption end acquires a new service cluster and a corresponding consumption starting offset value again, and resets the starting point of the consumption progress of the consumption end, and then starts to consume according to the reset starting point when switching to the new service cluster, so that the repeated consumption of messages is reduced to the maximum extent.
Referring to fig. 8, a schematic structural diagram of a remote multi-live distributed message consumption system according to an embodiment of the present invention is shown, where the system includes: an analog Multiplexer 80 (AQM), a production side 81, a service side 82 and a consumption side 83. For convenience of illustration, in the present embodiment, the production side 81 includes the Producer1 and the Producer2 as an example, the service side 82 includes the Health Check Center 821(HCC, Health Check Center), the service cluster (Kafka) a and the service cluster (Kafka) B, and the zookeepers a and B corresponding to Kafka a and Kafka B, respectively, as an example, and the consumption side 83 includes the Consumer1 and the Consumer2 as an example.
In this embodiment, if, AQM80 sends a beijing telecom message to Producer 1; sending a Beijing Unicom message to Producer 2; the following description will take Producer1 as an example. Producer2 is similar to Producer 1;
after receiving the Beijing telecommunication message, the Producer1 monitors whether KafkaA and KafkaB of the service end 82 are available; if the KafkaA and the KafkaB are both available, the Producer1 sends the message (namely the received Beijing telecommunication message) issued by the Producer1 to the KafkaA and the KafkaB respectively; kafkaa and Kafkab will store the received Beijing telecommunication messages respectively, while ZookeeperA mainly functions to maintain and monitor the state change of the data stored by Kafkaa, and by monitoring the state change of these data, the cluster management based on data can be achieved. Similarly, zookeepers b are mainly used to maintain and monitor the state change of the data stored in KafkaB, and by monitoring the state change of the data, the data-based cluster management can be achieved.
Said Producer1 selecting one Kafka from said KafkaA and KafkaB as a primary Kafka for consumer-side ligation; the selection process comprises the following steps: the Producer1 obtains the distributed lock from the central zookeeper of the server 82; then, one Kafka is selected from the KafkA and the KafkAB as the main Kafka through the distributed lock, for example, the selected Kafka is marked as the main Kafka. Producer1 then updates the information of the master KafkaA to available KafkaB and to the health check center HCC. KafkaB of the server 82 and the health check centre HCC store the received information of the master KafkaA.
The health check center 821 of the server 82 checks whether each service cluster (such as KafkaA and KafkaB) of the server is available; if the health check center 821 checks that at least two service clusters (e.g., KafkaA and KafkaB) are available, then cluster priorities are marked for the at least two service clusters that are available, e.g., KafkaA has a priority of P1, KafkaB has a priority of P2, P1> P2 is set according to agreed rules, etc.
The health check center 821 also checks whether each service cluster of the server is available, and if yes, the service cluster information is reserved in a specific zookeeper of the available service cluster; and if the service cluster is detected to be unavailable, deleting the service cluster information in the appointed zookeeper of the unavailable service cluster.
A Consumer (such as Consumer1) monitors whether at least two service clusters (such as KafkaA and KafkaB) in a server are available; if the at least two service clusters (such as KafkaA and KafkaB) are available, selecting any one of the at least two service clusters as a main service cluster; for example, KafkaA is selected as the master service cluster, and of course, KafkaB may also be selected as the master service cluster. And then, the consumption end is connected with the selected main service cluster for consumption.
Further, the consuming side may select one of the at least two service clusters with a higher priority as a consuming master service cluster. If KafkaA has a priority of P1, KafkaB has a priority of P2, and P1> P2, KafkaA is selected as the consuming main service cluster.
Further, after the consumption end is connected with the selected main service cluster for consumption, the consumption end continuously monitors whether the main service cluster is available; when the consumption end monitors that the main service cluster is unavailable, reselecting a service cluster with a high suboptimal priority from the at least two service clusters as a consumed main service cluster; and the consumption end is connected with the reselected main service cluster for consumption.
Further, before the consumption end is connected to the reselected main service cluster for consumption, calculating a consumption starting offset value initialized by the consumption end; resetting the consumption progress of the consumption end according to the consumption starting offset value; and connecting to the reselected main service cluster for consumption according to the reset consumption schedule.
For the device embodiment, since it is basically similar to the method embodiment, the description is simple, and for the relevant points, refer to the partial description of the method embodiment.
The embodiments in the present specification are described in a progressive manner, each embodiment focuses on differences from other embodiments, and the same and similar parts among the embodiments are referred to each other.
As will be appreciated by one skilled in the art, embodiments of the present invention may be provided as a method, apparatus, or computer program product. Accordingly, embodiments of the present invention may take the form of an entirely hardware embodiment, an entirely software embodiment or an embodiment combining software and hardware aspects. Furthermore, embodiments of the present invention may take the form of a computer program product embodied on one or more computer-usable storage media (including, but not limited to, disk storage, CD-ROM, optical storage, and the like) having computer-usable program code embodied therein.
Embodiments of the present invention are described with reference to flowchart illustrations and/or block diagrams of methods, terminal devices (systems), and computer program products according to embodiments of the invention. It will be understood that each flow and/or block of the flow diagrams and/or block diagrams, and combinations of flows and/or blocks in the flow diagrams and/or block diagrams, can be implemented by computer program instructions. These computer program instructions may be provided to a processor of a general purpose computer, special purpose computer, embedded processor, or other programmable data processing terminal to produce a machine, such that the instructions, which execute via the processor of the computer or other programmable data processing terminal, create means for implementing the functions specified in the flowchart flow or flows and/or block diagram block or blocks.
These computer program instructions may also be stored in a computer-readable memory that can direct a computer or other programmable data processing terminal to function in a particular manner, such that the instructions stored in the computer-readable memory produce an article of manufacture including instruction means which implement the function specified in the flowchart flow or flows and/or block diagram block or blocks.
These computer program instructions may also be loaded onto a computer or other programmable data processing terminal to cause a series of operational steps to be performed on the computer or other programmable terminal to produce a computer implemented process such that the instructions which execute on the computer or other programmable terminal provide steps for implementing the functions specified in the flowchart flow or flows and/or block diagram block or blocks.
While preferred embodiments of the present invention have been described, additional variations and modifications of these embodiments may occur to those skilled in the art once they learn of the basic inventive concepts. Therefore, it is intended that the appended claims be interpreted as including preferred embodiments and all such alterations and modifications as fall within the scope of the embodiments of the invention.
Finally, it should also be noted that, herein, relational terms such as first and second, and the like may be used solely to distinguish one entity or action from another entity or action without necessarily requiring or implying any actual such relationship or order between such entities or actions. Also, the terms "comprises," "comprising," or any other variation thereof, are intended to cover a non-exclusive inclusion, such that a process, method, article, or terminal that comprises a list of elements does not include only those elements but may include other elements not expressly listed or inherent to such process, method, article, or terminal. Without further limitation, an element defined by the phrase "comprising an … …" does not exclude the presence of other like elements in a process, method, article, or terminal that comprises the element.
The method and apparatus provided by the present invention are described in detail, and the principle and the implementation of the present invention are described herein by using specific examples, which are only used to help understand the method and the core idea of the present invention; meanwhile, for a person skilled in the art, according to the idea of the present invention, there may be variations in the specific embodiments and the application scope, and in summary, the content of the present specification should not be construed as a limitation to the present invention.

Claims (4)

1. A consumption method of distributed messages of multiple activities at different places is characterized by comprising the following steps:
the consumption end monitors whether at least two service clusters in the service end are available;
if the consumption end monitors that the at least two service clusters are available, one of the at least two service clusters is selected as a main service cluster;
the consumption end establishes connection with the selected main service cluster and starts to consume;
after the consumption is started, the consumption end continuously monitors whether the main service cluster is available;
when the consumption end monitors that the main service cluster is unavailable, reselecting a service cluster with a high suboptimal priority from the at least two service clusters as a consumed main service cluster;
the consumption end is connected with the reselected main service cluster for consumption;
calculating a consumption starting offset value initialized by the consumption end before the consumption end is connected to the reselected main service cluster for consumption;
the consumption end resets the consumption progress of the consumption end according to the consumption starting offset value;
the consumption end is connected to the reselected main service cluster for consumption according to the reset consumption progress;
wherein the consumption starting offset value is calculated according to the following formula:
offset a-offset b-lang-adjustment factor offset a is less than or equal to 0,
wherein, the offset a is a consumption starting offset value of a topic message on the service cluster a;
the offset B is a current consumption offset value of the topic on the service cluster B;
the bag is a Lag value consumed by the topic message on the service cluster B, and a calculation formula of the bag is as follows: log size-Offset; the Offset is a consumption Offset value marked as consumed by a topic individual Partition in a service cluster B, the LogSize is a message total amount of the topic individual Partition in the service cluster B, and the Partition is a Partition contained in the topic;
the adjustment factor is an adjustment constant.
2. The method of claim 1, wherein the selecting any one of the at least two service clusters as a master service cluster comprises:
and the consumption end selects one service cluster with higher priority in the at least two service clusters as a main service cluster for consumption.
3. A displaced multi-live distributed message consuming apparatus, comprising:
the monitoring module is used for monitoring whether at least two service clusters in the server side are available;
the first selection module is used for selecting any one of the at least two service clusters as a main service cluster when the monitoring module monitors that the at least two service clusters are available;
the first consumption module is used for establishing connection with the main service cluster selected by the first selection module and starting consumption;
the monitoring module is further configured to continue to monitor whether the main service cluster is available after consumption starts;
a third selecting module, configured to reselect a service cluster with a higher suboptimal level from the at least two service clusters as a consuming main service cluster when the monitoring module monitors that the main service cluster is unavailable;
the third consuming module is used for connecting the main service cluster reselected by the third selecting module to consume;
the calculating module is used for calculating the consumption starting offset value initialized by the consumption end before the consumption of the third consumption module;
the setting module is used for resetting the consumption progress of the consumption end according to the consumption starting offset value;
the third consuming module is further configured to connect to the reselected main service cluster for consumption according to the reset consumption schedule;
the calculation module calculates the consumption starting offset value according to the following formula:
offset a-offset b-lang-adjustment factor offset a is less than or equal to 0,
wherein, the offset a is a consumption starting offset value of a topic message on the service cluster a;
the offset B is a current consumption offset value of the topic on the service cluster B;
the bag is a Lag value consumed by the topic message on the service cluster B, and a calculation formula of the bag is as follows: log size-Offset; the Offset is a consumption Offset value marked as consumed by a topic individual Partition in a service cluster B, the LogSize is a message total amount of the topic individual Partition in the service cluster B, and the Partition is a Partition contained in the topic;
the adjustment factor is an adjustment constant.
4. The apparatus of claim 3,
the first selection module is specifically configured to select, when the monitoring module monitors that the at least two service clusters are available, one service cluster with a higher priority level of the at least two service clusters as a consuming master service cluster.
CN201711350825.6A 2017-12-15 2017-12-15 Remote multi-activity distributed message consumption method and device Active CN108170527B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN201711350825.6A CN108170527B (en) 2017-12-15 2017-12-15 Remote multi-activity distributed message consumption method and device

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN201711350825.6A CN108170527B (en) 2017-12-15 2017-12-15 Remote multi-activity distributed message consumption method and device

Publications (2)

Publication Number Publication Date
CN108170527A CN108170527A (en) 2018-06-15
CN108170527B true CN108170527B (en) 2021-06-22

Family

ID=62522346

Family Applications (1)

Application Number Title Priority Date Filing Date
CN201711350825.6A Active CN108170527B (en) 2017-12-15 2017-12-15 Remote multi-activity distributed message consumption method and device

Country Status (1)

Country Link
CN (1) CN108170527B (en)

Families Citing this family (2)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN109688200A (en) * 2018-11-30 2019-04-26 北京奇艺世纪科技有限公司 A kind of message treatment method, device and equipment
CN115269725B (en) * 2022-07-25 2023-07-28 中电金信软件有限公司 Data synchronization method and system based on message middleware cluster

Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN107451147A (en) * 2016-05-31 2017-12-08 北京京东尚科信息技术有限公司 A kind of method and apparatus of kafka clusters switching at runtime

Family Cites Families (5)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9678812B2 (en) * 2014-12-22 2017-06-13 International Business Machines Corporation Addressing for inter-thread push communication
CN107463468A (en) * 2016-06-02 2017-12-12 北京京东尚科信息技术有限公司 Buffer memory management method and its equipment
CN106789741B (en) * 2016-12-26 2020-02-18 北京奇虎科技有限公司 Consumption method and device for message queue
CN106649766B (en) * 2016-12-27 2020-12-29 北京锐安科技有限公司 A message processing method based on Kafka
CN107465735B (en) * 2017-07-31 2020-08-14 杭州多麦电子商务股份有限公司 Distributed messaging system

Patent Citations (1)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN107451147A (en) * 2016-05-31 2017-12-08 北京京东尚科信息技术有限公司 A kind of method and apparatus of kafka clusters switching at runtime

Also Published As

Publication number Publication date
CN108170527A (en) 2018-06-15

Similar Documents

Publication Publication Date Title
CN113014634B (en) Cluster election processing method, device, equipment and storage medium
CN108322358B (en) Method and device for sending, processing and consuming multi-live distributed messages in different places
CN109921942B (en) Cloud platform switching control method, device and system and electronic equipment
CN103744809A (en) Method for dual-computer hot-standby of vehicle information management system on basis of VRRP
CN112333249B (en) Business service system and method
CN105376083A (en) Energy-saving control method, management server and network equipment
CN103888277A (en) Gateway disaster recovery backup method, apparatus and system
CN114070739B (en) Cluster deployment method, device, equipment and computer readable storage medium
CN112395269B (en) MySQL high availability group building method and device
CN108199912B (en) Method and device for managing and consuming distributed messages of multiple activities in different places
CN106464516B (en) Event handling in a network management system
CN108540341A (en) resource monitoring method and device
CN108170527B (en) Remote multi-activity distributed message consumption method and device
CN110247980B (en) Gateway control method in local area network and gateway
CN108243222A (en) Server network architecture method and device
CN107682411A (en) A kind of extensive SDN controllers cluster and network system
CN112087506B (en) Cluster node management method and device and computer storage medium
CN103312541A (en) Management method of high-availability mutual backup cluster
CN113422623B (en) Management method, system, device, electronic equipment and storage medium
CN103188099B (en) A kind of backup method of multi-application system, Apparatus and system
CN113765690B (en) Cluster switching method, system, device, terminal, server and storage medium
CN110266790B (en) Edge cluster management method and device, edge cluster and readable storage medium
CN113326100A (en) Cluster management method, device and equipment and computer storage medium
CN110324564A (en) A kind of videoconference data synchronous method and device
CN104079663A (en) Distributed type real-time synchronizing network system and data annunciating method thereof

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination
GR01 Patent grant
GR01 Patent grant