SQLite3 queue uses SQLAlchemy now

pull/108/head
Martin Zimmermann 10 years ago
parent a0a2662cc9
commit e706fabb26

@ -5,6 +5,10 @@ from __future__ import unicode_literals
import json import json
import time import time
from sqlalchemy import Table, Column, MetaData, create_engine
from sqlalchemy import Integer, Float, String, LargeBinary
from sqlalchemy.sql import select, func
from . import Queue, Full, Empty, Message from . import Queue, Full, Empty, Message
@ -13,46 +17,51 @@ unpickle = lambda val: json.loads(val.decode("utf-8"))
class SQLite3Queue(Queue): class SQLite3Queue(Queue):
"""Implements a shared queue using SQLite3. """Implements a shared queue using SQLAlchemy Core
:param connection: SQLite3 connection
""" """
def __init__(self, connection, maxlen=-1, timeout=2**10): def __init__(self, db, maxlen=-1, timeout=2**10):
super(SQLite3Queue, self).__init__(maxlen, timeout) super(SQLite3Queue, self).__init__(maxlen, timeout)
self.connection = connection self.metadata = MetaData()
self.connection.execute( self.engine = create_engine(db)
'CREATE TABLE IF NOT EXISTS queue (' self.queue = Table("queue", self.metadata,
' id INTEGER PRIMARY KEY AUTOINCREMENT,' Column("id", Integer, primary_key=True),
' type TEXT,' Column("type", String(16)),
' data BLOB,' Column("data", LargeBinary(65535)),
' timestamp FLOAT,' Column("timestamp", Float),
' wait FLOAT)') Column("wait", Float))
self.metadata.create_all(self.engine)
def put(self, item): def put(self, item):
with self.connection.transaction as con: with self.engine.begin() as con:
count = con.execute('SELECT COUNT(*) FROM queue').fetchone()[0] + 1 count = self._size(con) + 1
if -1 < self.maxlen < count: if -1 < self.maxlen < count:
raise Full raise Full
con.execute( con.execute(self.queue.insert().values(
'INSERT INTO queue (type, data, timestamp, wait) VALUES (?, ?, ?, ?)', type=item.type, data=pickle(item.data),
(item.type, pickle(item.data), item.timestamp, item.wait)) timestamp=item.timestamp, wait=item.wait))
def get(self): def get(self):
with self.connection.transaction as con: with self.engine.begin() as con:
row = con.execute( obj = con.execute(
'SELECT id, type, data FROM queue ' select([self.queue.c.id, self.queue.c.type, self.queue.c.data])
'WHERE (? > timestamp + wait) ORDER BY timestamp LIMIT 1', .where(time.time() > self.queue.c.timestamp + self.queue.c.wait)
(time.time(), )).fetchone() .order_by(self.queue.c.timestamp)
if not row: .limit(1)).fetchone()
if not obj:
raise Empty raise Empty
id, type, data = row _id, _type, data = obj
con.execute('DELETE FROM queue WHERE id = ?', (str(id), )) con.execute(self.queue.delete(self.queue.c.id == _id))
return Message(type, unpickle(data))
return Message(_type, unpickle(data))
def _size(self, con):
return con.execute(select([func.count(self.queue)])).fetchone()[0]
@property @property
def size(self): def size(self):
with self.connection.transaction as con: return self._size(self.engine.connect())
return con.execute('SELECT COUNT(*) FROM queue').fetchone()[0]

@ -100,4 +100,4 @@ class TestQueue(unittest.TestCase):
class TestSQLite3Queue(TestQueue): class TestSQLite3Queue(TestQueue):
def setUp(self): def setUp(self):
self.cls = lambda *x, **z: SQLite3Queue(SQLite3(":memory:"), *x, **z) self.cls = lambda *x, **z: SQLite3Queue("sqlite:///:memory:", *x, **z)

Loading…
Cancel
Save