+

CN114780541B - Data partitioning method, device, equipment and medium in micro batch flow processing system - Google Patents

Data partitioning method, device, equipment and medium in micro batch flow processing system Download PDF

Info

Publication number
CN114780541B
CN114780541B CN202210339704.6A CN202210339704A CN114780541B CN 114780541 B CN114780541 B CN 114780541B CN 202210339704 A CN202210339704 A CN 202210339704A CN 114780541 B CN114780541 B CN 114780541B
Authority
CN
China
Prior art keywords
key
data
count
data block
batch
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN202210339704.6A
Other languages
Chinese (zh)
Other versions
CN114780541A (en
Inventor
李书亮
高杨
王霄阳
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
HONG KONG-ZHUHAI-MACAO BRIDGE AUTHORITY
Zhejiang University ZJU
Original Assignee
HONG KONG-ZHUHAI-MACAO BRIDGE AUTHORITY
Zhejiang University ZJU
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by HONG KONG-ZHUHAI-MACAO BRIDGE AUTHORITY, Zhejiang University ZJU filed Critical HONG KONG-ZHUHAI-MACAO BRIDGE AUTHORITY
Priority to CN202210339704.6A priority Critical patent/CN114780541B/en
Publication of CN114780541A publication Critical patent/CN114780541A/en
Application granted granted Critical
Publication of CN114780541B publication Critical patent/CN114780541B/en
Active legal-status Critical Current
Anticipated expiration legal-status Critical

Links

Classifications

    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/22Indexing; Data structures therefor; Storage structures
    • G06F16/2228Indexing structures
    • G06F16/2255Hash tables
    • GPHYSICS
    • G06COMPUTING OR CALCULATING; COUNTING
    • G06FELECTRIC DIGITAL DATA PROCESSING
    • G06F16/00Information retrieval; Database structures therefor; File system structures therefor
    • G06F16/20Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
    • G06F16/22Indexing; Data structures therefor; Storage structures
    • G06F16/2282Tablespace storage structures; Management thereof

Landscapes

  • Engineering & Computer Science (AREA)
  • Theoretical Computer Science (AREA)
  • Software Systems (AREA)
  • Data Mining & Analysis (AREA)
  • Databases & Information Systems (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Information Retrieval, Db Structures And Fs Structures Therefor (AREA)

Abstract

本申请涉及数据流实时处理技术领域,提供了一种微批流处理系统中的数据分区方法、装置、计算机设备、存储介质和计算机程序产品。本申请通过频率感知缓冲技术来使得批分区前准备工作所需时间最小化,遍历平衡二叉树可以得到一个键及其频率相关信息的有序列表,减少了处理阶段的排序时间,在批分区阶段通过将问题抽象为经典装箱问题,限制了键的碎片化程度,使得数据块之间的基数差异最小化,并保持各数据块大小相等,实现了对数据分区的负载平衡,在处理阶段把问题抽象为可变容量装箱问题,使用最差适应算法来分配键簇,保证了任务间的负载平衡,可以在不增加延迟的情况下大幅提高数据处理吞吐量。

The present application relates to the technical field of real-time data stream processing, and provides a data partitioning method, device, computer equipment, storage medium and computer program product in a micro-batch stream processing system. The present application minimizes the time required for preparatory work before batch partitioning through frequency-aware buffering technology, and traverses a balanced binary tree to obtain an ordered list of keys and their frequency-related information, reducing the sorting time in the processing stage. In the batch partitioning stage, the problem is abstracted as a classic packing problem, which limits the degree of key fragmentation, minimizes the cardinality difference between data blocks, and keeps the size of each data block equal, thereby achieving load balancing of data partitions. In the processing stage, the problem is abstracted as a variable capacity packing problem, and the worst-fit algorithm is used to allocate key clusters, ensuring load balancing between tasks, which can greatly improve data processing throughput without increasing latency.

Description

微批流处理系统中的数据分区方法、装置、设备和介质Data partitioning method, device, equipment and medium in micro-batch stream processing system

技术领域Technical Field

本申请涉及数据流实时处理技术领域,特别是涉及一种微批流处理系统中的数据分区方法、装置、计算机设备、存储介质和计算机程序产品。The present application relates to the technical field of real-time data stream processing, and in particular to a data partitioning method, apparatus, computer equipment, storage medium and computer program product in a micro-batch stream processing system.

背景技术Background technique

随着大数据技术的发展成熟,实时处理的需求愈加广泛,常分布于社交网络分析、点击流量分析等应用中。实时处理大数据流的重要性不言而喻,这就导致了大量分布式流处理系统的产生。With the development and maturity of big data technology, the demand for real-time processing has become more and more widespread, often distributed in applications such as social network analysis and click flow analysis. The importance of real-time processing of big data streams is self-evident, which has led to the emergence of a large number of distributed stream processing systems.

目前技术中,例如Spark Streaming、Comet、Google Dataflow等一些微批流处理系统采用了一次一批的处理模型来提高处理吞吐量,相比于传统一次一元组的流处理系统,微批流处理系统具有速度更快、容错机制更高效等优点。然而这种技术中的微批流处理系统使用基本数据分区技术,其性能对负载特性的动态变化非常敏感,资源利用率非常依赖于在处理单元上均匀的划分工作负载。In current technologies, some micro-batch stream processing systems, such as Spark Streaming, Comet, and Google Dataflow, use a batch-at-a-time processing model to improve processing throughput. Compared with traditional one-tuple-at-a-time stream processing systems, micro-batch stream processing systems have the advantages of faster speed and more efficient fault tolerance mechanism. However, the micro-batch stream processing systems in this technology use basic data partitioning technology, and their performance is very sensitive to dynamic changes in load characteristics. Resource utilization is very dependent on evenly dividing the workload on the processing unit.

发明内容Summary of the invention

基于此,有必要针对上述技术问题,提供一种微批流处理系统中的数据分区方法、装置、计算机设备、存储介质和计算机程序产品。Based on this, it is necessary to provide a data partitioning method, apparatus, computer equipment, storage medium and computer program product in a micro-batch stream processing system to address the above technical problems.

第一方面,本申请提供了一种微批流处理系统中的数据分区方法。所述方法包括:In a first aspect, the present application provides a data partitioning method in a micro-batch stream processing system. The method comprises:

获取数据流元组;Get data stream tuple;

基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;The data stream tuple is maintained based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree;

遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;Traversing the balanced binary search tree to generate the ordered list; wherein the ordered list includes the key, the frequency count of the key and the tuple list corresponding to the key;

基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;Based on the preset partitioning conditions, the data stream tuples in the ordered list are partitioned in batches; wherein each partition is a data block, and each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of splits of a single item, minimizing the number of different single items in a data block, and maintaining equal capacity of each data block;

通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。The key clusters are allocated to the buckets of the Reduce stage for processing by using the information of whether the key in the data block is split based on the worst fit algorithm through the Map task; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of the bucket is determined according to the ratio of the number of key clusters to the number of buckets.

在其中一个实施例中,所述方法还包括:记录各批处理时间与批间隔;获取所述各批处理时间与所述批间隔的比例;根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数;根据所述连续批计数,调整Map任务和/或Reduce任务。In one of the embodiments, the method further includes: recording each batch processing time and batch interval; obtaining the ratio of each batch processing time to the batch interval; according to a preset ratio threshold, obtaining the continuous batch count whose ratio satisfies the preset ratio threshold; and adjusting the Map task and/or Reduce task according to the continuous batch count.

在其中一个实施例中,所述预设比例阈值包括第一比例阈值;所述根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数,包括:获取所述比例大于所述第一比例阈值的连续批计数。In one of the embodiments, the preset ratio threshold includes a first ratio threshold; obtaining the continuous batch count whose ratio satisfies the preset ratio threshold according to the preset ratio threshold includes: obtaining the continuous batch count whose ratio is greater than the first ratio threshold.

在其中一个实施例中,所述根据所述连续批计数,调整Map任务和/或Reduce任务,包括:当第一连续批计数达到预设计数阈值时,在数据率增加的情况下增加Map任务,在数据分布增加的情况下增加Reduce任务;其中,所述第一连续批计数为所述比例大于所述第一比例阈值的连续批计数。In one of the embodiments, adjusting the Map task and/or Reduce task according to the continuous batch count includes: when the first continuous batch count reaches a preset count threshold, adding Map tasks when the data rate increases, and adding Reduce tasks when the data distribution increases; wherein the first continuous batch count is a continuous batch count whose ratio is greater than the first ratio threshold.

在其中一个实施例中,所述预设比例阈值包括第二比例阈值;所述根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数,包括:获取所述比例小于所述第二比例阈值的连续批计数。In one of the embodiments, the preset ratio threshold includes a second ratio threshold; obtaining the continuous batch count whose ratio satisfies the preset ratio threshold according to the preset ratio threshold includes: obtaining the continuous batch count whose ratio is less than the second ratio threshold.

在其中一个实施例中,所述根据所述连续批计数,调整Map任务和/或Reduce任务,包括:当第二连续批计数达到预设计数阈值时,在数据率减少的情况下减少Map任务,在数据分布减少的情况下减少Reduce任务;其中,所述第二连续批计数为所述比例小于所述第二比例阈值的连续批计数。In one of the embodiments, adjusting the Map task and/or Reduce task according to the continuous batch count includes: when the second continuous batch count reaches a preset count threshold, reducing the Map task when the data rate decreases, and reducing the Reduce task when the data distribution decreases; wherein the second continuous batch count is a continuous batch count whose ratio is less than the second ratio threshold.

第二方面,本申请还提供了一种微批流处理系统中的动态数据分区装置。In a second aspect, the present application also provides a dynamic data partitioning device in a micro-batch stream processing system.

所述装置包括:The device comprises:

获取模块,用于获取数据流元组;The acquisition module is used to obtain the data stream tuple;

维护模块,用于基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;A maintenance module, for maintaining the data stream tuple based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree;

生成模块,用于遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;A generating module, configured to traverse the balanced binary search tree to generate the ordered list; wherein the ordered list comprises the key, the frequency count of the key and the tuple list corresponding to the key;

分区模块,用于基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;A partitioning module is used to partition the data stream tuples in the ordered list by batches based on preset partitioning conditions; wherein each partition is a data block, and each data block stores information on whether a key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of splits of a single item, minimizing the number of different single items in a data block, and maintaining equal capacity of each data block;

分配处理模块,用于通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。An allocation processing module is used to allocate key clusters to buckets in the Reduce stage for processing based on the worst fit algorithm using information on whether the keys in the data blocks are split through Map tasks; wherein the output of the Map stage is a cluster composed of key values, each key cluster having all data values of the same key; and the capacity of a bucket is determined according to the ratio of the number of key clusters to the number of buckets.

第三方面,本申请还提供了一种计算机设备。所述计算机设备包括存储器和处理器,所述存储器存储有计算机程序,所述处理器执行所述计算机程序时实现以下步骤:In a third aspect, the present application further provides a computer device. The computer device includes a memory and a processor, the memory stores a computer program, and the processor implements the following steps when executing the computer program:

获取数据流元组;基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。Obtain data stream tuples; maintain the data stream tuples based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer to a corresponding frequency count node in the balanced binary search tree; traverse the balanced binary search tree to generate the ordered list; wherein the ordered list contains the key, the frequency count of the key, and the tuple list corresponding to the key; based on a preset partitioning condition, partition the data stream tuples in the ordered list by batches; wherein each partition is a data block, each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of split times of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal; through a Map task based on a worst-fit algorithm, using the information on whether the key in the data block is split, the key cluster is allocated to the buckets of the Reduce stage for processing; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of a bucket is determined according to the ratio of the number of key clusters to the number of buckets.

第四方面,本申请还提供了一种计算机可读存储介质。所述计算机可读存储介质,其上存储有计算机程序,所述计算机程序被处理器执行时实现以下步骤:In a fourth aspect, the present application further provides a computer-readable storage medium. The computer-readable storage medium stores a computer program, and when the computer program is executed by a processor, the following steps are implemented:

获取数据流元组;基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。Obtain data stream tuples; maintain the data stream tuples based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree; traverse the balanced binary search tree to generate the ordered list; wherein the ordered list contains the key, the frequency count of the key, and the tuple list corresponding to the key; based on a preset partitioning condition, partition the data stream tuples in the ordered list by batches; wherein each partition is a data block, each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of split times of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal; the Map task uses the information on whether the key in the data block is split to allocate key clusters to the buckets of the Reduce stage for processing based on the worst fit algorithm; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of a bucket is determined according to the ratio of the number of key clusters to the number of buckets.

第五方面,本申请还提供了一种计算机程序产品。所述计算机程序产品,包括计算机程序,该计算机程序被处理器执行时实现以下步骤:In a fifth aspect, the present application further provides a computer program product. The computer program product includes a computer program, and when the computer program is executed by a processor, the following steps are implemented:

获取数据流元组;基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。Obtain data stream tuples; maintain the data stream tuples based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree; traverse the balanced binary search tree to generate the ordered list; wherein the ordered list contains the key, the frequency count of the key, and the tuple list corresponding to the key; based on a preset partitioning condition, partition the data stream tuples in the ordered list by batches; wherein each partition is a data block, each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of split times of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal; the Map task uses the information on whether the key in the data block is split to allocate key clusters to the buckets of the Reduce stage for processing based on the worst fit algorithm; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of a bucket is determined according to the ratio of the number of key clusters to the number of buckets.

上述微批流处理系统中的数据分区方法、装置、计算机设备、存储介质和计算机程序产品,通过频率感知缓冲技术来使得批分区前准备工作所需时间最小化,遍历平衡二叉树可以得到一个键及其频率相关信息的有序列表,减少了处理阶段的排序时间,在批分区阶段通过将问题抽象为经典装箱问题,限制了键的碎片化程度,使得数据块之间的基数差异最小化,并保持各数据块大小相等,实现了对数据分区的负载平衡,在处理阶段把问题抽象为可变容量装箱问题,使用最差适应算法来分配键簇,保证了任务间的负载平衡,可以在不增加延迟的情况下大幅提高数据处理吞吐量。The data partitioning method, apparatus, computer equipment, storage medium and computer program product in the above-mentioned micro-batch stream processing system minimize the time required for preparation before batch partitioning through frequency-aware buffering technology. Traversing the balanced binary tree can obtain an ordered list of keys and their frequency-related information, reducing the sorting time in the processing stage. In the batch partitioning stage, the problem is abstracted as a classic packing problem, which limits the degree of key fragmentation, minimizes the cardinality difference between data blocks, and keeps the size of each data block equal, thereby achieving load balancing of data partitions. In the processing stage, the problem is abstracted as a variable-capacity packing problem, and the worst-fit algorithm is used to allocate key clusters, thereby ensuring load balancing between tasks, and can greatly improve data processing throughput without increasing latency.

附图说明BRIEF DESCRIPTION OF THE DRAWINGS

图1为一个实施例中微批流处理系统中的数据分区方法的流程示意图;FIG1 is a schematic flow chart of a data partitioning method in a micro-batch stream processing system according to an embodiment;

图2为一个实施例中微批流处理系统中数据缓存及动态分区的流程示意图;FIG2 is a schematic diagram of a process flow of data caching and dynamic partitioning in a micro-batch stream processing system in one embodiment;

图3为一个实施例中频率感知技术流程图;FIG3 is a flow chart of a frequency sensing technique in one embodiment;

图4为一个实施例中分批阶段实现平衡负载分区流程图;FIG4 is a flow chart of implementing load balancing partitioning in batch stages in one embodiment;

图5为一个实施例中处理阶段分区流程图;FIG5 is a flowchart of processing phase partitioning in one embodiment;

图6为一个实施例中微批流处理系统中的数据分区装置的结构框图;FIG6 is a block diagram of a data partitioning device in a micro-batch stream processing system according to an embodiment;

图7为一个实施例中计算机设备的内部结构图。FIG. 7 is a diagram showing the internal structure of a computer device in one embodiment.

具体实施方式Detailed ways

为了使本申请的目的、技术方案及优点更加清楚明白,以下结合附图及实施例,对本申请进行进一步详细说明。应当理解,此处描述的具体实施例仅仅用以解释本申请,并不用于限定本申请。In order to make the purpose, technical solution and advantages of the present application more clearly understood, the present application is further described in detail below in conjunction with the accompanying drawings and embodiments. It should be understood that the specific embodiments described herein are only used to explain the present application and are not used to limit the present application.

本申请实施例提供的微批流处理系统中的数据分区方法,可以应用于服务器中,该服务器可以用独立的服务器或者是多个服务器组成的服务器集群来实现。The data partitioning method in the micro-batch stream processing system provided in the embodiment of the present application can be applied to a server, which can be implemented by an independent server or a server cluster composed of multiple servers.

以下结合各实施例及相应附图对本申请提供的微批流处理系统中的数据分区方法进行详细说明。The data partitioning method in the micro-batch stream processing system provided by the present application is described in detail below in combination with various embodiments and corresponding drawings.

在一个实施例中,如图1所示并结合图2,提供了一种微批流处理系统中的数据分区方法,包括:In one embodiment, as shown in FIG. 1 and in combination with FIG. 2 , a data partitioning method in a micro-batch stream processing system is provided, including:

步骤S101,获取数据流元组;Step S101, obtaining a data stream tuple;

步骤S102,基于哈希表和平衡二叉搜索树维护数据流元组;其中,哈希表存储数据流元组的键、指向键对应的元组列表的第一指针及键的频率计数;键的频率计数还保存至平衡二叉搜索树;哈希表中每一个键均拥有指向平衡二叉搜索树中相应频率计数节点的第二指针;Step S102, maintaining data stream tuples based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree;

步骤S103,遍历平衡二叉搜索树,生成有序列表;其中,有序列表包含键、键的频率计数及键对应的元组列表;Step S103, traversing the balanced binary search tree to generate an ordered list; wherein the ordered list includes the key, the frequency count of the key and the tuple list corresponding to the key;

上述步骤S101至S103主要是通过频率感知缓冲技术来使得批分区前准备工作所需时间最小化。具体的,频率感知缓冲技术:获取一批数据流元组,建立一个哈希表和平衡二叉搜索树来维护数据流元组的统计信息。具体的,根据数据流元组的键将数据流元组存储到哈希表HTable<k,vi>,其中,k为键,vi为指向每个键对应的元组列表tupleListi的第一指针,哈希表HTable还存储键的频率计数counti,同时,键的频率计数counti会保存到平衡二叉树CountTree中,哈希表HTable中每一个键都拥有指向平衡二叉树CountTree中相应频率计数节点的双向指针(即第二指针),该第二指针允许直接更新键的计数节点。基于此,遍历平衡二叉树CountTree来生成键及其相关频率信息的有序列表<ki,counti,tupleListi>,ki表示数据流元组中第i个键。结合图3,更具体的过程如下:The above steps S101 to S103 are mainly to minimize the time required for the preparation work before batch partitioning through frequency-aware buffering technology. Specifically, frequency-aware buffering technology: obtain a batch of data stream tuples, establish a hash table and a balanced binary search tree to maintain the statistical information of the data stream tuples. Specifically, according to the key of the data stream tuple, the data stream tuple is stored in the hash table HTable<k,v i >, where k is the key, and vi is the first pointer pointing to the tuple list tupleList i corresponding to each key. The hash table HTable also stores the frequency count count i of the key. At the same time, the frequency count count i of the key will be saved in the balanced binary tree CountTree. Each key in the hash table HTable has a bidirectional pointer (i.e., the second pointer) pointing to the corresponding frequency count node in the balanced binary tree CountTree. The second pointer allows the key count node to be directly updated. Based on this, the balanced binary tree CountTree is traversed to generate an ordered list of keys and their related frequency information <k i , count i , tupleList i >, where k i represents the i-th key in the data stream tuple. Combined with Figure 3, a more specific process is as follows:

输入数据流S,批间隔为tstart-tend,设置更新补偿budget和初始频率补偿f。首先,重置哈希表HTable和用来保存频率计数的CountTree;然后循环遍历批间隔内接收到的数据流元组,将数据流元组计数Nc加1;如果该数据流元组的键在哈希表HTable中,那么将该数据流元组插入到哈希表HTable该键的链表中,并更新当前键的频率k.Freqcurr、当前键频与更新前频率的差值Deltafreq=k.Freqcurr-k.Frequpdated,现在时间和上一次更新时间的差Deltatime=Timenow-klastUpdateTime,如果键的当前频率步长kf.step等于Deltafreq或者当前时间步长kt.step等于Deltatime,那么更新CountTree中的k.Freqcurr、k.budget和k.Frequpdated,如果kf.step等于Deltafreq,则更新如果kt.step等于Deltatime则更新kt.step=(tend-TimeNow)/k.budget。如果该元组的键不在哈希表HTable中,将不同键的计数值K自增1,并将元组插入到哈希表HTable中,将元组的键插入平衡二叉树中,初始化k.Freqcurr、k.Frequpdated为1,初始化kt.step=(tend-TimeNow)/budget、kf.step=f。Input data stream S, batch interval is t start -t end , set update compensation budget and initial frequency compensation f. First, reset hash table HTable and CountTree used to store frequency counts; then loop through the data stream tuples received within the batch interval, and add 1 to the data stream tuple count Nc; if the key of the data stream tuple is in the hash table HTable, then insert the data stream tuple into the linked list of the key in the hash table HTable, and update the frequency k.Freq curr of the current key, the difference between the current key frequency and the frequency before update Delta freq = k.Freq curr -k.Freq updated , the difference between the current time and the last update time Delta time = Time now -k lastUpdateTime , if the current frequency step k f.step of the key is equal to Delta freq or the current time step k t.step is equal to Delta time , then update k.Freq curr , k.budget and k.Freq updated in CountTree, if k f.step is equal to Delta freq , then update If k t.step is equal to Delta time , update k t.step = (t end - Time Now ) / k.budget. If the key of the tuple is not in the hash table HTable, increment the count value K of different keys by 1, insert the tuple into the hash table HTable, insert the key of the tuple into the balanced binary tree, initialize k.Freq curr and k.Freq updated to 1, initialize k t.step = (t end - Time Now ) / budget, k f.step = f.

为了提高数据处理速率,常采用更粗粒度的方式来更新,即在一定时间间隔内周期性的更新budget次,其中budget为根据需要确定的补偿值,定义控制参数fstep,每收到同一个键的fstep个新元组就更新一次节点的计数,最初fstep被设置为能反映最佳步长的常数其中Nest是平均数据率下一个批间隔中的数据元组数,KAvg为过去几个批中不同键的平均数。Fstep是根据当前键的频率与当前批间隔中接收到的元组总数之比自适应地为每个键更新估计,即更高频次出现的键需要接收更多数据元组来触发更新;为了确保所有元组的节点得到更新,设置基于时间的控制参数tstep来更新长时间未得到更新的键,该参数基于键的budget更新已消耗时间和批间隔剩余持续时间来估计的。In order to improve the data processing rate, a coarser-grained approach is often used to update, that is, to periodically update the budget times within a certain time interval, where budget is a compensation value determined according to needs. The control parameter fstep is defined, and the node count is updated once every fstep new tuples with the same key are received. Initially, fstep is set to a constant that reflects the optimal step size. Where Nest is the number of data tuples in the next batch interval at the average data rate, and K Avg is the average number of different keys in the past batches. Fstep is adaptively updated for each key based on the ratio of the frequency of the current key to the total number of tuples received in the current batch interval, that is, keys that appear more frequently need to receive more data tuples to trigger updates; in order to ensure that all tuple nodes are updated, a time-based control parameter tstep is set to update keys that have not been updated for a long time. This parameter is estimated based on the budget update time consumed by the key and the remaining duration of the batch interval.

步骤S104,基于预设分区条件,将有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;Step S104, based on the preset partitioning conditions, the data stream tuples in the ordered list are partitioned by batches; wherein each partition is a data block, and each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning conditions include: limiting the number of splits of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal;

上述步骤S104主要是平衡负载分区。具体的,所有共享相同键值的数据流元组被建模为一个单项,将步骤S103中有序列表中的数据流元组按批分区,每个分区为一个数据块,每个数据块中需存储键是否被分割的信息,分区过程需要满足以下预设分区条件:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等。The above step S104 is mainly to balance the load partition. Specifically, all data stream tuples sharing the same key value are modeled as a single item, and the data stream tuples in the ordered list in step S103 are partitioned in batches, each partition is a data block, and each data block needs to store information on whether the key is split. The partitioning process needs to meet the following preset partitioning conditions: limit the number of splits of a single item, minimize the number of different items in a data block, and maintain the capacity of each data block equal.

具体来说,该步骤S104是将批分区问题定义为可拆分项目的平衡装箱问题,给定拥有N个不同项的集合:k1,k2…,kN,每项大小为Sn,其中1≤n≤N,给定B={b1,b2,…,bB}个箱子,每个箱子容量为C,那么可拆分项的平衡装箱问题即将各项在同时满足以下条件的情况下分配到不同箱中:(1)对于任意bj,j∈[1,B],都有箱内的元组数等于箱的容量C;(2)对于任意bj,j∈[1,B],都有箱内不同项的个数大于等于N/B;(3)对于任意项,要求被分割次数尽可能少。装箱时,将频率counti大于数据块大小与数据块基数之比的键对应的数据流元组拆分为两项,其中一项的数据流元组大小等于数据块大小与数据块基数之比,将其放入数据块中,另一项放入一个新的列表中;然后,将排序列表中剩余键按蛇形排列分配给数据块,最后再按最佳适应算法将新的列表中的键分配给数据块。Specifically, step S104 defines the batch partitioning problem as a balanced packing problem for splittable items. Given a set of N different items: k 1 , k 2 , …, k N , each item is of size Sn , where 1≤n≤N, and given B={b 1 ,b 2 ,…,b B } boxes, each box has a capacity of C. Then, the balanced packing problem for splittable items is to allocate each item to different boxes while satisfying the following conditions: (1) for any b j , j∈[1,B], the number of tuples in the box is equal to the capacity of the box C; (2) for any b j , j∈[1,B], the number of different items in the box is greater than or equal to N/B; (3) for any item, it is required to be split as few times as possible. When packing, split the data stream tuples corresponding to the keys whose frequency count i is greater than the ratio of the data block size to the data block cardinality into two items, one of which has a data stream tuple size equal to the ratio of the data block size to the data block cardinality, put it into the data block, and the other is put into a new list; then, assign the remaining keys in the sorted list to the data blocks in a serpentine arrangement, and finally assign the keys in the new list to the data blocks according to the best fit algorithm.

结合图4,更具体的过程如下:Combined with Figure 4, the more specific process is as follows:

该过程可以由三个独立循环遍历算法构成。具体如下:a)遍历上述二叉树得到键及其频率信息的有序列表<ki,counti,tupleListi>和元组计数值Nc、不同键的计数值K作为输入,设置所需数据分区数P;定义分区大小PSize=Nc/P,分区基数Pk=K/P,分割键的阈值Scut=PSize/Pk,设置当前分区bj为第一个分区b1;b)遍历列表中的键,当其counti大于Scut时,将Scut个元组放入bj中,同时将剩余部分放入临时列表RList,并更新该键对应的bj所在位置为Pos(k)=bj;并设置bj=bj%P,j自增1,然后重复步骤b)直到不存在counti大于Scut;c)遍历List中剩下的键,遍历分区bj依次放入一个键,遍历完分区后逆转分区顺序,重复步骤c);d)遍历临时列表RList中的键,设b=Pos(k),如果该键能全部放入b中则将键放入b,否则先将b装满,然后将剩余部分装到能容纳它的剩余容量最小的分区中。This process can be composed of three independent loop traversal algorithms. Specifically as follows: a) Traverse the above binary tree to obtain the ordered list of keys and their frequency information <k i , count i , tupleList i > and the tuple count value Nc, the count value K of different keys as input, set the required number of data partitions P; define the partition size P Size = Nc/P, the partition cardinality P k = K/P, the threshold for splitting keys S cut = P Size / P k , and set the current partition b j to be the first partition b 1 ; b) Traverse the keys in the list, when its count i is greater than S cut , put S cut tuples into b j , and put the remaining part into the temporary list RList, and update the position of b j corresponding to the key to Pos(k) = b j ; and set b j = b j%P , j increases by 1, and then repeat step b) until there is no count i greater than S cut ; c) Traverse the remaining keys in List, traverse partition bj and put a key in turn, reverse the partition order after traversing the partitions, and repeat step c); d) Traverse the keys in the temporary list RList, set b = Pos(k), if the key can be fully placed in b, put the key in b, otherwise fill b first, and then put the remaining part into the partition with the smallest remaining capacity that can accommodate it.

