久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

使用 RabbitMQ 源的 Spark 結(jié)構(gòu)化流

Spark Structured Streaming with RabbitMQ source(使用 RabbitMQ 源的 Spark 結(jié)構(gòu)化流)
本文介紹了使用 RabbitMQ 源的 Spark 結(jié)構(gòu)化流的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

限時(shí)送ChatGPT賬號(hào)..

我正在嘗試為 Structured Streaming 編寫(xiě)一個(gè)自定義接收器,它將使用來(lái)自 RabbitMQ 的消息.Spark 最近發(fā)布 DataSource V2 API,看起來(lái)很有前途.由于它抽象了許多細(xì)節(jié),我想使用這個(gè) API 以既簡(jiǎn)單又性能好.但是,由于它很新,因此可用的資源并不多.我需要經(jīng)驗(yàn)豐富的 Spark 人員的說(shuō)明,因?yàn)樗麄儠?huì)更容易掌握關(guān)鍵點(diǎn).我們開(kāi)始:

I am trying to write a custom receiver for Structured Streaming that will consume messages from RabbitMQ. Spark recently released DataSource V2 API, which seems very promising. Since it abstracts away many details, I want to use this API for the sake of both simplicity and performance. However, since it's quite new, there are not many sources available. I need some clarification from experienced Spark guys, since they will grasp the key points easier. Here we go:

我的起點(diǎn)是博客文章系列,第一部分 這里.它展示了如何在沒(méi)有流式傳輸功能的情況下實(shí)現(xiàn)數(shù)據(jù)源.為了制作流媒體源,我稍微改變了它們,因?yàn)槲倚枰獙?shí)現(xiàn) MicroBatchReadSupport 代替(或補(bǔ)充)DataSourceV2.

My starting point is the blog post series, with the first part here. It shows how to implement a data source, without streaming capability. To make a streaming source, I slightly changed them, since I need to implement MicroBatchReadSupport instead of (or in addition to) DataSourceV2.

為了提高效率,明智的做法是讓多個(gè) spark 執(zhí)行器同時(shí)使用 RabbitMQ,即來(lái)自同一個(gè)隊(duì)列.如果我不感到困惑,輸入的每個(gè)分區(qū) - 在 Spark 的術(shù)語(yǔ)中 - 對(duì)應(yīng)于隊(duì)列中的消費(fèi)者 - 在 RabbitMQ 術(shù)語(yǔ)中.因此,我們需要為輸入流設(shè)置多個(gè)分區(qū),對(duì)吧?

To be efficient, it's wise to have multiple spark executors consuming RabbitMQ concurrently, i.e. from the same queue. If I'm not confused, every partition of the input -in Spark's terminology- corresponds to a consumer from the queue -in RabbitMQ terminology. Thus, we need to have multiple partitions for the input stream, right?

與 該系列的第 4 部分類似,我實(shí)現(xiàn)了 MicroBatchReader如下:

Similar with part 4 of the series, I implemented MicroBatchReader as follows:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    int partition = options.getInt(RMQ.PARTITICN, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}

我正在返回一個(gè)工廠列表,并希望列表中的每個(gè)實(shí)例都將用于創(chuàng)建一個(gè)讀取器,該讀取器也是一個(gè)消費(fèi)者.這種方法正確嗎?

I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will be also a consumer. Is that approach correct?

我希望我的接收器是可靠的,即在每條處理過(guò)的消息之后(或至少寫(xiě)入檢查點(diǎn)目錄以進(jìn)行進(jìn)一步處理),我需要將其返回給 RabbitMQ.問(wèn)題從這里開(kāi)始:這些工廠是在驅(qū)動(dòng)程序中創(chuàng)建的,實(shí)際的讀取過(guò)程通過(guò) DataReaders.但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader.由于每個(gè) MicroBatchReader 有許多 DataReader,我應(yīng)該如何將這些消息回傳給 RabbitMQ?或者我應(yīng)該確認(rèn) next 方法在 DataReader 上被調(diào)用?安全嗎?如果是這樣,那么 commit 函數(shù)的目的是什么?

I want my reciever to be reliable, i.e. after every processed message (or at least written to chekpoint directory for further processing), I need to ack it back to RabbitMQ. The problem starts after here: these factories are created at the driver, and the actual reading process takes place at executors through DataReaders. However, the commit method is a part of MicroBatchReader, not DataReader. Since I have many DataReaders per MicroBatchReader, how should I ack these messages back to RabbitMQ? Or should I ack when the next method is called on DataReader? Is it safe? If so, what is the purpose of commit function then?

澄清: OBFUSCATION:答案中提供的有關(guān)重命名某些類/函數(shù)的鏈接(除了那里的解釋)讓一切更加清晰 比以往任何時(shí)候都更糟.引用 那里:

CLARIFICATION: OBFUSCATION: The link provided in the answer about the renaming of some classes/functions (in addition to the explanations there) made everything much more clear worse than ever. Quoting from there:

重命名:

  • DataReaderFactoryInputPartition

DataReaderInputPartitionReader

...

InputPartition 的目的是管理關(guān)聯(lián)的閱讀器,現(xiàn)在稱為 InputPartitionReader,帶有顯式創(chuàng)建操作以鏡像關(guān)閉操作.這是沒(méi)有從 API 中清除的時(shí)間更長(zhǎng),因?yàn)?DataReaderFactory 似乎更多比它更通用,并且不清楚為什么要生產(chǎn)一組它們閱讀.

InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.

但是,docs 明確表示讀取器工廠將被序列化并發(fā)送到執(zhí)行器,然后將在執(zhí)行器上創(chuàng)建數(shù)據(jù)讀取器并進(jìn)行實(shí)際讀取."

