# flink-cdc-code1997

**Repository Path**: code1997/flink-cdc-code1997

## Basic Information

- **Project Name**: flink-cdc-code1997
- **Description**: Demo repository for Flink CDC
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 1
- **Forks**: 0
- **Created**: 2022-04-26
- **Last Updated**: 2023-03-17

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# Flink CDC

> Official documentation: https://ververica.github.io/flink-cdc-connectors/master/
>
> Code repository for this article: https://gitee.com/code1997/flink-cdc-code1997.git

## Chapter 1: Introduction

### 1.1 What is CDC

CDC stands for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows and tables), record those changes completely and in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.

### 1.2 Types of CDC

CDC implementations fall into two broad categories, query-based and binlog-based. The main differences:

|                            | Query-based CDC           | Binlog-based CDC         |
| -------------------------- | ------------------------- | ------------------------ |
| Open-source products       | Sqoop, Kafka JDBC Source  | Canal, Maxwell, Debezium |
| Execution mode             | Batch                     | Streaming                |
| Captures every change      | No, changes can be missed | Yes                      |
| Latency                    | High                      | Low                      |
| Extra load on the database | Yes, it polls the database | No                      |

### 1.3 Flink CDC

The Flink community has developed the flink-cdc-connectors component, a source connector that reads both the full snapshot and the incremental changes directly from databases such as `MySQL` and `PostgreSQL`. It is open source at https://github.com/ververica/flink-cdc-connectors

![image-20220429110617866](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220429110617866.png)

## Chapter 2: Flink CDC in Practice

> MySQL CDC connector documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

### 2.1 Using the DataStream API

#### 2.1.1 Add dependencies

This demo uses `flink 1.13.6` and `flink-cdc 2.2.1`; check the version compatibility between Flink and Flink CDC before changing either one.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.itcode</groupId>
    <artifactId>flink-cdc-code1997</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.compile.version>2.12</scala.compile.version>
        <flink.version>1.13.6</flink.version>
        <flink-cdc.version>2.2.1</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>flink-cdc-code1997</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

#### 2.1.2 Write the code

> Pay attention to HDFS authentication. This setup uses Kerberos, and the checkpoint directory must be created in advance with the proper permissions.

```java
package com.itcode;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

/**
 * Flink CDC sample job (DataStream API).
 *
 * @author code1997
 */
public class FlinkCDC {

    public static final String ENV = "env";
    public static final String LOCAL_ENV = "local";

    public static void main(String[] args) throws Exception {
        //Mind Kerberos authentication when the job is submitted to a remote cluster.
        String env = ParameterTool.fromArgs(args).get(ENV, LOCAL_ENV);
        if (LOCAL_ENV.equalsIgnoreCase(env)) {
            kerberosAuthentication();
        }

        //1. Create the execution environment.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        //2. Flink CDC stores the binlog position as state in checkpoints. To resume from where it left off,
        //   the job must be started from a checkpoint or savepoint.
        //2.1 Enable checkpointing every 5 s.
        executionEnvironment.enableCheckpointing(5000L);
        //2.2 Checkpoint consistency semantics.
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(10000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //2.3 Retain the last checkpoint when the job is cancelled.
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Restart strategy: 3 attempts with a 2 s delay. Without checkpointing there is no restart strategy;
        //    once checkpointing is enabled, the fixed-delay restart strategy is used automatically.
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 State backend and checkpoint storage path, used for failure recovery and resuming: fs, memory, RocksDB.
        //executionEnvironment.setStateBackend(new FsStateBackend("hdfs://hadoop02:8020/flinkCDC")); //deprecated, replaced by HashMapStateBackend plus checkpoint storage
        executionEnvironment.setStateBackend(new HashMapStateBackend());
        executionEnvironment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop02:8020/flink/flink-cdc/check-point");
        //2.6 HDFS user name: this HDFS cluster uses Kerberos, so the property below is not needed here.
        //    With plain user impersonation you would set it instead:
        //System.setProperty("HADOOP_USER_NAME", "code1997");

        //3. Create the Flink MySQL CDC source.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop02")
                .port(3307)
                .username("root")
                .password("19971001")
                //If tableList is not set, every table in the listed databases is captured.
                .databaseList("gmall_flink")
                .tableList("gmall_flink.activity_info")
                //Initial mode reads a full snapshot first, suitable for the first run.
                .startupOptions(StartupOptions.initial())
                //Deserialization schema.
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //4. Read from MySQL with the CDC source.
        DataStreamSource<String> mysqlDS =
                executionEnvironment.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

        //5. Print the records.
        mysqlDS.print();

        //6. Execute the job.
        executionEnvironment.execute();
    }

    public static void kerberosAuthentication() {
        try {
            System.setProperty("java.security.krb5.conf", "src/main/resources/krb5/krb5.ini");
            UserGroupInformation.loginUserFromKeytab("code1997@CODE1997.COM", "src/main/resources/keytab/code1997.keytab");
            System.out.println(UserGroupInformation.getCurrentUser() + "=====" + UserGroupInformation.getLoginUser());
        } catch (IOException e) {
            throw new RuntimeException("Kerberos authentication failed", e);
        }
    }
}
```

Local test output:

![image-20220428121314690](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428121314690.png)

- op=r: snapshot read on the first start
- op=c: insert, the record carries `after`
- op=u: update, the record carries `before` and `after`
- op=d: delete, the record carries `before`

Analysis of the output:

1) Initial snapshot data: op=r, with `after`.

![image-20220428123205121](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc//image-20220428123205121.png)

2) Insert a new row into the database.

> Because the database's default transaction isolation level is read uncommitted and the change was made through a GUI client, several records are read: the first with op=c and the later ones with op=u.

![image-20220428123706401](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428123706401.png)

3) Delete a row: op=d, with `before` and no `after`.

![image-20220428123826774](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428123826774.png)
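The op=r records above come from `StartupOptions.initial()`, which reads a full snapshot before switching to the binlog. If a job should only see changes made after it starts (for example when historical data was already imported another way), the startup mode can be changed on the same builder. A minimal sketch, reusing the connection settings of the class above; `StartupOptions.latest()` is the mode name in flink-cdc 2.x, so verify it against the connector version you actually use:

```java
//Sketch: the same source as in FlinkCDC, but without the initial snapshot.
//StartupOptions.latest() starts reading from the current end of the binlog.
MySqlSource<String> incrementalOnlySource = MySqlSource.<String>builder()
        .hostname("hadoop02")
        .port(3307)
        .username("root")
        .password("19971001")
        .databaseList("gmall_flink")
        .tableList("gmall_flink.activity_info")
        .startupOptions(StartupOptions.latest())
        .deserializer(new StringDebeziumDeserializationSchema())
        .build();
```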
#### 2.1.3 Package and test on the cluster

> Steps for packaging and testing on the cluster:

1) Start the Flink and HDFS clusters.

2) Enable the MySQL binlog and restart MySQL:

```ini
[mysqld]
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_flink
```

3) Submit the job from the command line:

```shell
bin/flink run -d -m hadoop02:8081 \
-yD java.security.krb5.conf=/etc/krb5.conf \
-yD security.kerberos.login.keytab=/home/code1997/code1997.keytab \
-yD security.kerberos.login.principal=code1997@CODE1997.COM \
-c com.itcode.FlinkCDC ./flink-cdc-code1997-jar-with-dependencies.jar -env dev
```

4) Check the logs:

![image-20220428174914625](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428174914625.png)

5) Insert, update, or delete rows in the MySQL table `gmall_flink.activity_info`.

6) Inspect the checkpoints that have already been saved:

![image-20220428175333969](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428175333969.png)

7) Create a savepoint for the running Flink job:

```shell
bin/flink savepoint ebc43dc460e01fd4e76f3af13c463b51 hdfs://hadoop02:8020/flink/flink-cdc/save-point
```

![image-20220428175503588](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428175503588.png)

8) Stop the job and change one more row.

9) Restart the job from the savepoint:

```shell
[root@hadoop02 flink-1.13.6]# bin/flink run -s hdfs://hadoop02:8020/flink/flink-cdc/save-point/savepoint-ebc43d-eadbd2c0c54c -d -c com.itcode.FlinkCDC ./flink-cdc-code1997-jar-with-dependencies.jar -env dev
```

![image-20220428180657611](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428180657611.png)

Only the change made after the savepoint was taken is emitted:

![image-20220428180545346](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428180545346.png)
### 2.2 Using Flink SQL

#### 2.2.1 Add dependencies

> Most dependencies were already added for the previous example; additionally add the following one.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.compile.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

#### 2.2.2 Code

> A Flink SQL source table monitors a single MySQL table, and the default startup mode is initial (snapshot first, then binlog).

```java
package com.itcode;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.calcite.CalciteConfig;

/**
 * Flink CDC sample job using Flink SQL.
 *
 * @author code1997
 */
public class FlinkCdcBySql {

    public static void main(String[] args) throws Exception {
        //1. Create the execution environment and the table environment.
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
        tableEnvironment.getConfig().setPlannerConfig(CalciteConfig.DEFAULT());

        //2. Register a source table backed by the mysql-cdc connector.
        String sourceDdl = "create table mysql_source (" +
                "db_name STRING METADATA FROM 'database_name' VIRTUAL," +
                "table_name STRING METADATA FROM 'table_name' VIRTUAL," +
                "id INT," +
                "activity_name STRING," +
                "PRIMARY KEY(id) NOT ENFORCED" +
                ") WITH (" +
                "'connector'='mysql-cdc'," +
                "'hostname'='hadoop02'," +
                "'port'='3307'," +
                "'username'='root'," +
                "'password'='19971001'," +
                "'database-name'='gmall_flink'," +
                "'table-name'='activity_info')";
        tableEnvironment.executeSql(sourceDdl);

        //3. Query the source table.
        String query = "select * from mysql_source";
        Table result = tableEnvironment.sqlQuery(query);
        result.printSchema();

        //4. Convert the table to a stream and print it. Because updates and deletes occur,
        //   a changelog stream (or a retract stream) is required.
        tableEnvironment.toChangelogStream(result).print();

        executionEnvironment.execute();
    }
}
```

Output:

![image-20220428182951794](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428182951794.png)
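The example converts the result with `toChangelogStream`, which is what the screenshot shows. On Flink 1.13 the older retract-stream conversion is an alternative: it wraps every record in a `Tuple2<Boolean, Row>` whose flag separates additions from retractions. A small sketch, assuming the `tableEnvironment` and `result` variables from the class above:

```java
//Sketch: retract-stream conversion as an alternative to toChangelogStream.
//Each element is a Tuple2<Boolean, Row>: true = add, false = retract (e.g. the old value of an updated row).
tableEnvironment.toRetractStream(result, org.apache.flink.types.Row.class).print();
```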
### 2.3 Custom deserializer

> The records produced by the default flink-cdc schema are often not in the shape we want, so we can write our own deserializer, modeled on `StringDebeziumDeserializationSchema`.

1) Raw record format

The raw format is inconvenient for downstream processing, which is why a custom deserializer is needed.

![image-20220428184339998](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428184339998.png)

2) Implementation

```java
package com.itcode.Deserializer;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private static final String DATABASE = "database";
    private static final String TABLE_NAME = "tableName";
    private static final String BEFORE = "before";
    private static final String AFTER = "after";
    private static final String TYPE = "type";
    private static final String DOT_SEPARATOR = "\\.";

    /**
     * Target format:
     * {
     *   "database":"",
     *   "tableName":"",
     *   "type":"c,r,u,d",
     *   "before":{},
     *   "after":{}
     * }
     */
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        JSONObject jsonObject = new JSONObject();
        //1. Extract the database and table name from the record topic.
        String topic = record.topic();
        String[] topicAttributes = topic.split(DOT_SEPARATOR);
        jsonObject.put(DATABASE, topicAttributes[1]);
        jsonObject.put(TABLE_NAME, topicAttributes[2]);
        //2. Extract the operation type.
        Envelope.Operation operation = Envelope.operationFor(record);
        String operator = operation.name();
        jsonObject.put(TYPE, "create".equalsIgnoreCase(operator) ? "insert" : operator.toLowerCase());
        //3. Extract the before and after images.
        Struct value = (Struct) record.value();
        extractFields(jsonObject, value, BEFORE);
        extractFields(jsonObject, value, AFTER);
        out.collect(jsonObject.toJSONString());
    }

    private void extractFields(JSONObject jsonObject, Struct value, String flag) {
        Struct flagVal = value.getStruct(flag);
        JSONObject flagJson = new JSONObject();
        if (flagVal != null) {
            for (Field field : flagVal.schema().fields()) {
                //Note: use the raw field value here, not its toString().
                flagJson.put(field.name(), flagVal.get(field));
            }
        }
        jsonObject.put(flag, flagJson.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```

3) Result

![image-20220428222539898](https://cdn.jsdelivr.net/gh/code19971001/blog-images@master/bigdata/flink-cdc/image-20220428222539898.png)
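To use this schema, swap it in for `StringDebeziumDeserializationSchema` in the DataStream job from section 2.1.2; everything else stays the same. A sketch of the changed builder call:

```java
//Sketch: the source from section 2.1.2 with the custom schema plugged in.
//The stream still carries String records, now in the compact JSON layout defined above.
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("hadoop02")
        .port(3307)
        .username("root")
        .password("19971001")
        .databaseList("gmall_flink")
        .tableList("gmall_flink.activity_info")
        .startupOptions(StartupOptions.initial())
        .deserializer(new com.itcode.Deserializer.CustomerDeserializationSchema())
        .build();
```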
### 2.4 DataStream vs Flink SQL

- DataStream:
  - Pros: supports multiple databases and multiple tables (see the sketch below).
  - Cons: requires a custom deserializer.
- Flink SQL:
  - Pros: no custom deserializer needed.
  - Cons: each source table can only capture a single table.
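To illustrate the first point, the DataStream source accepts several databases and tables in one call, since `databaseList` and `tableList` take a variable number of names. A sketch; `gmall_flink_2` and `order_info` are hypothetical placeholders, not tables from this demo:

```java
//Sketch: one DataStream CDC source capturing several tables across databases.
//gmall_flink_2 and order_info are hypothetical placeholder names.
MySqlSource<String> multiTableSource = MySqlSource.<String>builder()
        .hostname("hadoop02")
        .port(3307)
        .username("root")
        .password("19971001")
        .databaseList("gmall_flink", "gmall_flink_2")
        .tableList("gmall_flink.activity_info", "gmall_flink_2.order_info")
        .startupOptions(StartupOptions.initial())
        .deserializer(new com.itcode.Deserializer.CustomerDeserializationSchema())
        .build();
```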