久久久久久久av_日韩在线中文_看一级毛片视频_日本精品二区_成人深夜福利视频_武道仙尊动漫在线观看

如何在 RBDMS 或 NOSQL 數據存儲或其他消息系統(例

How could a distributed queue-like-thing be implemented on top of a RBDMS or NOSQL datastore or other messaging system (e.g., rabbitmq)?(如何在 RBDMS 或 NOSQL 數據存儲或其他消息系統(例如,rabbitmq)之上實現分布式隊列類
本文介紹了如何在 RBDMS 或 NOSQL 數據存儲或其他消息系統(例如,rabbitmq)之上實現分布式隊列類事物?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!

問題描述

限時送ChatGPT賬號..

從如果不是很酷"的問題類別...

From the wouldn't-it-be-cool-if category of questions ...

我所說的類似隊列的東西"是指支持以下操作:

By "queue-like-thing" I mean supports the following operations:

  • append(entry:Entry) - 將條目添加到隊列尾部
  • take(): Entry - 從隊列頭部移除條目并返回
  • promote(entry_id) - 將條目移動到靠近頭部的位置;當前占據該位置的條目被移動到舊位置
  • demote(entry_id) - 與promote(entry_id)相反

可選操作類似于:

  • promote(entry_id, amount) - 與promote(entry_id) 類似,但您指定職位數量
  • demote(entry_id, amount) - 與promote(entry_id, amount)相反
  • 當然,如果我們允許數量為正數或負數,我們可以將提升/降級方法與單個 move(entry_id, amount) 方法合并

如果可以在隊列上以分布式方式執行以下操作(多個客戶端與隊列交互),那將是理想的:

It would be ideal if the following operations could be performed on the queue in a distributed fashion (multiple clients interacting with the queue):

queue = ...

queue.append( a )
queue.append( b )
queue.append( c )

print queue
"a b c"

queue.promote( b.id )
print queue
"b a c"

queue.demote( a.id )
"b c a"

x = queue.take()
print x
"b"
print queue
"c a"

是否有任何數據存儲特別適合此用例?即使多個用戶同時修改隊列,隊列也應始終處于一致狀態.

Are there any data stores that are particularly apt for this use case? The queue should always be in a consistent state even if multiple users are modifying the queue simultaneously.

如果沒有升級/降級/移動的要求,不會有太大問題.

If it weren't for the promote/demote/move requirement, there wouldn't be much of a problem.

如果有 Java 和/或 Python 庫來完成上述任務,則可以加分.

Bonus points if there are Java and/or Python libraries to accomplish the task outlined above.

解決方案的擴展性應該非常好.

Solution should scale extremely well.

推薦答案

Python: "Batteries Included"

我認為 Python 和一些庫足以解決這個問題,而不是像 RabbitMQ、Redis 或 RDBMS 這樣的數據存儲.有些人可能會抱怨這種自己動手的方法是在重新發明輪子,但我更喜歡運行 100 行 Python 代碼而不是管理另一個數據存儲.

Python: "Batteries Included"

Rather than looking to a data store like RabbitMQ, Redis, or an RDBMS, I think python and a couple libraries have more than enough to solve this problem. Some may complain that this do-it-yourself approach is re-inventing the wheel but I prefer running a hundred lines of python code over managing another data store.

您定義的操作:追加、獲取、提升和降級,描述了一個優先級隊列.不幸的是,python 沒有內置的優先級隊列數據類型.但它確實有一個名為 heapq 的堆庫,優先級隊列通常以堆的形式實現.這是我實現的滿足您要求的優先級隊列:

The operations that you define: append, take, promote, and demote, describe a priority queue. Unfortunately python doesn't have a built-in priority queue data type. But it does have a heap library called heapq and priority queues are often implemented as heaps. Here's my implementation of a priority queue meeting your requirements:

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with lowest priority as an item. Items are
        threeple lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0

降級命令與提升命令幾乎相同,因此為簡潔起見,我將省略它.請注意,這僅取決于 python 的列表、字典和 heapq 庫.

The demote command is nearly identical to the promote command so I'll omit it for brevity. Note that this depends only on python's lists, dicts, and heapq library.

現在使用 PQueue 數據類型,我們希望允許與實例進行分布式交互.gevent 是一個很好的庫.盡管 gevent 相對較新且仍處于測試階段,但它的速度非常快且經過良好測試.使用 gevent,我們可以很容易地設置一個監聽 localhost:4040 的套接字服務器.這是我的服務器代碼:

Now with the PQueue data type, we'd like to allow distributed interactions with an instance. A great library for this is gevent. Though gevent is relatively new and still beta, it's wonderfully fast and well tested. With gevent, we can setup a socket server listening on localhost:4040 pretty easily. Here's my server code:

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()

在生產中運行之前,您當然希望做一些更好的錯誤/緩沖區處理.但它適用于快速原型制作.請注意,這不需要圍繞 pqueue 對象進行任何鎖定.Gevent 實際上并沒有并行運行代碼,它只是給人一種印象.缺點是更多的內核無濟于事,但好處是無鎖代碼.

Before that runs in production, you'll of course want to do some better error/buffer handling. But it'll work just fine for rapid-prototyping. Notice that this doesn't require any locking around the pqueue object. Gevent doesn't actually run code in parallel, it just gives that impression. The drawback is that more cores won't help but the benefit is lock-free code.

不要誤會,gevent SocketServer 會同時處理多個請求.但它通過協作多任務處理在響應請求之間切換.這意味著您必須讓出協程的時間片.雖然 gevents 套接字 I/O 函數旨在讓出,但我們的 pqueue 實現卻不是.幸運的是,pqueue 非常快地完成了它的任務.

Don't get me wrong, the gevent SocketServer will process multiple requests at the same time. But it switches between answering requests through cooperative multitasking. This means you have to yield the coroutine's time slice. While gevents socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, the pqueue completes it's tasks really quickly.

在進行原型設計時,我發現擁有一個客戶也很有用.編寫客戶端需要一些谷歌搜索,所以我也會分享該代碼:

While prototyping, I found it useful to have a client as well. It took some googling to write a client so I'll share that code too:

if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text

要使用新的數據存儲,首先啟動服務器,然后啟動客戶端.在客戶提示下,您應該能夠做到:

To use the new data store, first start the server and then start the client. At the client prompt you ought to be able to do:

> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two

擴展性非常好

考慮到您對數據存儲的看法,您似乎真的很關心吞吐量和持久性.但是規模非常好"并不能量化您的需求.所以我決定用一個測試功能對上面的內容進行基準測試.下面是測試函數:

Scaling Extremely Well

Given your thinking about a data store, it seems you're really concerned with throughput and durability. But "scale extremely well" doesn't quantify your needs. So I decided to benchmark the above with a test function. Here's the test function:

def test():
    import time
    import urllib2
    import subprocess

    import random
    random = random.Random(0)

    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()

基準測試結果

我對筆記本電腦上運行的服務器進行了 6 次測試.我認為結果非常好.這是輸出:

Benchmark Results

I ran 6 tests against the server running on my laptop. I think the results scale extremely well. Here's the output:

Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

最終邊界:持久性

最后,耐用性是我沒有完全原型化的唯一問題.但我也不認為這有那么難.在我們的優先級隊列中,項目的堆(列表)包含我們需要將數據類型保存到磁盤的所有信息.由于使用 gevent,我們還可以以多處理方式生成函數,我想象使用這樣的函數:

Final Frontier: Durability

Finally, durability is the only problem I didn't completely prototype. But I don't think it's that hard either. In our priority queue, the heap (list) of items has all the information we need to persist the data type to disk. Since, with gevent, we can also spawn functions in a multi-processing way, I imagined using a function like this:

def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val))
            gevent.sleep(0)