However, the docs clearly say that "the reader factory will be serialized and sent to executors, then the data reader will be created on executors and do the actual reading."

為了使消費(fèi)者可靠,我必須僅在特定消息在 Spark 端提交后才對(duì)它進(jìn)行 ACK.請(qǐng)注意消息必須在傳遞消息的同一連接上進(jìn)行確認(rèn),但在驅(qū)動(dòng)程序節(jié)點(diǎn)調(diào)用提交函數(shù).如何在 worker/executor 節(jié)點(diǎn)上提交?

To make the consumer reliable, I have to ACK for a particular message only after it is committed at Spark side. Note that the messages have to be ACKed on the same connection that it has been delivered through, but commit function is called at driver node. How can I commit at the worker/executor node?

推薦答案

> 我正在返回一個(gè)工廠列表,并希望列表中的每個(gè)實(shí)例都用于創(chuàng)建一個(gè)讀取器,它也是一個(gè)消費(fèi)者.這種方法正確嗎?源 [socket][1] 源實(shí)現(xiàn)有一個(gè)線程將消息推送到內(nèi)部 ListBuffer.換句話說(shuō),有一個(gè)消費(fèi)者(線程)填充了內(nèi)部 ListBuffer,它**然后**被`planInputPartitions`劃分為多個(gè)分區(qū)(`createDataReaderFactories` [renamed][2] 到 `planInputPartitions`).此外,根據(jù) [MicroBatchReadSupport][3] 的 Javadoc> 執(zhí)行引擎將在流式查詢開(kāi)始時(shí)創(chuàng)建一個(gè)微批處理讀取器,為每個(gè)要處理的批處理交替調(diào)用 setOffsetRange 和 createDataReaderFactories,然后在執(zhí)行完成時(shí)調(diào)用 stop().請(qǐng)注意,由于重新啟動(dòng)或故障恢復(fù),單個(gè)查詢可能會(huì)執(zhí)行多次.換句話說(shuō),`createDataReaderFactories` 應(yīng)該被調(diào)用 **multiple** 次,據(jù)我了解,這表明每個(gè) `DataReader` 負(fù)責(zé)一個(gè)靜態(tài)輸入分區(qū),這意味著 DataReader 不應(yīng)該是消費(fèi)者.----------> 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ... 如果是這樣,那么 commit 函數(shù)的目的是什么?提交函數(shù)的部分基本原理可能是防止 MicroBatchReader 的內(nèi)部緩沖區(qū)變大.通過(guò)提交偏移量,您可以有效地從緩沖區(qū)中刪除小于偏移量的元素,因?yàn)槟兄Z不再處理它們.您可以使用 `batches.trimStart(offsetDiff)` 在套接字源代碼中看到這種情況<小時(shí)><刪除>我不確定是否要實(shí)現(xiàn)一個(gè)可靠的接收器,所以我希望一個(gè)更有經(jīng)驗(yàn)的 Spark 人能過(guò)來(lái)解決你的問(wèn)題,因?yàn)槲乙灿信d趣!希望這可以幫助!

編輯

我只研究了 socket 和 wiki-edit 來(lái)源.這些資源還沒(méi)有準(zhǔn)備好生產(chǎn),這是問(wèn)題所在.相反, kafka 源是更好的起點(diǎn),與前面提到的源不同,它有多個(gè)像作者一樣的消費(fèi)者正在尋找.

I had only studied the socket, and wiki-edit sources. These sources are not production ready, which is something that the question was was not looking for. Instead, the kafka source is the better starting point which has, unlike the aforementioned sources, multiple consumers like the author was looking for.

但是,也許如果您正在尋找不可靠的來(lái)源,上面的套接字和 wikiedit 來(lái)源提供了一個(gè)不太復(fù)雜的解決方案.

However, perhaps if you're looking for unreliable sources, the socket and wikiedit sources above provide a less complicated solution.

這篇關(guān)于使用 RabbitMQ 源的 Spark 結(jié)構(gòu)化流的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!

【網(wǎng)站聲明】本站部分內(nèi)容來(lái)源于互聯(lián)網(wǎng),旨在幫助大家更快的解決問(wèn)題,如果有圖片或者內(nèi)容侵犯了您的權(quán)益,請(qǐng)聯(lián)系我們刪除處理,感謝您的支持!

相關(guān)文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時(shí)間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉(zhuǎn)換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉(zhuǎn)換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當(dāng)前星期幾的值)
主站蜘蛛池模板: 岛国一区| 国产剧情一区 | 天堂一区二区三区 | 欧美视频三级 | 亚洲欧美视频 | 欧美一区二区三区国产 | 成人欧美一区二区三区黑人孕妇 | 一区二区三区四区视频 | 日韩免费一区二区 | 九色www| 99热播精品 | 亚洲国产精品久久久久秋霞不卡 | 欧美精品一区在线 | 蜜桃视频在线观看免费视频网站www | 久久精品 | 2019中文字幕视频 | 不卡的av在线 | 中文字幕在线看第二 | 黑人一级黄色大片 | 久久国产高清 | 欧美视频区| 奇米影视首页 | 999国产视频 | 久久久久国产一区二区三区四区 | 密桃av| 国产成人免费视频网站高清观看视频 | 日本在线视 | 午夜影视免费片在线观看 | 超碰在线人人干 | 午夜精品一区二区三区在线观看 | 亚洲精品一区二区在线 | 黄色av一区| 国产精品a久久久久 | 欧美videosex性极品hd | 免费成人高清在线视频 | 日韩中出 | 成人h动漫精品一区二区器材 | 日本三级电影免费观看 | 91在线视频网址 | 久久国产高清 | 亚洲激情在线观看 |