
Stream delete events from MySQL to PostgreSQL via Apache Kafka
This article describes how to stream delete events from MySQL to PostgreSQL via Apache Kafka; the discussion should be a useful reference for anyone working through the same problem.

Problem description

I am trying to stream events from MySQL to PostgreSQL using Apache Kafka. Although insertions and updates work fine, I can't figure out how to delete a record from MySQL and stream this event to PostgreSQL.

Assume the following topology:

                                 +-------------+
                                 |             |
                                 |    MySQL    |
                                 |             |
                                 +------+------+
                                        |
                                        |
                                        |
                        +---------------v------------------+
                        |                                  |
                        |           Kafka Connect          |
                        |  (Debezium, JDBC connectors)     |
                        |                                  |
                        +---------------+------------------+
                                        |
                                        |
                                        |
                                        |
                                +-------v--------+
                                |                |
                                |   PostgreSQL   |
                                |                |
                                +----------------+
                  

I am using the following Docker images:

1. Apache ZooKeeper
2. Apache Kafka
3. Debezium/JDBC connectors

Then:

                  # Start the application
                  export DEBEZIUM_VERSION=0.6
                  docker-compose up
                  
                  # Start PostgreSQL connector
                  curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
                  
                  # Start MySQL connector
                  curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
                  

Here is the content of the MySQL database:

                  docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
                  +------+------------+-----------+-----------------------+
                  | id   | first_name | last_name | email                 |
                  +------+------------+-----------+-----------------------+
                  | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
                  | 1002 | George     | Bailey    | gbailey@foobar.com    |
                  | 1003 | Edward     | Walker    | ed@walker.com         |
                  | 1004 | Anne       | Kretchmar | annek@noanswer.org    |
                  +------+------------+-----------+-----------------------+
                  

And we can verify that the content of PostgreSQL is identical:

                  docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
                   last_name |  id  | first_name |         email         
                  -----------+------+------------+-----------------------
                   Thomas    | 1001 | Sally      | sally.thomas@acme.com
                   Bailey    | 1002 | George     | gbailey@foobar.com
                   Walker    | 1003 | Edward     | ed@walker.com
                   Kretchmar | 1004 | Anne       | annek@noanswer.org
                  (4 rows)
                  

Assume that I want to delete the record with id=1004 from the MySQL database:

                  docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'
                  mysql> delete from customers where id = 1004;
                  
                  
                  docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
                  +------+------------+-----------+-----------------------+
                  | id   | first_name | last_name | email                 |
                  +------+------------+-----------+-----------------------+
                  | 1001 | Sally      | Thomas    | sally.thomas@acme.com |
                  | 1002 | George     | Bailey    | gbailey@foobar.com    |
                  | 1003 | Edward     | Walker    | ed@walker.com         |
                  +------+------------+-----------+-----------------------+
                  

Although the record is deleted from MySQL, the entry still appears in PostgreSQL:

                  docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
                  
                   last_name |  id  | first_name |         email         
                  -----------+------+------------+-----------------------
                   Thomas    | 1001 | Sally      | sally.thomas@acme.com
                   Bailey    | 1002 | George     | gbailey@foobar.com
                   Walker    | 1003 | Edward     | ed@walker.com
                   Kretchmar | 1004 | Anne       | annek@noanswer.org
                  (4 rows)
                  

I understand that soft deletes are supported; however, is it possible to completely delete that particular entry from PostgreSQL as well (by streaming the delete event from MySQL via Apache Kafka)?
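For context on what a delete looks like on the wire: per Debezium's documented change-event format, every event envelope carries an `op` field, and a source-side DELETE is followed by a tombstone (a record whose value is null) so compacted topics can eventually drop the key. A minimal sketch of telling these apart on the consumer side (illustrative only; field values below are made up):

```python
# Classify one record value read from a Debezium topic, assuming the
# documented envelope format: "op" is "c"/"u"/"d"/"r", and a null value
# is the tombstone Debezium emits after each source delete.
def classify(value):
    if value is None:
        return "tombstone"
    ops = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot-read"}
    return ops.get(value.get("op"), "unknown")

print(classify({"op": "d", "before": {"id": 1004}, "after": None}))  # delete
print(classify(None))                                                # tombstone
```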

Here is the content of the source.json file:

                  {
                      "name": "inventory-connector",
                      "config": {
                          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                          "tasks.max": "1",
                          "database.hostname": "mysql",
                          "database.port": "3306",
                          "database.user": "debezium",
                          "database.password": "dbz",
                          "database.server.id": "184054",
                          "database.server.name": "dbserver1",
                          "database.whitelist": "inventory",
                          "database.history.kafka.bootstrap.servers": "kafka:9092",
                          "database.history.kafka.topic": "schema-changes.inventory",
                          "transforms": "route",
                          "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
                          "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
                          "transforms.route.replacement": "$3"
                      }
                  }
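The `route` transform in this config uses Kafka Connect's RegexRouter to rename Debezium's three-part topic names (server.database.table) down to the bare table name, so they line up with the sink's `"topics": "customers"` subscription. A sketch of the same rewrite in plain Python (RegexRouter requires the regex to match the whole topic name and leaves non-matching topics unchanged):

```python
import re

# Same pattern as transforms.route.regex; the replacement "$3" keeps only
# the third dot-separated segment.
PATTERN = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")

def route(topic):
    """Rewrite server.database.table -> table; pass other topics through."""
    m = PATTERN.fullmatch(topic)
    return m.group(3) if m else topic

print(route("dbserver1.inventory.customers"))  # customers
```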
                  

Here is the content of the jdbc-sink.json file:

                  {
                      "name": "jdbc-sink",
                      "config": {
                          "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
                          "tasks.max": "1",
                          "topics": "customers",
                          "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
                          "transforms": "unwrap",
                          "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
                          "auto.create": "true",
                          "insert.mode": "upsert",
                          "pk.fields": "id",
                          "pk.mode": "record_value"
                      }
                  }
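With `"insert.mode": "upsert"` and `"pk.mode": "record_value"`, the sink takes the primary key from the `id` field of each record's value and performs an idempotent write per record. An illustrative Python sketch of the kind of PostgreSQL statement this implies (not the connector's actual generated SQL):

```python
# Build an upsert statement keyed on the primary-key column, mirroring what
# insert.mode=upsert means on PostgreSQL: insert, or update on conflict.
def upsert_sql(table, columns, pk="id"):
    col_list = ", ".join(columns)
    params = ", ".join(f"%({c})s" for c in columns)
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c != pk)
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({params}) "
        f"ON CONFLICT ({pk}) DO UPDATE SET {updates}"
    )

print(upsert_sql("customers", ["id", "first_name", "last_name", "email"]))
```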
                  

I have also tried setting "pk.mode": "record_key" and "delete.enabled": "true" (a suggested bug fix), but this modification doesn't seem to work.
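As a side note for readers on newer stacks: later releases of the Confluent JDBC sink connector did add tombstone-based delete support via `delete.enabled`, and it requires `pk.mode=record_key` (a delete record carries no value, so the primary key must come from the record key). The connector version used in this question predates that feature, which is likely why the setting had no effect here. A hedged sketch of the relevant fragment for a recent connector:

```json
{
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "id"
}
```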

Recommended answer

Deletes are currently not supported by the Confluent JDBC sink connector. There is a pending pull request (you already linked to it), but it hasn't been merged yet.

For the time being, you could either build the JDBC sink connector from that branch yourself, or create a simple custom sink connector that handles tombstone events by executing a corresponding DELETE statement on the target database.
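The custom-sink idea boils down to one dispatch rule: a record with a null value is a tombstone (Debezium emits one after every source delete) and becomes a hard DELETE keyed by the record key; anything else is written normally. A minimal Python sketch of that dispatch, assuming (hypothetically) that the target table is named after the topic and keyed by `id`:

```python
# Map one Kafka record to the SQL statement a custom sink might execute.
def record_to_sql(topic, key, value):
    if value is None:
        # Tombstone: remove the row from the target database.
        return f"DELETE FROM {topic} WHERE id = {key['id']}"
    # Regular change event: write the row (placeholders for a parameterized
    # query; a real sink would bind `value` when executing).
    cols = ", ".join(value)
    params = ", ".join(f"%({c})s" for c in value)
    return f"INSERT INTO {topic} ({cols}) VALUES ({params})"

print(record_to_sql("customers", {"id": 1004}, None))
# DELETE FROM customers WHERE id = 1004
```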

That concludes this article on streaming delete events from MySQL to PostgreSQL via Apache Kafka; hopefully the answer above helps.

