Problem description
I am trying to stream events from MySQL to PostgreSQL using Apache Kafka.
Although insertions and updates work fine, I can't figure out how to delete a record from MySQL and stream this event to PostgreSQL.
Assume the following topology:
               +-------------+
               |             |
               |    MySQL    |
               |             |
               +------+------+
                      |
                      |
                      |
      +---------------v------------------+
      |                                  |
      |          Kafka Connect           |
      |   (Debezium, JDBC connectors)    |
      |                                  |
      +---------------+------------------+
                      |
                      |
                      |
                      |
              +-------v--------+
              |                |
              |   PostgreSQL   |
              |                |
              +----------------+
I am using the following Docker images:
- Apache-Zookeeper
- Apache-Kafka
- Debezium/JDBC connectors
and then:
# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
Here is the content of the MySQL database:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
And we can verify that the content of PostgreSQL is identical:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)
Assume that I want to delete the record with id=1004 from the MySQL database:
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
+------+------------+-----------+-----------------------+
Although the record has been deleted from MySQL, the entry still appears in PostgreSQL:
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)
I understand that soft deletes are supported; however, is it possible to completely delete that particular entry from PostgreSQL as well (by streaming the delete event from MySQL via Apache Kafka)?
Here is the content of the source.json file:
{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}
Here is the content of the jdbc-sink.json file:
{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}
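To see why a delete leaves nothing for an upsert-only sink to write, it can help to model the change events in plain Python. The sketch below is a simplified illustration of the envelope-flattening idea, not the actual implementation of the unwrap transform: the `unwrap` function and the sample event dictionaries are hypothetical, and only the `op`, `before`, and `after` fields of a Debezium change event are modeled.

```python
def unwrap(envelope):
    """Return the row state after the change, or None when there is none.

    Hypothetical helper: a simplified model of flattening a change event.
    """
    if envelope is None:
        # Tombstone: the record carries a key but no value at all.
        return None
    # For a delete event (op == "d") the "after" field is null.
    return envelope.get("after")


# Sample events, shaped like the envelope fields named above (illustrative data).
insert_event = {"op": "c", "before": None, "after": {"id": 1004, "first_name": "Anne"}}
delete_event = {"op": "d", "before": {"id": 1004, "first_name": "Anne"}, "after": None}

unwrapped = [unwrap(e) for e in (insert_event, delete_event, None)]
print(unwrapped)
```

In this model, only the insert yields a row to upsert. The delete event and the tombstone both flatten to nothing, so a sink that only knows how to upsert has no row state to act on.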
I have also tried to set "pk.mode": "record_key" and "delete.enabled": "true" (bug fix suggestion), but this modification doesn't seem to work.
Recommended answer
Deletes are currently not supported by the Confluent JDBC sink connector. There is a pending pull request (the one you already linked to), but it hasn't been merged yet.
For the time being, you could either build the JDBC sink connector from that branch yourself, or create a simple custom sink connector that handles tombstone events by executing a corresponding DELETE statement on the target database.
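The core logic of such a custom sink might look roughly like the following. This is a minimal sketch under stated assumptions, not a Kafka Connect connector: `apply_record` is a hypothetical helper, records are modeled as plain `key`/`value` dictionaries, and an in-memory SQLite database stands in for PostgreSQL so the example runs on its own (the `INSERT OR REPLACE` syntax and `?` placeholders are SQLite-specific).

```python
import sqlite3


def apply_record(conn, table, key, value):
    """Apply one change record to the target table (hypothetical helper).

    A record whose value is None is treated as a tombstone and deletes the row
    identified by the key; any other record is upserted.
    """
    if value is None:
        # Tombstone: build a parameterized DELETE from the key columns.
        where = " AND ".join(f'"{col}" = ?' for col in key)
        conn.execute(f'DELETE FROM "{table}" WHERE {where}', tuple(key.values()))
    else:
        # Regular record: upsert the merged key and value columns.
        row = {**key, **value}
        cols = ", ".join(f'"{c}"' for c in row)
        marks = ", ".join("?" for _ in row)
        conn.execute(
            f'INSERT OR REPLACE INTO "{table}" ({cols}) VALUES ({marks})',
            tuple(row.values()),
        )
    conn.commit()


# SQLite stand-in for the PostgreSQL target (assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE customers (id INTEGER PRIMARY KEY, first_name TEXT, last_name TEXT, email TEXT)"
)
apply_record(
    conn,
    "customers",
    {"id": 1004},
    {"first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"},
)
apply_record(conn, "customers", {"id": 1004}, None)  # tombstone for id=1004

remaining = conn.execute("SELECT COUNT(*) FROM customers").fetchone()[0]
print(remaining)
```

The column values are always passed as query parameters; only the table and column names are interpolated into the SQL text, so in a real connector they should come from trusted configuration or the record schema.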