步骤S105,通过Map任务基于最差适应算法利用数据块中的键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与buckets的数量的比值确定。Step S105, using the Map task based on the worst fit algorithm and using the information of whether the keys in the data block are split, the key clusters are allocated to the buckets of the Reduce stage for processing; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of the bucket is determined according to the ratio of the number of key clusters to the number of buckets.

步骤S105为处理阶段分区,在平衡负载批分区步骤中,每个数据块拥有键是否被分割的信息,用Map任务处理数据库,Map任务利用键被分割的信息来将键簇分配到Reduce阶段的buckets中处理。其中,Map阶段的输出为由键值对组成的簇,每个键簇拥有相同键的所有数据值,键簇Ck可被表示为Ck={(k,vi)|vi∈k},vi为键k所对应的数据值;假设由给定Map阶段输出的K个键簇需要分配到r个Reducebuckets中。Map任务的输出为I={Ck|k∈K},为保证Reduce阶段的负载平衡,需要保证每个Reducebuckets的分配一致,故设置bucket的容量为处理阶段的分区问题可以简化为装箱问题;将键簇看作是项,Reducebuckets为箱。与批分区问题不同,处理阶段的分区问题为可变容量平衡装箱问题,定义如下:给定拥有M个项的集合,A个箱子a1,a2,…,aA,每个箱子容量为Ci,那么可变容量平衡装箱问题即在满足以下条件的情况下将项分配到不同箱a1,a2,…,aA中;(1)对于任意aj,j∈[1,A],都有箱内的元组数小于箱的容量Cj;(2)对于任意aj,j∈[1,A],都有箱内不同项的个数大于等于M。结合图5,更具体的过程如下:Step S105 is the processing stage partitioning. In the load balancing batch partitioning step, each data block has information about whether the key is split. The Map task is used to process the database. The Map task uses the information that the key is split to allocate key clusters to the buckets of the Reduce stage for processing. Among them, the output of the Map stage is a cluster composed of key-value pairs. Each key cluster has all data values of the same key. The key cluster C k can be expressed as C k = {(k,v i )|v i ∈ k}, where vi is the data value corresponding to key k; assume that the K key clusters output by a given Map stage need to be allocated to r Reduce buckets. The output of the Map task is I = {C k |k∈K}. In order to ensure the load balance of the Reduce stage, it is necessary to ensure that the allocation of each Reduce bucket is consistent, so the capacity of the bucket is set to The partitioning problem in the processing phase can be simplified to a bin packing problem; consider key clusters as items and Reducebuckets as bins. Unlike the batch partitioning problem, the partitioning problem in the processing phase is a variable capacity balanced bin packing problem, which is defined as follows: given a set with M items, A bins a 1 , a 2 , …, a A , each with a capacity C i , then the variable capacity balanced bin packing problem is to distribute items to different bins a 1 , a 2 , …, a A under the following conditions: (1) For any a j , j∈[1,A], the number of tuples in the bin is less than the capacity C j of the bin; (2) For any a j , j∈[1,A], the number of different items in the bin is greater than or equal to M. Combined with Figure 5, the more specific process is as follows:

