# azkaban-plugin-jobtype-sql **Repository Path**: centy/azkaban-plugin-jobtype-sql ## Basic Information - **Project Name**: azkaban-plugin-jobtype-sql - **Description**: Azkaban调度系统jobType插件,使得Azkaban能够直接支持SQL脚本类型的job。 - **Primary Language**: Java - **License**: Apache-2.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 11 - **Forks**: 2 - **Created**: 2018-09-13 - **Last Updated**: 2022-09-13 ## Categories & Tags **Categories**: task-schedule **Tags**: None ## README # Azkaban SQL jobtype ## 一、功能说明 本项目为Azkaban的JobType扩展插件,实现Azkaban直接调度数据库脚本的Job。 适用场景: - 原本数据库存在大量定时任务+存储过程进行数据处理的项目,可以通过Azkaban进行改造,实现上下级依赖控制及调度监控,使用该插件改造工作量较小。 - 对于较小数据量的数据处理,无需用到大数据处理架构,同时数据库负载也不高的,可以使用Azkaban+插件实现数据处理。 ## 二、安装部署 - 下载最新发行版本,如:`azkaban-plugin-jobtype-sql-1.0.1.RELEASE.zip` 下载地址:[https://gitee.com/centy/azkaban-plugin-jobtype-sql/releases](https://gitee.com/centy/azkaban-plugin-jobtype-sql/releases) - 将`azkaban-plugin-jobtype-sql-1.0.1.RELEASE.zip`拷贝至`${AzkabanHome}/plugins/jobtypes/`目录下,直接解压到当前目录, 解压后目录结构如下: - plugins - sql_job - lib - ant-1.10.5.jar - ant-launcher-1.10.5.jar - azkaban-plugin-jobtype-sql-1.0.1.RELEASE.jar - fastjson-1.2.23.jar - mysql-connector-java-5.1.38.jar - postgresql-42.2.45.jar - private.properties - 重启Azkaban完成安装。 ## 三、插件配置参数 ### Flow参数 Flow参数可配置在Flow文件内,也可定义在系统环境变量、Azkaban全局参数、Azkaban控制台定义的Flow Parameter等,具体参考Azkaban的配置读取优先级。 Azkaban文档地址:[https://azkaban.readthedocs.io/en/latest/getStarted.html](https://azkaban.readthedocs.io/en/latest/getStarted.html) 参数|说明 --|-- sql_job.database.type|数据库类型,支持mysql/postgresql sql_job.database.driver|数据库驱动类com.mysql.jdbc.Driver sql_job.database.host|数据库主机地址 sql_job.database.port|数据库端口 sql_job.database.database|数据库名称 sql_job.database.schema|实例或模式名称,postgresql数据库必填 sql_job.database.username|数据库用户名 sql_job.database.password|数据库密码 ### Job参数 Job参数定义在Job节点内,用于配置每个Job的定义。 参数|说明 --|-- sql_job.scripts|SQL脚本路径,支持绝对路径和相对路径。相对路径是指SQL文件相对project根目录(即上传至Azkaban的zip压缩包的根目录)的路径。
支持多个路径,多个通过英文逗号进行分隔。 ## 四、使用说明 **本文示例基于Azkaban Flow 2.0,对于Flow 1.0也支持。** ### 快速开始 本示例实现一个两个节点的工作流 [示例地址](job_examples/quickstart_example) - 1.创建quickstart_example文件夹,在这个文件夹中创建`quickstart_example.flow`工作流文件。 - 2.在`quickstart_example.flow`文件中添加以下flow参数,将相关配置修改为你本地的配置: ```yaml config: sql_job.database.type: mysql # 支持mysql、postgresql sql_job.database.driver: com.mysql.jdbc.Driver sql_job.database.host: localhost sql_job.database.port: 3306 sql_job.database.database: demo sql_job.database.schema: # postgresql建议必填 sql_job.database.username: root sql_job.database.password: xxxxxxx ``` - 3.实现第1个Job:create_and_insert_job。 -3.1. 在quickstart_example文件夹下创建scripts文件佳,在scripts文件夹中创建一个SQL脚本文件,命名为`create_and_insert.sql`,里面通过脚本创建一个表并插入测试数据。 ```sql create table if not exists table_a( id int(10) not null, name varchar(16) not null, value double(20,3) not null ); insert into table_a values (1,"test-1",1), (2,"test-2",2), (3,"test-3",3), (4,"test-4",4), (5,"test-5",5), (6,"test-6",6), (7,"test-7",7), (8,"test-8",8) ; select count(*) from table_a; ``` -3.2. `quickstart_example.flow`增加Job定义,JobType为"sql_job"。 ```yaml config: sql_job.database.type: mysql # 支持mysql、postgresql sql_job.database.driver: com.mysql.jdbc.Driver sql_job.database.host: localhost sql_job.database.port: 3306 sql_job.database.database: demo sql_job.database.schema: # postgresql建议必填 sql_job.database.username: root sql_job.database.password: 123456 nodes: - name: create_and_insert_job type: sql_job config: sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径 ``` - 4.实现第2个Job:create_and_insert_job。 -4.1. 同3.1,在scripts文件夹中创建一个SQL脚本文件,命名为`create_and_insert.sql`,里面通过脚本创建一个表并插入测试数据。 ```sql update table_a set value = value*2; select count(*) from table_a; ``` -4.2. `quickstart_example.flow`增加Job定义,JobType为"sql_job"。 ```yaml config: sql_job.database.type: mysql # 支持mysql、postgresql sql_job.database.driver: com.mysql.jdbc.Driver sql_job.database.host: localhost sql_job.database.port: 3306 sql_job.database.database: demo sql_job.database.schema: # postgresql建议必填 sql_job.database.username: root sql_job.database.password: 123456 nodes: - name: create_and_insert_job type: sql_job config: sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径 - name: update_value_job type: sql_job dependsOn: - create_and_insert_job config: sql_job.scripts: quickstart_example/scripts/update_value.sql # 脚本路径 ``` - 5.将quickstart_example打包成zip包,上传Azkaban的project中,上传成功后如下图。 ![](images/quickstart-project.png) - 6.执行工作流,结果如下图。 ![](images/quickstart-exec-job-list.png) ![](images/quickstart-exec-job-detail.png) ### 占位符参数 本插件实现SQL脚本的占位符替换,实现原理如下: - SQL脚本中通过"${}"使用占位符参数,其中parameterName命名格式为:"sql_job.<自定义参数名>[.<自定义参数名2>]" - 可在Azkaban配置中增加parameterName参数配置,配置参数位置可以是环境变量、全局参数、Flow参数、甚至Job参数均可,具体可参考Azkaban配置读取逻辑。 - 插件在执行SQL脚本前,先从Azkaban的配置参数获取对应的配置值,替换SQL脚本中的占位符参数,再执行脚本。 约束条件: - 占位参数命名必须严格按照要求。 #### 使用示例 [示例地址](job_examples/replace_parameter_example) - 1.参照上面示例,将sql脚本文件进行修改,表名都使用占位符`${sql_job.tableName}`替换,如下: **create_and_insert.sql** ```sql create table if not exists ${sql_job.tableName}( id int(10) not null, name varchar(16) not null, value double(20,3) not null ); insert into ${sql_job.tableName} values (1,"test-1",1), (2,"test-2",2), (3,"test-3",3), (4,"test-4",4), (5,"test-5",5), (6,"test-6",6), (7,"test-7",7), (8,"test-8",8) ; select count(*) from ${sql_job.tableName}; ``` **update_value.sql** ```sql update ${sql_job.tableName} set value = value*2; select count(*) from ${sql_job.tableName}; ``` - 2.在`quickstart_example.flow`中增加占位符参数定义,如下: ```yaml config: sql_job.database.type: mysql # 支持mysql、postgresql sql_job.database.driver: com.mysql.jdbc.Driver sql_job.database.host: localhost sql_job.database.port: 3306 sql_job.database.database: demo sql_job.database.schema: # postgresql建议必填 sql_job.database.username: root sql_job.database.password: 123456 sql_job.tableName: table_b #增加该占位符变量定义 nodes: - name: create_and_insert_job type: sql_job config: sql_job.scripts: quickstart_example/scripts/create_and_insert.sql # 脚本路径 - name: update_value_job type: sql_job dependsOn: - create_and_insert_job config: sql_job.scripts: quickstart_example/scripts/update_value.sql # 脚本路径 ``` - 3.上传Azkaban执行同上面示例。 ## 五、注意事项 - sql解释执行使用ant.jar包,该包不支持"#"号格式的注释,只支持"--"或"//"格式注释。 - 占位符参数名必须以"sql_job."作为前缀。