CN118193238A - Service information processing method, device, equipment and storage medium - Google Patents
Service information processing method, device, equipment and storage medium
- Publication number
- CN118193238A CN202410192717.4A
- Authority
- CN
- China
- Prior art keywords
- event
- processing
- queue
- message
- consumption
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/547—Remote procedure calls [RPC]; Web services
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06Q—INFORMATION AND COMMUNICATION TECHNOLOGY [ICT] SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES; SYSTEMS OR METHODS SPECIALLY ADAPTED FOR ADMINISTRATIVE, COMMERCIAL, FINANCIAL, MANAGERIAL OR SUPERVISORY PURPOSES, NOT OTHERWISE PROVIDED FOR
- G06Q30/00—Commerce
- G06Q30/06—Buying, selling or leasing transactions
- G06Q30/0601—Electronic shopping [e-shopping]
- G06Q30/0633—Lists, e.g. purchase orders, compilation or processing
- G06Q30/0635—Processing of requisition or of purchase orders
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/548—Queue
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Software Systems (AREA)
- General Physics & Mathematics (AREA)
- Business, Economics & Management (AREA)
- Physics & Mathematics (AREA)
- Accounting & Taxation (AREA)
- Finance (AREA)
- General Engineering & Computer Science (AREA)
- Development Economics (AREA)
- General Business, Economics & Management (AREA)
- Strategic Management (AREA)
- Marketing (AREA)
- Economics (AREA)
- Debugging And Monitoring (AREA)
Abstract
The embodiment of the application discloses a method, a device, equipment and a storage medium for processing service information. The method comprises the following steps: receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy; performing consumption processing on the service information monitored from the source queue to obtain a message event, and performing event operation and maintenance on the message event according to the event processing policy, wherein one or more pieces of service information are monitored; and sending the message event after operation and maintenance to a corresponding processing service according to the event routing policy, so that the processing service processes the message event accordingly. In this way, unified persistent storage management can be performed for each service, the stability of service information processing is improved, and consumption is decoupled from event processing.
Description
Technical Field
The embodiment of the application relates to the technical field of computers, in particular to a business information processing method, a system, equipment and a storage medium.
Background
Message queues are a common asynchronous communication mechanism for data processing and are often used to improve system scalability. The sender places a message in a queue, and the receiver obtains the message from the queue and processes it. However, as service data grows, the number of corresponding message queues grows with it, and managing the message queues becomes increasingly complex.
In the related art, a plurality of message queues are usually handled with a general queue message processing method. For example, asynchronous tasks such as an order delivery task, a refund deduction task and a gift issuing task are placed into asynchronous queues, and each service subscribes to its queue with its own asynchronous script and processes the queue in its own way. The task execution of each core service therefore follows its own independent set of processing standards; these standards are uneven and lack standardization and stability.
Disclosure of Invention
The embodiment of the application provides a processing method, a device, equipment and a storage medium for service information, solves the problems of different execution standards and lack of standardization and stability of each service in an asynchronous queue, can perform unified and durable storage management on each service, improves the stability of service information processing, and realizes decoupling of consumption and event processing.
In a first aspect, an embodiment of the present application provides a method for processing service information, including:
receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy;
performing consumption processing on the monitored service information in the source queue to obtain a message event, and performing event operation and maintenance on the message event according to the event processing strategy, wherein the number of the monitored service information is one or more;
and sending the message event after operation and maintenance to a corresponding processing service according to the event routing strategy, so as to correspondingly process the message event after operation and maintenance through the processing service.
Optionally, after obtaining the message event, the method further includes:
storing the message event into a message event library, and migrating the message event exceeding the storage time threshold in the message event library into a message event archiving library according to a preset storage time threshold;
correspondingly, the event operation and maintenance of the message event according to the event processing strategy comprises the following steps:
automatically retrying, manually triggering or playing back the message event in the message event archiving library and/or the message event library according to the event processing strategy.
Optionally, after the event operation and maintenance is performed on the message event according to the event processing policy, the method further includes:
recording the event operation and maintenance process to obtain a message event record, and updating the state of the message event and the event processing strategy according to the message event record, wherein the updated message event record comprises a message body record, a message processing record or an error record;
and storing the updated message processing record into a message processing library, and storing the error record into an error log library.
Optionally, the monitoring the source queue according to the queue configuration parameter in the queue hosting request includes:
analyzing the queue configuration parameters in the queue hosting request to obtain a queue address and queue authorization information;
connecting to the queue specified by the queue address according to the queue address and the queue authorization information, and performing consumption monitoring on the service information in the queue.
Optionally, the processing policy includes a consumption processing policy, and the performing consumption processing on the service information monitored from the source queue to obtain a message event includes:
creating a consumption slot according to the received parallel consumption quantity, sequentially filling the business information monitored from the source queue into the consumption slot for parallel consumption processing, and judging whether the parallel consumption processing is abnormal or not;
and under the condition that the parallel consumption processing is abnormal, carrying out abnormal consumption processing on abnormal service information according to the consumption processing strategy to obtain a message event.
Optionally, after the service information monitored from the source queue is sequentially filled into the consumption slot, the method further includes:
distributing information identification for the business information in each consumption slot;
correspondingly, after obtaining the message event, the method further comprises:
binding the message event to the corresponding information identifier and storing them into a message event library, and, when a consumption fault is detected, searching the message event library for the corresponding message event according to the information identifier.
Optionally, before the monitoring the source queue according to the queue configuration parameter in the queue hosting request, the method further includes:
and comparing the queue configuration parameters in the queue hosting request with preset expected parameters, and updating the queue configuration parameters into the expected parameters under the condition that the parameters are inconsistent.
In a second aspect, an embodiment of the present application provides a service information processing apparatus, including:
the queue monitoring module is used for receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy;
the consumption processing module is used for carrying out consumption processing on the service information monitored from the source queue to obtain a message event;
The operation and maintenance module is used for carrying out event operation and maintenance on the message event according to the event processing strategy, and the number of the monitored service information is one or more;
and the message event sending module is used for sending the message event after operation and maintenance to a corresponding processing service according to the event routing strategy, so that the processing service processes the message event after operation and maintenance.
In a third aspect, an embodiment of the present application provides a service information processing apparatus, including: one or more processors; and a storage device configured to store one or more programs, which when executed by the one or more processors, cause the one or more processors to implement the method for processing service information according to the first aspect.
In a fourth aspect, an embodiment of the present application provides a storage medium containing computer executable instructions which, when executed by a computer processor, are used to perform the method for processing service information according to the first aspect.
According to the embodiment of the application, a queue hosting request and an event policy are received, and a source queue is monitored according to the queue configuration parameters in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy. Consumption processing is performed on the business information monitored from the source queue to obtain a message event, and event operation and maintenance is performed on the message event according to the event processing policy, wherein one or more pieces of business information are monitored. The message event after operation and maintenance is sent to a corresponding processing service according to the event routing policy, so that the processing service processes it accordingly. Each type of service information can be operated and maintained with its corresponding event processing policy, which improves the operation and maintenance capability for service information; the party that processes the service information does not need to consider any consumption logic and only needs to consider how the information is processed. The route of a message event is determined quickly through the event routing policy, which improves the transmission efficiency of message events and, in turn, their processing efficiency. The overall processing architecture for service information is clear, the data flow is simple, the architecture is not affected by the quantity of service information, and the stability is high.
Drawings
Fig. 1 is a flowchart of a method for processing service information according to an embodiment of the present application;
FIG. 2 is a schematic structural diagram of a queue hosting platform according to an embodiment of the present application;
FIG. 3 is a flow chart of a method for processing a message event according to an embodiment of the present application;
FIG. 4 is a flowchart of a method for consuming service information in a source queue according to an embodiment of the present application;
Fig. 5 is a schematic structural diagram of a service information processing device according to an embodiment of the present application;
Fig. 6 is a schematic structural diagram of a service information processing device according to an embodiment of the present application.
Detailed Description
In order to make the objects, technical solutions and advantages of the present application more apparent, the following detailed description of specific embodiments of the present application is given with reference to the accompanying drawings. It is to be understood that the specific embodiments described herein are merely illustrative of the application and are not limiting thereof. It should be further noted that, for convenience of description, only some, but not all of the matters related to the present application are shown in the accompanying drawings. Before discussing exemplary embodiments in more detail, it should be mentioned that some exemplary embodiments are described as processes or methods depicted as flowcharts. Although a flowchart depicts operations (or steps) as a sequential process, many of the operations can be performed in parallel, concurrently, or at the same time. Furthermore, the order of the operations may be rearranged. The process may be terminated when its operations are completed, but may have additional steps not included in the figures. The processes may correspond to methods, functions, procedures, subroutines, and the like.
The technical solutions of the embodiments of the present application will be clearly described below with reference to the drawings in the embodiments of the present application, and it is apparent that the described embodiments are some embodiments of the present application, but not all embodiments. All other embodiments, which are obtained by a person skilled in the art based on the embodiments of the present application, fall within the scope of protection of the present application.
The terms first, second and the like in the description and in the claims, are used for distinguishing between similar elements and not necessarily for describing a particular sequential or chronological order. It is to be understood that the data so used may be interchanged, as appropriate, such that embodiments of the present application may be implemented in sequences other than those illustrated or described herein, and that the objects identified by "first," "second," etc. are generally of a type, and are not limited to the number of objects, such as the first object may be one or more. Furthermore, in the description and claims, "and/or" means at least one of the connected objects, and the character "/", generally means that the associated object is an "or" relationship.
The method, device, equipment and medium for processing service information provided by the embodiment of the application are described in detail below through specific embodiments and application scenes thereof with reference to the accompanying drawings.
The embodiment of the application can be used for hosting the consumption queue and carrying out corresponding processing according to the business event type. Based on the above usage scenario, it may be appreciated that the execution body of the present solution may be a queue hosting platform. The processing method of service information provided in this embodiment may be executed by a service information processing device, where the service information processing device may be implemented by software and/or hardware, and the service information processing device may be configured by two or more physical entities or may be configured by one physical entity. In general, the processing device of the service information may be a mobile phone, a computer, a tablet, or the like.
Fig. 1 is a flowchart of a method for processing service information according to an embodiment of the present application, where, as shown in fig. 1, the method includes:
step S101, receiving a queue hosting request and an event policy, wherein the event policy comprises an event processing policy and an event routing policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request.
In one embodiment, the queue hosting request may be a request asking the queue hosting platform to process the business information in a consumption queue. The queue hosting request can be sent by a user through a terminal; after the queue hosting platform receives the request, the request is stored in a management background and is periodically fetched by a scheduler in order to process the business information in the queue. The event policy may be the processing policy used when processing the service information. The event policy may include a consumption policy, that is, the processing policy adopted when the service information in the queue is consumed. Optionally, the event policy further includes an event processing policy and an event routing policy. The event processing policy may be a method for tracking, playing back, analyzing, or raising alarms on the service information. The event routing policy may be a method for assigning a corresponding route to the consumed service information. The queue configuration parameters describe the queue to be processed and may include the types of service information in the queue, the quantity of service information, and the like; optionally, the queue configuration parameters also include the address of the queue and the queue authorization information. The source queue may be a message queue that stores a plurality of pieces of service information and is to be consumed. Monitoring the source queue may mean checking whether there are new messages in the source queue.
Fig. 2 is a schematic structural diagram of a queue hosting platform provided by the embodiment of the present application, as shown in fig. 2, a queue hosting request and an event policy are received and stored in a management background, the queue hosting request and the corresponding event policy in the management background are obtained at regular time by a scheduler, a consumption unit is pulled up according to a queue configuration parameter in the queue hosting request to monitor a corresponding source queue, and consumption processing is performed on service information in the source queue.
In one embodiment, analyzing the queue configuration parameters in the queue hosting request to obtain a queue address and queue authorization information; and connecting the queue appointed by the queue address according to the queue address and the queue authorization information, and performing consumption monitoring on service information in the queue.
The queue address is used to represent the location of the service information to be monitored in the queue, and may include a consumption topic and a consumption group name. The consumption topic may be regarded as a way of classifying messages, similar to a tag or category in mail; in an exemplary application, there may be multiple topics such as "order shipping", "refund deduction" or "gift certificate issuing". The queue authorization information is used for indicating whether the requester has permission to connect to the queue, and comprises a queue name and an authorization token. Consumption monitoring is the process of triggering the consumption of a message when a new message is detected in the source queue. As shown in fig. 2, the consumption unit determines the queue to be monitored through the queue name, performs security verification according to the authorization token, connects to the queue if the security verification passes, and performs consumption monitoring on the corresponding service information in the source queue according to the consumption topic and the consumption group name. Because consumption monitoring of the service information in the source queue only starts after the queue address and the queue authorization information have been parsed and verified, the security of the service information in the source queue can be improved.
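The parsing and connection steps described above can be sketched as follows. This is a minimal illustration, not the platform's actual implementation: the field names (`topic`, `consumer_group`, `queue_name`, `token`), the `valid_tokens` lookup table, and the string returned as a subscription handle are all assumptions introduced for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class QueueAddress:
    topic: str           # consumption topic, e.g. "order shipping"
    consumer_group: str  # consumption group name


@dataclass(frozen=True)
class QueueAuth:
    queue_name: str      # identifies the queue to be monitored
    token: str           # authorization token used for security verification


def parse_queue_config(params: dict) -> tuple[QueueAddress, QueueAuth]:
    """Split raw queue configuration parameters into address and authorization parts."""
    address = QueueAddress(params["topic"], params["consumer_group"])
    auth = QueueAuth(params["queue_name"], params["token"])
    return address, auth


def connect_and_listen(address: QueueAddress, auth: QueueAuth, valid_tokens: dict) -> str:
    """Verify the token for the named queue, then return a hypothetical subscription handle."""
    if valid_tokens.get(auth.queue_name) != auth.token:
        raise PermissionError(f"token rejected for queue {auth.queue_name!r}")
    # A real consumption unit would open a broker connection here.
    return f"{auth.queue_name}/{address.topic}/{address.consumer_group}"
```

In this sketch a failed verification raises an error before any subscription is created, which mirrors the order of operations in the text: verify first, connect second, monitor last.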
Step S102, performing consumption processing on the service information monitored from the source queue to obtain a message event, and performing event operation and maintenance on the message event according to the event processing strategy, wherein one or more pieces of service information are monitored.
The consumption processing may be a process of sequentially filling the service information in the source queue into consumption slots and changing the state of the service data in each consumption slot from handling to ok. The message event may be service information whose consumption is complete. Event operation and maintenance of a message event refers to retrying, manually triggering, playing back, or batch playing back the message event, and the like. The business information may be one or more pieces of information under a consumption topic, such as "user refund" and "refund retry" under the "refund deduction" topic. In one embodiment, the service information monitored from the source queue is filled into the consumption slots for consumption processing; after the consumption processing is completed, the message events corresponding to the service information are obtained, and event operation and maintenance is performed on the message events according to the received event processing policy. For example, if the received event processing policy is automatic retry, automatic retry processing may be performed on the "order shipping" information, and if the received event processing policy is manual trigger, manual trigger processing may be performed on the "order shipping" information.
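As an illustration of slot-based consumption, the following sketch models each consumption slot as a worker thread. The thread pool, the dictionary layout of a message event, the `"error"` state, and the failure condition (an empty body) are assumptions made for the example; the text only specifies the handling and ok states.

```python
from concurrent.futures import ThreadPoolExecutor


def consume_one(info: dict) -> dict:
    """Consume one piece of service information and return the resulting message event."""
    event = {"topic": info["topic"], "body": info["body"], "state": "handling"}
    if info["body"] is None:  # hypothetical failure condition
        raise ValueError("empty message body")
    event["state"] = "ok"     # consumption finished
    return event


def consume_parallel(infos: list, parallel: int) -> list:
    """Fill up to `parallel` consumption slots and consume the messages concurrently."""

    def safe(info: dict) -> dict:
        try:
            return consume_one(info)
        except Exception as exc:
            # Abnormal consumption still yields a message event, marked as failed.
            return {"topic": info["topic"], "body": info["body"],
                    "state": "error", "error": str(exc)}

    with ThreadPoolExecutor(max_workers=parallel) as slots:
        # Executor.map returns results in the order of the inputs.
        return list(slots.map(safe, infos))
```

Capturing the exception inside the slot keeps one abnormal message from stopping the others, so every monitored message produces an event that later operation and maintenance steps can retry or replay.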
Step S103, the message event after operation and maintenance is sent to a corresponding processing service according to the event routing strategy, so that the corresponding processing is carried out on the message event after operation and maintenance through the processing service.
The processing service is a corresponding service interface set according to the type of the corresponding service information, and the processing of the service information is realized by connecting the corresponding interface. As shown in fig. 2, for example, if the obtained message event is obtained by consuming the service information under the "order delivery" theme, the message event is sent to the event proxy module, and the event proxy module triggers the message body transmission operation after receiving the consumption event, and sends the message event to the delivery processing service interface through the route, and the delivery processing service performs delivery processing on the message event. If the obtained message event is obtained by consuming the service information under the subject of refund processing, the message event is sent to an event proxy module, the event proxy module triggers a message body transmission operation after receiving the message event, and the message event is transmitted to a refund processing service interface through the determined route and refund processing is carried out through refund processing service.
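A topic-keyed lookup table is one simple way to realise such routing. In the sketch below, the `EventRouter` class, the handler functions, and the event layout are hypothetical names chosen for illustration; the source does not specify how the event proxy module stores its routes.

```python
from typing import Callable, Dict

Handler = Callable[[dict], str]


class EventRouter:
    """Maps a consumption topic to the processing service interface that handles it."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def register(self, topic: str, handler: Handler) -> None:
        self._routes[topic] = handler

    def dispatch(self, event: dict) -> str:
        handler = self._routes.get(event["topic"])
        if handler is None:
            raise LookupError(f"no processing service registered for {event['topic']!r}")
        return handler(event)  # forward the message body to the processing service


def shipping_service(event: dict) -> str:
    return f"shipped:{event['body']}"


def refund_service(event: dict) -> str:
    return f"refunded:{event['body']}"


router = EventRouter()
router.register("order shipping", shipping_service)
router.register("refund processing", refund_service)
```

With this structure, adding a new kind of business event only requires registering another handler; the consumption side stays unchanged, which is the decoupling the embodiment aims for.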
According to the embodiment of the application, a queue hosting request and an event policy are received, and a source queue is monitored according to the queue configuration parameters in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy. Consumption processing is performed on the business information monitored from the source queue to obtain a message event, and event operation and maintenance is performed on the message event according to the event processing policy, wherein one or more pieces of business information are monitored. The message event after operation and maintenance is sent to a corresponding processing service according to the event routing policy, so that the processing service processes it accordingly. Each type of service information can be operated and maintained with its corresponding event processing policy, which improves the operation and maintenance capability for service information; the party that processes the service information does not need to consider any consumption logic and only needs to consider how the information is processed. The route of a message event is determined quickly through the event routing policy, which improves the transmission efficiency of message events and, in turn, their processing efficiency. The overall processing architecture for service information is clear, the data flow is simple, the architecture is not affected by the quantity of service information, and the stability is high.
In one embodiment, before the listening for the source queue according to the queue configuration parameters in the queue hosting request, the method further comprises:
and comparing the queue configuration parameters in the queue hosting request with preset expected parameters, and updating the queue configuration parameters into the expected parameters under the condition that the parameters are inconsistent.
The queue configuration parameters also include the number of replicas, the specification, the resident consumption process, the mounted policy file, the update processing policy, and the like. The expected parameters may be parameters preset by the consumption unit, and their types are the same as the types of the queue configuration parameters. After the queue configuration parameters in the queue hosting request are received, each of them is compared with the expected parameter of the same type, and if they are inconsistent, the queue configuration parameter is updated to the expected parameter. For example, if the number of active nginx pods in the queue configuration parameters is 1 and the number of active nginx pods in the expected parameters is 2, the number of active nginx pods in the queue configuration parameters is updated to 2, and one more nginx pod is started in addition to the existing one, so that the actual state matches the expected state.
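The comparison and update described above can be sketched as a small reconciliation function. The parameter names (`replicas`, `spec`) and the shape of the returned change log are assumptions for illustration only.

```python
def reconcile(config: dict, expected: dict) -> tuple:
    """Compare each configured parameter with the expected parameter of the same type.

    Returns the updated configuration and a mapping of
    parameter name -> (old value, expected value) for every parameter that differed.
    """
    updated = dict(config)  # leave the caller's dictionary untouched
    changes = {}
    for key, want in expected.items():
        have = updated.get(key)
        if have != want:
            changes[key] = (have, want)
            updated[key] = want  # overwrite with the expected value
    return updated, changes
```

For the pod example in the text, a change entry of `(1, 2)` for the replica count would tell the consumption unit to start one more pod.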
Fig. 3 is a flowchart of a method for processing a message event according to an embodiment of the present application, where, as shown in fig. 3, the method includes:
Step S301, receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy.
Step S302, performing consumption processing on the business information monitored from the source queue to obtain a message event, storing the message event into a message event library, and migrating the message event exceeding the storage time threshold in the message event library into a message event archiving library according to a preset storage time threshold.
In one embodiment, the message event library is used to store message events, that is, business information whose consumption is complete. After the business information monitored from the source queue is consumed to obtain a message event, the obtained message event is stored in the message event library, and the message events in the message event library are processed according to preset conditions. The storage time threshold may be the maximum time that a message event is kept in the message event library. The message event archiving library may be a database for storing message events whose storage time has exceeded the threshold. A timing task can be created to monitor how long each message event has been stored in the message event library and to migrate the message events whose storage time exceeds the storage time threshold to the message event archiving library, which reduces the storage pressure on the message event library and preserves its insertion and query performance.
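A minimal sketch of the migration step that such a timing task might run is shown below. The two libraries are modelled as in-memory dictionaries keyed by event identifier, and the `stored_at` timestamp field is an assumed name; a real deployment would use database tables instead.

```python
import time


def migrate_expired(event_store: dict, archive: dict,
                    max_age_seconds: float, now: float = None) -> list:
    """Move events stored longer than max_age_seconds from event_store to archive.

    Returns the identifiers of the migrated events.
    """
    now = time.time() if now is None else now
    # Collect the identifiers first so the dictionary is not modified while iterating.
    expired = [event_id for event_id, event in event_store.items()
               if now - event["stored_at"] > max_age_seconds]
    for event_id in expired:
        archive[event_id] = event_store.pop(event_id)
    return expired
```

The optional `now` argument exists only to make the function easy to test with fixed timestamps.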
Step S303, automatically retrying, manually triggering or playing back the message event in the message event archiving library and/or the message event library according to the event processing strategy.
In one embodiment, the event processing policies include a store-is-success policy, a trigger-is-success policy, a store-and-trigger success policy, a trigger-and-store-failure policy, a store-only policy, a no-persistence policy, and the like. Under the store-is-success policy, the message event is triggered after the message is stored, and if triggering succeeds the stored message state is updated; as long as the message is stored successfully, message event processing is regarded as complete, and no exception is thrown even if triggering the message event is abnormal. Under the trigger-is-success policy, the message is stored after the message event is triggered and the trigger state is recorded synchronously; if the stored message event is processed successfully, message event processing is regarded as complete, and no exception is thrown even if triggering the message event is abnormal. The store-and-trigger success policy may be that the message event is stored first, then triggered, and finally the stored message state is updated. Message event processing is regarded as complete only if both storing and triggering the message event succeed, and no exception is thrown even if triggering the message event is abnormal. The trigger-and-store-failure policy may be that, when triggering fails, the message is stored and the trigger state is recorded at the same time, and message event processing is then regarded as complete. The store-only policy may be that message processing is regarded as complete once the message event has been stored successfully; this policy does not trigger the message, and the user must trigger it manually in the background. 
The no-persistence policy may be that only message event triggering is performed, and message processing is regarded as complete only if triggering succeeds; this policy does not store messages. The message events in the message event library and/or the message event archive library are processed according to the received event processing policy. For example, as shown in fig. 2, if the event processing policy is the store-and-trigger success policy, the message events in the message event library and/or the message event archive library are triggered periodically by a timed task; if triggering fails, an automatic retry is performed to complete the processing of the message event, and if the number of retries exceeds a preset retry count, the message event is treated as a processing-failed message event. If the event processing policy is the store-only policy, manual-trigger prompt information is generated to inform the user that the message event has been stored and should be triggered manually in the background.
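The completion rules of several of these policies can be illustrated with a minimal sketch. It is not the implementation of the application: the names `Policy` and `handle_event`, the dictionary used as the message event library, and the state strings are all assumptions made for illustration, and only four of the six policies are covered.

```python
from enum import Enum


class Policy(Enum):
    # Hypothetical names for four of the policies described above.
    STORE_IS_SUCCESS = "store_is_success"
    STORE_AND_TRIGGER = "store_and_trigger"
    STORE_ONLY = "store_only"
    NO_PERSISTENCE = "no_persistence"


def handle_event(event, policy, store, trigger):
    """Return True when processing counts as complete under the policy.

    `store` is a dict standing in for the message event library;
    `trigger` is a callable that delivers the event and may raise.
    """
    if policy is Policy.NO_PERSISTENCE:
        # Nothing is stored; completion depends on the trigger alone.
        try:
            trigger(event)
            return True
        except Exception:
            return False

    # Every remaining policy stores the event first.
    store[event["id"]] = {"event": event, "state": "stored"}
    if policy is Policy.STORE_ONLY:
        return True  # triggering is left to a later manual step

    try:
        trigger(event)
        store[event["id"]]["state"] = "triggered"
        return True
    except Exception:
        store[event["id"]]["state"] = "trigger_failed"
        # Store-is-success swallows the trigger error;
        # store-and-trigger requires both steps to succeed.
        return policy is Policy.STORE_IS_SUCCESS
```

In this sketch the trigger error is never re-raised, which mirrors the statement that no exception is thrown; the recorded state is what a later retry or manual trigger would act on.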
Step S304, the message event after operation and maintenance is sent to the corresponding processing service according to the event routing strategy, so that the corresponding processing is carried out on the message event after operation and maintenance through the processing service.
The service information monitored from the source queue is consumed to obtain a message event; the message event is stored in the message event library; message events that exceed a preset storage time threshold are migrated from the message event library to the message event archive library; and automatic retry, manual triggering or playback is performed on the message events in the message event archive library and/or the message event library according to the event processing strategy. Migrating message events out of the message event library reduces its storage pressure, which preserves its insertion and query performance while still keeping the message events persistent. Processing the message events in both libraries according to the event processing strategy improves the operation and maintenance capability for message events.
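The migration by storage time threshold might look like the following sketch. The function name `migrate_expired`, the `stored_at` field and the use of plain dictionaries for the two libraries are assumptions for illustration; the application does not specify the storage schema.

```python
import time


def migrate_expired(event_library, archive_library, max_age_seconds, now=None):
    """Move events older than the threshold from the library to the archive.

    Both libraries are dicts keyed by event id; each record carries a
    hypothetical `stored_at` timestamp in seconds.
    """
    now = time.time() if now is None else now
    expired = [
        event_id
        for event_id, record in event_library.items()
        if now - record["stored_at"] > max_age_seconds
    ]
    for event_id in expired:
        # The record is removed from the live library and kept in the archive,
        # so it stays available for retry, manual triggering or playback.
        archive_library[event_id] = event_library.pop(event_id)
    return expired
```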
In one embodiment, after the event operation is performed on the message event according to the event processing policy, the method further includes: recording the event operation and maintenance process, obtaining a message event record, and updating the state of the message event and the event processing strategy according to the message event record, wherein the updated message event record comprises a message body record, a message processing record or an error record;
and storing the updated message processing record into a message processing library, and storing the error record into an error log library.
The message event record may be used to represent the process of operating on the message event. The status of a message event may include stateless events and stateful events, where a stateless event may be a message event whose processing failed and a stateful event is a message event whose processing is complete. The message body record may be used to represent a specific message event. The message processing record may be used to represent message events whose processing is complete. The error record may be used to represent a message event for which triggering or storing went wrong. The state and the processing strategy of the message event are updated according to the message event record. For example, as shown in fig. 2, if the message event record shows that an automatic retry of stateless message event A produced a stateful message event, the original message event record is updated accordingly: the stateless message event A in the message processing record is updated to a stateful message event, and message event A is deleted from the error record.
The message processing library is a database for storing message events for which processing is completed. The error log library is a database for storing message event processing failures. And storing the updated message event processing record into a corresponding database, optionally storing the message processing record into a message processing library and storing the error record into an error log library. The queue hosting platform may update the message processing library according to each received message processing record in the message processing library, and may update the error log library according to each received error record in the error log library.
The process of the event operation and maintenance is recorded to obtain a message event record, and the state of the message event and the event processing strategy are updated according to the message event record, where the updated message event record includes a message body record, a message processing record and an error record. The message processing record is stored in the message processing library and the error record is stored in the error log library. Storing the content of the message event record in the corresponding databases ensures the persistence of the message events, and updating the state of the message event and the event processing strategy avoids repeated processing of message events and improves processing efficiency.
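The record update after an automatic retry can be sketched as follows. This is an illustration under assumptions: `retry_event`, the two dictionaries standing in for the message processing library and the error log library, the status strings `"completed"` and `"failed"`, and the default of three retries are all hypothetical.

```python
def retry_event(event_id, trigger, processing_records, error_log, max_retries=3):
    """Retry a failed event and keep the processing record and error log in step.

    `trigger` delivers the event and may raise; `processing_records` and
    `error_log` are dicts keyed by event id.
    """
    for attempt in range(1, max_retries + 1):
        try:
            trigger(event_id)
        except Exception as exc:
            # Record the latest failure so an operator can inspect it.
            error_log[event_id] = {"attempt": attempt, "error": str(exc)}
            continue
        # Success: update the state and drop the stale error record.
        processing_records[event_id] = "completed"
        error_log.pop(event_id, None)
        return True
    processing_records[event_id] = "failed"
    return False
```

Removing the error record on success corresponds to deleting message event A from the error record in the example above, so the same event is not picked up for retry again.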
Fig. 4 is a flowchart of a method for consuming service information in a source queue according to an embodiment of the present application, as shown in fig. 4, including:
Step S401, creating consumption slots according to the received parallel consumption quantity, sequentially filling the service information monitored from the source queue into the consumption slots for parallel consumption processing, and judging whether the parallel consumption processing is abnormal.
In one embodiment, the consumption slots are slots for placing service information to be consumed, one service information is placed in each consumption slot, and new service information to be consumed can be added in the consumption slots after the service information in the consumption slots is consumed. The number of the consumption slots can be set according to actual demands, a user can input the number of the consumption slots through the terminal equipment, the queue management platform creates the consumption slots according to the received number of the consumption slots input by the user, and the service information to be consumed in the source queue is sequentially filled into the consumption slots according to the monitoring sequence, so that the service information in the consumption slots is subjected to parallel consumption processing. Wherein the parallel consumption processing can be the simultaneous consumption processing of the business information in a plurality of consumption slots.
Illustratively, if the number of consumption slots input by the user is 4, consumption slots A, B, C and D are created. The service information ① to be consumed is filled into consumption slot A and processing of the first message is triggered; the state of the service information in consumption slot A is then handling, and the available-slot pointer moves to consumption slot B, into which the service information ② to be consumed is filled. When the state of a piece of service information changes from handling to ok, that service information has been consumed. When all four consumption slots are filled, it is checked whether a consumption slot in the ok state exists; if so, the 5th piece of service information is filled into that slot. Whether abnormal consumption exists is judged in real time while the service information is being consumed. For example, if the state of the second service information in consumption slot B is handling and the state of the third service information in consumption slot C is ok, it indicates that the first service information in the source queue has been consumed but the second service information has not, and it may be determined that the second service information in consumption slot B has abnormal consumption.
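The slot mechanism in this example can be modelled with a small sketch. The class `SlotScheduler` and its methods are hypothetical, and the anomaly check is one possible reading of the example (a message still in the handling state although a later message is already ok), not a rule stated by the application.

```python
class SlotScheduler:
    """Fixed number of consumption slots, each holding one message at a time."""

    def __init__(self, slot_count):
        # Each slot holds (sequence_number, state), or None if never used.
        self.slots = [None] * slot_count

    def fill(self, seq):
        """Place message `seq` into the first empty or finished slot."""
        for index, slot in enumerate(self.slots):
            if slot is None or slot[1] == "ok":
                self.slots[index] = (seq, "handling")
                return index
        return None  # every slot is still handling; the caller must wait

    def mark_ok(self, index):
        """Mark the message in the given slot as consumed."""
        seq, _ = self.slots[index]
        self.slots[index] = (seq, "ok")

    def suspected_abnormal(self):
        """Messages still handling although a later message is already ok."""
        done = [slot[0] for slot in self.slots if slot and slot[1] == "ok"]
        if not done:
            return []
        latest_done = max(done)
        return [
            slot[0]
            for slot in self.slots
            if slot and slot[1] == "handling" and slot[0] < latest_done
        ]
```

With four slots, filling messages 1 to 4 occupies slots A to D; once slots A and C are marked ok, message 2 in slot B is reported as suspect and message 5 can reuse slot A.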
Step S402, under the condition that the parallel consumption processing is abnormal, carrying out abnormal consumption processing on the abnormal service information according to the consumption processing strategy to obtain a message event.
The consumption processing policy may be a processing policy set for the case of abnormal consumption. The consumption processing policy may include a robust policy, a minimum-loss policy, a loss-tolerant policy, an asynchronous policy, and a direct-discard policy. The robust policy retries continuously when a consumption abnormality occurs until consumption succeeds, which ensures that no service information is lost because of a processing failure; it can be used when the service is sensitive to the loss of service information. The minimum-loss policy synchronously retries a specified number of times (3 by default) when a consumption abnormality occurs; if the retries still fail, the state is changed from handling to ok and the error is fed into a fusing (circuit-breaker) controller for counting. When the number of consecutive failures exceeds the fusing threshold, the fusing state is entered and the consumption scheduling rate is slowed until the abnormality recovers. This policy may lose a small number of messages, but the loss can be kept at a low level, and the policy automatically skips the small subset of specific messages that cause processing failures. The loss-tolerant policy processes the message synchronously the first time and changes the state from handling to ok after that first attempt, whether it succeeds or fails; if the attempt is abnormal, the specified number of retries is executed asynchronously in a new coroutine, and if the message still fails in the end, the error is fed into the fusing controller for counting. The asynchronous policy processes messages asynchronously in a coroutine from the start, and the state can be changed from handling to ok directly; in this mode the slot control of the scheduler has little effect, and if the abnormal retries still fail, the error is fed into the fusing controller for counting. 
The direct-discard policy discards the consumed service information directly: if, because of an upstream abnormality, the service information is judged to be worthless and not worth spending resources on, this policy can be used to commit the offset of the information directly. Therefore, when parallel consumption processing is abnormal, the abnormal service information can be handled according to the consumption processing policy, and the message event can be obtained quickly.
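As an illustration of the minimum-loss policy and the fusing controller, consider the sketch below. The names `CircuitBreaker` and `consume_min_loss` are hypothetical, and the sketch assumes that "retrying 3 times" means one initial attempt followed by up to three retries; the application does not state how attempts are counted.

```python
class CircuitBreaker:
    """Counts consecutive failures and opens once they exceed a threshold."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.consecutive_failures = 0

    def record(self, success):
        # A success resets the streak; a failure extends it.
        self.consecutive_failures = 0 if success else self.consecutive_failures + 1

    @property
    def open(self):
        # While open, the caller is expected to slow its scheduling rate.
        return self.consecutive_failures > self.threshold


def consume_min_loss(message, handler, breaker, retries=3):
    """Attempt the handler once plus `retries` synchronous retries.

    On final failure the message is skipped (its slot would move to "ok")
    and the failure is reported to the breaker.
    """
    for _ in range(1 + retries):
        try:
            handler(message)
        except Exception:
            continue
        breaker.record(True)
        return True
    breaker.record(False)
    return False
```

A single poison message costs a bounded number of attempts and is then skipped, while a run of failures opens the breaker so that the consumer can back off instead of discarding messages at full speed.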
And creating a consumption slot according to the received parallel consumption quantity, sequentially filling the business information monitored from the source queue into the consumption slot for parallel consumption processing, judging whether the parallel consumption processing is abnormal, and performing abnormal consumption processing on the abnormal business information according to the consumption processing strategy under the condition that the parallel consumption processing is abnormal to obtain a message event. The consumption efficiency of the service information can be improved by the parallel consumption processing mode of the service information in the source queue, and after the consumption abnormality is judged, the abnormal consumption processing of the abnormal service information can be automatically carried out according to the consumption processing strategy, so that the abnormal consumption efficiency is improved, the abnormal condition in the consumption process is not required to be considered by a user, and the use experience of the user is improved.
In one embodiment, after the service information monitored from the source queue is sequentially filled into the consumption slots, an information identifier is allocated to the service information in each consumption slot;
correspondingly, after the message event is obtained, the method further comprises:
And binding the message event and the corresponding information identifier and storing the message event and the corresponding information identifier into a message event library, and searching for a corresponding message event in the message event library according to the information identifier when the consumption fault is detected.
The information identifier is information used to identify service information in the source queue; it is unique, so each identifier denotes exactly one piece of service information. After the service information is filled into the consumption slots in sequence, an information identifier is allocated to the service information in each consumption slot, and after a message event is obtained, the information identifier and the corresponding message event are bound and stored in the message event library. The consumption position in the source queue can be determined from the largest information identifier stored in the message event library, that is, the identifier stored last. For example, if the largest identifier stored in the message event library is 5, this may indicate that the first 5 pieces of service information in the source queue have been consumed, and the next round of consumption may start from the 6th piece of service information. After a consumption fault is detected, the corresponding faulty message event can be quickly found in the database according to the information identifier.
By distributing the information identifier for the service information in each consumption slot and binding and storing the message event and the corresponding information identifier in the message event library, the query efficiency of the consumption event can be improved, and the last consumption position can be conveniently and quickly determined.
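A minimal sketch of this binding and lookup follows. It assumes, as in the example above, that identifiers are consecutive integers starting at 1; the function names and the dictionary standing in for the message event library are hypothetical.

```python
def bind_event(event_library, info_id, event):
    """Store the message event under its information identifier."""
    event_library[info_id] = event


def next_position(event_library):
    """Next message to consume: one past the largest stored identifier."""
    return max(event_library, default=0) + 1


def find_event(event_library, info_id):
    """Look up the message event for an identifier, or None if absent."""
    return event_library.get(info_id)
```

After identifiers 1 to 5 have been bound, `next_position` returns 6, which matches the example in which consumption resumes from the 6th piece of service information.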
Fig. 5 is a schematic structural diagram of a service information processing apparatus according to an embodiment of the present application, where, as shown in fig. 5, the service information processing apparatus includes:
the queue monitoring module 51 is configured to receive a queue hosting request and a processing policy, and monitor a source queue according to a queue configuration parameter in the queue hosting request, where the processing policy includes an event processing policy and an event routing policy;
the consumption processing module 52 is configured to perform consumption processing on the service information monitored from the source queue to obtain a message event;
An operation and maintenance module 53, configured to perform event operation and maintenance on the message event according to the event processing policy, where the number of the monitored service information is one or more;
And the message event sending module 54 is configured to send the message event after operation and maintenance to a corresponding processing service according to the event routing policy, so that the processing service processes the message event after operation and maintenance.
According to the embodiment of the application, a source queue is monitored according to the queue configuration parameters in the queue hosting request by receiving the queue hosting request and the event policy, the event policy comprises an event processing policy and an event routing policy, the business information monitored from the source queue is subjected to consumption processing to obtain a message event, the message event is subjected to event operation according to the event processing policy, the number of the monitored business information is one or more, and the message event after operation is sent to a corresponding processing service according to the event routing policy so as to correspondingly process the message event after operation and maintenance through the processing service. The operation and maintenance processing of the various types of service information can be performed by adopting the corresponding event processing strategy, the operation and maintenance capability of the service information is improved, any consumption logic is not needed to be considered in the process of processing the service information, and only how the information is processed is needed to be considered. The route of the consumption event is rapidly determined through the event routing strategy, so that the transmission efficiency of the message event can be improved, and the processing efficiency of the message event can be further improved. The whole processing architecture of the service information is clear, the data flow is simple, the influence of the quantity of the service information is avoided, and the stability is high.
In one possible embodiment, the processing device of service information further includes a storage module, where the storage module is configured to store the message event in a message event library, and migrate, according to a preset storage time threshold, a message event in the message event library that exceeds the storage time threshold to a message event archive library;
The operation and maintenance module 53 is specifically configured to perform automatic retry, manual triggering or playback processing on the message event in the message event archive and/or the message event library according to the event processing policy.
In one possible embodiment, the processing device of service information further includes an update module, where the update module is configured to record a process of the event operation and maintenance, obtain a message event record, and update a state of the message event and the event processing policy according to the message event record, where the updated message event record includes a message body record, a message processing record, and an error record;
The storage module is further configured to store the message processing record in a message processing library, and store the error record in an error log library.
In one possible embodiment, the service information processing device further includes a parsing module, where the parsing module is configured to parse the queue configuration parameter in the queue hosting request to obtain a queue address and queue authorization information;
The queue monitoring module 51 is specifically configured to connect a queue specified by the queue address according to the queue address and the queue authorization information, and perform consumption monitoring on service information in the queue.
In one possible embodiment, the consumption processing module 52 is specifically configured to create a consumption slot according to the received parallel consumption number, sequentially fill the service information monitored from the source queue into the consumption slot for parallel consumption processing, and determine whether the parallel consumption processing is abnormal;
and under the condition that the parallel consumption processing is abnormal, carrying out abnormal consumption processing on abnormal service information according to the consumption processing strategy to obtain a message event.
In one possible embodiment, the service information processing device further includes an identifier allocation module, where the identifier allocation module is configured to allocate an information identifier to the service information in each consumption slot;
The service information processing device further comprises a searching module, wherein the searching module is used for binding and storing the message event and the corresponding information identifier into a message event library, and searching the corresponding message event in the message event library according to the information identifier when the consumption fault is detected.
In a possible embodiment, the processing device of service information further includes a parameter comparison module, where the parameter comparison module is configured to compare, according to a consumption unit configuration parameter in the queue hosting request, with a preset expected parameter, and update the consumption unit configuration parameter to the expected parameter when the parameters are inconsistent.
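The comparison performed by the parameter comparison module could be sketched as below. The function `reconcile` and the parameter names used in the usage note are assumptions for illustration only; the application does not list the configuration parameters of a consumption unit.

```python
def reconcile(configured, expected):
    """Return the configuration to use and the keys that had to be changed.

    Any parameter whose configured value differs from the expected value
    is overwritten with the expected value; other parameters are kept.
    """
    changed = sorted(
        key for key, value in expected.items() if configured.get(key) != value
    )
    merged = {**configured, **expected}
    return merged, changed
```

For instance, calling `reconcile({"parallel": 2, "topic": "orders"}, {"parallel": 4})` keeps the hypothetical `topic` parameter and reports `parallel` as the only updated key.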
The embodiment of the application also provides a service information processing device which can integrate the service information processing device provided by the embodiment of the application. Fig. 6 is a schematic structural diagram of a service information processing device according to an embodiment of the present application. Referring to fig. 6, the service information processing apparatus includes: an input device 63, an output device 64, a memory 62, and one or more processors 61; a memory 62 for storing one or more programs; when the one or more programs are executed by the one or more processors 61, the one or more processors 61 are caused to implement the processing method of service information provided in the above-described embodiment. Wherein the input device 63, the output device 64, the memory 62 and the processor 61 may be connected by a bus or otherwise, for example in fig. 6 by a bus connection.
The memory 62 is a computer readable storage medium, and may be used to store a software program, a computer executable program, and modules, such as program instructions/modules corresponding to the processing method of service information provided in any embodiment of the present application. The memory 62 may mainly include a storage program area and a storage data area, wherein the storage program area may store an operating system, at least one application program required for functions; the storage data area may store data created according to the use of the device, etc. In addition, memory 62 may include high-speed random access memory, and may also include non-volatile memory, such as at least one magnetic disk storage device, flash memory device, or other non-volatile solid-state storage device. In some examples, memory 62 may further comprise memory located remotely from processor 61, which may be connected to the device via a network. Examples of such networks include, but are not limited to, the internet, intranets, local area networks, mobile communication networks, and combinations thereof.
The input means 63 may be used to receive entered numeric or character information and to generate key signal inputs related to user settings and function control of the device. The output device 64 may include a display device such as a display screen.
The processor 61 executes various functional applications of the device and data processing, that is, implements the above-described processing method of service information, by running software programs, instructions, and modules stored in the memory 62.
The processing device, the equipment and the computer for the service information provided by the above embodiment can be used for executing the processing method for the service information provided by any embodiment, and have corresponding functions and beneficial effects.
The embodiment of the present application also provides a storage medium storing computer-executable instructions which, when executed by a computer processor, are configured to perform the service information processing method provided in the embodiment, the service information processing method includes:
receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy;
performing consumption processing on the monitored service information in the source queue to obtain a message event, and performing event operation and maintenance on the message event according to the event processing strategy, wherein the number of the monitored service information is one or more;
And sending the message event after operation and maintenance to a corresponding processing service according to the event routing strategy so as to correspondingly process the message event after operation and maintenance through the processing service.
Storage medium: any of various types of memory devices or storage devices. The term "storage medium" is intended to include: installation media such as CD-ROM, floppy disk or tape devices; computer system memory or random access memory such as DRAM, DDR RAM, SRAM, EDO RAM, Rambus RAM, etc.; nonvolatile memory such as flash memory, magnetic media (e.g., hard disk) or optical storage; registers or other similar types of memory elements, etc. The storage medium may also include other types of memory or combinations thereof. In addition, the storage medium may be located in a first computer system in which the program is executed, or may be located in a second, different computer system connected to the first computer system through a network such as the internet. The second computer system may provide program instructions to the first computer for execution. The term "storage medium" may include two or more storage media that may reside in different locations (e.g., in different computer systems connected by a network). The storage medium may store program instructions (e.g., embodied as a computer program) executable by one or more processors.
Of course, the storage medium containing the computer executable instructions provided in the embodiments of the present application is not limited to the method for processing service information as described above, and may also perform related operations in the method for processing service information provided in any embodiment of the present application.
The processing system, the device and the storage medium based on the service information provided in the foregoing embodiments may perform the processing method of the service information provided in any embodiment of the present application, and technical details not described in detail in the foregoing embodiments may be referred to the processing method of the service information provided in any embodiment of the present application.
The foregoing description is only of the preferred embodiments of the application and the technical principles employed. The present application is not limited to the specific embodiments described herein, but is capable of numerous modifications, rearrangements and substitutions as will now become apparent to those skilled in the art without departing from the scope of the application. Therefore, while the application has been described in connection with the above embodiments, the application is not limited to the embodiments, but may be embodied in many other equivalent forms without departing from the spirit of the application, the scope of which is set forth in the following claims.
Claims (10)
1. A method for processing service information, comprising:
receiving a queue hosting request and an event policy, and monitoring a source queue according to a queue configuration parameter in the queue hosting request, wherein the event policy comprises an event processing policy and an event routing policy;
performing consumption processing on the monitored service information in the source queue to obtain a message event, and performing event operation and maintenance on the message event according to the event processing strategy, wherein the number of the monitored service information is one or more;
And sending the message event after operation and maintenance to a corresponding processing service according to the event routing strategy so as to correspondingly process the message event after operation and maintenance through the processing service.
2. The method for processing service information according to claim 1, further comprising, after the message event is obtained:
Storing the message event into a message event library, and migrating the message event exceeding the storage time threshold in the message event library into a message event archiving library according to a preset storage time threshold;
correspondingly, the event operation and maintenance of the message event according to the event processing strategy comprises the following steps:
And automatically retrying, manually triggering or playing back the message event in the message event archive and/or the message event library according to the event processing strategy.
3. The method for processing service information according to claim 2, further comprising, after said event operation on said message event according to said event processing policy:
Recording the event operation and maintenance process, obtaining a message event record, and updating the state of the message event and the event processing strategy according to the message event record, wherein the updated message event record comprises a message body record, a message processing record or an error record;
and storing the updated message processing record into a message processing library, and storing the error record into an error log library.
4. The message queue management method according to claim 1, wherein the listening for the source queue according to the queue configuration parameters in the queue hosting request comprises:
analyzing the queue configuration parameters in the queue hosting request to obtain a queue address and queue authorization information;
And connecting the queue appointed by the queue address according to the queue address and the queue authorization information, and performing consumption monitoring on service information in the queue.
5. The message queue management method according to claim 1, wherein the processing policy includes a consumption processing policy, and the performing consumption processing on the service information monitored from the source queue to obtain a message event includes:
creating a consumption slot according to the received parallel consumption quantity, sequentially filling the business information monitored from the source queue into the consumption slot for parallel consumption processing, and judging whether the parallel consumption processing is abnormal or not;
and under the condition that the parallel consumption processing is abnormal, carrying out abnormal consumption processing on abnormal service information according to the consumption processing strategy to obtain a message event.
6. The message queue management method according to claim 5, further comprising, after the service information monitored from the source queue is sequentially filled into the consumption slot:
distributing information identification for the business information in each consumption slot;
correspondingly, after the message event is obtained, the method further comprises:
And binding the message event and the corresponding information identifier and storing the message event and the corresponding information identifier into a message event library, and searching for a corresponding message event in the message event library according to the information identifier when the consumption fault is detected.
7. The message queue management method according to any one of claims 1-6, further comprising, prior to said listening to a source queue according to a queue configuration parameter in the queue hosting request:
and comparing the queue configuration parameters in the queue hosting request with preset expected parameters, and updating the queue configuration parameters into the expected parameters under the condition that the parameters are inconsistent.
8. A service information processing apparatus, comprising:
a queue monitoring module, configured to receive a queue hosting request and a processing policy and to monitor a source queue according to a queue configuration parameter in the queue hosting request, wherein the processing policy comprises an event processing policy and an event routing policy;
a consumption processing module, configured to perform consumption processing on the service information monitored from the source queue to obtain a message event, wherein one or more pieces of service information are monitored;
an operation and maintenance module, configured to perform event operation and maintenance on the message event according to the event processing policy;
and a message event sending module, configured to send the message event after operation and maintenance to a corresponding processing service according to the event routing policy, so that the processing service processes the message event after operation and maintenance.
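The four modules of claim 8 form a pipeline: monitor, consume, apply the event processing policy, then route. The sketch below covers the last three stages under loud assumptions: messages are `type:body` strings, the event processing policy is a plain function, and the event routing policy is a dictionary from event type to a callable processing service. None of these representations comes from the claims.

```python
from typing import Callable, Dict, List

Event = Dict[str, str]


def consume(message: str) -> Event:
    # Consumption processing: turn a raw "type:body" message into a message event.
    kind, _, body = message.partition(":")
    return {"type": kind, "body": body}


def apply_event_policy(event: Event, policy: Callable[[Event], Event]) -> Event:
    # Event operation and maintenance, reduced here to one policy function.
    return policy(event)


def route(
    event: Event,
    routing_table: Dict[str, Callable[[Event], None]],
    default: Callable[[Event], None],
) -> None:
    # Event routing policy: pick the processing service by event type.
    routing_table.get(event["type"], default)(event)


def tag_source(event: Event) -> Event:
    # Hypothetical event processing policy that annotates the event.
    return {**event, "source": "orders-queue"}


# Lists stand in for the downstream processing services.
delivered: Dict[str, List[Event]] = {"billing": [], "fallback": []}
routing_table = {"invoice": delivered["billing"].append}

for message in ["invoice:42", "unknown:7"]:
    event = apply_event_policy(consume(message), tag_source)
    route(event, routing_table, delivered["fallback"].append)
```

Keeping the policies as data passed in with the hosting request means the pipeline code itself does not change when a queue owner alters how events are handled or where they are sent.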
9. A service information processing device, comprising: one or more processors; and a storage means for storing one or more programs which, when executed by the one or more processors, cause the one or more processors to implement the service information processing method according to any one of claims 1-7.
10. A storage medium storing computer-executable instructions which, when executed by a computer processor, perform the service information processing method according to any one of claims 1-7.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410192717.4A CN118193238A (en) | 2024-02-21 | 2024-02-21 | Service information processing method, device, equipment and storage medium |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410192717.4A CN118193238A (en) | 2024-02-21 | 2024-02-21 | Service information processing method, device, equipment and storage medium |
Publications (1)
Publication Number | Publication Date |
---|---|
CN118193238A (en) | 2024-06-14 |
Family
ID=91407865
Family Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202410192717.4A Pending CN118193238A (en) | 2024-02-21 | 2024-02-21 | Service information processing method, device, equipment and storage medium |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN118193238A (en) |
Cited By (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN118377684A (en) * | 2024-06-25 | 2024-07-23 | 恒生电子股份有限公司 | Service playback method and device |
CN118377684B (en) * | 2024-06-25 | 2024-09-27 | 恒生电子股份有限公司 | Service playback method and device |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN111104235B (en) | Queue-based asynchronous processing method and device for service requests | |
CN114338651B (en) | File transmission method, device, electronic equipment and readable storage medium | |
US9705752B2 (en) | Reliably updating a messaging system | |
CN109714409B (en) | Message management method and system | |
CN112118315A (en) | Data processing system, method, device, electronic equipment and storage medium | |
CN114253748B (en) | Message processing system and message processing method | |
CN106034137A (en) | Intelligent scheduling method for distributed system, and distributed service system | |
US9652307B1 (en) | Event system for a distributed fabric | |
CN114900449B (en) | Resource information management method, system and device | |
CN109697112B (en) | Distributed intensive one-stop operating system and implementation method | |
WO2017181430A1 (en) | Method and device for duplicating database in distributed system | |
CN112217847A (en) | Micro service platform, implementation method thereof, electronic device and storage medium | |
CN112559461A (en) | File transmission method and device, storage medium and electronic equipment | |
KR20180037342A (en) | Application software error monitoring, statistics management service and solution method. | |
CN118193238A (en) | Service information processing method, device, equipment and storage medium | |
EP3852363B1 (en) | Device state monitoring method and apparatus | |
CN107426012B (en) | Fault recovery method and device based on super-fusion architecture | |
CN112181627A (en) | Scheduled task scheduling method, device and system | |
CN116405547A (en) | Message pushing method and device, processor, electronic equipment and storage medium | |
CN118585297A (en) | A task execution method and related equipment | |
CN117971520A (en) | Request processing method, device, equipment and medium based on real-time transaction | |
US9967163B2 (en) | Message system for avoiding processing-performance decline | |
CN116483543A (en) | Task processing method, device, equipment and storage medium | |
CN115643271A (en) | Method, device, server and medium for synchronizing multi-application data on cloud | |
CN109634787B (en) | Distributed file system monitor switching method, device, device and storage medium |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||