如图2所示,分区结果会进入到Map任务中进行处理,图5展示了对Map任务中间结果进行分配到Reduce buckets中的详细过程。首先,输入信息为Map任务得到的键簇C,经上述步骤得到的数据分区中包含有键是否被分割过的信息,Reduce阶段所有buckets的集合R,设置Bucksize=|C|/|R|,使用Hash算法分配被分割过的键,使得键簇中仅剩下未被分割的键,并对其进行降序排序。然后遍历键簇中的键,按最差适应算法尽可能的将较大键簇分配到第r个bucket中,并将第r个bucket从R中删除,如果R中没有bucket则重置R为所有bucket,继续遍历键簇中的剩余键。As shown in Figure 2, the partition results will enter the Map task for processing. Figure 5 shows the detailed process of assigning the intermediate results of the Map task to the Reduce buckets. First, the input information is the key cluster C obtained by the Map task. The data partition obtained through the above steps contains information on whether the key has been split. The set R of all buckets in the Reduce stage sets Bucksize = |C|/|R|. Use the Hash algorithm to assign the split keys so that only the unsplit keys remain in the key cluster, and sort them in descending order. Then traverse the keys in the key cluster, and assign the larger key cluster to the rth bucket as much as possible according to the worst fit algorithm, and delete the rth bucket from R. If there is no bucket in R, reset R to all buckets, and continue to traverse the remaining keys in the key cluster.

