CN113626217A - Asynchronous message processing method and device, electronic equipment and storage medium - Google Patents
Asynchronous message processing method and device, electronic equipment and storage medium Download PDFInfo
- Publication number
- CN113626217A CN113626217A CN202110859560.2A CN202110859560A CN113626217A CN 113626217 A CN113626217 A CN 113626217A CN 202110859560 A CN202110859560 A CN 202110859560A CN 113626217 A CN113626217 A CN 113626217A
- Authority
- CN
- China
- Prior art keywords
- asynchronous message
- target partition
- asynchronous
- target
- time
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Links
Images
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/54—Interprogram communication
- G06F9/546—Message passing systems or structures, e.g. queues
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/547—Messaging middleware
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F2209/00—Indexing scheme relating to G06F9/00
- G06F2209/54—Indexing scheme relating to G06F9/54
- G06F2209/548—Queue
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Information Transfer Between Computers (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
The disclosure provides an asynchronous message processing method, an asynchronous message processing device, an electronic device and a storage medium, wherein the method comprises the following steps: the method comprises the steps of obtaining asynchronous messages to be sent, determining a target partition in an asynchronous message queue corresponding to the asynchronous messages according to data carried by a set field in the asynchronous messages, and storing the asynchronous messages to the target partition corresponding to the asynchronous messages so that consumers corresponding to the target partition can obtain the asynchronous messages stored in the target partition. According to the method and the device, the corresponding target partition is determined according to the data carried by the set field of the asynchronous message, the updating corresponding to the same data can be stored in the same target partition, so that when a consumer corresponding to the target partition processes the message from the target partition, the old data is prevented from covering the new data, and the consistency of the data of the consumer and the sender is ensured.
Description
Technical Field
The present disclosure relates to the field of information processing technologies, and in particular, to an asynchronous message processing method and apparatus, an electronic device, and a storage medium.
Background
At present, the data transmission mode of asynchronous messages is widely applied to various large-scale systems. The producer places the generated message in a message queue and the consumer listens to the message queue and consumes the data. The coupling degree between different systems can be effectively reduced through an asynchronous message interaction mechanism, the system development complexity is reduced, and the system processing performance is improved through high concurrent processing.
However, in the case of high concurrency, the asynchronous processing manner may cause a problem of data consistency between the producer and the consumer, that is, data acquired by the consumer and data generated by the producer are inconsistent.
Disclosure of Invention
The present disclosure provides an asynchronous message processing method, apparatus, electronic device, and storage medium for guaranteeing consumption order consistency.
According to a first aspect of the embodiments of the present disclosure, there is provided an asynchronous message processing method, including the following steps:
acquiring an asynchronous message to be sent;
determining a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message;
and storing the asynchronous message to the target partition corresponding to the asynchronous message, so that the asynchronous message stored in the target partition is processed by the consumer corresponding to the target partition.
As an implementation manner, the determining, according to data carried by a set field in the asynchronous message, a target partition in an asynchronous message queue corresponding to the asynchronous message includes:
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the mapping relation between the data carried by the set field and the plurality of partitions in the asynchronous message queue.
As an implementation manner, the determining, according to a mapping relationship between data carried by the set field and a plurality of partitions in the asynchronous message queue, a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions includes:
performing hash operation on the data carried by the set field to obtain a hash value of the data carried by the set field;
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the hash value and the mapping relation.
As an implementation, the method further comprises:
and storing the time for sending the asynchronous message to the target partition corresponding to the asynchronous message and the time for writing the asynchronous message into the target partition corresponding to the asynchronous message so that a consumer corresponding to the target partition processes each asynchronous message in the target partition according to the time for sending the asynchronous message to the target partition and the time for writing the asynchronous message into the target partition corresponding to each asynchronous message stored in the target partition.
As one implementation, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
According to a second aspect of the embodiments of the present disclosure, there is provided another asynchronous message processing method, including the steps of:
acquiring asynchronous messages stored in a target partition of an asynchronous message queue, and sending time of the asynchronous messages to the target partition and writing time of the asynchronous messages into the target partition;
acquiring a target thread corresponding to a set field of the asynchronous message of the target partition in the affinity thread pool;
and processing the asynchronous message by using the target thread according to the time of sending the asynchronous message to the target partition and the time of writing the asynchronous message into the target partition.
As an implementation, the processing, by the target thread, the asynchronous message according to the time when the asynchronous message is sent to the target partition and the time when the asynchronous message is written into the target partition includes:
if the time for sending the asynchronous message to the target partition is earlier than the time for sending the executed asynchronous message to the target partition, acquiring processing parameter information; wherein the processing parameter information is used for processing the asynchronous message;
and under the condition of acquiring the processing parameter information, processing the asynchronous message by using the target thread according to the processing mode indicated in the processing parameter information.
As an implementation manner, after the obtaining of the processing parameter information, the method further includes:
and under the condition that the processing parameter information is not acquired, discarding the asynchronous message by using the target thread.
As an implementation, the processing, by the target thread, the asynchronous message according to the time when the asynchronous message is sent to the target partition and the time when the asynchronous message is written into the target partition includes:
and if the time for sending the asynchronous message to the target partition is later than the time for sending the executed asynchronous message to the target partition, processing the asynchronous message by using the target thread.
As one implementation, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
According to a third aspect of the embodiments of the present disclosure, there is provided an asynchronous message processing apparatus, including:
the acquisition module is configured to acquire an asynchronous message to be sent;
the determining module is configured to determine a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message;
a storage module configured to store the asynchronous message to the target partition corresponding to the asynchronous message, so that a consumer corresponding to the target partition processes each asynchronous message stored in the target partition.
As one implementation, the determining module is configured to perform:
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the mapping relation between the data carried by the set field and the plurality of partitions in the asynchronous message queue.
As one implementation, the determining module is further configured to perform:
performing hash operation on the data carried by the set field to obtain a hash value of the data carried by the set field;
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the hash value and the mapping relation.
As an implementation manner, the storage module is further configured to execute storing the time for sending the asynchronous message to the target partition corresponding to the asynchronous message and the time for writing the asynchronous message into the target partition corresponding to the asynchronous message, so that the consumer corresponding to the target partition processes each asynchronous message in the target partition according to the time for sending the asynchronous message to the target partition and the time for writing the asynchronous message into the target partition, which correspond to each asynchronous message stored in the target partition.
As one implementation, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
According to a fourth aspect of the embodiments of the present disclosure, there is provided another asynchronous message processing apparatus, including:
a first obtaining module configured to obtain an asynchronous message stored in a target partition of an asynchronous message queue, and a time when the asynchronous message is sent to the target partition and a time when the asynchronous message is written into the target partition;
a second obtaining module configured to execute obtaining of a target thread corresponding to a set field of an asynchronous message of the target partition in an affinity thread pool;
a processing module configured to execute processing of the asynchronous message with the target thread according to a time when the asynchronous message is sent to the target partition and a time when the asynchronous message is written to the target partition.
As one implementation, the processing module is configured to perform:
if the time for sending the asynchronous message to the target partition is earlier than the time for sending the executed asynchronous message to the target partition, acquiring processing parameter information; wherein the processing parameter information is used for processing the asynchronous message;
and under the condition of acquiring the processing parameter information, processing the asynchronous message by using the target thread according to the processing mode indicated in the processing parameter information.
As an implementation, the processing module is further configured to perform:
and under the condition that the processing parameter information is not acquired, discarding the asynchronous message by using the target thread.
As an implementation, the processing module is further configured to perform:
and if the time for sending the asynchronous message to the target partition is later than the time for sending the executed asynchronous message to the target partition, processing the asynchronous message by using the target thread.
As one implementation, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
According to a fifth aspect of embodiments of the present disclosure, there is provided an electronic apparatus including:
a processor; a memory for storing the processor-executable instructions;
wherein the processor is configured to execute the instructions to implement the asynchronous message processing method according to the first aspect or to implement the asynchronous message processing method according to the second aspect.
According to a sixth aspect of embodiments of the present disclosure, there is provided a computer-readable storage medium, wherein instructions of the storage medium, when executed by a processor in an electronic device, enable the electronic device to implement the asynchronous message processing method according to the first aspect, or the asynchronous message processing method according to the second aspect.
According to a seventh aspect of embodiments of the present disclosure, there is provided a computer program product comprising a computer program which, when executed by a processor, implements the asynchronous message processing method according to the first aspect, or implements the asynchronous message processing method according to the second aspect.
The technical scheme provided by the embodiment of the disclosure has the following beneficial effects:
the method comprises the steps of obtaining an asynchronous message to be sent, determining a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message, and storing the asynchronous message to the target partition corresponding to the asynchronous message, so that a consumer corresponding to the target partition processes each asynchronous message stored in the target partition. According to the method and the device, the corresponding target partition is determined according to the data carried by the set field in the asynchronous message, the updating corresponding to the same data can be stored in the same target partition, so that when a consumer corresponding to the target partition processes the message from the target partition, the old data is prevented from covering the new data, and the consistency of the data of the consumer and the sender is ensured.
It is to be understood that both the foregoing general description and the following detailed description are exemplary and explanatory only and are not restrictive of the disclosure.
Drawings
The accompanying drawings, which are incorporated in and constitute a part of this specification, illustrate embodiments consistent with the invention and together with the description, serve to explain the principles of the invention.
Fig. 1 is a flowchart illustration of an asynchronous message processing method provided by an embodiment of the present disclosure;
fig. 2 is a schematic flow chart of another asynchronous message processing method provided in the embodiment of the present disclosure;
fig. 3 is a schematic flowchart of another asynchronous message processing method provided in the embodiment of the present disclosure;
fig. 4 is a schematic structural diagram of an asynchronous message processing apparatus according to an embodiment of the present disclosure;
fig. 5 is a schematic structural diagram of another asynchronous message processing apparatus according to an embodiment of the present disclosure;
fig. 6 is a block diagram of an electronic device 10 according to an embodiment of the present disclosure.
Detailed Description
In order to make the technical solutions of the present disclosure better understood by those of ordinary skill in the art, the technical solutions in the embodiments of the present disclosure will be clearly and completely described below with reference to the accompanying drawings.
It should be noted that the terms "first," "second," and the like in the description and claims of the present disclosure and in the above-described drawings are used for distinguishing between similar elements and not necessarily for describing a particular sequential or chronological order. It is to be understood that the data so used is interchangeable under appropriate circumstances such that the embodiments of the disclosure described herein are capable of operation other than as illustrated or otherwise described herein
In sequences other than those described. The implementations described in the exemplary embodiments below are not intended to represent all implementations consistent with the present disclosure. Rather, they are merely examples of apparatus and methods consistent with certain aspects of the present disclosure, as detailed in the appended claims.
Fig. 1 is a flowchart illustration of an asynchronous message processing method according to an embodiment of the disclosure.
As shown in fig. 1, the method comprises the following steps:
To facilitate understanding of the embodiments of the present disclosure, the asynchronous message processing framework of the embodiments of the present disclosure is described first, and the present disclosure includes a sender, a consumer, an asynchronous message, and an asynchronous message queue.
A sender: i.e., the provider, sender of the asynchronous message, note that in the art, the nature here generally refers to a node rather than a natural person.
Asynchronous messages: which may be data or management instructions, etc.
An asynchronous message queue: message queues are communication means for asynchronously processing a series of incoming messages, and message queues may be implemented by a message queue server.
The consumer side: i.e. the user, recipient of the message, it is noted that in the art, the nature here generally refers to a node and not to a natural person.
In the embodiment of the disclosure, the sending party generates an asynchronous message to be sent according to the service requirement, for example, if the data a in the database is modified, the asynchronous message is generated according to the modified type and the updated data, and is sent to the asynchronous message queue for storage.
And step 102, determining a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message.
In this embodiment, data carried in a set field in the asynchronous message is used to determine a keyword of the data, and the keyword may be used to uniquely identify the data. For example, an Identity document (Id) corresponding field. And the sender determines a target partition for data storage according to the set field of the data carried in the asynchronous message and the corresponding relation between the data carried in the set field and each partition in the asynchronous message queue.
It should be noted that, in the embodiment of the present disclosure, a plurality of partitions are divided in the asynchronous message queue, and each partition is used for storing a plurality of asynchronous messages of one data, that is, the update information related to one data is stored in one partition. The corresponding relation between the data carried by the set field and the target partition can be pre-established, so that the target partition for storing the asynchronous message is determined according to the obtained asynchronous message based on the corresponding relation.
And 103, storing the asynchronous messages to the target partitions corresponding to the asynchronous messages, so that the consumers corresponding to the target partitions process the asynchronous messages stored in the target partitions.
In this embodiment, according to the determined target partition corresponding to the asynchronous message, the asynchronous message is stored in the target partition corresponding to the asynchronous message, so that the consumer corresponding to the target partition sequentially consumes the asynchronous message from the asynchronous message stored in the target partition, thereby ensuring consistency between the order in which the consumer consumes the asynchronous message and the order in which the sender generates the asynchronous message, and further ensuring consistency between the data stored by the consumer and the sender.
In the asynchronous message processing method of the embodiment of the disclosure, an asynchronous message to be sent is acquired, a target partition in an asynchronous message queue corresponding to the asynchronous message is determined according to data carried by a set field in the asynchronous message, and the asynchronous message is stored in the target partition corresponding to the asynchronous message, so that a consumer corresponding to the target partition acquires the asynchronous message stored in the target partition. According to the method and the device, the corresponding target partition is determined according to the data carried by the set field of the asynchronous message, the updating corresponding to the same data can be stored in the same target partition, so that when a consumer corresponding to the target partition processes the message from the target partition, the old data is prevented from covering the new data, and the consistency of the data of the consumer and the sender is ensured.
Based on the foregoing embodiment, this embodiment provides another asynchronous message processing method, and fig. 2 is a schematic flow chart of the another asynchronous message processing method provided in the embodiment of the present disclosure, as shown in fig. 2, step 102 includes the following steps:
Specifically, reference may be made to the description in the foregoing method embodiments, which are not repeated in this embodiment.
In the embodiment of the present disclosure, mapping relationships between each asynchronous message and a plurality of partitions in an asynchronous message queue are established in advance, and all changes corresponding to the same data can be stored in the same partition according to the mapping relationships and the data carried by the set fields in the asynchronous messages.
As a possible implementation manner, hash processing is performed on data carried by a set field in the asynchronous message to obtain a hash value of the preset field. Wherein the hash value indicates a correspondence of the asynchronous message and the target partition.
In this embodiment, hash processing is performed on a set field carrying data in an asynchronous message, the set field is mapped to a hash value corresponding to the set field through a hash function, where the hash value includes a keyword of the asynchronous message, a partition corresponding to the keyword is determined according to a pre-established correspondence between the keyword and a stored partition, and the partition is used as a target partition for storing the asynchronous message.
And further, determining a target partition corresponding to the hash value of the asynchronous message from a plurality of partitions in the asynchronous message queue.
As one implementation, the asynchronous message queue is a kafka message queue based on a publish/subscribe schema.
In this embodiment, according to the hash value of the asynchronous message, the corresponding relationship between the hash value of the asynchronous message and the partition is searched to determine the target partition corresponding to the asynchronous message from the multiple partitions in the asynchronous message queue, so that when there is an update, such as deletion, addition, reduction, and the like, of data carried by the asynchronous message, the keyword of the asynchronous message may be determined according to a set field in the data carried by the asynchronous message, and the target partition corresponding to the asynchronous message is determined according to the keyword of the asynchronous message, that is, the update for the same data is all stored in the same target partition. For example, according to a set field carrying original data a in the asynchronous message 1, a hash value obtained after hash processing is a keyword of the data a in the asynchronous message, and is marked as X to identify the data a, where the asynchronous message 1 carries data B after the data a is newly added; the asynchronous message 2 carries the data C after the data B is updated, and according to the set field carrying the data in the asynchronous message 2, the keyword determined after the hash processing is also X, that is, the keywords corresponding to the data carried in the asynchronous message 1 and the asynchronous message 2 are the same, the asynchronous message 1 and the asynchronous message 2 are indicated to be generated based on the update of the same original data A, so that the asynchronous message 1 and the asynchronous message 2 can be stored in the same target partition, and the asynchronous message after the data corresponding to the same keyword is updated can be stored in the same target partition.
In the embodiment of the disclosure, the asynchronous messages are stored in the target partitions corresponding to the asynchronous messages, so that the same data can be updated and stored in the same target partition, the time for sending the asynchronous messages to the target partitions and the time for writing the asynchronous messages into the target partitions are stored in the target partitions, when a consumer reads the asynchronous messages in the target partitions, the asynchronous messages stored in the target partitions are read according to the time sequence for writing the asynchronous messages into the target partitions, and for each read asynchronous message, the time for sending the asynchronous message to the target partitions and the time for sending the asynchronous message to the target partitions after the asynchronous message is consumed are compared to determine whether the asynchronous message can be processed or not. Therefore, the unexpired asynchronous messages are ensured to be processed, the consistency of the sequence of the messages consumed by the consumer and the sequence of the messages generated by the sender when the consumer consumes the messages from the same target partition is ensured, and the consistency of the data stored by the consumer and the sender is further ensured.
For example, 2 asynchronous messages, denoted as asynchronous message a and asynchronous message B, are stored in the target partition X, where the asynchronous message a is sent to the target partition X earlier than the asynchronous message B is sent to the target partition X. However, the time for writing the asynchronous message a into the target partition is later than the time for writing the asynchronous message B into the target partition, so that the consumer reads the asynchronous message B first and then reads the asynchronous message a according to the time sequence for writing the asynchronous message in the target partition X, and since the time for sending the asynchronous message a into the target partition X is earlier than the time for sending the asynchronous message B into the target partition X, that is, the time for generating the asynchronous message a is earlier than the time for generating the asynchronous message B, after the consumer reads the asynchronous message B, the consumer reads the delayed asynchronous message a, and the asynchronous message a expires, the asynchronous message a is not consumed, wherein the asynchronous message a may be directly discarded, or corresponding processing is performed according to a preset rule, which will be described in the following embodiments.
In the asynchronous message processing method of this embodiment, an asynchronous message to be sent is acquired, hash processing is performed on data carried by a set field in the asynchronous message to obtain a hash value of a preset field, the hash value indicates a correspondence between the asynchronous message and a target partition, the target partition corresponding to the hash value of the asynchronous message is determined from a plurality of partitions of an asynchronous message queue, and the target partitions are stored in the same target partition for updating the same data.
To implement the above embodiments, the present embodiment provides another asynchronous message processing method, which is performed by a consumer. Fig. 3 is a schematic flowchart of another asynchronous message processing method provided in the embodiment of the present disclosure, and as shown in fig. 3, the method includes the following steps:
In the embodiment of the invention, a plurality of target partitions are arranged in the asynchronous message queue, each target partition stores different asynchronous messages corresponding to one piece of data, the generation time of the different asynchronous messages is different, and the asynchronous messages in each target partition are arranged according to the time sequence written into the target partition. The asynchronous message in each target partition is executed by a corresponding consumer, that is, the consumer can obtain the asynchronous message in the corresponding target partition.
In this embodiment, the affinity thread pool may ensure that the asynchronous messages carrying the same keywords corresponding to the setting fields of the data in the asynchronous messages are executed according to the time sequence in which the asynchronous messages are written into the target partition.
In this embodiment, the control target thread pool sequentially reads the asynchronous messages in the target partition according to the time sequence of the asynchronous messages written in the target partition, and compares the time of sending the currently read asynchronous message to the target partition with the time of sending the asynchronous message, which has been executed before the asynchronous message, to the target partition, to determine whether the asynchronous message is an executable asynchronous message, that is, to determine whether the current asynchronous message is an expired message.
In an implementation manner of this embodiment, if a time when a currently acquired asynchronous message is sent to a target partition is earlier than a time when an executed asynchronous message is sent to the target partition, it indicates that a time when the currently acquired asynchronous message is generated is earlier than a time when the executed asynchronous message is generated, and in order to avoid that old data caused by executing the asynchronous message overwrites new data, so that a situation that data acquired by a consumer and data after current update of a sender are inconsistent exists, the currently acquired asynchronous message that has expired is discarded.
In an implementation manner of this embodiment, in order to meet requirements of different service scenarios, when it is determined that a currently acquired asynchronous message is an expired asynchronous message, the currently acquired asynchronous message may also be processed by a consumer, specifically, if a time when the currently acquired asynchronous message is sent to a target partition is earlier than a time when an executed asynchronous message is sent to the target partition, interface parameter information is acquired, where the asynchronous message and the executed asynchronous message are both stored in the target partition and are asynchronous messages corresponding to the same data update. And the control target thread processes the currently acquired asynchronous message according to the processing mode indicated in the parameter information. It should be noted that, different service scenarios and different processing manners of the asynchronous message determined to have expired are different, for example, the consumer may store the asynchronous message determined to have expired in another storage unit for analyzing the reason of the expiration with respect to a specific scenario. In the embodiment, the processing modes in different service scenes are not limited, and the service requirements in different scenes are met.
In another implementation manner of this embodiment, if the time for sending the asynchronous message to the target partition is later than the time for sending the executed asynchronous message to the target partition, that is, the time for generating the currently acquired asynchronous message is later than the time for generating the executed asynchronous message, the target thread is controlled to process the asynchronous message, that is, the currently acquired asynchronous message is an asynchronous message that is not out of date, the consumer may perform corresponding processing on the data indicated by the asynchronous message according to the update type of the data carried in the asynchronous message, for example, whether the asynchronous message is added or subtracted, or deleted, since the time for actually writing the asynchronous message into the target partition is different from the time for sending each asynchronous message due to the factors such as the amount of data to be transmitted and the transmission delay of each asynchronous message sent to the target partition, by comparing the time for sending the asynchronous message to the target partition with the time for sending the executed asynchronous message to the target partition, the method and the device have the advantage that the final consistency of asynchronous message consumption can be ensured under the condition that the asynchronous messages in the target partition cannot be executed according to the time sequence of sending.
In the asynchronous message processing method of this embodiment, an asynchronous message stored in a target partition of an asynchronous message queue is obtained, a time for sending the asynchronous message to the target partition corresponding to the asynchronous message and a time for writing the asynchronous message are obtained, a target thread in a affinity thread pool corresponding to a set field of the asynchronous message of the target partition is obtained, the target thread is controlled to process the asynchronous message according to the time for sending the asynchronous message to the target partition corresponding to the asynchronous message and the time for writing the asynchronous message, the time for actually writing the asynchronous message to the target partition is different from the time for sending each asynchronous message due to factors such as transmission data amount and transmission delay of each asynchronous message sent to the target partition, the time for sending the asynchronous message to the target partition corresponding to the asynchronous message is compared with the time for sending the executed asynchronous message to the target partition based on the affinity thread pool, the method and the device have the advantage that the final consistency of asynchronous message consumption can be ensured under the condition that the asynchronous messages in the target partition cannot be executed according to the time sequence of sending.
In order to implement the above embodiments, the present embodiment provides an asynchronous message processing apparatus.
Fig. 4 is a schematic structural diagram of an asynchronous message processing apparatus according to an embodiment of the present disclosure, and as shown in fig. 4, the asynchronous message processing apparatus includes:
an obtaining module 41 configured to perform obtaining an asynchronous message to be sent.
And the determining module 42 is configured to determine the target partition in the asynchronous message queue corresponding to the asynchronous message according to the data carried by the set field in the asynchronous message.
A storage module 43 configured to execute storing the asynchronous message to the target partition corresponding to the asynchronous message, so that the consumer corresponding to the target partition processes each asynchronous message stored in the target partition.
Further, in an implementation manner of the present embodiment, the determining module 42 is further configured to perform:
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the mapping relation between the data carried by the set field and the plurality of partitions in the asynchronous message queue.
In one implementation manner of this embodiment, the determining module 42 is further configured to perform:
performing hash operation on the data carried by the set field to obtain a hash value of the data carried by the set field;
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the hash value and the mapping relation.
In an implementation manner of this embodiment, the storage module 43 is further configured to execute storing the time for sending the asynchronous message to the target partition corresponding to the asynchronous message and the time for writing the asynchronous message into the target partition corresponding to the asynchronous message, so that the consumer corresponding to the target partition processes each asynchronous message in the target partition according to the time for sending the asynchronous message to the target partition and the time for writing the asynchronous message into the target partition corresponding to each asynchronous message stored in the target partition.
In one implementation of this embodiment, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
It should be noted that the foregoing explanation of the method embodiment is also applicable to the apparatus of the present embodiment, and the principle is the same, and the present embodiment is not limited thereto.
In the asynchronous message processing apparatus according to the embodiment of the present disclosure, an asynchronous message to be sent is acquired, a target partition in an asynchronous message queue corresponding to the asynchronous message is determined according to data carried in a set field in the asynchronous message, and the asynchronous message is stored in the target partition corresponding to the asynchronous message, so that a consumer corresponding to the target partition acquires the asynchronous message stored in the target partition. According to the method and the device, the corresponding target partition is determined according to the set field of the data carried by the asynchronous message, and the data updated by the same data can be stored in the same target partition, so that the consistency of the consumption sequence of a consumer and the data generated by a sender when the consumer corresponding to the target partition sequentially obtains the asynchronous message from the target partition is ensured, and the consistency of the data stored by the consumer and the sender is further ensured.
In order to implement the above embodiments, the present embodiment provides another asynchronous message processing apparatus.
Fig. 5 is a schematic structural diagram of another asynchronous message processing apparatus provided in an embodiment of the present disclosure, and as shown in fig. 5, the apparatus includes:
a first obtaining module 51 configured to perform obtaining of the asynchronous message stored in the target partition of the asynchronous message queue, and the time when the asynchronous message is sent to the target partition and the time when the asynchronous message is written into the target partition.
And a second obtaining module 52 configured to perform obtaining of a target thread in the affinity thread pool corresponding to the set field of the asynchronous message of the target partition.
A processing module 53 configured to execute processing of the asynchronous message by the target thread according to the time when the asynchronous message is sent to the target partition and the time when the asynchronous message is written into the target partition.
Further, in an implementation manner of this embodiment, the processing module 53 is further configured to perform:
if the time for sending the asynchronous message to the target partition is earlier than the time for sending the executed asynchronous message to the target partition, acquiring processing parameter information; wherein the processing parameter information is used for processing the asynchronous message;
and under the condition of acquiring the processing parameter information, processing the asynchronous message by using the target thread according to the processing mode indicated in the processing parameter information.
In an implementation manner of this embodiment, the processing module 53 is further configured to discard the asynchronous message by using the target thread if the processing parameter information is not acquired.
In an implementation manner of this embodiment, the processing module 53 is further configured to execute processing the asynchronous message by using the target thread if the time of sending the asynchronous message to the target partition is later than the time of sending the executed asynchronous message to the target partition.
In one implementation of this embodiment, the asynchronous message queue is a message queue of a distributed publish-subscribe messaging system.
With regard to the apparatus in the above-described embodiment, the specific manner in which each module performs the operation has been described in detail in the embodiment related to the method, and will not be elaborated here.
In the asynchronous message processing apparatus of this embodiment, an asynchronous message stored in a target partition of an asynchronous message queue, a time when the asynchronous message is sent to the target partition and a time when the asynchronous message is written into the target partition are obtained, a target thread corresponding to a set field of the asynchronous message of the target partition in an affinity thread pool is obtained, the target thread is controlled, the asynchronous message is processed according to the time when the asynchronous message is sent to the target partition and the time when the asynchronous message is written into the target partition, because the time when the asynchronous message is actually written into the target partition is different from the time when the asynchronous message is sent to the target partition due to factors such as a transmission data amount and a transmission delay, the time when the asynchronous message is sent to the target partition corresponding to the asynchronous message is compared with the time when the asynchronous message is sent to the target partition corresponding to the executed asynchronous message based on the affinity thread pool, the method and the device have the advantage that the final consistency of asynchronous message consumption can be ensured under the condition that the asynchronous messages in the target partition cannot be executed according to the time sequence of sending.
In order to implement the above embodiments, an embodiment of the present disclosure provides an electronic device, including:
a processor; a memory for storing the processor-executable instructions; wherein the processor is configured to execute the instructions to implement the asynchronous message processing method as described in the aforementioned method embodiments.
To implement the above embodiments, the present disclosure provides a computer-readable storage medium, wherein instructions of the storage medium, when executed by a processor in an electronic device, enable the electronic device to implement the asynchronous message processing method as described in the foregoing method embodiments.
To implement the above embodiments, the present disclosure provides a computer program product comprising a computer program, which when executed by a processor implements the asynchronous message processing method as described in the foregoing method embodiments.
Fig. 6 is a block diagram of an electronic device 10 according to an embodiment of the present disclosure. The electronic device shown in fig. 6 is only an example, and should not bring any limitation to the functions and the scope of use of the embodiments of the present disclosure.
As shown in fig. 6, the electronic device 10 includes a processor 11, which can perform various appropriate actions and processes according to a program stored in a Read Only Memory (ROM) 12 or a program loaded from a Memory 16 into a Random Access Memory (RAM) 13. In the RAM 13, various programs and data necessary for the operation of the electronic apparatus 10 are also stored. The processor 11, the ROM 12, and the RAM 13 are connected to each other via a bus 14. An Input/Output (I/O) interface 15 is also connected to the bus 14.
The following components are connected to the I/O interface 15: a memory 16 including a hard disk and the like; and a communication section 17 including a Network interface card such as a LAN (Local Area Network) card, a modem, or the like, the communication section 17 performing communication processing via a Network such as the internet; a drive 18 is also connected to the I/O interface 15 as necessary.
In particular, according to an embodiment of the present disclosure, the processes described above with reference to the flowcharts may be implemented as computer software programs. For example, embodiments of the present disclosure include a computer program, carried on a computer readable medium, containing program code for performing the method illustrated in the flow chart. In such an embodiment, the computer program can be downloaded and installed from a network through the communication section 17. The computer program, when executed by the processor 11, performs the above-described functions defined in the method of the present disclosure.
In an exemplary embodiment, there is also provided a storage medium comprising instructions, such as the memory 16 comprising instructions, executable by the processor 11 of the electronic device 10 to perform the above-described method. Alternatively, the storage medium may be a non-transitory computer readable storage medium, which may be, for example, a ROM, a Random Access Memory (RAM), a CD-ROM, a magnetic tape, a floppy disk, an optical data storage device, and the like.
In the present disclosure, a computer readable storage medium may be any tangible medium that can contain, or store a program for use by or in connection with an instruction execution system, apparatus, or device. In contrast, in the present disclosure, a computer-readable signal medium may include a propagated data signal with computer-readable program code embodied therein, for example, in baseband or as part of a carrier wave. Such a propagated data signal may take many forms, including, but not limited to, electro-magnetic, optical, or any suitable combination thereof. A computer readable signal medium may also be any computer readable medium that is not a computer readable storage medium and that can communicate, propagate, or transport a program for use by or in connection with an instruction execution system, apparatus, or device. Program code embodied on a computer readable medium may be transmitted using any appropriate medium, including but not limited to: wireless, wire, fiber optic cable, RF, etc., or any suitable combination of the foregoing.
Other embodiments of the invention will be apparent to those skilled in the art from consideration of the specification and practice of the invention disclosed herein. This disclosure is intended to cover any variations, uses, or adaptations of the invention following, in general, the principles of the invention and including such departures from the present disclosure as come within known or customary practice within the art to which the invention pertains. It is intended that the specification and examples be considered as exemplary only, with a true scope and spirit of the invention being indicated by the following claims.
It will be understood that the invention is not limited to the precise arrangements described above and shown in the drawings and that various modifications and changes may be made without departing from the scope thereof. The scope of the invention is limited only by the appended claims.
Claims (10)
1. An asynchronous message processing method, comprising:
acquiring an asynchronous message to be sent;
determining a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message;
and storing the asynchronous messages to the target partitions corresponding to the asynchronous messages, so that the consumers corresponding to the target partitions process the asynchronous messages stored in the target partitions.
2. The method of claim 1, wherein the determining the target partition in the asynchronous message queue corresponding to the asynchronous message according to the data carried in the set field of the asynchronous message comprises:
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the mapping relation between the data carried by the set field and the plurality of partitions in the asynchronous message queue.
3. The method of claim 2, wherein the determining, from the plurality of partitions, the target partition corresponding to the data of the set field in the asynchronous message according to the mapping relationship between the data carried by the set field and the plurality of partitions in the asynchronous message queue comprises:
performing hash operation on the data carried by the set field to obtain a hash value of the data carried by the set field;
and determining a target partition corresponding to the data of the set field in the asynchronous message from the plurality of partitions according to the hash value and the mapping relation.
4. The method of any one of claims 1-3, further comprising:
and storing the time for sending the asynchronous message to the target partition corresponding to the asynchronous message and the time for writing the asynchronous message into the target partition corresponding to the asynchronous message so that a consumer corresponding to the target partition processes each asynchronous message in the target partition according to the time for sending the asynchronous message to the target partition and the time for writing the asynchronous message into the target partition corresponding to each asynchronous message stored in the target partition.
5. An asynchronous message processing method, comprising:
acquiring asynchronous messages stored in a target partition of an asynchronous message queue, and sending time of the asynchronous messages to the target partition and writing time of the asynchronous messages into the target partition;
acquiring a target thread corresponding to a set field of the asynchronous message of the target partition in the affinity thread pool;
and processing the asynchronous message by using the target thread according to the time of sending the asynchronous message to the target partition and the time of writing the asynchronous message into the target partition.
6. An asynchronous message processing apparatus, comprising:
the acquisition module is configured to acquire an asynchronous message to be sent;
the determining module is configured to determine a target partition in an asynchronous message queue corresponding to the asynchronous message according to data carried by a set field in the asynchronous message;
a storage module configured to store the asynchronous message to the target partition corresponding to the asynchronous message, so that a consumer corresponding to the target partition processes each asynchronous message stored in the target partition.
7. An asynchronous message processing apparatus, comprising:
a first obtaining module configured to obtain an asynchronous message stored in a target partition of an asynchronous message queue, and a time when the asynchronous message is sent to the target partition and a time when the asynchronous message is written into the target partition;
a second obtaining module configured to execute obtaining of a target thread corresponding to a set field of an asynchronous message of the target partition in an affinity thread pool;
a processing module configured to execute processing of the asynchronous message with the target thread according to a time when the asynchronous message is sent to the target partition and a time when the asynchronous message is written to the target partition.
8. An electronic device, comprising:
a processor;
a memory for storing the processor-executable instructions;
wherein the processor is configured to execute the instructions to implement the asynchronous message processing method of any one of claims 1 to 4 or to implement the asynchronous message processing method of claim 5.
9. A computer-readable storage medium, wherein instructions in the storage medium, when executed by a processor in an electronic device, enable the electronic device to perform the asynchronous message processing method of any of claims 1 to 4 or the asynchronous message processing method of claim 5.
10. A computer program product comprising a computer program, characterized in that the computer program realizes the asynchronous message processing method as claimed in any one of claims 1 to 4, or the asynchronous message processing method as claimed in claim 5, when executed by a processor.
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202110859560.2A CN113626217B (en) | 2021-07-28 | 2021-07-28 | Asynchronous message processing method, device, electronic equipment and storage medium |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN202110859560.2A CN113626217B (en) | 2021-07-28 | 2021-07-28 | Asynchronous message processing method, device, electronic equipment and storage medium |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| CN113626217A true CN113626217A (en) | 2021-11-09 |
| CN113626217B CN113626217B (en) | 2024-08-13 |
Family
ID=78381397
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN202110859560.2A Active CN113626217B (en) | 2021-07-28 | 2021-07-28 | Asynchronous message processing method, device, electronic equipment and storage medium |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN113626217B (en) |
Cited By (3)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN114385380A (en) * | 2021-12-17 | 2022-04-22 | 北京达佳互联信息技术有限公司 | Message processing method, device, electronic device and storage medium |
| CN114579330A (en) * | 2022-02-17 | 2022-06-03 | 百果园技术(新加坡)有限公司 | Sequential message processing method, device, equipment and storage medium |
| CN115221824A (en) * | 2022-07-18 | 2022-10-21 | 北京极光星通科技有限公司 | Asynchronous reconstruction method and device and computer equipment |
Citations (7)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN106095589A (en) * | 2016-06-30 | 2016-11-09 | 浪潮软件集团有限公司 | Partition allocation method, device and system |
| CN109407975A (en) * | 2018-09-19 | 2019-03-01 | 华为技术有限公司 | Data writing method and calculate node and distributed memory system |
| CN110008041A (en) * | 2019-03-27 | 2019-07-12 | 北京奇艺世纪科技有限公司 | A kind of message treatment method and device |
| US20200348851A1 (en) * | 2018-01-16 | 2020-11-05 | Huawei Technologies Co., Ltd. | Data processing method and apparatus |
| CN112579595A (en) * | 2020-12-01 | 2021-03-30 | 北京三快在线科技有限公司 | Data processing method and device, electronic equipment and readable storage medium |
| CN112650759A (en) * | 2020-12-30 | 2021-04-13 | 中国平安人寿保险股份有限公司 | Data query method and device, computer equipment and storage medium |
| CN112988423A (en) * | 2021-03-19 | 2021-06-18 | 北京京东拓先科技有限公司 | Message consumption and message distribution method, device, server and storage medium |
-
2021
- 2021-07-28 CN CN202110859560.2A patent/CN113626217B/en active Active
Patent Citations (7)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN106095589A (en) * | 2016-06-30 | 2016-11-09 | 浪潮软件集团有限公司 | Partition allocation method, device and system |
| US20200348851A1 (en) * | 2018-01-16 | 2020-11-05 | Huawei Technologies Co., Ltd. | Data processing method and apparatus |
| CN109407975A (en) * | 2018-09-19 | 2019-03-01 | 华为技术有限公司 | Data writing method and calculate node and distributed memory system |
| CN110008041A (en) * | 2019-03-27 | 2019-07-12 | 北京奇艺世纪科技有限公司 | A kind of message treatment method and device |
| CN112579595A (en) * | 2020-12-01 | 2021-03-30 | 北京三快在线科技有限公司 | Data processing method and device, electronic equipment and readable storage medium |
| CN112650759A (en) * | 2020-12-30 | 2021-04-13 | 中国平安人寿保险股份有限公司 | Data query method and device, computer equipment and storage medium |
| CN112988423A (en) * | 2021-03-19 | 2021-06-18 | 北京京东拓先科技有限公司 | Message consumption and message distribution method, device, server and storage medium |
Cited By (3)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN114385380A (en) * | 2021-12-17 | 2022-04-22 | 北京达佳互联信息技术有限公司 | Message processing method, device, electronic device and storage medium |
| CN114579330A (en) * | 2022-02-17 | 2022-06-03 | 百果园技术(新加坡)有限公司 | Sequential message processing method, device, equipment and storage medium |
| CN115221824A (en) * | 2022-07-18 | 2022-10-21 | 北京极光星通科技有限公司 | Asynchronous reconstruction method and device and computer equipment |
Also Published As
| Publication number | Publication date |
|---|---|
| CN113626217B (en) | 2024-08-13 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| US11188380B2 (en) | Method and apparatus for processing task in smart device | |
| CN113626217B (en) | Asynchronous message processing method, device, electronic equipment and storage medium | |
| CN109388626B (en) | Method and apparatus for assigning numbers to services | |
| US20220138074A1 (en) | Method, electronic device and computer program product for processing data | |
| CN112948138B (en) | A method and device for processing messages | |
| CN111831748A (en) | Data synchronization method, device and storage medium | |
| CN112181477B (en) | Complex event processing method and device and terminal equipment | |
| CN108933695B (en) | Method and apparatus for processing information | |
| CN112199007A (en) | Menu display method and device, electronic equipment and storage medium | |
| CN113779422B (en) | Method and device for realizing relationship chain label, electronic equipment and storage medium | |
| CN112579695A (en) | Data synchronization method and device | |
| CN113312553A (en) | Method and device for determining user label | |
| CN116647532A (en) | Mail management method, device, server and medium based on knowledge graph | |
| CN110399393B (en) | Data processing method, device, medium and electronic equipment | |
| US20180343216A1 (en) | Context driven modification of attachments in a messaging session | |
| CN111159142A (en) | Data processing method and device | |
| CN113886485A (en) | Data processing method, apparatus, electronic device, system and storage medium | |
| CN114553947B (en) | Method and device for processing message | |
| CN115904369A (en) | Method and system for efficient aggregation and correlation analysis of network security source data | |
| CN113076186B (en) | Task processing method, device, electronic equipment and storage medium | |
| CN117950850A (en) | Data transmission method, device, electronic equipment and computer readable medium | |
| CN117032467B (en) | Method, device, electronic equipment and storage medium for interaction with chat robot | |
| CN113761052A (en) | Database synchronization method and device | |
| CN110322350B (en) | Method, device, equipment and storage medium for cutting hollow block in consensus network | |
| US20180137066A1 (en) | Aggregation handling |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant | ||
| GR01 | Patent grant |