# FlinkSQL

**Repository Path**: dhd_index/FlinkSQL

## Basic Information

- **Project Name**: FlinkSQL
- **Description**: Develop real-time Flink programs with SQL, modeled after Alibaba's Blink
- **Primary Language**: Unknown
- **License**: Apache-2.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 2
- **Created**: 2021-02-27
- **Last Updated**: 2021-02-27

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

### I. Background

At Alibaba, stream data processing and computation were done with Blink: compute jobs were written as SQL, which made development simple and efficient and the product easy to use.

This project attempts a similar productization of Flink, using SQL as the unified development standard. SQL's advantages: it is declarative, easy to understand, stable and reliable, and automatically optimized.

With API-based development, the biggest problem is that job tuning depends on the programmer's experience, which is difficult; API development is also too invasive (data security, cluster security, etc.). SQL can be tuned automatically, avoiding these problems.

### II. Implementation approach

User submits SQL (DDL, query, DML) -> DDL maps to Flink sources and sinks -> queries and `INSERT INTO` DML perform the data processing and computation -> wrapped into a Flink job via `env.sqlQuery`/`env.sqlUpdate` -> the JobGraph and job are submitted via `ClusterClient.submitJob` or `ClusterDescriptor.deployJobCluster`.

### III. Releases

[v3.0.0](https://github.com/ambition119/FlinkSQL/tree/v3.0.0) January 2020

1. Uses Flink 1.10. Before 1.10, Flink's built-in SQL parsing was incomplete (e.g. parsing functions and watermarks), so it was of limited use, and not worth replacing the previously developed parsing layer.
2. Uses the new `ClusterClient.submitJob` interface to submit jobs.

[New features](/doc/v3.0.0.md)

1. Flink's built-in SQL parsing
2. New job submission interface
3. Unified stream and batch processing
4. DingTalk/WeChat alert notifications

[v2.0.1](https://github.com/ambition119/FlinkSQL/tree/v2.0.1) [v2.0.0](https://github.com/ambition119/FlinkSQL/tree/v2.0.0) April 2019

- blink-client: interface definitions
- blink-sql/calcite: SQL parsing for stream and batch tables
- blink-libraries: custom source, sink, and side (dimension) table development
- blink-batch: BatchTableSource and BatchTableSink
- blink-stream: StreamTableSource and StreamTableSink
- blink-job: batch/stream job submission

[v2.0.1 new features](/doc/v2.0.1.md) [v2.0.0 new features](/doc/v2.0.0.md)

1. Extracted the SQL layer for shared use by stream and batch; SQL follows Flink issues and the corresponding docs
2. Added batch processing
3. Added dimension (side) table support
4. Upgraded Flink to 1.7.x

[v1.0.0](https://github.com/ambition119/FlinkSQL/tree/v1.0.0) July 2018

- blink-client: interface definitions
- blink-sqlserver: SQL parsing for stream tables
- blink-job: wrapping into stream jobs

[New features](/doc/v1.0.0.md)

1. Implemented CREATE FUNCTION
2. Implemented SQL-based stream processing jobs
3. Modified the Flink source to implement SQL CEP

### IV. Examples

#### v3.0.0 stream job example:

```sql
```

#### v2.0.1 SQL job examples:

Batch SQL example:

```sql
CREATE FUNCTION demouf
  AS 'ambition.api.sql.function.DemoUDF'
  LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE csv_source (
  id int,
  name varchar,
  `date` date,
  age int
) with (
  type=source,
  connect.type=json,
  'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE TABLE csv_sink (
  `date` date,
  age int,
  PRIMARY KEY (`date`)
) with (
  type=sink,
  connect.type=csv,
  'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select AS
  SELECT `date`, age
  FROM csv_source
  GROUP BY `date`, age;

INSERT INTO csv_sink
  SELECT `date`, sum(age)
  FROM view_select
  GROUP BY `date`;
```

Stream SQL example:

```sql
CREATE FUNCTION demouf
  AS 'ambition.api.sql.function.DemoUDF'
  LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE TABLE kafka_source (
  `date` varchar,
  amount float,
  proctime timestamp
) with (
  type=source,
  'connect.type'=kafka,
  'flink.parallelism'=1,
  'kafka.topic'=topic,
  'kafka.group.id'=flinks,
  'kafka.enable.auto.commit'=true,
  'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
  `date` varchar,
  total_amount float,
  PRIMARY KEY (`date`)
) with (
  type=sink,
  'connect.type'=mysql,
  'mysql.connection'='localhost:3306',
  'mysql.db.name'=flink,
  'mysql.batch.size'=10,
  'mysql.table.name'=flink_table,
  'mysql.user'=root,
  'mysql.pass'=root
);

CREATE VIEW view_select AS
  SELECT `date`, amount
  FROM kafka_source
  GROUP BY `date`, amount;

INSERT INTO mysql_sink
  SELECT `date`, sum(amount) AS total_amount
  FROM view_select
  GROUP BY `date`;
```

#### v2.0.0 SQL job examples:

Batch SQL example:

```sql
CREATE FUNCTION demouf
  AS 'ambition.api.sql.function.DemoUDF'
  LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE json_source (
  id int,
  name varchar,
  `date` date,
  age int
) with (
  type=json,
  'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo.json'
);

CREATE SINK TABLE csv_sink (
  `date` date,
  total_age int
) with (
  type=csv,
  'file.path'='file:///FlinkSQL/blink-job/src/test/resources/demo_out.csv'
);

CREATE VIEW view_select AS
  SELECT `date`, age
  FROM json_source
  GROUP BY `date`, age;

INSERT INTO csv_sink
  SELECT `date`, sum(age) AS total_age
  FROM view_select
  GROUP BY `date`;
```

Stream SQL example:

```sql
CREATE FUNCTION demouf
  AS 'ambition.api.sql.function.DemoUDF'
  LIBRARY 'hdfs://flink/udf/jedis.jar','hdfs://flink/udf/customudf.jar';

CREATE SOURCE TABLE kafka_source (
  `date` varchar,
  amount float,
  proctime timestamp
) with (
  type=kafka,
  'flink.parallelism'=1,
  'kafka.topic'=topic,
  'kafka.group.id'=flinks,
  'kafka.enable.auto.commit'=true,
  'kafka.bootstrap.servers'='localhost:9092'
);

CREATE SINK TABLE mysql_sink (
  `date` varchar,
  total_amount float,
  PRIMARY KEY (`date`)
) with (
  type=mysql,
  'mysql.connection'='localhost:3306',
  'mysql.db.name'=flink,
  'mysql.batch.size'=10,
  'mysql.table.name'=flink_table,
  'mysql.user'=root,
  'mysql.pass'=root
);

CREATE VIEW view_select AS
  SELECT `date`, amount
  FROM kafka_source
  GROUP BY `date`, amount;

INSERT INTO mysql_sink
  SELECT `date`, sum(amount) AS total_amount
  FROM view_select
  GROUP BY `date`;
```

#### v1.0.0 stream job example:

```sql
CREATE FUNCTION demouf
  AS 'ambition.api.sql.function.DemoUDF'
  USING JAR 'hdfs://flink/udf/jedis.jar',
        JAR 'hdfs://flink/udf/customudf.jar';

CREATE TABLE kafka_source (
  `date` string,
  amount float,
  proctime timestamp
) with (
  type=kafka,
  'flink.parallelism'=1,
  'kafka.topic'=topic,
  'kafka.group.id'=flinks,
  'kafka.enable.auto.commit'=true,
  'kafka.bootstrap.servers'='localhost:9092'
);

CREATE TABLE mysql_sink (
  `date` string,
  amount float,
  PRIMARY KEY (`date`, amount)
) with (
  type=mysql,
  'mysql.connection'='localhost:3306',
  'mysql.db.name'=flink,
  'mysql.batch.size'=0,
  'mysql.table.name'=flink_table,
  'mysql.user'=root,
  'mysql.pass'=root
);

CREATE VIEW view_select AS
  SELECT `date`, amount
  FROM kafka_source
  GROUP BY `date`, amount;

INSERT INTO mysql_sink
  SELECT `date`, sum(amount)
  FROM view_select
  GROUP BY `date`;
```

### V. Related projects

- [apache flink](https://github.com/apache/flink)
- [apache calcite](https://github.com/apache/calcite)
- [uber AthenaX](https://github.com/uber/AthenaX)
- [DTStack flinkStreamSQL](https://github.com/DTStack/flinkStreamSQL)
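The first step of the pipeline described in section II (take a user-submitted SQL script, split it into statements, and route DDL to source/sink registration, `INSERT INTO` to `env.sqlUpdate`, and plain queries to `env.sqlQuery`) can be sketched as below. This is a minimal illustrative sketch, not the project's actual code: the class and method names are hypothetical, and quote handling is simplified (no escaped quotes or comments).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of the dispatch step: split a SQL script on top-level
// semicolons, then classify each statement so the job builder knows which
// Flink API call it should map to.
public class SqlScriptSplitter {

    // Split a script on semicolons, ignoring semicolons inside single quotes
    // (e.g. inside a 'file.path' or LIBRARY jar URL).
    public static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuote = false;
        for (char c : script.toCharArray()) {
            if (c == '\'') {
                inQuote = !inQuote;
            }
            if (c == ';' && !inQuote) {
                String stmt = current.toString().trim();
                if (!stmt.isEmpty()) statements.add(stmt);
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        String tail = current.toString().trim();
        if (!tail.isEmpty()) statements.add(tail);
        return statements;
    }

    // Classify a statement by its leading keyword.
    public static String classify(String statement) {
        String s = statement.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("CREATE")) return "DDL";   // CREATE TABLE / VIEW / FUNCTION -> register
        if (s.startsWith("INSERT")) return "DML";   // -> env.sqlUpdate(...)
        if (s.startsWith("SELECT")) return "QUERY"; // -> env.sqlQuery(...)
        return "UNKNOWN";
    }
}
```

A real implementation would hand each classified statement to the parsing layer (or, from v3.0.0 on, to Flink's built-in SQL parser) rather than relying on leading-keyword matching.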