上述微批流处理系统中的数据分区方法,通过频率感知缓冲技术来使得批分区前准备工作所需时间最小化,遍历平衡二叉树可以得到一个键及其频率相关信息的有序列表,减少了处理阶段的排序时间,在批分区阶段通过将问题抽象为经典装箱问题,限制了键的碎片化程度,使得数据块之间的基数差异最小化,并保持各数据块大小相等,实现了对数据分区的负载平衡,在处理阶段把问题抽象为可变容量装箱问题,使用最差适应算法来分配键簇,保证了任务间的负载平衡,可以在不增加延迟的情况下大幅提高数据处理吞吐量。The data partitioning method in the above-mentioned micro-batch stream processing system minimizes the time required for preparation before batch partitioning through frequency-aware buffering technology. Traversing the balanced binary tree can obtain an ordered list of keys and their frequency-related information, reducing the sorting time in the processing stage. In the batch partitioning stage, the problem is abstracted as a classic packing problem, which limits the degree of key fragmentation, minimizes the cardinality difference between data blocks, and keeps the size of each data block equal, thereby achieving load balancing of data partitions. In the processing stage, the problem is abstracted as a variable-capacity packing problem, and the worst-fit algorithm is used to allocate key clusters, which ensures load balancing between tasks and can greatly improve data processing throughput without increasing latency.

在一些实施例中,上述方法还可以包括以下步骤:In some embodiments, the above method may further include the following steps:

记录各批处理时间与批间隔;获取各批处理时间与批间隔的比例;根据预设比例阈值,获取比例满足预设比例阈值的连续批计数;根据连续批计数,调整Map任务和/或Reduce任务。Record each batch processing time and batch interval; obtain the ratio of each batch processing time to the batch interval; according to a preset ratio threshold, obtain the continuous batch count whose ratio meets the preset ratio threshold; according to the continuous batch count, adjust the Map task and/or Reduce task.

本实施例主要资源动态管理,通过设定Map-Reduce任务处理时间的阈值来改变运行时的并行程度,根据工作负载的变化来调整Map-Reduce任务,具体连续根据各批处理时间与两批数据流元组之间的时间间隔之比,调整Map任务和/或Reduce任务。This embodiment mainly manages resources dynamically. It changes the degree of parallelism at runtime by setting a threshold for Map-Reduce task processing time, and adjusts Map-Reduce tasks according to changes in workload. Specifically, it continuously adjusts Map tasks and/or Reduce tasks based on the ratio of each batch processing time to the time interval between two batches of data stream tuples.

在其中一些实施例中,预设比例阈值包括第一比例阈值;上述根据预设比例阈值,获取比例满足预设比例阈值的连续批计数,包括:获取比例大于第一比例阈值的连续批计数。In some embodiments, the preset ratio threshold includes a first ratio threshold; the above-mentioned obtaining the continuous batch count whose ratio satisfies the preset ratio threshold according to the preset ratio threshold includes: obtaining the continuous batch count whose ratio is greater than the first ratio threshold.

进一步的,上述根据连续批计数,调整Map任务和/或Reduce任务,具体包括:Furthermore, the above adjustment of Map tasks and/or Reduce tasks according to the continuous batch count specifically includes:

当第一连续批计数达到预设计数阈值时,在数据率增加的情况下增加Map任务,在数据分布增加的情况下增加Reduce任务;其中,第一连续批计数为比例大于第一比例阈值的连续批计数。When the first continuous batch count reaches a preset count threshold, Map tasks are added when the data rate increases, and Reduce tasks are added when the data distribution increases; wherein the first continuous batch count is a continuous batch count whose ratio is greater than a first ratio threshold.

在另外一些实施例中,预设比例阈值包括第二比例阈值;上述根据预设比例阈值,获取比例满足预设比例阈值的连续批计数,包括:获取比例小于第二比例阈值的连续批计数。In some other embodiments, the preset ratio threshold includes a second ratio threshold; the above-mentioned obtaining the continuous batch count whose ratio meets the preset ratio threshold according to the preset ratio threshold includes: obtaining the continuous batch count whose ratio is less than the second ratio threshold.

