CN109379443B - Method for realizing distributed message queue facing to Internet of things - Google Patents
Method for realizing distributed message queue facing to Internet of things Download PDFInfo
- Publication number
- CN109379443B CN109379443B CN201811548642.XA CN201811548642A CN109379443B CN 109379443 B CN109379443 B CN 109379443B CN 201811548642 A CN201811548642 A CN 201811548642A CN 109379443 B CN109379443 B CN 109379443B
- Authority
- CN
- China
- Prior art keywords
- message
- receiving end
- agent
- connection
- sent
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
- 238000000034 method Methods 0.000 title claims abstract description 33
- 230000006854 communication Effects 0.000 claims abstract description 20
- 238000004891 communication Methods 0.000 claims abstract description 19
- 230000000977 initiatory effect Effects 0.000 claims abstract description 4
- 230000002688 persistence Effects 0.000 claims description 4
- 238000012545 processing Methods 0.000 claims description 3
- 238000013461 design Methods 0.000 description 3
- 238000010586 diagram Methods 0.000 description 2
- 230000006870 function Effects 0.000 description 2
- 230000009286 beneficial effect Effects 0.000 description 1
- 230000005540 biological transmission Effects 0.000 description 1
- 230000007547 defect Effects 0.000 description 1
- 238000005516 engineering process Methods 0.000 description 1
- 230000002085 persistent effect Effects 0.000 description 1
- 238000012546 transfer Methods 0.000 description 1
Images
Classifications
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/01—Protocols
- H04L67/10—Protocols in which an application is distributed across nodes in the network
- H04L67/104—Peer-to-peer [P2P] networks
- H04L67/1044—Group management mechanisms
- H04L67/1046—Joining mechanisms
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/50—Network services
- H04L67/56—Provisioning of proxy services
Landscapes
- Engineering & Computer Science (AREA)
- Computer Networks & Wireless Communication (AREA)
- Signal Processing (AREA)
- Physics & Mathematics (AREA)
- Computing Systems (AREA)
- Mathematical Physics (AREA)
- Theoretical Computer Science (AREA)
- Computer And Data Communications (AREA)
Abstract
The invention discloses a realization method of a distributed message queue facing to the Internet of things, wherein a distributed message queue system comprises a message sending end, a message agent and a message receiving end, and the message sending end and the message receiving end, the message sending end/the message receiving end and the message agent are in communication connection in a long-connection mixed short-connection mode, and the realization method comprises the following steps: judging whether a message request needs to be sent to a message receiving end, if so, initiating P2P connection according to the address of the message receiving end; judging whether the P2P connection is successful, if not, establishing a TURN connection with a message receiving end through a message agent; the message is sent to the message receiving end through the message agent, on the basis of realizing the message queue function, the system costs less flow cost, simultaneously provides higher terminal equipment access capacity, and meets the requirement of large number of terminals of the Internet of things.
Description
Technical Field
The invention relates to the technical field of computers, in particular to a method for realizing a distributed message queue facing to the Internet of things.
Background
The distributed message queue is a common method for communication and decoupling among submodules in a large-scale system, a unified communication mode is provided in the system, the reliability of the system is improved, the flexibility of system design is improved, and higher peak processing capacity is provided for the system. The distributed nature adds scalability to the message queue, and also introduces design difficulties. A common design scheme of a distributed message queue is shown in fig. 1, and includes a plurality of producers (APP in the figure), message brokers (Broker, Broker Leader in the figure is a central node of the message Broker), and consumers (APP in the figure), where the message brokers provide core functions of forwarding and persisting messages for the producers and consumers, the producers issue messages to the message brokers, the consumers subscribe and consume messages to the brokers, and the producers/consumers and the message brokers are connected through TCP (Transmission Control Protocol).
Generally, in the existing system, the producer, the consumer and the message broker are in the same local area network, there is no limitation on bandwidth, and it is possible to forward all messages between the producer and the consumer through the message broker, but in the internet of things, a considerable part of the producer, the consumer and the message broker are in different local area networks, and if the existing scheme is adopted, a large amount of paid bandwidth is generated in the terminal device and the message broker undoubtedly.
Disclosure of Invention
Aiming at the defects of the prior art, the invention provides a method for realizing a distributed message queue facing to the Internet of things, which effectively solves the technical problem that a large amount of paid bandwidth is generated in the prior art.
In order to achieve the purpose, the invention is realized by the following technical scheme:
a distributed message queue implementation method facing to the Internet of things, wherein a distributed message queue system comprises a message sending end, a message agent and a message receiving end, the message sending end and the message receiving end, and the message sending end/the message receiving end and the message agent are in communication connection in a long-connection mixed short-connection mode, and the implementation method is applied to the message sending end and comprises the following steps:
judging whether a message request needs to be sent to a message receiving end, if so, initiating P2P connection according to the address of the message receiving end;
judging whether the P2P connection is successful, if not, establishing a TURN connection with a message receiving end through a message agent;
and sending the message to a message receiving end through the message agent.
Further preferably, in the step of determining whether the P2P connection is successful, if the P2P connection is successful, the message is directly sent to the message receiving end.
Further preferably, after sending the message to the message receiving end, the method further includes:
and judging whether the communication with the message agent/message receiving end is not carried out within a preset time period, if so, disconnecting the communication connection with the message agent/message receiving end.
Further preferably, the step of determining whether to send the message request to the message receiving end, if it is determined that the message request needs to be sent, further includes:
and carrying out persistence processing on the sent message request.
Further preferably, the determining whether the message request needs to be sent to the message receiving end further includes:
and regularly inquiring whether the subscribed theme is updated or not from the message agent, and if so, judging that a message request needs to be sent to a message receiving end.
Further preferably, before the step of periodically inquiring the message broker about whether the subscribed topic has a message update, the method includes:
and judging whether the message needs to be subscribed or not, if so, sending the subject needing to be subscribed to the message agent.
Further preferably, the implementation method further includes:
and judging whether a new message is to be issued or not, and if so, sending the issued message to a message agent.
The method for realizing the distributed message queue facing to the Internet of things, provided by the invention, can at least bring the following beneficial effects:
1. the message sender (corresponding to the producer) and the message receiver (corresponding to the consumer) preferentially use the P2P technology for communication through direct connection, and use the message proxy to relay the message when the P2P connection fails, so that the flow of the message proxy is greatly saved compared with the prior art in which all the message proxy to relay the message exists. In practical application, if the P2P connection is successfully established, approximately half of the traffic can be saved, that is, the implementation method provided by the invention can provide higher terminal device access capability while consuming less traffic cost on the basis of realizing the message queue function, thereby meeting the requirement of a large number of terminals in the internet of things;
2. compared with the prior art that the persistence of the sent message is realized through a message agent in different places, the sent message directly persists at a message sending end, so that the pressure of the message agent is greatly reduced while the message can be sent to a message receiving end;
3. the message sending end and the message agent and the message sending end and the message receiving end communicate in a mode of long connection mixed short connection, compared with the prior art that long connection is used completely, the burden of the message agent is greatly reduced, and meanwhile, more terminal equipment access capacity is provided.
Drawings
A more complete understanding of the present invention, and the attendant advantages and features thereof, will be more readily understood by reference to the following detailed description when considered in conjunction with the accompanying drawings wherein:
FIG. 1 is a diagram of a distributed message queue system in the prior art;
fig. 2 is a schematic flow chart of an embodiment of a method for implementing a distributed message queue for the internet of things in the present invention;
FIG. 3 is a diagram of a distributed message queue system according to the present invention;
fig. 4 is a schematic flow chart of another embodiment of the method for implementing the internet of things-oriented distributed message queue according to the present invention.
Detailed Description
In order to make the contents of the present invention more comprehensible, the present invention is further described below with reference to the accompanying drawings. The invention is of course not limited to this particular embodiment, and general alternatives known to those skilled in the art are also covered by the scope of the invention.
As shown in fig. 2, which is a schematic flow chart of an implementation method of a distributed message queue for internet of things provided by the present invention, it can be seen from the figure that the implementation method includes:
s10 judging whether it is needed to send message request to message receiving end, if yes, initiating P2P connection according to message receiving end address;
s20 judging whether P2P connection is successful, if not, establishing TURN connection with message receiving end through message agent;
s30 sends the message to the message receiver through the message broker.
In this embodiment, the distributed message queue system includes a message sender (corresponding to a producer in the existing distributed message queue system), a message Broker and a message receiver (corresponding to a consumer in the existing distributed message queue system), as shown in fig. 3, two HOSTs respectively correspond to the message sender and the message receiver, a Broker corresponds to the message Broker, and the Broker Leader is a central node of the message Broker, the message sender and the message receiver are connected by a long-Connection hybrid short-Connection P2P communication (illustrated by a Temporary P2P communication), and the message sender/message receiver and the message Broker are connected by a long-Connection hybrid short-Connection communication (illustrated by a Temporary Optimized communication). In practical applications, a Client (Client in the figure) is used to send and receive messages, and generally, the same Client can send and receive messages. The client and the specific application (APP in the figure) communicate with each other in a Shared Memory or TCP or HTTP (Hyper Text Transfer Protocol), and specifically, when the client and the specific application are applied to the same host, the client and the specific application communicate with each other in a Shared Memory manner to increase the speed of data exchange, and when the specific application needs to send a message, the client is used to implement the communication; when the client and the specific application are in different hosts, the communication is carried out by using a TCP or HTTP mode.
In the message agent cluster, selecting a unique central node through paxos algorithm, wherein the central node is responsible for data needing synchronization in the message agent cluster, and other message agents are equivalent and independent and have the same working content; and if the central node cannot work normally, re-election is carried out.
In the working process, when the message sending end judges that a message request needs to be sent to the message receiving end, the P2P (Peer-to-Peer) connection is tried to be initiated according to the address of the message receiving end; if the connection of the P2P is successful, the message sending end directly sends the message to the message receiving end; if the connection of P2P is unsuccessful, a TURN connection with the message receiving end is established through the message broker, and then the message request is sent to the message receiving end through the message broker. And after receiving the message request sent by the message sending end, the message receiving end sends the feedback message to the message sending end. As for the address of the message receiving end, the address is obtained when the message sending end communicates with the message receiving end through the message proxy for the first time (in the process of establishing the distributed message queue system, the client corresponding to each message sending end/message receiving end is registered in the message proxy respectively, so that the client corresponding to the message sending end/message receiving end obtains the address of the other side through the message proxy in the communication process).
In the process of communication between the message sending end and the message agent/message receiving end, the message sending end can also judge whether the message sending end does not communicate with the message agent/message receiving end within a preset time period, if so, the message sending end is not connected with the message agent/message receiving end for a long time, and then the communication connection with the message agent/message receiving end is disconnected. The preset time period is set according to actual conditions, for example, set to be 2min (minutes), 5min, 10min, and the like. Similarly, the message receiver will also perform the same operation in the process of communicating with the message broker/message receiver to reduce the burden of the message broker.
Compared with the prior art that the persistence is realized through the message agent in different places, the method ensures that the message can be sent to the message receiving end, and greatly reduces the pressure of the message agent.
The present embodiment is obtained by modifying the above embodiment, and as shown in fig. 4, the implementation method includes:
s11 inquires whether the subscribed theme has message update or not regularly from the message agent, if yes, it is determined that the message request needs to be sent to the message receiving end;
s12 initiates P2P connection according to the address of the message receiving end;
s20 judging whether P2P connection is successful, if not, establishing TURN connection with message receiving end through message agent;
s30 sends the message to the message receiver through the message broker.
In the embodiment, when a specific application in a client needs to subscribe to a message, the client operates to send a Topic (Topic) needing to be subscribed to a message broker; when a new message is to be published in a specific application in a client, the published message (including a subject needing to publish the message and the like) is sent to a message broker through the client, and the client serves as a message sending end at the moment.
In the working process, the client regularly inquires whether the subscribed theme has message update to the message broker, and if the message broker feeds back that the message update exists, the address of the client of the message source is obtained from the message broker, at this time, the client is used as a message sending end (hereinafter referred to as a first client), and the client of the message source is used as a message receiving end (hereinafter referred to as a second client).
Thereafter, the first client attempts to initiate a P2P connection based on the address of the second client; if the connection of the P2P is successful, the first client directly sends the message to the second client; if the P2P connection is unsuccessful, a TURN connection is established with the second client through the message broker, and the message request is sent to the second client through the message broker. And after receiving the message request sent by the first client, the second client sends the feedback message to the first client.
And if the first client is not connected with the message agent/second client for a long time, the communication connection with the message agent/second client is disconnected.
Claims (6)
1. A realization method of a distributed message queue facing to the Internet of things is characterized in that a distributed message queue system comprises a message sending end, a message agent and a message receiving end, wherein the message sending end and the message receiving end, and the message sending end/the message receiving end and the message agent are in communication connection in a long-connection mixed short-connection mode, and the realization method is applied to the message sending end and comprises the following steps:
judging whether a message request needs to be sent to a message receiving end, if so, initiating P2P connection according to the address of the message receiving end;
judging whether the P2P connection is successful, if not, establishing a TURN connection with a message receiving end through a message agent;
sending the message to a message receiving end through a message agent;
after the message is sent to the message receiving end, the method further comprises the following steps:
judging whether the communication with the message agent/message receiving end is not carried out within a preset time period or not, if so, disconnecting the communication connection with the message agent/message receiving end;
in the message agent cluster, selecting a unique central node through paxos algorithm, wherein the central node is responsible for data needing synchronization in the message agent cluster, and other message agents are equivalent and independent and have the same working content; and if the central node cannot work normally, re-election is carried out.
2. The method as claimed in claim 1, wherein in the step of determining whether the P2P connection is successful, if the P2P connection is successful, the message is directly sent to the message receiver.
3. The method as claimed in claim 1, wherein the step of determining whether to send the message request to the message receiving end, if it is determined that the message request needs to be sent, further comprises:
and carrying out persistence processing on the sent message request.
4. The method of claim 1, 2 or 3, wherein determining whether a message request needs to be sent to a message receiver further comprises:
and regularly inquiring whether the subscribed theme is updated or not from the message agent, and if so, judging that a message request needs to be sent to a message receiving end.
5. The method of claim 4, wherein before the step of periodically querying the message broker for the subscribed topic for message updates, comprising:
and judging whether the message needs to be subscribed or not, if so, sending the subject needing to be subscribed to the message agent.
6. The implementation method of claim 1, 2, 3 or 5, further comprising:
and judging whether a new message is to be issued or not, and if so, sending the issued message to a message agent.
Priority Applications (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN201811548642.XA CN109379443B (en) | 2018-12-18 | 2018-12-18 | Method for realizing distributed message queue facing to Internet of things |
Applications Claiming Priority (1)
| Application Number | Priority Date | Filing Date | Title |
|---|---|---|---|
| CN201811548642.XA CN109379443B (en) | 2018-12-18 | 2018-12-18 | Method for realizing distributed message queue facing to Internet of things |
Publications (2)
| Publication Number | Publication Date |
|---|---|
| CN109379443A CN109379443A (en) | 2019-02-22 |
| CN109379443B true CN109379443B (en) | 2022-02-15 |
Family
ID=65374219
Family Applications (1)
| Application Number | Title | Priority Date | Filing Date |
|---|---|---|---|
| CN201811548642.XA Active CN109379443B (en) | 2018-12-18 | 2018-12-18 | Method for realizing distributed message queue facing to Internet of things |
Country Status (1)
| Country | Link |
|---|---|
| CN (1) | CN109379443B (en) |
Families Citing this family (2)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN111740872A (en) * | 2020-05-06 | 2020-10-02 | 北京百度网讯科技有限公司 | Messaging test method, apparatus, electronic device and storage medium |
| CN116567077A (en) * | 2023-06-08 | 2023-08-08 | 曙光信息产业股份有限公司 | Bare metal instruction sending method, device, equipment and storage medium |
Citations (9)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN101221435A (en) * | 2006-09-28 | 2008-07-16 | 洛克威尔自动控制技术股份有限公司 | Message engine searching and classification |
| CN101431464A (en) * | 2007-11-06 | 2009-05-13 | 阿里巴巴集团控股有限公司 | Method and apparatus for optimizing data transmission route between clients |
| CN102883293A (en) * | 2012-10-12 | 2013-01-16 | 杭州东信北邮信息技术有限公司 | System and method for achieving intelligent management of mobile telephone incoming call |
| CN103702062A (en) * | 2013-12-27 | 2014-04-02 | Tcl集团股份有限公司 | Audio and video communication method, device and system |
| CN104363291A (en) * | 2014-11-19 | 2015-02-18 | 中国航天科工集团第二研究院七〇六所 | Network communication middleware implementation method |
| CN104486107A (en) * | 2014-12-05 | 2015-04-01 | 曙光信息产业(北京)有限公司 | Log collection device and method |
| CN105007313A (en) * | 2015-07-03 | 2015-10-28 | 许继集团有限公司 | Terminal access method and mass data transmission system for energy saving service |
| CN106331115A (en) * | 2016-08-26 | 2017-01-11 | 深圳市同为数码科技股份有限公司 | Distributed expandable server system in support of multi-device connection |
| CN107231290A (en) * | 2017-04-19 | 2017-10-03 | 中国建设银行股份有限公司 | A kind of instant communicating method and system |
Family Cites Families (1)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| US20050160424A1 (en) * | 2004-01-21 | 2005-07-21 | International Business Machines Corporation | Method and system for grid-enabled virtual machines with distributed management of applications |
-
2018
- 2018-12-18 CN CN201811548642.XA patent/CN109379443B/en active Active
Patent Citations (9)
| Publication number | Priority date | Publication date | Assignee | Title |
|---|---|---|---|---|
| CN101221435A (en) * | 2006-09-28 | 2008-07-16 | 洛克威尔自动控制技术股份有限公司 | Message engine searching and classification |
| CN101431464A (en) * | 2007-11-06 | 2009-05-13 | 阿里巴巴集团控股有限公司 | Method and apparatus for optimizing data transmission route between clients |
| CN102883293A (en) * | 2012-10-12 | 2013-01-16 | 杭州东信北邮信息技术有限公司 | System and method for achieving intelligent management of mobile telephone incoming call |
| CN103702062A (en) * | 2013-12-27 | 2014-04-02 | Tcl集团股份有限公司 | Audio and video communication method, device and system |
| CN104363291A (en) * | 2014-11-19 | 2015-02-18 | 中国航天科工集团第二研究院七〇六所 | Network communication middleware implementation method |
| CN104486107A (en) * | 2014-12-05 | 2015-04-01 | 曙光信息产业(北京)有限公司 | Log collection device and method |
| CN105007313A (en) * | 2015-07-03 | 2015-10-28 | 许继集团有限公司 | Terminal access method and mass data transmission system for energy saving service |
| CN106331115A (en) * | 2016-08-26 | 2017-01-11 | 深圳市同为数码科技股份有限公司 | Distributed expandable server system in support of multi-device connection |
| CN107231290A (en) * | 2017-04-19 | 2017-10-03 | 中国建设银行股份有限公司 | A kind of instant communicating method and system |
Also Published As
| Publication number | Publication date |
|---|---|
| CN109379443A (en) | 2019-02-22 |
Similar Documents
| Publication | Publication Date | Title |
|---|---|---|
| CN109327513B (en) | Interaction method, interaction device and computer-readable storage medium | |
| CN113596191A (en) | Data processing method, network element equipment and readable storage medium | |
| CN110460638A (en) | Message push method and device | |
| HK1208970A1 (en) | Processing method, device and system for message service and message service system | |
| EP2088791A1 (en) | Method, system and device for increasing multimedia messaging service system capacity | |
| CN108683653B (en) | Active message pushing system based on WebSocket | |
| KR101805458B1 (en) | Method for supporting websocket and web server and web application server using the same | |
| CN107528891B (en) | Websocket-based automatic clustering method and system | |
| CN111917562B (en) | Broadcast message forwarding method, device, equipment and storage medium | |
| CN110730116B (en) | Operation method of communication equipment in local area network, communication equipment and gateway equipment | |
| CN112217649B (en) | Terminal equipment management method, server and terminal equipment | |
| CN106453136A (en) | Method and device for establishing message queue | |
| CN113572835A (en) | Data processing method, network element equipment and readable storage medium | |
| CN111683158A (en) | An MQTT protocol communication method for realizing synchronous request response | |
| CN111741129A (en) | MQTT protocol communication method and system for realizing multiplexing | |
| CN111131188A (en) | Communication connection method, server, client, and storage medium | |
| CN109379443B (en) | Method for realizing distributed message queue facing to Internet of things | |
| CN116846959A (en) | Message receiving and transmitting method and device based on MQTT protocol | |
| CN110380967B (en) | SSE technology-based server message pushing method | |
| CN106027599B (en) | Data transmission channel establishing method, system and server | |
| EP2480009B1 (en) | Multimedia message transmission method and apparatus thereof, and domain name server | |
| WO2013159492A1 (en) | Method and system for reporting and downloading information | |
| CN113037803A (en) | Geological disaster monitoring method and system, electronic equipment and storage medium | |
| CN115412599B (en) | Message data forwarding method, device and server | |
| EP4238249B1 (en) | Lwm2m server device, lwm2m client device, and methods thereof |
Legal Events
| Date | Code | Title | Description |
|---|---|---|---|
| PB01 | Publication | ||
| PB01 | Publication | ||
| SE01 | Entry into force of request for substantive examination | ||
| SE01 | Entry into force of request for substantive examination | ||
| GR01 | Patent grant | ||
| GR01 | Patent grant |