CN110442594A - A kind of Dynamic Execution method towards Spark SQL Aggregation Operators - Google Patents
A kind of Dynamic Execution method towards Spark SQL Aggregation Operators Download PDFInfo
- Publication number
- CN110442594A CN110442594A CN201910650455.0A CN201910650455A CN110442594A CN 110442594 A CN110442594 A CN 110442594A CN 201910650455 A CN201910650455 A CN 201910650455A CN 110442594 A CN110442594 A CN 110442594A
- Authority
- CN
- China
- Prior art keywords
- data
- partition
- tasks
- task
- stealing
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Links
Classifications
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/242—Query formulation
- G06F16/2433—Query languages
- G06F16/244—Grouping and aggregation
-
- G—PHYSICS
- G06—COMPUTING OR CALCULATING; COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/245—Query processing
- G06F16/2455—Query execution
- G06F16/24552—Database cache management
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- Computational Linguistics (AREA)
- Data Mining & Analysis (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Mathematical Physics (AREA)
- Memory System Of A Hierarchy Structure (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本发明公开了一种面向Spark SQL聚集算子的动态执行方法,它实现于Spark系统之上,旨在充分利用空闲执行任务的闲置资源,提高系统在数据分布倾斜状态下的聚集算子执行性能。其技术方案可概括为:根据执行任务所负责处理数据分区的大小,增加两类新的执行任务类型,即分段任务和偷取任务,将负责处理大数据分区的任务标记为分段任务,将负责处理小数据分区的任务标记为偷取任务;分段任务以段为逻辑单位处理数据,而偷取任务则在处理完被分配的小数据分区后,主动从分段任务中偷取数据进行处理。所述的动态执行方法平衡了数据倾斜状态下所有执行任务之间的工作负载,在整体上提升了聚集算子的处理性能,适用于Spark SQL聚集算子的执行。The invention discloses a dynamic execution method for Spark SQL aggregation operators, which is implemented on the Spark system, and aims to make full use of idle resources for idle execution tasks, and improve the execution performance of aggregation operators in the system in the state of data distribution inclination . Its technical solution can be summarized as follows: according to the size of the data partition that the execution task is responsible for processing, two new types of execution tasks are added, namely segment tasks and stealing tasks, and tasks responsible for processing large data partitions are marked as segment tasks. Mark the tasks responsible for processing small data partitions as stealing tasks; segment tasks process data with segments as logical units, and steal tasks actively steal data from segment tasks after processing the allocated small data partitions to process. The dynamic execution method balances the workload among all execution tasks in the state of data skew, improves the processing performance of the aggregation operator as a whole, and is suitable for the execution of the Spark SQL aggregation operator.
Description
技术领域technical field
本发明属于计算机技术领域,具体而言,涉及一种面向Spark SQL聚集算子的动态执行方法。The invention belongs to the technical field of computers, and in particular relates to a dynamic execution method for Spark SQL aggregation operators.
背景技术Background technique
随着硬件技术的不断发展,它导致了内存价格持续的下降,同时内存带宽和内存容量却在不断的提升,以上的因素让以Spark为代表的内存计算系统逐渐成为了如今主流的大数据处理平台。Spark为处理结构化和半结构化数据提供了一个名为Spark SQL的库,聚集算子是其中出现频率最高、开销最大的数据处理算子之一。Spark SQL对聚集算子的实现采用了两阶段数据聚合实现方式,在第一阶段中,每个节点对本地的数据根据主键执行一次聚合操作,第二阶段,主键相同的数据被传输到同一节点上,传输之后,各个节点再对本地数据执行一次聚合操作,它们在Spark中分别对应两个阶段(Stage)。With the continuous development of hardware technology, it has led to a continuous decline in memory prices, while memory bandwidth and memory capacity are constantly increasing. The above factors have made the memory computing system represented by Spark gradually become the mainstream big data processing today. platform. Spark provides a library called Spark SQL for processing structured and semi-structured data. Aggregation operators are one of the most frequently occurring and expensive data processing operators. Spark SQL implements a two-stage data aggregation method for the aggregation operator. In the first stage, each node performs an aggregation operation on the local data according to the primary key. In the second stage, the data with the same primary key is transmitted to the same node. Above, after the transmission, each node performs an aggregation operation on the local data, which correspond to two stages (Stage) in Spark.
将数据提前缓存到内存中然后再进行处理,能大幅提升Spark SQL中聚集算子的处理性能,然而缓存带来的性能收益在实际过程中往往取决于缓存之后的数据分布,当数据分布不均,即数据倾斜发生时,整个系统的处理速度将受到极大的限制,以至于无法充分发挥内存计算的优势,由于聚集算子的主要计算开销集中在第一阶段中,第一阶段到第二阶段中会有一次数据清洗操作,故而数据倾斜发生情况下,仅有第一个阶段的执行受到影响。为了避免数据倾斜,用户需要对要处理的数据有着充分的先验知识,这往往在真实场景中是不太可能的,这也导致了数据倾斜这一现象较为常见。现有的技术大多基于数据采样来避免或者处理该问题,为了避免数据倾斜,系统需要提前对数据进行采样,利用采样的信息,来将数据进行均匀的划分,由于每次采样的准确性很难得以保证,它只能在某种程度上降低数据倾斜的程度,同时数据采样本身还需要引入额外的性能开销;在数据倾斜已经发生的情况下,处理数据倾斜往往需要在采样数据的基础上对数据进行重新分布,它进一步引入了数据重新分区的开销,这加深了性能损耗。Caching data in memory in advance and then processing can greatly improve the processing performance of aggregation operators in Spark SQL. However, the performance benefits brought by caching often depend on the data distribution after caching in the actual process. When data distribution is uneven , that is, when data skew occurs, the processing speed of the entire system will be greatly limited, so that the advantages of memory computing cannot be fully utilized. Since the main calculation overhead of the aggregation operator is concentrated in the first stage, the first stage to the second stage There will be a data cleaning operation in the stage, so when data skew occurs, only the execution of the first stage will be affected. In order to avoid data skew, users need to have sufficient prior knowledge of the data to be processed, which is often not possible in real scenarios, which also leads to the common phenomenon of data skew. Most of the existing technologies are based on data sampling to avoid or deal with this problem. In order to avoid data skew, the system needs to sample the data in advance and use the sampled information to evenly divide the data. Because the accuracy of each sampling is difficult to obtain To ensure that it can only reduce the degree of data skew to a certain extent, at the same time, data sampling itself needs to introduce additional performance overhead; when data skew has already occurred, processing data skew often needs to be based on the sampled data. Data is redistributed, which further introduces the overhead of data repartitioning, which deepens performance loss.
综上所述,缓存后的数据分布对聚集算子性能有着巨大的性能影响,主要是导致了聚集算子第一阶段的执行时间大幅上升。现有的优化方案一方面引入了采样开销,通过数据采样来避免数据倾斜的发生,但保证数据采样的精确性往往是一个巨大的挑战,这也让数据分布往往在现实中较为常见。另一方面,虽然可以通过重新分布数据来处理数据倾斜,但重新分布数据往往导致了巨大的计算和网络开销。To sum up, the cached data distribution has a huge performance impact on the performance of the aggregation operator, mainly resulting in a significant increase in the execution time of the first stage of the aggregation operator. On the one hand, existing optimization schemes introduce sampling overhead to avoid data skew through data sampling, but ensuring the accuracy of data sampling is often a huge challenge, which also makes data distribution often more common in reality. On the other hand, although data skew can be dealt with by redistributing data, redistributing data often results in huge computational and network overhead.
发明内容Contents of the invention
本发明结合Spark系统提供的缓存分区信息,实现了一种轻量的动态执行方法,目的是优化Spark SQL聚集算子第一阶段在数据倾斜下的数据处理性能。The present invention combines the cache partition information provided by the Spark system to realize a lightweight dynamic execution method, with the purpose of optimizing the data processing performance of the first stage of the Spark SQL aggregation operator under data skew.
为优化上述算子性能,本发明在Spark系统上提供的技术方案是,一种面向SparkSQL聚集算子的动态执行方法。所述动态执行方法包括数据倾斜检测和任务偷取两个阶段,数据倾斜检测阶段采用检测算法来检测数据倾斜现象:若存在数据倾斜,则执行分区依赖关系构建算法识别并构建大小数据分区之间的依赖关系,根据依赖关系,在原生执行任务的基础上,增加了两类新的执行任务,即分段任务和偷取任务,并为每个分段任务和偷取任务分别分配一个大数据分区和一个小数据分区;任务偷取阶段为分段任务设置了分段处理执行方式,使分段任务以段为单位处理被分配的大数据分区;为偷取任务设置了数据偷取执行方式,使偷取任务处理完被分配的小数据分区后,主动从分段任务的大数据分区中偷取数据并进行处理。In order to optimize the performance of the above operators, the technical solution provided by the present invention on the Spark system is a dynamic execution method for SparkSQL aggregation operators. The dynamic execution method includes two stages of data skew detection and task stealing. In the data skew detection stage, a detection algorithm is used to detect the data skew phenomenon: if there is data skew, the partition dependency construction algorithm is executed to identify and build a data partition between large and small data partitions. According to the dependencies, on the basis of the original execution tasks, two new types of execution tasks are added, namely segmentation tasks and stealing tasks, and a big data is assigned to each segmentation task and stealing task Partition and a small data partition; the task stealing stage sets the segmented processing execution method for the segmented task, so that the segmented task processes the allocated large data partition in units of segments; sets the data stealing execution method for the stealing task , so that after the stealing task finishes processing the allocated small data partition, it will actively steal data from the large data partition of the segmentation task and process it.
所述方法中数据倾斜检测阶段中的检测算法,其步骤如下:The detection algorithm in the data inclination detection stage in the described method, its steps are as follows:
步骤A-1:从Spark系统内部的键值存储子系统中获取Spark关于所有缓存数据分区的大小信息;Step A-1: Obtain Spark's size information about all cached data partitions from the key-value storage subsystem inside the Spark system;
步骤A-2:根据数据分区的大小信息,将所有数据分区从小到大进行排序后,放入一个新的数组中,标记为分区数组,设置头尾指针分别指向数组的头尾;Step A-2: According to the size information of the data partition, sort all the data partitions from small to large, put them into a new array, mark it as a partitioned array, and set the head and tail pointers to point to the head and tail of the array respectively;
步骤A-3:判断头尾指针指向分区的大小是否相差一个数量级,如果是,那么存在数据倾斜,不是则不存在;Step A-3: Determine whether the sizes of the partitions pointed to by the head and tail pointers differ by an order of magnitude, if yes, then there is data skew, otherwise, it does not exist;
步骤A-4:运行分区依赖关系构建算法,识别并建立大小数据分区之间的依赖关系;Step A-4: Run the partition dependency construction algorithm to identify and establish dependencies between large and small data partitions;
步骤A-5:根据依赖关系,将负责处理大数据分区的执行任务标记为分段任务,将负责处理小数据分区的执行任务标记为偷取任务;Step A-5: According to the dependency relationship, mark the execution tasks responsible for processing large data partitions as segmentation tasks, and mark the execution tasks responsible for processing small data partitions as stealing tasks;
步骤A-6:根据A-4中构建的依赖关系,找到每个偷取任务负责处理的小数据分区所对应的大数据分区,让偷取任务同时保有需要处理的小数据分区信息和需要偷取的大数据分区信息。Step A-6: According to the dependencies built in A-4, find the big data partition corresponding to the small data partition that each stealing task is responsible for processing, so that the stealing task can also keep the information of the small data partition that needs to be processed and the information that needs to be stolen. The fetched big data partition information.
所述方法中数据倾斜检测阶段分区依赖关系构建算法主要流程如下:The main flow of the partition dependency construction algorithm in the data skew detection stage in the method is as follows:
步骤B-1:判断分区数组的头尾指针指向的数据分区大小是否相差一个数量级,如果是,那么头指针指向的分区为小数据分区,尾指针指向的分区为大数据分区,保留此时的依赖关系;Step B-1: Determine whether the size of the data partition pointed to by the head and tail pointers of the partition array is an order of magnitude different. If yes, then the partition pointed to by the head pointer is a small data partition, and the partition pointed to by the tail pointer is a large data partition. dependency;
步骤B-2:在记录了一对依赖关系后,头指针后移一位,尾指针前移一位;Step B-2: After recording a pair of dependencies, the head pointer is moved backward by one bit, and the tail pointer is moved forward by one bit;
步骤B-3:当头指针和尾指针所指向的分区数据大小相差一个数量级时,重复步骤B-1和步骤B-2,直到头尾指针相邻时结束;当头指针和尾指针所指向的数据分区大小相差小于一个数量级时,进入步骤B-4;Step B-3: When the size of the partition data pointed to by the head pointer and the tail pointer differs by an order of magnitude, repeat steps B-1 and B-2 until the end when the head and tail pointers are adjacent; when the data pointed to by the head pointer and the tail pointer When the difference in partition size is less than an order of magnitude, go to step B-4;
步骤B-4:保留此时尾指针所在的位置为K,将尾指针放置数组尾部,重复步骤B-1和B-2记录分区依赖关系,当尾指针到达K时,则表示所有的依赖关系已经被构建完毕。Step B-4: Keep the position of the tail pointer at this time as K, place the tail pointer at the end of the array, repeat steps B-1 and B-2 to record partition dependencies, and when the tail pointer reaches K, it indicates all dependencies has been constructed.
所述方法中任务偷取阶段为分段任务设置的分段处理执行方式,流程如下:In the method, the task stealing stage is the segmentation processing execution mode set for the segmentation task, and the process is as follows:
步骤 C-1:查看被分配的大数据分区是否被构建缓存分区块,如果是,直接进入C-2,如果否,那么先构建缓存分区块,然后进入C-2;Step C-1: Check whether the allocated large data partition has been built into a cache partition block, if yes, go directly to C-2, if not, then build a cache partition block first, and then go to C-2;
步骤C-2:在缓存分区块的基础上,每次按照缓存分区块中的读取数量从分区数据读取对应大小数据,读取完后,将缓存分区块的数据头部索引置为原数据头部索引值加上读取数量后的值;Step C-2: On the basis of the cache partition block, read the data of the corresponding size from the partition data according to the number of reads in the cache partition block each time. After reading, set the data header index of the cache partition block to the original The index value of the data header plus the value after reading the quantity;
步骤 C-3:按照正常任务逻辑处理读取到的数据;Step C-3: Process the read data according to the normal task logic;
步骤 C-4:重复步骤C-2和步骤C-3,直到所有缓存分区块的数据头部索引和数据尾部索引值相同,将处理标识标记为1,表示处理结束。Step C-4: Repeat steps C-2 and C-3 until the data head index and data tail index value of all cache partition blocks are the same, and mark the processing flag as 1, indicating that the processing is over.
所述方法中任务偷取阶段为偷取任务设置的数据偷取执行方式,流程如下:In the task stealing stage of the method, the data stealing execution mode set for the stealing task is as follows:
步骤D-1:按照原生执行任务逻辑处理被分配的小数据分区;Step D-1: Process the allocated small data partitions according to the logic of native execution tasks;
步骤D-2:查看需要偷取的大数据分区是否被构建缓存分区块,如果是,直接进入D-3,如果否,则先构建缓存分区块;Step D-2: Check whether the large data partition to be stolen has been constructed as a cache partition block, if yes, go directly to D-3, if not, first construct a cache partition block;
步骤D-3:查看需要偷取的缓存分区块是否被处理完毕,如果是,那么直接返回,如果否,直接进入D-4;Step D-3: Check whether the cache partition block to be stolen has been processed, if yes, return directly, if not, directly enter D-4;
步骤D-4:默认从缓存分区块中偷取未处理数据的三分之一进行处理,偷取完后,将缓存分区块的数据尾部索引减去偷取的数据量。Step D-4: By default, one-third of the unprocessed data is stolen from the cache partition for processing. After the stealing is completed, the data tail index of the cache partition is subtracted from the amount of stolen data.
任务偷取阶段中所涉及到缓存分区块,在任务偷取阶段由第一个访问内存中某个大数据分区的执行任务生成,包含以下属性:分区数据、数据头部索引、数据尾部索引、读取数量、处理标识位,缓存分区块被保存在Spark系统的每个执行器中,在Spark系统的每个执行阶段中是独有的。The cache partition blocks involved in the task stealing phase are generated by the first execution task that accesses a large data partition in the memory in the task stealing phase, and include the following attributes: partition data, data header index, data tail index, The number of reads, processing flags, and cache partition blocks are stored in each executor of the Spark system, and are unique to each execution stage of the Spark system.
构建缓存分区块时,首先将内存中的大数据分区读取并保存为一个分区数据,分区数据本质上是一个保存分区所有数据的数组;数据头部索引在构建时默认指向分区数据数组的头部,默认值为0;数据尾部索引指向分区数据数组的尾部,默认值为数组的长度值;读取数量根据头部索引、尾部索引和需要划分的段数共同计算得出,默认是将一个大数据分区划分成十份,因此读取数量为头部索引值与尾部索引值之和除以十;处理标识位仅由负责该分区的分段任务才能进行更改,默认值为0,该处理标识位用于通知访问该缓存分区块的其他任务,该分区中的数据已经全部被处理执行完毕。When building a cache partition block, first read and save the large data partition in the memory as a partition data. The partition data is essentially an array that stores all the data of the partition; the data head index points to the head of the partition data array by default when it is constructed. part, the default value is 0; the data tail index points to the tail of the partition data array, and the default value is the length value of the array; the number of reads is calculated based on the head index, tail index and the number of segments to be divided, and the default is a large The data partition is divided into ten parts, so the number of reads is the sum of the head index value and the tail index value divided by ten; the processing flag can only be changed by the segmentation task responsible for the partition, the default value is 0, the processing flag The bit is used to notify other tasks accessing the cache partition that all the data in the partition has been processed and executed.
本发明的有益效果是,通过上述一种面向Spark SQL聚集算子的动态执行方法,减少了传统方案中采样和重新分区的开销,平衡了执行任务之间的工作负载,在数据倾斜状态下,大幅提升了聚集算子的数据处理性能。The beneficial effects of the present invention are that, through the above-mentioned dynamic execution method oriented to the Spark SQL aggregation operator, the overhead of sampling and re-partitioning in the traditional scheme is reduced, and the workload between execution tasks is balanced. In the state of data skew, The data processing performance of the aggregation operator has been greatly improved.
附图说明Description of drawings
图1是本发明实施例中面向Spark SQL聚集算子的动态执行方法数据倾斜检测阶段的流程图;Fig. 1 is the flow chart of the data inclination detection stage of the dynamic execution method facing Spark SQL aggregation operator in the embodiment of the present invention;
图2是本发明实施例中面向Spark SQL聚集算子的动态执行方法数据倾斜检测阶段的分区依赖关系构建算法流程图;Fig. 2 is in the embodiment of the present invention face the dynamic execution method data tilt detection stage of the Spark SQL aggregation operator to the partition dependency construction algorithm flow chart;
图3是本发明实施例中面向Spark SQL聚集算子的动态执行方法任务偷取阶段分段任务执行方式流程图;Fig. 3 is in the embodiment of the present invention facing the dynamic execution method task of Spark SQL aggregation operator task stealing phase segmentation task execution mode flow chart;
图4是本发明实施例中面向Spark SQL聚集算子的动态执行方法任务偷取阶段偷取任务执行方式的流程图。Fig. 4 is a flow chart of the stealing task execution mode in the task stealing stage of the dynamic execution method oriented to the Spark SQL aggregation operator in the embodiment of the present invention.
具体实施方式Detailed ways
本发明在Spark系统的基础上实现了一种轻量的动态执行方法,其目的是优化Spark SQL聚集算子在数据倾斜下的数据处理性能。下面结合实施例及附图,详细描述本发明的技术方案。The present invention implements a lightweight dynamic execution method on the basis of the Spark system, and its purpose is to optimize the data processing performance of the Spark SQL aggregation operator under data skew. The technical solution of the present invention will be described in detail below in combination with the embodiments and the accompanying drawings.
本发明实施例中一种面向Spark SQL聚集算子的动态执行方法,它对Spark系统的调度器(DAGScheduler)、任务(Task)以及块管理器(BlockManager)三大部分进行了修改。修改后的调度器在执行聚合操作之前执行检测算法,检查数据是否存在倾斜,如果存在,则执行分区依赖关系构建算法识别并构建大小数据分区之间的依赖关系,根据依赖关系,为数据分区分配执行任务。修改后的执行任务有三类,分别是本发明新增的负责处理大数据分区的分段任务和负责处理小数据分区的偷取任务,以及Spark系统原生的执行任务。原生的执行任务所负责处理的数据分区大小适中,保留Spark原执行方式,而新增的分段任务和偷取任务则分别以分段处理执行方式和数据偷取执行方式处理数据,并通过修改后的块管理器访问它们所负责处理的数据分区。In the embodiment of the present invention, there is a dynamic execution method for Spark SQL aggregation operators, which modifies three parts of the Spark system, the scheduler (DAGScheduler), the task (Task) and the block manager (BlockManager). The modified scheduler executes the detection algorithm before performing the aggregation operation, checks whether the data is skewed, and if so, executes the partition dependency construction algorithm to identify and build the dependencies between large and small data partitions, and allocates data partitions according to the dependencies perform tasks. There are three types of modified execution tasks, which are newly added segmentation tasks for processing large data partitions and stealing tasks for processing small data partitions in the present invention, and original execution tasks of the Spark system. The size of the data partition processed by the original execution task is moderate, and the original execution mode of Spark is retained, while the newly added segmentation task and stealing task process data in the execution mode of segment processing and data stealing respectively, and modify Subsequent block managers access the data partitions they are responsible for processing.
本发明实施例中一种面向Spark SQL聚集算子的动态执行方法包括数据倾斜检测和任务偷取两个阶段,数据倾斜检测阶段采用检测算法来检测数据倾斜现象:如果发现存在数据倾斜,则执行分区依赖关系构建算法构建数据分区之间的依赖关系,根据依赖关系,在原生执行任务的基础上,增加了两类新的执行任务,即分段任务和偷取任务,并为每个分段任务和偷取任务分配一个数据分区;任务偷取阶段为分段任务设置了分段处理执行方式,使分段任务以段为单位处理被分配的数据分区;为偷取任务设置了数据偷取执行方式,使偷取任务处理完被分配的数据分区后,主动从分段任务的数据分区中偷取数据并进行处理。In the embodiment of the present invention, a dynamic execution method for Spark SQL aggregation operators includes two stages of data skew detection and task stealing. In the data skew detection stage, a detection algorithm is used to detect data skew phenomenon: if data skew is found, execute The partition dependency construction algorithm builds the dependencies between data partitions. According to the dependencies, on the basis of the original execution tasks, two new types of execution tasks are added, namely segmentation tasks and stealing tasks, and each segment A data partition is assigned to the task and the stealing task; in the task stealing stage, the segmented processing execution method is set for the segmented task, so that the segmented task processes the allocated data partition in units of segments; data stealing is set for the stealing task The execution mode enables the stealing task to actively steal data from the data partition of the segmented task and process it after processing the allocated data partition.
所述方法在块管理器中为内存中的大数据分区设置了一个新的数据结构,缓存分区块。它在任务偷取阶段由第一个访问内存中某个大数据分区的任务生成,作用是为了从逻辑上将一个数据分区切分为多个细粒度的段,为多个任务在同一分区上并发执行提供保证。缓存分区块包含以下属性:分区数据、数据头部索引、数据尾部索引、读取数量、处理标识位,保存在Spark系统的块管理器中,在Spark系统的每个执行阶段中是独有的,这保证了访问该缓存分区块的不同应用的并发执行正确性。The method sets a new data structure for the large data partition in the memory in the block manager, and caches the partition block. It is generated by the first task that accesses a large data partition in the memory during the task stealing phase. Its function is to logically divide a data partition into multiple fine-grained segments, and provide multiple tasks on the same partition. Concurrent execution is guaranteed. The cache partition block contains the following attributes: partition data, data header index, data tail index, read quantity, and processing identification bit, which are stored in the block manager of the Spark system and are unique in each execution stage of the Spark system , which ensures the correctness of concurrent execution of different applications accessing the cache partition.
在构建缓存分区块时,首先将内存中的大数据分区读取并保存为一个分区数据,分区数据本质上是一个保存分区所有数据的数组;数据头部索引在构建时默认指向分区数据数组的头部,默认值为0;数据尾部索引指向分区数据数组的尾部,默认值为数组的长度值;读取数量根据头部索引、尾部索引和需要划分的段数共同计算得出,默认是将一个大数据分区划分成十份,因此读取数量为头部索引值与尾部索引值之和除以十;处理标识位仅由负责该分区的分段任务才能进行更改,默认值为0,该处理标识位用于通知访问该缓存分区块的其他任务,该分区中的数据已经全部被处理执行完毕。When building a cache partition block, first read and save the large data partition in the memory as a partition data. The partition data is essentially an array that stores all the data of the partition; the data header index points to the partition data array by default when it is constructed. The head, the default value is 0; the data tail index points to the tail of the partition data array, and the default value is the length value of the array; the number of reads is calculated based on the head index, tail index and the number of segments to be divided, and the default is a The large data partition is divided into ten parts, so the number of reads is the sum of the head index value and the tail index value divided by ten; the processing flag can only be changed by the segmentation task responsible for the partition, the default value is 0, the processing The identification bit is used to notify other tasks accessing the cache partition that all the data in the partition has been processed and executed.
本发明数据倾斜检测阶段的流程图参见图1,其包括以下步骤:Referring to Fig. 1 for the flowchart of the data inclination detection stage of the present invention, it comprises the following steps:
步骤S101:从Spark系统内部的键值存储子系统中获取Spark关于所有缓存数据分区的大小信息;Step S101: Obtain Spark's size information about all cached data partitions from the key-value storage subsystem inside the Spark system;
步骤S102:根据缓存数据分区的大小,将分区按升序进行排列,放入数组中,设置头尾指针分别指向数组的头尾;Step S102: Arrange the partitions in ascending order according to the size of the buffered data partitions, put them into the array, and set the head and tail pointers to point to the head and tail of the array respectively;
步骤S103:判断头尾指针指向分区的大小是否相差一个数量级(默认值1024),如果是那么存在数据倾斜,不是则不存在;Step S103: Determine whether the sizes of the partitions pointed to by the head and tail pointers differ by an order of magnitude (the default value is 1024), if so, there is data skew, otherwise, it does not exist;
步骤S104:如存在数据倾斜,根据分区依赖关系构建算法识别大小数据分区并建立它们之间的依赖关系;Step S104: If there is data skew, identify large and small data partitions and establish dependencies between them according to the partition dependency construction algorithm;
步骤S105:根据分区依赖关系,检查执行任务所负责数据分区,如果为大数据分区,那么将任务标记为分段任务;如果为小数据分区,那么标记为偷取任务;当执行任务的负责数据分区没有出现在依赖关系中时,该任务为原生的执行任务,按Spark原执行方式处理数据分区;Step S105: According to the partition dependencies, check the data partition responsible for the execution task. If it is a large data partition, then mark the task as a segmentation task; if it is a small data partition, then mark it as a stealing task; When the partition does not appear in the dependency relationship, the task is a native execution task, and the data partition is processed according to the original execution method of Spark;
步骤S106:根据分区依赖关系,为偷取任务分配一个它们负责的小数据分区所对应的大数据分区。Step S106: According to the partition dependency, allocate a large data partition corresponding to the small data partition they are responsible for for the stealing tasks.
本发明分区依赖关系构建算法的流程图参见图2,其主要流程如下:Referring to Fig. 2 for the flow chart of partition dependency construction algorithm of the present invention, its main flow is as follows:
步骤S201:判断分区数组的头尾指针指向的数据分区大小是否相差一个数量级(默认值为1024),如果是,那么头指针指向的分区为小数据分区,尾指针指向的分区为大数据分区,保留此时它们的依赖关系;Step S201: Determine whether the sizes of the data partitions pointed to by the head and tail pointers of the partition array differ by an order of magnitude (the default value is 1024). If so, the partition pointed to by the head pointer is a small data partition, and the partition pointed to by the tail pointer is a large data partition. retain their dependencies at this point;
步骤S202:在记录了一对依赖关系后,头指针后移一位,尾指针前移一位;Step S202: After recording a pair of dependencies, the head pointer is moved backward by one bit, and the tail pointer is moved forward by one bit;
步骤S203:当头指针和尾指针所指向的分区数据大小相差一个数量级时,重复步骤S201和步骤S202,直到头尾指针相邻时结束;当头指针和尾指针所指向的分区数据大小没有相差一个数量级时,进入步骤S204;Step S203: When the size of the partition data pointed to by the head pointer and the tail pointer differs by an order of magnitude, repeat steps S201 and S202 until the end when the head and tail pointers are adjacent; when the size of the partition data pointed to by the head pointer and the tail pointer does not differ by an order of magnitude , enter step S204;
步骤S204:保留此时尾指针所在的位置为K,将尾指针放置数组尾部,重复步骤S201和S202记录分区依赖关系,当发现尾指针到达K时,则表示所有的依赖关系已经被构建完毕。Step S204: Keep the position of the tail pointer at this time as K, place the tail pointer at the end of the array, repeat steps S201 and S202 to record partition dependencies, and when the tail pointer reaches K, it means that all dependencies have been constructed.
本发明任务偷取阶段分段任务执行的分段处理执行方式流程图参见图3,其包括以下步骤:Referring to Figure 3, the flow chart of the segmented processing execution mode of segmented task execution in the task stealing stage of the present invention includes the following steps:
步骤S301:检查待处理的大数据分区是否已经被构建缓存分区块,如果否,那么根据大数据分区的信息构建之:将大数据分区的数据数组放入缓存分区块的分区数据中,数据头部索引放置分区数据的头部,数据尾部索引放置分区数据的尾部,默认处理标识位标记为0;如果该大数据分区已经被构建缓存分区块,那么直接进入S303;Step S301: Check whether the large data partition to be processed has been constructed as a cache partition block, if not, then construct it according to the information of the large data partition: put the data array of the large data partition into the partition data of the cache partition block, and the data header The head of the partition data is placed in the part index, the tail of the partition data is placed in the data tail index, and the default processing flag is marked as 0; if the large data partition has been constructed with a cache partition block, then directly enter S303;
步骤S302:根据数据头部索引和尾部索引之和除以需要逻辑切分的段数量(默认值10),计算出每次读取分区数据的读取数量;Step S302: According to dividing the sum of data head index and tail index by the number of segments that need to be logically divided (default value is 10), calculate the number of partition data read each time;
步骤S303:按照读取数量的值从分区数据中读取数据并处理直到全部处理完毕,在处理完之后将处理标识位标记为1。在每次读取数据前,首先对缓存分区块加锁,避免多个执行任务之间互相干扰,在读取完数据之后,将数据头部索引设置为原数据头部索引加上读取数量后的值,表示下次开始处理的位置,做完上述操作之后立即释放锁,并开始处理数据。Step S303: Read the data from the partition data according to the value of the read quantity and process until all the data are processed, and mark the processing flag as 1 after the processing is completed. Before reading data each time, first lock the cache partition block to avoid mutual interference between multiple execution tasks. After reading the data, set the data header index to the original data header index plus the number of reads The last value indicates the position where the processing will start next time. After the above operations are completed, the lock is released immediately and data processing begins.
本发明任务偷取阶段偷取任务执行的数据偷取执行方式流程图参见图4,其包括以下步骤:The flow chart of the data stealing execution mode of the stealing task execution in the task stealing stage of the present invention is shown in Figure 4, which includes the following steps:
步骤S401:读取并处理小数据分区的数据;Step S401: read and process the data of the small data partition;
步骤S402:检查待偷取的大数据分区是否已经被构建缓存分区块,如果否,那么根据分区信息构建之:将大数据分区的数据数组放入缓存分区块的分区数据中,数据头部索引放置分区数据的头部,数据尾部索引放置分区数据的尾部,默认处理标识位标记为0;如果该大数据分区已经被构建缓存分区块,那么进入S403;Step S402: Check whether the large data partition to be stolen has been constructed as a cache partition block, if not, then construct it according to the partition information: put the data array of the large data partition into the partition data of the cache partition block, and the data head index Place the head of the partition data, the data tail index places the tail of the partition data, and the default processing flag is marked as 0; if the large data partition has been constructed as a cache partition block, then enter S403;
步骤S403:通过判断缓存分区块的处理标识位是否为1,检查需要偷取的缓存分区块是否已经被处理完毕,如果是,那么直接返回,如果否,那么进行下一步;Step S403: By judging whether the processing flag of the cache partition block is 1, check whether the cache partition block to be stolen has been processed, if yes, then return directly, if not, proceed to the next step;
步骤S404:对缓存分区块加锁以保证多个执行任务的同步;根据访问缓存分区块时的数据头部索引和尾部索引值,从缓存分区块的分区数据中偷取部分未处理数据(默认偷取未处理数据的三分之一,即尾部索引值减去数据头部索引的差值除以三),偷到数据后将数据尾部索引的值减去偷取的数据量;释放锁并对偷取到的数据进行处理。Step S404: Lock the cache partition to ensure the synchronization of multiple execution tasks; according to the data head index and tail index value when accessing the cache partition, steal part of the unprocessed data from the partition data of the cache partition (default Steal one-third of the unprocessed data, that is, the difference between the tail index value minus the data head index divided by three), and after stealing the data, subtract the value of the data tail index from the amount of stolen data; release the lock and Process the stolen data.
Claims (7)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910650455.0A CN110442594A (en) | 2019-07-18 | 2019-07-18 | A kind of Dynamic Execution method towards Spark SQL Aggregation Operators |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201910650455.0A CN110442594A (en) | 2019-07-18 | 2019-07-18 | A kind of Dynamic Execution method towards Spark SQL Aggregation Operators |
Publications (1)
Publication Number | Publication Date |
---|---|
CN110442594A true CN110442594A (en) | 2019-11-12 |
Family
ID=68429724
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201910650455.0A Pending CN110442594A (en) | 2019-07-18 | 2019-07-18 | A kind of Dynamic Execution method towards Spark SQL Aggregation Operators |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN110442594A (en) |
Cited By (3)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112115191A (en) * | 2020-09-22 | 2020-12-22 | 南京北斗创新应用科技研究院有限公司 | Branch optimization method executed by big data ETL model |
CN112905628A (en) * | 2021-03-26 | 2021-06-04 | 第四范式(北京)技术有限公司 | Data processing method and device |
CN113495923A (en) * | 2021-02-09 | 2021-10-12 | 深圳市云网万店科技有限公司 | Scheduling management method and system for distributed database executor |
Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US20170097957A1 (en) * | 2015-10-01 | 2017-04-06 | International Business Machines Corporation | System and method for transferring data between rdbms and big data platform |
CN107220123A (en) * | 2017-05-25 | 2017-09-29 | 郑州云海信息技术有限公司 | One kind solves Spark data skew method and system |
CN108319604A (en) * | 2017-01-16 | 2018-07-24 | 南京烽火软件科技有限公司 | The associated optimization method of size table in a kind of hive |
CN108572873A (en) * | 2018-04-24 | 2018-09-25 | 中国科学院重庆绿色智能技术研究院 | A load balancing method and device for solving Spark data skew problem |
CN109144707A (en) * | 2017-06-16 | 2019-01-04 | 田文洪 | A kind of unbalanced method of processing big data platform Spark data distribution |
-
2019
- 2019-07-18 CN CN201910650455.0A patent/CN110442594A/en active Pending
Patent Citations (5)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US20170097957A1 (en) * | 2015-10-01 | 2017-04-06 | International Business Machines Corporation | System and method for transferring data between rdbms and big data platform |
CN108319604A (en) * | 2017-01-16 | 2018-07-24 | 南京烽火软件科技有限公司 | The associated optimization method of size table in a kind of hive |
CN107220123A (en) * | 2017-05-25 | 2017-09-29 | 郑州云海信息技术有限公司 | One kind solves Spark data skew method and system |
CN109144707A (en) * | 2017-06-16 | 2019-01-04 | 田文洪 | A kind of unbalanced method of processing big data platform Spark data distribution |
CN108572873A (en) * | 2018-04-24 | 2018-09-25 | 中国科学院重庆绿色智能技术研究院 | A load balancing method and device for solving Spark data skew problem |
Non-Patent Citations (1)
Title |
---|
UMUT A. ACAR ET AL.: ""Scheduling Parallel Programs by Work Stealing with Private Deques"", 《ACM》 * |
Cited By (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN112115191A (en) * | 2020-09-22 | 2020-12-22 | 南京北斗创新应用科技研究院有限公司 | Branch optimization method executed by big data ETL model |
CN113495923A (en) * | 2021-02-09 | 2021-10-12 | 深圳市云网万店科技有限公司 | Scheduling management method and system for distributed database executor |
CN112905628A (en) * | 2021-03-26 | 2021-06-04 | 第四范式(北京)技术有限公司 | Data processing method and device |
CN112905628B (en) * | 2021-03-26 | 2024-01-02 | 第四范式(北京)技术有限公司 | Data processing method and device |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
US8788543B2 (en) | Scalable, concurrent resizing of hash tables | |
US8364909B2 (en) | Determining a conflict in accessing shared resources using a reduced number of cycles | |
CN101866359B (en) | Small file storage and visit method in avicade file system | |
US8793528B2 (en) | Dynamic hypervisor relocation | |
Luo et al. | {SMART}: A {High-Performance} adaptive radix tree for disaggregated memory | |
JP7340326B2 (en) | Perform maintenance operations | |
US10984073B2 (en) | Dual phase matrix-vector multiplication system | |
WO2018027839A1 (en) | Method for accessing table entry in translation lookaside buffer (tlb) and processing chip | |
JP7724318B2 (en) | Hardware-based memory compression | |
CN109461113B (en) | A data structure-oriented graphics processor data prefetching method and device | |
KR20130018742A (en) | Gpu support for garbage collection | |
CN113641596B (en) | Cache management method, cache management device and processor | |
CN110442594A (en) | A kind of Dynamic Execution method towards Spark SQL Aggregation Operators | |
Li et al. | Leveraging NVMe SSDs for building a fast, cost-effective, LSM-tree-based KV store | |
Lee et al. | ActiveSort: Efficient external sorting using active SSDs in the MapReduce framework | |
CN107220069B (en) | Shuffle method for nonvolatile memory | |
KR102326280B1 (en) | Method, apparatus, device and medium for processing data | |
WO2020125362A1 (en) | File system and data layout method | |
CN116955348A (en) | A database index construction method and device | |
WO2016106738A1 (en) | Transaction conflict detection method and apparatus and computer system | |
Min et al. | Sephash: A write-optimized hash index on disaggregated memory via separate segment structure | |
CN104899158A (en) | Memory access optimization method and memory access optimization device | |
US12019629B2 (en) | Hash-based data structure | |
CN120066987B (en) | Data prefetcher, processor and device for graph computing | |
CN113253947B (en) | A deduplication method, apparatus, device and readable storage medium |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
WD01 | Invention patent application deemed withdrawn after publication | ||
WD01 | Invention patent application deemed withdrawn after publication |
Application publication date: 20191112 |