进一步的,上述根据连续批计数,调整Map任务和/或Reduce任务,具体包括:Furthermore, the above adjustment of Map tasks and/or Reduce tasks according to the continuous batch count specifically includes:

当第二连续批计数达到预设计数阈值时,在数据率减少的情况下减少Map任务,在数据分布减少的情况下减少Reduce任务;其中,第二连续批计数为比例小于第二比例阈值的连续批计数。When the second continuous batch count reaches a preset count threshold, the Map task is reduced when the data rate decreases, and the Reduce task is reduced when the data distribution decreases; wherein the second continuous batch count is a continuous batch count whose ratio is less than the second ratio threshold.

上述实施例中,当各批处理时间与两批数据流元组之间的时间间隔之比,在连续几个批次均超过或低于设定阈值时,即触发对Map-Reduce任务的调整。具体如下:In the above embodiment, when the ratio of the processing time of each batch to the time interval between two batches of data stream tuples exceeds or falls below the set threshold for several consecutive batches, the adjustment of the Map-Reduce task is triggered. The details are as follows:

用Statsd来记录前d个(预设计数阈值)批次的处理时间与批间隔之比,以及数据率和数据分布等状态信息,定义批处理时间与批间隔的比例每个批都将该比例及数据率和数据分布情况加入到Statsd中。设定第一比例阈值为thres1,采用count表示Wi>thres1的连续若干个批计数即第一连续批计数,若出现Wi<thres1,则可以将第一连续批计数置零重新计数。当第一连续批计数等于d,即连续d个批的Wi大于预设计数阈值时,若数据率增加则增加相应的Map任务,若数据分布增加则增加Reduce任务;同样的,设第二比例阈值为thres2,当连续d个批次Wi<thres2时根据数据率和数据分布的变化情况减少相应的任务,即若数据率减少则减少Map任务,若数据分布减少则减少Reduce任务。Use Stats d to record the ratio of the processing time of the first d batches (preset count threshold) to the batch interval, as well as status information such as data rate and data distribution, and define the ratio of batch processing time to batch interval Each batch adds the ratio, data rate and data distribution to Stats d . Set the first ratio threshold to thres 1 , and use count to represent the count of several consecutive batches with Wi > thres 1 , that is, the first consecutive batch count. If Wi < thres 1 occurs, the first consecutive batch count can be reset to zero and recounted. When the first consecutive batch count is equal to d, that is, Wi for d consecutive batches is greater than the preset count threshold, if the data rate increases, the corresponding Map task is added, and if the data distribution increases, the Reduce task is added; similarly, set the second ratio threshold to thres 2 , when Wi < thres 2 for d consecutive batches, the corresponding tasks are reduced according to the changes in the data rate and data distribution, that is, if the data rate decreases, the Map task is reduced, and if the data distribution decreases, the Reduce task is reduced.

上述实施例采用动态资源管理技术实现负载动态调整,调整运行时的并行程度,使得该方法对数据分布和到达率的波动具有鲁棒性,并可以在不增加延迟的情况下大幅提高数据处理吞吐量。The above embodiment uses dynamic resource management technology to achieve dynamic load adjustment and adjust the degree of parallelism during runtime, so that the method is robust to fluctuations in data distribution and arrival rate, and can significantly improve data processing throughput without increasing latency.

应该理解的是,虽然如上所述的各实施例所涉及的流程图中的各个步骤按照箭头的指示依次显示,但是这些步骤并不是必然按照箭头指示的顺序依次执行。除非本文中有明确的说明,这些步骤的执行并没有严格的顺序限制,这些步骤可以以其它的顺序执行。而且,如上所述的各实施例所涉及的流程图中的至少一部分步骤可以包括多个步骤或者多个阶段,这些步骤或者阶段并不必然是在同一时刻执行完成,而是可以在不同的时刻执行,这些步骤或者阶段的执行顺序也不必然是依次进行,而是可以与其它步骤或者其它步骤中的步骤或者阶段的至少一部分轮流或者交替地执行。It should be understood that, although the various steps in the flowcharts involved in the above-mentioned embodiments are displayed in sequence according to the indication of the arrows, these steps are not necessarily executed in sequence according to the order indicated by the arrows. Unless there is a clear explanation in this article, the execution of these steps does not have a strict order restriction, and these steps can be executed in other orders. Moreover, at least a part of the steps in the flowcharts involved in the above-mentioned embodiments can include multiple steps or multiple stages, and these steps or stages are not necessarily executed at the same time, but can be executed at different times, and the execution order of these steps or stages is not necessarily carried out in sequence, but can be executed in turn or alternately with other steps or at least a part of the steps or stages in other steps.

基于同样的发明构思,本申请实施例还提供了一种用于实现上述所涉及的微批流处理系统中的数据分区方法的微批流处理系统中的数据分区装置。该装置所提供的解决问题的实现方案与上述方法中所记载的实现方案相似,故下面所提供的一个或多个微批流处理系统中的数据分区装置实施例中的具体限定可以参见上文中对于微批流处理系统中的数据分区方法的限定,在此不再赘述。Based on the same inventive concept, the embodiment of the present application also provides a data partitioning device in a micro-batch stream processing system for implementing the data partitioning method in the micro-batch stream processing system involved above. The implementation scheme for solving the problem provided by the device is similar to the implementation scheme recorded in the above method, so the specific limitations in the embodiments of the data partitioning device in one or more micro-batch stream processing systems provided below can refer to the limitations of the data partitioning method in the micro-batch stream processing system above, and will not be repeated here.

在一个实施例中,如图6所示,提供了一种微批流处理系统中的数据分区装置,该装置600可以包括:In one embodiment, as shown in FIG6 , a data partitioning device in a micro-batch stream processing system is provided, and the device 600 may include:

获取模块601,用于获取数据流元组;An acquisition module 601 is used to acquire a data stream tuple;

维护模块602,用于基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;A maintenance module 602 is used to maintain the data stream tuple based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree;

生成模块603,用于遍历所述平衡二叉搜索树,生成所述有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;A generating module 603 is used to traverse the balanced binary search tree to generate the ordered list; wherein the ordered list includes the key, the frequency count of the key and the tuple list corresponding to the key;

分区模块604,用于基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;A partitioning module 604 is used to partition the data stream tuples in the ordered list by batches based on a preset partitioning condition; wherein each partition is a data block, and each data block stores information on whether the key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning condition includes: limiting the number of splits of a single item, minimizing the number of different single items in a data block, and maintaining equal capacity of each data block;

分配处理模块605,用于通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定。The allocation processing module 605 is used to allocate key clusters to the buckets of the Reduce stage for processing based on the worst fit algorithm using the information of whether the key in the data block is split through the Map task; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of the bucket is determined according to the ratio of the number of key clusters to the number of buckets.

在一个实施例中,该装置600还可以包括:In one embodiment, the apparatus 600 may further include:

任务调整模块,用于记录各批处理时间与批间隔;获取所述各批处理时间与所述批间隔的比例;根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数;根据所述连续批计数,调整Map任务和/或Reduce任务。The task adjustment module is used to record each batch processing time and batch interval; obtain the ratio of each batch processing time to the batch interval; according to a preset ratio threshold, obtain the continuous batch count whose ratio meets the preset ratio threshold; according to the continuous batch count, adjust the Map task and/or Reduce task.

在一个实施例中,所述预设比例阈值包括第一比例阈值;任务调整模块,用于获取所述比例大于所述第一比例阈值的连续批计数。In one embodiment, the preset ratio threshold comprises a first ratio threshold; and the task adjustment module is used to obtain a count of consecutive batches whose ratio is greater than the first ratio threshold.

在一个实施例中,任务调整模块,用于当第一连续批计数达到预设计数阈值时,在数据率增加的情况下增加Map任务,在数据分布增加的情况下增加Reduce任务;其中,所述第一连续批计数为所述比例大于所述第一比例阈值的连续批计数。In one embodiment, a task adjustment module is used to increase Map tasks when the data rate increases, and to increase Reduce tasks when the data distribution increases, when the first continuous batch count reaches a preset count threshold; wherein the first continuous batch count is a continuous batch count whose ratio is greater than the first ratio threshold.

在一个实施例中,所述预设比例阈值包括第二比例阈值;任务调整模块,用于获取所述比例小于所述第二比例阈值的连续批计数。In one embodiment, the preset ratio threshold includes a second ratio threshold; and the task adjustment module is used to obtain a continuous batch count in which the ratio is less than the second ratio threshold.

在一个实施例中,任务调整模块,用于当第二连续批计数达到预设计数阈值时,在数据率减少的情况下减少Map任务,在数据分布减少的情况下减少Reduce任务;其中,所述第二连续批计数为所述比例小于所述第二比例阈值的连续批计数。In one embodiment, a task adjustment module is used to reduce Map tasks when the data rate decreases, and to reduce Reduce tasks when the data distribution decreases, when the second continuous batch count reaches a preset count threshold; wherein the second continuous batch count is a continuous batch count whose ratio is less than the second ratio threshold.

上述微批流处理系统中的数据分区装置中的各个模块可全部或部分通过软件、硬件及其组合来实现。上述各模块可以硬件形式内嵌于或独立于计算机设备中的处理器中,也可以以软件形式存储于计算机设备中的存储器中,以便于处理器调用执行以上各个模块对应的操作。Each module in the data partitioning device in the micro-batch stream processing system can be implemented in whole or in part by software, hardware, or a combination thereof. Each module can be embedded in or independent of a processor in a computer device in the form of hardware, or can be stored in a memory in a computer device in the form of software, so that the processor can call and execute operations corresponding to each module.

