# flink-cdc-code1997
**Repository Path**: code1997/flink-cdc-code1997
## Basic Information
- **Project Name**: flink-cdc-code1997
- **Description**: Flink CDC的demo仓库
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 1
- **Forks**: 0
- **Created**: 2022-04-26
- **Last Updated**: 2023-03-17
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# Flink CDC
> Official website: https://ververica.github.io/flink-cdc-connectors/master/
>
> Code repository for this article: https://gitee.com/code1997/flink-cdc-code1997.git
## Chapter 1: Basic Introduction
### 1.1 What is CDC
CDC stands for Change Data Capture.
The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows or tables), record those changes completely in the order they occur, and write them to a message broker so that other services can subscribe to and consume them.
### 1.2 Types of CDC
CDC comes in two main flavors: query-based and binlog-based. The key differences are:

|                                | Query-based CDC           | Binlog-based CDC          |
| ------------------------------ | ------------------------- | ------------------------- |
| Open-source products           | Sqoop, Kafka JDBC Source  | Canal, Maxwell, Debezium  |
| Execution mode                 | Batch                     | Streaming                 |
| Captures every data change     | No, changes may be missed | Yes                       |
| Latency                        | High                      | Low                       |
| Extra load on the database     | Yes, it queries the DB    | No                        |
### 1.3 Flink CDC
The Flink community developed the flink-cdc-connectors component, a source connector that can read both full snapshots and incremental changes directly from databases such as `MySQL` and `PostgreSQL`.
It is open source: [https://github.com/ververica/flink-cdc-connectors](https://github.com/ververica/flink-cdc-connectors)

## Chapter 2: FlinkCDC Hands-On Examples
> MySQL CDC connector documentation: https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html
### 2.1 Using the DataStream API
#### 2.1.1 Add dependencies
This project uses `flink 1.13.6` and `flink-cdc 2.2.1`; make sure to check the version compatibility between Flink and Flink CDC.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.itcode</groupId>
    <artifactId>flink-cdc-code1997</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <scala.compile.version>2.12</scala.compile.version>
        <flink.version>1.13.6</flink.version>
        <flink-cdc.version>2.2.1</flink-cdc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.compile.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>flink-cdc-code1997</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
#### 2.1.2 Write the code
> Pay attention to HDFS authentication. Here I use Kerberos, and the checkpoint directory must be created in advance with the proper permissions.
```java
package com.itcode;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

/**
 * Flink CDC example (DataStream API).
 *
 * @author code1997
 */
public class FlinkCDC {

    public static final String ENV = "env";
    public static final String LOCAL_ENV = "local";

    public static void main(String[] args) throws Exception {
        //When submitting to the cluster, Kerberos is handled by the flink run options;
        //for local runs we authenticate in code.
        String env = ParameterTool.fromArgs(args).get(ENV, LOCAL_ENV);
        if (LOCAL_ENV.equalsIgnoreCase(env)) {
            kerberosAuthentication();
        }
        //1. Create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Flink CDC stores the binlog position as state in checkpoints. To resume where the job left off,
        //   restart it from a checkpoint or savepoint.
        //2.1 Enable checkpointing, every 5 s
        executionEnvironment.enableCheckpointing(5000L);
        //2.2 Checkpoint consistency semantics
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(10000L);
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //2.3 Keep the last checkpoint when the job is cancelled
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //2.4 Restart strategy: 3 attempts, 2 s apart. Without checkpointing there is no restart strategy by default;
        //    with checkpointing enabled, the fixed-delay strategy is used automatically.
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //2.5 State backend and checkpoint storage, used for failure recovery and resuming: filesystem, memory, RocksDB
        //executionEnvironment.setStateBackend(new FsStateBackend("hdfs://hadoop02:8020/flinkCDC")); //deprecated, use HashMapStateBackend plus checkpoint storage instead
        executionEnvironment.setStateBackend(new HashMapStateBackend());
        executionEnvironment.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop02:8020/flink/flink-cdc/check-point");
        //2.6 User name for accessing HDFS.
        //My HDFS cluster uses Kerberos, so this line is not needed; with plain user-name impersonation, uncomment it.
        //System.setProperty("HADOOP_USER_NAME", "code1997");
        //3. Create the Flink MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("hadoop02")
                .port(3307)
                .username("root")
                .password("19971001")
                .databaseList("gmall_flink")
                //if not specified, all tables of the listed databases are read
                .tableList("gmall_flink.activity_info")
                //initial mode reads a full snapshot first; suitable for the first run
                .startupOptions(StartupOptions.initial())
                //deserialization schema
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();
        //4. Read from MySQL with the CDC source
        DataStreamSource<String> mysqlDS = executionEnvironment.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        //5. Print the records
        mysqlDS.print();
        //6. Execute the job
        executionEnvironment.execute();
    }

    public static void kerberosAuthentication() {
        try {
            System.setProperty("java.security.krb5.conf", "src/main/resources/krb5/krb5.ini");
            UserGroupInformation.loginUserFromKeytab("code1997@CODE1997.COM", "src/main/resources/keytab/code1997.keytab");
            System.out.println(UserGroupInformation.getCurrentUser() + "=====" + UserGroupInformation.getLoginUser());
        } catch (IOException e) {
            throw new RuntimeException("Kerberos authentication failed", e);
        }
    }
}
```
Local test results:

- op=r: snapshot read on first startup; `after` is present
- op=c: insert; `after` is present
- op=u: update; `before` and `after` are present
- op=d: delete; `before` is present
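For reference, here is a minimal Java sketch of how these op values can be read from a raw record via Debezium's `Envelope` helper, which is the same helper the custom deserializer in section 2.3 uses; the `OpCodes` class name is only for illustration.
```java
import io.debezium.data.Envelope;
import org.apache.kafka.connect.source.SourceRecord;

public class OpCodes {

    /**
     * Maps a raw Debezium SourceRecord to the single-letter op codes listed above.
     */
    public static String opOf(SourceRecord record) {
        Envelope.Operation operation = Envelope.operationFor(record);
        switch (operation) {
            case READ:   return "r"; // snapshot read on first startup, only "after" is present
            case CREATE: return "c"; // insert, only "after" is present
            case UPDATE: return "u"; // update, both "before" and "after" are present
            case DELETE: return "d"; // delete, only "before" is present
            default:     return operation.code(); // any other Debezium operation
        }
    }
}
```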
Test result analysis:
1) Initial snapshot data: op=r, `after` is present

2) Insert a new record into the database
> Because the database's default transaction isolation level is READ UNCOMMITTED and I made the change through a GUI client, multiple records are read: the first is c and the following ones are u.

3) Delete a record: op=d, `before` is present and `after` is not.

#### 2.1.3 Package and test
> Steps for packaging and testing:
1) Start the Flink and HDFS clusters
2) Enable the MySQL binlog and restart MySQL
```ini
[mysqld]
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_flink
```
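Once MySQL has restarted, you can verify that the binlog settings took effect before moving on. A minimal sketch using plain JDBC; the connection parameters match the demo environment above, and the `BinlogCheck` class name is only for illustration:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        // Connection parameters match the demo environment; adjust for yours.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop02:3307/gmall_flink", "root", "19971001");
             Statement stmt = conn.createStatement();
             // log_bin should be ON and binlog_format should be ROW.
             ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
            while (rs.next()) {
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}
```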
3) Submit the job from the command line
```shell
bin/flink run -d -m hadoop02:8081 \
-yD java.security.krb5.conf=/etc/krb5.conf \
-yD security.kerberos.login.keytab=/home/code1997/code1997.keytab \
-yD security.kerberos.login.principal=code1997@CODE1997.COM \
-c com.itcode.FlinkCDC ./flink-cdc-code1997-jar-with-dependencies.jar -env dev
```
4) Check the logs

5) Insert, update, or delete data in the MySQL `gmall_flink.activity_info` table
6) Check the currently saved `checkpoint`s

7) Create a savepoint for the running Flink job
```shell
bin/flink savepoint ebc43dc460e01fd4e76f3af13c463b51 hdfs://hadoop02:8020/flink/flink-cdc/save-point
```

8) Cancel the job and change one record
9) Restart the job from the savepoint
```shell
[root@hadoop02 flink-1.13.6]# bin/flink run -s hdfs://hadoop02:8020/flink/flink-cdc/save-point/savepoint-ebc43d-eadbd2c0c54c -d -c com.itcode.FlinkCDC ./flink-cdc-code1997-jar-with-dependencies.jar -env dev
```

Only the data changed after the `savepoint` was taken is shown.

### 2.2 Using Flink SQL
#### 2.2.1 Add the dependency
> The previous example already added most of the required dependencies; additionally add the following one.
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.compile.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```
#### 2.2.2 Code implementation
> With Flink SQL, each source table monitors a single MySQL table, and the default startup mode is the initial (snapshot) mode.
```java
package com.itcode;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC example (Flink SQL).
 *
 * @author code1997
 */
public class FlinkCdcBySql {

    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment);
        //2. Create a CDC table backed by the mysql-cdc connector
        String sourceDdl = "create table mysql_source (" +
                "db_name STRING METADATA FROM 'database_name' VIRTUAL," +
                "table_name STRING METADATA FROM 'table_name' VIRTUAL," +
                "id INT," +
                "activity_name STRING," +
                "PRIMARY KEY(id) NOT ENFORCED" +
                ") WITH (" +
                "'connector'='mysql-cdc'," +
                "'hostname'='hadoop02'," +
                "'port'='3307'," +
                "'username'='root'," +
                "'password'='19971001'," +
                "'database-name'='gmall_flink'," +
                "'table-name'='activity_info')";
        tableEnvironment.executeSql(sourceDdl);
        //3. Query the CDC table
        String query = "select * from mysql_source";
        Table result = tableEnvironment.sqlQuery(query);
        result.printSchema();
        //4. Convert the table to a stream and print it: because updates and deletes occur,
        //   a changelog (retract) stream is needed rather than an append-only stream.
        tableEnvironment.toChangelogStream(result).print();
        executionEnvironment.execute();
    }
}
```
Output:

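As noted above, the SQL connector starts in the initial (snapshot) mode by default. According to the mysql-cdc connector documentation linked earlier, the startup behavior can be changed with the `scan.startup.mode` table option; below is a sketch of a variant of the `sourceDdl` string above that only captures changes from the latest binlog position (not tested in this demo).
```java
// Drop-in replacement for the sourceDdl string in FlinkCdcBySql:
// 'scan.startup.mode'='latest-offset' skips the initial snapshot and only
// captures changes made after the job starts ('initial' is the default).
String latestOffsetDdl = "create table mysql_source (" +
        "id INT," +
        "activity_name STRING," +
        "PRIMARY KEY(id) NOT ENFORCED" +
        ") WITH (" +
        "'connector'='mysql-cdc'," +
        "'hostname'='hadoop02'," +
        "'port'='3307'," +
        "'username'='root'," +
        "'password'='19971001'," +
        "'database-name'='gmall_flink'," +
        "'table-name'='activity_info'," +
        "'scan.startup.mode'='latest-offset')";
```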
### 2.3 Custom deserialization schema
> The data format produced by flink-cdc may not be what we want, so we can write a custom deserialization schema modeled on `StringDebeziumDeserializationSchema`.
1) Raw data format
The raw format is inconvenient for downstream processing, which is why we define our own deserialization schema.

2) Code implementation
```java
package com.itcode.Deserializer;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {

    private static final String DATABASE = "database";
    private static final String TABLE_NAME = "tableName";
    private static final String BEFORE = "before";
    private static final String AFTER = "after";
    private static final String TYPE = "type";
    private static final String DOT_SEPARATOR = "\\.";

    /**
     * Target format:
     * {
     *   "database":"",
     *   "tableName":"",
     *   "type":"c,r,u,d",
     *   "before":{},
     *   "after":{}
     * }
     */
    @Override
    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        JSONObject jsonObject = new JSONObject();
        //1. Extract the database and table name from the record topic ("serverName.database.table")
        String topic = record.topic();
        String[] topicAttributes = topic.split(DOT_SEPARATOR);
        jsonObject.put(DATABASE, topicAttributes[1]);
        jsonObject.put(TABLE_NAME, topicAttributes[2]);
        //2. Extract the operation type
        Envelope.Operation operation = Envelope.operationFor(record);
        String operator = operation.name();
        jsonObject.put(TYPE, "create".equalsIgnoreCase(operator) ? "insert" : operator.toLowerCase());
        //3. Extract the before and after images
        Struct value = (Struct) record.value();
        extractFields(jsonObject, value, BEFORE);
        extractFields(jsonObject, value, AFTER);
        out.collect(jsonObject.toJSONString());
    }

    private void extractFields(JSONObject jsonObject, Struct value, String flag) {
        Struct flagVal = value.getStruct(flag);
        JSONObject flagJson = new JSONObject();
        if (flagVal != null) {
            for (Field field : flagVal.schema().fields()) {
                //note: use get(field) rather than toString() to keep the original value types
                flagJson.put(field.name(), flagVal.get(field));
            }
        }
        jsonObject.put(flag, flagJson.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
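To use the custom schema, only the `.deserializer(...)` call in the DataStream job from section 2.1.2 changes. A minimal sketch (checkpointing and Kerberos setup are omitted here, and the `FlinkCdcWithCustomSchema` class name is only for illustration):
```java
package com.itcode;

import com.itcode.Deserializer.CustomerDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCdcWithCustomSchema {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same source as in section 2.1.2, but with the custom deserializer
        // so each record is emitted as the compact JSON shown above.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("hadoop02")
                .port(3307)
                .username("root")
                .password("19971001")
                .databaseList("gmall_flink")
                .tableList("gmall_flink.activity_info")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserializationSchema())
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").print();
        env.execute();
    }
}
```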
3) Results

### 2.4 DataStream vs Flink SQL
- DataStream:
  - Pros: supports multiple databases and multiple tables.
  - Cons: requires a custom deserialization schema.
- Flink SQL:
  - Pros: no custom deserialization schema is needed.
  - Cons: only a single table can be queried.