Problem description
I'm using the Dataflow SDK 2.x Java API (Apache Beam SDK) to write data into MySQL. I've created pipelines based on the Apache Beam SDK documentation to write data into MySQL using Dataflow. The pipeline inserts a single row at a time, whereas I need bulk inserts. I can't find any option in the official documentation to enable a bulk insert mode.
I'm wondering whether it's possible to set a bulk insert mode in a Dataflow pipeline. If so, please let me know what I need to change in the code below.
// Needs: org.apache.beam.sdk.io.jdbc.JdbcIO, org.apache.beam.sdk.values.KV,
// java.sql.PreparedStatement, java.sql.SQLException
.apply(JdbcIO.<KV<Integer, String>>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
        .withUsername("username")
        .withPassword("password"))
    .withStatement("insert into Person values(?, ?)")
    .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        @Override
        public void setParameters(KV<Integer, String> element, PreparedStatement query)
                throws SQLException {
            // Bind the element's key and value to the two placeholders
            query.setInt(1, element.getKey());
            query.setString(2, element.getValue());
        }
    }));
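For context, a MySQL "bulk insert" typically means one INSERT statement that carries many rows, such as `insert into Person values (?, ?), (?, ?), ...`, instead of one statement per row. MySQL Connector/J can produce such statements from an ordinary JDBC batch when the connection URL sets `rewriteBatchedStatements=true`, so what really matters is how many rows end up in each JDBC batch. The sketch below only builds the statement string so its shape is visible; `MultiRowInsert` is a hypothetical helper and not part of Beam or JdbcIO.

```java
import java.util.StringJoiner;

// Hypothetical helper (not part of Beam or JdbcIO): builds the shape of a
// multi-row INSERT, which is what a MySQL bulk insert usually looks like.
final class MultiRowInsert {

    private MultiRowInsert() {}

    /** For table "Person", 2 columns and 2 rows: "insert into Person values (?, ?), (?, ?)". */
    static String build(String table, int columns, int rows) {
        if (columns < 1 || rows < 1) {
            throw new IllegalArgumentException("columns and rows must be >= 1");
        }
        // One "(?, ?, ...)" group with one placeholder per column.
        StringJoiner rowGroup = new StringJoiner(", ", "(", ")");
        for (int c = 0; c < columns; c++) {
            rowGroup.add("?");
        }
        String group = rowGroup.toString();

        // Repeat the group once per row, separated by commas.
        StringJoiner allRows = new StringJoiner(", ");
        for (int r = 0; r < rows; r++) {
            allRows.add(group);
        }
        return "insert into " + table + " values " + allRows;
    }

    public static void main(String[] args) {
        System.out.println(build("Person", 2, 3));
    }
}
```

Running `main` prints `insert into Person values (?, ?), (?, ?), (?, ?)`: three rows sent in a single statement.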
Recommended answer
EDIT 2018-01-27:
It turns out that this issue is related to the DirectRunner. If you run the same pipeline using the DataflowRunner, you should get batches of up to 1,000 records. The DirectRunner always creates bundles of size 1 after a grouping operation.
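Why would bundle size cap batch size? Under a simplified model, assumed here rather than copied from the JdbcIO source, the writer buffers records within a bundle, executes the batch once it reaches the batch size (1,000 in this answer), and flushes whatever is left when the bundle finishes. `BundleBatchModel` is a hypothetical illustration of that model, not Beam code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of per-bundle batching in a JDBC writer:
// flush when the buffer reaches batchSize, and flush the remainder at the
// end of every bundle. Not Beam or JdbcIO code.
final class BundleBatchModel {

    private BundleBatchModel() {}

    /** Returns the size of every batch that would be executed, in order. */
    static List<Integer> batchSizes(List<Integer> bundleSizes, int batchSize) {
        List<Integer> batches = new ArrayList<>();
        for (int bundle : bundleSizes) {
            int buffered = 0;                   // start of bundle: empty buffer
            for (int i = 0; i < bundle; i++) {  // one record processed
                buffered++;
                if (buffered >= batchSize) {    // buffer full: execute a batch
                    batches.add(buffered);
                    buffered = 0;
                }
            }
            if (buffered > 0) {                 // end of bundle: flush the rest
                batches.add(buffered);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Three bundles of one record each, as after a grouping on the DirectRunner
        System.out.println(batchSizes(List.of(1, 1, 1), 1000));
        // A single bundle of 2,500 records
        System.out.println(batchSizes(List.of(2500), 1000));
    }
}
```

With bundles of size 1 the model yields `[1, 1, 1]`, one row per batch no matter how large the batch size is, while a bundle of 2,500 records yields `[1000, 1000, 500]`.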
Original answer:
I've run into the same problem when writing to cloud databases using Apache Beam's JdbcIO. The problem is that while JdbcIO does support writing up to 1,000 records in one batch, I have never actually seen it write more than 1 row at a time (I have to admit: this was always using the DirectRunner in a development environment).
I have therefore added a feature to JdbcIO where you can control the size of the batches yourself by grouping your data together and writing each group as one batch. Below is an example of how to use this feature based on the original WordCount example of Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
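The grouping steps in this pipeline (FirstLetterAsKey, GroupByKey, GetValues) decide which rows travel together. The plain-Java simulation below, which does not use Beam, mimics them: each word is keyed by its first letter, the words are grouped, and the keys are dropped, so each remaining list corresponds to one batch. `FirstLetterBatches` is a hypothetical name, and the real FirstLetterAsKey and GetValues classes may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java simulation (no Beam) of FirstLetterAsKey -> GroupByKey -> GetValues.
// Each returned list stands for one Iterable that would be written as one batch.
final class FirstLetterBatches {

    private FirstLetterBatches() {}

    static List<List<String>> batches(List<String> words) {
        // Key each word by its first letter and group by that key.
        // TreeMap only makes the order of the groups deterministic.
        Map<String, List<String>> grouped = words.stream()
                .filter(w -> !w.isEmpty())
                .collect(Collectors.groupingBy(
                        w -> w.substring(0, 1), TreeMap::new, Collectors.toList()));
        // Discard the keys and keep only the grouped values.
        return new ArrayList<>(grouped.values());
    }

    public static void main(String[] args) {
        List<String> words = List.of("apple", "avocado", "banana", "cherry", "cranberry", "apricot");
        for (List<String> batch : batches(words)) {
            System.out.println(batch.size() + " rows in one batch: " + batch);
        }
    }
}
```

One consequence of keying by first letter is that batch sizes follow the data: a very common initial letter produces a large batch and a rare one a tiny batch, so the key is effectively the knob that controls batch size.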
The difference from the normal write method of JdbcIO is the new method writeIterable(), which takes a PCollection<Iterable<RowT>> as input instead of a PCollection<RowT>. Each Iterable is written to the database as one batch.
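At the JDBC level, "each Iterable is written as one batch" can be pictured with plain `java.sql`: every element of the group is bound and queued with `addBatch()`, and `executeBatch()` runs once for the whole group. This is a minimal sketch under that assumption, not the code of the forked JdbcIO; `IterableBatchWriter` and `RowSetter` are hypothetical names.

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical sketch of the idea behind writeIterable(): all elements of one
// Iterable go into the same JDBC batch, which is executed exactly once.
final class IterableBatchWriter {

    /** Illustrative setter interface; not JdbcIO's PreparedStatementSetter. */
    interface RowSetter<T> {
        void setParameters(T row, PreparedStatement statement) throws SQLException;
    }

    private IterableBatchWriter() {}

    /** Writes all rows as a single batch and returns the driver's update counts. */
    static <T> int[] writeAsOneBatch(Iterable<T> rows, PreparedStatement statement,
                                     RowSetter<T> setter) throws SQLException {
        for (T row : rows) {
            statement.clearParameters();         // start each row with fresh parameters
            setter.setParameters(row, statement);
            statement.addBatch();                // queue the row; nothing is sent yet
        }
        return statement.executeBatch();         // one execution for the whole group
    }
}
```

With a real database you would pass a statement obtained from `connection.prepareStatement(...)`; calling `clearParameters()` before each row guards against a parameter left over from the previous row when a setter binds only some columns.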
The version of JdbcIO with this addition can be found here: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
The entire example project containing the example above can be found here: https://github.com/olavloite/spanner-beam-example
(There is also a pull request pending on Apache Beam to include this in the project)