diff --git a/isso/queue/sqlite.py b/isso/queue/sqlite.py index 8ee97a4..7d054ad 100644 --- a/isso/queue/sqlite.py +++ b/isso/queue/sqlite.py @@ -5,6 +5,10 @@ from __future__ import unicode_literals import json import time +from sqlalchemy import Table, Column, MetaData, create_engine +from sqlalchemy import Integer, Float, String, LargeBinary +from sqlalchemy.sql import select, func + from . import Queue, Full, Empty, Message @@ -13,46 +17,51 @@ unpickle = lambda val: json.loads(val.decode("utf-8")) class SQLite3Queue(Queue): - """Implements a shared queue using SQLite3. - - :param connection: SQLite3 connection + """Implements a shared queue using SQLAlchemy Core """ - def __init__(self, connection, maxlen=-1, timeout=2**10): + def __init__(self, db, maxlen=-1, timeout=2**10): super(SQLite3Queue, self).__init__(maxlen, timeout) - self.connection = connection - self.connection.execute( - 'CREATE TABLE IF NOT EXISTS queue (' - ' id INTEGER PRIMARY KEY AUTOINCREMENT,' - ' type TEXT,' - ' data BLOB,' - ' timestamp FLOAT,' - ' wait FLOAT)') + self.metadata = MetaData() + self.engine = create_engine(db) + self.queue = Table("queue", self.metadata, + Column("id", Integer, primary_key=True), + Column("type", String(16)), + Column("data", LargeBinary(65535)), + Column("timestamp", Float), + Column("wait", Float)) + + self.metadata.create_all(self.engine) def put(self, item): - with self.connection.transaction as con: - count = con.execute('SELECT COUNT(*) FROM queue').fetchone()[0] + 1 + with self.engine.begin() as con: + count = self._size(con) + 1 if -1 < self.maxlen < count: raise Full - con.execute( - 'INSERT INTO queue (type, data, timestamp, wait) VALUES (?, ?, ?, ?)', - (item.type, pickle(item.data), item.timestamp, item.wait)) + con.execute(self.queue.insert().values( + type=item.type, data=pickle(item.data), + timestamp=item.timestamp, wait=item.wait)) def get(self): - with self.connection.transaction as con: - row = con.execute( - 'SELECT id, type, data FROM queue ' - 'WHERE (? > timestamp + wait) ORDER BY timestamp LIMIT 1', - (time.time(), )).fetchone() - if not row: + with self.engine.begin() as con: + obj = con.execute( + select([self.queue.c.id, self.queue.c.type, self.queue.c.data]) + .where(time.time() > self.queue.c.timestamp + self.queue.c.wait) + .order_by(self.queue.c.timestamp) + .limit(1)).fetchone() + + if not obj: raise Empty - id, type, data = row - con.execute('DELETE FROM queue WHERE id = ?', (str(id), )) - return Message(type, unpickle(data)) + _id, _type, data = obj + con.execute(self.queue.delete(self.queue.c.id == _id)) + + return Message(_type, unpickle(data)) + + def _size(self, con): + return con.execute(select([func.count(self.queue)])).fetchone()[0] @property def size(self): - with self.connection.transaction as con: - return con.execute('SELECT COUNT(*) FROM queue').fetchone()[0] + return self._size(self.engine.connect()) diff --git a/isso/tests/test_queue.py b/isso/tests/test_queue.py index ba2a197..7d21c48 100644 --- a/isso/tests/test_queue.py +++ b/isso/tests/test_queue.py @@ -100,4 +100,4 @@ class TestQueue(unittest.TestCase): class TestSQLite3Queue(TestQueue): def setUp(self): - self.cls = lambda *x, **z: SQLite3Queue(SQLite3(":memory:"), *x, **z) + self.cls = lambda *x, **z: SQLite3Queue("sqlite:///:memory:", *x, **z)