在一个实施例中,提供了一种计算机设备,该计算机设备可以是服务器,其内部结构图可以如图7所示。该计算机设备包括通过系统总线连接的处理器、存储器和网络接口。其中,该计算机设备的处理器用于提供计算和控制能力。该计算机设备的存储器包括非易失性存储介质和内存储器。该非易失性存储介质存储有操作系统、计算机程序和数据库。该内存储器为非易失性存储介质中的操作系统和计算机程序的运行提供环境。该计算机设备的数据库用于存储数据流元组等数据。该计算机设备的网络接口用于与外部的终端通过网络连接通信。该计算机程序被处理器执行时以实现一种微批流处理系统中的数据分区方法。In one embodiment, a computer device is provided, which may be a server, and its internal structure diagram may be shown in FIG7. The computer device includes a processor, a memory, and a network interface connected via a system bus. The processor of the computer device is used to provide computing and control capabilities. The memory of the computer device includes a non-volatile storage medium and an internal memory. The non-volatile storage medium stores an operating system, a computer program, and a database. The internal memory provides an environment for the operation of the operating system and the computer program in the non-volatile storage medium. The database of the computer device is used to store data such as data stream tuples. The network interface of the computer device is used to communicate with an external terminal via a network connection. When the computer program is executed by the processor, a data partitioning method in a micro-batch stream processing system is implemented.

本领域技术人员可以理解,图7中示出的结构,仅仅是与本申请方案相关的部分结构的框图,并不构成对本申请方案所应用于其上的计算机设备的限定,具体的计算机设备可以包括比图中所示更多或更少的部件,或者组合某些部件,或者具有不同的部件布置。Those skilled in the art will understand that the structure shown in FIG. 7 is merely a block diagram of a partial structure related to the solution of the present application, and does not constitute a limitation on the computer device to which the solution of the present application is applied. The specific computer device may include more or fewer components than shown in the figure, or combine certain components, or have a different arrangement of components.

在一个实施例中,还提供了一种计算机设备,包括存储器和处理器,存储器中存储有计算机程序,该处理器执行计算机程序时实现上述各方法实施例中的步骤。In one embodiment, a computer device is further provided, including a memory and a processor, wherein a computer program is stored in the memory, and the processor implements the steps in the above method embodiments when executing the computer program.

在一个实施例中,提供了一种计算机可读存储介质,其上存储有计算机程序,该计算机程序被处理器执行时实现上述各方法实施例中的步骤。In one embodiment, a computer-readable storage medium is provided, on which a computer program is stored. When the computer program is executed by a processor, the steps in the above-mentioned method embodiments are implemented.

在一个实施例中,提供了一种计算机程序产品,包括计算机程序,该计算机程序被处理器执行时实现上述各方法实施例中的步骤。In one embodiment, a computer program product is provided, including a computer program, which implements the steps in the above method embodiments when executed by a processor.

本领域普通技术人员可以理解实现上述实施例方法中的全部或部分流程,是可以通过计算机程序来指令相关的硬件来完成,所述的计算机程序可存储于一非易失性计算机可读取存储介质中,该计算机程序在执行时,可包括如上述各方法的实施例的流程。其中,本申请所提供的各实施例中所使用的对存储器、数据库或其它介质的任何引用,均可包括非易失性和易失性存储器中的至少一种。非易失性存储器可包括只读存储器(Read-OnlyMemory,ROM)、磁带、软盘、闪存、光存储器、高密度嵌入式非易失性存储器、阻变存储器(ReRAM)、磁变存储器(Magnetoresistive Random Access Memory,MRAM)、铁电存储器(Ferroelectric Random Access Memory,FRAM)、相变存储器(Phase Change Memory,PCM)、石墨烯存储器等。易失性存储器可包括随机存取存储器(Random Access Memory,RAM)或外部高速缓冲存储器等。作为说明而非局限,RAM可以是多种形式,比如静态随机存取存储器(Static Random Access Memory,SRAM)或动态随机存取存储器(Dynamic RandomAccess Memory,DRAM)等。本申请所提供的各实施例中所涉及的数据库可包括关系型数据库和非关系型数据库中至少一种。非关系型数据库可包括基于区块链的分布式数据库等,不限于此。本申请所提供的各实施例中所涉及的处理器可为通用处理器、中央处理器、图形处理器、数字信号处理器、可编程逻辑器、基于量子计算的数据处理逻辑器等,不限于此。Those of ordinary skill in the art can understand that all or part of the processes in the above-mentioned embodiment methods can be completed by instructing the relevant hardware through a computer program, and the computer program can be stored in a non-volatile computer-readable storage medium. When the computer program is executed, it can include the processes of the embodiments of the above-mentioned methods. Among them, any reference to the memory, database or other medium used in the embodiments provided in the present application can include at least one of non-volatile and volatile memory. Non-volatile memory can include read-only memory (ROM), magnetic tape, floppy disk, flash memory, optical memory, high-density embedded non-volatile memory, resistive random access memory (ReRAM), magnetoresistive random access memory (MRAM), ferroelectric random access memory (FRAM), phase change memory (PCM), graphene memory, etc. Volatile memory can include random access memory (RAM) or external cache memory, etc. As an illustration and not limitation, RAM can be in various forms, such as static random access memory (SRAM) or dynamic random access memory (DRAM). The database involved in each embodiment provided in this application may include at least one of a relational database and a non-relational database. Non-relational databases may include distributed databases based on blockchains, etc., but are not limited to this. The processor involved in each embodiment provided in this application may be a general-purpose processor, a central processing unit, a graphics processor, a digital signal processor, a programmable logic device, a data processing logic device based on quantum computing, etc., but are not limited to this.

需要说明的是,本申请所涉及的用户信息(包括但不限于用户设备信息、用户个人信息等)和数据(包括但不限于用于分析的数据、存储的数据、展示的数据等),均为经用户授权或者经过各方充分授权的信息和数据。It should be noted that the user information (including but not limited to user device information, user personal information, etc.) and data (including but not limited to data used for analysis, stored data, displayed data, etc.) involved in this application are all information and data authorized by the user or fully authorized by all parties.

以上实施例的各技术特征可以进行任意的组合,为使描述简洁,未对上述实施例中的各个技术特征所有可能的组合都进行描述,然而,只要这些技术特征的组合不存在矛盾,都应当认为是本说明书记载的范围。The technical features of the above embodiments may be arbitrarily combined. To make the description concise, not all possible combinations of the technical features in the above embodiments are described. However, as long as there is no contradiction in the combination of these technical features, they should be considered to be within the scope of this specification.

以上所述实施例仅表达了本申请的几种实施方式,其描述较为具体和详细,但并不能因此而理解为对本申请专利范围的限制。应当指出的是,对于本领域的普通技术人员来说,在不脱离本申请构思的前提下,还可以做出若干变形和改进,这些都属于本申请的保护范围。因此,本申请的保护范围应以所附权利要求为准。The above-described embodiments only express several implementation methods of the present application, and the descriptions thereof are relatively specific and detailed, but they cannot be understood as limiting the scope of the present application. It should be pointed out that, for a person of ordinary skill in the art, several variations and improvements can be made without departing from the concept of the present application, and these all belong to the protection scope of the present application. Therefore, the protection scope of the present application shall be subject to the attached claims.

Claims (9)