并將保存功能添加到我們的優先級隊列中:

and adding a save function to our priority queue:

def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)

您現在可以復制 Redis 模型的分叉并將數據存儲寫入磁盤每隔幾分鐘.如果您需要更高的耐用性,請將上述內容與將命令記錄到磁盤的系統相結合.這些是 Redis 使用的 AFP 和 RDB 持久化方法.

You could now copy the Redis model of forking and writing the data store to disk every few minutes. If you need even greater durability then couple the above with a system that logs commands to disk. Together, those are the AFP and RDB persistence methods that Redis uses.

這篇關于如何在 RBDMS 或 NOSQL 數據存儲或其他消息系統(例如,rabbitmq)之上實現分布式隊列類事物?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!

【網站聲明】本站部分內容來源于互聯網,旨在幫助大家更快的解決問題,如果有圖片或者內容侵犯了您的權益,請聯系我們刪除處理,感謝您的支持!

相關文檔推薦

Parsing an ISO 8601 string local date-time as if in UTC(解析 ISO 8601 字符串本地日期時間,就像在 UTC 中一樣)
How to convert Gregorian string to Gregorian Calendar?(如何將公歷字符串轉換為公歷?)
Java: What/where are the maximum and minimum values of a GregorianCalendar?(Java:GregorianCalendar 的最大值和最小值是什么/在哪里?)
Calendar to Date conversion for dates before 15 Oct 1582. Gregorian to Julian calendar switch(1582 年 10 月 15 日之前日期的日歷到日期轉換.公歷到儒略歷切換)
java Calendar setFirstDayOfWeek not working(java日歷setFirstDayOfWeek不起作用)
Java: getting current Day of the Week value(Java:獲取當前星期幾的值)
主站蜘蛛池模板: 一区二区三区欧美在线 | 99re视频这里只有精品 | 91精品国产综合久久久久久漫画 | 久久精品综合 | 激情六月丁香 | 亚洲国产欧美一区 | 国产成人久久精品 | 亚洲一区在线观看视频 | 欧一区| 日韩精品一区二区三区视频播放 | 一区二区免费在线观看 | 欧美激情一区二区三级高清视频 | 中文字幕在线观看第一页 | 亚洲激情一区二区三区 | 97国产一区二区精品久久呦 | 99久久婷婷国产精品综合 | 毛片网站在线观看 | 日韩精品一区二区三区四区 | 华人黄网站大全 | 日韩免费一区二区 | 一级爱爱片 | 一级黄色毛片a | 色综合视频 | 亚洲高清成人 | 99亚洲精品视频 | 亚洲欧洲精品在线 | 岛国av在线免费观看 | 日韩欧美中文字幕在线视频 | 亚洲乱码一区二区三区在线观看 | 国产精品一区二区三区久久 | av中文字幕在线观看 | 九九热最新视频 | 一区二区在线不卡 | 97高清国语自产拍 | 国产精品久久久久无码av | www.jizzjizz | 国产精品一区二区久久 | 亚洲毛片| 日本免费在线观看视频 | 精品丝袜在线 | 成人h电影在线观看 |