
CN111767092B - Job Execution Method, Device, System, and Computer-Readable Storage Medium - Google Patents

Job Execution Method, Device, System, and Computer-Readable Storage Medium

Info

Publication number
CN111767092B
Authority
CN
China
Prior art keywords
spark, job, engine, target, version
Prior art date
Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
Active
Application number
CN202010624055.5A
Other languages
Chinese (zh)
Other versions
CN111767092A
Inventor
刘有
尹强
王和平
黄山
杨峙岳
冯朝阁
杨永坤
邸帅
卢道和
Current Assignee (The listed assignees may be inaccurate. Google has not performed a legal analysis and makes no representation or warranty as to the accuracy of the list.)
WeBank Co Ltd
Original Assignee
WeBank Co Ltd
Priority date (The priority date is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the date listed.)
Filing date
Publication date
Application filed by WeBank Co Ltd
Priority to CN202010624055.5A
Publication of CN111767092A
Priority to PCT/CN2021/081960 (published as WO2022001209A1)
Application granted
Publication of CN111767092B
Status: Active (current)
Anticipated expiration

Classifications

    • G: PHYSICS
    • G06: COMPUTING; CALCULATING OR COUNTING
    • G06F: ELECTRIC DIGITAL DATA PROCESSING
    • G06F 9/00: Arrangements for program control, e.g. control units
    • G06F 9/06: Arrangements for program control using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
    • G06F 9/44: Arrangements for executing specific programs
    • G06F 9/445: Program loading or initiating
    • G06F 9/44505: Configuring for program initiating, e.g. using registry, configuration files
    • G06F 9/4451: User profiles; Roaming
    • G06F 9/44521: Dynamic linking or loading; Link editing at or after load time, e.g. Java class loading
    • G06F 9/44536: Selecting among different versions
    • G06F 9/448: Execution paradigms, e.g. implementations of programming paradigms
    • G06F 9/4488: Object-oriented
    • G06F 9/449: Object-oriented method invocation or resolution
    • Y: GENERAL TAGGING OF NEW TECHNOLOGICAL DEVELOPMENTS; GENERAL TAGGING OF CROSS-SECTIONAL TECHNOLOGIES SPANNING OVER SEVERAL SECTIONS OF THE IPC; TECHNICAL SUBJECTS COVERED BY FORMER USPC CROSS-REFERENCE ART COLLECTIONS [XRACs] AND DIGESTS
    • Y02: TECHNOLOGIES OR APPLICATIONS FOR MITIGATION OR ADAPTATION AGAINST CLIMATE CHANGE
    • Y02D: CLIMATE CHANGE MITIGATION TECHNOLOGIES IN INFORMATION AND COMMUNICATION TECHNOLOGIES [ICT], I.E. INFORMATION AND COMMUNICATION TECHNOLOGIES AIMING AT THE REDUCTION OF THEIR OWN ENERGY USE
    • Y02D 10/00: Energy efficient computing, e.g. low power processors, power management or thermal management

Landscapes

  • Engineering & Computer Science (AREA)
  • Software Systems (AREA)
  • Theoretical Computer Science (AREA)
  • Physics & Mathematics (AREA)
  • General Engineering & Computer Science (AREA)
  • General Physics & Mathematics (AREA)
  • Stored Programmes (AREA)

Abstract

The invention relates to the field of financial technology (Fintech) and discloses a job execution method, device, system, and computer-readable storage medium. The job execution method includes: when an execution request for a Spark job is received, obtaining the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code from the execution request; determining the deployment directory information and version loading rules of the target Spark engine from the version number; obtaining static configuration parameters from the deployment directory information, and initializing the target Spark engine with the dynamic and static configuration parameters according to the version loading rules, so as to start the target Spark engine; and submitting the Spark job code to the target Spark engine for execution. The invention enables multiple Spark versions to run concurrently within a single Linkis service cluster, reducing operation and maintenance costs.

Description

Job Execution Method, Device, System, and Computer-Readable Storage Medium

Technical Field

The present invention relates to the technical field of financial technology (Fintech), and in particular to a job execution method, device, system, and computer-readable storage medium.

Background

With the development of computer technology, more and more technologies are being applied in the financial field, and the traditional financial industry is gradually transforming into financial technology (Fintech). Spark technology is no exception; however, the security and real-time requirements of the financial industry place higher demands on it.

In existing big data cluster applications, the big data processing environment within a financial institution (such as a bank) is highly centralized, the data is highly concentrated, and the data volume is very large. Because data is processed centrally, many big data application components are also deployed centrally, and these components are updated frequently, with several new versions each year; one example is Apache Spark, a fast, general-purpose computing engine designed for large-scale data processing.

Each Spark version update brings many new features but is sometimes not well compatible with jobs written for older versions. An environment often serves a large number of users: some need the new version of Spark while others need an older version. However, a Linkis service cluster (Linkis is a data middleware layer that connects multiple computing and storage engines) can currently run only one version of Spark jobs and cannot run multiple Spark versions side by side. To satisfy these multi-version requirements, multiple Linkis clusters usually have to be deployed, which requires a large number of machines to host the different versions of the Spark Driver; at the same time, users' jobs have to be switched and redeployed between environments. Every additional Linkis deployment increases operation and maintenance cost and difficulty, and maintaining the use of each environment is also hard for users.

Summary of the Invention

The main purpose of the present invention is to provide a job execution method, device, system, and computer-readable storage medium that support running multiple versions of Spark jobs concurrently within a single Linkis service cluster, thereby reducing operation and maintenance costs.

To achieve the above objective, the present invention provides a job execution method, which includes:

when an execution request for a Spark job is received, obtaining the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code from the execution request;

determining the deployment directory information and version loading rules of the target Spark engine from the version number;

obtaining static configuration parameters from the deployment directory information, and initializing the target Spark engine with the dynamic configuration parameters and the static configuration parameters according to the version loading rules, so as to start the target Spark engine; and

submitting the Spark job code to the target Spark engine for execution.

Optionally, before the step of determining the deployment directory information and version loading rules of the target Spark engine from the version number, the method further includes:

obtaining the user identifier corresponding to the execution request, and detecting whether an idle Spark engine corresponding to the user identifier and the version number exists;

if no such engine exists, performing the step of determining the deployment directory information and version loading rules of the target Spark engine from the version number; and

if such an engine exists, submitting the Spark job code to the idle Spark engine for execution.

Optionally, before the step of determining the deployment directory information and version loading rules of the target Spark engine from the version number, the method further includes:

obtaining the user identifier corresponding to the execution request, and judging from the user identifier whether the user is on a preset grayscale list;

if the user is not on the preset grayscale list, performing the step of determining the deployment directory information and version loading rules of the target Spark engine from the version number; and

if the user is on the preset grayscale list, creating a grayscale Spark engine and submitting the Spark job code to the grayscale Spark engine for execution.

Optionally, the job execution method further includes:

during initialization, determining the target calling method from the version number and a preset abstraction-layer interface; and

loading, via the target calling method, the file packages that the target Spark engine depends on from the directory corresponding to the deployment directory information.

Optionally, before the step of submitting the Spark job code to the target Spark engine for execution, the method further includes:

modifying the Spark job code according to the version number;

and the step of submitting the Spark job code to the target Spark engine for execution includes:

submitting the modified Spark job code to the target Spark engine for execution.

Optionally, the step of submitting the Spark job code to the target Spark engine for execution includes:

submitting the Spark job code to the driver node of the target Spark engine;

converting the Spark job code through the driver node to obtain Spark tasks; and

assigning the Spark tasks to executor nodes deployed on the Yarn cluster for execution.

Optionally, before the step of converting the Spark job code through the driver node to obtain Spark tasks, the method further includes:

during initialization, when a Scala interpreter is created in the driver node of the target Spark engine, injecting the class loader of the main thread into the Scala interpreter, so that the main thread's class loader becomes the parent of the Scala interpreter's class loader and the Scala interpreter creates its own class loader from that parent;

the step of converting the Spark job code through the driver node to obtain Spark tasks includes:

converting the Spark job code through the class loader of the Scala interpreter created in the driver node to obtain Spark tasks;

and after the step of assigning the Spark tasks to executor nodes deployed on the Yarn cluster for execution, the method further includes:

upon receiving the serialized execution result returned by an executor node for a Spark task, changing the class loader of the target Spark engine's current thread to the class loader of the Scala interpreter, so that the serialized execution result is deserialized through the Scala interpreter's class loader.

In addition, to achieve the above objective, the present invention further provides a job execution device, which includes:

a first obtaining module, configured to obtain, when an execution request for a Spark job is received, the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code from the execution request;

a first determining module, configured to determine the deployment directory information and version loading rules of the target Spark engine from the version number;

an engine initialization module, configured to obtain static configuration parameters from the deployment directory information and to initialize the target Spark engine with the dynamic configuration parameters and the static configuration parameters according to the version loading rules, so as to start the target Spark engine; and

a job execution module, configured to submit the Spark job code to the target Spark engine for execution.

In addition, to achieve the above objective, the present invention further provides a job execution system, which includes a memory, a processor, and a job execution program stored in the memory and runnable on the processor; when the job execution program is executed by the processor, the steps of the job execution method described above are implemented.

In addition, to achieve the above objective, the present invention further provides a computer-readable storage medium on which a job execution program is stored; when the job execution program is executed by a processor, the steps of the job execution method described above are implemented.

The present invention provides a job execution method, device, system, and computer-readable storage medium. When an execution request for a Spark job is received, the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code are obtained from the execution request; the deployment directory information and version loading rules of the target Spark engine are then determined from the version number; static configuration parameters are obtained from the deployment directory information, and the target Spark engine is initialized with the dynamic and static configuration parameters according to the version loading rules, so as to start the target Spark engine; the Spark job code is then submitted to the target Spark engine for execution. In the present invention, the dependency jar packages that are prone to conflicts between different Spark versions are installed and deployed separately in advance; when an execution request for a Spark job is received, the version number in the request is used to determine the corresponding deployment directory information and to obtain the corresponding startup parameters (including dynamic and static configuration parameters), and the target version of the Spark engine is then initialized with these startup parameters and started. This enables dynamic loading of version-specific Spark dependency jars and avoids multi-version jar conflicts, so that multiple Spark versions can execute in parallel within the same Linkis cluster. Therefore, the present invention only needs to deploy a single Linkis service cluster, within which Spark engines of different versions can be created to support running multi-version Spark jobs; compared with the prior-art approach of deploying multiple Linkis clusters to host different Spark engine versions, the present invention greatly reduces machine resources and operation and maintenance costs.

Brief Description of the Drawings

FIG. 1 is a schematic structural diagram of the device in the hardware operating environment involved in the embodiments of the present invention;

FIG. 2 is a schematic flowchart of a first embodiment of the job execution method of the present invention;

FIG. 3 is a schematic diagram of the system architecture of the job execution system of the present invention;

FIG. 4 is a schematic diagram of the functional modules of a first embodiment of the job execution device of the present invention.

The realization of the objectives, functional features, and advantages of the present invention will be further described with reference to the embodiments and the accompanying drawings.

Detailed Description

It should be understood that the specific embodiments described here are only intended to explain the present invention, not to limit it.

Referring to FIG. 1, FIG. 1 is a schematic structural diagram of the device in the hardware operating environment involved in the embodiments of the present invention.

The job execution device in the embodiments of the present invention may be a server, or a terminal device such as a PC (Personal Computer), a tablet computer, or a portable computer.

As shown in FIG. 1, the job execution device may include a processor 1001 (for example, a CPU), a communication bus 1002, a user interface 1003, a network interface 1004, and a memory 1005. The communication bus 1002 provides connection and communication between these components. The user interface 1003 may include a display and an input unit such as a keyboard, and may optionally also include standard wired and wireless interfaces. The network interface 1004 may optionally include standard wired and wireless interfaces (such as a Wi-Fi interface). The memory 1005 may be a high-speed RAM memory or a non-volatile memory such as a disk memory; optionally, it may also be a storage device independent of the processor 1001.

Those skilled in the art will understand that the device structure shown in FIG. 1 does not limit the job execution device, which may include more or fewer components than shown, combine certain components, or arrange components differently.

As shown in FIG. 1, the memory 1005, as a computer storage medium, may include an operating system, a network communication module, and a job execution program.

In the terminal shown in FIG. 1, the network interface 1004 is mainly used to connect to a backend server and exchange data with it; the user interface 1003 is mainly used to connect to a client and exchange data with it; and the processor 1001 may be used to call the job execution program stored in the memory 1005 and perform the steps of the job execution method below.

Based on the above hardware structure, various embodiments of the job execution method of the present invention are proposed.

The present invention provides a job execution method.

Referring to FIG. 2, FIG. 2 is a schematic flowchart of a first embodiment of the job execution method of the present invention.

In this embodiment, the job execution method includes:

Step S10: when an execution request for a Spark job is received, obtaining the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code from the execution request.

The job execution method of this embodiment is applied to a job execution system. As shown in FIG. 3, the job execution system includes a Linkis cluster service and a Yarn cluster. The Linkis cluster service includes an external service entry and an engine management service; the external service entry includes a job management module and an engine manager. Compared with existing job execution systems, the engine management service has been modified: a dynamic loading scheme for multi-version Spark dependency jars and dynamic adjustment of class loaders have been added, so that Spark jobs of different versions can execute in parallel. In addition, a version identifier has been added to the execution request parameters of a Spark job, and correspondingly an engine manager has been added to manage the Spark engine services of different versions and different users and to create or select an engine according to the version number.

The Linkis cluster service is a data middleware layer that connects multiple computing and storage engines and exposes a unified Restful interface for submitting and executing scripts such as SQL (Structured Query Language), PySpark (the API that Spark provides for Python developers), HiveQL (Hive Query Language, the query language of a data warehouse tool), and Scala (a multi-paradigm programming language). The job management module in the external service entry receives execution requests for Spark jobs and also automatically adjusts erroneous Spark job code. The engine manager manages the Spark engine services of different versions and different users, and creates or selects an engine according to the version number. The engine management service manages the creation, status tracking, and destruction of the processes hosting the Spark Context; it contains one or more Spark engines, which can execute in parallel and which communicate with the external service entry via RPC (Remote Procedure Call). The Yarn cluster provides the job scheduling and cluster resource management framework of the big data platform.

In this embodiment, when the external service entry receives an execution request for a Spark job, it obtains the version number of the target Spark engine, the dynamic configuration parameters, and the Spark job code from the execution request. The dynamic configuration parameters include one or more of: the number of executor nodes (Executors), the number of CPUs (Central Processing Units) and the memory size per executor, and the number of CPUs and the memory size of the driver node (Driver).
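To make the request structure concrete, the sketch below (in Scala, like the engine itself; the same holds for the later sketches) models an execution request carrying the fields named above. All class and field names are illustrative assumptions, not Linkis's actual API.

    // Hypothetical request model; field names are illustrative only.
    case class DynamicConf(
      executorNum: Option[Int],       // number of executor instances
      executorCores: Option[Int],     // CPUs per executor
      executorMemory: Option[String], // memory per executor, e.g. "4g"
      driverCores: Option[Int],       // CPUs for the driver node
      driverMemory: Option[String]    // memory for the driver node
    )

    case class ExecutionRequest(
      user: String,         // user identifier carried with the request
      sparkVersion: String, // version tag of the target Spark engine, e.g. "v2"
      dynamicConf: DynamicConf,
      jobCode: String       // the Spark job code to interpret and execute
    )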

Step S20: determining the deployment directory information and version loading rules of the target Spark engine from the version number.

The deployment directory information and version loading rules of the target Spark engine are then determined from the version number. The deployment directory information includes root directory deployment information and configuration directory deployment information; the version loading rules include the loading rules for the dependency libraries of the different Spark versions.

Note that, at implementation time, the root directory of the Spark service must be created in advance on the machine hosting the engine management service; under the root directory, Spark_home and Spark_conf_dir directories are created per Spark version number, and Spark is installed and deployed following this path rule. Specifically, a subdirectory is created for each version under the Spark_home and Spark_conf_dir directories, and each Spark version is installed into its corresponding subdirectory. During installation, the dependency jar packages that are prone to conflicts between different Spark versions must be deployed separately. The Spark_home directory is the root installation directory of Spark; the Spark_conf_dir directory contains the static configuration parameters of the different Spark versions, such as application property parameters, runtime environment parameters, runtime behavior parameters, and network parameters.
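As a concrete illustration of this path rule, the sketch below resolves the per-version Spark_home and Spark_conf_dir directories from a version number. The root path is a hypothetical example; only the layout follows the description.

    import java.nio.file.{Files, Path, Paths}

    object DeployDirs {
      // Hypothetical service root; the per-version layout follows the
      // convention described in the text:
      //   <root>/Spark_home/<version> and <root>/Spark_conf_dir/<version>
      private val root = Paths.get("/appcom/spark-service")

      def sparkHome(version: String): Path =
        root.resolve("Spark_home").resolve(version)

      def sparkConfDir(version: String): Path =
        root.resolve("Spark_conf_dir").resolve(version)

      /** Returns (home, conf) if this version is actually deployed. */
      def resolve(version: String): Either[String, (Path, Path)] = {
        val home = sparkHome(version)
        val conf = sparkConfDir(version)
        if (Files.isDirectory(home) && Files.isDirectory(conf)) Right((home, conf))
        else Left(s"Spark version $version is not deployed under $root")
      }
    }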

In addition, before installing and deploying the different Spark versions, in order to support multiple versions of the Spark engine itself, the API interfaces of the different Spark packages can first be compared to identify the differing APIs; the Spark packages of the different versions are then compiled according to these differing APIs, and the compiled Spark packages are deployed into the corresponding directories. That is, the Spark package is compiled first, the parts whose API (Application Programming Interface) has changed are separated out, and the compiled Spark package is then installed and deployed. For example, for the Vector-related APIs, two projects can be split off, each adapted to a corresponding version, and jar packages released for them; the Profile mechanism of Maven (a project-object-model build tool) is then used to pull in the appropriate adaptation package per Spark version.

Step S30: obtaining static configuration parameters from the deployment directory information, and initializing the target Spark engine with the dynamic configuration parameters and the static configuration parameters according to the version loading rules, so as to start the target Spark engine.

Static configuration parameters are then obtained from the deployment directory information; specifically, the static configuration parameters under the Spark_conf_dir directory, such as application property parameters, runtime environment parameters, runtime behavior parameters, and network parameters, can be obtained. The target Spark engine is then initialized with the startup parameters (the dynamic and static configuration parameters) according to the version loading rules, so as to start the target Spark engine. That is, the dynamic and static configuration parameters are filled into the corresponding configuration parameters of the target Spark engine's configuration file for initialization.
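A minimal sketch of this parameter merge, assuming the static parameters sit in a spark-defaults.conf-style file (whitespace-separated key/value pairs) under the version's configuration directory and that dynamic parameters from the request override static ones:

    import java.io.FileInputStream
    import java.nio.file.Path
    import java.util.Properties
    import scala.jdk.CollectionConverters._

    object EngineConf {
      // Builds the effective startup configuration for the engine.
      def load(confDir: Path, dynamic: Map[String, String]): Map[String, String] = {
        val props = new Properties()
        val in = new FileInputStream(confDir.resolve("spark-defaults.conf").toFile)
        try props.load(in) finally in.close()
        val static = props.asScala.toMap
        static ++ dynamic // dynamic parameters take precedence over static ones
      }
    }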

In addition, during initialization, the archives used by PySpark (the API that Spark provides for Python developers) also need to be uploaded per version to the corresponding HDFS (Hadoop Distributed File System) directory, so that the Spark engine's executors can download these third-party files from the correct location; this solves the jar dependency problem when an Executor starts.

Further, the job execution method also includes:

Step a1: during initialization, determining the target calling method from the version number and a preset abstraction-layer interface.

Step a2: loading, via the target calling method, the file packages that the target Spark engine depends on from the directory corresponding to the deployment directory information.

Further, the Spark engine service of Linkis has some runtime dependencies on Spark; currently these dependencies are all placed, as jar packages, in a lib directory under the root directory. When different Spark versions coexist, these jars conflict. Therefore, these dependency jars need to be extracted in advance and loaded not all at once at application startup but per engine version: a layer of abstract interface definitions is added in the Spark engine service to implement a multi-version dependency abstraction layer, the underlying packages with resolved jar conflicts are packaged into version modules (the dependency file packages referred to below), and only the module of the specified version is loaded when an engine is created, avoiding multi-version jar conflicts.

Specifically, during initialization, the target calling method is determined from the version number and the preset abstraction-layer interface; then, the file packages that the target Spark engine depends on are loaded, via the target calling method, from the directory corresponding to the deployment directory information, so that conflict-free dependency libraries are loaded according to the version differences.
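The sketch below shows one way such an abstraction layer and version-module loading could look: a single adapter interface, with the jars of only the selected version module placed on a dedicated class loader. The trait, directory layout, and adapter class name are assumptions, not the actual Linkis definitions.

    import java.net.{URL, URLClassLoader}
    import java.nio.file.{Files, Path}
    import scala.jdk.CollectionConverters._

    // Abstraction-layer interface implemented once per supported Spark version.
    trait SparkAdapter {
      def initEngine(conf: Map[String, String]): Unit
    }

    object VersionModuleLoader {
      // Collects the jars of one version module, e.g. <moduleDir>/*.jar.
      private def moduleJars(moduleDir: Path): Array[URL] =
        Files.list(moduleDir).iterator().asScala
          .filter(_.toString.endsWith(".jar"))
          .map(_.toUri.toURL)
          .toArray

      // Only the selected version module goes on the class path; conflicting
      // jars of other versions are never loaded into this engine.
      def loadAdapter(moduleDir: Path, adapterClass: String): SparkAdapter = {
        val loader = new URLClassLoader(moduleJars(moduleDir), getClass.getClassLoader)
        loader.loadClass(adapterClass)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[SparkAdapter]
      }
    }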

Step S40: submitting the Spark job code to the target Spark engine for execution.

Finally, the Spark job code is submitted to the target Spark engine for execution. Specifically, the Spark job code is submitted to the driver node (Driver) of the target Spark engine; the Driver then converts the Spark job code into Spark tasks (for the specific conversion process, refer to the prior art); finally, the Spark tasks are assigned to executor nodes (Executors) deployed on the Yarn cluster for execution.

This embodiment of the present invention provides a job execution method. When an execution request for a Spark job is received, the version number of the target Spark engine, dynamic configuration parameters, and the Spark job code are obtained from the execution request; the deployment directory information and version loading rules of the target Spark engine are determined from the version number; static configuration parameters are obtained from the deployment directory information, and the target Spark engine is initialized with the dynamic and static configuration parameters according to the version loading rules, so as to start it; the Spark job code is then submitted to the target Spark engine for execution. By installing and deploying in advance the dependency jar packages that are prone to conflicts between different Spark versions, and, upon receiving an execution request, using the version number in the request to determine the corresponding deployment directory information and startup parameters (including dynamic and static configuration parameters) with which the target version of the Spark engine is initialized and started, this embodiment achieves dynamic loading of version-specific Spark dependency jars and avoids multi-version jar conflicts, so that multiple Spark versions can execute in parallel within the same Linkis cluster. Therefore, only a single Linkis service cluster needs to be deployed, within which Spark engines of different versions can be created to support multi-version Spark jobs; compared with deploying multiple Linkis clusters to host different Spark engine versions as in the prior art, this greatly reduces machine resources and operation and maintenance costs.

Further, since starting a Spark engine takes a long time, and once started it locks a portion of the cluster's computing resources, a running engine is not terminated immediately after a user's job finishes; instead, the user's next job can be executed immediately, improving the user experience. Engine reuse saves considerable time and computing resources. However, under the existing Linkis per-user Session management, when existing Spark engines are reused, jobs are submitted to idle Spark engines at random. When Spark engines of different versions coexist in one environment, some of a user's jobs are submitted to the old-version engine and some to the new-version engine, causing job execution failures.

In view of this, based on the first embodiment above, a second embodiment of the job execution method of the present invention is proposed.

In this embodiment, before step S20 above, the job execution method further includes:

Step A: obtaining the user identifier corresponding to the execution request, and detecting whether an idle Spark engine corresponding to the user identifier and the version number exists.

In this embodiment, when the external service entry receives the execution request for a Spark job, in addition to the version number of the target Spark engine, the dynamic configuration parameters, and the Spark job code, it can also obtain the user identifier corresponding to the execution request, where the user identifier may be a user name or user number. It then detects whether an idle Spark engine corresponding to the user identifier and the version number exists: the version numbers and owning users of the currently idle Spark engines can be obtained from the engine manager and matched against the obtained user identifier and version number.

If no such engine exists, step S20 is performed: determining the deployment directory information and version loading rules of the target Spark engine from the version number.

If no idle Spark engine corresponding to the user identifier and the version number exists, a new Spark engine must be created. In that case, the deployment directory information and version loading rules of the target Spark engine are determined from the version number and the subsequent steps are performed; for the specific execution process, refer to the first embodiment above, which is not repeated here.

If such an engine exists, step B is performed: submitting the Spark job code to the idle Spark engine for execution.

If an idle Spark engine corresponding to the user identifier and the version number exists, the Spark job code is submitted directly to that idle Spark engine for execution.

In this embodiment, a version tag (the version number) is added to the execution request of a Spark job, and the engine manager tracks the idle Spark engines per user and version number. After the external service entry of Linkis receives an execution request, it checks whether an idle Spark engine matching the user identifier and version number exists and automatically submits the job, according to its version tag, to an idle engine of the matching version. This enables engine reuse while preventing Spark jobs from being randomly submitted to engines of a different version and failing.
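A minimal sketch of this (user, version)-keyed reuse lookup; EngineRef and the pool layout are illustrative assumptions, not the actual engine manager structures.

    import scala.collection.mutable

    final case class EngineRef(user: String, version: String, var busy: Boolean = false)

    class EngineManager {
      // Idle engines are keyed by (user, version), so a job can only ever be
      // handed to an engine of the requested version.
      private val pool = mutable.Map.empty[(String, String), List[EngineRef]]

      /** Returns an idle engine for this user and version, marking it busy. */
      def acquireIdle(user: String, version: String): Option[EngineRef] = synchronized {
        pool.getOrElse((user, version), Nil).find(!_.busy).map { e =>
          e.busy = true
          e
        }
      }

      /** Registers a newly created engine so later jobs can reuse it. */
      def register(engine: EngineRef): Unit = synchronized {
        val key = (engine.user, engine.version)
        pool.update(key, engine :: pool.getOrElse(key, Nil))
      }
    }

If acquireIdle returns None, the flow falls through to step S20 and a new engine of the requested version is created.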

Further, when the Spark version needs to be updated, Linkis currently has to kill all running Spark engines, stop the existing Spark engine management services, update the configuration files of all Spark engine management servers, and update all the dependency Jar packages (a software package file format) used by the Spark engine management services. This has a severe impact on the business: users' Spark jobs cannot execute during the update. Moreover, after the update, there is a risk that some Spark jobs cannot execute correctly under the new Spark engine version.

In view of this, based on the first embodiment above, a third embodiment of the job execution method of the present invention is proposed.

In this embodiment, before step S20 above, the job execution method further includes:

Step C: obtaining the user identifier corresponding to the execution request, and judging from the user identifier whether the user is on a preset grayscale list.

In this embodiment, when the external service entry receives the execution request for a Spark job, in addition to the version number of the target Spark engine, the dynamic configuration parameters, and the Spark job code, it can also obtain the user identifier corresponding to the execution request, where the user identifier may be a user name or user number. It then judges from the user identifier whether the user is on a preset grayscale list. The preset grayscale list is configured in advance and designates a subset of users for gray release: the jobs of those designated users are submitted for execution to the new-version Spark engine under the newly deployed Linkis service cluster. In practice, the number of grayscale users on the preset list can be adjusted according to the success rate of the designated users' jobs on the new-version Spark engine, so that users' Spark jobs are gradually migrated to the new-version Spark engine under the newly deployed Linkis service cluster.

If the user is not on the preset grayscale list, step S20 is performed: determining the deployment directory information and version loading rules of the target Spark engine from the version number.

If the user is on the preset grayscale list, step D is performed: creating a grayscale Spark engine and submitting the Spark job code to the grayscale Spark engine for execution.

If the user is not on the preset grayscale list, the user is not a designated grayscale user; in that case, the job only needs to be submitted for execution to a Spark engine under the original Linkis service cluster. Specifically, the deployment directory information and version loading rules of the target Spark engine are first determined from the version number and the subsequent steps are performed; for the specific execution process, refer to the first embodiment above, which is not repeated here.

If the user is on the preset grayscale list, a grayscale Spark engine is created, and the Spark job code is submitted to it for execution. The grayscale Spark engine is the new-version Spark engine under the newly deployed Linkis service cluster, and it is created in the same way as the target Spark engine in the first embodiment above, which is not repeated here. Note that, in practice, the grayscale Spark engine may be created in advance; in that case, the Spark job code is submitted directly to the pre-created grayscale Spark engine for execution.
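The routing decision itself reduces to a membership test on the grayscale list, as in the hedged sketch below; the in-memory set and engine labels are assumptions, since in practice the list would come from configuration or a database.

    object GrayRelease {
      // Hypothetical user identifiers; in practice loaded from config or a DB.
      @volatile private var grayUsers: Set[String] = Set("user_a", "user_b")

      /** Replaces the list, e.g. when widening the gray release. */
      def updateList(users: Set[String]): Unit = grayUsers = users

      /** Grayscale users go to the new-version engine, everyone else stays. */
      def route(user: String): String =
        if (grayUsers.contains(user)) "grayscale-engine" else "stable-engine"
    }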

It should also be noted that, because the version of the Spark engine to be submitted to has changed, the corresponding version number and configuration parameters must be set in advance, and the grayscale Spark engine is built from these preset values. In addition, because the current definition of Spark version parameters is rather complex, in practice the front end and back end can adopt a unified version-code mechanism stored in the database, for example v1 standing for Spark 1.6.0, so that versions are uniformly numbered and uncontrolled version proliferation is avoided. Users may not specify arbitrary versions to load: only a version number that is one of the uniformly numbered codes can be submitted correctly; otherwise the job submission fails.
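A sketch of such a unified version-code registry; the concrete code-to-version mapping is illustrative, and only registered codes pass validation.

    object VersionRegistry {
      private val codes = Map(
        "v1" -> "1.6.0", // per the text, v1 stands for Spark 1.6.0
        "v2" -> "2.4.5"  // hypothetical further entry
      )

      /** Resolves a version code; unknown codes cause the submission to fail. */
      def resolve(code: String): Either[String, String] =
        codes.get(code).toRight(s"Unknown version code '$code'; job submission rejected")
    }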

In this embodiment, a user-based version grayscale mechanism is established: a designated subset of users is gray-released, and the job requests of grayscale users are submitted for execution to the new-version grayscale Spark engine. In this way, when updating the Spark version under Linkis, the existing Spark engine management services do not need to be stopped, and users' Spark jobs can still execute during the update, avoiding adverse impact on the business.

Further, Spark job code differs somewhat between Spark engine versions. When sending a Spark job, a user may not have adopted the code conventions of the new Spark version to be submitted to, but may still send the execution request with code written for the old Spark version; in that case, the job execution would fail.

In view of this, based on the first to third embodiments above, a fourth embodiment of the job execution method of the present invention is proposed.

In this embodiment, before step S40 above, the job execution method further includes:

Step E: modifying the Spark job code according to the version number.

In this embodiment, after the version number of the target Spark engine and the Spark job code are obtained from the execution request of the Spark job, the Spark job code can be modified according to the version number so that the modified Spark job code is compatible with that version.

Specifically, a multi-version code parser is defined at the external service entry. The code parser predefines the known differences between versions and their corresponding modification strategies, so that syntax replacement operations between versions are predefined based on known changes, for example changes in package imports between versions, or changes in API (Application Programming Interface) function call interfaces. When modifying, the code parser matches the Spark job code, according to the version number, against the predefined inter-version code differences and their modification strategies, and then makes the corresponding modifications according to the match results.
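A minimal sketch of such a code parser, with the known differences expressed as plain (from, to) substitutions keyed by the target version. The sample rule, a package path that moved between Spark 1.x and 2.x, only illustrates the kind of change meant here; a real rule set would be richer.

    object JobCodeParser {
      // Rewrite rules keyed by target version code.
      private val rules: Map[String, Seq[(String, String)]] = Map(
        "v2" -> Seq(
          "org.apache.spark.mllib.linalg" -> "org.apache.spark.ml.linalg"
        )
      )

      /** Applies every rule registered for the target version to the job code. */
      def adapt(code: String, version: String): String =
        rules.getOrElse(version, Nil).foldLeft(code) {
          case (acc, (from, to)) => acc.replace(from, to)
        }
    }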

In that case, step S40 includes: submitting the modified Spark job code to the target Spark engine for execution.

The modified Spark job code is then submitted to the target Spark engine for execution.

In this embodiment, by automatically modifying the Spark job code at the code submission stage according to the inter-version code differences and their modification strategies, jobs can run compatibly across multiple versions without users having to modify their existing code.

Further, based on the first to third embodiments above, a fifth embodiment of the job execution method of the present invention is proposed.

In this embodiment, step S40 includes:

Step a41: submitting the Spark job code to the driver node of the target Spark engine.

In this embodiment, the job execution process is as follows:

The Spark job code is first submitted to the driver node (Driver) of the target Spark engine. The Driver converts the user's Spark job code into multiple physical execution units, also called tasks; it also tracks the health of the Executors and, given the current set of Executor nodes, assigns each task to a suitable Executor based on data locality.

Step a42: converting the Spark job code through the driver node to obtain Spark tasks.

The Spark job code is then converted through the driver node to obtain Spark tasks; for the specific conversion method, refer to the prior art.

Step a43: assigning the Spark tasks to executor nodes deployed on the Yarn cluster for execution.

Finally, the Spark tasks are assigned to executor nodes (Executors) deployed on the Yarn cluster for execution. The Yarn cluster provides the job scheduling and cluster resource management framework of the big data platform and manages and schedules the executor nodes. An executor node runs Spark tasks and returns the execution results to the driver node.

Further, because Spark is distributed, the Spark tasks dynamically generated in the Driver are serialized and distributed to the Executors for distributed execution, while the Driver also provides a remote class-loading service; when an Executor deserializes the serialized code for dynamic loading, problems can arise. One such case is that the serialized results the Driver receives from Executor execution cannot be deserialized correctly. When Linkis starts the Spark engine, a class loader already exists; it is the default class loader of the Spark Driver. The user's Spark job code is submitted to the Driver for interpreted execution, so the Driver also starts a Scala interpreter: when the Spark Driver is initialized, a SparkILoop is created, and since SparkILoop reuses the Scala language's code interpreter, a class loader is also set inside it. When a user submits a piece of code (the Spark job code) to the Spark engine, it is first interpreted and executed by the Scala interpreter in the Spark Driver, so classes newly defined by the user live entirely in the Scala interpreter's class loader. When the serialized Spark task is submitted to an Executor for execution and an operation must return a reference to an instance object of a user-defined class, the serialized execution result is submitted back to the Spark Driver; but at that point the Spark Driver's class loader is the default class loader, which lacks the class information defined by the user's dynamic code in the Scala interpreter, so the run fails. That is, only part of the code can be supported, and cases that need to return objects of user-defined classes cannot execute correctly.

In view of the above situation, based on the fifth embodiment, a sixth embodiment of the job execution method of the present invention is proposed.

In this embodiment, before step a42, the job execution method further includes:

Step F, during initialization, when the Scala interpreter is created in the driver node of the target Spark engine, injecting the class loader of the main thread into the Scala interpreter, so that the class loader of the main thread becomes the parent of the Scala interpreter's class loader, and the Scala interpreter creates its own class loader from that parent;

During initialization of the target Spark engine, a Scala interpreter is created in the driver node. At this point, the class loader of the main thread is injected into the Scala interpreter, so that the main thread's class loader becomes the parent of the Scala interpreter's class loader; the Scala interpreter then creates its own class loader from that parent.
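A minimal sketch of this injection, assuming the Scala 2 interpreter API (scala.tools.nsc.interpreter.IMain, which SparkILoop builds on); the exact wiring inside the engine may differ.

    import scala.tools.nsc.Settings
    import scala.tools.nsc.interpreter.IMain

    // Capture the class loader of the main thread before the interpreter exists.
    val mainLoader = Thread.currentThread().getContextClassLoader

    val settings = new Settings()
    settings.usejavacp.value = true
    // embeddedDefaults installs mainLoader as the parent of the class loader
    // the interpreter creates for the classes it compiles dynamically.
    settings.embeddedDefaults(mainLoader)

    val interpreter = new IMain(settings)
    // interpreter.classLoader now delegates to mainLoader as its parent.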

At this point, step a42 includes:

converting the Spark job code through the class loader of the Scala interpreter created in the driver node to obtain Spark tasks;

That is, the Spark job code is converted through the class loader of the Scala interpreter created in the driver node (Driver) to obtain Spark tasks, and the Spark tasks are then assigned to the executor nodes (Executors) deployed on the Yarn cluster to execute the job.

Further, after step a43, the job execution method further includes:

Step G, upon receiving the serialized execution result returned by the executor node for the Spark task, modifying the class loader of the current thread of the target Spark engine to the class loader of the Scala interpreter, so that the serialized execution result is deserialized through the Scala interpreter's class loader.

Upon receiving the serialized execution result returned by the executor node (Executor) for the Spark task, the class loader of the current thread of the target Spark engine is switched to the class loader of the Scala interpreter, so that the serialized execution result is deserialized through the Scala interpreter's class loader.
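A sketch of this loader switch, assuming the results arrive as raw bytes and that the interpreter's class loader (e.g., interpreter.classLoader from the previous sketch) is passed in; the helper name deserializeResult is hypothetical.

    import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectStreamClass}

    def deserializeResult[T](interpreterLoader: ClassLoader, bytes: Array[Byte]): T = {
      val thread = Thread.currentThread()
      val previousLoader = thread.getContextClassLoader
      // Switch the engine's current thread to the interpreter's class loader,
      // which knows the classes the user's dynamic code defined.
      thread.setContextClassLoader(interpreterLoader)
      try {
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
          // Resolve classes against the context loader so user-defined classes
          // compiled by the interpreter can be found.
          override def resolveClass(desc: ObjectStreamClass): Class[_] =
            Class.forName(desc.getName, false, Thread.currentThread().getContextClassLoader)
        }
        in.readObject().asInstanceOf[T]
      } finally {
        // Restore the original loader so the rest of the engine is unaffected.
        thread.setContextClassLoader(previousLoader)
      }
    }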

In this embodiment, the class loader of the Spark engine is modified dynamically to keep it consistent with the class loader of the Scala interpreter in the Driver, so that the execution results deserialized from the Executors can be parsed correctly by the Driver.

Further, it should be noted that, due to differences between Spark versions, some code is difficult to make compatible: after switching Spark versions it no longer compiles. In this case, a combination of dynamic compilation and reflection can be used. Normally, one can prepare two copies of the Spark job code, one for each Spark version, and decide at runtime which copy to compile. However, this approach has a problem: if the value returned by the dynamically compiled code needs to be serialized and sent to an Executor, some of the anonymous classes generated inside it do not exist on the Executor, so deserialization fails.
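A sketch of the "decide which copy to compile at runtime" idea, using the Scala ToolBox compiler (this assumes scala-compiler is on the classpath); the two one-line snippets are illustrative stand-ins for the two version-specific copies of the job code.

    import scala.reflect.runtime.currentMirror
    import scala.tools.reflect.ToolBox
    import org.apache.spark.SPARK_VERSION

    val toolbox = currentMirror.mkToolBox()

    // Only the snippet matching the running Spark version is parsed and
    // compiled, so the other may reference APIs absent from this version.
    val snippet =
      if (SPARK_VERSION.startsWith("1."))
        "org.apache.spark.mllib.linalg.Vectors.dense(Array(1.0, 2.0))"
      else
        "org.apache.spark.ml.linalg.Vectors.dense(Array(1.0, 2.0))"

    val result = toolbox.eval(toolbox.parse(snippet))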

To address this, a thin wrapper can be written around the classes that change between versions. In Spark, the version can be obtained through org.apache.spark.SPARK_VERSION. Specifically, the code reads the version parameter through this class, dynamically loads the class matching that version, and then invokes its methods through reflection, which avoids the compile-time errors described above. Wrapping the classes in this way hides the differences between the calling interfaces of the different versions. With reflection, however, code such as the original UDFs (User Defined Functions) can no longer be used. This is because a udf function must be able to infer the concrete types of its input and return values, whereas with reflection the return type cannot be determined (it may be org.apache.spark.ml.linalg.Vector, or it may be org.apache.spark.mllib.linalg.Vector), so the code does not compile. In this case, the Spark source code has to be modified so that, according to the current version parameter, the corresponding type can be dynamically loaded as the return value.
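A sketch of the wrapper-plus-reflection approach; org.apache.spark.SPARK_VERSION and the Vectors.dense method are real Spark APIs, while the version-branching rule used here is an illustrative assumption.

    import org.apache.spark.SPARK_VERSION

    // Pick the class whose name matches the running Spark version.
    val vectorsClassName =
      if (SPARK_VERSION.startsWith("1.")) "org.apache.spark.mllib.linalg.Vectors"
      else "org.apache.spark.ml.linalg.Vectors"

    val vectorsClass = Class.forName(vectorsClassName)
    // Scala objects compile to classes with static forwarder methods,
    // so the receiver passed to invoke can be null.
    val denseMethod = vectorsClass.getMethod("dense", classOf[Array[Double]])
    val vector = denseMethod.invoke(null, Array(1.0, 2.0, 3.0))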

The present invention further provides a job execution device.

Referring to FIG. 4, FIG. 4 is a schematic diagram of the functional modules of the first embodiment of the job execution device of the present invention.

As shown in FIG. 4, the job execution device includes:

a first acquisition module 10, configured to, upon receiving an execution request for a Spark job, acquire the version number, dynamic configuration parameters, and Spark job code of the target Spark engine according to the execution request;

a first determination module 20, configured to determine the deployment directory information and version loading rules of the target Spark engine according to the version number;

an engine initialization module 30, configured to acquire static configuration parameters according to the deployment directory information, and to initialize the target Spark engine with the dynamic configuration parameters and the static configuration parameters according to the version loading rules, so as to start the target Spark engine;

a job execution module 40, configured to submit the Spark job code to the target Spark engine to execute the job.

Further, the job execution device also includes:

a detection module, configured to acquire the user identifier corresponding to the execution request and detect whether an idle Spark engine corresponding to the user identifier and the version number exists;

the first determination module 20 is further configured to, if no idle Spark engine corresponding to the user identifier and the version number exists, perform the step of determining the deployment directory information and version loading rules of the target Spark engine according to the version number;

a first submission module, configured to, if an idle Spark engine corresponding to the user identifier and the version number exists, submit the Spark job code to the idle Spark engine to execute the job.

Further, the job execution device also includes:

a judgment module, configured to acquire the user identifier corresponding to the execution request and judge, according to the user identifier, whether the user is in a preset grayscale list;

the first determination module 20 is further configured to, if the user is not in the preset grayscale list, perform the step of determining the deployment directory information and version loading rules of the target Spark engine according to the version number;

a second submission module, configured to, if the user is in the preset grayscale list, create a grayscale Spark engine and submit the Spark job code to the grayscale Spark engine to execute the job.

Further, the job execution device also includes:

a second determination module, configured to determine, during initialization, a target calling method according to the version number and a preset abstraction layer interface;

a package loading module, configured to load, according to the target calling method, the packages on which the target Spark engine depends under the directory corresponding to the deployment directory information.

Further, the job execution device also includes:

a first modification module, configured to modify the Spark job code according to the version number;

the job execution module 40 is further configured to:

submit the modified Spark job code to the target Spark engine to execute the job.

Further, the job execution module 40 includes:

a code submission unit, configured to submit the Spark job code to the driver node of the target Spark engine;

a task generation unit, configured to convert the Spark job code through the driver node to obtain Spark tasks;

a task assignment unit, configured to assign the Spark tasks to the executor nodes deployed on the Yarn cluster to execute the job.

Further, the job execution device also includes:

a second modification module, configured to, during initialization, when the Scala interpreter is created in the driver node of the target Spark engine, inject the class loader of the main thread into the Scala interpreter, so that the class loader of the main thread becomes the parent of the Scala interpreter's class loader and the Scala interpreter creates its own class loader from that parent;

the task generation unit is further configured to convert the Spark job code through the class loader of the Scala interpreter created in the driver node to obtain Spark tasks;

a third modification module, configured to, upon receiving the serialized execution result returned by the executor node for the Spark task, modify the class loader of the current thread of the target Spark engine to the class loader of the Scala interpreter, so that the serialized execution result is deserialized through the Scala interpreter's class loader.

The function implementation of each module in the above job execution device corresponds to the steps of the above embodiments of the job execution method; their functions and implementation processes are not repeated here one by one.

The present invention further provides a computer-readable storage medium on which a job execution program is stored. When the job execution program is executed by a processor, the steps of the job execution method described in any one of the above embodiments are implemented.

The specific embodiments of the computer-readable storage medium of the present invention are substantially the same as the embodiments of the job execution method described above and are not repeated here.

It should be noted that, as used herein, the terms "comprise", "include", or any other variant thereof are intended to cover non-exclusive inclusion, so that a process, method, article, or system that includes a list of elements includes not only those elements but also other elements not expressly listed, or elements inherent to such a process, method, article, or system. Without further limitation, an element defined by the phrase "comprising a ..." does not preclude the presence of additional identical elements in the process, method, article, or system that includes that element.

The above serial numbers of the embodiments of the present invention are for description only and do not represent the merits of the embodiments.

Through the description of the above embodiments, those skilled in the art can clearly understand that the methods of the above embodiments can be implemented by software plus the necessary general-purpose hardware platform, and of course also by hardware, but in many cases the former is the better implementation. Based on this understanding, the technical solution of the present invention, in essence or in the part contributing to the prior art, can be embodied in the form of a software product. The computer software product is stored in a storage medium as described above (such as a ROM/RAM, a magnetic disk, or an optical disc) and includes several instructions for causing a terminal device (which may be a mobile phone, a computer, a server, an air conditioner, a network device, etc.) to execute the methods described in the embodiments of the present invention.

The above are only preferred embodiments of the present invention and do not thereby limit the patent scope of the present invention. Any equivalent structure or equivalent process transformation made using the contents of the description and drawings of the present invention, whether applied directly or indirectly in other related technical fields, is likewise included within the patent protection scope of the present invention.

Claims (9)

1. A job execution method, the job execution method comprising:
when an execution request of Spark job is received, acquiring a version number, dynamic configuration parameters and Spark job codes of a target Spark engine according to the execution request;
determining deployment catalog information and version loading rules of the target Spark engine according to the version number;
acquiring static configuration parameters according to the deployment catalog information, and initializing the target Spark engine by using the dynamic configuration parameters and the static configuration parameters according to the version loading rules so as to start the target Spark engine;
submitting the Spark job code to the target Spark engine to execute a job;
before the step of determining the deployment catalog information and the version loading rule of the target Spark engine according to the version number, the method further comprises the following steps:
acquiring a user identifier corresponding to the execution request, and judging whether a user is in a preset gray list or not according to the user identifier;
if the user is not in the preset gray list, executing the steps of: determining deployment catalog information and version loading rules of the target Spark engine according to the version number;
if the user is in the preset gray list, a gray Spark engine is created, and the Spark job code is submitted to the gray Spark engine to execute the job.
2. The job execution method as set forth in claim 1, wherein before the step of determining the deployment catalog information and version loading rules of the target Spark engine according to the version number, further comprising:
acquiring a user identifier corresponding to the execution request, and detecting whether an idle Spark engine corresponding to the user identifier and the version number exists;
if not, executing the steps of: determining deployment catalog information and version loading rules of the target Spark engine according to the version number;
if so, submitting the Spark job code to the idle Spark engine to execute the job.
3. The job execution method as set forth in claim 1, wherein the job execution method further comprises:
in the initialization process, determining a target calling method according to the version number and a preset abstract layer interface;
and loading a file package which is dependent on the target Spark engine under the corresponding directory of the deployment directory information according to the target calling method.
4. A method of executing a job as claimed in any one of claims 1 to 3, wherein before the step of submitting the Spark job code to the target Spark engine to execute the job, further comprising:
modifying the Spark job code according to the version number;
the step of submitting the Spark job code to the target Spark engine to execute a job includes:
and submitting the modified Spark job code to the target Spark engine to execute the job.
5. A job execution method as claimed in any one of claims 1 to 3, wherein said step of submitting said Spark job code to said target Spark engine to execute a job comprises:
submitting the Spark job code to a driver node of the target Spark engine;
converting the Spark job code through the driver node to obtain a Spark task;
and distributing the Spark task to an executor node deployed on the Yarn cluster to execute the job.
6. The job execution method as set forth in claim 5, wherein before the step of converting the Spark job code by the driver node to obtain a Spark task, further comprising:
in the initialization process, when a Scala interpreter is created in a driver node of the target Spark engine, a class loader of a main thread is injected into the Scala interpreter, so that the class loader of the main thread becomes a parent level of the Scala interpreter class loader, and the Scala interpreter creates a corresponding class loader according to the class loader of the parent level;
the step of converting the Spark job code through the driver node to obtain a Spark task includes:
converting the Spark job code through a class loader of a Scala interpreter created in the driver node to obtain a Spark task;
after the step of distributing the Spark task to the executor nodes deployed on the Yarn cluster to execute the job, the method further includes:
and when receiving a serialization execution result returned by the executor node based on the Spark task, modifying a class loader of the current thread of the target Spark engine into a class loader of the Scala interpreter so as to deserialize the serialization execution result through the class loader of the Scala interpreter.
7. A job execution device, the job execution device comprising:
the first acquisition module is used for acquiring the version number, the dynamic configuration parameters and the Spark job code of the target Spark engine according to the execution request when the execution request of the Spark job is received;
the first determining module is used for determining deployment catalog information and version loading rules of the target Spark engine according to the version number;
the engine initialization module is used for acquiring static configuration parameters according to the deployment catalog information, and initializing the target Spark engine by using the dynamic configuration parameters and the static configuration parameters according to the version loading rules so as to start the target Spark engine;
the job execution module is used for submitting the Spark job code to the target Spark engine so as to execute a job;
wherein the job execution apparatus further includes:
the judging module is used for acquiring the user identification corresponding to the execution request and judging whether the user is in a preset gray list or not according to the user identification;
the first determining module is further configured to execute the steps if the user is not in the preset gray list: determining deployment catalog information and version loading rules of the target Spark engine according to the version number;
and the second submitting module is used for creating a gray Spark engine if the user is in the preset gray list and submitting the Spark job code to the gray Spark engine so as to execute the job.
8. A job execution system, the job execution system comprising: a memory, a processor, and a job execution program stored on the memory and executable on the processor, which when executed by the processor, implements the steps of the job execution method according to any one of claims 1 to 6.
9. A computer-readable storage medium, wherein a job execution program is stored thereon, which when executed by a processor, implements the steps of the job execution method according to any one of claims 1 to 6.
CN202010624055.5A 2020-06-30 2020-06-30 Job Execution Method, Device, System, and Computer-Readable Storage Medium Active CN111767092B (en)

Priority Applications (2)

Application Number Priority Date Filing Date Title
CN202010624055.5A CN111767092B (en) 2020-06-30 2020-06-30 Job Execution Method, Device, System, and Computer-Readable Storage Medium
PCT/CN2021/081960 WO2022001209A1 (en) 2020-06-30 2021-03-22 Job execution method, apparatus and system, and computer-readable storage medium

Applications Claiming Priority (1)

Application Number Priority Date Filing Date Title
CN202010624055.5A CN111767092B (en) 2020-06-30 2020-06-30 Job Execution Method, Device, System, and Computer-Readable Storage Medium

Publications (2)

Publication Number Publication Date
CN111767092A CN111767092A (en) 2020-10-13
CN111767092B true CN111767092B (en) 2023-05-12

Family

ID=72724494

Family Applications (1)

Application Number Title Priority Date Filing Date
CN202010624055.5A Active CN111767092B (en) 2020-06-30 2020-06-30 Job Execution Method, Device, System, and Computer-Readable Storage Medium

Country Status (2)

Country Link
CN (1) CN111767092B (en)
WO (1) WO2022001209A1 (en)

Families Citing this family (24)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CN111767092B (en) * 2020-06-30 2023-05-12 深圳前海微众银行股份有限公司 Job Execution Method, Device, System, and Computer-Readable Storage Medium
CN112311603A (en) * 2020-10-30 2021-02-02 上海中通吉网络技术有限公司 Method, device and system for dynamically changing Spark user configuration
CN112286650A (en) * 2020-11-04 2021-01-29 中国电力财务有限公司 Method and device for issuing distributed service
CN112698839B (en) * 2020-12-30 2024-04-12 深圳前海微众银行股份有限公司 Data center node deployment method, device and system and computer storage medium
CN114691766A (en) * 2020-12-30 2022-07-01 北京国双科技有限公司 Data acquisition method and device and electronic equipment
CN113760406B (en) * 2021-02-07 2025-04-15 北京沃东天骏信息技术有限公司 Data processing method, device, equipment, storage medium and program product
CN114968267B (en) * 2021-02-26 2025-03-25 京东方科技集团股份有限公司 Service deployment method, device, electronic device and storage medium
CN113553533A (en) * 2021-06-10 2021-10-26 国网安徽省电力有限公司 Index calculation method based on digital internal five-level market assessment system
CN113642021B (en) * 2021-08-20 2024-05-28 深信服科技股份有限公司 Service code submitting method, processing method, device and electronic equipment
CN113722019B (en) * 2021-11-04 2022-02-08 海尔数字科技(青岛)有限公司 Display method, device and equipment of platform program
CN114615135B (en) * 2022-02-18 2024-03-22 佐朋数科(深圳)信息技术有限责任公司 Front-end gray level publishing method, system and storage medium
CN114880018A (en) * 2022-04-26 2022-08-09 青岛海尔科技有限公司 Request processing method and device, storage medium and electronic device
CN114880073A (en) * 2022-05-09 2022-08-09 南京希音电子商务有限公司 Cloud application engine deployment method, device, equipment and storage medium for shielding Web framework for user
CN115061790B (en) * 2022-06-10 2024-05-14 苏州浪潮智能科技有限公司 A Spark Kmeans core allocation method and system for ARM dual-core servers
CN114995980A (en) * 2022-06-14 2022-09-02 中国银行股份有限公司 A job generating method, job calling method and related equipment
CN115129325B (en) * 2022-06-29 2023-05-23 北京五八信息技术有限公司 Data processing method and device, electronic equipment and storage medium
CN114968274B (en) * 2022-07-29 2022-11-08 之江实验室 A method and system for automatic and rapid deployment of front-end computers based on grayscale publishing
CN115550457A (en) * 2022-09-19 2022-12-30 中国建设银行股份有限公司 A business flow monitoring method, device, equipment and medium
US11954525B1 (en) 2022-09-21 2024-04-09 Zhejiang Lab Method and apparatus of executing collaborative job for spark faced to multiple K8s clusters
CN115242877B (en) * 2022-09-21 2023-01-24 之江实验室 Spark collaborative computing and operating method and device for multiple K8s clusters
CN115237818A (en) * 2022-09-26 2022-10-25 浩鲸云计算科技股份有限公司 A method and system for realizing multi-environment multiplexing based on full-link identification
CN116048817B (en) * 2023-03-29 2023-06-27 腾讯科技(深圳)有限公司 Data processing control method, device, computer equipment and storage medium
CN116909681A (en) * 2023-06-13 2023-10-20 北京远舢智能科技有限公司 Generation method, device, electronic equipment and storage medium of data processing component
CN117453278B (en) * 2023-11-01 2024-05-14 国任财产保险股份有限公司 Rule management system based on business rule

Family Cites Families (9)

* Cited by examiner, † Cited by third party
Publication number Priority date Publication date Assignee Title
CA3011358A1 (en) * 2016-01-12 2017-07-20 Kavi Associates, Llc Multi-technology visual integrated data management and analytics development and deployment environment
CN105867928B (en) * 2016-03-30 2019-06-04 北京奇虎科技有限公司 A method and device for accessing a specified computing model in a specified distributed system
US10275278B2 (en) * 2016-09-14 2019-04-30 Salesforce.Com, Inc. Stream processing task deployment using precompiled libraries
EP3447642B1 (en) * 2017-08-24 2022-03-23 Tata Consultancy Services Limited System and method for predicting application performance for large data size on big data cluster
CN108255689B (en) * 2018-01-11 2021-02-12 哈尔滨工业大学 Automatic Apache Spark application tuning method based on historical task analysis
CN108845884B (en) * 2018-06-15 2024-04-19 中国平安人寿保险股份有限公司 Physical resource allocation method, device, computer equipment and storage medium
CN109614167B (en) * 2018-12-07 2023-10-20 杭州数澜科技有限公司 Method and system for managing plug-ins
CN110262881A (en) * 2019-06-12 2019-09-20 深圳前海微众银行股份有限公司 A kind of submission method and device of Spark operation
CN111767092B (en) * 2020-06-30 2023-05-12 深圳前海微众银行股份有限公司 Job Execution Method, Device, System, and Computer-Readable Storage Medium

Also Published As

Publication number Publication date
WO2022001209A1 (en) 2022-01-06
CN111767092A (en) 2020-10-13

Similar Documents

Publication Publication Date Title
CN111767092B (en) Job Execution Method, Device, System, and Computer-Readable Storage Medium
US10146515B1 (en) Live code updates
US6871345B1 (en) Self managing software agents with introspection
US7039923B2 (en) Class dependency graph-based class loading and reloading
US8738589B2 (en) Classloading technique for an application server that provides dependency enforcement
US8448163B2 (en) Deploying J2EE web applications in an OSGI environment
US6871223B2 (en) System and method for agent reporting in to server
US11748073B2 (en) Robotic process automation system with a command action logic independent execution environment
US20030181196A1 (en) Extensible framework for code generation from XML tags
JP5542796B2 (en) Method and apparatus for managing mobile device software
US20030182625A1 (en) Language and object model for describing MIDlets
US20150220308A1 (en) Model-based development
US20120011496A1 (en) Service providing apparatus, service providing system, method of processing data in service providing apparatus, and computer program
US20090094596A1 (en) Systems and methods for an adaptive installation
US20050262501A1 (en) Software distribution method and system supporting configuration management
CN109614167B (en) Method and system for managing plug-ins
US20030182626A1 (en) On-demand creation of MIDlets
US20050086640A1 (en) Initiating execution of application programs on a data processing arrangement
CN115061717B (en) Application management method, application subscription method and related equipment
US8924947B2 (en) Direct deployment of static content
CN114942796A (en) Plug-in compiling and calling method, device, equipment and storage medium
CN107977243A (en) A kind of third party's interface call method and device
CN115129348A (en) Resource updating method, device and equipment of application program and readable storage medium
WO2022199136A1 (en) Application modification method, and system, cluster, medium and program product
CN118484243A (en) Service processing method, device, system, equipment and medium

Legal Events

Date Code Title Description
PB01 Publication
SE01 Entry into force of request for substantive examination
GR01 Patent grant