+

TWI735519B - Distributed environment coordinated consumption queue method and device - Google Patents

Distributed environment coordinated consumption queue method and device Download PDF

Info

Publication number
TWI735519B
TWI735519B TW106102716A TW106102716A TWI735519B TW I735519 B TWI735519 B TW I735519B TW 106102716 A TW106102716 A TW 106102716A TW 106102716 A TW106102716 A TW 106102716A TW I735519 B TWI735519 B TW I735519B
Authority
TW
Taiwan
Prior art keywords
queue
lease
client
segment
time
Prior art date
Application number
TW106102716A
Other languages
Chinese (zh)
Other versions
TW201828199A (en
Inventor
孫廷韜
Original Assignee
香港商阿里巴巴集團服務有限公司
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by 香港商阿里巴巴集團服務有限公司 filed Critical 香港商阿里巴巴集團服務有限公司
Priority to TW106102716A priority Critical patent/TWI735519B/en
Publication of TW201828199A publication Critical patent/TW201828199A/en
Application granted granted Critical
Publication of TWI735519B publication Critical patent/TWI735519B/en

Links

Images

Landscapes

  • Computer And Data Communications (AREA)

Abstract

本發明實施例提供了一種分散式環境協調消費(consume)佇列方法和裝置,涉及分散式技術領域。該方法包括:針對一待消費的佇列分片,基於租賃協定獲取的該佇列分片的狀態資料;根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄。本發明可以在負載均衡、某個用戶端當掉等情況下,在當前用戶端A搶佔其他客戶端正在消費的佇列分片時,佇列分片的消費進度可以無縫傳遞到當前用戶端中,避免部分資料的重複消費,使消費結果更精確。 The embodiment of the present invention provides a method and device for a decentralized environment coordinated consumption (consume) queue, which relates to the field of decentralized technologies. The method includes: for a queue segment to be consumed, the status data of the queue segment obtained based on the lease agreement; judging whether there are other clients consuming the queue segment according to the status data; if it is determined that there is no When other clients are consuming the queue segment, they update the status data of the queue segment and obtain the current consumption progress of the queue segment; continue to consume the queue segment according to the current consumption progress, and add the Queue and record the new consumption progress of shards. In the present invention, when the current client A preempts the queue segment that is being consumed by other clients under load balancing, a certain client crash, etc., the consumption progress of the queue segment can be seamlessly transferred to the current client Avoid repeated consumption of part of the data, and make the consumption result more accurate.

Description

分散式環境協調消費隊列方法和裝置 Distributed environment coordinated consumption queue method and device

本發明涉及分散式技術領域,特別是涉及一種分散式環境協調消費(consume)佇列方法和一種分散式環境協調消費佇列裝置。 The present invention relates to the field of decentralized technology, in particular to a decentralized environment-coordinated consumption (consume) queue method and a decentralized environment-coordinated consumption queue device.

隨著雲計算、大資料時代到來,資料產生的來源越來越廣,速度越來越快,數量也越來越大。如web伺服器上、各種用戶端、感測器等,即時產生了大量的資料,記錄了各種使用者訪問請求、監控資料、程式運行狀態等資訊。為了更好挖掘資料的價值,往往有多個系統消費這些資料,如各類即時、離線系統對資料進行的使用者行為分析、監控報警、程式入侵偵測等。 With the advent of the era of cloud computing and big data, the sources of data are becoming wider and wider, faster and faster, and the quantity is increasing. For example, on the web server, various clients, sensors, etc., a large amount of data is generated in real time, recording various user access requests, monitoring data, program running status and other information. In order to better tap the value of data, there are often multiple systems that consume the data, such as user behavior analysis, monitoring alarms, and program intrusion detection performed by various real-time and offline systems on the data.

為了降低資料生產和消費的耦合度,通常使用佇列系統快取、彙集多個生成者生產的資料,再由多個作為計算節點的用戶端從佇列中消費資料。一般佇列系統為了能支援大量資料,都會使用多個佇列分片(如shard或partition)來支援大量資料的水準擴展。為了增加寫入的輸送量,只需要增加shard(或partition)的個數即可。 In order to reduce the degree of coupling between production and consumption of data, a queue system is usually used to cache and aggregate data produced by multiple producers, and then multiple clients as computing nodes consume data from the queue. In order to support a large amount of data, a general queue system uses multiple queue shards (such as shards or partitions) to support the level expansion of a large amount of data. In order to increase the throughput of writing, you only need to increase the number of shards (or partitions).

當作為計算節點的用戶端需要從多個shard(或partition)中消費資料的時候,就需要在多個用戶端上運行相同的程式來協同消費這些資料。而用戶端在消費資料的時候,同樣也需要指定從哪個shard(或partition)中抓取資料,完成消費,通常對於一個由N個shard(或partition)組成的資料佇列,會由M(M的取值範圍為1~N)個消費用戶端來協同消費這些shard(或partition),平均每台用戶端消費N/M個shard(或partition)。為了正確消費所有shard(或partition)中的資料,這M個用戶端之間需要協同合作。 When a client as a computing node needs to consume data from multiple shards (or partitions), it needs to run the same program on multiple clients to collaboratively consume these data. When the client consumes data, it also needs to specify from which shard (or partition) to grab the data to complete the consumption. Usually, for a data queue consisting of N shards (or partitions), M(M The value range of is 1~N) consumer clients to collaboratively consume these shards (or partitions), and each client consumes N/M shards (or partitions) on average. In order to correctly consume the data in all shards (or partitions), these M clients need to cooperate.

在先技術中,對於分散式環境下的各個用戶端,存在以下方案協同各個用戶端對各個佇列分片進行消費:如採用Kinesis Client Library。 In the prior art, for each client in a decentralized environment, there is the following solution to cooperate with each client to consume each queue fragment: for example, using Kinesis Client Library.

Kinesis是AWS提供的資料即時佇列服務,Kinesis Client Library是用於消費Kinesis數據的協同lib。Kinesis Client Library依賴於DynamoDB來完成用戶端之間的協同。雖然Kinesis Client Library支援新增用戶端消費資料,但是進行負載均衡等操作時,在某個用戶端對搶佔佇列分片,直接重新進行消費,因此會導致部分資料的重複消費,而導致消費結果不精確。 Kinesis is a real-time data queuing service provided by AWS, and Kinesis Client Library is a collaborative lib for consuming Kinesis data. Kinesis Client Library relies on DynamoDB to complete the collaboration between the clients. Although Kinesis Client Library supports new client consumption data, when performing operations such as load balancing, the preemption queue fragments are directly re-consumed on a certain client side, which will lead to repeated consumption of some data and lead to consumption results. Not precise.

鑒於上述問題,提出了本發明實施例以便提供一種克服上述問題或者至少部分地解決上述問題的一種分散式環 境協調消費佇列方法和相應的一種分散式環境協調消費佇列裝置。 In view of the above problems, the embodiments of the present invention are proposed to provide a decentralized environment coordinated consumption queue method and a corresponding distributed environment coordinated consumption queue device that overcome the above problems or at least partially solve the above problems.

為了解決上述問題,本發明公開了一種分散式環境協調消費佇列的方法,包括:針對一待消費的佇列分片,基於租賃協定獲取的該佇列分片的狀態資料;根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄。 In order to solve the above problems, the present invention discloses a method for coordinating consumption queues in a distributed environment, including: for a queue segment to be consumed, the status data of the queue segment is obtained based on a lease agreement; and according to the status data To determine whether there are other clients consuming the queue segment; if it is determined that no other clients are consuming the queue segment, update the status data of the queue segment and obtain the current consumption of the queue segment Progress; continue to consume the queue segment according to the current consumption progress, and record the new consumption progress of the queue segment.

本發明還公開了一種分散式環境協調消費佇列的裝置,包括:狀態資料獲取模組,用於針對一待消費的佇列分片,基於租賃協定獲取的該佇列分片的狀態資料;消費判斷模組,用於根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;進度獲取模組,用於如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;消費模組,用於根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄。 The present invention also discloses a device for coordinating consumption queues in a distributed environment, including: a status data acquisition module for acquiring status data of a queue segment to be consumed based on a lease agreement; The consumption judgment module is used to judge whether other clients are consuming the queue segment based on the status data; the progress acquisition module is used to update the queue segment if it is determined that no other client is consuming the queue segment The status data of the queue segment and obtain the current consumption progress of the queue segment; the consumption module is used to continue to consume the queue segment according to the current consumption progress, and the new consumption progress of the queue segment Make a record.

本發明實施例包括以下優點:本發明實施例在分散式環境下,對於一用戶端,其在搶佔到佇列分片後,該佇列分片對於該用戶端即是待消費的佇列分片,那麼該用戶端需要首先基於租賃協定獲取的該佇列分片的狀態資料,然後根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片,在確定沒有其他用戶端在消費該佇列分片後,該用戶端才會獲取該佇列分片當前的消費進度,並且更新該佇列分片的狀態資料,然後該用戶端再以當前的消費進度繼續消費該佇列分片並記錄新的消費進度。本發明實施例通過上述過程,在當前用戶端A搶佔其他客戶端正在消費的佇列分片時,該佇列分片的消費進度可以無縫傳遞到當前用戶端中,使在進行佇列分片負載均衡時,或者某個正在消費佇列分片的用戶端當掉,某個用戶端的佇列分片被當前用戶端搶佔後,當前用戶端A搶佔到上述佇列分片後,可以按照該佇列分片的已消費的消費進度,繼續消費該佇列分片,避免部分資料的重複消費,使消費結果更精確。 The embodiments of the present invention include the following advantages: in a distributed environment, for a client, after it preempts the queue segment, the queue segment is the queue segment to be consumed for the client. The client needs to first obtain the status data of the queue segment based on the lease agreement, and then determine whether there are other clients consuming the queue segment based on the status data, and after confirming that no other client is consuming the queue segment After the queue is fragmented, the client will obtain the current consumption progress of the queue fragment and update the status data of the queue fragment, and then the client will continue to consume the queue fragment with the current consumption progress Film and record new consumption progress. Through the above-mentioned process, in the embodiment of the present invention, when the current client A preempts the queue segment that is being consumed by other clients, the consumption progress of the queue segment can be seamlessly transferred to the current client, so that the queue segment is in progress. When the slice load is balanced, or a client that is consuming the queue fragment crashes, after the queue fragment of a certain client is preempted by the current client, after the current client A preempts the above queue fragment, you can follow The consumption progress of the queue segment has been consumed, continue to consume the queue segment, avoid repeated consumption of part of the data, and make the consumption result more accurate.

110‧‧‧步驟 110‧‧‧Step

120‧‧‧步驟 120‧‧‧Step

130‧‧‧步驟 130‧‧‧Step

140‧‧‧步驟 140‧‧‧Step

310‧‧‧狀態資料獲取模組 310‧‧‧Status data acquisition module

320‧‧‧消費判斷模組 320‧‧‧Consumption Judgment Module

330‧‧‧進度獲取模組 330‧‧‧Progress Obtaining Module

340‧‧‧消費模組 340‧‧‧Consumer Module

圖1是本發明的一種分散式環境協調消費佇列方法實施例的步驟流程圖;圖2是本發明的一種分散式環境協調消費佇列裝置實施例的結構框圖;圖2A是本發明一種分散式環境的架構示例。 Fig. 1 is a step flow diagram of an embodiment of a method for coordinated consumption queuing in a distributed environment according to the present invention; Fig. 2 is a structural block diagram of an embodiment of a device for coordinated consumption queuing in a distributed environment according to the present invention; Fig. 2A is an embodiment of the present invention An example of the architecture of a decentralized environment.

為使本發明的上述目的、特徵和優點能夠更加明顯易懂,下面結合圖式和具體實施方式對本發明作進一步詳細的說明。 In order to make the above-mentioned objects, features and advantages of the present invention more obvious and easy to understand, the present invention will be further described in detail below in conjunction with the drawings and specific embodiments.

本發明實施例的核心構思之一在於,對於分散式環境下,當生成者寫入資料的時候,可以通過雜湊或者輪詢調度的方式將資料寫入不同的佇列分片(shard或partition)來完成負載均衡。而作為分散式環境下的計算節點的用戶端,其在消費資料的時候,同樣也需要指定從哪個佇列分片中抓取資料,完成消費,通常對於一個由N個佇列分片組成的資料佇列,會由M(M的取值範圍為1~N)個消費用戶端來協同消費這些佇列分片,平均每個用戶端消費N/M個佇列分片。本發明實施例中,為了方便描述,佇列分片以shard表示。 One of the core concepts of the embodiments of the present invention is that for a distributed environment, when a generator writes data, the data can be written into different queue shards (shards or partitions) through hashing or round-robin scheduling. To complete load balancing. As the client of a computing node in a distributed environment, when consuming data, it also needs to specify which queue shard to fetch data from to complete the consumption, usually for a queue composed of N queue shards In the data queue, M (the value range of M is 1~N) consumer clients will coordinately consume these queue fragments, and each client consumes N/M queue fragments on average. In the embodiment of the present invention, for the convenience of description, the queue segment is represented by a shard.

為了正確消費所有shard中的資料,這M個用戶端之間需要協同合作,其需要考慮如下幾個方面:第一,各用戶端如何正確選擇消費的shard,以確保任意時刻任意shard有且只有一個消費用戶端;第二,同時如何處理某個或者某些用戶端宕機的情況下,進行再負載均衡時,所有的shard上的資料仍舊能夠正確處理,不被重複消費;第三,當處理壓力增大時,需要新增處理的用戶端,如何做到自動負載均衡,並能保證任意資料不被重複消費。 In order to correctly consume the data in all shards, the M clients need to cooperate and cooperate, and it needs to consider the following aspects: First, how to correctly choose the shard to be consumed by each client to ensure that any shard at any time has and only A consumer client; second, how to deal with the situation that one or some clients are down at the same time, and when the load balance is performed, all the data on the shard can still be processed correctly and will not be repeatedly consumed; third, when When processing pressure increases, users that need to be added for processing, how to achieve automatic load balancing, and to ensure that any data is not repeatedly consumed.

基於保證資料不被重複消費的考慮,本發明實施例在 分散式環境下,對於一用戶端A,其在搶佔到佇列分片後,該佇列分片對於該用戶端A即是待消費的佇列分片,那麼該用戶端A需要首先基於租賃協定獲取的該佇列分片的狀態資料,然後根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片,在確定沒有其他用戶端在消費該佇列分片後,該用戶端A才會獲取該佇列分片當前的消費進度,並且更新該佇列分片的狀態資料,然後該用戶端A再以當前的消費進度繼續消費該佇列分片並記錄新的消費進度。如此迴圈,當另一用戶端B搶佔用戶端的該佇列分片時,則按照相同的邏輯執行。本發明實施例通過上述過程,在當前用戶端A搶佔其他客戶端正在消費的佇列分片時,該佇列分片的消費進度可以無縫傳遞到當前用戶端中,使在進行佇列分片負載均衡時,或者某個正在消費佇列分片的用戶端當掉,某個用戶端的佇列分片被當前用戶端搶佔後,當前用戶端A搶佔到上述佇列分片後,可以按照該佇列分片的已消費的消費進度,繼續消費該佇列分片,避免部分資料的重複消費,使消費結果更精確。 Based on the consideration of ensuring that data is not repeatedly consumed, in the embodiment of the present invention, in a distributed environment, for a client A, after it preempts the queue segment, the queue segment is to be consumed for the client A Queue segment, then the client A needs to first obtain the status data of the queue segment based on the lease agreement, and then determine whether other clients are consuming the queue segment based on the status data. After other clients consume the queue segment, the client A will obtain the current consumption progress of the queue segment and update the status data of the queue segment, and then the client A will use the current consumption The progress continues to consume the queue segment and records the new consumption progress. In this loop, when another client B preempts the queue segment of the client, it is executed according to the same logic. Through the above-mentioned process, in the embodiment of the present invention, when the current client A preempts the queue segment that is being consumed by other clients, the consumption progress of the queue segment can be seamlessly transferred to the current client, so that the queue segment is in progress. When the slice load is balanced, or a client that is consuming the queue fragment crashes, after the queue fragment of a certain client is preempted by the current client, after the current client A preempts the above queue fragment, you can follow The consumption progress of the queue segment has been consumed, continue to consume the queue segment, avoid repeated consumption of part of the data, and make the consumption result more accurate.

為了更清楚的描述本發明實施例,本發明定義了以下幾種術語: In order to describe the embodiments of the present invention more clearly, the present invention defines the following terms:

Shard:佇列分片,其為資料佇列中實際保存資料的容器,一個資料佇列由多個shard構成。每個Worker消費資料時需要選擇具體的shard。當然,也可用partition表示,本發明不對其加以限制。 Shard: Queue fragment, which is the container that actually stores data in the data queue. A data queue is composed of multiple shards. Each worker needs to select a specific shard when consuming data. Of course, it can also be represented by partition, which is not limited by the present invention.

Worker:可以理解為一個用戶端或者說一個工作進 程,對應一個計算節點,一個worker可消費一個或多個shard中的資料。 Worker: It can be understood as a client or a work process, corresponding to a computing node, and a worker can consume data in one or more shards.

Worker_name:每個worker的名字,用於區別對應不同用戶端。 Worker_name: The name of each worker, used to distinguish between different clients.

Lease:每個shard都有一個鎖(lease),只有搶佔到該lease的worker才能消費該shard中的資料。 Lease: Each shard has a lock (lease), and only workers who seize the lease can consume the data in the shard.

Check Point:消費進度,其記錄一個shard當前已經被消費的位置資訊,表示哪些資料已經被消費完。 Check Point: Consumption progress, which records the current location information of a shard that has been consumed, indicating which data has been consumed.

實施例一 Example one

參照圖1,示出了本發明的一種分散式環境協調消費佇列方法實施例的步驟流程圖,具體可以包括如下步驟:步驟110,針對一待消費的佇列分片,基於租賃協定獲取的該佇列分片的狀態資料;本發明實施例提供了一套分散式環境下多消費者協同消費佇列中各shard的租賃協議,實現多計算節點協同消費多個shard中的資料的過程。 1, there is shown a step flow chart of an embodiment of a method for coordinating consumption queues in a distributed environment of the present invention, which may specifically include the following steps: Step 110, for a queue segment to be consumed, obtained based on a lease agreement The status data of the queue fragments; the embodiment of the present invention provides a set of lease agreements for each shard in a multi-consumer collaborative consumption queue in a distributed environment to realize the process of multi-computing nodes collaboratively consuming data in multiple shards.

根據該租賃協定,某個用戶端消費shard時,會將其消費進度記錄到一個持久化的儲存空間中,並且會更新對該shard的狀態資料。該shard的狀態資料用於判斷該shard是否有客戶端正在消費。 According to the lease agreement, when a client consumes a shard, its consumption progress will be recorded in a persistent storage space, and the status data of the shard will be updated. The status data of the shard is used to determine whether the shard is being consumed by clients.

那麼對當前用戶端worker A來說,如果其搶佔了一個shard,該shard對於該用戶端來說,就是一個待消費的shard,則當前用戶端worker A需要基於租賃協定獲取 的該佇列分片的狀態資料,進入步驟120。 So for the current client worker A, if it preempts a shard, the shard is a shard to be consumed for the client, then the current client worker A needs to obtain the queue shards based on the lease agreement For the status information, go to step 120.

在本發明一優選的實施例中,步驟110之前還包括: In a preferred embodiment of the present invention, before step 110, the method further includes:

步驟100,基於租賃協議確定當前用戶端需求的待消費的佇列分片。 Step 100: Determine the queue fragments to be consumed that are currently required by the client based on the lease agreement.

在實際應用中,當前用戶端首先需要根據租賃協定,去確定其能夠消費幾個shard,然後從何處搶佔上述shard,作為當前用戶端worker A的待消費的shard。 In practical applications, the current client first needs to determine how many shards it can consume according to the lease agreement, and then where to seize the above shards as the shards to be consumed by the current client worker A.

在本發明一優選的實施例中,該步驟100包括子步驟M1:子步驟M1,基於租賃協議每隔第一時間週期,確定當前用戶端需求的待消費的佇列分片。 In a preferred embodiment of the present invention, the step 100 includes sub-step M1: sub-step M1, based on the lease agreement, determines the queue fragments to be consumed that are currently required by the user at a first time period.

在本發明實施例中,租賃協議規定了當前用戶端worker A每隔第一時間週期,去定其能夠消費幾個shard,然後從何處搶佔上述shard,作為當前用戶端的待消費的shard。如此,可以即時的對分散式環境下的各shard進行負載均衡。 In the embodiment of the present invention, the lease agreement stipulates that the current user-side worker A determines how many shards it can consume every first time period, and then where to seize the above-mentioned shards as the current user-side shard to be consumed. In this way, load balancing can be performed on the shards in a distributed environment in real time.

在本發明一優選的實施例中,該步驟100包括子步驟101-102: 子步驟101,獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q,以計算當前用戶端需要搶佔的佇列分片數量N。 In a preferred embodiment of the present invention, this step 100 includes sub-steps 101-102: Sub-step 101, obtaining the total number of active clients U, the total number of queued fragments P, and the total number of queued fragments consumed by the current client Q, to calculate the number N of queue fragments that the current client needs to preempt.

在實際應用中,活躍的用戶端表示該用戶端是正常的,可以分擔處理shard。為了計算當前用戶端A需要搶佔的shard數量N,該數量N需要保證分散式環境的負載 均衡,那麼需要獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q,然後計算該數量N。 In practical applications, an active client means that the client is normal and can share the processing of shards. In order to calculate the number N of shards that the current client A needs to preempt, the number N needs to ensure the load balance of the distributed environment, then it is necessary to obtain the total number of active clients U, the total number of queued shards P, and the queues that the current client has consumed The total number of shards Q, and then the number N is calculated.

在本發明另一優選的實施例中,子步驟101包括:子步驟A11-A14: 子步驟A11,從持久化儲存空間中儲存的用戶端實例表和佇列租期表中獲取活躍的用戶端總數U。 In another preferred embodiment of the present invention, the sub-step 101 includes: sub-steps A11-A14: sub-step A11, obtain active client from the client instance table and queue lease table stored in the persistent storage space The total number U.

在本發明實施例中,可以預先在持久化儲存空間中構建用戶端實例表和佇列租期表。該持久化儲存空間可以為資料庫,或者如分散式快取,當然也可以為其他類型的持久化儲存空間,本發明實施例不對其加以限制。在本發明實施例中優選的所示持久化儲存空間為資料庫,當然該資料庫可以為mysql資料庫,也可以為其他類型的資料庫,本發明實施例不對其加以限制。 In the embodiment of the present invention, the client instance table and the queue lease period table can be constructed in the persistent storage space in advance. The persistent storage space may be a database, or, for example, a distributed cache, of course, it may also be other types of persistent storage space, which is not limited in the embodiment of the present invention. In the embodiment of the present invention, the preferred persistent storage space is a database. Of course, the database can be a mysql database or other types of database, which is not limited in the embodiment of the present invention.

其中,佇列租期表client_shard_lease,可以採用表一的形式:

Figure 106102716-A0202-12-0009-1
Among them, the queue lease period table client_shard_lease can take the form of Table 1:
Figure 106102716-A0202-12-0009-1

表一中: consume_group:消費組欄位,資料類型為Char(64),是表的主鍵之一,表示對於某一個佇列的消費組名。在實際應用中,shard是以消費組劃分的,用戶端也是以消費組創建的,因此,本發明可以採用consume_group區分不同的消費組。 In Table 1: consume_group: consumer group field, the data type is Char(64), which is one of the primary keys of the table, and represents the consumer group name for a certain queue. In practical applications, shards are divided by consumption groups, and user terminals are also created by consumption groups. Therefore, the present invention can use consume_group to distinguish different consumption groups.

shard_id:佇列標識欄位,資料類型為Char(64),是表的主鍵之一,佇列中各個shard的標識;lease_id:租用標識欄位,資料類型為int(20),worker租用shard使用的id。在本發明實施例中,使用原子的test and set操作,保證任意時刻,只有一個owner能修改lease的值,也就是能搶到該shard。其中,“test”和“set”操作是在一個不可以分割的原子操作中完成,使同一時刻只有1個用戶端可以搶佔該shard,以保障資料操作的正確性。 shard_id: queue identification field, the data type is Char(64), which is one of the primary keys of the table, and the identification of each shard in the queue; lease_id: lease identification field, the data type is int(20), the worker rents the shard for use Id. In the embodiment of the present invention, the atomic test and set operation is used to ensure that at any time, only one owner can modify the value of the lease, that is, the shard can be grabbed. Among them, the "test" and "set" operations are completed in an inseparable atomic operation, so that only one client can preempt the shard at the same time to ensure the correctness of data operations.

lease_owner:租用者欄位,資料類型為Char(64),搶佔該shard的lease的所有者,即某個worker_name;consumer_owner:消費者欄位,資料類型Char(64),當前正在消費該shard的所有者,即某個worker_name;check_point:進度欄位,資料類型為Text,記錄該shard當前已消費的消費進度。 lease_owner: The renter field, the data type is Char(64), the owner of the lease that preempts the shard, that is, a worker_name; consumer_owner: the consumer field, the data type is Char(64), which is currently consuming all of the shard , That is, a certain worker_name; check_point: progress field, the data type is Text, which records the current consumption progress of the shard.

update_time:更新時間欄位,資料類型為DateTime,用於記錄更新時間,供監控使用。 update_time: Update time field, the data type is DateTime, used to record the update time for monitoring.

當然,在實際應用中,本發明實施例的client_shard_lease表可以包括shard_id、lease_owner、consumer_owner、check_point,以在搶佔的用戶端和被搶的占用戶端之間,傳遞被搶的占佇列分片被被搶用戶端所消費的消費進度。其他幾個欄位可以為優選的欄位。 Of course, in practical applications, the client_shard_lease table of the embodiment of the present invention may include shard_id, lease_owner, consumer_owner, and check_point to transfer the preempted queue fragments between the preempted client and the preempted client. The consumption progress of the robbed client. Several other fields can be preferred fields.

在本發明實施例中,各個用戶端消費某個shard時,會更新表一中該shard的狀態資料。 In the embodiment of the present invention, when each client consumes a shard, the status data of the shard in Table 1 is updated.

其中,用戶端實例表client_worker_instance,可以採用表二的形式:

Figure 106102716-A0202-12-0011-3
Among them, the user-side instance table client_worker_instance can take the form of Table 2:
Figure 106102716-A0202-12-0011-3

consume_group:為消費者欄位,資料類型為Char(128),是表的主鍵之一,表示對於某一個佇列的消費組名;worker_name:為用戶端名,資料類型為Char(64),是表的主鍵之一。 consume_group: is the consumer field, the data type is Char(128), which is one of the primary keys of the table, and represents the consumer group name for a certain queue; worker_name: is the client name, and the data type is Char(64), yes One of the primary keys of the table.

create_time:資料類型為DateTime,是用戶端啟動的時間。 create_time: The data type is DateTime, which is the time when the client is started.

表二可以記錄對一個消費組創建的各個用戶端。 Table 2 can record each client created for a consumer group.

在實際應用中,針對一個消費組的用戶端啟動後,該用戶端的worker_name會被寫入到該用戶端實例表中,並記錄其啟動的時間。 In practical applications, after the client for a consumer group is started, the worker_name of the client will be written into the instance table of the client, and the start time will be recorded.

具體的,在本發明實施例中,用戶端中包括了client lib和資料處理邏輯。lient lib執行本發明的搶佔和續租等邏輯,資料處理邏輯為正常對shard中資料進行消費的邏輯。其中,上述消費的邏輯,比如使用者行為分析、監控報警、程式入侵偵測等具體的分析處理操作。其中,client lib可以理解為用戶端的一個執行緒,client lib執行過程大致為: Specifically, in the embodiment of the present invention, the client lib and data processing logic are included in the client terminal. The lient lib executes the logic of preemption and lease renewal of the present invention, and the data processing logic is the logic for normally consuming data in the shard. Among them, the above consumption logic, such as user behavior analysis, monitoring alarm, program intrusion detection and other specific analysis and processing operations. Among them, client lib can be understood as a thread on the user side, and the execution process of client lib is roughly as follows:

1、worker啟動時,client lib將worker的名稱worker_name和當前系統時間寫入到表二所示的 client_worker_instance表中。 1. When the worker starts, the client lib writes the worker name worker_name and the current system time into the client_worker_instance table shown in Table 2.

2、client lib獲取佇列中所有shard的名稱,即shard_id。 2. The client lib obtains the names of all shards in the queue, that is, shard_id.

3、Client lib從client_shard_lease中獲取所有shard_id的資訊,包括每個shard對應的lease_id和lease_owner; 3. Client lib obtains all shard_id information from client_shard_lease, including the lease_id and lease_owner corresponding to each shard;

4、Client lib判斷2中獲取的shard_id是否在client_shard_lease表出現;如果沒出現,則在client_shard_lease表中添加該shard_id的記錄,設置lease_id為0,lease_owner、consumer_owner、check_point、update_time設置為空。 4. Client lib judges whether the shard_id obtained in 2 appears in the client_shard_lease table; if it does not appear, add the record of the shard_id in the client_shard_lease table, set the lease_id to 0, and set the lease_owner, consumer_owner, check_point, and update_time to empty.

當然在實際應用中,用戶端會創建搶佔執行緒和租用執行緒。搶佔執行緒用於搶佔shard,租用執行緒用於租用shard進行消費。 Of course, in actual applications, the client will create preemptive threads and leased threads. The preemption thread is used to preempt the shard, and the leased thread is used to rent the shard for consumption.

由於實際應用中,那麼本發明實施例可以從表一和表二中獲取活躍的用戶端,從而計算活躍的用戶端總數U。 Due to practical applications, the embodiment of the present invention can obtain active client terminals from Table 1 and Table 2, so as to calculate the total number U of active client terminals.

進一步的,該子步驟A11包括:子步驟A111-A113: Further, this sub-step A11 includes: sub-steps A111-A113:

子步驟A111,獲取用戶端實例表中記錄的最近一個第一時間週期內啟動的用戶端的數量live1;在實際應用中,如果一個用戶端啟動後,執行時間短,則可能該用戶端並未搶佔任何一個shard,那麼在表一中的租用者欄位就沒有該用戶端的記錄。 Sub-step A111, obtain the number live1 of the client started in the last first time period recorded in the client instance table; in actual applications, if the execution time of a client is short after it is started, it may not be preempted by the client For any shard, there is no record of the client in the tenant field in Table 1.

那麼,如前述的client_worker_instance表,由於每個用戶端啟動後,基於其處理的佇列的消費組名,都會將 其worker_name和啟動時間寫入該client_worker_instance表中。 Then, as in the aforementioned client_worker_instance table, since each client is started, its worker_name and start time will be written into the client_worker_instance table based on the consumer group name of the queue it processes.

本發明實施例可採用用戶端的系統時間減去啟動時間得到的t,判斷該t是否小於第一時間週期T,如果小於,則該用戶端是活躍的,則統計得到該類活躍的用戶端的數量得到live1。 In the embodiment of the present invention, t obtained by subtracting the startup time from the system time of the user terminal can be used to determine whether the t is less than the first time period T. If it is less than, the user terminal is active, and the number of active users of this type is obtained by statistics. Get live1.

子步驟A112,獲取佇列租期表中,沒有超時佇列分片對應的用戶端的數量live2;該沒有超時的用戶端為佇列分片的租用者欄位下記錄的用戶端;本發明實施例還從client_shard_lease表中,獲取所有的shard的記錄,保存所有lease沒有超時的shard記錄,然後從中提取lease_owner下的worker_name,其對應的用戶端即為活躍的,則統計得到該類活躍的用戶端的數量得到live2。 Sub-step A112, obtain the number of clients live2 corresponding to the queue segment without timeout in the queue lease table; the client without timeout is the client recorded under the tenant field of the queue segment; this The embodiment of the invention also obtains the records of all shards from the client_shard_lease table, saves records of all shards whose leases have not timed out, and then extracts the worker_name under lease_owner from it. The corresponding client is active, and the statistics show that this type of activity is active The number of clients gets live2.

子步驟A113,將live1加上live2得到總的活躍的用戶端數量U。 In sub-step A113, add live1 to live2 to obtain the total number of active clients U.

U=live1+live2,該U即為活躍的用戶端總數。 U=live1+live2, this U is the total number of active clients.

子步驟A12,從佇列系統中獲取佇列分片的總個數P;在本發明實施例中,對於一個消費組,佇列系統中獲取該消費組下的shard總數P。 Sub-step A12: Obtain the total number of shards P from the queue system; in the embodiment of the present invention, for a consumer group, the queue system acquires the total number P of shards under the consumer group.

子步驟A13,從持久化儲存空間中儲存的佇列租期表中獲取當前用戶端已消費的佇列分片總數Q。 Sub-step A13, obtain the total number Q of the queue fragments consumed by the current client from the queue lease period table stored in the persistent storage space.

對於client_shard_lease表中的任一shard記錄, lease_owner和consumer_owner的worker_name相同時,並且該consumer_owner為當前的搶佔的工作進程的consumer_owner,則說明該worker_name佔用了該shard。如果lease_owner和consumer_owner的worker_name不相同時,則說明consumer_owner中的worker_name不再允許佔用該shard,該shard被其他worker搶佔。 For any shard record in the client_shard_lease table, when the worker_name of lease_owner and consumer_owner are the same, and the consumer_owner is the consumer_owner of the current preempted worker process, it means that the worker_name occupies the shard. If the worker_names of lease_owner and consumer_owner are not the same, it means that the worker_name in consumer_owner is no longer allowed to occupy the shard, and the shard is preempted by other workers.

因此,本發明實施例則對於一個worker_name,統計client_shard_lease表,lease_owner和consumer_owner的都為該worker_name情況下,shard的數量P,即為該worker已佔用的shard數量。 Therefore, in the embodiment of the present invention, for a worker_name, the client_shard_lease table is counted. When the lease_owner and consumer_owner are both the worker_name, the number of shards P is the number of shards occupied by the worker.

假設表一的記錄為如下述的表三的形式:

Figure 106102716-A0202-12-0014-4
Assume that the records in Table 1 are in the form of Table 3 as follows:
Figure 106102716-A0202-12-0014-4

比如對於當前用戶端worker A,在client_shard_lease表中,有shard_1、shard_2的lease_owner和consumer_owner的都為worker A,而shard_3的lease_owner為workerB,consumer_owner為worker B。那麼則worker A佔用shard_1、shard_2,而不佔用shard_3。因此worker A佔用的shard數量Q為2。 For example, for the current client worker A, in the client_shard_lease table, the lease_owner and consumer_owner of shard_1 and shard_2 are all worker A, while the lease_owner of shard_3 is worker B, and the consumer_owner is worker B. Then worker A occupies shard_1 and shard_2, but not shard_3. Therefore, the number of shards occupied by worker A is 2.

當然子步驟A11-A13之間的順序本發明實施例不對其加以限制。 Of course, the sequence between the sub-steps A11-A13 is not limited in the embodiment of the present invention.

子步驟A14,通過N=[P/U]-Q,計算當前用戶端需要 搶佔的佇列分片數量N。 Sub-step A14, through N=[P/U]-Q, calculate the number N of queue fragments that the current client needs to preempt.

本發明實施例採用N=[P/U]-Q計算中計算當前用戶端需要搶佔的佇列分片數量N。 In the embodiment of the present invention, N=[P/U]-Q is used to calculate the number of queue fragments N that the current user terminal needs to preempt.

其中[P/U],計算該worker應該佔用的shard個數M。其中[P/U]表示對P/U的計算結果向上取整。 Among them [P/U], calculate the number of shards M that the worker should occupy. Among them, [P/U] means rounding up the calculation result of P/U.

比如在worker A進行具體的搶佔之前,worker F佔用了11個shard,worker B佔用了10個shard,worker C佔用了10個shard,worker D佔用了10個shard,worker E佔用了11個shard。該worker A為新建worker,不佔用shard。而總shard為54個,那麼worker A應該佔用的數量為54/6=9個,worker A需要佔用的shard為9-0=9個。其他情況以此類推。 For example, before worker A performs specific preemption, worker F occupies 11 shards, worker B occupies 10 shards, worker C occupies 10 shards, worker D occupies 10 shards, and worker E occupies 11 shards. The worker A is a new worker and does not occupy a shard. And the total number of shards is 54, then the number of shards that worker A should occupy is 54/6=9, and the number of shards that worker A needs to occupy is 9-0=9. Other situations can be deduced by analogy.

子步驟101,從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片。 Sub-step 101, from the timeout queue fragments and/or the queue fragments being consumed by other clients, preempt the N queue fragments as the queue fragments to be consumed by the current client.

在本發明實施例中,因為對於所有的shard,可能存在如下幾種情況: In the embodiment of the present invention, because for all shards, there may be the following situations:

1、存在超時的佇列分片,而超時的shard數量L大於等於N。 1. There are queue fragments that have timed out, and the number of timed out shards L is greater than or equal to N.

2、存在超時的佇列分片,而超時的shard數量L小於N。 2. There are queued shards that have timed out, and the number of timed out shards L is less than N.

3、不存在超時的佇列分片。 3. There is no timeout queue fragment.

對於第1種情況,worker A直接從超時的shard中搶佔N個佇列分片即可。 For the first case, worker A can directly preempt N queue shards from the timeout shard.

對於第2種情況,worker A可以從超時的佇列分片中搶佔不大於L的佇列分片,剩餘的N-L個shard從其他worker中搶佔。 For the second case, worker A can preempt the queue shards that are not larger than L from the timed out shards, and the remaining N-L shards can be preempted from other workers.

對於第3種情況,則直接從其他worker中搶佔N各shard。 For the third case, N shards are directly preempted from other workers.

在本發明另一優選的實施例中,該子步驟101包括子步驟A21-A24: In another preferred embodiment of the present invention, the sub-step 101 includes sub-steps A21-A24:

子步驟A21,判斷該N是否大於0;可以理解的是,對於前述步驟得到的N,如果N=0,則對於該worker,不用搶佔。如果N>0,則才搶佔。 Sub-step A21, judge whether the N is greater than 0; it is understandable that for the N obtained in the previous step, if N=0, then the worker does not need to be preempted. If N>0, then preempt.

子步驟A22,如果該N大於0,則判斷超時的佇列分片數量L是否小於N; In sub-step A22, if the N is greater than 0, determine whether the number of timeout queue fragments L is less than N;

子步驟A23,如果超時的佇列分片數量L大於等於N,則從超時的佇列分片中搶佔N個佇列分片;在本發明實施例中,如果L

Figure 106102716-A0202-12-0016-11
N,則從L個超時佇列分片中隨機搶佔N個。 Sub-step A23, if the number of timeout queue fragments L is greater than or equal to N, then N queue fragments are preempted from the timeout queue fragments; in the embodiment of the present invention, if L
Figure 106102716-A0202-12-0016-11
N, then randomly preempt N from the L timeout queue fragments.

子步驟A24,如果判斷超時的佇列分片數量L小於N,則從超時的佇列分片中搶佔L個佇列分片,並從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片。 Sub-step A24, if it is determined that the number of timed out queue fragments L is less than N, then preempt the L queue fragments from the timed out queue fragments, and from the queue fragments that are being consumed by other clients, Preempt NL queue fragments.

如果L<N,則搶佔該L個佇列分片,然後剩餘的N-L個從其他worker正在消費的shard中搶佔。 If L<N, the L queue fragments are preempted, and then the remaining N-L shards are preempted from the shards being consumed by other workers.

可以理解的是如果L=0,則worker A需要從其他worker正在消費的shard中搶佔N個shard。 It is understandable that if L=0, worker A needs to preempt N shards from the shards that other workers are consuming.

進一步的,在本發明另一優選的實施例中,子步驟 A24中該從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片的步驟,包括:子步驟A241:子步驟A241,當前用戶端從其他用戶端佔用的佇列分片中搶佔N-L各佇列分片,並使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。 Further, in another preferred embodiment of the present invention, the step of preempting NL queue fragments from the queue fragments being consumed by other clients in sub-step A24 includes: sub-step A241: sub-step A241, the current client seizes each NL queue fragment from the queue fragments occupied by other client terminals, and the difference between the number of the most queue fragments occupied by each client and the number of the least queue fragments occupied by each client does not exceed Specify the quantity.

在本發明實施例中,每個worker的搶佔執行緒,在搶佔時,其保證各個worker佔用的shard的數量滿足以下條件:佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。該指定數量比如為1。 In the embodiment of the present invention, the preemption thread of each worker, during preemption, ensures that the number of shards occupied by each worker meets the following conditions: the number of shards occupying the most queue and the number of shards occupying the least queue The difference does not exceed the specified amount. The specified number is, for example, 1.

那麼當前用戶端worker A在從其他worker搶佔shard時,也使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。 Then when the current client worker A preempts the shard from other workers, the difference between the number of shards occupying the largest queue and the number of shards occupying the smallest queue does not exceed the specified number.

比如在worker A進行具體的搶佔之前,worker B佔用了11個shard,worker C佔用了10個shard,workerD佔用了10個shard,worker E佔用了10個shard,worker F佔用了11個shard。而總shard為54個,超時的佇列分片為2個。worker A為新啟動的worker。 For example, before worker A performs specific preemption, worker B occupies 11 shards, worker C occupies 10 shards, worker D occupies 10 shards, worker E occupies 10 shards, and worker F occupies 11 shards. The total number of shards is 54 and the number of timeout queue fragments is 2. Worker A is the newly started worker.

那麼需要為worker A從worker B搶佔2個shard,從worker C搶佔1個,從worker D搶佔1個shard,從worker E搶佔1個shard,從worker F搶佔2個shard,從超時的shard中搶佔2個。保證各個worker佔用的shard數量不超過1。其他情況以此類推。 Then you need to preempt 2 shards from worker B for worker A, 1 shard from worker C, 1 shard from worker D, 1 shard from worker E, 2 shards from worker F, and shards that have timed out. Seize 2 of them. Ensure that the number of shards occupied by each worker does not exceed 1. Other situations can be deduced by analogy.

在本發明另一優選的實施例中,該子步驟A241包 括:子步驟A2411-子步驟A2412: In another preferred embodiment of the present invention, the sub-step A241 includes: sub-step A2411-sub-step A2412:

子步驟A2411,基於該佇列租期表,將各個用戶端按照其佔用的佇列分片數量,從多到少進行排序;在本發明實施例中,在worker A搶佔shard之前,首先基於client_shard_lease表,統計各個worker_name佔用的shard_id的數量。 Sub-step A2411, based on the queue lease time table, each client is sorted according to the number of queue fragments occupied by it, from most to least; in the embodiment of the present invention, before worker A preempts the shard, it is first based on client_shard_lease Table, count the number of shard_id occupied by each worker_name.

比如前述例子,按數量進行排序得到如表四:

Figure 106102716-A0202-12-0018-5
For example, in the previous example, sorted by quantity, the results are shown in Table 4:
Figure 106102716-A0202-12-0018-5

其中,超時的shard有2個,worker A需要搶佔9個。 Among them, there are 2 shards that have timed out, and worker A needs to preempt 9 shards.

子步驟A2412,每次從前K個用戶端中搶佔一個或多個搶佔佇列分片後,使前K個用戶端被搶佔J個之後,前K個用戶端剩下的佇列分片的平均數大於第K+1個用戶端當前佔用的佇列分片數量,並且使前K個用戶端中,各個用戶端剩餘的佇列分片數與該平均數相差不超過指定數量,直至成功搶佔到N-L個佇列分片。 Sub-step A2412, each time one or more preemption queue fragments are preempted from the first K client terminals, the average of the remaining queue fragments of the first K client terminals after the first K client terminals are preempted by J The number is greater than the number of queue fragments currently occupied by the K+1 client, and the remaining number of queue fragments of each client among the first K client terminals does not differ from the average number by more than the specified number, until the preemption is successful To NL queue fragments.

可以理解的是,worker A在搶佔其他worker的shard時,對於搶佔後一個worker的shard數量不超過前一shard的數量。 It is understandable that when worker A preempts the shards of other workers, the number of shards of the next worker does not exceed the number of the previous shard.

以指定數量為1,對前述表四進行搶佔,由於為 worker A可以搶佔2個超時佇列。那麼worker A需要從其他worker中搶佔7個shard。 Taking the specified number as 1, preempt the aforementioned Table 4, because it is worker A, it can preempt 2 timeout queues. Then worker A needs to grab 7 shards from other workers.

比如先設定K=1,從第1個搶佔,則不滿足剩下的shard數量大於第2個的shard數量11。 For example, if you first set K=1 and preempt from the first one, it does not satisfy that the number of remaining shards is greater than the number of shards of the second 11.

再設定K=2,由於使前K個工作進程每個剩餘的佇列分片數與該平均數相差不超過指定數量,和搶佔後剩下的佇列分片的平均數大於第K+1個工作進程當前佔用的佇列分片數量,兩個條件的限制,不能從前2個work中搶佔大於2個的shard。那麼可以從第1個搶佔1個,第2個不搶佔。此時worker F還需搶佔6個shard,需要繼續搶佔。 Then set K=2, because the difference between the number of remaining queue fragments of each of the first K work processes and the average number does not exceed the specified number, and the average number of remaining queue fragments after preemption is greater than the K+1 The number of shards in the queue currently occupied by a worker process is limited by two conditions, and it is not possible to preempt more than 2 shards from the first 2 jobs. Then one can be preempted from the first one, and the second one is not preempted. At this time worker F still needs to seize 6 shards, and needs to continue seizing.

此時表四變更為表五:

Figure 106102716-A0202-12-0019-6
At this time, Table 4 is changed to Table 5:
Figure 106102716-A0202-12-0019-6

再設定K=3,此時因為如果搶佔後,剩下的佇列分片的平均數大於第K+1個工作進程當前佔用的佇列分片數量的條件限制,不能從前3個中搶任一個shard。K=4類似。 Then set K=3, because if the average number of remaining queue fragments is greater than the condition limit of the number of queue fragments currently occupied by the K+1 working process after preemption, you cannot preempt from the first 3 A shard. K=4 is similar.

那麼設置K等於5,則從worker C搶佔2個shard、從worker B、worker D、worke E、worker F中分別搶佔1個shard 後,才滿足前述兩個條件,其他搶佔方式不滿足前述兩個條件。 Then set K equal to 5, then preempt 2 shards from worker C, and 1 shard from worker B, worker D, worker E, and worker F before satisfying the foregoing two conditions. Other preemption methods do not satisfy the foregoing two condition.

基於前述佇列租期表,在本發明另一優選的實施例中,該子步驟101包括:子步驟1011,在搶佔一佇列分片時,將佇列租期表中,該被搶佔佇列分片的消費者欄位修改為當前的用戶端。 Based on the aforementioned queue lease schedule, in another preferred embodiment of the present invention, the sub-step 101 includes: sub-step 1011, when a queue segment is preempted, the queue is queued in the lease table, and the preempted queue The consumer field of the column fragment is modified to the current client.

比如表三中,當worker A從worker B搶佔了shard_3後,則將lease_owner修改為worker A,則表三變化為如下表六:

Figure 106102716-A0202-12-0020-7
For example, in Table 3, when worker A preempts shard_3 from worker B, the lease_owner is changed to worker A, and table 3 is changed to the following table 6:
Figure 106102716-A0202-12-0020-7

也可以理解為,用戶端在搶佔了一個佇列分片後,也會根據租賃協定更新該shard的狀態資料。 It can also be understood that after the client has preempted a queue segment, it will also update the status data of the shard according to the lease agreement.

如此,通過上述步驟,為用戶端worker A搶佔了N了shard,該N個shard對於worker A來說,就是待消費的佇列分片。 In this way, through the above steps, N shards are preempted for the client worker A. For worker A, the N shards are queue fragments to be consumed.

在實際應用中,在子步驟101之前需要判斷超時的佇列分片,優選的,在子步驟101之前還包括:子步驟A31-A34: In practical applications, it is necessary to determine the timeout queue fragments before sub-step 101. Preferably, before sub-step 101, it further includes: sub-steps A31-A34:

子步驟A31,每隔第一時間週期,獲取佇列租期表,並判斷該佇列租期表中各佇列分片下租用者欄位的租用ID,是否與本機存放區的map表中上次記錄的相應佇列 分片的租用ID相同;在本發明實施例中,每個worker的client lib創建了一個map表,該map表包括佇列分片標識欄位shard_id、租用標識欄位lease_id、上次監控時間欄位last_update_time。 Sub-step A31, every first time period, obtain the queue lease period table, and determine whether the lease ID of the tenant field under each queue segment in the queue lease period table is consistent with the map table of the local storage area The lease ID of the corresponding queue segment recorded last time is the same; in the embodiment of the present invention, the client lib of each worker creates a map table, which includes the queue segment identification field shard_id and the lease identification field Bit lease_id, last monitoring time field last_update_time.

其中,該map表在worker所在計算伺服器的記憶體中。 Among them, the map table is in the memory of the computing server where the worker is located.

在map表創建後,先從client_shard_lease表獲取所有shard的相關資訊寫入map表。該相關資訊如以shard_id為主鍵,對於每個shard,逐條記錄shard_id、lease_id和當前系統時間。然後即可進入步驟B11進行超時監控過程。 After the map table is created, first obtain all shard information from the client_shard_lease table and write it into the map table. For example, the related information is based on shard_id, and for each shard, shard_id, lease_id and current system time are recorded one by one. Then you can enter step B11 to perform the timeout monitoring process.

在本發明實施例中,一個worker的client lib每隔第一時間週期T,從資料庫的client_shard_lease表中,獲取各個shard_id以及該shard_id的lease_id。 In the embodiment of the present invention, the client lib of a worker obtains each shard_id and the lease_id of the shard_id from the client_shard_lease table of the database every first time period T.

然後,將client_shard_lease表的shard_id+lease_id,與map表中的shard_id+lease_id進行匹配。 Then, match the shard_id+lease_id in the client_shard_lease table with the shard_id+lease_id in the map table.

如果shard_id未匹配上,則說明該shard是新出現的,則map中沒有相應記錄,將該shard_id+lease_id添加到map表中,並在對應的last_update_time欄位下記錄當前的系統時間。 If the shard_id does not match, it means that the shard is new, and there is no corresponding record in the map. Add the shard_id+lease_id to the map table and record the current system time in the corresponding last_update_time field.

另外,本發明對於新出現的shard。如果shard_id的lease_id不為0,則說明該shard被某個worker搶佔或者佔用,則可以將該shard_id、lease_id記錄到map表,並 在last_update_time中記錄當前時間。如果shard_id的lease_id為0,說明沒有worker搶佔或者佔用該shard,在map表中記錄shard_id的lease_id=0,last_update_time設置為0,該可以直接認為該shard超時。 In addition, the present invention is for newly emerging shards. If the lease_id of the shard_id is not 0, it means that the shard is preempted or occupied by a worker. You can record the shard_id and lease_id in the map table, and record the current time in last_update_time. If the lease_id of the shard_id is 0, it means that no worker preempts or occupies the shard. Record the lease_id=0 of the shard_id in the map table and set the last_update_time to 0. It can be directly considered that the shard has timed out.

如果shard_id匹配上,而lease_id未匹配上,則進入步驟A32。 If the shard_id matches but the lease_id does not match, go to step A32.

如果shard_id+lease_id匹配上,則進入步驟A33。 If shard_id+lease_id matches, go to step A33.

子步驟A32,如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID不同,則更新map表中該佇列分片的租用ID為佇列租期表中的租用ID,以及更新map表中該佇列分片的上次監控時間欄位為當前的系統時間; 在本發明實施例中,由於各個worker的續租執行緒每隔第二時間週期為該worker續租其lease_owner和consumer_owner相同的shard。其中該第二時間週期小於第一時間週期,第二時間週期比如T1/2。在續租一個shard時,會更改client_shard_lease表lease_id為lease_id+1。那麼該worker如果續租shard,則lease_id變化,說明worker在正常消費該shard。如果lease_owner和consumer_owner相同不同,則不允許該worker續租該shard。 Sub-step A32, if the lease ID of a queue segment in the queue lease table is different from the lease ID of the corresponding queue segment recorded last time in the map table, update the lease of the queue segment in the map table ID is the lease ID in the queue lease period table, and the last monitoring time field of the queue segment in the updated map table is the current system time; in the embodiment of the present invention, due to the renewal thread of each worker Renew the lease for the worker with the same lease_owner and consumer_owner shards every second time period. The second time period is less than the first time period, and the second time period is, for example, T1/2. When renewing a shard, the client_shard_lease table lease_id will be changed to lease_id+1. Then if the worker renews the lease of the shard, the lease_id will change, indicating that the worker is consuming the shard normally. If lease_owner and consumer_owner are the same and different, the worker is not allowed to renew the shard.

那麼,本發明實施例的當前用戶端workerA,針對各個shard,判斷出map表shard_id與佇列租期表的shard_id匹配上,而lease_id未匹配上,則確定回應的 shard未超時,在map表的shard_id後的last_update_time欄位下,將其last_update_time中記錄的系統時間更新為當前的系統時間。 Then, the current user-side workerA in the embodiment of the present invention, for each shard, determines that the map table shard_id matches the shard_id of the queue lease table, but the lease_id does not match, then it is determined that the responding shard has not timed out, in the map table Under the last_update_time field after the shard_id, update the system time recorded in its last_update_time to the current system time.

子步驟A33,如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID相同,則維持上次監控時間欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期; 如前所述,那麼如果沒有為worker續租的shard,則lease_id不變化,說明worker沒有正常消費該shard。 Sub-step A33, if the lease ID of a queue segment in the queue lease period table is the same as the lease ID of the corresponding queue segment recorded last time in the map table, then the system time under the last monitoring time field is maintained , And determine whether the current system time minus the last monitored system time is greater than the first time period; as mentioned above, if there is no shard for renewing the worker's lease, the lease_id does not change, indicating that the worker does not consume the shard normally.

那麼,worker A維持其map表中的last_update_time欄位下的系統時間不變,然後判斷當前系統時間sys_time減去last_update_time是否大於第一時間週期T。 Then, worker A maintains the system time under the last_update_time column in the map table unchanged, and then determines whether the current system time sys_time minus last_update_time is greater than the first time period T.

子步驟A34,如果當前系統時間減去上次監控的系統時間大於第一時間週期,則確定相應佇列分片超時。 In sub-step A34, if the current system time minus the last monitored system time is greater than the first time period, it is determined that the corresponding queue segment has timed out.

如果當前系統時間sys_time減去last_update_time大於第一時間週期T,則說明該shard超時。該shard則會被作為被搶佔對象。可以進入步驟A11。 If the current system time sys_time minus last_update_time is greater than the first time period T, it means that the shard has timed out. The shard will be regarded as the preempted object. You can go to step A11.

那麼在當前用戶端搶佔了各佇列分片後,該步驟110包括子步驟111:子步驟111,針對一待消費的佇列分片,從持久化儲存空間中儲存的佇列租期表中獲取該佇列分片的狀態資料。 Then after the current client has preempted each queue segment, the step 110 includes sub-step 111: sub-step 111, for a queue segment to be consumed, from the queue lease table stored in the persistent storage space Get the status data of the queue segment.

在本發明實施例中,由於各佇列分片被任一一用戶端 消費時,或者該佇列分片新生成時,其狀態資料被寫入到前述佇列租期表中,那麼當前用戶端worker A在搶佔了N個佇列分片後,則可以從佇列租期表中獲取該佇列分片的狀態資料。進入步驟120。 In the embodiment of the present invention, since each queue segment is consumed by any client, or when the queue segment is newly generated, its status data is written into the aforementioned queue lease table, then the current user After the end worker A has preempted the N queue shards, it can obtain the status data of the queue shard from the queue lease table. Go to step 120.

步驟120,根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;在本發明實施例中,對於當前用戶端worker A搶佔的N個shard中的每個shard,由於前述步驟獲取了每個shard的狀態資料,則可以根據該狀態資料,分析這些shard是否還有其他worker正在使用該shard。如果沒有使用該shard,則進入步驟130。如果還在使用,則繼續獲取該shard新的狀態資料,進行上述判斷。 Step 120: According to the status data, determine whether there are other clients consuming the queue segment; in the embodiment of the present invention, for each of the N shards preempted by the current client worker A, the result is obtained by the foregoing steps. With the status data of each shard, you can analyze whether these shards have other workers using the shard based on the status data. If the shard is not used, go to step 130. If it is still in use, continue to obtain the new status data of the shard and make the above judgment.

優選的,基於前述佇列租期表,該狀態資料包括佇列租期表中該佇列分片下消費者欄位的值、租用者欄位的值和更新時間欄位的消費時間;該消費時間為當前用戶端搶佔該佇列分片後修改該更新時間欄位的值獲得。 Preferably, based on the aforementioned queue rental period table, the status data includes the value of the consumer field under the queue segment in the queue rental period table, the value of the renter field, and the consumption time of the update time field; The consumption time is obtained by modifying the value of the update time field after the current client preempts the queue segment.

基於前述的佇列租期表,對於worker A搶佔的各個shard,根據shard_id從佇列租期表中獲取該shard的狀態資料,如lease_owner、consumer_owner、update_time各自的值。 Based on the aforementioned queue lease table, for each shard preempted by worker A, the status data of the shard, such as lease_owner, consumer_owner, and update_time, are obtained from the queue lease table according to the shard_id.

該update_time的值是worker A搶佔該shard時,將消費時間值基於shard_id生成時間更新請求發送到資料庫,由資料庫更新佇列租期表的該shard_id的update_time值得到。 The value of the update_time is that when worker A preempts the shard, the consumption time value is sent to the database based on the shard_id generation time update request, which is obtained from the update_time value of the shard_id in the database update queue lease table.

當然,在實際應用中,還可結合消費組去獲取shard的狀態資料,以及向資料庫發送更新請求,以更新佇列租期表中的狀態資料。 Of course, in practical applications, it can also be combined with the consumer group to obtain the status data of the shard, and send an update request to the database to update the status data in the queue lease.

進一步的,該步驟120包括子步驟121-123:子步驟121,判斷該佇列分片的消費者欄位的值的值是否為當前用戶端;在實際應用中,當前用戶端worker A,針對其待消費的shard,根據相應的shard_id,向資料庫發送狀態資料獲取請求,以獲取相應的狀態資料後,可以從狀態資料中提取lease_owner的值,如果該值為worker A,則意味著該用戶端成功搶佔該shard。進入子步驟121。 Further, this step 120 includes sub-steps 121-123: sub-step 121, judging whether the value of the consumer field of the queue segment is the current client; in practical applications, the current client worker A is for The shard to be consumed, according to the corresponding shard_id, sends a status data acquisition request to the database to obtain the corresponding status data, and then the value of lease_owner can be extracted from the status data. If the value is worker A, it means the user The terminal successfully seizes the shard. Proceed to sub-step 121.

子步驟122,如果該佇列分片的消費者欄位的值是當前用戶端,則判斷當前用戶端的搶佔時間是否大於該消費時間;子步驟123,如果當前用戶端的搶佔時間大於該消費時間,則確定沒有其他用戶端在消費該佇列分片。 Sub-step 122, if the value of the consumer field of the queue segment is the current client, determine whether the preemption time of the current client is greater than the consumption time; sub-step 123, if the preemption time of the current client is greater than the consumption time, It is determined that no other client is consuming the queue segment.

在實際用於中,由於當前用戶端worker A搶佔了shard後,會更新資料庫中的佇列租期表下,相應shard_id的update_time值為消費時間。 In actual use, after the current client worker A preempts the shard, it will update the queue lease table in the database, and the update_time value of the corresponding shard_id is the consumption time.

在當前用戶端worker A本地可以記錄搶佔時間。 The preemption time can be recorded locally on worker A on the current user side.

如此,可以判斷當前用戶端的搶佔時間是否大於該消費時間。如果當前用戶端的搶佔時間大於該消費時間,則說明沒有其他用戶端在消費該佇列分片,該佇列分片的消費進度沒有其他用戶端更新。如果當前用戶端的搶佔時間 不大於該消費時間,則表示還可能有其他用戶端在消費該佇列分片,其他用戶端的消費進度可能還未更新到佇列租期表中。 In this way, it can be judged whether the preemption time of the current user terminal is greater than the consumption time. If the preemption time of the current client is greater than the consumption time, it means that no other client is consuming the queue segment, and the consumption progress of the queue segment is not updated by other client terminals. If the preemption time of the current client is not greater than the consumption time, it means that there may be other client consuming the queue segment, and the consumption progress of other clients may not have been updated to the queue lease period table.

優選的,該搶佔時間為當前用戶端的系統時間,該消費時間為該用戶端搶佔該佇列分片時的系統時間與第一週期時間之和。 Preferably, the preemption time is the system time of the current user end, and the consumption time is the sum of the system time when the user end preempts the queue segment and the first cycle time.

在本發明實施例中,為了計算方便,在當前用戶端workerA搶佔shard時,即將佇列租期表的lease_owner更新為worker A時,同時會將worker A的當前系統時間加上第一時間週期得到的消費時間,更新到佇列租期表該shard_id下的update_time。 In the embodiment of the present invention, for the convenience of calculation, when the current client worker A preempts the shard, when the lease_owner of the queue lease table is updated to worker A, the current system time of worker A is added to the first time period to obtain The consumption time is updated to the update_time under the shard_id in the queue rental period table.

那麼在步驟110獲取到shard_id的狀態資料後,子步驟122從update_time提取消費時間,然後直接以worker A當前的系統時間與該消費時間比較,如果當前系統時間超過該消費時間,則確定沒有其他用戶端在消費該佇列分片。 Then after obtaining the status data of shard_id in step 110, sub-step 122 extracts the consumption time from update_time, and then directly compares the current system time of worker A with the consumption time. If the current system time exceeds the consumption time, it is determined that there are no other users The end is consuming the queue fragment.

在實際應用中,由於各個worker在不同的計算伺服器中,而由於計算伺服器的系統不同,所以各個worker的系統時間也可能存在差異。那麼本發明則針對各個worker記錄其自己的系統時間,而統一第一時間週期T。以前述worker A為例,通過前述步驟可以很容易得到從worker A搶佔開始後,經過T後,系統應該到達的系統時間,該應該到達的系統時間即前述消費時間。當worker A的系統時間超過shard_3的消費時間,則確定沒有其他用 戶端在消費該佇列分片。比如worker A搶佔shard_3時的系統時間為12:00:00:000,第一時間週期T為50ms,那麼消費時間為12:00:00:050,此時worker A可以每隔T/2,從資料庫獲取佇列租期表的shard_3的狀態資料,該狀態資料包括update_time。那麼若系統時間走到12:00:00:051,則大於消費時間,則意味著其他用戶端在第一時間週期T內,將最新的消費進度寫入了shard_3的check_point中,可以進入步驟130。如系統時間不大於消費時間,則說明可能存在其他用戶端在消費該佇列分片。如此,避免了各個用戶端的系統時間的差異,導致當前用戶端對確定沒有其他用戶端在消費該佇列分片的判斷出錯。 In practical applications, because each worker is in a different computing server, and because the computing server has a different system, the system time of each worker may also be different. Then, the present invention records its own system time for each worker, and unifies the first time period T. Taking the aforementioned worker A as an example, through the aforementioned steps, it is easy to obtain the system time that the system should reach after the preemption of worker A begins and after T has passed, and the system time that should arrive is the aforementioned consumption time. When the system time of worker A exceeds the consumption time of shard_3, it is determined that no other users are consuming the queue segment. For example, the system time when worker A preempts shard_3 is 12:00:00:000, and the first time period T is 50ms, then the consumption time is 12:00:00:050. At this time, worker A can start every T/2 from The database obtains the status data of shard_3 of the queued lease period table, and the status data includes update_time. Then, if the system time reaches 12:00:00:051, it is greater than the consumption time, which means that other users write the latest consumption progress into the check_point of shard_3 in the first time period T, and you can go to step 130 . If the system time is not greater than the consumption time, it means that there may be other clients consuming the queue segment. In this way, the difference of the system time of each client is avoided, which causes the current client to make an error in determining that no other client is consuming the queue segment.

步驟130,如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;在本發明實施例中,如果搶佔時間大於消費時間,則確定沒有其他用戶端在消費該佇列分片,可以向資料庫發送更新請求,更新佇列租期表中該佇列分片的狀態資料,同時從資料庫的佇列租期表中,獲取該佇列分片的進度欄位的消費進度。 Step 130: If it is determined that no other client is consuming the queue segment, update the status data of the queue segment, and obtain the current consumption progress of the queue segment; in the embodiment of the present invention, if time is preempted If it is longer than the consumption time, it is determined that no other client is consuming the queue segment, and can send an update request to the database, update the status data of the queue segment in the queue lease table, and rent from the queue of the database at the same time In the timetable, obtain the consumption progress of the progress field of the queue segment.

如前所述,如果搶佔時間不大於消費時間,則可能有其他用戶端在消費該shard,可繼續執行步驟110,獲取狀態資料進行判斷。 As mentioned above, if the preemption time is not greater than the consumption time, there may be other clients consuming the shard, and step 110 may be continued to obtain status data for judgment.

基於前述佇列租期表,在本發明另一優選的實施例 中,步驟130包括子步驟131-132:子步驟131,租用該佇列分片,並將持久化儲存空間中儲存佇列租期表中該佇列分片下的消費者欄位更新為當前用戶端;子步驟132,從持久化儲存空間中儲存的佇列租期表中,獲取該佇列分片的進度欄位下的消費進度。 Based on the aforementioned queue lease schedule, in another preferred embodiment of the present invention, step 130 includes sub-steps 131-132: sub-step 131, lease the queue segment, and store the queue lease in the persistent storage space The consumer field under the queue shard in the schedule is updated to the current client; sub-step 132, from the queue lease table stored in the persistent storage space, obtain the progress field of the queue shard Consumption progress.

在實際應用中,對於worker A搶佔的任一待消費的shard,當判斷搶佔時間大於消費時間後,則說明佇列租期表中記錄的該shard的消費進度為最新的shard,並且沒有其他worker在消費該shard。 In practical applications, for any shard to be consumed preempted by worker A, when it is judged that the preemption time is greater than the consumption time, it means that the consumption progress of the shard recorded in the queue lease table is the latest shard, and there are no other workers. The shard is being consumed.

所以,worker A的租用執行緒可以租用該shard,然後向資料庫發送請求,以更新資料庫中佇列租期表的相應shard_ id的consumer_owner為worker A。當然,在實際應用中,還更新佇列租期表中該shard_id下的lease_id為lease_id+1,表示worker A佔用了該佇列分片。 Therefore, the renting thread of worker A can rent the shard, and then send a request to the database to update the consumer_owner of the corresponding shard_id of the queued lease table in the database to worker A. Of course, in actual applications, the lease_id under the shard_id in the queue lease table is also updated to lease_id+1, which means that worker A occupies the queue shard.

同時,可以從佇列租期表的相應shard_id的update_time下,獲取其記錄消費進度。 At the same time, the record consumption progress can be obtained from the update_time of the corresponding shard_id in the queue lease period table.

如果搶佔時間大於消費時間,那麼可以只更新佇列租期表中該shard_id下的lease_id為lease_id+1,不讓其他用戶端搶佔該shard。 If the preemption time is greater than the consumption time, you can only update the lease_id under the shard_id in the queue lease table to lease_id+1, and prevent other clients from preempting the shard.

如此,可以通過該上述方式,對於其他用戶端消費過的shard,其最後的消費進度可以無縫轉移到當前用戶端中,不存在重複消費的問題。 In this way, in this way, for shards that have been consumed by other users, the final consumption progress can be seamlessly transferred to the current user, and there is no problem of repeated consumption.

在本發明另一優選的實施例中,在子步驟132之後, 還包括子步驟133-134:子步驟133,判斷在佇列租期表中,該佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端;子步驟134,如果述佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端,則為當前用戶端續租該佇列分片。 In another preferred embodiment of the present invention, after sub-step 132, it further includes sub-steps 133-134: sub-step 133, judging in the queue lease table, the preemptor field under the queue segment and Whether the consumer field is the current client; sub-step 134, if the preemptor field and the consumer field under the queue segment are the current client, then the queue segment is renewed for the current client.

可以理解,每個worker的續租執行緒判斷該worker佔用各個shard,在client_shard_lease表各個shard_id下的consumer_onwer和lease_owner是否相同。如果相同,則為該worker續租該shard。如果不同,則不為該worker續租該shard。 Understandably, the lease renewal thread of each worker determines whether the worker occupies each shard, and whether the consumer_onwer and lease_owner under each shard_id in the client_shard_lease table are the same. If they are the same, the shard is renewed for the worker. If they are different, the shard will not be renewed for the worker.

在實際應用中,每個worker的續租執行緒每隔T1/2執行步驟A41至A42的過程。 In practical applications, each worker's lease renewal thread executes the process of steps A41 to A42 every T1/2.

在每個工作進程消費的過程中,還包括:步驟A31,針對每個工作進程,在消費該工作進程所佔用的佇列分片時,每隔第一時間週期,在工作進程所在記憶體中,基於上一次的租用ID生成新的租用ID;在本發明實施例中,對於每個worker消費任一shard的過程中,每隔第一時間週期T。 During the consumption process of each work process, it also includes: step A31, for each work process, when consuming the queue fragments occupied by the work process, every first time period, in the memory where the work process is located , Generate a new lease ID based on the last lease ID; in the embodiment of the present invention, during the process of consuming any shard for each worker, every first time period T.

步驟A32,判斷該工作進程記憶體中記錄的上一次租用ID,與佇列租期表中相應佇列分片的租用ID是否相同;步驟A33,如果相同,則在更新佇列租期表中租用標識欄位為該新的租用ID; 步驟A34,如果不同,則拒絕更新佇列租期表中租用標識欄位為該新的租用ID。 Step A32, determine whether the last lease ID recorded in the memory of the work process is the same as the lease ID of the corresponding queue segment in the queue lease period table; Step A33, if they are the same, in the update queue lease period table The lease identification field is the new lease ID; step A34, if they are different, refuse to update the lease identification field in the queue lease period table to the new lease ID.

在本發明實施例中,各個worker對應的clientlib以當前記憶體中各shard的lease_id去更新資料庫中該shard的lease_id。client lib每隔T時間在記憶體中將該shard的lease_id+1,同時以上次的lease_id與client_shard_lease表中該shard的lease_id比較,如果相同,則允許更新client_shard_lease表的lease_id為lease_id+1,如果不同則不允許進行上述更新。由於該操作的原子性,能夠保證任意時刻只有一個worker能更新成功,即只有一個worker能搶到該shard的lease。 In the embodiment of the present invention, the clientlib corresponding to each worker uses the lease_id of each shard in the current memory to update the lease_id of the shard in the database. The client lib adds the lease_id of the shard to the memory every T time, and compares the lease_id of the last time with the lease_id of the shard in the client_shard_lease table. If they are the same, the lease_id of the client_shard_lease table is allowed to be updated to lease_id+1, if different The above update is not allowed. Due to the atomicity of the operation, it can be guaranteed that only one worker can update successfully at any time, that is, only one worker can grab the lease of the shard.

步驟140,根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄。 Step 140: Continue to consume the queue segment according to the current consumption progress, and record the new consumption progress of the queue segment.

當獲取到上述shard的消費進度,則worker A可以從該shard的消費進度處繼續消費該shard。同時,在消費該shard的過程中,將新的消費進度更新到資料庫的佇列租期表中。 When the consumption progress of the above-mentioned shard is obtained, worker A can continue to consume the shard from the consumption progress of the shard. At the same time, in the process of consuming the shard, the new consumption progress is updated to the queue lease period table of the database.

優選的,步驟140中將該佇列分片新的消費進度進行記錄的步驟,包括:子步驟A41-A42:子步驟A41,每隔第二時間週期,判斷佇列租期表中,該佇列分片的租用者欄位是否為當前用戶端;子步驟A42,如果該佇列分片的租用者欄位是當前用戶端,則將當前用戶端對該佇列分片的消費進度,更新到佇列租期表中該佇列分片的進度欄位下。 Preferably, the step of recording the new consumption progress of the queue fragments in step 140 includes: sub-steps A41-A42: sub-step A41, every second time period, determine whether the queue is in the queue Whether the tenant field of the queue segment is the current client; sub-step A42, if the tenant field of the queue segment is the current client, update the consumption progress of the queue segment by the current client Go to the progress column of the queue segment in the queue lease table.

比如對於前述表六,當前用戶端worker A確定可以對shard_3進行消費之後,則從表六的shard_3之下的check_point欄位下讀取消費進度2/5,然後從shard_3的2/5處開始讀取資料進行消費。 For example, for the aforementioned Table 6, after the current user-side worker A determines that it can consume shard_3, it reads the consumption progress 2/5 from the check_point column under shard_3 in Table 6, and then starts reading from 2/5 of shard_3 Take data for consumption.

需要說明的是,如果資料庫該shard的check_point的資訊為空,則根據預先的配置,從shard的begin或者end處開始讀取資料。優選配置從begin處開始讀取資料,避免資料遺漏。 It should be noted that if the check_point information of the shard in the database is empty, the data will be read from the beginning or end of the shard according to the pre-configuration. It is preferable to configure to read data from begin to avoid data omission.

同時,在worker A消費shard_3的過程中,將新的消費進度更新到資料庫的佇列租期表的shard_3的check_point欄位下。 At the same time, in the process of consuming shard_3 by worker A, the new consumption progress is updated to the check_point field of shard_3 in the queue lease table of the database.

需要說明的是,本發明實施例的client lib提供了check point介面,通過該check point介面來完成保存check point的資訊,確保在worker進行fail over,或shard被不同worker搶佔的時候,worker正確消費shard中數據。Check point的使用有以下兩個部分組成: It should be noted that the client lib of the embodiment of the present invention provides a check point interface, through which check point information is saved, ensuring that the worker consumes correctly when the worker fails over or the shard is preempted by different workers Data in the shard. The use of check point consists of the following two parts:

(1)用於worker初始化check point的部分: (1) The part used for worker initialization check point:

(1.1)當一個shard被確定可消費的時候,client自動從資料庫載入check point信息。即執行步驟120的相關方法。 (1.1) When a shard is determined to be consumed, the client automatically loads the check point information from the database. That is, the relevant method of step 120 is executed.

如果資料庫該shard的check_point的資訊為空,則根據預先的配置,從shard的begin或者end處開始讀取資料。 If the check_point information of the shard in the database is empty, the data will be read from the beginning or end of the shard according to the pre-configuration.

(2)用於worker持久化check point的部分,即執 行步驟140相關方法: (2) The part used for worker's persistent check point, that is, the related method of executing step 140:

(2.1)Clientlib提供介面進行check point的操作saveCheckPoint(Bool persistent)。其中,參數persistent用於控制是否需要理解持久化到外部資料庫中,如果persistent為true,則立即持久化至資料庫,否則的話,每隔一定時間,持久化一次。在本發明實施例中persistent的值由client lib控制。 (2.1) Clientlib provides an interface to perform check point operations saveCheckPoint (Bool persistent). Among them, the parameter persistent is used to control whether it is necessary to understand the persistence to an external database. If persistent is true, it will be persisted to the database immediately, otherwise, it will be persisted at regular intervals. In the embodiment of the present invention, the value of persistent is controlled by the client lib.

(2.11)如果persistent為true,則將消費進度持久化到資料庫中相應shard的check_point欄位下。在本發明實施例中,在佇列分片的佇列租期表中的租用者欄位被修改為一搶佔的工作進程後,則在當前所處的第一時間週期T結束時,將persistent修改為true,從而可以立即將消費進度持久化到資料庫中相應shard的check_point欄位下。 (2.11) If persistent is true, the consumption progress will be persisted to the check_point column of the corresponding shard in the database. In the embodiment of the present invention, after the renter field in the queue lease period table of the queue fragment is modified to a preemptive work process, at the end of the current first time period T, the persistent Modify it to true to immediately persist the consumption progress to the check_point column of the corresponding shard in the database.

(2.12)如果persistent不為true,則將消費進度保存在記憶體中,同時,後臺會定期將超過指定時間長度長時間還沒有持久化消費進度,持久化到資料庫中相應shard的check_point欄位下。 (2.12) If persistent is not true, the consumption progress will be saved in the memory. At the same time, the background will periodically persist the consumption progress that has not been persisted for a long time longer than the specified time and persist to the check_point field of the corresponding shard in the database Down.

在本發明實施例中,在佇列分片的lease_owner和consumer_owner相同時,可以保持persistent不為true,從而可以定期將超過指定時間長度長時間還沒有持久化消費進度,持久化到資料庫中相應shard的check_point欄位下。該定期如2T。 In the embodiment of the present invention, when the lease_owner and consumer_owner of the queue shards are the same, persistent can be kept not true, so that the consumption progress that has not been persisted for a long time longer than a specified time period can be persisted to the corresponding database. Under the check_point column of the shard. It should be regularly as 2T.

(2.2)對於一個shard,在正常情況下,只有當資料 庫中,consumer_onwer和worker_name相同的時候,clientlib才將消費進度持久化到資料庫中相應shard的check_point欄位下。 (2.2) For a shard, under normal circumstances, only when consumer_onwer and worker_name in the database are the same, clientlib will persist the consumption progress to the check_point column of the corresponding shard in the database.

(2.3)當一個shard被其他worker搶佔或者當掉之後,clientlib調用shutdown介面,通知上層應用,將消費進度持久化到資料庫中相應shard的check_point欄位下。 (2.3) When a shard is preempted or crashed by other workers, clientlib calls the shutdown interface to notify the upper application and persist the consumption progress to the check_point field of the corresponding shard in the database.

當worker的shard被搶佔之後,由於在第一時間週期T內,資料庫中該shard的consumer_owner資訊尚未被更新,因此被搶佔的worker能夠正確持久化其消費的各shard的check point信息。 After the worker's shard is preempted, since the consumer_owner information of the shard in the database has not been updated in the first time period T, the preempted worker can correctly persist the check point information of each shard it consumes.

本發明實施例在分散式環境下,對於一用戶端,其在搶佔到佇列分片後,該佇列分片對於該用戶端即是待消費的佇列分片,那麼該用戶端需要首先基於租賃協定獲取的該佇列分片的狀態資料,然後根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片,在確定沒有其他用戶端在消費該佇列分片後,該用戶端才會獲取該佇列分片當前的消費進度,並且更新該佇列分片的狀態資料,然後該用戶端再以當前的消費進度繼續消費該佇列分片並記錄新的消費進度。本發明實施例通過上述過程,在當前用戶端搶佔其他客戶端正在消費的佇列分片時,該佇列分片的消費進度可以無縫傳遞到當前用戶端中,使在進行佇列分片負載均衡時,或者某個正在消費佇列分片的用戶端當掉,某個用戶端的佇列分片被當前用戶端搶佔後,當前用戶端可 以按照已消費的消費進度繼續消費該佇列分片,避免部分資料的重複消費,使消費結果更精確。另外,對於前述三個方面的考慮:第一,各計算伺服器如何正確選擇消費的shard,以確保任意時刻任意shard有且只有一個消費計算伺服器;第二,同時如何處理某個或者某些計算伺服器宕機的情況下,進行再負載均衡時,所有的shard上的資料仍舊能夠正確處理,不被重複消費;第三,當處理壓力增大是,需要新增處理的計算伺服器,如何做到自動負載均衡,並能保證任意資料不被重複消費。在先技術還存在採用Kafka Client的方案,Kafka是一個專門處理的日誌的分散式訊息佇列,提供消息的發佈-訂閱功能。Kakfa提供一套高級消費API(Application Programming Interface,應用程式設計發展介面)完成多個工作進程之間的同步,該API依賴kafka後臺zookeeper系統。由於依賴zookeeper系統,工作進程A對某個佇列分片A的key更新的資料更新後,不能及時傳回zookeeper,那麼如果再負載均衡時,其他工作進程拿到的佇列分片A的key還是更新之前的資料。因此,Kafka系統中,當不同工作進程看到的zookeeper資料可能是不同的版本,而導致failover時負載均衡操作失敗。 In the embodiment of the present invention, in a distributed environment, for a client, after it preempts the queue segment, the queue segment is the queue segment to be consumed for the client, and the client needs to first The status data of the queue segment obtained based on the lease agreement, and then based on the status data, determine whether other clients are consuming the queue segment. After confirming that no other client is consuming the queue segment, the The client will obtain the current consumption progress of the queue segment, and update the status data of the queue segment, and then the client will continue to consume the queue segment with the current consumption progress and record the new consumption progress. Through the above process, in the embodiment of the present invention, when the current client seizes the queue fragment that is being consumed by other clients, the consumption progress of the queue fragment can be seamlessly transferred to the current client, so that the queue fragmentation is in progress. During load balancing, or a client that is consuming a queue fragment crashes, after the queue fragment of a certain client is preempted by the current client, the current client can continue to consume the queue according to the consumption progress. To avoid repeated consumption of part of the data, make the consumption result more accurate. In addition, consider the aforementioned three aspects: first, how each computing server correctly selects the shard to consume to ensure that any shard has and only one consuming computing server at any time; second, how to deal with one or some of them at the same time When the computing server is down, the data on all shards can still be processed correctly when re-load balancing is performed, and will not be repeatedly consumed; third, when the processing pressure increases, a new computing server needs to be added. How to achieve automatic load balancing, and to ensure that any data is not repeatedly consumed. In the prior art, there is also a solution using Kafka Client. Kafka is a distributed message queue of specially processed logs that provides a publish-subscribe function of messages. Kakfa provides a set of advanced consumption API (Application Programming Interface, application programming development interface) to complete the synchronization between multiple work processes. The API relies on the kafka backend zookeeper system. Due to the reliance on the zookeeper system, after the worker process A updates the data of the key of a certain queue segment A, it cannot be transmitted back to the zookeeper in time. Then if the load balance is performed, the other worker processes get the key of the queue segment A Still update the previous information. Therefore, in the Kafka system, the zookeeper data seen by different worker processes may be of different versions, causing the load balancing operation to fail during failover.

本發明實施例的上述過程,本發明的工作進程直接面對佇列分片,對佇列分片進行處理,也避免了Kafka系統再負載均衡時的,不同工作進程看到不同資料版本的問題。 In the above process of the embodiment of the present invention, the work process of the present invention directly faces the queue shards and processes the queue shards, which also avoids the problem that different work processes see different data versions when the Kafka system re-balances the load. .

再者,在先技術中,Kinesis Client Library在再負載均衡時,每次只能對一個佇列分片進行負載均衡,比如工作進程A正在消費100個佇列分片,而對於工作進程B,負載均衡時要從工作進程A搶佔佇列分片,其每次負載均衡時,只能從工作進程A搶佔1個佇列分片,如果實際上A要從B搶佔30個佇列分片的話,要觸發30次負載均衡操作,導致負載均衡耗用時間長,不能快速到達穩定狀態 Furthermore, in the prior art, Kinesis Client Library can only load balance one queue segment at a time during load balancing. For example, worker process A is consuming 100 queue segments, and for worker process B, When load balancing, it is necessary to preempt the queue shards from the work process A. Each time the load is balanced, only 1 queue shard can be preempted from the work process A. If in fact, A wants to preempt 30 queue shards from B , To trigger 30 load balancing operations, resulting in long load balancing time-consuming and unable to reach a stable state quickly

而本發明,在一次負載均衡過程中,可以一次性的搶佔該工作進程應該搶佔的N各佇列分片,本發明實施例可以快速批量選擇需要搶佔shard的方法,從而是各worker快速達到穩定狀態,並且使每個worker消費的shard個數均衡。 However, in the present invention, in a load balancing process, the N queue segments that should be preempted by the work process can be preempted at one time. The embodiment of the present invention can quickly select the method of preempting shards in batches, so that each worker can quickly reach stability. State, and balance the number of shards consumed by each worker.

尤其是以儲存在持久化儲存空間的用戶端實例表和佇列租期表為基礎,本發明實施例通過佇列租期表,然後在該佇列租期表中的消費者欄位consumer_owner、租用者欄位lease_owner和進度欄位check_point,分別記錄各個佇列分片的搶佔者、佔用者和佔用者的消費進度參數。然後在一工作進程搶佔另一工作進程佔用的佇列分片時,通過佇列租期表中該佇列分片的消費者欄位consumer_owner、租用者欄位lease_owner和進度欄位check_point,將消費進度參數傳遞給搶佔的工作進程,使在新增工作進程、某個工作進程所在計算伺服器宕機、或者某個工作進程fail over後,重新進行負載均衡時,其原來佔用的佇列分片被其他工作進程搶佔時,搶佔的佇列分片可以按照其已消費 的位置繼續進行消費,避免部分資料的重複消費,使消費結果更精確。 Especially based on the client instance table and the queue lease period table stored in the persistent storage space, the embodiment of the present invention queues the lease period table, and then lists the consumer_owner, consumer_owner, and consumer_owner fields in the queue lease period table. The renter field lease_owner and progress field check_point respectively record the consumption progress parameters of the preemptor, occupant, and occupant of each queue segment. Then when a worker process preempts a queue segment occupied by another worker process, the consumption will be consumed through the consumer_owner field, lease_owner field and progress field check_point of the queue segment in the queue lease table. The progress parameter is passed to the preempted work process, so that when a new work process is added, the computing server where a work process is located is down, or a work process fails over, the original queue fragments occupied when the load balance is restarted When preempted by other work processes, the preempted queue segment can continue to consume according to its consumed position, avoiding repeated consumption of part of the data, and making the consumption result more accurate.

為了更清楚的描述本發明,本實施例以佇列租期表和用戶端實例表為基礎,以某個用戶端worker A為新啟動的情況進行描述: In order to describe the present invention more clearly, this embodiment is based on the queue lease period table and the client instance table, and describes the situation where a certain client worker A is newly started:

步驟201,預先在資料庫中創建用戶端實例表和佇列租期表。 Step 201: Create a client instance table and a queue lease period table in the database in advance.

類似實施例一的原理創建client_worker_instance表和client_shard_lease表。 The client_worker_instance table and the client_shard_lease table are created in a principle similar to the first embodiment.

然後對於每個新啟動的用戶端執行步驟202-227。 Then steps 202-227 are executed for each newly started client terminal.

步驟202,啟動用戶端,並將用戶端名稱寫入用戶端實例表。 Step 202: Start the client, and write the client name into the client instance table.

worker A動時,會啟動其中的client lib執行緒,由用戶端的client lib執行緒將worker_name,當前系統時間寫入到client_worker_instance表。 When worker A moves, the client lib thread will be started, and the client lib thread on the client side will write worker_name and the current system time into the client_worker_instance table.

步驟203,從佇列系統中獲取所有的佇列分片的佇列分片標識。 Step 203: Obtain the queue fragment identifiers of all the queue fragments from the queue system.

由Client lib執行緒獲取佇列系統中所有shard的shard_id。 Get the shard_id of all shards in the queue system by the client lib thread.

步驟204,從資料庫的佇列租期表中獲取所有佇列分片的租用ID欄位和租用者欄位。 Step 204: Obtain the lease ID field and the renter field of all the queue segments from the queue lease period table in the database.

Client lib執行緒根據shard_id,從client_shard_lease表中獲取所有shard的lease_id和lease_owner According to the shard_id, the client lib thread obtains the lease_id and lease_owner of all shards from the client_shard_lease table

步驟205,判斷佇列系統中所有佇列分片是否有未出 現在佇列租期表中;如果有未出現在佇列租期表中的佇列分片,則將該佇列分片的相關資訊寫入資料庫的佇列租期表中。 Step 205: Determine whether all the queue fragments in the queue system do not appear in the queue lease period table; if there are queue fragments that do not appear in the queue lease period table, then the queue fragments are listed Relevant information is written into the queue lease period table of the database.

當Client lib執行緒判斷有新的shard_id未出現在client_shard_lease表中,則向client_shard_lease表寫入該shard_id對應的記錄,將對應的lease_id為0,其他欄位設置為空。 When the client lib thread determines that a new shard_id does not appear in the client_shard_lease table, it writes the record corresponding to the shard_id to the client_shard_lease table, sets the corresponding lease_id to 0, and sets other fields to be empty.

然後Client lib執行緒創建搶佔執行緒和續租執行緒。搶佔執行緒執行佇列分片的搶佔過程,續租執行緒執行佇列分片的租用過程。 Then Client lib threads create preemption threads and lease renewal threads. The preemption thread executes the preemption process of the queue segment, and the renewal thread executes the lease process of the queue segment.

步驟206,獲取用戶端實例表中記錄的最近一個第一時間週期內啟動的用戶端的數量live1. Step 206, obtain the number live1 of the client started in the last first time period recorded in the client instance table.

步驟207,獲取佇列租期表中,沒有超時佇列分片對應的用戶端的數量live2;該沒有超時的用戶端為佇列分片的租用者欄位下記錄的用戶端。 Step 207: Obtain the number live2 of the client corresponding to the queue segment without timeout in the queue lease period table; the client without timeout is the client recorded under the tenant field of the queue segment.

步驟208,將live1加上live2得到總的活躍的用戶端數量U。 Step 208: Add live1 to live2 to obtain the total number of active users U.

步驟209,從佇列系統中獲取佇列分片的總個數P; Step 209: Obtain the total number of queue fragments P from the queue system;

步驟210,從佇列租期表中獲取當前用戶端已消費的佇列分片總數Q; Step 210: Obtain the total number Q of queue fragments consumed by the current client from the queue lease period table;

步驟211,通過N=[P/U]-Q,計算當前用戶端需要搶佔的佇列分片數量N。 Step 211, by N=[P/U]-Q, calculate the number N of queue fragments that the current client needs to preempt.

步驟212,每隔第一時間週期,獲取佇列租期表,並判斷該佇列租期表中各佇列分片下租用者欄位的租用 ID,是否與本機存放區的map表中上次記錄的相應佇列分片的租用ID相同; 用戶端的搶佔執行緒每隔第一時間週期,獲取資料庫中client_shard_lease表中各shard的lease_id,將其與map表中上次記錄的相應shard的lease_id相進行比較,判斷是否相同。 Step 212: Obtain the queue lease period table every first time period, and determine whether the lease ID of the tenant field under each queue segment in the queue lease period table is consistent with the map table of the local storage area The lease ID of the corresponding queue shards recorded last time is the same; the preemption thread on the client side obtains the lease_id of each shard in the client_shard_lease table in the database every first time period, and compares it with the corresponding shard last recorded in the map table. The lease_id is compared to determine whether they are the same.

步驟213,如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID不同,則更新map表中該佇列分片的租用ID為佇列租期表中的租用ID,以及更新map表中該佇列分片的上次監控時間欄位為當前的系統時間; 如果client_shard_lease表中一shard的lease_id,與map表中上次記錄的相應shard的lease_id不同,則更新map表中該shard的lease_id為client_shard_lease表中的lease_id,以及更新map表中該shard的監控欄位為當前的系統時間 Step 213: If the lease ID of a queue segment in the queue lease period table is different from the lease ID of the corresponding queue segment recorded last time in the map table, update the lease ID of the queue segment in the map table. To list the lease ID in the lease period table, and update the last monitoring time field of the queue segment in the map table to the current system time; if the lease_id of a shard in the client_shard_lease table is the same as the last record in the map table If the lease_id of the corresponding shard is different, update the lease_id of the shard in the map table to the lease_id in the client_shard_lease table, and update the monitoring field of the shard in the map table to the current system time

步驟214,如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID相同,則維持上次監控時間欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期; 如果client_shard_lease表中一shard的lease_id,與map表中上次記錄的相應shard的lease_id相同,則維持上次監控欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期T。 Step 214: If the lease ID of a queue segment in the queue lease period table is the same as the lease ID of the corresponding queue segment recorded last time in the map table, the system time under the last monitoring time field is maintained. And determine whether the current system time minus the last monitored system time is greater than the first time period; if the lease_id of a shard in the client_shard_lease table is the same as the lease_id of the corresponding shard recorded last time in the map table, the last monitoring field will be maintained And determine whether the current system time minus the last monitored system time is greater than the first time period T.

步驟215,如果當前系統時間減去上次監控的系統時間大於第一時間週期,則確定相應佇列分片超時。 Step 215: If the current system time minus the last monitored system time is greater than the first time period, it is determined that the corresponding queue segment has timed out.

步驟216,判斷該N是否大於0; Step 216: Determine whether the N is greater than 0;

步驟217,如果該N大於0,則判斷超時的佇列分片數量L是否小於N; Step 217: If the N is greater than 0, determine whether the number of timed out queue fragments L is less than N;

步驟218,如果超時的佇列分片數量L大於等於N,則從超時的佇列分片中搶佔N個佇列分片; Step 218: If the number of timeout queue fragments L is greater than or equal to N, preempt N queue fragments from the timeout queue fragments;

步驟219,如果判斷超時的佇列分片數量L小於N,則從超時的佇列分片中搶佔L個佇列分片,並從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片。 Step 219: If it is determined that the number of timed out queue fragments L is less than N, then preempt the L queue fragments from the timed out queue fragments, and preempt the queue fragments that are being consumed by other clients NL queue fragments.

其中,針對被搶佔的佇列分片,在搶佔一佇列分片時,將資料庫的佇列租期表中,該被搶佔佇列分片的消費者欄位修改為當前的用戶端,並將佇列租期表中的更新時間欄位修改為消費時間;該消費時間為該用戶端搶佔該佇列分片時的系統時間與第一週期時間之和。 Among them, for the preempted queue segment, when a queue segment is preempted, the consumer field of the preempted queue segment in the queue lease table of the database is modified to the current client, And modify the update time field in the queue lease period table to consumption time; the consumption time is the sum of the system time and the first cycle time when the client seizes the queue segment.

對於一個被搶佔的shard,將client_shard_lease表中,該被搶佔的shard的lease_owner修改為該worker A,並在update_time下記錄消費時間。並將該被搶佔的shard放入續租執行緒。 For a preempted shard, modify the lease_owner of the preempted shard in the client_shard_lease table to the worker A, and record the consumption time under update_time. And put the preempted shard into the lease renewal thread.

對於從其他用戶端搶佔的shard,由於該被搶用戶端判斷本地map中的租用ID與佇列租期表中的不同,此時消費者欄位與該被搶用戶端還相同,則可被搶用戶端時間更新消費進度,並且由於續租的週期為T/2,被搶用戶端不能續租該shard,不能續租則該被搶用戶端就不能繼續 處理該shard。如此保證了佇列分片的消費進度無縫在各用戶端中傳遞。 For shards that are preempted from other clients, because the looted client judges that the lease ID in the local map is different from that in the queue lease table, at this time the consumer field is the same as the robbed client, it can be The robbed client time updates the consumption progress, and since the renewal period is T/2, the robbed client cannot renew the shard, and the robbed client cannot continue to process the shard if the lease cannot be renewed. This ensures that the consumption progress of the queue fragments is seamlessly transmitted to each client.

上述步驟212-步驟219為前述搶佔執行緒執行,搶佔執行緒每個2*T執行一次。在用戶端worker A新加入時其所有佇列分片均為上述步驟搶佔得到。在後續每次執行,如果通過上述步驟確定該用戶端要負載均衡其他用戶端的佇列分片,則執行搶佔過程。 The above steps 212 to 219 are the aforementioned preemptive thread execution, and the preemptive thread is executed once every 2*T. When the user-side worker A newly joins, all its queue fragments are obtained by the above steps. In each subsequent execution, if it is determined through the above steps that the client terminal wants to load balance the queue fragments of other client terminals, the preemption process is executed.

步驟220,對於搶佔的佇列分片,從佇列租期表中獲取該佇列分片的消費這欄位的值,並判斷該佇列分片的消費者欄位的值的值是否為當前用戶端。 Step 220: For the preempted queue segment, obtain the value of the consumption field of the queue segment from the queue lease table, and determine whether the value of the consumer field of the queue segment is The current client.

步驟221,如果該佇列分片的消費者欄位的值的值是當前用戶端,則判斷當前用戶端的搶佔時間是否大於該消費時間。 Step 221: If the value of the consumer field of the queue segment is the current client, it is determined whether the preemption time of the current client is greater than the consumption time.

本步驟判斷搶佔的用戶端的系統時間是否大於update_time下的消費時間。 In this step, it is judged whether the system time of the preempted client is greater than the consumption time under update_time.

步驟222,如果當前用戶端的搶佔時間大於該消費時間,則租用該佇列分片,並將持久化儲存空間中儲存佇列租期表中該佇列分片下的消費者欄位更新為當前用戶端。 Step 222: If the current client's preemption time is greater than the consumption time, the queue segment is leased, and the consumer field under the queue segment in the storage queue lease table in the persistent storage space is updated to the current user terminal.

判斷搶佔的用戶端的系統時間是否大於update_time下的消費時間;如果該用戶端的系統時間超過該消費時間,則將該被搶佔shard的consumer_owner修改為worker A的用戶端,並將該被搶佔的shard的lease_id修改為lease_id+1,如此worker A則租用了該shard,並且意味著worker A搶佔成功上述shard。 Determine whether the system time of the preempted client is greater than the consumption time under update_time; if the system time of the client exceeds the consumption time, modify the consumer_owner of the preempted shard to the client of worker A, and change the preempted shard’s The lease_id is modified to lease_id+1, so that worker A leases the shard, and it means that worker A successfully preempts the shard.

步驟223,對於用戶端佔用的佇列分片,從資料庫的佇列租期表中獲取搶佔者欄位值和租用者欄位值,並判斷該佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端; Step 223: For the queue segment occupied by the client, obtain the preemptor field value and the tenant field value from the queue lease period table of the database, and determine the preemptor field and the tenant field value under the queue segment Whether the consumer field is the current client;

步驟224,如果述佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端,則為當前用戶端續租該佇列分片。 Step 224: If the preemptor field and the consumer field under the queue segment are the current client, then the queue segment is renewed for the current client.

上述步驟220-224為續租執行緒執行的步驟,續租現在每隔T/2,對於當前用戶端佔用的shard,當判斷client_shard_lease表中的lease_owner和consumer_owner相同,則進行續租。 The above steps 220-224 are the execution steps of the lease renewal thread. The lease renewal is now every T/2. For the shard occupied by the current client, when it is judged that the lease_owner and the consumer_owner in the client_shard_lease table are the same, the lease renewal is performed.

步驟225,對於用戶端搶佔成功的佇列分片,從資料庫的佇列租期表的該佇列分片下的進度欄位中獲取消費進度。 Step 225: For the queue segment that is successfully preempted by the client, the consumption progress is obtained from the progress field under the queue segment in the queue lease table of the database.

client則從client_shard_lease表的check_point欄位提取消費進度。當該消費進度為空是,則該shard的消費進度為從頭部開始消費。 The client extracts the consumption progress from the check_point column of the client_shard_lease table. When the consumption progress is empty, the consumption progress of the shard is to start consumption from the head.

步驟226,用戶端從該佇列分片的該消費進度位置繼續消費該佇列分片。 In step 226, the client continues to consume the queue segment from the consumption progress position of the queue segment.

步驟227,用戶端在消費佇列分片的過程中,將消費進度更新到資料庫中該佇列分片下的進度欄位中。 Step 227: During the process of consuming the queue segment, the user terminal updates the consumption progress to the progress field under the queue segment in the database.

在worker消費shard的過程中,Client lib執行緒調用checkpoint的介面,每隔T時間,將消費進度更新到client_shard_lease表的check_point下;或者在該shard lease_owner被搶佔之後,續租週期到達時,調用checkpoint的shutdown介面,通知上層應用,將當前消費成功的check point信息持久化到client_shard_lease表的check_point下;或者,在worker A當掉時,調用checkpoint的shutdown介面,通知上層應用,將當前消費成功的check point信息持久化到client_shard_lease表的check_point下。 When the worker consumes the shard, the client lib thread calls the checkpoint interface and updates the consumption progress to the check_point in the client_shard_lease table every T time; or after the shard lease_owner is preempted, the checkpoint is called when the lease renewal period arrives. The shutdown interface of, notifies the upper application, and persists the check point information of the current successful consumption to the check_point in the client_shard_lease table; or, when worker A is down, calls the shutdown interface of checkpoint to notify the upper application to check the current successful consumption check The point information is persisted under check_point in the client_shard_lease table.

當然,對於用戶端執行第一次搶佔佇列分片之後,其可以執行步驟203及以後的步驟。 Of course, after the user terminal performs the first preemption queue segmentation, it can perform step 203 and subsequent steps.

需要說明的是,對於方法實施例,為了簡單描述,故將其都表述為一系列的動作組合,但是所屬技術領域中具有通常知識者應該知悉,本發明實施例並不受所描述的動作順序的限制,因為依據本發明實施例,某些步驟可以採用其他順序或者同時進行。其次,所屬技術領域中具有通常知識者也應該知悉,說明書中所描述的實施例均屬於優選實施例,所涉及的動作並不一定是本發明實施例所必須的。 It should be noted that for the method embodiments, for the sake of simple description, they are all expressed as a series of action combinations, but those with ordinary knowledge in the technical field should know that the embodiments of the present invention are not subject to the described sequence of actions. The limitation is because according to the embodiment of the present invention, some steps can be performed in other order or simultaneously. Secondly, those with ordinary knowledge in the technical field should also know that the embodiments described in the specification are all preferred embodiments, and the actions involved are not necessarily required by the embodiments of the present invention.

實施例二 Example two

參照圖2,示出了本發明的一種分散式環境協調消費佇列的系統實施例的結構框圖,具體可以包括如下模組:狀態資料獲取模組310,用於針對一待消費的佇列分片,基於租賃協定獲取的該佇列分片的狀態資料;消費判斷模組320,用於根據該狀態資料,判斷是否 有其他用戶端在消費該佇列分片;進度獲取模組330,用於如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;消費模組340,用於根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄。 Referring to FIG. 2, there is shown a structural block diagram of an embodiment of a system for coordinating consumption queues in a distributed environment of the present invention, which may specifically include the following modules: a status data acquisition module 310 for a queue to be consumed The sharding is based on the status data of the queue shard obtained based on the lease agreement; the consumption judgment module 320 is used to determine whether other clients are consuming the queue shard according to the status data; the progress obtaining module 330, If it is determined that no other client is consuming the queue segment, update the status data of the queue segment, and obtain the current consumption progress of the queue segment; the consumption module 340 is used to according to the current consumption The progress continues to consume the queue segment, and the new consumption progress of the queue segment is recorded.

在本發明一優選的實施例中,在狀態資料獲取模組310之前,還包括:佇列分片確定模組,用於基於租賃協定確定當前用戶端需求的待消費的佇列分片。 In a preferred embodiment of the present invention, before the status data acquisition module 310, it further includes: a queue segment determination module, which is used to determine the queue segment to be consumed based on the current client demand based on the lease agreement.

在本發明一優選的實施例中,該佇列分片確定模組包括:需求數量確定子模組,用於獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q,以計算當前用戶端需要搶佔的佇列分片數量N;搶佔子模組,用於從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片。 In a preferred embodiment of the present invention, the queue segment determination module includes: a demand quantity determination sub-module for obtaining the total number of active clients U, the total number of queue segments P, and the current client has consumed The total number of queue shards Q is used to calculate the number of shards N that the current client needs to preempt; the preemption sub-module is used for queue shards that have timed out and/or queue shards that other clients are consuming In, preempt the N queue fragments as the queue fragments to be consumed on the current user end.

在本發明一優選的實施例中,該需求數量確定子模組包括:用戶端總數獲取子模組,用於從持久化儲存空間中儲存的用戶端實例表和佇列租期表中獲取活躍的用戶端總數U;整體佇列分片總數獲取子模組,用於從佇列系統中獲 取佇列分片的總個數P;單體佇列分片總數獲取子模組,用於從持久化儲存空間中儲存的佇列租期表中獲取當前用戶端已消費的佇列分片總數Q;需求數量計算子模組,用於通過N=[P/U]-Q,計算當前用戶端需要搶佔的佇列分片數量N。 In a preferred embodiment of the present invention, the demand quantity determination sub-module includes: a client total number obtaining sub-module, which is used to obtain active data from the client instance table and the queue lease period table stored in the persistent storage space. The total number of client-sides U; the total number of shards in the overall queue obtains the sub-module, which is used to obtain the total number of shards P from the queue system; the total number of single-queue shards obtains the sub-module, which is used from Get the total number Q of queue shards consumed by the current client from the queue lease table stored in the persistent storage space; the demand quantity calculation sub-module is used to calculate the current user through N=[P/U]-Q The number of shards in the queue that the end needs to preempt is N.

在本發明一優選的實施例中,該用戶端總數獲取子模組包括:新建用戶端數量獲取子模組,用於獲取用戶端實例表中記錄的最近一個第一時間週期內啟動的用戶端的數量live1;未超時用戶端數量獲取子模組,用於獲取佇列租期表中,沒有超時佇列分片對應的用戶端的數量live2;該沒有超時的用戶端為佇列分片的租用者欄位下記錄的用戶端;用戶端數量累加子模組,用於將live1加上live2得到總的活躍的用戶端數量U。 In a preferred embodiment of the present invention, the client total number acquisition sub-module includes: a newly created client number acquisition sub-module, which is used to obtain the latest client information recorded in the client instance table during the first time period. Quantity live1; the sub-module for obtaining the number of clients that have not timed out is used to obtain the number of clients that do not have a timeout queued segment in the queue lease table, live2; the client without a timeout is the queued segment The client recorded under the tenant field; the client quantity accumulation sub-module is used to add live1 to live2 to get the total active client quantity U.

在本發明一優選的實施例中,該搶佔子模組包括:第一判斷子模組,用於判斷該N是否大於0;第二判斷子模組,用於如果該N大於0,則判斷超時的佇列分片數量L是否小於N;全超時搶佔子模組,用於如果超時的佇列分片數量L大於等於N,則從超時的佇列分片中搶佔N個佇列分片;混和搶佔子模組,用於如果判斷超時的佇列分片數量 L小於N,則從超時的佇列分片中搶佔L個佇列分片,並從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片。 In a preferred embodiment of the present invention, the preemption sub-module includes: a first judging sub-module for judging whether the N is greater than 0; a second judging sub-module for judging if the N is greater than 0 Whether the number of timeout queue fragments L is less than N; full timeout preemption sub-module, used to preempt N from the timeout queue fragments if the number of timeout queue fragments L is greater than or equal to N Queue shards; mixed preemption sub-module, used to determine that the number of timed out queue shards L is less than N, then preempt L queue shards from the timed out queue shards, and correct them from other clients Among the queue fragments for consumption, NL queue fragments are preempted.

在本發明一優選的實施例中,該混和搶佔模組包括:第一混和搶佔子模組,用於當前用戶端從其他用戶端佔用的佇列分片中搶佔N-L各佇列分片,並使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。 In a preferred embodiment of the present invention, the hybrid preemption module includes: a first hybrid preemption sub-module for the current client to preempt each queue segment of NL from the queue segments occupied by other client terminals, and The difference between the number of shards that each client occupies the most and the number of shards that occupies the least does not exceed the specified number.

在本發明一優選的實施例中,該第一混和搶佔子模組包括:排序子模組,用於基於該佇列租期表,將各個用戶端按照其佔用的佇列分片數量,從多到少進行排序;第二混和搶佔模組,用於每次從前K個用戶端中搶佔一個或多個搶佔佇列分片後,使前K個用戶端被搶佔J個之後,前K個用戶端剩下的佇列分片的平均數大於第K+1個用戶端當前佔用的佇列分片數量,並且使前K個用戶端中,各個用戶端剩餘的佇列分片數與該平均數相差不超過指定數量,直至成功搶佔到N-L個佇列分片。 In a preferred embodiment of the present invention, the first hybrid preemption sub-module includes: a sorting sub-module, which is used for sorting each client according to the number of queue shards occupied by it based on the queue lease table, from Sort from more to less; the second hybrid preemption module is used to preempt one or more preemption queue fragments from the first K client terminals each time, so that after the first K client terminals are preempted by J, the first K The average number of remaining queue fragments on the client side is greater than the number of queue fragments currently occupied by the K+1th client side, and the number of remaining queue fragments on each client side among the first K client terminals is equal to this The average number does not differ by more than the specified number, until NL queue fragments are successfully preempted.

在本發明一優選的實施例中,該搶佔子模組包括:消費者欄位修改子模組,用於在搶佔一佇列分片時,將佇列租期表中,該被搶佔佇列分片的消費者欄位修改為當前的用戶端。 In a preferred embodiment of the present invention, the preemption sub-module includes: a consumer field modification sub-module, which is used to queue the lease term in the preempted queue when a queue segment is preempted The consumer field of the fragment is modified to the current client.

在本發明一優選的實施例中,該搶佔子模組之前,還包括: map表判斷子模組,用於每隔第一時間週期,獲取佇列租期表,並判斷該佇列租期表中各佇列分片下租用者欄位的租用ID,是否與本機存放區的map表中上次記錄的相應佇列分片的租用ID相同;map監控時間更新子模組,用於如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID不同,則更新map表中該佇列分片的租用ID為佇列租期表中的租用ID,以及更新map表中該佇列分片的上次監控時間欄位為當前的系統時間;map監控時間維持子模組,用於如果佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID相同,則維持上次監控時間欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期;超時判斷子模組,用於如果當前系統時間減去上次監控的系統時間大於第一時間週期,則確定相應佇列分片超時。 In a preferred embodiment of the present invention, before the preemption submodule, it further includes: a map table judging submodule, which is used to obtain the queue lease period table every first time period and determine the queue lease period Whether the lease ID of the tenant field under each queue segment in the table is the same as the lease ID of the corresponding queue segment recorded last time in the map table of the local storage area; the map monitoring time update submodule is used for If the lease ID of a queue segment in the queue lease table is different from the lease ID of the corresponding queue segment recorded last time in the map table, update the lease ID of the queue segment in the map table to the queue The lease ID in the lease table, and the last monitoring time field of the queue segment in the updated map table is the current system time; the map monitoring time maintenance sub-module is used if a queue in the queue is listed The lease ID of the column segment is the same as the lease ID of the corresponding queue segment recorded last time in the map table. The system time under the last monitoring time field is maintained, and the current system time minus the last monitored system is determined Whether the time is greater than the first time period; the timeout judging sub-module is used to determine if the current system time minus the last monitored system time is greater than the first time period, determine the corresponding queue segment timeout.

在本發明一優選的實施例中,該佇列分片確定模組包括:第一佇列分片確定子模組,用於基於租賃協定每隔第一時間週期,確定當前用戶端需求的待消費的佇列分片。 In a preferred embodiment of the present invention, the queue fragmentation determining module includes: a first queue fragmentation determining sub-module, which is used to determine the waiting time of the current user end demand every first time period based on the lease agreement. The consumption queue fragments.

在本發明一優選的實施例中,該狀態資料獲取模組310包括:第一狀態資料獲取子模組,用於針對一待消費的佇列 分片,從持久化儲存空間中儲存的佇列租期表中獲取該佇列分片的狀態資料。 In a preferred embodiment of the present invention, the status data acquisition module 310 includes: a first status data acquisition sub-module, which is used for a queue segment to be consumed from the queue stored in the persistent storage space Obtain the status data of the queue segment in the lease table.

在本發明一優選的實施例中,該狀態資料包括佇列租期表中該佇列分片下消費者欄位的值、租用者欄位的值和更新時間欄位的消費時間;該消費時間為當前用戶端搶佔該佇列分片後修改該更新時間欄位的值獲得;則,該消費判斷模組320包括:消費者欄位判斷子模組,用於判斷該佇列分片的消費者欄位的值的值是否為當前用戶端;搶佔時間判斷子模組,用於如果該佇列分片的消費者欄位的值的值是當前用戶端,則判斷當前用戶端的搶佔時間是否大於該消費時間;確定子模組,用於如果當前用戶端的搶佔時間大於該消費時間,則確定沒有其他用戶端在消費該佇列分片。 In a preferred embodiment of the present invention, the status data includes the value of the consumer field under the queue segment in the queue rental period table, the value of the renter field, and the consumption time of the update time field; the consumption The time is obtained by modifying the value of the update time field after the current client preempts the queue segment; then, the consumption judgment module 320 includes: a consumer field judgment sub-module for judging the queue segment Whether the value of the consumer field is the current client; the preemption time judging sub-module is used to determine the preemption time of the current client if the value of the consumer field of the queue segment is the current client Whether it is greater than the consumption time; the determining sub-module is used for determining that no other client is consuming the queue segment if the current client's preemption time is greater than the consumption time.

在本發明一優選的實施例中,該搶佔時間為當前用戶端的系統時間,該消費時間為該用戶端搶佔該佇列分片時的系統時間與第一週期時間之和。 In a preferred embodiment of the present invention, the preemption time is the system time of the current client, and the consumption time is the sum of the system time when the client preempts the queue segment and the first cycle time.

在本發明一優選的實施例中,該進度獲取模組330包括:租用子模組,用於租用該佇列分片,並將持久化儲存空間中儲存佇列租期表中該佇列分片下的消費者欄位更新為當前用戶端;進度欄位讀取模組,用於從持久化儲存空間中儲存的佇列租期表中,獲取該佇列分片的進度欄位下的消費進 度。 In a preferred embodiment of the present invention, the progress obtaining module 330 includes: a lease sub-module for renting the queue segment, and storing the queue segment in the queue lease schedule in the persistent storage space. The consumer field under the slice is updated to the current client; the progress field reading module is used to obtain the progress field of the queue segment from the queue lease table stored in the persistent storage space Consumption progress.

在本發明一優選的實施例中,該消費模組320包括:租用者判斷子模組,用於每隔第二時間週期,判斷佇列租期表中,該佇列分片的租用者欄位是否為當前用戶端;消費進度更新子模組,用於如果該佇列分片的租用者欄位是當前用戶端,則將當前用戶端對該佇列分片的消費進度,更新到佇列租期表中該佇列分片的進度欄位下。 In a preferred embodiment of the present invention, the consumption module 320 includes: a renter judging sub-module for judging the renter column of the queue segment in the queue lease period every second time period Whether the bit is the current client; the consumption progress update sub-module is used to update the consumption progress of the queue by the current client to the queue if the tenant field of the queue segment is the current client List under the progress column of the queue segment in the lease period table.

在本發明一優選的實施例中,在租用子模組之後,還包括:續租判斷子模組,用於判斷在佇列租期表中,該佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端;第一續租子模組,用於如果述佇列分片下的搶佔者欄位和消費者欄位是否為當前用戶端,則為當前用戶端續租該佇列分片。 In a preferred embodiment of the present invention, after the submodule is leased, it further includes: a lease renewal judging submodule for judging the preemptor field and the sub-module under the queue segment in the queue lease period table Whether the consumer field is the current client; the first renewal sub-module is used to renew the lease for the current client if the preemptor field and the consumer field under the queue segment are the current client The queue fragment.

結合圖2A,其示出了本發明的分散式環境的示例,其中各個計算節點即為一個用戶端。調度伺服器可以分配佇列分片。每個用戶端包括:狀態資料獲取模組310、消費判斷模組320、進度獲取模組330、消費模組340。當然每個用戶端也可包括相應優選的模組。 With reference to FIG. 2A, it shows an example of the distributed environment of the present invention, in which each computing node is a user terminal. The scheduling server can allocate queue fragments. Each client terminal includes: a status data acquisition module 310, a consumption judgment module 320, a progress acquisition module 330, and a consumption module 340. Of course, each client can also include a corresponding preferred module.

在分散式環境下,對於一用戶端,其在搶佔到佇列分片後,該佇列分片對於該用戶端即是待消費的佇列分片,那麼該用戶端需要首先基於租賃協定獲取的該佇列分片的狀態資料,然後根據該狀態資料,判斷是否有其他用戶端 在消費該佇列分片,在確定沒有其他用戶端在消費該佇列分片後,該用戶端才會獲取該佇列分片當前的消費進度,並且更新該佇列分片的狀態資料,然後該用戶端再以當前的消費進度繼續消費該佇列分片並記錄新的消費進度。本發明實施例通過上述過程,在當前用戶端搶佔其他客戶端正在消費的佇列分片時,該佇列分片的消費進度可以無縫傳遞到當前用戶端中,使在進行佇列分片負載均衡時,或者某個正在消費佇列分片的用戶端當掉,某個用戶端的佇列分片被當前用戶端搶佔後,當前用戶端可以按照已消費的消費進度繼續消費該佇列分片,避免部分資料的重複消費,使消費結果更精確 In a distributed environment, for a client, after it preempts the queue segment, the queue segment is the queue segment to be consumed for the client, then the client needs to obtain it based on the lease agreement first According to the status data of the queue segment, determine whether there is another client consuming the queue segment. After confirming that no other client is consuming the queue segment, the client will Obtain the current consumption progress of the queue segment, and update the status data of the queue segment, and then the client continues to consume the queue segment with the current consumption progress and records the new consumption progress. Through the above process, in the embodiment of the present invention, when the current client seizes the queue fragment that is being consumed by other clients, the consumption progress of the queue fragment can be seamlessly transferred to the current client, so that the queue fragmentation is in progress. During load balancing, or a client that is consuming a queue fragment crashes, after the queue fragment of a certain client is preempted by the current client, the current client can continue to consume the queue according to the consumption progress. To avoid repeated consumption of part of the data and make the consumption result more accurate

對於裝置實施例而言,由於其與方法實施例基本相似,所以描述的比較簡單,相關之處參見方法實施例的部分說明即可。 As for the device embodiment, since it is basically similar to the method embodiment, the description is relatively simple, and for related parts, please refer to the part of the description of the method embodiment.

本說明書中的各個實施例均採用遞進的方式描述,每個實施例重點說明的都是與其他實施例的不同之處,各個實施例之間相同相似的部分互相參見即可。 The various embodiments in this specification are described in a progressive manner, and each embodiment focuses on the differences from other embodiments, and the same or similar parts between the various embodiments can be referred to each other.

所屬技術領域中具有通常知識者應明白,本發明實施例的實施例可提供為方法、裝置、或電腦程式產品。因此,本發明實施例可採用完全硬體實施例、完全軟體實施例、或結合軟體和硬體方面的實施例的形式。而且,本發明實施例可採用在一個或多個其中包含有電腦可用程式碼的電腦可用儲存媒介(包括但不限於磁碟儲存器、CD-ROM、光學儲存器等)上實施的電腦程式產品的形式。 Those skilled in the art should understand that the embodiments of the present invention can be provided as methods, devices, or computer program products. Therefore, the embodiments of the present invention may take the form of a completely hardware embodiment, a completely software embodiment, or an embodiment combining software and hardware. Moreover, the embodiments of the present invention may adopt computer program products implemented on one or more computer-usable storage media (including but not limited to disk storage, CD-ROM, optical storage, etc.) containing computer-usable program codes. form.

在一個典型的配置中,該電腦設備包括一個或多個消費器(處理器)(CPU)、輸入/輸出介面、網路介面和記憶體。記憶體可能包括電腦可讀媒介中的非永久性記憶體,隨機存取記憶體(RAM)和/或非易失性記憶體等形式,如唯讀記憶體(ROM)或快閃記憶體(flash RAM)。記憶體是電腦可讀媒介的示例。電腦可讀媒介包括永久性和非永久性、可移動和非可移動媒體可以由任何方法或技術來實現資訊儲存。資訊可以是電腦可讀指令、資料結構、程式的模組或其他資料。電腦的儲存媒介的例子包括,但不限於相變記憶體(PRAM)、靜態隨機存取記憶體(SRAM)、動態隨機存取記憶體(DRAM)、其他類型的隨機存取記憶體(RAM)、唯讀記憶體(ROM)、電可擦除可程式設計唯讀記憶體(EEPROM)、快閃記憶體或其他記憶體技術、唯讀光碟唯讀記憶體(CD-ROM)、數位多功能光碟(DVD)或其他光學儲存、磁盒式磁帶,磁帶磁磁片儲存或其他磁性存放裝置或任何其他非傳輸媒介,可用於儲存可以被計算設備訪問的資訊。按照本文中的界定,電腦可讀媒介不包括非持續性的電腦可讀媒體(transitory media),如調製的資料信號和載波。 In a typical configuration, the computer device includes one or more consumer (processor) (CPU), input/output interfaces, network interfaces, and memory. Memory may include non-permanent memory in computer readable media, random access memory (RAM) and/or non-volatile memory, such as read-only memory (ROM) or flash memory ( flash RAM). Memory is an example of computer readable media. Computer-readable media includes permanent and non-permanent, removable and non-removable media, and information storage can be realized by any method or technology. Information can be computer-readable instructions, data structures, program modules, or other data. Examples of computer storage media include, but are not limited to, phase change memory (PRAM), static random access memory (SRAM), dynamic random access memory (DRAM), and other types of random access memory (RAM) , Read-only memory (ROM), electrically erasable programmable read-only memory (EEPROM), flash memory or other memory technology, CD-ROM, digital multi-function Optical discs (DVD) or other optical storage, magnetic cassettes, magnetic tape storage or other magnetic storage devices or any other non-transmission media that can be used to store information that can be accessed by computing devices. According to the definition in this article, computer-readable media does not include non-persistent computer-readable media (transitory media), such as modulated data signals and carrier waves.

本發明實施例是參照根據本發明實施例的方法、終端設備(系統)、和電腦程式產品的流程圖和/或方框圖來描述的。應理解可由電腦程式指令實現流程圖和/或方框圖中的每一流程和/或方框、以及流程圖和/或方框圖中的流程和/或方框的結合。可提供這些電腦程式指令到通用電 腦、專用電腦、嵌入式消費機或其他可程式設計資料消費終端設備的消費器以產生一個機器,使得通過電腦或其他可程式設計資料消費終端設備的消費器執行的指令產生用於實現在流程圖一個流程或多個流程和/或方框圖一個方框或多個方框中指定的功能的裝置。 The embodiments of the present invention are described with reference to the flowcharts and/or block diagrams of the methods, terminal devices (systems), and computer program products according to the embodiments of the present invention. It should be understood that each process and/or block in the flowchart and/or block diagram, and the combination of processes and/or blocks in the flowchart and/or block diagram can be realized by computer program instructions. These computer program instructions can be provided to general-purpose computers, dedicated computers, embedded consumer computers, or other programmable data consumer terminal devices to produce a machine, which can be executed by computers or other programmable data consumer terminal devices. The instructions generate means for realizing the functions specified in one process or multiple processes in the flowchart and/or one block or multiple blocks in the block diagram.

這些電腦程式指令也可儲存在能引導電腦或其他可程式設計資料消費終端設備以特定方式工作的電腦可讀記憶體中,使得儲存在該電腦可讀記憶體中的指令產生包括指令裝置的製造品,該指令裝置實現在流程圖一個流程或多個流程和/或方框圖一個方框或多個方框中指定的功能。 These computer program instructions can also be stored in a computer readable memory that can guide a computer or other programmable data consumer terminal equipment to work in a specific way, so that the instructions stored in the computer readable memory can be generated including the manufacturing of the instruction device The instruction device realizes the functions specified in one process or multiple processes in the flowchart and/or one block or multiple blocks in the block diagram.

這些電腦程式指令也可裝載到電腦或其他可程式設計資料消費終端設備上,使得在電腦或其他可程式設計終端設備上執行一系列操作步驟以產生電腦實現的消費,從而在電腦或其他可程式設計終端設備上執行的指令提供用於實現在流程圖一個流程或多個流程和/或方框圖一個方框或多個方框中指定的功能的步驟。 These computer program instructions can also be loaded on a computer or other programmable data consumer terminal equipment, so that a series of operation steps are executed on the computer or other programmable terminal equipment to generate computer-realized consumption, so that the computer or other programmable terminal equipment The instructions executed on the design terminal device provide steps for implementing functions specified in one or more processes in the flowchart and/or one block or more in the block diagram.

儘管已描述了本發明實施例的優選實施例,但所屬技術領域中具有通常知識者一旦得知了基本創造性概念,則可對這些實施例做出另外的變更和修改。所以,所附請求項意欲解釋為包括優選實施例以及落入本發明實施例範圍的所有變更和修改。 Although the preferred embodiments of the embodiments of the present invention have been described, those skilled in the art can make additional changes and modifications to these embodiments once they learn the basic creative concepts. Therefore, the appended claims are intended to be interpreted as including the preferred embodiments and all changes and modifications falling within the scope of the embodiments of the present invention.

最後,還需要說明的是,在本文中,諸如第一和第二等之類的關係術語僅用來將一個實體或者操作與另一個實體或操作區分開來,而不一定要求或者暗示這些實體或操 作之間存在任何這種實際的關係或者順序。而且,術語“包括”、“包含”或者其任何其他變體意在涵蓋非排他性的包含,從而使得包括一系列要素的過程、方法、物品或者終端設備不僅包括那些要素,而且還包括沒有明確列出的其他要素,或者是還包括為這種過程、方法、物品或者終端設備所固有的要素。在沒有更多限制的情況下,由語句“包括一個......”限定的要素,並不排除在包括該要素的過程、方法、物品或者終端設備中還存在另外的相同要素。 Finally, it should be noted that in this article, relational terms such as first and second are only used to distinguish one entity or operation from another entity or operation, and do not necessarily require or imply these entities. Or there is any such actual relationship or sequence between operations. Moreover, the terms "including", "including" or any other variants thereof are intended to cover non-exclusive inclusion, so that a process, method, article or terminal device including a series of elements not only includes those elements, but also includes those elements that are not explicitly listed. Other elements listed, or also include elements inherent to this process, method, article, or terminal device. Without more restrictions, the element defined by the sentence "including a..." does not exclude the existence of other identical elements in the process, method, article, or terminal device that includes the element.

以上對本發明所提供的一種分散式環境協調消費佇列方法和一種分散式環境協調消費佇列系統,進行了詳細介紹,本文中應用了具體個例對本發明的原理及實施方式進行了闡述,以上實施例的說明只是用於幫助理解本發明的方法及其核心思想;同時,對於所屬技術領域中具有通常知識者,依據本發明的思想,在具體實施方式及應用範圍上均會有改變之處,綜上所述,本說明書內容不應理解為對本發明的限制。 The above provides a detailed introduction to a distributed environment coordinated consumption queue method and a distributed environment coordinated consumption queue system provided by the present invention. Specific examples are used in this article to illustrate the principle and implementation of the present invention. The description of the embodiments is only used to help understand the method and the core idea of the present invention; at the same time, for those with ordinary knowledge in the technical field, according to the idea of the present invention, there will be changes in the specific implementation and the scope of application In summary, the content of this specification should not be construed as a limitation of the present invention.

Claims (32)

一種分散式環境協調消費(consume)佇列的方法,其特徵在於,包括:針對一待消費的佇列分片,基於租賃協定,從持久化儲存空間中儲存的佇列租期表中獲取該佇列分片的狀態資料;根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄;其中,該狀態資料包括該佇列租期表中該佇列分片下消費者欄位的值、租用者欄位的值和更新時間欄位的消費時間。 A method for coordinating consumption queues in a distributed environment, which is characterized in that: for a queue segment to be consumed, based on a lease agreement, obtaining the queue lease time table stored in a persistent storage space The status data of the queue segment; based on the status data, determine whether other clients are consuming the queue segment; if it is determined that no other clients are consuming the queue segment, update the status of the queue segment Data, and obtain the current consumption progress of the queue segment; continue to consume the queue segment according to the current consumption progress, and record the new consumption progress of the queue segment; where the status data includes the queue The value of the consumer field, the value of the renter field, and the consumption time of the update time field in the queue segment in the lease period table. 根據請求項1所述的方法,其中,在針對一待消費的佇列分片,基於租賃協定獲取該佇列分片的狀態資料的步驟之前,還包括:基於租賃協議確定當前用戶端需求的待消費的佇列分片。 The method according to claim 1, wherein, before the step of obtaining the status data of the queue segment based on the lease agreement for a queue segment to be consumed, the method further includes: determining the current client demand based on the lease agreement Queue fragments to be consumed. 根據請求項2所述的方法,其中,該基於租賃協議確定當前用戶端需求的待消費的佇列分片的步驟,包括: 獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q,以計算當前用戶端需要搶佔的佇列分片數量N;從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片。 The method according to claim 2, wherein the step of determining the queue fragments to be consumed based on the current client demand based on the lease agreement includes: Obtain the total number of active clients U, the total number of queue fragments P, and the total number of queue fragments consumed by the current client Q to calculate the number of queue fragments N that the current client needs to preempt; Among the fragments and/or queue fragments being consumed by other clients, preempt the N queue fragments as the queue fragments to be consumed by the current client. 根據請求項3所述的方法,其中,該獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q的步驟,包括:從該持久化儲存空間中儲存的用戶端實例表和該佇列租期表中獲取活躍的用戶端總數U;從佇列系統中獲取佇列分片的總個數P;從該持久化儲存空間中儲存的該佇列租期表中獲取當前用戶端已消費的佇列分片總數Q;通過N=[P/U]-Q,計算當前用戶端需要搶佔的佇列分片數量N。 The method according to claim 3, wherein the step of obtaining the total number U of active clients, the total number P of queued shards, and the total number Q of queued shards consumed by the current client includes: obtaining from the persistent storage space The total number of active clients U is obtained from the client instance table stored in the Queue and the Queue Lease Table; the total number of shards P is obtained from the Queue System; the queue is stored from the persistent storage space Obtain the total number of queue fragments Q consumed by the current client in the column lease period table; pass N=[P/U]-Q to calculate the number of queue fragments N that the current client needs to preempt. 根據請求項4所述的方法,其中,該從該持久化儲存空間中儲存的用戶端實例表和該佇列租期表中獲取活躍的用戶端總數U步驟,包括:獲取用戶端實例表中記錄的最近一個第一時間週期內啟動的用戶端的數量live1;獲取該佇列租期表中,沒有超時佇列分片對應的用戶端的數量live2;該沒有超時的用戶端為佇列分片的該租用者欄位下記錄的用戶端; 將live1加上live2得到總的活躍的用戶端數量U。 The method according to claim 4, wherein the step of obtaining the total number of active clients U from the client instance table stored in the persistent storage space and the queue lease period table includes: obtaining the client instance table Record the number of clients started in the first time period live1; get the number of clients live2 corresponding to the queue segment without timeout in the queue lease table; the client without timeout is the queue segment The client recorded under the renter field of the film; Add live1 to live2 to get the total number of active clients U. 根據請求項3所述的方法,其中,該從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片的步驟,包括:判斷該N是否大於0;如果該N大於0,則判斷超時的佇列分片數量L是否小於N;如果超時的佇列分片數量L大於等於N,則從超時的佇列分片中搶佔N個佇列分片;如果判斷超時的佇列分片數量L小於N,則從超時的佇列分片中搶佔L個佇列分片,並從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片。 The method according to claim 3, wherein, among the queue fragments that have timed out and/or the queue fragments being consumed by other clients, preempt the N queue fragments as the queue to be consumed by the current client The step of fragmentation includes: judging whether the N is greater than 0; if the N is greater than 0, judging whether the number of timeout queue fragments L is less than N; if the number of timeout queue fragments L is greater than or equal to N, then Preempt N queue fragments from the timeout queue fragments; if it is determined that the number of timeout queue fragments L is less than N, then seize L queue fragments from the timeout queue fragments, and Preempt NL queue fragments from the queue fragments being consumed by other clients. 根據請求項6所述的方法,其中,該從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片的步驟,包括:當前用戶端從其他用戶端佔用的佇列分片中搶佔N-L各佇列分片,並使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。 The method according to claim 6, wherein the step of preempting NL queue fragments from the queue fragments being consumed by other clients includes: the current client's queue fragments occupied by other clients Preempt the NL queue fragments in the middle, and make the difference between the number of the largest queue fragments occupied by each client and the number of the least queue fragments occupied by each client not exceed the specified number. 根據請求項7所述的方法,其中,該當前用戶端從其他用戶端佔用的佇列分片中搶佔N-L各佇列分片,並使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量的步驟,包括:基於該佇列租期表,將各個用戶端按照其佔用的佇列 分片數量,從多到少進行排序;每次從前K個用戶端中搶佔一個或多個搶佔佇列分片後,使前K個用戶端被搶佔J個之後,前K個用戶端剩下的佇列分片的平均數大於第K+1個用戶端當前佔用的佇列分片數量,並且使前K個用戶端中,各個用戶端剩餘的佇列分片數與該平均數相差不超過指定數量,直至成功搶佔到N-L個佇列分片。 The method according to claim 7, wherein the current client preempts each queue segment of NL from the queue segments occupied by other client terminals, and makes each client occupy the largest number of queue segments and the least occupied The steps for the difference between the number of queued shards not to exceed the specified number, including: based on the queued lease schedule, each client is queued according to its occupied queue The number of fragments is sorted from most to least; each time one or more preemption queue fragments are preempted from the first K client terminals, after the first K client terminals are preempted by J, the first K client terminals remain The average number of queue fragments is greater than the number of queue fragments currently occupied by the K+1th client, and the remaining queue fragments of each client in the first K client terminals are not different from the average number The specified number is exceeded, until NL queue fragments are successfully preempted. 根據請求項3所述的方法,其中,該從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片的步驟包括:在搶佔一佇列分片時,將該佇列租期表中,被搶佔佇列分片的該消費者欄位修改為當前的用戶端。 The method according to claim 3, wherein, among the queue fragments that have timed out and/or the queue fragments being consumed by other clients, preempt the N queue fragments as the queue to be consumed by the current client The fragmentation step includes: when a queue fragment is preempted, the consumer field of the preempted queue fragment in the queue lease table is modified to the current client. 根據請求項3所述的方法,其中,該從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片的步驟之前,還包括:每隔第一時間週期,獲取該佇列租期表,並判斷該佇列租期表中各佇列分片下該租用者欄位的租用ID,是否與本機存放區的map表中上次記錄的相應佇列分片的租用ID相同;如果該佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID不同,則更新map表中該佇列分片的租用ID為該佇列租期表中的租用 ID,以及更新map表中該佇列分片的上次監控時間欄位為當前的系統時間;如果該佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID相同,則維持上次監控時間欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期;如果當前系統時間減去上次監控的系統時間大於第一時間週期,則確定相應佇列分片超時。 The method according to claim 3, wherein, among the queue fragments that have timed out and/or the queue fragments being consumed by other clients, preempt the N queue fragments as the queue to be consumed by the current client Before the fragmentation step, it also includes: every first time period, obtain the queue lease table, and determine whether the lease ID of the tenant field under each queue fragment in the queue lease table is the same as The lease ID of the corresponding queue segment recorded last time in the map table of the local storage area is the same; if the lease ID of a queue segment in the queue lease table is the same as the corresponding queue recorded last time in the map table If the lease ID of the segment is different, update the lease ID of the queue segment in the map table to the lease in the queue lease period table ID, and update the last monitoring time field of the queue segment in the map table to the current system time; if the lease ID of a queue segment in the queue lease period table is the same as the last recorded time in the map table If the lease ID of the corresponding queue shards is the same, the system time under the last monitoring time column is maintained, and it is judged whether the current system time minus the last monitored system time is greater than the first time period; if the current system time minus the last The system time of the second monitoring is greater than the first time period, then it is determined that the corresponding queue segment has timed out. 根據請求項2所述的方法,其中,該基於租賃協議確定當前用戶端需求的待消費的佇列分片的步驟包括:基於租賃協議每隔第一時間週期,確定當前用戶端需求的待消費的佇列分片。 The method according to claim 2, wherein the step of determining the queue fragments to be consumed for the current client demand based on the lease agreement comprises: determining the to-be-consumed current client demand based on the lease agreement every first time period The queue shards. 根據請求項1所述的方法,其中,該消費時間為當前用戶端搶佔該佇列分片後修改該更新時間欄位的值獲得;則,該根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片的步驟,包括:判斷該佇列分片的該消費者欄位的值是否為當前用戶端;如果該佇列分片的該消費者欄位的值是當前用戶端,則判斷當前用戶端的搶佔時間是否大於該消費時間;如果當前用戶端的搶佔時間大於該消費時間,則確定沒有其他用戶端在消費該佇列分片。 The method according to claim 1, wherein, the consumption time is obtained by modifying the value of the update time field after the current client has preempted the queue segment; then, it is determined whether there are other clients on the basis of the status data The step of consuming the queue segment includes: determining whether the value of the consumer field of the queue segment is the current client; if the value of the consumer field of the queue segment is the current client, It is determined whether the preemption time of the current user terminal is greater than the consumption time; if the preemption time of the current user terminal is greater than the consumption time, it is determined that no other client terminal is consuming the queue segment. 根據請求項12所述的方法,其中,該搶佔時間 為當前用戶端的系統時間,該消費時間為該用戶端搶佔該佇列分片時的系統時間與第一週期時間之和。 The method according to claim 12, wherein the preemption time Is the system time of the current user end, and the consumption time is the sum of the system time when the user end preempts the queue segment and the first cycle time. 根據請求項1所述的方法,其中,該更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度的步驟,包括:租用該佇列分片,並將該持久化儲存空間中儲存該佇列租期表中該佇列分片下的該消費者欄位更新為當前用戶端;從該持久化儲存空間中儲存的該佇列租期表中,獲取該佇列分片的進度欄位下的消費進度。 The method according to claim 1, wherein the step of updating the status data of the queue segment and obtaining the current consumption progress of the queue segment includes: renting the queue segment and making the queue persistent The consumer field under the queue shard in the queue lease table stored in the storage space is updated to the current client; the queue is obtained from the queue lease table stored in the persistent storage space The consumption progress under the progress column of the shard. 根據請求項14所述的方法,其中,該將該佇列分片新的消費進度進行記錄的步驟,包括:每隔第二時間週期,判斷該佇列租期表中,該佇列分片的該租用者欄位是否為當前用戶端;如果該佇列分片的該租用者欄位是當前用戶端,則將當前用戶端對該佇列分片的消費進度,更新到該佇列租期表中該佇列分片的進度欄位下。 The method according to claim 14, wherein the step of recording the new consumption progress of the queue segment includes: every second time period, determining that the queue segment is in the queue lease table Whether the tenant field of the queue is the current client; if the tenant field of the queue segment is the current client, update the current client's consumption progress for the queue segment to the queue lease Under the progress field of the queue segment in the schedule. 根據請求項14所述的方法,其中,在從該持久化儲存空間中儲存的該佇列租期表中,獲取該佇列分片的進度欄位下的消費進度的步驟之後,還包括:判斷在該佇列租期表中,該佇列分片下的搶佔者欄位和該消費者欄位是否為當前用戶端;如果該佇列分片下的搶佔者欄位和該消費者欄位是否為當前用戶端,則為當前用戶端續租該佇列分片。 The method according to claim 14, wherein, after the step of obtaining the consumption progress under the progress field of the queue segment from the queue lease time table stored in the persistent storage space, the method further includes: Determine whether the preemptor field and the consumer field under the queue segment in the queue lease table are the current client; if the preemptor field under the queue segment and the consumer field Whether the bit is the current client, the queue segment is renewed for the current client. 一種分散式環境協調消費佇列的裝置,其特徵在於,包括:狀態資料獲取模組,用於針對一待消費的佇列分片,基於租賃協定獲取該佇列分片的狀態資料;消費判斷模組,用於根據該狀態資料,判斷是否有其他用戶端在消費該佇列分片;進度獲取模組,用於如果確定沒有其他用戶端在消費該佇列分片,則更新該佇列分片的狀態資料,並獲取該佇列分片當前的消費進度;消費模組,用於根據當前的消費進度繼續消費該佇列分片,並將該佇列分片新的消費進度進行記錄;其中,該狀態資料獲取模組包括:第一狀態資料獲取子模組,用於針對一待消費的佇列分片,基於該租賃協定,從持久化儲存空間中儲存的佇列租期表中獲取該佇列分片的該狀態資料;其中,該狀態資料包括該佇列租期表中該佇列分片下消費者欄位的值、租用者欄位的值和更新時間欄位的消費時間。 A device for coordinating consumption queues in a distributed environment is characterized by comprising: a status data acquisition module for acquiring status data of a queue segment to be consumed based on a lease agreement; consumption judgment The module is used to determine whether other clients are consuming the queue segment based on the status data; the progress acquisition module is used to update the queue if it is determined that no other client is consuming the queue segment The status data of the shard and obtain the current consumption progress of the queue shard; the consumption module is used to continue to consume the queue shard according to the current consumption progress, and record the new consumption progress of the queue shard ; Wherein, the state data acquisition module includes: a first state data acquisition sub-module, used for a queue segment to be consumed, based on the lease agreement, from the queue lease table stored in the persistent storage space Obtain the status data of the queue segment in the queue; where the status data includes the value of the consumer field under the queue segment in the queue lease table, the value of the tenant field, and the update time field Consumption time. 根據請求項17所述的裝置,其中,在狀態資料獲取模組之前,還包括:佇列分片確定模組,用於基於租賃協定確定當前用戶端需求的待消費的佇列分片。 The device according to claim 17, wherein, before the status data acquisition module, it further includes: a queue segment determination module, configured to determine the queue segment to be consumed based on the current client demand based on the lease agreement. 根據請求項18所述的裝置,其中,該佇列分片確定模組包括: 需求數量確定子模組,用於獲取活躍的用戶端總數U、佇列分片總數P以及當前用戶端已消費的佇列分片總數Q,以計算當前用戶端需要搶佔的佇列分片數量N;搶佔子模組,用於從超時的佇列分片和/或者其他客戶端正在消費的佇列分片中,搶佔N個佇列分片作為當前用戶端的待消費佇列分片。 The device according to claim 18, wherein the queue fragment determination module includes: The demand quantity determination sub-module is used to obtain the total number of active clients U, the total number of queue shards P, and the total number of queue shards consumed by the current client Q to calculate the number of queue shards that the current client needs to preempt N; The preemption sub-module is used to preempt N queue segments from the timeout queue fragments and/or the queue fragments being consumed by other clients as the current client's queue fragments to be consumed. 根據請求項19所述的裝置,其中,該需求數量確定子模組包括:用戶端總數獲取子模組,用於從該持久化儲存空間中儲存的用戶端實例表和該佇列租期表中獲取活躍的用戶端總數U;整體佇列分片總數獲取子模組,用於從佇列系統中獲取佇列分片的總個數P;單體佇列分片總數獲取子模組,用於從該持久化儲存空間中儲存的該佇列租期表中獲取當前用戶端已消費的佇列分片總數Q;需求數量計算子模組,用於通過N=[P/U]-Q,計算當前用戶端需要搶佔的佇列分片數量N。 The device according to claim 19, wherein the demand quantity determination submodule includes: a client total number obtaining submodule, which is used to obtain a client instance table and the queue lease period table stored in the persistent storage space Obtain the total number of active clients U; obtain the total number of shards in the overall queue, which is used to obtain the total number of shards P from the queuing system; obtain the total number of shards in the single queue, obtain the sub-module, It is used to obtain the total number Q of queue shards consumed by the current client from the queue lease period table stored in the persistent storage space; the demand quantity calculation sub-module is used to pass N=[P/U]- Q. Calculate the number N of queue fragments that the current client needs to preempt. 根據請求項20所述的裝置,其中,該用戶端總數獲取子模組包括:新建用戶端數量獲取子模組,用於獲取用戶端實例表中記錄的最近一個第一時間週期內啟動的用戶端的數量live1;未超時用戶端數量獲取子模組,用於獲取該佇列租期 表中,沒有超時佇列分片對應的用戶端的數量live2;該沒有超時的用戶端為佇列分片的該租用者欄位下記錄的用戶端;用戶端數量累加子模組,用於將live1加上live2得到總的活躍的用戶端數量U。 The apparatus according to claim 20, wherein the total number of client terminals obtaining submodule includes: a newly created client number obtaining submodule, which is used to obtain the users that have been activated in the most recent first time period recorded in the client instance table The number of clients live1; the sub-module for obtaining the number of clients that has not expired is used to obtain the lease period of the queue In the table, the number of clients corresponding to the queue fragment without timeout live2; the client without timeout is the client recorded under the tenant field of the queue fragment; the number of clients accumulates the submodule, using Add live1 to live2 to get the total number of active users U. 根據請求項19所述的裝置,其中,該搶佔子模組包括:第一判斷子模組,用於判斷該N是否大於0;第二判斷子模組,用於如果該N大於0,則判斷超時的佇列分片數量L是否小於N;全超時搶佔子模組,用於如果超時的佇列分片數量L大於等於N,則從超時的佇列分片中搶佔N個佇列分片;混和搶佔子模組,用於如果判斷超時的佇列分片數量L小於N,則從超時的佇列分片中搶佔L個佇列分片,並從其他客戶端正在消費的佇列分片中,搶佔N-L個佇列分片。 The device according to claim 19, wherein the preemption sub-module includes: a first judging sub-module for judging whether the N is greater than 0; a second judging sub-module for if the N is greater than 0, then Determine whether the number of timeout queue fragments L is less than N; the full timeout preemption submodule is used to preempt N from the timeout queue fragments if the number of timeout queue fragments L is greater than or equal to N A queue shard; mixed preemption sub-module, used to determine that the number of timed out queue shards L is less than N, then preempt L queue shards from the timed out queue shards, and from other customers Among the queue fragments that the end is consuming, preempt the NL queue fragments. 根據請求項22所述的裝置,其中,該混和搶佔子模組包括:第一混和搶佔子模組,用於當前用戶端從其他用戶端佔用的佇列分片中搶佔N-L各佇列分片,並使各用戶端佔用最多佇列分片的數量和佔用最少佇列分片的數量之差不超過指定數量。 The device according to claim 22, wherein the hybrid preemption sub-module includes: a first hybrid preemption sub-module for the current client to preempt each queue segment of NL from the queue segments occupied by other client terminals , And make the difference between the number of shards occupying the largest queue and the number of shards occupying the least queued by each client does not exceed the specified number. 根據請求項23所述的裝置,其中,該第一混和搶佔子模組包括: 排序子模組,用於基於該佇列租期表,將各個用戶端按照其佔用的佇列分片數量,從多到少進行排序;第二混和搶佔模組,用於每次從前K個用戶端中搶佔一個或多個搶佔佇列分片後,使前K個用戶端被搶佔J個之後,前K個用戶端剩下的佇列分片的平均數大於第K+1個用戶端當前佔用的佇列分片數量,並且使前K個用戶端中,各個用戶端剩餘的佇列分片數與該平均數相差不超過指定數量,直至成功搶佔到N-L個佇列分片。 The device according to claim 23, wherein the first hybrid preemption submodule includes: The sorting sub-module is used to sort each client according to the number of queue shards occupied by it based on the queue lease time table, from most to least; the second hybrid preemption module is used for the first K each time After one or more preemption queue fragments are preempted in the client, after the first K client terminals are preempted for J, the average number of remaining queue fragments for the first K client terminals is greater than that of the K+1th client terminal The number of queue fragments currently occupied, and the remaining number of queue fragments of each client in the first K client terminals does not differ from the average number by more than a specified number, until NL queue fragments are successfully preempted. 根據請求項19所述的裝置,其中,該搶佔子模組包括:消費者欄位修改子模組,用於在搶佔一佇列分片時,將該佇列租期表中,該被搶佔佇列分片的該消費者欄位修改為當前的用戶端。 The device according to claim 19, wherein the preemption submodule includes: a consumer field modification submodule, which is used to preempt a queue segment in the lease time table, and the preempted submodule The consumer field of the queue segment is modified to the current client. 根據請求項19所述的裝置,其中,該搶佔子模組之前,還包括:map表判斷子模組,用於每隔第一時間週期,獲取該佇列租期表,並判斷該佇列租期表中各佇列分片下該租用者欄位的租用ID,是否與本機存放區的map表中上次記錄的相應佇列分片的租用ID相同;map監控時間更新子模組,用於如果該佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID不同,則更新map表中該佇列分片的租用ID為該佇列租期表中的租用ID,以及更新map表中該佇列分片的上次監控時間欄位為當前的系統時間; map監控時間維持子模組,用於如果該佇列租期表中一佇列分片的租用ID,與map表中上次記錄的相應佇列分片的租用ID相同,則維持上次監控時間欄位下的系統時間,並判斷當前系統時間減去上次監控的系統時間是否大於第一時間週期;超時判斷子模組,用於如果當前系統時間減去上次監控的系統時間大於第一時間週期,則確定相應佇列分片超時。 The device according to claim 19, wherein, before the preemption submodule, it further includes: a map table judging submodule, configured to obtain the queue lease table every first time period, and judge the queue Whether the lease ID of the tenant field under each queue segment in the lease table is the same as the lease ID of the corresponding queue segment recorded last time in the map table of the local storage area; map monitoring time update submodule , Used to update the lease of the queue segment in the map table if the lease ID of a queue segment in the queue lease table is different from the lease ID of the corresponding queue segment recorded last time in the map table ID is the lease ID in the queue lease period table, and the last monitoring time field of the queue segment in the updated map table is the current system time; The map monitoring time maintenance sub-module is used to maintain the last monitoring if the lease ID of a queue segment in the queue lease period table is the same as the lease ID of the corresponding queue segment recorded last time in the map table The system time under the time field, and determine whether the current system time minus the last monitored system time is greater than the first time period; the timeout judgment sub-module is used if the current system time minus the last monitored system time is greater than In the first time period, it is determined that the corresponding queue fragment has timed out. 根據請求項18所述的裝置,其中,該佇列分片確定模組包括:第一佇列分片確定子模組,用於基於租賃協定每隔第一時間週期,確定當前用戶端需求的待消費的佇列分片。 The device according to claim 18, wherein the queue fragmentation determination module includes: a first queue fragmentation determination sub-module configured to determine the current user end demand based on the lease agreement every first time period Queue fragments to be consumed. 根據請求項17所述的裝置,其中,該消費時間為當前用戶端搶佔該佇列分片後修改該更新時間欄位的值獲得;則,該消費判斷模組包括:消費者欄位判斷子模組,用於判斷該佇列分片的該消費者欄位的值是否為當前用戶端;搶佔時間判斷子模組,用於如果該佇列分片的該消費者欄位的值是當前用戶端,則判斷當前用戶端的搶佔時間是否大於該消費時間;確定子模組,用於如果當前用戶端的搶佔時間大於該消費時間,則確定沒有其他用戶端在消費該佇列分片。 The device according to claim 17, wherein the consumption time is obtained by modifying the value of the update time field after the current user terminal preempts the queue segment; then, the consumption judgment module includes: a consumer field judgment sub The module is used to determine whether the value of the consumer field of the queue segment is the current client; the preemption time determination sub-module is used to determine if the value of the consumer field of the queue segment is the current client The client terminal determines whether the preemption time of the current client terminal is greater than the consumption time; the determining sub-module is used to determine that no other client terminal is consuming the queue segment if the preemption time of the current client terminal is greater than the consumption time. 根據請求項28所述的裝置,其中,該搶佔時間 為當前用戶端的系統時間,該消費時間為該用戶端搶佔該佇列分片時的系統時間與第一週期時間之和。 The device according to claim 28, wherein the preemption time Is the system time of the current user end, and the consumption time is the sum of the system time when the user end preempts the queue segment and the first cycle time. 根據請求項17所述的裝置,其中,該進度獲取模組包括:租用子模組,用於租用該佇列分片,並將該持久化儲存空間中儲存該佇列租期表中該佇列分片下的該消費者欄位更新為當前用戶端;進度欄位讀取模組,用於從該持久化儲存空間中儲存的該佇列租期表中,獲取該佇列分片的進度欄位下的消費進度。 The device according to claim 17, wherein the progress acquisition module includes: a lease sub-module for renting the queue segment, and storing the queue in the queue lease time table in the persistent storage space The consumer field under the row segment is updated to the current client; the progress field reading module is used to obtain the queue segment’s data from the queue lease table stored in the persistent storage space The consumption progress under the progress bar. 根據請求項30所述的裝置,其中,該消費模組包括:租用者判斷子模組,用於每隔第二時間週期,判斷該佇列租期表中,該佇列分片的該租用者欄位是否為當前用戶端;消費進度更新子模組,用於如果該佇列分片的該租用者欄位是當前用戶端,則將當前用戶端對該佇列分片的消費進度,更新到該佇列租期表中該佇列分片的進度欄位下。 The device according to claim 30, wherein the consumption module includes: a renter judging sub-module for judging the lease of the queue segment in the queue lease period every second time period Whether the user field is the current client; the consumption progress update sub-module is used for if the tenant field of the queue segment is the current client, then the current client's consumption progress for the queue segment, Update to the progress column of the queue segment in the queue lease period table. 根據請求項30所述的裝置,其中,在租用子模組之後,還包括:續租判斷子模組,用於判斷在該佇列租期表中,該佇列分片下的搶佔者欄位和該消費者欄位是否為當前用戶端; 第一續租子模組,用於如果該佇列分片下的搶佔者欄位和該消費者欄位是否為當前用戶端,則為當前用戶端續租該佇列分片。 The device according to claim 30, wherein, after the lease sub-module, it further includes: a lease renewal judging sub-module for judging the preemptor column under the queue segment in the queue lease period table Bit and whether the consumer field is the current client; The first lease renewal sub-module is used to renew the lease of the queue segment for the current client if the preemptor field and the consumer field under the queue segment are the current client.
TW106102716A 2017-01-24 2017-01-24 Distributed environment coordinated consumption queue method and device TWI735519B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
TW106102716A TWI735519B (en) 2017-01-24 2017-01-24 Distributed environment coordinated consumption queue method and device

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
TW106102716A TWI735519B (en) 2017-01-24 2017-01-24 Distributed environment coordinated consumption queue method and device

Publications (2)

Publication Number Publication Date
TW201828199A TW201828199A (en) 2018-08-01
TWI735519B true TWI735519B (en) 2021-08-11

Family

ID=63960239

Family Applications (1)

Application Number Title Priority Date Filing Date
TW106102716A TWI735519B (en) 2017-01-24 2017-01-24 Distributed environment coordinated consumption queue method and device

Country Status (1)

Country Link
TW (1) TWI735519B (en)

Citations (9)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN102289392A (en) * 2011-09-08 2011-12-21 曙光信息产业股份有限公司 Operation scheduling method and system based on check point
TW201342284A (en) * 2011-12-29 2013-10-16 Intel Corp Method and system for mobile commerce with real-time purchase support
CN104754036A (en) * 2015-03-06 2015-07-01 合一信息技术(北京)有限公司 Message processing system and processing method based on kafka
US20150200886A1 (en) * 2014-01-14 2015-07-16 International Business Machines Corporation Message switch file sharing
US20150242827A1 (en) * 2014-02-21 2015-08-27 Lin Guo System and method for facilitating space transactions
TW201606675A (en) * 2014-08-07 2016-02-16 黃音凱 Self-leasing system for audiovisual media and leasing method for using the same
CN105550890A (en) * 2016-02-15 2016-05-04 重庆昇鑫科技有限公司 Intelligent business district management platform based on internet of things
US20160321568A1 (en) * 2013-12-20 2016-11-03 Smartseats Ip Bvba Systems and methods for redistributing tickets to an event
JP2016206779A (en) * 2015-04-17 2016-12-08 株式会社日立製作所 Transaction management method and transaction management apparatus

Patent Citations (9)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN102289392A (en) * 2011-09-08 2011-12-21 曙光信息产业股份有限公司 Operation scheduling method and system based on check point
TW201342284A (en) * 2011-12-29 2013-10-16 Intel Corp Method and system for mobile commerce with real-time purchase support
US20160321568A1 (en) * 2013-12-20 2016-11-03 Smartseats Ip Bvba Systems and methods for redistributing tickets to an event
US20150200886A1 (en) * 2014-01-14 2015-07-16 International Business Machines Corporation Message switch file sharing
US20150242827A1 (en) * 2014-02-21 2015-08-27 Lin Guo System and method for facilitating space transactions
TW201606675A (en) * 2014-08-07 2016-02-16 黃音凱 Self-leasing system for audiovisual media and leasing method for using the same
CN104754036A (en) * 2015-03-06 2015-07-01 合一信息技术(北京)有限公司 Message processing system and processing method based on kafka
JP2016206779A (en) * 2015-04-17 2016-12-08 株式会社日立製作所 Transaction management method and transaction management apparatus
CN105550890A (en) * 2016-02-15 2016-05-04 重庆昇鑫科技有限公司 Intelligent business district management platform based on internet of things

Also Published As

Publication number Publication date
TW201828199A (en) 2018-08-01

Similar Documents

Publication Publication Date Title
CN109582466B (en) Timed task execution method, distributed server cluster and electronic equipment
CN106933672B (en) Distributed environment coordinated consumption queue method and device
WO2020211579A1 (en) Processing method, device and system for distributed bulk processing system
WO2021159638A1 (en) Method, apparatus and device for scheduling cluster queue resources, and storage medium
WO2020232875A1 (en) Actor model-based task scheduling method and apparatus, and storage medium
CN105468450A (en) Task scheduling method and system
CN105049268A (en) Distributed computing resource allocation system and task processing method
CN111597270B (en) Data synchronization method, device, equipment and computer storage medium
CN111831408A (en) Asynchronous task processing method and device, electronic equipment and medium
CN113886069B (en) Resource allocation method, device, electronic device and storage medium
US10498817B1 (en) Performance tuning in distributed computing systems
CN102521265B (en) Dynamic consistency control method in massive data management
CN113760513B (en) Distributed task scheduling method, device, equipment and medium
US8959518B2 (en) Window-based scheduling using a key-value data store
CN104166589A (en) Heartbeat package processing method and device
US11816511B1 (en) Virtual partitioning of a shared message bus
CN114900449B (en) Resource information management method, system and device
CN113204353A (en) Big data platform assembly deployment method and device
CN109684051B (en) Method and system for asynchronously submitting hybrid big data task
CN111858656A (en) Static data query method and device based on distributed architecture
CN106815318B (en) Clustering method and system for time sequence database
CN111008071A (en) Task scheduling system, method and server
CN109032809A (en) Heterogeneous parallel scheduling system based on remote sensing image storage position
TWI735519B (en) Distributed environment coordinated consumption queue method and device
CN116594734A (en) Container migration method and device, storage medium and electronic equipment
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载