一、CDC
CDC (Change Data Capture) ,在廣義的概念上,只要能捕獲數(shù)據(jù)變更的技術(shù),都可以稱為 CDC 。但通常我們說的CDC 技術(shù)主要面向數(shù)據(jù)庫(包括常見的mysql,Oracle, MongoDB等)的變更,是一種用于捕獲數(shù)據(jù)庫中數(shù)據(jù)變更的技術(shù)。
二、常見CDC的比較
常見的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。
- DataX,Sqoop和kettle的CDC實現(xiàn)技術(shù)主要是基于查詢的方式實現(xiàn)的,通過離線調(diào)度查詢作業(yè),實現(xiàn)批處理請求。這種作業(yè)方式無法保證數(shù)據(jù)的一致性,實時性也較差。
- Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技術(shù)。這種技術(shù),利用流處理的方式,實時處理日志數(shù)據(jù),保證了數(shù)據(jù)的一致性,為其他服務(wù)提供了實時數(shù)據(jù)。
三、Flink CDC
2020年 Flink cdc 首次在 Flink forward 大會上官宣, 由 Jark Wu & Qingsheng Ren 兩位大佬提出。
Flink CDC connector 可以捕獲在一個或多個表中發(fā)生的所有變更。該模式通常有一個前記錄和一個后記錄。Flink CDC connector 可以直接在Flink中以非約束模式(流)使用,而不需要使用類似 kafka 之類的中間件中轉(zhuǎn)數(shù)據(jù)。
四、Flink CDC支持的數(shù)據(jù)庫
PS:
Flink CDC 2.2才新增OceanBase,PolarDB-X,SqlServer,TiDB 四種數(shù)據(jù)源接入,均支持全量和增量一體化同步。
截止到目前FlinkCDC已經(jīng)支持12+數(shù)據(jù)源。
五、阿里實現(xiàn)的FlinkCDC使用示例
依賴引入
<!-- flink table支持 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 阿里實現(xiàn)的flink mysql CDC -->
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.80</version>
</dependency>
<!-- jackson報錯解決 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>${jackson.version}</version>
</dependency>
基于table
package spendreport.cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
;
/**
* @author zhengwen
**/
public class TestMySqlFlinkCDC {
public static void main(String[] args) throws Exception {
//1.創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.Flink-CDC 將讀取 binlog 的位置信息以狀態(tài)的方式保存在 CK,如果想要做到斷點續(xù)傳, 需要從 Checkpoint 或者 Savepoint 啟動程序
//2.1 開啟 Checkpoint,每隔 5 秒鐘做一次 CK
env.enableCheckpointing(5000L);
//2.2 指定 CK 的一致性語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//2.3 設(shè)置任務(wù)關(guān)閉的時候保留最后一次 CK 數(shù)據(jù)
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//2.4 指定從 CK 自動重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("127.0.0.1")
.serverTimeZone("GMT+8") //時區(qū)報錯增加這個設(shè)置
.port(3306)
.username("root")
.password("123456")
.databaseList("wz")
.tableList("wz.user_info") //注意表一定要寫庫名.表名這種,多個,隔開
.startupOptions(StartupOptions.initial())
//自定義轉(zhuǎn)json格式化
.deserializer(new MyJsonDebeziumDeserializationSchema())
//自帶string格式序列化
//.deserializer(new StringDebeziumDeserializationSchema())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//TODO 可以keyBy,比如根據(jù)table或type,然后開窗處理
//3.打印數(shù)據(jù)
streamSource.print();
//streamSource.addSink(); 輸出
//4.執(zhí)行任務(wù)
env.execute("flinkTableCDC");
}
private static class MyJsonDebeziumDeserializationSchema implements
com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
throws Exception {
Struct value = (Struct) sourceRecord.value();
Struct source = value.getStruct("source");
//獲取數(shù)據(jù)庫名稱
String db = source.getString("db");
String table = source.getString("table");
//獲取數(shù)據(jù)類型
String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
if (type.equals("create")) {
type = "insert";
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("database", db);
jsonObject.put("table", table);
jsonObject.put("type", type);
//獲取數(shù)據(jù)data
Struct after = value.getStruct("after");
JSONObject dataJson = new JSONObject();
List<Field> fields = after.schema().fields();
for (Field field : fields) {
String field_name = field.name();
Object fieldValue = after.get(field);
dataJson.put(field_name, fieldValue);
}
jsonObject.put("data", dataJson);
collector.collect(JSONObject.toJSONString(jsonObject));
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
}
運行效果
PS:
- 操作數(shù)據(jù)庫的增刪改就會立馬觸發(fā)
- 這里是自定義的序列化轉(zhuǎn)json格式字符串,自帶的字符串序列化也是可以的(可以自己試試打印的內(nèi)容)
基于sql
package spendreport.cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author zhengwen
**/
public class TestMySqlFlinkCDC2 {
public static void main(String[] args) throws Exception {
//1.創(chuàng)建執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2.創(chuàng)建 Flink-MySQL-CDC 的 Source
String connectorName = "mysql-cdc";
String dbHostName = "127.0.0.1";
String dbPort = "3306";
String dbUsername = "root";
String dbPassword = "123456";
String dbDatabaseName = "wz";
String dbTableName = "user_info";
String tableSql = "CREATE TABLE t_user_info ("
+ "id int,mobile varchar(20),"
+ "user_name varchar(30),"
+ "real_name varchar(60),"
+ "id_card varchar(20),"
+ "org_name varchar(100),"
+ "user_stars int,"
+ "create_by int,"
// + "create_time datetime,"
+ "update_by int,"
// + "update_time datetime,"
+ "is_deleted int) "
+ " WITH ("
+ " 'connector' = '" + connectorName + "',"
+ " 'hostname' = '" + dbHostName + "',"
+ " 'port' = '" + dbPort + "',"
+ " 'username' = '" + dbUsername + "',"
+ " 'password' = '" + dbPassword + "',"
+ " 'database-name' = '" + dbDatabaseName + "',"
+ " 'table-name' = '" + dbTableName + "'"
+ ")";
tableEnv.executeSql(tableSql);
tableEnv.executeSql("select * from t_user_info").print();
env.execute();
}
}
運行效果:
總結(jié)
既然是基于日志,那么數(shù)據(jù)庫的配置文件肯定要開啟日志功能,這里mysql需要開啟內(nèi)容
server-id=1
log_bin=mysql-bin
binlog_format=ROW #目前還只能支持行
expire_logs_days=30
binlog_do_db=wz #這里binlog的庫如果有多個就再寫一行,千萬不要寫成用,隔開
- 實時性確實高,比那些自動任務(wù)定時取體驗號百倍
- 流示的確實絲滑
最后肯定證明這種方式同步數(shù)據(jù)可行,而且實時性特高,但是就是不知道我們的目標(biāo)數(shù)據(jù)庫是否可以開啟這些日志配置。UP!
到此這篇關(guān)于Flink流處理引擎零基礎(chǔ)速通之?dāng)?shù)據(jù)的抽取篇的文章就介紹到這了,更多相關(guān)Flink數(shù)據(jù)的抽取內(nèi)容請搜索html5模板網(wǎng)以前的文章希望大家以后多多支持html5模板網(wǎng)!