1.一种微批流处理系统中的数据分区方法,其特征在于,所述方法包括:1. A data partitioning method in a micro-batch stream processing system, characterized in that the method comprises: 获取数据流元组;Get data stream tuple; 基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;The data stream tuple is maintained based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree; 遍历所述平衡二叉搜索树,生成有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;Traversing the balanced binary search tree to generate an ordered list; wherein the ordered list includes the key, the frequency count of the key and the tuple list corresponding to the key; 基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;其中,将批分区问题定义为可拆分项目的平衡装箱问题,给定拥有N个不同项的集合:k1,k2…,kN,每项大小为Sn,其中1≤n≤N,给定B={b1,b2,…,bB}个箱子,每个箱子容量为C,可拆分项的平衡装箱问题为将各项在同时满足以下条件的情况下分配到不同箱中:(1)对于任意bj,j∈[1,B],都有箱内的元组数等于箱的容量C;(2)对于任意bj,j∈[1,B],都有箱内不同项的个数大于等于N/B;(3)对于任意项,要求被分割次数尽可能少;装箱时,将频率counti大于数据块大小与数据块基数之比的键对应的数据流元组拆分为两项,其中一项的数据流元组大小等于数据块大小与数据块基数之比,将其放入数据块中,另一项放入一个新的列表中;将排序列表中剩余键按蛇形排列分配给数据块,再按最佳适应算法将新的列表中的键分配给数据块;Based on a preset partitioning condition, the data stream tuples in the ordered list are partitioned into batches; wherein each partition is a data block, and each data block stores information on whether a key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning condition includes: limiting the number of split times of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal; wherein the batch partitioning problem is defined as a balanced bin packing problem for splittable items, given a set of N different items: k 1 , k 2 …, k N , each item is of size Sn , where 1≤n≤N, given B={b 1 ,b 2 ,…,b B } boxes, each box has a capacity of C, the balanced bin packing problem for splittable items is to allocate each item to different boxes while satisfying the following conditions: (1) for any b j , j∈[1,B], the number of tuples in the box is equal to the capacity C of the box; (2) for any b j , j∈[1,B], the number of different items in the box is greater than or equal to N/B; (3) For any item, it is required to be split as few times as possible; when packing, split the data stream tuple corresponding to the key whose frequency count i is greater than the ratio of the data block size to the data block cardinality into two items, one of which has a data stream tuple size equal to the ratio of the data block size to the data block cardinality, put it into the data block, and the other item is put into a new list; assign the remaining keys in the sorted list to the data blocks in a serpentine arrangement, and then assign the keys in the new list to the data blocks according to the best fit algorithm; 通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定;其中,Map任务利用键被分割的信息将键簇分配到Reduce阶段的buckets中处理;其中,Map阶段的输出为由键值对组成的簇,每个键簇拥有相同键的所有数据值,键簇Ck被表示为Ck={(k,vi)|vi∈k},vi为键k所对应的数据值;当由给定Map阶段输出的K个键簇需要分配到r个Reducebuckets中,Map任务的输出为I={Ck|k∈K},设置bucket的容量为处理阶段的分区问题简化为装箱问题,将键簇看作是项,Reducebuckets为箱;给定拥有M个项的集合,A个箱子a1,a2,…,aA,每个箱子容量为Ci,可变容量平衡装箱问题即在满足以下条件的情况下将项分配到不同箱a1,a2,…,aA中:(1)对于任意aj,j∈[1,A],都有箱内的元组数小于箱的容量Cj;(2)对于任意aj,j∈[1,A],都有箱内不同项的个数大于等于M。The Map task uses the information of whether the key in the data block is split to allocate key clusters to the buckets of the Reduce stage for processing based on the worst fit algorithm; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of the bucket is determined according to the ratio of the number of key clusters to the number of buckets; wherein the Map task uses the information that the key is split to allocate key clusters to the buckets of the Reduce stage for processing; wherein the output of the Map stage is a cluster composed of key-value pairs, each key cluster has all data values of the same key, and the key cluster C k is represented as C k ={(k,v i )|v i ∈k}, vi is the data value corresponding to key k; when the K key clusters output by a given Map stage need to be allocated to r Reduce buckets, the output of the Map task is I ={C k |k∈K}, and the capacity of the bucket is set to The partitioning problem in the processing stage is simplified to the bin packing problem, where key clusters are regarded as items and Reducebuckets are bins. Given a set with M items and A bins a 1 , a 2 , … , a A , each with capacity C i , the variable-capacity balanced bin packing problem is to distribute items to different bins a 1 , a 2 , … , a A under the following conditions: (1) for any a j , j∈[1,A], the number of tuples in the bin is less than the capacity C j of the bin; (2) for any a j , j∈[1,A], the number of different items in the bin is greater than or equal to M. 2.根据权利要求1所述的方法,其特征在于,所述方法还包括:2. The method according to claim 1, characterized in that the method further comprises: 记录各批处理时间与批间隔;Record the processing time and interval of each batch; 获取所述各批处理时间与所述批间隔的比例;Obtaining the ratio of each batch processing time to the batch interval; 根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数;According to a preset ratio threshold, obtaining a continuous batch count whose ratio satisfies the preset ratio threshold; 根据所述连续批计数,调整Map任务和/或Reduce任务。According to the continuous batch count, the Map task and/or the Reduce task are adjusted. 3.根据权利要求2所述的方法,其特征在于,所述预设比例阈值包括第一比例阈值;所述根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数,包括:3. The method according to claim 2, characterized in that the preset ratio threshold comprises a first ratio threshold; and obtaining the continuous batch count whose ratio satisfies the preset ratio threshold according to the preset ratio threshold comprises: 获取所述比例大于所述第一比例阈值的连续批计数。A count of consecutive batches in which the ratio is greater than the first ratio threshold is obtained. 4.根据权利要求3所述的方法,其特征在于,所述根据所述连续批计数,调整Map任务和/或Reduce任务,包括:4. The method according to claim 3, characterized in that adjusting the Map task and/or Reduce task according to the continuous batch count comprises: 当第一连续批计数达到预设计数阈值时,在数据率增加的情况下增加Map任务,在数据分布增加的情况下增加Reduce任务;其中,所述第一连续批计数为所述比例大于所述第一比例阈值的连续批计数。When the first continuous batch count reaches a preset count threshold, Map tasks are added when the data rate increases, and Reduce tasks are added when the data distribution increases; wherein the first continuous batch count is a continuous batch count whose ratio is greater than the first ratio threshold. 5.根据权利要求2所述的方法,其特征在于,所述预设比例阈值包括第二比例阈值;所述根据预设比例阈值,获取所述比例满足所述预设比例阈值的连续批计数,包括:5. The method according to claim 2, characterized in that the preset ratio threshold includes a second ratio threshold; and obtaining the continuous batch count whose ratio satisfies the preset ratio threshold according to the preset ratio threshold comprises: 获取所述比例小于所述第二比例阈值的连续批计数。A count of consecutive batches in which the ratio is less than the second ratio threshold is obtained. 6.根据权利要求5所述的方法,其特征在于,所述根据所述连续批计数,调整Map任务和/或Reduce任务,包括:6. The method according to claim 5, characterized in that adjusting the Map task and/or Reduce task according to the continuous batch count comprises: 当第二连续批计数达到预设计数阈值时,在数据率减少的情况下减少Map任务,在数据分布减少的情况下减少Reduce任务;其中,所述第二连续批计数为所述比例小于所述第二比例阈值的连续批计数。When the second continuous batch count reaches a preset count threshold, the Map task is reduced when the data rate decreases, and the Reduce task is reduced when the data distribution decreases; wherein the second continuous batch count is a continuous batch count whose ratio is less than the second ratio threshold. 7.一种微批流处理系统中的动态数据分区装置,其特征在于,所述装置包括:7. A dynamic data partitioning device in a micro-batch stream processing system, characterized in that the device comprises: 获取模块,用于获取数据流元组;The acquisition module is used to obtain the data stream tuple; 维护模块,用于基于哈希表和平衡二叉搜索树维护所述数据流元组;其中,所述哈希表存储所述数据流元组的键、指向所述键对应的元组列表的第一指针及所述键的频率计数;所述键的频率计数还保存至所述平衡二叉搜索树;所述哈希表中每一个键均拥有指向所述平衡二叉搜索树中相应频率计数节点的第二指针;A maintenance module, for maintaining the data stream tuple based on a hash table and a balanced binary search tree; wherein the hash table stores the key of the data stream tuple, a first pointer pointing to a tuple list corresponding to the key, and a frequency count of the key; the frequency count of the key is also saved to the balanced binary search tree; each key in the hash table has a second pointer pointing to a corresponding frequency count node in the balanced binary search tree; 生成模块,用于遍历所述平衡二叉搜索树,生成有序列表;其中,所述有序列表包含所述键、所述键的频率计数及所述键对应的元组列表;A generating module, configured to traverse the balanced binary search tree and generate an ordered list; wherein the ordered list comprises the key, the frequency count of the key and a tuple list corresponding to the key; 分区模块,用于基于预设分区条件,将所述有序列表中的数据流元组按批分区;其中,每个分区为一个数据块,每个数据块中存储有键是否被分割的信息;所有共享相同键值的数据流元组被建模为一个单项,所述预设分区条件包括:限制单项的拆分次数、最小化数据块中不同单项的数目和维持各数据块的容量相等;其中,将批分区问题定义为可拆分项目的平衡装箱问题,给定拥有N个不同项的集合:k1,k2…,kN,每项大小为Sn,其中1≤n≤N,给定B={b1,b2,…,bB}个箱子,每个箱子容量为C,可拆分项的平衡装箱问题为将各项在同时满足以下条件的情况下分配到不同箱中:(1)对于任意bj,j∈[1,B],都有箱内的元组数等于箱的容量C;(2)对于任意bj,j∈[1,B],都有箱内不同项的个数大于等于N/B;(3)对于任意项,要求被分割次数尽可能少;装箱时,将频率counti大于数据块大小与数据块基数之比的键对应的数据流元组拆分为两项,其中一项的数据流元组大小等于数据块大小与数据块基数之比,将其放入数据块中,另一项放入一个新的列表中;将排序列表中剩余键按蛇形排列分配给数据块,再按最佳适应算法将新的列表中的键分配给数据块;A partitioning module is used to partition the data stream tuples in the ordered list into batches based on a preset partitioning condition; wherein each partition is a data block, and each data block stores information on whether a key is split; all data stream tuples sharing the same key value are modeled as a single item, and the preset partitioning condition includes: limiting the number of split times of a single item, minimizing the number of different single items in a data block, and maintaining the capacity of each data block equal; wherein the batch partitioning problem is defined as a balanced packing problem of splittable items, given a set of N different items: k 1 , k 2 , …, k N , each item has a size of Sn , where 1≤n≤N, given B={b 1 ,b 2 ,…,b B } boxes, each box has a capacity of C, the balanced packing problem of splittable items is to distribute each item to different boxes while satisfying the following conditions: (1) for any b j , j∈[1,B], the number of tuples in the box is equal to the capacity C of the box; (2) for any b j , j∈[1,B], the number of different items in the box is greater than or equal to N/B; (3) For any item, it is required to be split as few times as possible; when packing, split the data stream tuple corresponding to the key whose frequency count i is greater than the ratio of the data block size to the data block cardinality into two items, one of which has a data stream tuple size equal to the ratio of the data block size to the data block cardinality, put it into the data block, and the other item is put into a new list; assign the remaining keys in the sorted list to the data blocks in a serpentine arrangement, and then assign the keys in the new list to the data blocks according to the best fit algorithm; 分配处理模块,用于通过Map任务基于最差适应算法利用所述数据块中的所述键是否被分割的信息将键簇分配至Reduce阶段的buckets中处理;其中,所述Map阶段的输出为由键值组成的簇,每个键簇拥有相同键的所有数据值;bucket的容量根据键簇的数量与所述buckets的数量的比值确定;其中,Map任务利用键被分割的信息将键簇分配到Reduce阶段的buckets中处理;其中,Map阶段的输出为由键值对组成的簇,每个键簇拥有相同键的所有数据值,键簇Ck被表示为Ck={(k,vi)|vi∈k},vi为键k所对应的数据值;当由给定Map阶段输出的K个键簇需要分配到r个Reducebuckets中,Map任务的输出为I={Ck|k∈K},设置bucket的容量为处理阶段的分区问题简化为装箱问题,将键簇看作是项,Reduce buckets为箱;给定拥有M个项的集合,A个箱子a1,a2,…,aA,每个箱子容量为Ci,可变容量平衡装箱问题即在满足以下条件的情况下将项分配到不同箱a1,a2,…,aA中:(1)对于任意aj,j∈[1,A],都有箱内的元组数小于箱的容量Cj;(2)对于任意aj,j∈[1,A],都有箱内不同项的个数大于等于M。An allocation processing module is used to allocate key clusters to the buckets of the Reduce stage for processing by using the information of whether the key in the data block is split based on the worst fit algorithm through the Map task; wherein the output of the Map stage is a cluster composed of key values, each key cluster has all data values of the same key; the capacity of the bucket is determined according to the ratio of the number of key clusters to the number of buckets; wherein the Map task allocates key clusters to the buckets of the Reduce stage for processing by using the information that the key is split; wherein the output of the Map stage is a cluster composed of key-value pairs, each key cluster has all data values of the same key, and the key cluster C k is represented as C k ={(k,v i )|v i ∈k}, vi is the data value corresponding to key k; when K key clusters output by a given Map stage need to be allocated to r Reduce buckets, the output of the Map task is I ={C k |k∈K}, and the capacity of the bucket is set to The partitioning problem in the processing stage is simplified to the bin packing problem, where key clusters are regarded as items and Reduce buckets are bins. Given a set with M items and A bins a 1 , a 2 , … , a A , each with a capacity C i , the variable-capacity balanced bin packing problem is to distribute items to different bins a 1 , a 2 , … , a A under the following conditions: (1) for any a j , j∈[1,A], the number of tuples in the bin is less than the capacity of the bin C j ; (2) for any a j , j∈[1,A], the number of different items in the bin is greater than or equal to M. 8.一种计算机设备,包括存储器和处理器,所述存储器存储有计算机程序,其特征在于,所述处理器执行所述计算机程序时实现权利要求1至6中任一项所述的方法的步骤。8. A computer device, comprising a memory and a processor, wherein the memory stores a computer program, wherein the processor implements the steps of the method according to any one of claims 1 to 6 when executing the computer program. 9.一种计算机可读存储介质,其上存储有计算机程序,其特征在于,所述计算机程序被处理器执行时实现权利要求1至6中任一项所述的方法的步骤。9. A computer-readable storage medium having a computer program stored thereon, wherein when the computer program is executed by a processor, the steps of the method according to any one of claims 1 to 6 are implemented.
CN202210339704.6A 2022-04-01 2022-04-01 Data partitioning method, device, equipment and medium in micro batch flow processing system Active CN114780541B (en)

