CN114490027A - Distributed job adjustment method, master node, system, physical machine and storage medium - Google Patents
- Publication number
- CN114490027A (application number CN202111583453.8A)
- Authority
- CN
- China
- Prior art keywords
- execution
- data
- stage
- node
- job
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Classifications
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5083—Techniques for rebalancing the load in a distributed system
- G06N20/00—Machine learning
Abstract
Description
This application is a divisional application of Chinese application No. 202110950182.9, filed on August 18, 2021 and entitled "Distributed job adjustment method, master node, system, physical machine and storage medium".
Technical Field
The embodiments of this application relate to the field of distributed technologies, and in particular to a distributed job adjustment method, a master node, a system, a physical machine, and a storage medium.
Background
A distributed system is composed of multiple physical machines interconnected through communication links, and is characterized by distribution, autonomy, parallelism, and a global view. Executing user-submitted jobs on a distributed system can leverage its distributed computing capability to improve job execution efficiency.
A distributed system mainly includes a master node and worker nodes. For a job submitted by a user, the master node can generate an execution plan and configure the generated plan. Based on that configuration, the master node can schedule worker nodes and resources during job execution to carry out the specific execution of the job. Given the wide application of distributed systems, those skilled in the art have continually worked to improve their performance.
Summary
In view of this, the embodiments of this application provide a distributed job adjustment method, a master node, a system, a physical machine, and a storage medium, so as to configure the execution plan dynamically, accurately, and reasonably during job execution, realizing dynamic adjustment of the job configuration and thereby improving the performance of the distributed system.
To achieve the above purpose, the embodiments of this application provide the following technical solutions.
In a first aspect, an embodiment of this application provides a distributed job adjustment method, including:
obtaining a job submitted by a user;
generating an execution plan for the job, the execution plan including multiple execution stages, the multiple execution stages including an upstream execution stage and a direct downstream execution stage of the upstream execution stage;
during execution of the job, obtaining statistics on the output data of the upstream execution stage; and
configuring the direct downstream execution stage according to the statistics, so that the direct downstream execution stage executes the job based on the configuration result.
In a second aspect, an embodiment of this application provides a distributed job adjustment method, including:
obtaining a deep learning job;
generating an execution plan for the deep learning job, the execution plan including multiple execution stages, the multiple execution stages including a worker execution stage and a resource optimization execution stage, where the worker execution stage is used to compute the gradients of deep learning parameters;
during execution of the deep learning job, scheduling a resource optimization node corresponding to the resource optimization execution stage, and determining, through the resource optimization node, historically used resource information that matches the current execution state of the deep learning job; and
configuring the resource information for the worker execution stage through the resource optimization node.
In a third aspect, an embodiment of this application provides a master node, where the master node is configured to execute the distributed job adjustment method described in the first aspect or the second aspect.
In a fourth aspect, an embodiment of this application provides a distributed system, where the distributed system includes a master node and multiple worker nodes, the master node being the master node described in the third aspect.
In a fifth aspect, an embodiment of this application provides a physical machine, where the physical machine includes at least one memory and at least one processor, the memory stores one or more computer-executable instructions, and the processor invokes the one or more computer-executable instructions to execute the distributed job adjustment method described in the first aspect or the second aspect.
In a sixth aspect, an embodiment of this application provides a storage medium, where the storage medium stores one or more computer-executable instructions which, when executed, implement the distributed job adjustment method described in the first aspect or the second aspect.
In the distributed job adjustment method provided by the embodiments of this application, after obtaining a job submitted by a user, the master node can generate an execution plan for the job, the execution plan including multiple execution stages, among them an upstream execution stage and a direct downstream execution stage of the upstream execution stage. During job execution, the master node can obtain statistics on the output data of the upstream execution stage and, according to those statistics, configure the direct downstream execution stage so that it executes the job based on the configuration result. Because the embodiments of this application can dynamically adjust the configuration of the downstream execution stage during job execution based on the data output by the upstream execution stage, the downstream configuration can adapt to the actual execution result of the upstream stage; the concurrency, resources, and other settings of the downstream stage can thus fit the actual execution of the job, improving the rationality and accuracy of the execution plan configuration.
It can be seen that the distributed job adjustment method provided by the embodiments of this application can dynamically adjust the configuration of the execution plan during job execution, achieving dynamic job adjustment and making the execution plan configuration fit the actual execution of the job. Executing the job based on the dynamically configured execution plan then enables the distributed system to complete the job more reasonably and efficiently, significantly improving the system's job execution performance.
Brief Description of the Drawings
To describe the embodiments of this application or the technical solutions in the prior art more clearly, the following briefly introduces the accompanying drawings required for describing the embodiments or the prior art. Obviously, the drawings in the following description are merely embodiments of this application; those of ordinary skill in the art can derive other drawings from the provided drawings without creative effort.
FIG. 1A is a schematic structural diagram of a distributed system.
FIG. 1B is a schematic diagram of an execution plan.
FIG. 1C is a schematic diagram of a DAG.
FIG. 1D is a schematic diagram of the mapping between a logical graph and a physical graph.
FIG. 2 is a flowchart of a distributed job adjustment method provided by an embodiment of this application.
FIG. 3A is a schematic diagram of how processing data is allocated to a direct downstream stage.
FIG. 3B is another flowchart of the distributed job adjustment method provided by an embodiment of this application.
FIG. 3C is an example diagram of Partitions being allocated to a direct downstream stage.
FIG. 4A is an example diagram of a data shuffle.
FIG. 4B is still another flowchart of the distributed job adjustment method provided by an embodiment of this application.
FIG. 4C is an example diagram of Partition splitting according to an embodiment of this application.
FIG. 5A is an example diagram of a Join process.
FIG. 5B is another example diagram of a Join process.
FIG. 5C is yet another flowchart of the distributed job adjustment method provided by an embodiment of this application.
FIG. 5D is still another example diagram of a Join process.
FIG. 5E is yet another example diagram of a Join process.
FIG. 5F is an example diagram of a union operation, further illustrated on the basis of FIG. 5D.
FIG. 5G is an example diagram of a union operation, further illustrated on the basis of FIG. 5E.
FIG. 6A is an example diagram of a Sort Merge Join.
FIG. 6B is an example diagram of a Broadcast Join.
FIG. 6C is still another flowchart of the distributed job adjustment method provided by an embodiment of this application.
FIG. 6D is a schematic diagram of an execution plan carrying multiple execution paths.
FIG. 7A is a flowchart of generating an execution plan carrying multiple execution paths.
FIG. 7B, FIG. 7C, FIG. 7D and FIG. 7E are example diagrams of the process of converting a physical plan into an execution plan.
FIG. 7F is an example diagram of a complete execution plan after an execution path is selected.
FIG. 7G is another example diagram of a complete execution plan after an execution path is selected.
FIG. 8A is an example diagram of a PS stage and a Worker stage connected in parallel.
FIG. 8B is still another flowchart of the distributed job adjustment method provided by an embodiment of this application.
FIG. 8C is an example diagram of a Resource Optimization node adjusting the resources of Worker nodes.
FIG. 9 is a structural block diagram of a physical machine.
Detailed Description
The technical solutions in the embodiments of this application will be described clearly and completely below with reference to the accompanying drawings. Obviously, the described embodiments are only some rather than all of the embodiments of this application. Based on the embodiments of this application, all other embodiments obtained by those of ordinary skill in the art without creative effort fall within the protection scope of this application.
A distributed job can be understood as a job submitted to a distributed system for execution. FIG. 1A exemplarily shows a schematic structural diagram of a distributed system. As shown in FIG. 1A, the distributed system may include a master node 110 and multiple worker nodes 120. The master node 110 and the worker nodes 120 can be regarded as compute nodes of the distributed system; a compute node is hosted on a physical machine with data computing capability, and one physical machine may host one or more compute nodes.
In the distributed system, the master node 110 is the compute node used for management and control. For example, the master node 110 may manage the worker nodes 120 and coordinate the concurrency, resources, and so on of the job at each execution stage of the execution plan. In some respects, the master node 110 serves as the central control node of the distributed system and is also called the execution engine of the distributed system. The worker nodes 120 are the compute nodes that actually execute the job, doing so under the management and coordination of the master node 110.
When the distributed system executes a job, the job can be submitted by a user through a terminal to the cluster resource manager, and the cluster resource manager pulls up the master node 110. The master node 110 can then parse the job and generate an execution plan. The execution plan describes the process by which the job's data, starting from the initial source tables, goes through a series of data movements, executions, and transformations, and finally produces the output. FIG. 1B exemplarily shows a schematic diagram of an execution plan. As shown in FIG. 1B, the execution plan may include multiple stages (execution stages) with hierarchical relationships. In some embodiments, the stages may form a tree-like hierarchy. A stage may include one or more tasks. For each stage, the master node 110 can schedule multiple worker nodes to execute the stage's tasks in parallel by configuring the number of worker nodes (the concurrency), the resources to be used, and so on, thereby executing the job in the distributed system.
In some embodiments, a job is generally submitted to the distributed system by a terminal in the form of a request. In one example, the job submitted by the terminal includes a query statement for querying a database, such as an SQL (Structured Query Language) statement.
In some further embodiments, the execution plan can be described by a DAG (Directed Acyclic Graph). A DAG includes multiple vertices and connecting edges between the vertices. FIG. 1C exemplarily shows a schematic diagram of a DAG. It should be noted that the actual number of vertices, levels, and connecting edges of a DAG may be more complex than in FIG. 1C, which is only a simplified example for ease of understanding. As shown in FIG. 1C, the DAG may include four vertices V1 to V4 and connecting edges 11, 12, 13 and 14, where edge 11 connects vertices V1 and V2, edge 12 connects vertices V1 and V3, edge 13 connects vertices V2 and V4, and edge 14 connects vertices V3 and V4.
A vertex in the DAG can represent an independent stage of the execution plan. The connecting edges between vertices can be directed, representing the relationships between vertices. Based on the direction an edge points, an edge connected to a vertex may be an input edge of that vertex (the edge points to the vertex) or an output edge of that vertex (the edge points from the vertex to another vertex). For example, in FIG. 1C, edge 12 points to V3 and is therefore an input edge of V3, while edge 14 is output by V3 and is therefore an output edge of V3; at the same time, edge 12 is output by V1, so it is also an output edge of V1, and edge 14 inputs into V4, so it is also an input edge of V4.
Of the two vertices connected by an edge, the vertex that outputs the edge is called the direct upstream vertex of the other, and the vertex the edge inputs into is called the direct downstream vertex of the other. For example, in FIG. 1C, edge 12 connects V1 and V3: V1 outputs edge 12 and edge 12 inputs into V3, so V1 is a direct upstream vertex of V3 and V3 is a direct downstream vertex of V1. A vertex may have one or more direct upstream vertices and one or more direct downstream vertices. It should be noted that in addition to direct upstream vertices, a vertex may also have indirect upstream vertices: an indirect upstream vertex is not directly connected to the vertex but lies above it and is connected to it through one or more intermediate vertices. For example, in FIG. 1C, V1 lies above V4 and is connected to V4 through V2 or V3, so V1 is an indirect upstream vertex of V4. Likewise, in addition to direct downstream vertices, a vertex may have indirect downstream vertices, which are not directly connected to it but lie below it and are connected to it through one or more intermediate vertices. For example, in FIG. 1C, V4 lies below V1 and is connected to V1 through V2 or V3, so V4 is an indirect downstream vertex of V1. A vertex's upstream vertices include its direct and indirect upstream vertices, and its downstream vertices include its direct and indirect downstream vertices.
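As a rough illustration (not part of the patent text), the upstream/downstream relationships of FIG. 1C can be sketched in Python; the `DAG` class and its method names are invented for this example:

```python
from collections import defaultdict

class DAG:
    """Minimal DAG of execution stages; edges point from upstream to downstream."""
    def __init__(self):
        self.downstream = defaultdict(set)  # vertex -> its direct downstream vertices
        self.upstream = defaultdict(set)    # vertex -> its direct upstream vertices

    def add_edge(self, src, dst):
        self.downstream[src].add(dst)
        self.upstream[dst].add(src)

    def all_downstream(self, v):
        """All direct and indirect downstream vertices of v (transitive closure)."""
        seen, stack = set(), list(self.downstream[v])
        while stack:
            u = stack.pop()
            if u not in seen:
                seen.add(u)
                stack.extend(self.downstream[u])
        return seen

# The DAG of FIG. 1C: edges 11 (V1->V2), 12 (V1->V3), 13 (V2->V4), 14 (V3->V4).
dag = DAG()
for src, dst in [("V1", "V2"), ("V1", "V3"), ("V2", "V4"), ("V3", "V4")]:
    dag.add_edge(src, dst)
```

Here `dag.downstream["V1"]` yields the direct downstream vertices {V2, V3}, while `dag.all_downstream("V1")` also includes the indirect downstream vertex V4.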
The execution of a vertex may depend on its direct upstream vertex, that is, there is an execution dependency between the two and the vertex can only execute after its direct upstream vertex has executed; alternatively, a vertex's execution may not depend on its direct upstream vertex, in which case it can execute in parallel with that vertex.
In some further embodiments, a DAG can be expressed at two levels: a logical graph and a physical graph. The logical graph can be regarded as a natural extension of the execution plan, describing the data execution flow the user wants to implement for the job. The physical graph reflects the physical attributes obtained by mapping each stage of the execution plan onto the distributed system, describing physical attributes such as the concurrency of each stage at the execution level, the resources used by worker nodes, and the data transfer method.
FIG. 1D exemplarily shows the mapping between a logical graph and a physical graph. For ease of illustration, FIG. 1D shows an execution plan with only 4 stages. As shown in FIG. 1D, the logical graph describes the 4 vertices of the execution plan (V0, V1, V2 and V3) and their relationships (for example, V0 points to V2, and V1 and V2 point to V3); one vertex corresponds to one stage of the execution plan. The logical graph reflects the data execution flow of the execution plan. After the logical graph is mapped to a physical graph, the physical graph can describe physical attributes such as the concurrency to be configured for each stage, the resources used by each stage's worker nodes (e.g., CPU and memory resources), and the data transfer method. For example, continuing the example of FIG. 1D, the physical graph specifies that vertex V0 needs 3 worker nodes (a concurrency of 3), while vertices V1, V2 and V3 each need 2 worker nodes (a concurrency of 2). In other words, the physical graph expresses the physical attributes of the vertices and connecting edges of the DAG. Based on the physical attributes of the vertices and edges described by the physical graph, the master node can schedule worker nodes and resources for each stage, so that the tasks of a stage can be executed in parallel by multiple worker nodes, realizing the execution of the job in the distributed system.
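A minimal sketch of the logical-to-physical mapping of FIG. 1D follows; the `PhysicalProperties` structure and its fields (CPU, memory) are invented for illustration, with the concurrency values taken from the figure:

```python
from dataclasses import dataclass

@dataclass
class PhysicalProperties:
    """Physical attributes configured for one stage (illustrative fields only)."""
    concurrency: int   # number of worker nodes scheduled for the stage
    cpu_cores: int     # CPU cores per worker node (assumed unit)
    memory_mb: int     # memory per worker node, in MB (assumed unit)

# Mapping the logical graph of FIG. 1D onto a physical graph:
# V0 runs with 3 worker nodes, V1/V2/V3 with 2 worker nodes each.
physical_plan = {
    "V0": PhysicalProperties(concurrency=3, cpu_cores=1, memory_mb=2048),
    "V1": PhysicalProperties(concurrency=2, cpu_cores=1, memory_mb=2048),
    "V2": PhysicalProperties(concurrency=2, cpu_cores=1, memory_mb=2048),
    "V3": PhysicalProperties(concurrency=2, cpu_cores=1, memory_mb=2048),
}

# Total worker nodes the master node would schedule for this plan.
total_workers = sum(p.concurrency for p in physical_plan.values())
```

Dynamic adjustment, as described later, amounts to rewriting entries of such a table while the job is running.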
Configuring an execution plan, as referred to in the embodiments of this application, may include configuring the logic of the execution plan and configuring its physical attributes. Configuring the logic can be regarded as configuring the execution plan at the logical-graph level, for example, configuring its execution flow. Configuring the physical attributes can be regarded as configuring the execution plan at the physical-graph level, for example, configuring the concurrency, resources, data transfer method, and other physical attributes of each stage.
If an execution plan is configured before the job actually executes and cannot be adjusted during the job's execution once configured, it is called a static execution plan. That is, a static execution plan is fully configured before job execution and cannot be adjusted while the job runs. However, before a job actually executes, it is impossible to accurately estimate the resources each stage of the execution plan will actually need, or the reasonable execution path of the plan. This inevitably makes it difficult to configure a static execution plan reasonably and accurately, degrading the performance of the distributed system in executing jobs.
On this basis, the embodiments of this application provide a new execution plan configuration scheme. During job execution, the scheme can dynamically adjust the configuration of a downstream stage based on the data output by its upstream stage, so that the downstream configuration adapts to the actual execution result of the upstream stage; the concurrency, resources, and other settings of the downstream stage can thus fit the actual execution of the job, improving the rationality and accuracy of the execution plan configuration.
As an optional implementation, FIG. 2 exemplarily shows a flowchart of the distributed job adjustment method provided by an embodiment of this application. The method may be performed by the master node. Referring to FIG. 2, the method may include the following steps.
In step S210, a job submitted by a user is obtained.
In step S211, an execution plan for the job is generated, the execution plan including multiple stages, the multiple stages including an upstream stage and a direct downstream stage of the upstream stage.
After a terminal submits a job to the distributed system, the master node can parse the job and generate an execution plan for it, which can be described by a DAG. The execution plan may include multiple stages, among them an upstream stage and a direct downstream stage of that upstream stage. In some embodiments, any of the stages that has a downstream stage can be regarded as an upstream stage; during job execution, the output data of an upstream stage is input to its direct downstream stage for processing. The output data of one upstream stage may be input to one or more direct downstream stages, and one direct downstream stage may take as input the output data of one or more upstream stages; that is, an upstream stage may correspond to one or more direct downstream stages, and a direct downstream stage may correspond to one or more upstream stages. In some embodiments, the upstream stage may be called the previous stage of its direct downstream stage, and the direct downstream stage the next stage of the upstream stage.
In one example, referring to FIG. 1C, V1 is the upstream stage of V2 and V3, and V2 and V3 are direct downstream stages of V1; during job execution, the output data of V1 can be input to V2 and V3 for processing. In turn, V2 and V3 are upstream stages of V4, and V4 is the direct downstream stage of V2 and V3, so the output data of V2 and V3 can be input to V4 for processing. One aspect the embodiments of this application focus on is how much concurrency, how many resources, and so on a direct downstream stage should be configured with to process the output data of its upstream stage; for example, how much concurrency and resources V2 and V3 each need to process the output data of V1, and how much concurrency and resources V4 needs to process the output data of V2 and V3.
In step S212, during execution of the job, Statistics of the output data of the upstream stage are obtained.
During job execution, after any stage finishes executing, the master node can collect Statistics of the output data of that stage's worker nodes. On this basis, after an upstream stage of the execution plan finishes executing, the master node can collect statistics on that upstream stage's output data. In some embodiments, the Statistics of the upstream stage's output data may include any of the following: the size of the output data (for example, its size before and after data compression), the data-size distribution across the Partitions of the output data, and the number of Records (serialized data records) in each Partition of the output data.
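As a sketch of the kind of record a master node might keep, the Statistics listed above can be grouped into a small structure; the field and method names here are invented for illustration, with only the kinds of information taken from the text:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class StageOutputStatistics:
    """Statistics of one upstream stage's output data (illustrative names)."""
    bytes_before_compression: int
    bytes_after_compression: int
    partition_bytes: List[int]    # data size of each output Partition
    partition_records: List[int]  # Record count of each output Partition

    def max_partition_skew(self) -> float:
        """Ratio of the largest Partition to the average Partition size,
        a simple signal of data skew a scheduler could act on."""
        avg = sum(self.partition_bytes) / len(self.partition_bytes)
        return max(self.partition_bytes) / avg

# Example: three Partitions where the third holds most of the data.
stats = StageOutputStatistics(
    bytes_before_compression=1_000,
    bytes_after_compression=400,
    partition_bytes=[100, 100, 800],
    partition_records=[10, 10, 80],
)
```

Here `stats.max_partition_skew()` is 2.4, flagging the third Partition as roughly 2.4 times the average size.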
In step S213, the direct downstream stage is configured according to the Statistics, so that the direct downstream stage executes the job based on the configuration result.
Based on the statistics of the upstream stage's output data, the master node can configure the direct downstream stage so that its configuration adapts to the actual execution result of the upstream stage; the concurrency, resources, and other settings of the downstream stage can thus fit the actual execution of the job. The direct downstream stage can then execute the job based on the configuration result, so that its corresponding tasks are completed reasonably and efficiently.
During job execution, dynamically configuring the direct downstream stages of upstream stages in the manner provided by the embodiments of this application allows the execution plan's configuration to be adjusted dynamically during execution and to fit the actual execution of the job. That is, during job execution, the embodiments of this application can dynamically adjust the distributed job based on the output data and statistics of upstream stages, achieving dynamic job adjustment. Executing the job based on the dynamically configured execution plan then enables the distributed system to complete the job more reasonably and efficiently, significantly improving the performance of the distributed system in executing jobs.
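One concrete way such a step could work is to derive the downstream concurrency from the per-Partition sizes in the Statistics. The following sketch is not from the patent; the policy, function name, and thresholds are invented for illustration:

```python
def choose_concurrency(partition_bytes,
                       target_bytes_per_task=256 * 1024 * 1024,
                       max_concurrency=1000):
    """Pick a concurrency for the direct downstream stage so that each worker
    node handles roughly target_bytes_per_task of upstream output.
    Policy and thresholds are assumptions, not the patent's method."""
    total = sum(partition_bytes)
    needed = max(1, -(-total // target_bytes_per_task))  # ceiling division
    # Concurrency cannot usefully exceed the Partition count when each
    # worker consumes whole Partitions.
    return min(needed, max_concurrency, max(1, len(partition_bytes)))

# Ten 100 MB Partitions (~1000 MB total) at a 256 MB-per-task target.
concurrency = choose_concurrency([100 * 1024 * 1024] * 10)
```

With these numbers the downstream stage would be scheduled with 4 worker nodes instead of a statically guessed value.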
In some embodiments, configuring the directly downstream stage based on the statistics of the output data of the upstream stage may include any of the following:
Configuring the degree of concurrency for the directly downstream stage according to the statistics;
Assigning Partitions to the directly downstream stage according to the statistics; for example, assigning the Partitions output by the upstream stage to the worker nodes of the directly downstream stage, or, in a Join scenario, splitting the Partitions output by the upstream stage before assigning them to the directly downstream stage, so that the Join operation is performed on the worker nodes of the directly downstream stage;
Selecting the directly downstream stage for subsequent execution according to the statistics. In the execution plan, multiple candidate execution paths can be configured downstream of the upstream stage, so that during job execution the path actually executed is selected from the candidates based on the execution result of the upstream stage, making the execution logic of the plan more accurate and reasonable. In this case, the stages in an execution path may include at least the directly downstream stage of the upstream stage; by selecting among the candidate execution paths downstream of the upstream stage, the execution logic can be adjusted dynamically and the directly downstream stage for subsequent execution can be selected.
The various ways of configuring the directly downstream stage based on the statistics are described separately below and are not expanded here.
In the distributed job adjustment method provided by the embodiments of the present application, after acquiring the job submitted by the user, the master node can generate an execution plan of the job, where the execution plan includes multiple stages, and the multiple stages include an upstream stage and the directly downstream stage of the upstream stage. During job execution, the master node can obtain statistics of the output data of the upstream stage and configure the directly downstream stage according to those statistics. Because the embodiments of the present application can dynamically adjust the configuration of the downstream stage during job execution based on the data output of the upstream stage, the configuration of the downstream stage adapts to the actual execution result of the upstream stage, so that the concurrency, resource, and other settings of the downstream stage fit the actual execution of the job, improving the rationality and accuracy of the execution plan configuration. It can be seen that the distributed job adjustment method provided by the embodiments of the present application can dynamically adjust the configuration of the execution plan during job execution, achieving dynamic job adjustment and making the configuration of the execution plan fit the actual execution of the job; executing the job according to the dynamically configured execution plan then enables the distributed system to complete the job more reasonably and efficiently, significantly improving the performance of the distributed system in executing jobs.
The following describes implementations that configure the concurrency of the directly downstream stage based on the statistics of the output data of the upstream stage.
For a static execution plan, the concurrency of each stage can be determined by the master node using estimation rules. For example, after the job is submitted, the master node can configure the concurrency of each stage of the execution plan according to the total amount of source data of the job using estimation rules (which can be regarded as configuring the concurrency of each vertex in the DAG), or configure it based on user-specified concurrency for different kinds of stages. However, the source data processed by distributed jobs is complex and diverse, and the master node often cannot rely on estimation rules to configure concurrency suited to different jobs, so the statically configured concurrency of each stage is often inaccurate. For example, for a stage that processes a small amount of data, statically configuring a high concurrency wastes computing resources of the distributed system; for a stage that processes a large amount of data, statically configuring a low concurrency prolongs the execution time of the stage and may even cause errors such as exceeding the memory limit, leading to job failure. Therefore, with statically configured concurrency, to avoid the risk that a stage configured with low concurrency cannot handle massive data, stages often need to be configured with high concurrency, which wastes considerable computing resources during actual job execution. For example, for a Map-Reduce job, even if the upstream Map stage produces only 1 KB of output data at runtime, the downstream Reduce stage will still, due to its statically configured high concurrency, schedule a large number of worker nodes to process that 1 KB of data, an unnecessary waste of computing resources.
Based on this, it is particularly necessary to configure the concurrency of the directly downstream stage of an upstream stage during job execution based on the output data of the upstream stage. In some embodiments, the master node may assign the Partitions (data partitions) in the output data of the upstream stage to the worker nodes of the directly downstream stage according to the principle of an even number of Partitions per node. For example, each worker node of the directly downstream stage processes multiple consecutive Partitions output by the upstream stage, and each worker node processes the same number of Partitions. This approach may be called an Even-Reduction strategy based on the number of Partitions; it works well when the data volume of each Partition is uniform. In real environments, however, data distributions are diverse and often non-uniform. For non-uniform data (where the data volume per Partition varies), this approach may cause data skew on a single worker node of the directly downstream stage, that is, a single worker node processes far more data than the others, which in turn causes a long-tail problem among worker nodes and unnecessarily prolongs job execution. For example, FIG. 3A schematically shows the allocation of processing data to a directly downstream stage. As shown in FIG. 3A, the data volume of each Partition output by the upstream stage is not uniform, and the value inside each Partition represents its data volume. If a specified number of Partitions are simply merged for each worker node of the directly downstream stage, then although each worker node processes the same number of Partitions (for example, in FIG. 3A each worker node of the directly downstream stage processes 2 Partitions), the amount of data processed per worker node may be distributed unevenly: on the one hand, some worker nodes process unreasonably little data (some may even have no data to process at all); on the other hand, some worker nodes merge Partitions with large data volumes, further aggravating data skew, so that their running time is stretched and a long tail forms.
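The skew described above can be illustrated with a minimal sketch of the partition-count-based Even-Reduction strategy; the function name and the partition sizes are hypothetical, not taken from FIG. 3A.

```python
# Even-Reduction sketch: each downstream worker receives the same *number*
# of consecutive partitions, regardless of how much data each partition holds.

def even_reduction(partition_sizes, partitions_per_worker):
    """Group consecutive partitions into fixed-count batches, one per worker."""
    workers = []
    for i in range(0, len(partition_sizes), partitions_per_worker):
        workers.append(partition_sizes[i:i + partitions_per_worker])
    return workers

# Non-uniform partition data volumes (illustrative values).
sizes = [1, 1, 2, 50, 1, 3, 2, 40]
assignment = even_reduction(sizes, partitions_per_worker=2)
loads = [sum(batch) for batch in assignment]
print(loads)  # [2, 52, 4, 42] -- equal partition counts, heavily skewed loads
```

The two workers carrying 52 and 42 units of data become the long tail, while the other two are nearly idle, which is exactly the imbalance the patent's dynamic adjustment targets.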
In addition, although the number of Partitions is generally positively correlated with the time the computation actually takes, some data types (for example, integers) compress well, so the output data files may be heavily compressed. In that case, a small number of Partitions may correspond to a large number of data records (that is, a relatively small number of Partitions holds a large number of records), so configuring the concurrency of the directly downstream stage simply based on the number of Partitions of the upstream stage may introduce considerable uncertainty in actual jobs.
In view of the above, the embodiments of the present application provide a solution that, during job execution, adaptively and dynamically adjusts the concurrency of the directly downstream stage based on the statistics of the output data of the upstream stage, while keeping the amount of data processed by each worker node of the directly downstream stage close to balanced. As an optional implementation, FIG. 3B shows another flowchart of the distributed job adjustment method provided by the embodiments of the present application. The method may be performed by the master node; referring to FIG. 3B, the method may include the following steps.
In step S310, the job submitted by the user is acquired.
In step S311, an execution plan of the job is generated, where the execution plan includes multiple stages, and the multiple stages include an upstream stage and the directly downstream stage of the upstream stage.
In step S312, concurrency is initially configured for at least the first-executed stage among the multiple stages.
The master node can initialize the concurrency of the execution plan. In some embodiments, the master node can initialize the concurrency of the execution plan according to the total amount of source data of the job using estimation rules; alternatively, the master node can initialize the concurrency based on user-specified concurrency for different kinds of stages.
In the embodiments of the present application, the master node can configure the concurrency of the directly downstream stage based on the amount of data output by the upstream stage. Since the first-executed stage in the execution plan has no upstream stage, when initializing the concurrency of the execution plan the master node should configure concurrency at least for the first-executed stage, so that it can execute the job with the initialized concurrency. A stage that is not executed first generally has an upstream stage, so it can act as the directly downstream stage of one or more upstream stages and have its concurrency configured based on the statistics of the output data of the upstream stage.
Since the stages that are not executed first can have their concurrency adjusted dynamically during job execution, the embodiments of the present application do not necessarily need to initialize concurrency for them when initializing the concurrency configuration; of course, initializing concurrency for such stages can also be supported, and the embodiments of the present application do not limit this. In an example, as shown in FIG. 1C, V1, as the first-executed stage, needs its concurrency initialized, while V2, V3, and V4 all have upstream stages, and the embodiments of the present application do not require concurrency to be initialized for V2, V3, and V4.
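The distinction between first-executed stages and the rest can be sketched as a simple check for stages with no incoming DAG edge; the function name is hypothetical, and the edge list assumes the FIG. 1C topology in which V1 feeds the later stages.

```python
# Sketch: only stages without an upstream stage need initialized concurrency;
# all other stages can have concurrency derived later from upstream statistics.

def stages_needing_initial_concurrency(edges, stages):
    """Return the stages that have no upstream stage in the DAG."""
    has_upstream = {dst for (_src, dst) in edges}
    return [s for s in stages if s not in has_upstream]

# Assumed DAG in the spirit of FIG. 1C: V1 is first; V2, V3, V4 have upstreams.
stages = ["V1", "V2", "V3", "V4"]
edges = [("V1", "V2"), ("V1", "V3"), ("V2", "V4"), ("V3", "V4")]
print(stages_needing_initial_concurrency(edges, stages))  # ['V1']
```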
In step S313, during job execution, the data volume distribution information of the output data of the upstream stage is determined, where the data volume distribution information includes the data volumes of the multiple data partitions corresponding to the output data.
During job execution, the master node can schedule worker nodes to process the input data of the upstream stage according to its concurrency, so that the worker nodes scheduled for the upstream stage produce its output data after processing the input data. The output data of the upstream stage can be divided into multiple data partitions (Partitions). The data volume of each data partition of the output data serves as the data volume distribution information of the output data; that is, the data volume distribution information indicates how much data falls into each data partition of the output data, and this information can be carried in the statistics of the output data.
In the embodiments of the present application, the data volume distribution information of the output data of the upstream stage can be collected by the master node.
In some embodiments, the upstream stage referred to in step S313 may be the first-executed stage or a stage that is not executed first. It should be noted that if the upstream stage is the first-executed stage, it can schedule worker nodes to process data based on the initialized concurrency; if the upstream stage is not executed first, it can schedule worker nodes to process data after its concurrency has been dynamically adjusted according to the solution provided by the embodiments of the present application.
In step S314, the amount of processing data allocated to the worker nodes of the directly downstream stage is determined according to the ideal data size (Ideal Data Size) corresponding to the worker nodes of the directly downstream stage and the data volume distribution information.
After the master node collects the data volume distribution information of the output data of the upstream stage, it can allocate processing data to the worker nodes of the directly downstream stage based on that information. The embodiments of the present application can automatically configure the concurrency of the directly downstream stage once the allocation of processing data to its worker nodes is complete.
In some embodiments, an ideal data size corresponding to the worker nodes of the directly downstream stage may be preset. If the amount of data a worker node processes exceeds the ideal data size, the processing load of the worker node is exceeded and the probability that the worker node fails increases; therefore, when allocating processing data to the worker nodes of the directly downstream stage, the allocated data volume should be close to but not exceed the ideal data size.
Since the data volume distribution information expresses the data volume of each data partition of the upstream stage, when allocating processing data to the worker nodes of the directly downstream stage, the master node can determine the amount allocated to each worker node based on the ideal data size and the data volume distribution information, such that the amount allocated to any worker node does not exceed the ideal data size.
In some embodiments, the master node can assign consecutive data partitions output by the upstream stage, whose total data volume does not exceed the ideal data size, to one worker node of the directly downstream stage, so that each worker node of the directly downstream stage receives consecutive partitions whose total data volume does not exceed the ideal data size. In some further embodiments, on that basis, the master node can keep the amounts of processing data allocated to the worker nodes of the directly downstream stage as balanced as possible.
In an example, FIG. 3C shows the assignment of data partitions (Partitions) to a directly downstream stage. As shown in FIG. 3C, the upstream stage outputs multiple Partitions, and the value inside each Partition box represents its data volume. It can be seen that when allocating processing data to the directly downstream stage, the master node assigns consecutive partitions whose total data volume does not exceed the ideal data size to one worker node, keeping the amount of data processed by each worker node close to evenly distributed. For example, the four worker nodes of the directly downstream stage are allocated data volumes of 15, 16, 19, and 10, respectively. Once the processing data has been allocated to the worker nodes of the directly downstream stage, the master node has effectively configured its concurrency; for example, after the allocation in FIG. 3C, the master node can determine that 4 worker nodes are needed (a concurrency of 4) to process the output data of the upstream stage. Configuring concurrency in the manner provided by the embodiments of the present application avoids the situation where the data volume of one worker node of the directly downstream stage, while not exceeding the ideal data size, is much higher or lower than that of the other worker nodes, so that the amounts of data processed by the worker nodes are balanced and reasonable.
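The packing just described can be sketched as a greedy pass over consecutive partitions that opens a new worker whenever adding the next partition would exceed the ideal data size. The function name and partition sizes below are hypothetical (FIG. 3C's exact values are not reproduced); the sizes are chosen so the per-worker totals match the 15/16/19/10 example in the text.

```python
# Greedy sketch of ideal-data-size packing: merge consecutive partitions into
# one downstream worker until the next partition would exceed ideal_size; the
# number of workers produced is the dynamically derived concurrency.

def pack_by_ideal_size(partition_sizes, ideal_size):
    workers, current, current_size = [], [], 0
    for size in partition_sizes:
        if current and current_size + size > ideal_size:
            workers.append(current)          # close the current worker
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        workers.append(current)
    return workers

sizes = [5, 10, 8, 8, 9, 10, 10]             # illustrative partition volumes
assignment = pack_by_ideal_size(sizes, ideal_size=20)
print([sum(w) for w in assignment])          # [15, 16, 19, 10]
print(len(assignment))                       # 4 -> concurrency of 4
```

No worker exceeds the ideal size of 20, and the loads stay close to balanced, in contrast to the count-based Even-Reduction assignment.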
In step S315, the concurrency of the directly downstream stage is configured according to the number of worker nodes of the directly downstream stage to which processing data has been allocated.
After determining the amounts of processing data allocated to the worker nodes of the directly downstream stage, the master node can complete the concurrency configuration of the directly downstream stage based on the number of worker nodes that received an allocation.
With the distributed job adjustment method provided by the embodiments of the present application, after the execution plan of the job is generated, concurrency is initialized at least for the first-executed stage of the execution plan, so that the job can actually be executed. During job execution, the master node can determine the data volume distribution information of the output data of an upstream stage of the execution plan, which expresses the data volumes of the multiple data partitions corresponding to the output data; based on the ideal data size corresponding to the worker nodes of the directly downstream stage and this distribution information, it determines the amount of processing data allocated to the worker nodes of the directly downstream stage. The master node can then configure the concurrency of the directly downstream stage according to the number of worker nodes that received an allocation, so that each worker node of the directly downstream stage executes the job with a data volume that does not exceed the ideal data size, reducing the probability of execution failure caused by a single worker node processing too much data. In the embodiments of the present application, during job execution, processing data can be allocated to the worker nodes of the directly downstream stage based on the data volume distribution information of the output data of the upstream stage and the ideal data size of those worker nodes, and the concurrency of the directly downstream stage is configured automatically, achieving dynamic adaptive adjustment of that concurrency based on the output of the upstream stage. This improves the accuracy and rationality of the concurrency configuration of the directly downstream stage, greatly reduces the computing resources wasted by unreasonable concurrency configuration, and can significantly improve the performance of the distributed system.
The solution provided by the embodiments of the present application may be called a Fair-Parallelism strategy based on Partition data volume. As an optional implementation, taking an upstream stage that performs a data shuffle (in distributed job execution, the process of computing a hash value for each data record with a specific hash function and sending the record to the corresponding worker node) as an example, the master node can obtain the data volume distribution information of the Partitions output by the worker nodes of the upstream stage after the shuffle (the data volume of each Partition). The master node can allocate processing data to the worker nodes of the directly downstream stage by Partition aggregation, for example, by assigning multiple consecutive Partitions whose total data volume does not exceed the ideal data size to one worker node of the directly downstream stage. During this allocation, once the total data volume of the Partitions assigned to a worker node reaches the ideal data size, the subsequent consecutive Partitions whose total data volume does not exceed the ideal data size are automatically assigned to the next worker node, and so on, until all Partitions output by the upstream stage have been allocated. The actual concurrency of the directly downstream stage is determined automatically once all Partitions of the upstream stage have been allocated, that is, based on the number of worker nodes of the directly downstream stage that received an allocation. The embodiments of the present application distribute the Partitions output by the upstream stage as evenly as possible among the worker nodes of the directly downstream stage, configuring its concurrency more accurately and reasonably and avoiding the long-tail problem of a single worker node.
It should be noted that, after the concurrency of the directly downstream stage is configured in step S315, if the directly downstream stage in turn feeds other downstream stages through connection edges (that is, the directly downstream stage itself has downstream stages), then the stage whose concurrency was just configured in step S315 can become an upstream stage, and the concurrency of subsequent downstream stages continues to be adjusted according to the solution provided by the embodiments of the present application, until every stage of the execution plan has had its concurrency adjusted during job execution.
In some further embodiments, the embodiments of the present application can be further refined to reduce data skew on the worker nodes of the directly downstream stage and thereby accommodate the various data characteristics of job execution. Specifically:
When the data volumes of many consecutive Partitions are all very small, the sum of their data volumes may still be below the ideal data size. In this extreme case, a large number of Partitions may be assigned to a single worker node, and if a worker node of the directly downstream stage sequentially operates on a large number of Partitions, read and write performance may regress. Based on this, the embodiments of the present application can set an upper limit on the number of Partitions a single worker node is allowed to receive, thereby controlling the number of Partitions added to a worker node while still avoiding worker nodes whose amount of data to process is zero.
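The cap described above can be added to the greedy packing as a second flush condition; the function name, partition sizes, and cap value are illustrative only.

```python
# Packing sketch with a per-worker cap on partition count: a worker is closed
# either when the next partition would exceed ideal_size, or when it already
# holds max_partitions partitions (preventing one worker from sequentially
# reading very many tiny partitions).

def pack_with_partition_cap(partition_sizes, ideal_size, max_partitions):
    workers, current, current_size = [], [], 0
    for size in partition_sizes:
        if current and (current_size + size > ideal_size
                        or len(current) >= max_partitions):
            workers.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        workers.append(current)
    return workers

tiny = [1] * 10  # ten tiny partitions whose total is far below ideal_size
counts = [len(w) for w in pack_with_partition_cap(tiny, ideal_size=100,
                                                  max_partitions=4)]
print(counts)  # [4, 4, 2] -- no worker reads all ten partitions sequentially
```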
When a worker node's running time correlates only weakly with the size of the data it processes, but strongly with other characteristics of the data (such as the number of data records), assigning data to worker nodes purely by Partition size may place a Partition that is small in bytes but contains a huge number of records on a single worker node, stretching that node's running time and creating a bottleneck for job execution. In addition, the per-record computation cost of the worker node itself must be considered; this may depend on the number of operators on the worker node, the characteristics of those operators, and similar factors, all of which should also enter the concurrency calculation. Therefore, beyond using Partition data size as the baseline for assigning data to worker nodes, embodiments of the present application may further combine features such as the Record count of each Partition, the number of operators on the worker node, and operator complexity to readjust the amount of data assigned to each worker node, selecting the larger of the concurrency values obtained along these dimensions, thereby completing the final concurrency configuration of the directly downstream stage. Accordingly, in an optional implementation of step S315, embodiments of the present application may complete the final concurrency configuration of the directly downstream stage according to the number of worker nodes assigned data in the directly downstream stage, the Record count of the Partitions assigned to each worker node, the number of operators on the worker node, and operator complexity.
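One way to combine these dimensions, sketched below, is to compute the concurrency implied by each dimension independently and take the maximum. The patent does not specify a formula; the function, its parameters, and the linear scaling of per-record cost by operator count and complexity are illustrative assumptions.

```python
import math

def final_concurrency(total_bytes, total_records, ideal_bytes,
                      ideal_records_per_node, operator_count,
                      complexity_per_operator=1.0):
    """Pick the largest concurrency implied by any single dimension, so that
    neither data size nor record-processing cost becomes a bottleneck.
    (Illustrative sketch; the real adjustment logic is not specified here.)"""
    by_bytes = math.ceil(total_bytes / ideal_bytes)
    # a heavier per-record operator pipeline shrinks the per-node record budget
    record_budget = ideal_records_per_node / (operator_count * complexity_per_operator)
    by_records = math.ceil(total_records / record_budget)
    return max(by_bytes, by_records)
```

With a light single-operator pipeline the byte dimension dominates; with ten operators of the same complexity, the record dimension takes over and raises the final concurrency.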
When dynamically adjusting the concurrency of job execution, embodiments of the present application can combine data characteristics across multiple dimensions with statistics computed from execution results, providing more optimization directions for the dynamic execution mode of a distributed system engine. Applied in a real production environment, embodiments of the present application can reduce the overall execution concurrency of production jobs by orders of magnitude, significantly improve the computing efficiency of worker nodes in the distributed system, and largely avoid both wasted computing resources and the overhead of frequently scheduling and launching worker nodes. Compared with the simple and direct Even-Reduction strategy, the Fair-Parallelism strategy proposed in embodiments of the present application effectively avoids the severe data skew that the Even-Reduction strategy may introduce, so that the amounts of data processed by all worker nodes of a distributed job are distributed as evenly as possible, without prominent long tails or underloaded stragglers; this prevents the long tail from becoming the bottleneck of running time, and also prevents the resource waste caused by repeatedly launching worker nodes that each process only a small amount of data. Further, by limiting the number of Partitions that may be merged consecutively when merging small Partitions, excessive merging is prevented from degrading the read performance of the downstream stage. Further, based on the detailed data information (including data volume and Record count) generated by each worker node during job execution, embodiments of the present application can combine the Record count, the number of operators, and operator complexity to make a more balanced and complete concurrency adjustment. It can be seen that embodiments of the present application can significantly improve the performance of a distributed system.
Based on statistics of the output data of the upstream stage, embodiments of the present application provide a scheme for dynamically merging multiple Partitions. At the same time, for a Partition with a large amount of data, embodiments of the present application also provide a scheme for splitting the Partition. The specific implementations are described in detail below.
Effective data orchestration and shuffle are important prerequisites for the horizontal scalability of a distributed system. For the execution stages of Map-Reduce, efficient data shuffle has always been one of the key performance indicators of distributed job execution. However, a fully connected data shuffle (full-shuffle) can execute efficiently only in the ideal scenario where data is evenly distributed. In actual production jobs, data distribution is often uneven, and the skew in the data may be further amplified in full-shuffle mode, greatly stretching the running time of individual worker nodes and causing a severe long tail.
It should be noted that the concept of shuffle comes from the Map-Reduce computation model. Although modern job execution frameworks have evolved to a more general DAG-based description, the Map-Reduce execution pattern remains an important description of the subgraphs within a DAG. In the many more complex DAG topologies, the data flow along each connecting edge can likewise be described by various shuffle models; for example, in a distributed execution framework, an important physical attribute of a DAG's connecting edge is how data is transmitted along that edge. Edge data transmission is not limited to full-shuffle; more dynamic and intelligent data orchestration modes can be introduced, and thereby solve the problems full-shuffle faces in many practical scenarios.
Taking dynamic partition writes as an example: for a typical distributed data warehouse system, partitioned tables are used extremely widely. Data is usually written into a partitioned table in one of two ways, static partition writes and dynamic partition writes. When the target partition values can be specified in advance, writing directly to the specified partitions statically is the simpler approach. When the partition values cannot be determined in advance, especially when the data produced by a query is distributed across multiple partitions, dynamic partition (Dynamic Partition) writes are used; that is, the Partition values to which data is written are obtained during job execution according to the data actually produced. For example, in the following SQL (Structured Query Language) statement, the output is declared to be partitioned by the column country, but which partitions will actually be written, and their corresponding partition values, are determined at run time:
INSERT OVERWRITE TABLE Partitioned_sales PARTITION (country)
SELECT country, item, price FROM sales;
In a distributed system, because the number of partitions is uncertain and data distributions are diverse and uneven, achieving general-purpose, efficient dynamic partition writes has always been a challenging problem. A distributed system needs to avoid severe long tails in the job while not burdening the storage system with a flood of small files.
In some embodiments, the master node may implement dynamic partitioning through an execution plan with a single Map stage. In this implementation, the execution plan generated by the master node contains only one Map stage, which may execute tasks through one or more worker nodes. After reading its data, each worker node of the Map stage writes out files according to the partition value (for example, country in the example above). That is, if a worker node processes data corresponding to different country values, it produces files under the correspondingly different paths. A single-Map-stage execution plan is simple and intuitive, but for a real large-scale distributed system it brings various hidden dangers, the most prominent being small-file fragmentation. In a single-Map-stage execution plan, suppose the concurrency of the Map stage is M and country takes N distinct values in the user data. With randomly distributed data, because each worker node of the Map stage writes its output independently, each partition may end up with M files. This execution mode may therefore ultimately produce M*N data files, among which there may be a large number of small files. The existence of so many fragmented small files has considerable negative impact on a distributed system:
For the storage layer of a distributed system, managing a huge number of small files consumes enormous Meta (metadata) information; in extreme cases it can even overwhelm the master node of the entire distributed system, rendering the whole system unavailable. In terms of storage efficiency, fragmented small files also yield a poor storage compression ratio and occupy more storage space;
Generated data always needs to be processed later; after the upstream stage produces small files, the computation cost of the downstream stage grows, and the actual data read efficiency also drops;
For the Map stage, since M*N small files must be written, M*N writers (write services) must be created, each reserving a certain Memory Buffer for Encoding, Compress(ion), and so on. If the Buffer is too large, the Map stage uses too much memory; if the Buffer is too small, the effectiveness of Encoding and Compress suffers.
In other embodiments, the master node may implement dynamic partitioning based on a fully connected Reshuffle. To avoid the small-file problem described above, after an execution stage produces its data, the master node may first perform a fully connected Reshuffle by Partition Key; that is, the data of the same Partition is aggregated onto one worker node and then written out. This guarantees that each partition value produces exactly one partition file. But this hard constraint, while reducing the file count, brings another negative effect: data skew.
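A fully connected Reshuffle of this kind can be sketched as hashing each record's Partition Key to a reducer. This is a minimal illustration only: CRC32 stands in for whatever hash the real engine uses, and the function and names are assumptions.

```python
import zlib
from collections import defaultdict

def full_reshuffle(records, num_reducers):
    """Aggregate all records of the same Partition Key onto one reducer.
    records: iterable of (partition_key, payload) pairs.
    (Sketch of the fully connected Reshuffle; CRC32 is an illustrative hash.)"""
    buckets = defaultdict(list)
    for key, payload in records:
        reducer = zlib.crc32(key.encode("utf-8")) % num_reducers
        buckets[reducer].append((key, payload))
    return buckets
```

Because every record sharing a key lands in the same bucket by construction, one dominant key concentrates its entire volume on one reducer — which is exactly the skew effect described next.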
It should be noted that the distribution of the data produced by an execution stage varies widely, and the data distribution of dynamic partitions cannot be obtained before the Query executes. For instance, in the SQL example above, if the user data of a certain platform is partitioned by country, the data in the home-country partition will without question be far larger than that of the other partitions, so the worker node handling the home-country partition develops a severe long tail, greatly stretching the running time of the whole job. For heavily skewed data, this long tail can delay the job by factors of hundreds or even thousands, and it also has a very bad effect on the resource utilization of the entire distributed system.
To mitigate this problem, the Reshuffle implementation can in practice be optimized by additionally introducing a Random shuffle Key. For example, with a Reshuffle key taking values in [0, 9], data is randomly scattered across 10 partitions to reduce the degree of skew. However, such a solution still has problems. When the skew is severe, even after splitting the data into 10 parts, the split data may still be skewed; a single data long tail may simply turn into 10 slightly milder long tails. For data that is not skewed, or partitions whose data volume is small to begin with, splitting into 10 parts likewise increases the final file count (10*N). Moreover, a shuffle mode in which the distributed system forcibly injects a Random Key may destroy the idempotency of the data; in a distributed system, if a worker node is rerun, there is a risk to data correctness.
In view of the above, embodiments of the present application perform intelligent, dynamic data orchestration according to the data distribution produced in real time by the upstream stage (for example, a Map stage), solving the small-file problem while keeping the written data balanced, thereby overcoming the shortcomings of both schemes above.
First, Figure 4A briefly illustrates the shuffle mode (for ease of understanding, the details of shuffling by taking the modulo of the Reduce concurrency are omitted here); Figure 4A shows an example of a data shuffle. In Figure 4A, boxes drawn with the same line style represent data with the same Partition Key within the shuffle data produced by the Map stage; for example, the thin dashed, thin solid, thick dashed, and thick solid boxes each represent data of one Partition Key. The number inside a box indicates the corresponding data volume. After one shuffle, data with the same Partition Key is handed to the same worker node of the Reduce stage for processing. As Figure 4A shows, worker node R#0 of the Reduce stage processes a data volume of 8, worker node R#1 processes 2, worker node R#2 processes 43, and worker node R#3 processes 16. The data processed by worker node R#2 is severely skewed, so R#2 will exhibit a very serious long tail when executing its Reduce-stage task.
For scenarios such as dynamic partition insertion, data of the same partition only needs to be written under the same directory, not into the same data file. Therefore, during the shuffle it is not necessary to guarantee that all data with the same Partition Key is handed to the same Reduce worker node. Based on this property, embodiments of the present application can automatically split Partitions with large data volumes produced by the Map stage and hand them to multiple worker nodes of the Reduce stage for processing.
As an optional implementation, Figure 4B shows yet another flowchart of the distributed job adjustment method provided by an embodiment of the present application. The method may be executed by the master node. Referring to Figure 4B, the method may include the following steps.
In step S410, the job submitted by the user is obtained.
In step S411, an execution plan of the job is generated; the execution plan includes multiple stages, the multiple stages including an upstream stage and a stage directly downstream of the upstream stage.
In some embodiments, the upstream stage is, for example, a Map stage, and the directly downstream stage is, for example, a Reduce stage.
In step S412, during job execution, Statistics of the output data of the upstream stage are obtained, the Statistics including data volume distribution information of the Partitions of the output data.
In some embodiments, the worker node or nodes of the upstream stage may report the Statistics of their output data to the master node both while executing the task and after execution completes. The Statistics may include data volume distribution information of the Partitions of the output data, which indicates the data volume of each Partition of the output data. In some further embodiments, the Statistics may also include the data volume of the output data (for example, the data volumes before and after compression) and the Record count of each Partition of the output data, among others.
In step S413, according to the ideal data volume, Partitions of the upstream stage's output data whose data volume exceeds the ideal data volume are split.
Embodiments of the present application may preset the ideal per-Partition data volume for the worker nodes of the directly downstream stage. It should be noted that when different stages of the execution phase serve as the directly downstream stage, the ideal data volumes set for them may differ. After obtaining the Statistics of the upstream stage's output data, the master node can determine the data volume of each Partition of the output data, and then, based on the directly downstream stage's ideal per-Partition data volume, determine whether any Partition of the output data exceeds the ideal data volume. For any Partition of the output data whose data volume exceeds the ideal data volume, embodiments of the present application may split that Partition such that the data volume of each resulting Partition does not exceed the ideal data volume.
In some embodiments, according to the ideal data volume, Partitions of the output data whose data volume exceeds the ideal data volume may be split such that the resulting Partitions are no larger than the ideal data volume and approach a uniform distribution.
In step S414, the split Partitions are assigned to the directly downstream stage, and each split Partition is configured to be executed by one worker node of the directly downstream stage.
After splitting the Partitions of the output data, embodiments of the present application may assign the split Partitions to the directly downstream stage, with each split Partition configured to be executed by one worker node of the directly downstream stage, thereby configuring the data to be processed by the directly downstream stage.
In some further embodiments, if the output data contains Partitions whose data volume is below the ideal data volume, embodiments of the present application may merge at least two Partitions of the output data whose total data volume does not exceed the ideal data volume, each of those Partitions individually being smaller than the ideal data volume. The merged Partition is then assigned to the directly downstream stage and configured to be executed by one worker node of the directly downstream stage, achieving efficient use of the directly downstream stage's computing resources.
By obtaining the Statistics of the upstream stage's output data during job execution, embodiments of the present application split, based on the per-Partition data volumes indicated in the Statistics, any Partition whose data volume exceeds the ideal data volume, guaranteeing that no split Partition exceeds the ideal data volume. The split Partitions are then assigned to the directly downstream stage, each configured to be executed by one worker node of that stage, thereby configuring the data to be processed by the worker nodes of the directly downstream stage. Embodiments of the present application can thus dynamically adjust the Partitions assigned to the directly downstream stage according to the Statistics of the upstream stage's output data, guaranteeing that the data volume of any single Partition processed by each worker node of the directly downstream stage does not exceed the ideal data volume. The data processed by the worker nodes of the directly downstream stage thereby approaches a uniform distribution, data skew in the directly downstream stage is reduced, and the long-tail problem in which individual worker nodes of the directly downstream stage must process very large Partitions, greatly stretching their running time, is avoided. It can be seen that embodiments of the present application make the data processed by the worker nodes of the directly downstream stage approach a uniform distribution, reduce data skew in the directly downstream stage and the long-tail problem of worker nodes, and significantly improve the performance of the distributed system.
In some embodiments, for the Map-Reduce phase, Map can be regarded as the upstream stage and Reduce as Map's directly downstream stage. Each worker node of the Map stage may report the Statistics of its output shuffle data to the execution engine (which may be the master node) both during execution and after it finishes; for example, the Map stage may have one or more worker nodes executing tasks, and each worker node may report the Statistics of its output shuffle data to the execution engine while executing its task and after execution completes. In some embodiments, the Statistics of the shuffle data include, for example, the data volume of the shuffle data before and after compression, the data volume of each Partition of the shuffle data, and the Record count of each Partition.
In some embodiments, an ideal data volume corresponding to one Partition of the Reduce stage may be defined. For any Partition output by the Map stage, if the Partition's data volume exceeds the ideal data volume, embodiments of the present application may split the Partition according to the ideal data volume, keeping the data volume processed by each Reduce worker node as uniform as possible; if a Partition's data volume is below the ideal data volume, embodiments of the present application may merge multiple such Partitions, guaranteeing that the merged Partition does not exceed the ideal data volume.
In an implementation example, Figure 4C shows Partition splitting according to an embodiment of the present application. As shown in Figure 4C, the four shuffle outputs of the Map stage can be divided into four Partitions (P#0, P#1, P#2, and P#3); for example, data with the same line style across the four shuffle outputs is aggregated into one Partition: the thin-dashed data aggregates into P#0, the thin-solid data into P#1, the thick-dashed data into P#2, and the thick-solid data into P#3. As shown in Figure 4C, the data volumes of the four Partitions are 8, 2, 43, and 16, respectively. Suppose the ideal data volume of one Reduce-stage Partition is defined as 10. The data volumes of P#0 and P#1 are both below 10, and their combined data volume does not exceed the ideal data volume of 10, so P#0 and P#1 can be merged into one Partition and assigned to one Reduce-stage worker node, R#0, for processing. Since the data volume of P#2, 43, exceeds the ideal data volume of 10, P#2 can be split as evenly as possible into multiple Partitions each no larger than the ideal data volume; as shown in Figure 4C, P#2 is split into 5 Partitions and assigned to five Reduce-stage worker nodes R#1 through R#5, where R#1 through R#4 are each assigned a Partition of data volume 9 and R#5 a Partition of data volume 7. Likewise, because the data volume of P#3 exceeds the ideal data volume, it is evenly split into 2 Partitions and assigned to two Reduce-stage worker nodes, R#6 and R#7, for processing.
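The split in Figure 4C can be reproduced with a short sketch: the number of pieces is the smallest count that keeps every piece within the ideal volume, and pieces are filled to the even ceiling with the remainder in the last piece. This is an illustrative strategy consistent with the figure's numbers; the actual engine's strategy may differ.

```python
import math

def split_partition(size, ideal_size):
    """Split one oversized partition into pieces no larger than ideal_size,
    as evenly as ceiling-sized chunks allow. (Sketch matching Figure 4C.)"""
    if size <= ideal_size:
        return [size]                        # small partitions stay intact (and may be merged)
    pieces = math.ceil(size / ideal_size)    # minimum number of pieces
    chunk = math.ceil(size / pieces)         # even ceiling per piece
    sizes, remaining = [], size
    while remaining > 0:
        sizes.append(min(chunk, remaining))
        remaining -= sizes[-1]
    return sizes
```

For the figure's values: `split_partition(43, 10)` yields `[9, 9, 9, 9, 7]` (R#1 through R#5), and `split_partition(16, 10)` yields `[8, 8]` (R#6 and R#7).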
The above mechanism, which splits Map-stage Partitions larger than the ideal data volume and merges Partitions smaller than it, may be called the Adaptive shuffle mechanism. In some further embodiments, under the Adaptive shuffle mechanism the number of files ultimately written by the Reduce stage depends mainly on the volume of the input Partition data and on the configured ideal data volume. Define the ideal number of Partitions entering the Reduce stage (Ideal Parallelism) as the total shuffle data volume divided by the ideal data volume; then after the Adaptive shuffle mechanism of the embodiments of the present application, the number of files ultimately produced by the Reduce stage is at most Ideal Parallelism plus N. Hence, if each Partition with output data must hold at least one data file, only Partitions larger than the ideal data volume increase the file count through splitting. The files produced by such splitting, however, are all on the order of a Partition in size rather than fragmented small files. On this basis, as long as the ideal data volume is configured reasonably, however many files are produced is acceptable to the distributed system; that is, as long as the file sizes are appropriate, massive data can simply be stored in somewhat more files.
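The file-count bound can be checked numerically: each partition at or below the ideal volume contributes at most one file, an oversized one contributes ceil(size/ideal) files, and since ceil(s/ideal) ≤ s/ideal + 1 for each of the N partitions, the total never exceeds Ideal Parallelism + N. The function below is a sketch under these assumptions; its name is illustrative.

```python
import math

def max_output_files(partition_sizes, ideal_size):
    """Upper bound on Reduce-stage output files under adaptive shuffle:
    one file per small partition, ceil(size/ideal) for oversized ones."""
    return sum(math.ceil(s / ideal_size) for s in partition_sizes)
```

For the Figure 4C partitions [8, 2, 43, 16] with ideal volume 10, the bound is 1 + 1 + 5 + 2 = 9 files, which stays below Ideal Parallelism (69/10 = 6.9) plus N = 4.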
In other words, under the Adaptive shuffle mechanism the data volume processed by each Reduce-stage worker node never exceeds the given ideal data volume, and every Partition smaller than the ideal data volume can be output by one Reduce-stage worker node as one file. This fundamentally solves, at the same time, both the generation of large numbers of small files and possible data skew, and handles well the dilemma faced in dynamic partition scenarios. It should further be noted that the Adaptive shuffle mechanism avoids the need to add an extra Random shuffle Key to reduce data skew. Because the Adaptive shuffle mechanism provided by embodiments of the present application is deterministic and re-entrant throughout, in an unstable distributed system environment it can fundamentally guarantee the correctness of the output data whenever retries of various kinds occur.
By combining the execution engine's (master node's) capability to collect statistics while a job runs with its capability to dynamically adjust the job execution graph and shuffle mode, embodiments of the present application achieve adaptive data orchestration: based on the data characteristics of the upstream stage's output, the output data can be intelligently distributed and orchestrated, including the automatic splitting of skewed data partitions and the merging of multiple small partitions. This fundamentally solves the data skew and worker-node long-tail problems that the data shuffle process may bring, while avoiding the data fragmentation problems of other shuffle-free schemes, and can significantly improve the performance of the distributed system.
Beyond the general optimizations of Partition merging and splitting, embodiments of the present application also perform dedicated optimizations for specific scenarios. Taking the Join scenario as an example, the following introduces an implementation in which, based on statistics of the upstream stage's output data, the Partitions output by the upstream stage are split and then assigned to the directly downstream stage to perform the Join operation.
During job execution in a distributed system, the Join operation is one of the most common and most complex operations. Because it involves computing and processing multiple input streams, beyond the challenges common to distributed systems, the interaction of the different streams at the Join operator gives rise to additional data processing scenarios. Uneven data distribution, however, leads to data skew and long-tail problems in Join operations, a common issue in distributed systems that has never been systematically solved.
FIG. 5A shows an example of the Join process in distributed SQL. As shown in FIG. 5A, two input streams provided by upstream stages (M1 and M2 in FIG. 5A) are joined in the downstream stage. The output data of M1 and M2 can be written out by Partition and shuffled by partition number to the different worker nodes of J3, where the data Join takes place. The intermediate data is organized by partition and stored on physical media.
FIG. 5A depicts the case where the Partition data of the upstream stages is fairly evenly distributed; in real queries and data processing, however, the distribution of the upstream stages' Partition data is very likely skewed. FIG. 5B shows another example of the distributed-SQL Join process. As shown in FIG. 5B, Partition1 (the Partition with partition number 1) in the input data M1 provides to J3 is severely skewed, while Partition1 in the input data M2 provides to J3 is slightly skewed. In this case, when J3 joins Partition1, the worker node of J3 handling it suffers a severe long tail, and the job may even fail because the node exceeds its memory limit.
To solve the above problem, FIG. 5C shows a further flowchart of the distributed job adjustment method provided by the embodiments of this application. The method may be performed by the master node. Referring to FIG. 5C, the method may include the following steps.
In step S510, the job submitted by the user is acquired.
In step S511, an execution plan for the job is generated. The execution plan includes multiple stages, among them a Join stage and a direct upstream stage of the Join stage, where the direct upstream stage provides multiple input streams to the Join stage, and each input stream includes multiple Partitions.
In the case of a Join operation, the Join stage, as the direct downstream stage of some stage, receives the multiple input streams that stage provides, and each input stream may include multiple Partitions.
In step S512, during job execution, if any input stream contains a target Partition with data skew, the target Partition is split into multiple sub-Partitions, and the sub-Partitions are assigned to multiple worker nodes of the Join stage.
During job execution, the master node can obtain the Statistics of any stage's output data. Based on these Statistics, if any input stream that the direct upstream stage provides to the Join stage contains a skewed Partition (for convenience, a skewed Partition is called a target Partition), the embodiments of this application can split the target Partition into multiple sub-Partitions and assign them to multiple worker nodes of the Join stage.
In some embodiments, for any Partition in any input stream, the embodiments of this application determine whether the Partition's data volume exceeds a first data volume threshold; if it does, the Partition is identified as a target Partition with data skew.
In some embodiments, when splitting a target Partition, the embodiments of this application split it based on a second data volume threshold, so that the resulting sub-Partitions have evenly distributed data volumes and no sub-Partition exceeds the second threshold. In some further embodiments, the second data volume threshold is smaller than the first; the specific values of both thresholds may be defined according to the actual situation and are not limited by the embodiments of this application. In one implementation example, the second data volume threshold may be the ideal data volume described above.
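The two-threshold scheme above can be sketched as follows. This is a hypothetical illustration, not the patent's implementation; the function name, threshold names and data shapes are assumptions.

```python
# Hypothetical sketch: detect skewed "target" Partitions by a first threshold,
# then split each one into evenly sized sub-partitions, none of which exceeds
# a second (smaller) threshold, e.g. the "ideal data volume".

def split_skewed_partitions(partition_sizes, first_threshold, second_threshold):
    """partition_sizes: {partition_no: size}. Returns {partition_no: list of
    sub-partition sizes} (a single-element list if the partition is not skewed)."""
    assert second_threshold < first_threshold
    plan = {}
    for pid, size in partition_sizes.items():
        if size > first_threshold:
            # Minimum number of sub-partitions so that none exceeds the threshold.
            n_sub = -(-size // second_threshold)  # ceiling division
            base, rem = divmod(size, n_sub)
            # Spread the remainder so sub-partition sizes stay even.
            plan[pid] = [base + (1 if i < rem else 0) for i in range(n_sub)]
        else:
            plan[pid] = [size]  # not skewed: keep as one partition
    return plan

sizes = {1: 10_000, 2: 900, 3: 1_200}
print(split_skewed_partitions(sizes, first_threshold=4_000, second_threshold=2_000))
# {1: [2000, 2000, 2000, 2000, 2000], 2: [900], 3: [1200]}
```

The even distribution of sub-partition sizes follows directly from spreading the division remainder, which matches the requirement that the split sub-Partitions be uniform.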
In some embodiments, when the sub-Partitions split from a target Partition are assigned to multiple worker nodes of the Join stage, one sub-Partition may be assigned to one worker node of the Join stage.
In step S513, the Partitions in the other input streams that have the same partition number as a sub-Partition are broadcast to the worker node to which that sub-Partition is assigned.
To ensure a correct Join in the Join stage, after splitting the target Partition of one input stream into multiple sub-Partitions and assigning the sub-Partitions to the Join stage's worker nodes, the embodiments of this application determine, for each sub-Partition of the target Partition, the Partitions in the other input streams (the input streams different from the one containing the target Partition) that have the same partition number as that sub-Partition, and broadcast those Partitions to the worker node to which the sub-Partition is assigned. In this way, each sub-Partition and the same-numbered Partitions of the other input streams can be joined correctly in the Join stage.
In some embodiments, the multiple input streams provided by the direct upstream stage include at least a first input stream and a second input stream, which may be any two of the input streams; the number of input streams may be greater than two and is not limited to exactly two. Under these conditions, a target Partition may appear in the input streams in the following situations.
In the first situation, one input stream contains a single target Partition. Taking the case where the first input stream contains a target Partition as an example, the master node splits that target Partition into multiple sub-Partitions and assigns each sub-Partition to a worker node of the Join stage; the Partitions in the other input streams (the input streams other than the first) with the same partition number as the target Partition are broadcast to the worker nodes to which the sub-Partitions are assigned.
As an implementation example of the first situation, FIG. 5D shows a further example of the Join process. FIG. 5D depicts a two-way Join with inputs M1 and M2, one of which is skewed. Assuming the worker nodes executing the Join carry no additional operators that must maintain properties such as Partition or Sort, the master node, after collecting the data volume of each Partition output by M1 and M2, can determine that Partition1 output by M1 is a skewed target Partition and split it into multiple sub-Partitions. In FIG. 5D, each small box within the box of M1's Partition1 represents one split sub-Partition. The sub-Partitions split from M1's Partition1 are assigned to multiple worker nodes of the Join stage for Join processing. Meanwhile, so that the split sub-Partitions can be joined correctly in the Join stage, the master node must broadcast the Partition in M2 with the same partition number as the target Partition to the worker nodes assigned to the sub-Partitions; for example, the master node broadcasts the Partition1 produced by M2 to the worker nodes assigned to each sub-Partition of M1.
By broadcasting the same-numbered Partitions of M1 and M2 to the multiple worker nodes of the Join stage, the data with the same partition number can still be joined correctly and produce results even when the target Partition has been split into multiple sub-Partitions.
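The one-sided arrangement of FIG. 5D can be sketched as a small planning routine. This is an illustrative sketch only; the function and field names are assumptions, not the patent's API.

```python
# Hypothetical sketch of one-sided skew handling: the skewed side's
# sub-partitions are scattered one per worker, while the other side's
# same-numbered Partition is broadcast in full to each of those workers.

def plan_one_sided_skew_join(m1_subpartitions, m2_partition, partition_no):
    """m1_subpartitions: ids of the sub-partitions split from M1's skewed
    Partition. m2_partition: id of M2's same-numbered Partition.
    Returns the per-worker inputs for the Join stage."""
    assignments = []
    for i, sub in enumerate(m1_subpartitions):
        assignments.append({
            "worker": i,                                       # one sub-partition per worker
            "scattered_input": sub,                            # M1 side: only this sub-partition
            "broadcast_input": (partition_no, m2_partition),   # M2 side: the whole Partition
        })
    return assignments

plan = plan_one_sided_skew_join(["P1.0", "P1.1", "P1.2"], "M2.P1", 1)
for a in plan:
    print(a["worker"], a["scattered_input"], a["broadcast_input"])
```

Because every worker receives the full same-numbered Partition of M2, each sub-Partition of M1 still meets all matching rows, which is what keeps the Join correct after the split.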
In the second situation, one input stream contains multiple target Partitions. Taking the case where the first input stream contains multiple target Partitions as an example, each target Partition in the first input stream is handled in the same way as in the first situation. In some embodiments, for each target Partition in the first input stream, the master node splits the target Partition into multiple sub-Partitions and assigns the sub-Partitions to worker nodes of the Join stage, while the Partitions in the other input streams with the same partition number as the target Partition are broadcast to the worker nodes assigned to the sub-Partitions. The only difference is that when the first input stream contains multiple target Partitions, every target Partition must be processed in this way.
In other words, whether the first input stream contains one target Partition or several, for any target Partition, the embodiments of this application split it into multiple sub-Partitions, assign the sub-Partitions to worker nodes of the Join stage, and at the same time broadcast the Partitions in the other input streams with the same partition number as that target Partition to the worker nodes assigned to its sub-Partitions.
Referring again to FIG. 5D, if several Partitions in a single Join input are skewed, each skewed Partition can be split, and the same-numbered Partition of the other input is broadcast to the Join stage's worker nodes so that each split sub-Partition is correctly joined with the same-numbered data of the other input. For example, if in FIG. 5D the Partition3 output by M1 is also skewed, it too can be split into multiple sub-Partitions assigned to multiple worker nodes of the Join stage, while Partition3 of M2 is broadcast to the worker nodes assigned to the sub-Partitions of M1's Partition3.
In the third situation, at least two input streams each contain a target Partition. Taking the case where both the first and second input streams contain a target Partition as an example, the master node splits the target Partition of the first input stream into multiple sub-Partitions and assigns them to worker nodes of the Join stage; likewise, it splits the target Partition of the second input stream into multiple sub-Partitions and assigns them to worker nodes of the Join stage. For the same-numbered sub-Partitions of the first and second input streams, each sub-Partition must be broadcast to the worker nodes assigned to the same-numbered sub-Partitions of the other stream: the sub-Partitions of the first input stream are broadcast to the worker nodes assigned to the same-numbered sub-Partitions of the second input stream, and the sub-Partitions of the second input stream are broadcast to the worker nodes assigned to the same-numbered sub-Partitions of the first input stream.
FIG. 5E shows yet another example of the Join process. As shown in FIG. 5E, Partition1 of both M1 and M2 is skewed: the Partition1 output by M1 on the left must be split into multiple sub-Partitions, and the Partition1 output by M2 on the right must also be split into multiple sub-Partitions. The sub-Partitions split from M1's Partition1 are then assigned to multiple worker nodes of the Join stage, as are the sub-Partitions split from M2's Partition1, with one sub-Partition assigned to one worker node. In FIG. 5E, each small box within the boxes of M1's and M2's Partition1 represents one split sub-Partition. To guarantee that the same-numbered sub-Partitions of M1 and M2 are joined correctly, the master node broadcasts the sub-Partitions split from M1's Partition1 to the worker nodes assigned to M2's same-numbered sub-Partitions, and likewise broadcasts the sub-Partitions split from M2's Partition1 to the worker nodes assigned to M1's same-numbered sub-Partitions.
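When both sides of the same partition number are split, the mutual broadcast of FIG. 5E guarantees that every pair of sub-partitions (one per side) meets on some worker. A hedged sketch of the resulting task set, which is equivalent to the cross product of the two sub-partition lists (names are illustrative, not from the patent):

```python
# Hypothetical sketch of two-sided skew handling: one Join task per pair of
# same-numbered sub-partitions, so that all row combinations of the original
# skewed Partition are still covered after both sides are split.
from itertools import product

def plan_two_sided_skew_join(m1_subs, m2_subs):
    """m1_subs, m2_subs: ids of the sub-partitions split from M1's and M2's
    same-numbered skewed Partition. Returns one Join task per pairing."""
    return [
        {"worker": w, "m1_input": a, "m2_input": b}
        for w, (a, b) in enumerate(product(m1_subs, m2_subs))
    ]

tasks = plan_two_sided_skew_join(["M1.P1.0", "M1.P1.1"],
                                 ["M2.P1.0", "M2.P1.1", "M2.P1.2"])
print(len(tasks))  # 2 x 3 = 6 join tasks
```

The quadratic growth in tasks is the price of splitting both sides, which is why the split thresholds matter: only genuinely skewed Partitions should be split.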
In the fourth situation, at least two input streams each contain multiple target Partitions. Taking the case where both the first and second input streams contain multiple target Partitions as an example, each same-numbered target Partition of the first and second input streams is handled in the same way as in the third situation; the only difference is that every such target Partition of the two input streams must be processed in that way.
The above examples assume that the worker nodes of the Join stage implement a plain Join, without operators carrying sorted or Partitioned data properties. In real jobs, however, the Join operation, the most complex and flexible in SQL syntax, may run on worker nodes containing all kinds of operators. In that case, when a Partition feeding the Join is skewed, in addition to splitting the Partition as in the examples above, a union operation must be added after the data Join so that the split sub-Partitions are gathered back together after the Join. When special properties of the data must be preserved (for example, when the data is not written straight to disk but feeds downstream operations or stages), the union operation is required to guarantee the correctness of subsequent execution.
FIG. 5F is an example of the union operation, building on FIG. 5D. As shown in FIGS. 5D and 5F, when Partition1 of the single Join input M1 is skewed, beyond using the process of FIG. 5D to join the sub-Partitions of M1's Partition1 with M2's Partition1, the Join results of the sub-Partitions must be united to generate a new Partition1.
FIG. 5G is an example of the union operation, building on FIG. 5E. When Partition1 of both Join inputs M1 and M2 is skewed, beyond using the process of FIG. 5E to join the sub-Partitions of M1's Partition1 with the same-numbered sub-Partitions of M2's Partition1, the Join results of the sub-Partitions must be united to generate a new Partition1.
That is, the master node must also be configured to unite the Join results belonging to the same data partition, where the Join results of the same data partition include: the Join results of a sub-partition of one input stream with the same-numbered partition of the other input streams, or the Join results of a sub-partition of one input stream with the same-numbered sub-partitions of the other input streams.
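The union step of FIGS. 5F and 5G can be sketched as regrouping per-sub-partition Join results by their original partition number. A minimal, assumption-laden sketch (names and data shapes are illustrative):

```python
# Hypothetical sketch of the union operation: Join results produced per
# sub-partition (or sub-partition pair) are regrouped by the original
# partition number, so downstream stages again see one logical Partition.
from collections import defaultdict

def union_join_results(results):
    """results: list of (partition_no, rows) emitted by the split Join tasks.
    Returns {partition_no: all rows united back into one partition}."""
    united = defaultdict(list)
    for partition_no, rows in results:
        united[partition_no].extend(rows)
    return dict(united)

out = union_join_results([(1, ["a", "b"]), (1, ["c"]), (2, ["d"])])
print(out)  # {1: ['a', 'b', 'c'], 2: ['d']}
```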
For skewed Join input scenarios, the embodiments of this application can adaptively split and assign skewed Partitions, whether the skew appears in one or several of the Join's inputs, so that the Join's input data is evenly distributed, while guaranteeing the correctness of the Join under the new shuffle mode. Compared with manually adjusting the job's data processing logic, or preprocessing the data, both of which require substantial end-user intervention, the embodiments of this application make the handling of skewed Join input adaptive and broadly applicable. In particular, when the master node collects statistics on each stage's output data, the adjustment for data skew runs automatically, with no manual tuning by the user. The adaptiveness of the provided solution lets it make dynamic decisions during job execution based on the characteristics of the actual Join input, without end-user awareness or participation, and distributes the data evenly across multiple distributed worker nodes, significantly accelerating jobs with data skew and markedly improving the performance of the distributed system.
Beyond the dynamic adjustment of Partitions, the embodiments of this application can use the statistics of an upstream stage to adaptively select the execution path for subsequent execution. The concrete implementation is described below.
The embodiments of this application can dynamically adjust the logical graph of the execution plan during job execution (that is, the logic of the execution plan is dynamically adjusted while the job runs). Note that the choice of logical graph may depend on data distribution and characteristics that can only be obtained accurately during job execution. Different data characteristics may require execution plans with different logic in order to be handled effectively and accurately. With a static execution plan, once the plan is fixed and cannot be adjusted dynamically while the job executes, the plan's logic clearly cannot be configured reasonably and accurately.
Take the Join operation in distributed SQL as an example. A distributed Join can be realized with several logically equivalent but different Join algorithms, such as Sort Merge Join and Broadcast Join. Using the Join of two source tables Table1 and Table2 as an example, the implementations of Sort Merge Join and Broadcast Join are described below.
FIG. 6A shows an example of Sort Merge Join. As shown in FIG. 6A, the two input source tables Table1 and Table2 are read and preprocessed by M1 and M2; for example, M1 filters the data of Table1 and then partitions the output data by the shuffle/Join key. The output data of M1 and M2 are then merge-joined on the downstream worker nodes by the same key. In this process, Merge Join requires a full shuffle and sort of the output of M1 and M2, which guarantees that data of the same partition is delivered to the same downstream worker node. In a distributed system, Sort Merge Join can handle arbitrary data volumes by relying on implementations such as external sorting, but the large amount of shuffle and sort involved consumes considerable compute and network resources; and when the data distribution is uneven, the shuffled data can produce severe long tails that degrade execution efficiency.
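The merge step that runs on a downstream worker once both inputs are sorted by the Join key can be sketched as follows. This is a generic merge-join illustration under that assumption, not code from the patent.

```python
# Minimal sketch of the merge-join performed on a downstream worker after
# both inputs have been shuffled and sorted by the join key.

def merge_join(left, right):
    """left, right: lists of (key, value) sorted by key. Returns joined rows."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Advance over all rows sharing this key on both sides,
            # then emit their cross product.
            i0 = i
            while i < len(left) and left[i][0] == lk:
                i += 1
            j0 = j
            while j < len(right) and right[j][0] == lk:
                j += 1
            for _, lv in left[i0:i]:
                for _, rv in right[j0:j]:
                    out.append((lk, lv, rv))
    return out

print(merge_join([(1, "a"), (2, "b"), (2, "c")], [(2, "x"), (3, "y")]))
# [(2, 'b', 'x'), (2, 'c', 'x')]
```

The single forward pass over both sorted inputs is what makes the per-worker cost linear, but it is only possible because of the global shuffle and sort described above.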
FIG. 6B shows an example of Broadcast Join. A distributed system must be able to join a large table (for example a fact table) with another large table; for a Join between a large table and a small table (for example a dimension table), if the small table's data source fits into the memory of a single worker node, the compute and network cost of the distributed shuffle and sort can be reduced. As shown in FIG. 6B, Table1 is the large table and Table2 the small one: the small table's data can be broadcast to all worker nodes of the large table, and a full hash table is built from the small table's data, so that the large table's data, once read, can be joined via hash table lookup. The data of the large table (Table1) then needs to be read only once, with no data shuffle or sort at all. Besides avoiding the compute and network cost of shuffle and sort, Broadcast Join also avoids the data skew and long tails the shuffle might cause, because in the Broadcast Join pattern the Join logic actually takes place in the large table's read logic (for example, the Map stage).
For this reason, Broadcast Join is also called Broadcast Map Join. Of course, while Broadcast Join brings many benefits in resources and performance, it also has a specific scope of application: the small table data used to build the hash table must fit entirely into a single worker node. If the optimizer chooses Broadcast Join to execute the job but the small table's data volume turns out to exceed the memory limit during execution, the whole job fails.
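The build-then-probe pattern of Broadcast Join can be sketched in a few lines. A hedged illustration only; names are assumptions, and the memory-fit condition in the docstring is exactly the applicability constraint noted above.

```python
# Minimal sketch of a broadcast hash join: the small table is broadcast to
# every worker of the large table, a hash table is built from it once, and
# each large-table row is joined by hash lookup, with no shuffle or sort.
from collections import defaultdict

def broadcast_hash_join(large_rows, small_rows):
    """large_rows, small_rows: iterables of (key, value). The small side must
    fit into one worker's memory; that condition makes this plan valid."""
    hash_table = defaultdict(list)
    for k, v in small_rows:            # build phase: broadcast small table
        hash_table[k].append(v)
    out = []
    for k, v in large_rows:            # probe phase: one pass over the large table
        for sv in hash_table.get(k, []):
            out.append((k, v, sv))
    return out

print(broadcast_hash_join([(1, "a"), (2, "b")], [(2, "x"), (2, "y")]))
# [(2, 'b', 'x'), (2, 'b', 'y')]
```

If the build phase ever exceeds the worker's memory, the job fails, which is precisely the risk the optimizer must weigh when choosing this plan.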
Based on the foregoing, Broadcast Join has a clear performance advantage, while Sort Merge Join is more general. If the optimizer of a distributed system must choose the appropriate Join algorithm between the two, it therefore needs an accurate and reasonable judgment: on the premise that the job can complete successfully, configure the most efficient execution plan logic possible. In real production scenarios, however, it is very difficult for the optimizer to make this judgment before the job executes, mainly for the following reasons.
Missing or inaccurate statistics. Data stored in a distributed system comes from many sources, and for reasons such as the channel and time through which source table data was imported, the statistics on source table data may be missing or inaccurate (for example, a table has just been imported and its statistics have not yet been generated, or updates to the table have just hit the threshold that invalidates its statistics). In short, missing or inaccurate statistics prevent the optimizer from accurately estimating the size of the Join's upstream input, which may be a source table or the output of a source table after some logical transformation.
Complex and changeable data processing logic and data characteristics. Even when complete and accurate statistics are available for the source table data, the Join operation may in practice sit at any worker node within a data processing flow (inside the DAG). As the source table data passes through complex upstream transformations such as selection, filter and aggregation, possibly interleaved with various user-defined code logic (UDFs), it becomes difficult for the optimizer to estimate the Join's input data volume in advance.
Precisely because of these limitations, if the optimizer statically selects a Join algorithm before job execution, it faces a dilemma. On one hand, when the input data size cannot be estimated accurately, the selection of a Map Join execution plan can only be as conservative as possible, for example by setting the small-table threshold as low as possible, thereby losing many optimization opportunities. Yet even with a very low small-table threshold, a misjudged Broadcast Join can still cause job execution to fail, due to estimation errors, data expansion, and other reasons. Because such extreme cases result in job failures, this feeds back into the optimizer's strategy and pushes it toward even more conservative policies, creating a negative cycle. On the other hand, the triggering of Broadcast Join depends largely on manually added Map Join hints, which leaves the generation of the Broadcast Join plan to the user; outsourcing this optimizer function creates extra difficulty in maintaining user logic. In practice, users can only perceive the data volume of source tables with reasonable accuracy, and cannot accurately know the output size of non-source-table data after transformations, so user-specified Map Join hints likewise cannot avoid job failures caused by changes in the data or in upstream processing logic.
As described above, requiring the master node's optimizer to accurately select a Join algorithm before job execution means its accuracy is constrained by various objective factors. In general, the characteristics of the data (including its size and distribution) can only be obtained during job execution, after the upstream work nodes complete. Therefore, to judge the Join algorithm accurately, the decision should be made during the execution of the distributed job rather than before it. However, making this decision during job execution poses a challenge to the execution engine's dynamic DAG capability: the execution plans produced by different Join algorithms such as Sort Merge Join and Broadcast Join differ not only in physical attributes (degree of concurrency, shuffle mode, etc.) but also substantially in the topological and logical structure of the DAG. To adjust dynamically during job execution, the engine must therefore support dynamic logical graphs (that is, the logic of the execution plan must be dynamically adjustable). It should be noted that adjusting the logical graph is usually accompanied by adjusting the physical attributes of the execution plan, so this in fact imposes requirements on both the dynamic-logical-graph and dynamic-physical-graph capabilities of the DAG.
Based on the situation described above, FIG. 6C shows yet another flowchart of the distributed job adjustment method provided by an embodiment of the present application. The method may be performed by the master node. Referring to FIG. 6C, the method flow may include the following steps.
In step S610, a job submitted by a user is acquired.
In step S611, an execution plan of the job is generated. The execution plan includes multiple stages and a control node. The multiple stages include an upstream stage that has multiple candidate execution paths downstream, where one execution path includes one or more downstream stages of the upstream stage; the control node is configured to select, from the multiple execution paths, the target execution path that the upstream stage actually executes downstream.
In this embodiment of the present application, when the execution plan of the job is generated, multiple candidate execution paths may be carried downstream of an upstream stage, and one execution path may include one or more downstream stages of that upstream stage. For example, an execution path may include a direct downstream stage of the upstream stage, or both direct and indirect downstream stages. In some embodiments, one or more upstream stages in the execution plan may carry multiple candidate execution paths downstream.
In this embodiment of the present application, in addition to the multiple stages, the execution plan may also have a control node set downstream of the upstream stage. The control node may be connected to the multiple execution paths of the upstream stage, and may select one of them as the target execution path that the upstream stage actually executes downstream. It should be noted that the control node is not an execution stage of the execution plan and does not actually schedule computing resources such as work nodes; rather, it indicates that control logic needs to be added downstream of the upstream stage in the execution plan, so that the target execution path actually executed can be selected from the multiple execution paths.
In some embodiments, the execution plan referred to in step S611 may be described by a DAG; for example, one or more upstream stages of the DAG may have multiple candidate execution paths downstream. As an example, FIG. 6D schematically shows an execution plan carrying multiple execution paths. As shown in FIG. 6D, there are two candidate execution paths downstream of M1: Path0 and Path1. A control node C8_1 is arranged downstream of M1; C8_1 is connected to Path0 and Path1, and can select from them the target execution path actually executed downstream of M1.
In step S612, during job execution, statistical information of the output data of the upstream stage is acquired.
In step S613, the control node selects a target execution path from the multiple execution paths based on the statistical information.
During job execution, the master node may collect statistics on the output data of each stage of the execution plan, including those of upstream stages that carry multiple execution paths downstream. For example, as shown in FIG. 6D, after M1 finishes executing during the job, the statistics of M1's output data can be collected by the master node. Based on the collected statistics of the upstream stage's output data, the master node may, through the control node, select the target execution path actually executed from the upstream stage's multiple downstream execution paths. For example, as shown in FIG. 6D, the control node C8_1 may select, based on the statistics of M1's output data, the target execution path actually executed downstream of M1 from Path0 and Path1.
In this embodiment of the present application, when the execution plan is generated, an upstream stage of the execution plan may have multiple candidate execution paths downstream, and during job execution the target execution path finally executed is selected from the candidates based on the statistics of the upstream stage's output data. In this way, the final execution path can be selected during job execution according to the actual output data of the upstream stage, making the selection of the execution path more reasonable and accurate. Executing jobs along a reasonable and accurate execution path can significantly improve the performance of the distributed system.
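The interplay of steps S612 and S613 can be sketched as follows. This is a minimal illustrative sketch, not the patent's actual implementation: all class names, method names, and the statistics format are assumptions; the master node records per-stage output statistics as they arrive and passes them to the control node, which holds only decision logic and schedules no workers of its own.

```python
# Hypothetical sketch of steps S612-S613 (all names are illustrative).
class ControlNode:
    """Pure control logic: selects among candidate paths, schedules no worker."""
    def __init__(self, paths, decide):
        self.paths = paths     # e.g. ["Path0", "Path1"]
        self.decide = decide   # stats -> index into self.paths

    def select(self, stats):
        return self.paths[self.decide(stats)]

class MasterNode:
    def __init__(self):
        self.output_stats = {}  # stage name -> output statistics

    def on_stage_finished(self, stage, stats):
        # Step S612: collect statistics of the upstream stage's output data.
        self.output_stats[stage] = stats

    def resolve_path(self, control_node, upstream_stage):
        # Step S613: the control node picks the target path from the stats.
        return control_node.select(self.output_stats[upstream_stage])

master = MasterNode()
master.on_stage_finished("M1", {"output_bytes": 80 * 1024 * 1024})
c8_1 = ControlNode(
    ["Path0", "Path1"],
    decide=lambda s: 0 if s["output_bytes"] < 512 * 1024 * 1024 else 1,
)
print(master.resolve_path(c8_1, "M1"))  # Path0
```

The decision function here is a simple size threshold for illustration; the patent leaves the control node's decision logic open to whatever statistics the master node collects.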
As an optional implementation, the candidate execution paths carried downstream of an upstream stage of the execution plan may include two execution paths, namely a first execution path and a second execution path. FIG. 7A schematically shows a flowchart of generating an execution plan carrying multiple execution paths. The method may be performed by the master node. Referring to FIG. 7A, the method flow may include the following steps.
In step S710, according to the estimated data size of the source data of the job, a physical plan with multiple pieces of candidate information is generated. One piece of candidate information represents a task that the source data may use during job execution, and the multiple pieces of candidate information include first candidate information and second candidate information.
When the job is submitted, the optimizer may estimate the data size of the job's source data (for example, a source table). If the estimated size is smaller than a preset first threshold value, the optimizer may trigger a conditional execution plan: based on the tasks that the source data may use during job execution, the optimizer can generate multiple candidates for the source data, as well as a physical plan containing those candidates. In some embodiments, a task may include one or more operators; for example, the execution of a specific task is realized through the processing of its operators.
In one example, assuming the source data is a source table, the source table may use either Broadcast Join or Sort Merge Join when a Join operation is performed during job execution. The optimizer can therefore generate two candidates for the source table based on these possible Join algorithms, one candidate representing Broadcast Join and the other representing Sort Merge Join. In some embodiments, the physical plan can be regarded as a node-tree data structure.
In step S711, the physical plan is converted into an operator tree. The operator tree includes one or more original tasks of the physical plan, and one original task uses one or more operators.
In step S712, when the preset operator is traversed for the first time, a task of the first execution path is added to the operator tree according to the first candidate information corresponding to the first execution path, and the operators of the newly added task are connected to the related operators of the original tasks via data pipes, so as to derive the first execution path within the operator tree.
To generate an execution plan carrying multiple execution paths, after converting the physical plan into an operator tree, the optimizer can derive multiple execution paths in the operator tree according to the multiple pieces of candidate information. In some embodiments, the optimizer may traverse the operator tree multiple times, with a preset operator designated in advance as the point at which execution-path conversion occurs; when the optimizer traverses to the preset operator for the first time, the first execution path corresponding to the first candidate information is converted. In some embodiments, the multiple pieces of candidate information include at least first candidate information and second candidate information, where the first candidate information corresponds to the first execution path and may record the tasks of the first execution path, and the second candidate information corresponds to the second execution path and may record the tasks of the second execution path.
In some embodiments, when the optimizer traverses the preset operator for the first time, it may add the tasks of the first execution path to the operator tree according to the first candidate information; a newly added task may include one or more operators. To allow the tasks of the first execution path to communicate with the original tasks of the operator tree, the optimizer may connect the operators of the newly added tasks to the related operators of the original tasks via data pipes, where a related operator in an original task is an operator related to the operators of the newly added task (for example, operators in the original task that lie upstream or downstream of the newly added task's operators). Through this processing, the optimizer derives the first execution path within the operator tree.
In step S713, when the preset operator is traversed for the second time, a task of the second execution path is added to the operator tree according to the second candidate information corresponding to the second execution path, and the operators of the newly added task are connected to the related operators of the existing tasks via data pipes, so as to derive the second execution path within the operator tree.
In some embodiments, when the optimizer traverses the preset operator for the second time, it may derive the second execution path in the operator tree according to the second candidate information. The optimizer may add the tasks of the second execution path to the operator tree; a newly added task may include one or more operators. To allow the tasks of the second execution path to communicate with the existing tasks in the operator tree, the optimizer may connect the operators of the newly added tasks to the related operators of the existing tasks via data pipes, where a related operator in an existing task is an operator related to the operators of the newly added task (for example, operators in the existing tasks that lie upstream or downstream of the newly added task's operators). Through this processing, the optimizer derives the second execution path within the operator tree.
As optional implementations, step S712 may be regarded as one way of converting the first execution path in the operator tree based on the first candidate information, and step S713 may be regarded as one way of converting the second execution path in the operator tree based on the second candidate information.
In step S714, a control node is set upstream of the first execution path and the second execution path of the operator tree, to obtain the execution plan.
After the conversion of the first and second execution paths is completed in the operator tree, the optimizer may further set a control node upstream of the two paths, so that during job execution, once the upstream stage of the two paths finishes, the target execution path can be selected from the first and second execution paths; the control node thus enables the dynamic selection between the two paths during job execution. Through the above process, the embodiment of the present application can generate an execution plan that carries multiple execution paths.
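Steps S710 through S714 can be condensed into the following sketch. It is purely illustrative, with all class, task, and pipe names assumed for the example (the task names echo FIG. 7B–7E): each visit to the preset operator splices one candidate's recorded tasks into the operator tree and pipe-connects them, and a control node is finally placed upstream of both paths.

```python
# Hypothetical sketch of steps S710-S714 (names are illustrative).
class OperatorTree:
    def __init__(self, original_tasks):
        self.tasks = list(original_tasks)  # original tasks, e.g. ["M1", "M2"]
        self.data_pipes = []               # (src_task, dst_task) connections
        self.control_node = None

    def add_path(self, candidate):
        # Add the candidate's recorded tasks, then connect their operators
        # to the related tasks already in the tree via data pipes.
        self.tasks.extend(candidate["tasks"])
        self.data_pipes.extend(candidate["pipes"])

def generate_execution_plan(tree, first_candidate, second_candidate):
    # Each traversal that reaches the preset operator converts one path
    # (steps S712 and S713).
    for candidate in (first_candidate, second_candidate):
        tree.add_path(candidate)
    tree.control_node = "C8"  # step S714: control node upstream of both paths
    return tree

plan = generate_execution_plan(
    OperatorTree(["M1", "M2"]),
    {"tasks": ["R3"], "pipes": [("M2", "R3"), ("R3", "M1")]},        # Path0
    {"tasks": ["M5", "J6"], "pipes": [("M5", "J6"), ("M2", "J6")]},  # Path1
)
print(plan.tasks)         # ['M1', 'M2', 'R3', 'M5', 'J6']
print(plan.control_node)  # C8
```

The real conversion, as the following paragraphs describe, additionally copies shared operators and tracks which operators each task owns; this sketch keeps only the task-level splicing.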
To facilitate the description of the above process of generating an execution plan carrying multiple execution paths, the embodiment of the present application introduces the concept of a conditional execution plan: the optimizer is allowed, at job submission, to produce an execution plan with multiple execution paths based on multiple candidates. Which candidate's execution path is finally used is dynamically selected by the master node during job execution, according to the statistics of the output data produced by the upstream stage.
As an optional implementation, the optimizer can produce an execution plan carrying multiple execution paths through two phases: Cost-based Optimization and Execution plan generation.
In the Cost-based Optimization phase, taking the Join scenario as an example, the optimizer can estimate, in the Join's build rule, the in-memory data size of the Join's small table from information such as source-table statistics; for example, the in-memory size of the small table can be estimated as RowCount * AverageRowSize. The optimizer then judges whether the in-memory data size of the small table is smaller than a preset first threshold value (which may be preset and defined as threshold1). If it is, the optimizer triggers a conditional Map Join for the database, for example producing multiple candidates for performing Map Join on the small-table data, where one candidate may represent one Join operator used by the small table (for example Broadcast Join or Sort Merge Join). Because subsequent dynamic decision-making is realized on the basis of the multiple candidates provided by the conditional Map Join, the optimizer can configure threshold1 of the Cost-based Optimization phase to be relatively large (with a default value of 512M); that is, as long as a job is judged, under a fairly loose probability, to possibly benefit from Broadcast Join, a conditional Map Join plan is produced, and the final selection of the execution path is left to the execution engine to decide dynamically during job execution.
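The estimation and triggering condition just described reduce to a small calculation. A minimal sketch, with function names assumed for illustration; the RowCount * AverageRowSize estimate and the 512M default for threshold1 come directly from the text above:

```python
# Sketch of the Cost-based Optimization check (function names are assumed).
THRESHOLD1 = 512 * 1024 * 1024  # default 512M, per the description above

def estimate_small_table_bytes(row_count: int, average_row_size: int) -> int:
    # In-memory size of the small table: RowCount * AverageRowSize.
    return row_count * average_row_size

def should_trigger_conditional_map_join(row_count: int, average_row_size: int,
                                        threshold1: int = THRESHOLD1) -> bool:
    # A deliberately loose test: if the estimate is under threshold1,
    # produce the conditional Map Join plan and defer the real decision
    # to the execution engine at runtime.
    return estimate_small_table_bytes(row_count, average_row_size) < threshold1

# A 1M-row table with 100-byte rows (~100 MB) triggers the conditional plan;
# a 100M-row table with 100-byte rows (~10 GB) does not.
print(should_trigger_conditional_map_join(1_000_000, 100))    # True
print(should_trigger_conditional_map_join(100_000_000, 100))  # False
```

Setting threshold1 generously is safe precisely because a triggered conditional plan only keeps both paths open; the binding choice happens later against threshold2.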
The conditional Map Join plan above can be regarded as a physical plan rather than the final execution plan. The physical plan may take the structure of a RelNode tree, where RelNode stands for relational expression. In the Cost-based Optimization phase, the physical plan contains a control node such as the conditional Map Join, and based on the multiple candidates this control node may internally contain multiple Paths (execution paths) expressing the paths the Join algorithm may choose, for example the execution paths corresponding to Broadcast Join and Sort Merge Join. Whichever Path is finally chosen, the computation and data of the small table are shared among the multiple Paths. The cost defined for the conditional Map Join in the optimizer's cost model can lie between the costs of Broadcast Join and Sort Merge Join, with the ratio between the two determined according to the probability of each Path being finally selected.
In the Execution plan generation phase, the optimizer converts the physical plan produced in the previous step into the final execution plan; for example, it converts the physical plan into a physical operator tree that can be understood by the runtime and constructs the DAG topology. The final execution plan therefore includes a DAG composed of tasks and edges, together with the operator tree (i.e., the physical operator tree) inside each work node. Overall, this is a post-order traversal of the physical plan, during which tasks are dynamically split and edges added. Unlike an ordinary query, a conditional Map Join requires runtime dynamic decision-making and Path selection, so control tasks and dependent edges must be added during execution plan generation, so that dependent edges accurately describe both the scheduling dependency (with no data flow) between upstream and downstream work nodes and a pure control node. For example, an ordinary work node containing normally executing operators is connected to the control node through a dependent edge. With this setup, during DAG execution the control node can select the subsequent execution path by judging the actual size of the small table after it runs.
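The distinction between the two edge kinds can be made concrete with a small sketch. All names here are illustrative assumptions: a data edge carries runtime data between tasks, while a dependent edge expresses only a scheduling dependency (no data flow), such as the edge from a work node to the pure control node.

```python
# Illustrative sketch of data edges vs. dependent edges in the generated DAG.
class DAG:
    def __init__(self):
        self.data_edges = []       # (src, dst): data flows from src to dst
        self.dependent_edges = []  # (src, dst): dst scheduled after src, no data

    def add_data_edge(self, src, dst):
        self.data_edges.append((src, dst))

    def add_dependent_edge(self, src, dst):
        self.dependent_edges.append((src, dst))

    def upstream_of(self, node):
        # Both edge kinds constrain scheduling order; only data edges
        # would additionally move data at runtime.
        return [s for s, d in self.data_edges + self.dependent_edges if d == node]

dag = DAG()
dag.add_data_edge("M2", "R3")       # ordinary producer/consumer edge
dag.add_dependent_edge("M1", "C8")  # control node waits on M1, receives no data
print(dag.upstream_of("C8"))  # ['M1']
```

The control node thus becomes schedulable only after its dependent-edge predecessors finish, which is exactly when the small table's actual output size is known.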
FIG. 7B, FIG. 7C, FIG. 7D and FIG. 7E schematically show the process of converting a physical plan into an execution plan, where the master node may execute FIG. 7C after completing FIG. 7B, FIG. 7D after completing FIG. 7C, and FIG. 7E after completing FIG. 7D. In the figures, thick arrows denote data pipelines, thin dashed arrows denote control pipelines, and thin solid arrows denote runtime operator data flow. In one implementation example, when the master node, while traversing the physical plan, reaches the conditional Map Join and prepares to convert the multiple Paths it contains, the master node may convert the input RelNode tree of the conditional Map Join into an operator tree. The operators in the operator tree can form the two tasks M1 and M2 shown in FIG. 7B; for example, M1 uses the two operators TableScan1 and Filter1, and M2 uses the three operators TableScan2, Filter2 and StreamlineWrite1.
Assume the conditional Map Join contains two candidates, one corresponding to Path0 and the other to Path1, where Path0 may correspond to the Broadcast Hash Join implementation and Path1 to the Sort Merge Join implementation. As shown in FIG. 7C, while traversing the operator tree, if the master node reaches Filter1 of M1 for the first time, it may convert Path0. Specifically, based on the tasks recorded by Path0's candidate, the master node may add the task of Path0 to the operator tree and connect the operators of the newly added task to M1 and M2 via data pipes. As shown in FIG. 7C, R3 is a newly added task with the two new operators StreamlineRead1 and StreamlineWrite2, while StreamlineRead2 and ConditionalMapJoin1 are newly added in M1, where the ConditionalMapJoin1 newly added in M1 is the operator id of the Broadcast Hash Join and its operator type is HashJoin.
The above process is executed when Filter1 in M1 is traversed for the first time, so that Path0 is converted on the basis of the operator tree and its execution path is expressed in the execution plan. During this process, the master node may record the original operators of M1 in the operator tree for later copying. From the four newly added operators it can be inferred that R3 and M1 are currently being processed; at this point they are recorded as the candidate Path0.
Then the conversion of Path1 begins. Path1 and Path0 share StreamlineWrite and Filter, for example StreamlineWrite1 and Filter1. Based on the operator Filter1 shared by Path1 and Path0, when the master node traverses Filter1 for the second time, Path1 may be converted. Specifically, based on the tasks recorded by Path1's candidate, the master node may add the tasks of Path1 to the operator tree in which Path0 has already been converted, and connect the operators of the newly added tasks to the related operators of the current tasks via data pipes. As shown in FIG. 7D, M5 and J6 are newly added tasks. When adding the M5 task, the previously recorded operators of M1 (for example TableScan1 and Filter1 in FIG. 7D) can be copied into M5, and the operator StreamlineWrite3 is newly added in M5. When adding the J6 task, the three operators StreamlineRead3, StreamlineRead4 and ConditionalMapJoin1 can be newly added in J6, and operator-level data pipes are realized between J6 and both M2 and M5.
As shown in FIG. 7E, after the conversion of Path0 and Path1 is completed, a control node C8 needs to be created in the operator tree. C8 has only one operator, the Conditional operator, whose role is to select, during the actual execution of the job and based on the table's actual size during execution, the execution path used for the Join from Path0 and Path1.
In some further embodiments, after generating an execution plan with multiple execution paths, the master node can obtain the data volume output by the upstream stage during job execution and determine whether that data volume is below a preset second threshold; if so, the first execution path is selected as the target execution path, otherwise the second execution path is selected. Assuming the data processed by the upstream stage is the small table, then during job execution, once the tasks corresponding to the small table have finished, the control node can determine, from the small table's actual execution size, whether the small table's output is below the preset second threshold (denoted threshold2). If the small table's output is below the second threshold, the Broadcast Join execution path is selected (that is, the first execution path is the path that executes a Broadcast Join); if the output exceeds the second threshold, the Sort Merge Join execution path is selected (that is, the second execution path is the path that executes a Sort Merge Join). It should be noted that threshold2 reflects the memory occupied by the small table's data when loaded into a hash table at run time; its default value can be the same as threshold1 (for example, 512 MB), although embodiments of the present application may also set threshold2 to a value different from threshold1. It should further be noted that the conditional Join plan is triggered only when the estimated size of the small table satisfies threshold1, while during job execution, once the master node has collected the small table's real size, threshold2 is used to decide the final execution path for the Join. Thus threshold1 drives the decision at the optimizer stage and threshold2 drives the decision at the DAG execution stage; by tuning these two values, and through the cooperation of the optimizer and the DAG, the entire conditional plan is generated and the final execution plan selected.
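The two-phase decision described above can be sketched as follows. This is a minimal illustration, not the patent's actual implementation: threshold1 gates the optimizer's decision to emit a conditional plan from the *estimated* small-table size, and threshold2 picks the final path at DAG run time from the *actual* output size. All function and constant names are illustrative assumptions.

```python
MB = 1024 * 1024
THRESHOLD1 = 512 * MB  # optimizer-stage threshold on the estimated size
THRESHOLD2 = 512 * MB  # runtime threshold (defaults to threshold1, may differ)

def plan_join(estimated_small_table_bytes):
    """Optimizer stage: emit a conditional plan only if the estimate fits."""
    if estimated_small_table_bytes <= THRESHOLD1:
        # both candidate paths are kept in the submitted DAG
        return ["broadcast_join", "sort_merge_join"]
    return ["sort_merge_join"]  # conditional plan not triggered

def choose_path(actual_output_bytes):
    """DAG run time: the control node picks the final path from actual output."""
    if actual_output_bytes < THRESHOLD2:
        return "broadcast_join"   # first execution path
    return "sort_merge_join"      # second execution path
```

In this sketch, `choose_path` corresponds to the decision made by the control node once the small table's tasks finish and the master node has collected the real output size.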
Regarding dynamic selection of the execution path, as shown in Figure 6D, when the job is initially submitted the final execution path has not yet been confirmed, so the submitted DAG contains both possible execution paths (for example, the two paths Path0 and Path1 shown as dashed lines in Figure 6D). A control node (C8_1) is also newly added to the DAG. This control node has only logical control meaning and does not launch any worker nodes; it selects whether to execute Path0 or Path1 according to M1's output. For example, during job execution, after M1 reads the candidate of the small table's output data and processes it, the amount of data it actually outputs is collected by the master node, and the control node makes the execution-path decision based on the small table's actual output volume. If Path0 is selected, the complete execution plan may be as shown in Figure 7F; if Path1 is selected, it may be as shown in Figure 7G.
Embodiments of the present application can adjust the execution plan before the job is executed, so that an upstream stage of the execution plan can have multiple candidate execution paths downstream, and during job execution the finally executed target path is selected from the candidates based on statistics of the upstream stage's output data. In this way, the final execution path can be chosen during job execution according to the actual output of the upstream stage, making path selection more reasonable and accurate; executing the job on a reasonable, accurate execution path can significantly improve the performance of the distributed system.
With the wide adoption of deep learning, distributed systems must satisfy ever more processing requirements for deep-learning jobs, and a variety of execution engines suited to deep-learning jobs have emerged. However, distributed systems still have various shortcomings in scheduling and executing deep-learning jobs. For example, in the native logic of a deep-learning system (such as TensorFlow), scheduling and execution depend entirely on an external system and are not built into the distributed system's execution engine. Taking the Parameter Server (PS) architecture as an example, the worker nodes of a distributed system fall into two categories: PS nodes and Worker nodes. PS nodes store the deep-learning parameters (for example, the parameters of a deep-learning model), while Worker nodes compute the gradients of those parameters. In each training iteration, a Worker node obtains the parameters from a PS node and returns the computed gradients to it; the PS node aggregates the gradients returned by the Worker nodes, updates the parameters, and broadcasts the updated parameters back to the Worker nodes. This process repeats, achieving iterative adjustment of the deep-learning parameters. In the PS architecture of a distributed system, the PS nodes and Worker nodes exhibit the following characteristics at run time:
The PS nodes and Worker nodes perform markedly different duties and correspond to different stages of the execution plan: the stage corresponding to the PS nodes may be called the PS stage (parameter-server execution stage), and the stage corresponding to the Worker nodes the Worker stage (worker execution stage);
As the serving entity for the parameters, a PS node can run independently;
When using and updating the parameters, a Worker node can run effectively only after the PS node is running, and must continuously exchange data with the PS node while running.
These characteristics are difficult to express in many distributed execution frameworks: although there is a scheduling dependency between PS nodes and Worker nodes, the two can run simultaneously, so the dependency cannot be mapped onto the usual logic of scheduling a downstream node only after the upstream node has finished running. Consequently, in many external systems, PS nodes and Worker nodes can only be scheduled and run separately as two isolated, unconnected stages in the execution plan, which may well result in Worker nodes idling before the PS nodes have even been scheduled, wasting the Worker nodes' resources. Moreover, because the relationship between PS nodes and Worker nodes across stages is left undescribed, many basic and dynamic capabilities cannot be implemented.
In the deep-learning field, the resources used by deep-learning jobs (especially GPU resources) are generally specified directly by end users. For example, the end user specifies the worker-node concurrency of a deep-learning job and the resources used by each worker node (such as the number of GPUs per node). However, it is difficult for users to choose appropriate amounts: to guarantee that a deep-learning job has sufficient resources, users often over-request, wasting resources. For example, to guarantee GPU availability, a user who cannot accurately predict actual GPU usage may request multiple GPU cards per worker node, yet during actual execution the job may use only 25% of them, leaving the remaining GPU resources idle and wasted. A cumulative effect of this situation is that large amounts of GPU resources in the distributed system are reserved by user requests; the requested GPU resources may even exceed the system's total, so the system's actual GPU utilization is low while other jobs queue for GPU resources. On the other hand, many deep-learning jobs are particularly sensitive to resources (especially GPU resources); blindly lowering the cap on resources users may request can degrade the execution performance of deep-learning jobs.
Given the above, how to ensure the accuracy of resource requests for deep-learning jobs and improve the resource utilization of the distributed system, so as to save the system's computing resources while leaving the execution performance of deep-learning jobs unaffected, has become an urgent problem.
To solve these problems, and building on the fact that the vertices and connecting edges of a DAG can carry different logical and physical attributes, embodiments of the present application introduce sequential-edge and concurrent-edge (parallel-edge) physical attributes for connecting edges, and decouple sequential and parallel edges from data transmission. A parallel edge expresses that the worker nodes of the upstream and downstream stages it connects may run at the same time, although scheduling still has an order and the scheduling moment can be customized: for example, the worker nodes of the two stages may be scheduled synchronously; the downstream stage's worker nodes may be scheduled after the upstream stage's worker nodes start running; or the downstream nodes may be scheduled by an event triggered once the upstream nodes reach a certain phase. A sequential edge expresses that the worker nodes of the downstream stage it connects can only be scheduled to run after all or part of the upstream stage's worker nodes have finished executing.
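The distinction between the two edge kinds can be sketched as a simple scheduling predicate. This is an illustrative model only, under the assumption that a sequential edge waits for upstream completion while a parallel edge is driven by a customizable trigger; the names and signature are not from the patent.

```python
from enum import Enum

class EdgeKind(Enum):
    SEQUENTIAL = "sequential"  # downstream waits for upstream to finish
    CONCURRENT = "concurrent"  # both stages may run at the same time

def downstream_can_schedule(edge_kind, upstream_finished_ratio, trigger_fired):
    """Decide whether the downstream stage's workers may be scheduled."""
    if edge_kind is EdgeKind.SEQUENTIAL:
        # wait until the upstream stage's workers have finished
        # (a partial-completion variant would compare against a fraction < 1.0)
        return upstream_finished_ratio >= 1.0
    # parallel edge: decoupled from upstream completion; scheduling is driven
    # by a customizable trigger (sync start, upstream started, event, ...)
    return trigger_fired
```

The `trigger_fired` flag stands in for whichever customizable scheduling moment the parallel edge is configured with.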
Based on parallel edges, the relationship between the PS nodes and Worker nodes of a deep-learning job can be described more accurately. Figure 8A shows an example of a PS stage (parameter-server execution stage) and a Worker stage (worker execution stage) connected by a parallel edge. As shown in Figure 8A, the PS stage is the stage corresponding to the PS nodes in the execution plan, and the Worker stage is the stage corresponding to the Worker nodes; in the execution plan the PS stage is connected to the Worker stage through a parallel edge directed from the PS stage into the Worker stage. The PS nodes and Worker nodes can therefore run at the same time, and the scheduling moment can be customized. In some embodiments, Table 1 below illustrates, for reference, the types of scheduling moments (called scheduling types) of downstream nodes connected by a parallel edge, where the upstream nodes may be understood as the worker nodes of the upstream stage connected by the parallel edge (for example, the PS nodes of the PS stage shown in Figure 8A), and the downstream nodes as the worker nodes of the downstream stage (for example, the Worker nodes of the Worker stage shown in Figure 8A).
Table 1
Based on the sequential-edge and parallel-edge physical attributes of connecting edges, together with the scheduling types of downstream nodes, embodiments of the present application can fully describe a wide variety of complex DAG logic and thus support diverse workloads, such as batch jobs, streaming jobs, near-real-time/quasi-real-time jobs, and deep-learning jobs.
For deep-learning jobs, taking a PS job as an example: because the Worker nodes can use and update the parameters, and a Worker node can only run after the PS nodes are running, in the execution plan of the deep-learning job the connecting edge between the PS stage and the Worker stage is a parallel edge directed from the PS stage into the Worker stage. That is, in the execution plan the Worker stage is the direct downstream stage of the PS stage, connected by a parallel edge. Since running a Worker is only meaningful once the PS has started running, the scheduling type of the downstream Worker nodes is SOURCE_TASK_STARTED; that is, once an instance of an upstream PS node has started, the downstream Worker nodes begin to be scheduled. Regarding data generation, because the data on a PS node is valid only while the node is running, and the upstream data source is unaware of downstream state, the data-source type is EPHEMERAL_STATELESS; regarding data transmission, because data transfer between PS and Worker nodes need not be perceived by the execution framework, the transmission type is NONE. The connecting edge from the PS stage to the Worker stage can therefore be described as {CONCURRENT, EPHEMERAL_STATELESS, NONE, SOURCE_TASK_STARTED}.
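The four-field edge description used here can be modeled as a simple record. The field names follow the text's {edge kind, data-source type, transmission type, scheduling type} tuple; the class itself and the constant spellings are illustrative assumptions, not the patent's API.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class EdgeDescriptor:
    edge_kind: str          # "SEQUENTIAL" or "CONCURRENT"
    data_source_type: str   # e.g. "EPHEMERAL_STATELESS"
    transfer_type: str      # e.g. "NONE": transfer invisible to the framework
    scheduling_type: str    # e.g. "SOURCE_TASK_STARTED", "SOURCE_TASK_PROGRESS"

# PS stage -> Worker stage: downstream schedules once a PS instance starts
ps_to_worker = EdgeDescriptor(
    "CONCURRENT", "EPHEMERAL_STATELESS", "NONE", "SOURCE_TASK_STARTED")
```

A Resource Optimization stage to Worker stage edge would differ only in its scheduling type, using a progress-based trigger instead of a start-based one.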
Based on the above description, during execution of a deep-learning job in embodiments of the present application, the PS nodes and Worker nodes can run simultaneously, while the Worker nodes can only schedule resources and process data after the PS nodes are running. This way of describing the execution plan of a deep-learning job makes dynamic adjustment during job execution possible. Specifically, deep-learning execution engines require end users to supply a large number of configuration parameters for the execution plan, such as stage concurrency, the size and type of required resources, and the distribution strategy, yet these parameters are extremely difficult for end users to provide. For this reason, embodiments of the present application introduce a Resource Optimization node into the execution engine of the distributed system; acting as a control node for job execution, the Resource Optimization node can coordinate and dynamically adjust resource-related requests.
In one example, again taking a PS job, an embodiment of the present application adds a new Resource Optimization node alongside the PS and Worker nodes. The Resource Optimization node is responsible for deciding, according to certain rules, how to dynamically adjust the Worker nodes' resources. On this basis, in addition to the parallel edge from the PS stage into the Worker stage, the execution plan also adds a Resource Optimization stage (resource-optimization execution stage) corresponding to the Resource Optimization node, together with a parallel edge from the Resource Optimization stage into the Worker stage. The connecting edge from the Resource Optimization stage to the Worker stage can be described as {CONCURRENT, EPHEMERAL_STATELESS, NONE, SOURCE_TASK_PROGRESS}. Unlike the edge connecting the PS stage and the Worker stage, the edge connecting the Resource Optimization stage and the Worker stage uses the SOURCE_TASK_PROGRESS scheduling type for the downstream nodes. That is, the Worker nodes are scheduled only after the execution progress of the upstream Resource Optimization node reaches a certain threshold.
During job execution, the PS node and the Resource Optimization node start first. Once an instance of the PS node has started, the downstream Worker nodes are notified to be scheduled, and once the Resource Optimization node's instance progress reaches the threshold, the downstream Worker nodes are likewise notified. After receiving both notifications, a Worker node can, based on the Resource Optimization node's notification, update the resources to be scheduled and then start the corresponding instance (the resources change dynamically under the control of the Resource Optimization node). In some embodiments, the Resource Optimization node can dynamically adjust the following resources of the Worker nodes: node concurrency, and the nodes' usage requirements for GPU, CPU, memory, and other resources.
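The startup protocol just described can be sketched as follows, under the assumption that a Worker stage launches only after both notifications (PS instance started, Resource Optimization progress threshold reached) have arrived; the class and method names are illustrative, not the patent's.

```python
class WorkerStage:
    """Worker stage that waits for both upstream notifications."""

    def __init__(self):
        self.ps_started = False
        self.resource_plan = None  # set by the Resource Optimization node

    def on_ps_instance_started(self):
        self.ps_started = True

    def on_resource_plan(self, plan):
        # sent once the Resource Optimization node's progress hits its
        # threshold, e.g. {"cpu_cores": 100, "gpu_cores": 50}
        self.resource_plan = plan

    def try_schedule(self):
        # instances launch only after both notifications, with the
        # dynamically adjusted resources from the Resource Optimization node
        if self.ps_started and self.resource_plan is not None:
            return ("launch", dict(self.resource_plan))
        return ("wait", None)
```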
As an optional implementation, Figure 8B shows yet another flowchart of the distributed job adjustment method provided by an embodiment of the present application. The method may be performed by the master node. Referring to Figure 8B, the method may include the following steps.
In step S810, a deep-learning job submitted by a user is obtained.
In step S811, an execution plan for the deep-learning job is generated. The execution plan includes multiple stages, including a Worker stage and a Resource Optimization stage.
In some embodiments, the Resource Optimization stage feeds into the Worker stage through a parallel edge.
In some further embodiments, the multiple stages may further include a PS stage, where the PS stage and the Resource Optimization stage each feed into the Worker stage through a parallel edge.
In embodiments of the present application, a deep-learning job (for example, a PS job) can be described as a structure of PS, Worker, and Resource Optimization. That is, when the execution plan for the deep-learning job is generated (the execution plan may be described by a DAG), it can include a PS stage, a Worker stage, and a Resource Optimization stage; in the DAG, the PS stage feeds into the Worker stage through a parallel edge, and the Resource Optimization stage likewise feeds into the Worker stage through a parallel edge. In some further embodiments, for the parallel-edge descriptions between the PS stage and the Worker stage, and between the Resource Optimization stage and the Worker stage, refer to the corresponding descriptions above, which are not repeated here.
In step S812, during execution of the deep-learning job, the Resource Optimization node corresponding to the Resource Optimization stage is scheduled, and the resource information currently suited to the Worker stage is determined through the Resource Optimization node.
In embodiments of the present application, the Resource Optimization node can be scheduled before the Worker nodes; for example, the Worker nodes are notified to be scheduled only when the Resource Optimization node's instance progress reaches a threshold. In some embodiments, after the Resource Optimization node has been scheduled, it can determine the resource information currently suited to the Worker stage. In some embodiments, the Resource Optimization node can determine historically used resource information that matches the current execution state of the deep-learning job, and use that historically used resource information as the resource information currently suited to the Worker stage.
As an optional implementation, the historically used resource information matching the current execution state of the deep-learning job may include resource information historically used by a historical execution state that is the same as, or similar to, the current execution state.
In some embodiments, the Resource Optimization node can determine the resource information suited to the Worker stage from a historical database based on the current execution state of the deep-learning job. The historical database can record the resource information actually used by historical deep-learning jobs in each historical execution state; that is, it can record the resources actually used, in each execution state, by deep-learning jobs whose execution has finished. The Resource Optimization node can thus determine, from the records in the historical database, resource information suited to the deep-learning job's current execution state and use it to configure resources for the Worker nodes.
In some embodiments, based on the current execution state of the deep-learning job, the Resource Optimization node can search the historical database for a historical execution state that is similar or identical to the current execution state, and take the resource information actually used in the found historical execution state as the resource information currently suited to the Worker stage. As an optional implementation, the Resource Optimization node can first search the historical database for a historical execution state identical to the current one; if found, the resource information recorded for that identical state is used as the resource information currently suited to the Worker stage. If none is found, it searches for a historical execution state similar to the current one (for example, the historical state with the smallest difference from the current state) and uses the resource information recorded for that similar state as the resource information currently suited to the Worker stage. In some embodiments, the current execution state of the deep-learning job includes, for example, the job's current learning mode, the characteristics of the current input data (such as the volume of the current training data), and the number of remaining parameter iterations.
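The lookup rule described above (exact match first, then nearest historical state) can be sketched as follows. The state encoding as a dict of numeric features and the distance metric are assumptions made for illustration; the patent does not fix a concrete similarity measure.

```python
def match_resources(current_state, history):
    """history: list of (state, resources) pairs from finished jobs.

    States are dicts of numeric features sharing the same keys, e.g.
    {"train_data_mb": ..., "remaining_iters": ...}.
    """
    # 1) prefer a historical execution state identical to the current one
    for state, resources in history:
        if state == current_state:
            return resources

    # 2) otherwise fall back to the historical state that differs least
    #    from the current state (L1 distance over shared features, assumed)
    def distance(record):
        state, _ = record
        return sum(abs(state[k] - current_state[k]) for k in current_state)

    _, resources = min(history, key=distance)
    return resources
```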
In step S813, the resource information is configured for the Worker stage through the Resource Optimization node.
After the Resource Optimization node has determined the resource information currently suited to the Worker stage (for example, historically used resource information matching the current execution state), the master node can, through the Resource Optimization node, configure that resource information for the Worker stage in the execution plan, so that the resources configured for the Worker stage suit the deep-learning job's current execution state. Then, when the Worker nodes are scheduled, they can schedule the resources they use based on that resource information, so that the Worker nodes execute tasks with reasonable resources, improving the Worker nodes' resource utilization.
In some embodiments, a Worker node is scheduled only after an instance of the PS node has started and the Resource Optimization node's instance progress has reached the threshold.
In some embodiments, if the user has specified the Worker stage's resource information in advance, the Resource Optimization node can adjust the user-specified resource information based on the resource information it has determined to be currently suited to the Worker stage, avoiding idle resources caused by the user over-specifying the Worker stage's resources. For example, suppose the user specifies that a Worker node uses a resource unit of 2 CPU cores and 1 GPU core. If the Resource Optimization node determines, based on the deep-learning job's current execution state, that the Worker node does not actually need that many resources, it can adjust the user-specified numbers of CPU and GPU cores, for example to 1 CPU core and half a GPU core per Worker node, avoiding the idle waste of CPU and GPU resources caused by the user specifying too many CPU and GPU cores for the Worker nodes.
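The adjustment just described can be sketched as capping each user-specified amount at what the Resource Optimization node determined is actually needed. This is a minimal illustration under the assumption that the user request acts as an upper bound; the function name and dict layout are not from the patent.

```python
def adjust_request(user_request, needed):
    """Grant no more than the user asked for, and no more than is needed."""
    return {res: min(user_request.get(res, 0), amount)
            for res, amount in needed.items()}

# e.g. the user asked for 2 CPU cores and 1 GPU core per worker, but the
# Resource Optimization node determined 1 CPU core and half a GPU core
# suffice for the current execution state:
adjusted = adjust_request({"cpu_cores": 2, "gpu_cores": 1},
                          {"cpu_cores": 1, "gpu_cores": 0.5})
# adjusted == {"cpu_cores": 1, "gpu_cores": 0.5}
```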
As an implementation example, Figure 8C shows the Resource Optimization node adjusting a Worker node's resources. As shown in Figure 8C, the resources planned for the Worker nodes are 200 CPU cores and 100 GPU cores (which may be specified by the user). During execution of the deep-learning job, the Resource Optimization node can search the historical database, based on the job's current execution state, for the resource information actually used in a similar or identical historical execution state. For example, if a historical execution state similar or identical to the job's current state actually used 100 CPU cores and 50 GPU cores, the Resource Optimization node can adjust the Worker nodes' configured CPU cores to 100 and GPU cores to 50, so that the Worker nodes can guarantee the job's execution with a lower resource configuration.
In some further embodiments, the Resource Optimization node may preset multiple resource demand schemes for the Worker node. Based on the current execution state of the deep learning job, the Resource Optimization node can select, from the multiple resource demand schemes, the scheme that matches the current execution state, and configure the Worker stage's resource information according to the selected scheme. The resource demand scheme that matches the current execution state can be understood as the scheme that satisfies job execution in the current execution state while requiring the fewest resources.
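The selection rule in this paragraph — the cheapest preset scheme that still satisfies the current execution state — can be sketched as follows. The feasibility predicate (every demanded resource covered) and the cost function (sum of resource amounts) are simplifying assumptions for illustration.

```python
# Minimal sketch: among preset resource demand schemes, pick the cheapest one
# whose capacity still satisfies the job's current demand.
def select_scheme(schemes: list, demand: dict) -> dict:
    """Pick the feasible scheme with the smallest total resource amount."""
    feasible = [
        s for s in schemes
        if all(s.get(res, 0) >= amount for res, amount in demand.items())
    ]
    if not feasible:
        raise ValueError("no preset scheme satisfies the current execution state")
    return min(feasible, key=lambda s: sum(s.values()))

schemes = [
    {"cpu_cores": 50, "gpu_cores": 25},
    {"cpu_cores": 100, "gpu_cores": 50},
    {"cpu_cores": 200, "gpu_cores": 100},
]
# Current execution state needs at least 80 CPU cores and 40 GPU cores: the
# middle scheme is the least-resource scheme that still covers the demand.
print(select_scheme(schemes, {"cpu_cores": 80, "gpu_cores": 40}))
# {'cpu_cores': 100, 'gpu_cores': 50}
```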
Based on the current execution state of a deep learning job, the embodiments of the present application can configure resources for Worker nodes with higher precision, saving the computing resources of the distributed system and improving its resource utilization without affecting the execution performance of the deep learning job on the Worker nodes. After deployment on large-scale production clusters, the embodiments of the present application can substantially improve the utilization of resources (especially GPU resources) by deep learning jobs without lengthening users' actual training time, while greatly increasing job throughput and significantly alleviating user job queuing.
The embodiments of the present application can extend the description of execution plans for Map-Reduce, offline job running modes, and the like, so that an execution plan can accurately describe deep learning jobs that include a PS (parameter server). This way of describing execution plans avoids worker nodes idling while holding resources, and provides a more accurate job execution control flow. Moreover, the embodiments of the present application can dynamically select and adjust the GPU and other resources actually used by a deep learning job while the job runs, ensuring that resource usage is adapted to the job's actual runtime requirements and algorithmic characteristics, guaranteeing efficient resource usage, and thus avoiding the contradiction, in large-scale multi-user distributed systems, between over-requested resources and low actual resource utilization.
The embodiments of the present application further provide a master node, which can be configured to execute the distributed job adjustment method provided by the embodiments of the present application.
The embodiments of the present application further provide a distributed system. The structure of the distributed system may be as described in the corresponding sections above, and the distributed system may include the master node described above.
The embodiments of the present application further provide a physical machine on which the master node provided by the embodiments of the present application can be deployed. As an optional implementation, Figure 9 shows a structural block diagram of the physical machine. As shown in Figure 9, the physical machine may include: at least one processor 1, at least one communication interface 2, at least one memory 3, and at least one communication bus 4. In the embodiments of the present application, there is at least one of each of the processor 1, the communication interface 2, the memory 3, and the communication bus 4, and the processor 1, the communication interface 2, and the memory 3 communicate with one another through the communication bus 4. Optionally, the communication interface 2 may be an interface of a communication module used for network communication. Optionally, the processor 1 may be a CPU (central processing unit), a GPU (Graphics Processing Unit), an NPU (embedded neural network processor), an FPGA (Field Programmable Gate Array), a TPU (tensor processing unit), an AI chip, an ASIC (Application Specific Integrated Circuit), or one or more integrated circuits configured to implement the embodiments of the present application. The memory 3 may include high-speed RAM, and may also include non-volatile memory, such as at least one disk memory. The memory 3 stores one or more computer-executable instructions, and the processor 1 invokes the one or more computer-executable instructions to execute the distributed job adjustment method provided by the embodiments of the present application.
The embodiments of the present application further provide a storage medium that can store one or more computer-executable instructions; when the one or more computer-executable instructions are executed, the distributed job adjustment method provided by the embodiments of the present application is implemented.
The embodiments of the present application further provide a computer program for executing the distributed job adjustment method provided by the embodiments of the present application.
Multiple embodiment schemes provided by the embodiments of the present application have been described above. The optional modes introduced in the embodiment schemes may, where they do not conflict, be combined with and cross-reference one another, extending to a variety of possible embodiment schemes, all of which can be regarded as embodiment schemes disclosed by the embodiments of the present application.
Although the embodiments of the present application are disclosed as above, the present application is not limited thereto. Any person skilled in the art can make various changes and modifications without departing from the spirit and scope of the present application; therefore, the protection scope of the present application shall be subject to the scope defined by the claims.
Claims (10)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202111583453.8A CN114490027B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine and storage medium |
Applications Claiming Priority (2)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110950182.9A CN113407354B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine, and storage medium |
CN202111583453.8A CN114490027B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine and storage medium |
Related Parent Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202110950182.9A Division CN113407354B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine, and storage medium |
Publications (2)
Publication Number | Publication Date |
---|---|
CN114490027A true CN114490027A (en) | 2022-05-13 |
CN114490027B CN114490027B (en) | 2025-01-24 |
Family
ID=77688654
Family Applications (2)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202111583453.8A Active CN114490027B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine and storage medium |
CN202110950182.9A Active CN113407354B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine, and storage medium |
Family Applications After (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202110950182.9A Active CN113407354B (en) | 2021-08-18 | 2021-08-18 | Distributed job adjustment method, master node, system, physical machine, and storage medium |
Country Status (1)
Country | Link |
---|---|
CN (2) | CN114490027B (en) |
Families Citing this family (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN116028198A (en) * | 2021-10-25 | 2023-04-28 | 中国石油化工股份有限公司 | A computing resource allocation method, storage medium and device |
CN114065946A (en) * | 2021-11-11 | 2022-02-18 | 杭州海康威视数字技术股份有限公司 | Inference method and device based on Flink framework and Flink service system |
CN114443633A (en) * | 2022-01-11 | 2022-05-06 | 北京沃东天骏信息技术有限公司 | Data processing method and device, storage medium and electronic equipment |
CN116719625A (en) * | 2023-06-21 | 2023-09-08 | 抖音视界有限公司 | Data processing method, device, equipment and readable storage medium |
Citations (9)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN105117286A (en) * | 2015-09-22 | 2015-12-02 | 北京大学 | Task scheduling and pipelining executing method in MapReduce |
CN105956021A (en) * | 2016-04-22 | 2016-09-21 | 华中科技大学 | Automated task parallel method suitable for distributed machine learning and system thereof |
CN107612886A (en) * | 2017-08-15 | 2018-01-19 | 中国科学院大学 | A kind of Spark platforms Shuffle process compresses algorithm decision-making techniques |
CN108762902A (en) * | 2018-05-22 | 2018-11-06 | 齐鲁工业大学 | Multi-scenario tasks dispatching method and device in Distributed Calculation based on Spark |
CN109951438A (en) * | 2019-01-15 | 2019-06-28 | 中国科学院信息工程研究所 | A communication optimization method and system for distributed deep learning |
CN110673794A (en) * | 2019-09-18 | 2020-01-10 | 中兴通讯股份有限公司 | Distributed data equalization processing method and device, computing terminal and storage medium |
US20200076884A1 (en) * | 2018-08-31 | 2020-03-05 | Nakamoto & Turing Labs Inc | Methods and apparatus for performing distributed computing using blockchain |
CN110955553A (en) * | 2019-12-06 | 2020-04-03 | 南京录信软件技术有限公司 | Data tilt overload protection method |
US20200293916A1 (en) * | 2019-03-14 | 2020-09-17 | Yadong Li | Distributed system generating rule compiler engine apparatuses, methods, systems and media |
Family Cites Families (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
US10108683B2 (en) * | 2015-04-24 | 2018-10-23 | International Business Machines Corporation | Distributed balanced optimization for an extract, transform, and load (ETL) job |
CN106815071A (en) * | 2017-01-12 | 2017-06-09 | 上海轻维软件有限公司 | Big data job scheduling system based on directed acyclic graph |
CN110502337B (en) * | 2019-07-12 | 2023-02-07 | 上海交通大学 | An optimized system for the shuffle phase in Hadoop MapReduce |
CN113157413B (en) * | 2021-04-16 | 2022-04-26 | 上海交通大学 | Method and system for optimal allocation of deep learning task resources based on service quality requirements |
- 2021-08-18 CN CN202111583453.8A patent/CN114490027B/en active Active
- 2021-08-18 CN CN202110950182.9A patent/CN113407354B/en active Active
Non-Patent Citations (2)
Title |
---|
YINGGEN XU: "DAG-Aware Joint Task Scheduling and Cache Management in Spark Clusters", 《2020 IEEE INTERNATIONAL PARALLEL AND DISTRIBUTED PROCESSING SYMPOSIUM》, 14 July 2020 (2020-07-14) *
ZHANG KAI: "Optimization Method for Accelerating Containerized Deep Learning Based on Distributed Cache", 《BIG DATA》, 17 June 2021 (2021-06-17) *
Also Published As
Publication number | Publication date |
---|---|
CN113407354B (en) | 2022-01-21 |
CN114490027B (en) | 2025-01-24 |
CN113407354A (en) | 2021-09-17 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN114490027B (en) | Distributed job adjustment method, master node, system, physical machine and storage medium | |
US11249997B1 (en) | System-wide query optimization | |
US9852204B2 (en) | Read-only operations processing in a paxos replication system | |
CN104360903B (en) | The method that task data decoupling is realized in Spark job scheduling systems | |
CN104885078B (en) | For the method for the Two-phrase query optimization in MPP data-base cluster | |
CN104050042B (en) | The resource allocation methods and device of ETL operations | |
CN105956021A (en) | Automated task parallel method suitable for distributed machine learning and system thereof | |
WO2023179415A1 (en) | Machine learning computation optimization method and platform | |
CN116089414B (en) | Time series database writing performance optimization method and device based on massive data scenarios | |
CN114756629B (en) | Multi-source heterogeneous data interaction analysis engine and method based on SQL | |
CN116108057A (en) | A distributed database access method, device, equipment and storage medium | |
CN118733282A (en) | A distributed training method, device, equipment and medium based on heterogeneous devices | |
Wolf et al. | On the optimization of schedules for MapReduce workloads in the presence of shared scans | |
CN113434302A (en) | Distributed job execution method, master node, system, physical machine, and storage medium | |
CN112015765B (en) | Spark cache elimination method and system based on cache value | |
CN116302574A (en) | Concurrent processing method based on MapReduce | |
CN110083609B (en) | Real-time query method for graph structure data in rail transit network passenger flow data | |
CN112000649A (en) | Incremental data synchronization method and device based on map reduce | |
Dong et al. | Optimization of service scheduling in computing force network | |
CN104063230B (en) | The parallel reduction method of rough set based on MapReduce, apparatus and system | |
CN114443686A (en) | Compression graph construction method and device based on relational data | |
CN114996299A (en) | Plan execution method, device and system for distributed database | |
WO2023273157A1 (en) | Workflow generation method and apparatus, and device and storage medium | |
CN108984308A (en) | A kind of cloud data processing method and system based on workload | |
CN118035135B (en) | Cache replacement method and storage medium |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||