add SQLite3-based queue

This commit is contained in:
Martin Zimmermann 2014-07-02 18:08:12 +02:00
parent 80cbf2676f
commit d472262fee
3 changed files with 90 additions and 8 deletions

View File

@ -55,6 +55,12 @@ class Message(object):
def __le__(self, other):
return self.timestamp + self.wait <= other.timestamp + other.wait
def __eq__(self, other):
return self.type == other.type and self.data == other.data
def __repr__(self):
return "<Message {0}: {1}>".format(self.type, self.data)
class Queue(object):
"""An in-memory queue with requeuing abilities.
@ -236,5 +242,7 @@ class Jobs(dict):
self[name] = cls(*args, **kwargs)
from .sqlite import SQLite3Queue
__all__ = ["Full", "Empty", "Retry", "Timeout", "Message", "Queue", "Targets",
"Task", "Cron"]
"Task", "Cron", "SQLite3Queue"]

58
isso/queue/sqlite.py Normal file
View File

@ -0,0 +1,58 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import json
import time
from . import Queue, Full, Empty, Message
pickle = lambda val: json.dumps(val).encode("utf-8")
unpickle = lambda val: json.loads(val.decode("utf-8"))
class SQLite3Queue(Queue):
"""Implements a shared queue using SQLite3.
:param connection: SQLite3 connection
"""
def __init__(self, connection, maxlen=-1, timeout=2**10):
super(SQLite3Queue, self).__init__(maxlen, timeout)
self.connection = connection
self.connection.execute(
'CREATE TABLE IF NOT EXISTS queue ('
' id INTEGER PRIMARY KEY AUTOINCREMENT,'
' type TEXT,'
' data BLOB,'
' timestamp FLOAT,'
' wait FLOAT)')
def put(self, item):
with self.connection.transaction as con:
count = con.execute('SELECT COUNT(*) FROM queue').fetchone()[0] + 1
if -1 < self.maxlen < count:
raise Full
con.execute(
'INSERT INTO queue (type, data, timestamp, wait) VALUES (?, ?, ?, ?)',
(item.type, pickle(item.data), item.timestamp, item.wait))
def get(self):
with self.connection.transaction as con:
row = con.execute(
'SELECT id, type, data FROM queue '
'WHERE (? > timestamp + wait) ORDER BY timestamp LIMIT 1',
(time.time(), )).fetchone()
if not row:
raise Empty
id, type, data = row
con.execute('DELETE FROM queue WHERE id = ?', (str(id), ))
return Message(type, unpickle(data))
@property
def size(self):
with self.connection.transaction as con:
return con.execute('SELECT COUNT(*) FROM queue').fetchone()[0]

View File

@ -5,7 +5,8 @@ from __future__ import unicode_literals
import unittest
import datetime
from isso.queue import Message, Queue, Full, Empty, Timeout
from isso.db import SQLite3
from isso.queue import Message, Queue, Full, Empty, Timeout, SQLite3Queue
class TestMessage(unittest.TestCase):
@ -32,8 +33,11 @@ class TestMessage(unittest.TestCase):
class TestQueue(unittest.TestCase):
def setUp(self):
self.cls = Queue
def test_queue(self):
q = Queue()
q = self.cls()
msgs = [Message("Foo", None) for _ in range(3)]
for msg in msgs:
@ -46,15 +50,21 @@ class TestQueue(unittest.TestCase):
self.assertEqual(q.size, 0)
def test_data_primitives(self):
q = self.cls()
m = Message("Foo", {"foo": True, "bar": [2, 3]})
q.put(m)
self.assertEqual(q.get(), m)
def test_queue_full(self):
q = Queue(maxlen=1)
q = self.cls(maxlen=1)
q.put(Message("Foo", None))
self.assertRaises(Full, q.put, Message("Bar", None))
def test_queue_empty(self):
q = Queue()
q = self.cls()
msg = Message("Foo", None)
self.assertRaises(Empty, q.get)
@ -63,7 +73,7 @@ class TestQueue(unittest.TestCase):
self.assertRaises(Empty, q.get)
def test_retry(self):
q = Queue()
q = self.cls()
msg = Message("Foo", None)
q.retry(msg)
@ -71,13 +81,13 @@ class TestQueue(unittest.TestCase):
self.assertEqual(q.size, 1)
def test_retry_timeout(self):
q = Queue(timeout=0)
q = self.cls(timeout=0)
msg = Message("Foo", None)
self.assertRaises(Timeout, q.retry, msg)
def test_requeue(self):
q = Queue()
q = self.cls()
msg = Message("Foo", None)
q.put(msg)
@ -85,3 +95,9 @@ class TestQueue(unittest.TestCase):
self.assertRaises(Empty, q.get)
self.assertEqual(q.size, 1)
class TestSQLite3Queue(TestQueue):
def setUp(self):
self.cls = lambda *x, **z: SQLite3Queue(SQLite3(":memory:"), *x, **z)