Priority Applications (1)

Application Number Priority Date Filing Date Title
CN202210339704.6A CN114780541B (en) 2022-04-01 2022-04-01 Data partitioning method, device, equipment and medium in micro batch flow processing system

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202210339704.6A CN114780541B (en) 2022-04-01 2022-04-01 Data partitioning method, device, equipment and medium in micro batch flow processing system

Publications (2)

Publication Number Publication Date
CN114780541A CN114780541A (en) 2022-07-22
CN114780541B true CN114780541B (en) 2024-04-12

Family

ID=82426739

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202210339704.6A Active CN114780541B (en) 2022-04-01 2022-04-01 Data partitioning method, device, equipment and medium in micro batch flow processing system

Country Status (1)

Country Link
CN (1) CN114780541B (en)

Citations (8)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN104462582A (en) * 2014-12-30 2015-03-25 武汉大学 Web data similarity detection method based on two-stage filtration of structure and content
WO2017031961A1 (en) * 2015-08-24 2017-03-02 华为技术有限公司 Data processing method and apparatus
US9613127B1 (en) * 2014-06-30 2017-04-04 Quantcast Corporation Automated load-balancing of partitions in arbitrarily imbalanced distributed mapreduce computations
CN108595268A (en) * 2018-04-24 2018-09-28 咪咕文化科技有限公司 Data distribution method and device based on MapReduce and computer-readable storage medium
CN109325034A (en) * 2018-10-12 2019-02-12 平安科技(深圳)有限公司 Data processing method, device, computer equipment and storage medium
CN110955732A (en) * 2019-12-16 2020-04-03 湖南大学 Method and system for realizing partition load balance in Spark environment
CN111858607A (en) * 2020-07-24 2020-10-30 北京金山云网络技术有限公司 Data processing method and device, electronic equipment and computer readable medium
CN113468178A (en) * 2021-07-07 2021-10-01 武汉达梦数据库股份有限公司 Data partition loading method and device of association table

Patent Citations (8)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
US9613127B1 (en) * 2014-06-30 2017-04-04 Quantcast Corporation Automated load-balancing of partitions in arbitrarily imbalanced distributed mapreduce computations
CN104462582A (en) * 2014-12-30 2015-03-25 武汉大学 Web data similarity detection method based on two-stage filtration of structure and content
WO2017031961A1 (en) * 2015-08-24 2017-03-02 华为技术有限公司 Data processing method and apparatus
CN108595268A (en) * 2018-04-24 2018-09-28 咪咕文化科技有限公司 Data distribution method and device based on MapReduce and computer-readable storage medium
CN109325034A (en) * 2018-10-12 2019-02-12 平安科技(深圳)有限公司 Data processing method, device, computer equipment and storage medium
CN110955732A (en) * 2019-12-16 2020-04-03 湖南大学 Method and system for realizing partition load balance in Spark environment
CN111858607A (en) * 2020-07-24 2020-10-30 北京金山云网络技术有限公司 Data processing method and device, electronic equipment and computer readable medium
CN113468178A (en) * 2021-07-07 2021-10-01 武汉达梦数据库股份有限公司 Data partition loading method and device of association table

Non-Patent Citations (3)

* Cited by examiner, † Cited by third party
Title
Locality based data partitioning in Map reduce;S. Ancy等;2016 International Conference on Electrical, Electronics, and Optimization Techniques;第4869-4874页 *
基于MapReduce的大数据处理算法综述;门威;;濮阳职业技术学院学报(05);第91-94页 *
基于索引偏移的MapReduce聚类负载均衡策略;周华平;刘光宗;张贝贝;;计算机科学(05);第310-316页 *

Also Published As

Publication number Publication date
CN114780541A (en) 2022-07-22

Similar Documents

Publication Publication Date Title
JP6025149B2 (en) System and method for managing data
Yahya et al. An efficient implementation of Apriori algorithm based on Hadoop-Mapreduce model
CN110347651B (en) Cloud storage-based data synchronization method, device, equipment and storage medium
US10394782B2 (en) Chord distributed hash table-based map-reduce system and method
US9389913B2 (en) Resource assignment for jobs in a system having a processing pipeline that satisfies a data freshness query constraint
US11836132B2 (en) Managing persistent database result sets
CN106570113B (en) Mass vector slice data cloud storage method and system
CN115576924B (en) A method for data migration
CN112947860A (en) Hierarchical storage and scheduling method of distributed data copies
CN118656198A (en) Data processing method, device, electronic device and storage medium
CN115618050A (en) Video data storage, analysis method, device, system, communication equipment and storage medium
CN114780541B (en) Data partitioning method, device, equipment and medium in micro batch flow processing system
CN118838877A (en) Distributed data equalization method, device, equipment and storage medium
CN106933882B (en) Big data increment calculation method and device
CN111897784A (en) A near-data computing cluster system for key-value storage
CN117370462A (en) Data synchronization method
CN106227465A (en) A kind of data placement method of ring structure
CN116795790A (en) Method and device for merging small files, electronic equipment and storage medium
EP4113313A1 (en) Control method, information processing device, and control program
Li Dynamic load balancing method for urban surveillance video big data storage based on HDFS
CN119576250B (en) Data stream writing control method, system, device, equipment and readable storage medium
Mohanty et al. Small files consolidation technique in Hadoop cluster
US20250103204A1 (en) Workload-aware memory reclamation on graph databases
Bhardwah et al. Greedy based privacy preserving in Data Mining using NoSQL
CN114706687B (en) Computational task allocation method, device, computer equipment and storage medium

Legal Events

Date Code Title Description
PB01 Publication
PB01 Publication
SE01 Entry into force of request for substantive examination
SE01 Entry into force of request for substantive examination
GR01 Patent grant
GR01 Patent grant
点击 这是indexloc提供的php浏览器服务,不要输入任何密码和下载