Problem Description
From the wouldn't-it-be-cool-if category of questions ...
By "queue-like-thing" I mean supports the following operations:
- append(entry: Entry) - add an entry to the tail of the queue
- take(): Entry - remove the entry at the head of the queue and return it
- promote(entry_id) - move the entry one position closer to the head; the entry that currently occupies that position is moved to the old position
- demote(entry_id) - the opposite of promote(entry_id)
Optional operations would be something like:
- promote(entry_id, amount) - same as promote(entry_id) except you specify the number of positions
- demote(entry_id, amount) - the opposite of promote(entry_id, amount)
- of course, if we allow amount to be positive or negative, we could consolidate the promote/demote methods into a single move(entry_id, amount) method
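For illustration, the merged move(entry_id, amount) semantics could be sketched on a plain in-memory list like this (a hypothetical Python 3 sketch; the `move` function, the dict entries, and the clamping behavior are all my assumptions, not part of any proposed solution):

```python
# Hypothetical sketch of the merged move(entry_id, amount) semantics on a
# plain in-memory list; positive amounts promote (toward the head),
# negative amounts demote, and positions are clamped to the list bounds.
def move(queue, entry_id, amount):
    """Shift the entry `amount` positions toward the head (clamped)."""
    pos = next(i for i, e in enumerate(queue) if e["id"] == entry_id)
    new_pos = min(max(pos - amount, 0), len(queue) - 1)
    entry = queue.pop(pos)
    queue.insert(new_pos, entry)

q = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
move(q, "b", 1)   # promote b by one position
print([e["id"] for e in q])  # -> ['b', 'a', 'c']
```

For amount=1 this is equivalent to the swap described for promote, since the displaced occupant slides into the vacated position.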
It would be ideal if the following operations could be performed on the queue in a distributed fashion (multiple clients interacting with the queue):
queue = ...
queue.append( a )
queue.append( b )
queue.append( c )
print queue
"a b c"
queue.promote( b.id )
print queue
"b a c"
queue.demote( a.id )
print queue
"b c a"
x = queue.take()
print x
"b"
print queue
"c a"
Are there any data stores that are particularly apt for this use case? The queue should always be in a consistent state even if multiple users are modifying the queue simultaneously.
If it weren't for the promote/demote/move requirement, there wouldn't be much of a problem.
Bonus points if there are Java and/or Python libraries to accomplish the task outlined above.
Solution should scale extremely well.
Recommended Answer
Python: "Batteries Included"
Rather than looking to a data store like RabbitMQ, Redis, or an RDBMS, I think Python and a couple of libraries have more than enough to solve this problem. Some may complain that this do-it-yourself approach is re-inventing the wheel, but I'd rather run a hundred lines of Python code than manage another data store.
The operations that you define: append, take, promote, and demote, describe a priority queue. Unfortunately, Python doesn't have a built-in priority queue data type. But it does have a heap library called heapq, and priority queues are often implemented as heaps. Here's my implementation of a priority queue meeting your requirements:
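As a quick stdlib-only illustration of why a heap fits (this sketch is mine, not part of the answer's implementation, and is written for Python 3): pushing (priority, value) pairs onto a heapq heap means the smallest priority always pops first, which is exactly the take() behavior:

```python
import heapq

# Illustration only: a heap of (priority, value) pairs always pops the
# smallest priority first -- the same behavior take() needs.
heap = []
for pri, val in [(2, 'b'), (1, 'a'), (3, 'c')]:
    heapq.heappush(heap, (pri, val))

print(heapq.heappop(heap))  # -> (1, 'a')
```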
import heapq

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.

        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.

        The new value is added with lowest priority as an item. Items are
        threeple lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.

        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.

        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.

        The promoted item swaps position with the next highest item.

        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0
The demote command is nearly identical to the promote command, so I'll omit it for brevity. Note that this depends only on Python's lists, dicts, and the heapq library.
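Since the demote code is omitted, here is a hedged guess at what it might look like, mirroring promote but swapping with the neighbor one priority lower (next = item_pri + 1). The condensed MiniPQueue class below exists only to make the sketch self-contained and is written for Python 3; it is not the author's code:

```python
# Hedged sketch of the omitted demote(): same swap logic as promote(),
# but the neighbor sits one priority *lower* (closer to the tail).
# MiniPQueue is a condensed, illustrative stand-in for PQueue.
class MiniPQueue:
    def __init__(self):
        self.toll = 0
        self.rows = {}   # rowid -> [priority, rowid, value]
        self.pris = {}   # priority -> [priority, rowid, value]

    def append(self, value):
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def demote(self, rowid):
        if rowid not in self.rows:
            return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        down = item_pri + 1          # neighbor closer to the tail
        if down in self.pris:
            iota = self.pris[down]
            iota_pri, iota_row, iota_val = iota
            # swap rowid and value; the priority slots stay fixed
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0
```

As with promote, demoting the item already at the tail would return 0 because no lower-priority neighbor exists.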
Now with the PQueue data type, we'd like to allow distributed interactions with an instance. A great library for this is gevent. Though gevent is relatively new and still in beta, it's wonderfully fast and well tested. With gevent, we can set up a socket server listening on localhost:4040 pretty easily. Here's my server code:
from gevent import socket as gsocket
from gevent.server import StreamServer

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()
Before this runs in production, you'll of course want some better error/buffer handling. But it'll work just fine for rapid prototyping. Notice that this doesn't require any locking around the pqueue object: gevent doesn't actually run code in parallel; it just gives that impression. The drawback is that more cores won't help, but the benefit is lock-free code.
Don't get me wrong, the gevent StreamServer will process multiple requests at the same time. But it switches between answering requests through cooperative multitasking. This means you have to yield the coroutine's time slice. While gevent's socket I/O functions are designed to yield, our pqueue implementation is not. Fortunately, the pqueue completes its tasks really quickly.
While prototyping, I found it useful to have a client as well. It took some googling to write a client, so I'll share that code too:
if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text
To use the new data store, first start the server and then start the client. At the client prompt you ought to be able to do:
> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two
Scaling Extremely Well
Given your thinking about a data store, it seems you're really concerned with throughput and durability. But "scale extremely well" doesn't quantify your needs. So I decided to benchmark the above with a test function. Here's the test function:
def test():
    import time
    import urllib2
    import subprocess
    import random
    random = random.Random(0)
    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()
Benchmark Results
I ran 6 tests against the server running on my laptop. I think the results scale extremely well. Here's the output:
Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s
Final Frontier: Durability
Finally, durability is the only problem I didn't completely prototype. But I don't think it's that hard either. In our priority queue, the heap (list) of items has all the information we need to persist the data type to disk. Since, with gevent, we can also spawn functions in a multi-processing way, I imagined using a function like this:
def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val))
            gevent.sleep(0)
and adding a save function to our priority queue:
def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)
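For completeness, restoring from such a snapshot could be sketched like this (a hypothetical Python 3 sketch of mine: it assumes a JSON snapshot format rather than the raw str() writes above, since JSON round-trips cleanly, and save_snapshot/load_snapshot are invented names):

```python
import json

# Hypothetical restore sketch. Assumes the snapshot is a JSON list of
# [priority, rowid, value] items instead of the raw str() format above.
def save_snapshot(heap, path):
    with open(path, 'w') as f:
        json.dump(heap, f)

def load_snapshot(path):
    with open(path) as f:
        heap = json.load(f)
    # rebuild the secondary indexes from the heap items
    rows = {item[1]: item for item in heap}
    pris = {item[0]: item for item in heap}
    return heap, rows, pris
```

Because the heap items carry priority, rowid, and value, the rows and pris mappings can always be reconstructed from the heap alone.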
You could now copy the Redis model of forking and writing the data store to disk every few minutes. If you need even greater durability, then couple the above with a system that logs commands to disk. Together, those are the AOF and RDB persistence methods that Redis uses.
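The command-logging half could be sketched like this (a hypothetical Python 3 sketch of mine, stdlib only; log_command and replay are invented names): every mutating command is appended to a log, and on restart the log is replayed through a dispatcher much like the socket server's:

```python
# Hypothetical append-only command log: record each mutating command as
# a line of text, then rebuild state by replaying the log on restart.
def log_command(logfile, cmd):
    logfile.write(cmd + '\n')
    logfile.flush()          # os.fsync would give stronger durability

def replay(lines, queue):
    # dispatch each logged command, mirroring the socket server's parser
    for line in lines:
        parts = line.strip().split(' ')
        if parts[0] == 'append':
            queue.append(parts[1])
        elif parts[0] == 'take':
            queue.take()     # promote/demote would dispatch similarly
```

Replaying an AOF-style log is slower than loading an RDB-style snapshot, which is why Redis combines the two: snapshot periodically, and replay only the commands logged since the last snapshot.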