From 10960ecf1e91947ee3e132773e44f4dab217050f Mon Sep 17 00:00:00 2001 From: Martin Zimmermann Date: Sat, 28 Jun 2014 12:39:29 +0200 Subject: [PATCH] add in-memory queue, worker and task skeleton --- isso/queue/__init__.py | 186 +++++++++++++++++++++++++++++++++++++++ isso/tests/test_queue.py | 91 +++++++++++++++++++ 2 files changed, 277 insertions(+) create mode 100644 isso/queue/__init__.py create mode 100644 isso/tests/test_queue.py diff --git a/isso/queue/__init__.py b/isso/queue/__init__.py new file mode 100644 index 0000000..a3dbb2f --- /dev/null +++ b/isso/queue/__init__.py @@ -0,0 +1,186 @@ +# -*- encoding: utf-8 -*- + +from __future__ import unicode_literals, division + +import abc +import logging +import threading + +import math +import bisect +import functools + +import time + +try: + import queue +except ImportError: + import Queue as queue + +from isso.utils import total_seconds + +logger = logging.getLogger("isso") + +Full = queue.Full +Empty = queue.Empty + + +class Retry(Exception): + pass + + +class Timeout(Exception): + pass + + +@functools.total_ordering +class Message(object): + """Queue payload sortable by time. + + :param type: task type + :param data: task payload + :param delay: initial delay before the job gets executed + :param wait: subsequent delays for retrying + """ + + def __init__(self, type, data, delay=0, wait=0): + self.type = type + self.data = data + + self.wait = wait + self.timestamp = time.time() + delay + + def __le__(self, other): + return self.timestamp + self.wait <= other.timestamp + other.wait + + +class Queue(object): + """An in-memory queue with requeuing abilities. + + :param maxlen: upperbound limit + :param timeout: maximum retry interval after which a :func:`retry` call + raises :exception:`Timeout` (defaults to 15 min) + """ + + def __init__(self, maxlen=-1, timeout=900): + self.queue = [] + self.maxlen = maxlen + self.timeout = timeout + + # lock destructive queue operations + self.mutex = threading.Lock() + + def put(self, item): + with self.mutex: + if -1 < self.maxlen < len(self.queue) + 1: + raise queue.Full + + bisect.insort(self.queue, item) + + def get(self): + with self.mutex: + try: + msg = self.queue[0] + except IndexError: + raise queue.Empty + + if msg.timestamp + msg.wait <= time.time(): + return self.queue.pop(0) + + raise queue.Empty + + def retry(self, msg): + self.put(Queue.delay(msg, self.timeout)) + + def requeue(self, msg, timedelta): + self.put(Message(msg.type, msg.data, total_seconds(timedelta))) + + @property + def size(self): + with self.mutex: + return len(self.queue) + + @classmethod + def delay(cls, msg, timeout, delayfunc=lambda i: max(1, i * 2)): + wait = delayfunc(msg.wait) + if wait >= timeout: + raise Timeout("Exceeded time limit of {0}".format(timeout)) + return Message(msg.type, msg.data, 0, wait) + + +class Worker(threading.Thread): + + interval = 0.05 + + def __init__(self, queue, targets): + super(Worker, self).__init__() + + self.alive = True + self.queue = queue + self.targets = targets + + def run(self): + while self.alive: + try: + payload = self.queue.get() + except queue.Empty: + Worker.wait(0.5) + else: + task = self.targets[payload.type] + try: + task.run(payload.data) + except Retry: + try: + self.queue.retry(payload) + except Timeout: + logger.exception("Uncaught exception while retrying " + "%s.run", task) + except Exception: + logger.exception("Uncaught exception while executing " + "%s.run", task) + finally: + if isinstance(task, Cron): + self.queue.requeue(payload, task.timedelta) + + def join(self, timeout=None): + self.alive = False + super(Worker, self).join(timeout) + + @classmethod + def wait(cls, seconds): + """Sleep for :param seconds: but split into :var interval: sleeps to + be interruptable. + """ + f, i = math.modf(seconds / Worker.interval) + + for x in range(int(i)): + time.sleep(Worker.interval) + + time.sleep(f * Worker.interval) + + +class Task(object): + + __metaclass__ = abc.ABCMeta + + @property + def id(self): + return threading.currentThread().ident + + @abc.abstractmethod + def run(self, data): + return + + +class Cron(Task): + + __metaclass__ = abc.ABCMeta + + def __init__(self, timedelta): + self.timedelta = timedelta + + def run(self, data): + return + + +__all__ = ["Full", "Empty", "Retry", "Timeout", "Message", "Queue", "Task", "Cron"] diff --git a/isso/tests/test_queue.py b/isso/tests/test_queue.py new file mode 100644 index 0000000..0b0465f --- /dev/null +++ b/isso/tests/test_queue.py @@ -0,0 +1,91 @@ +# -*- encoding: utf-8 -*- + +from __future__ import unicode_literals + +try: + import unittest2 as unittest +except ImportError: + import unittest + +import datetime + +from isso.queue import Message, Queue, Full, Empty, Timeout + + +class TestMessage(unittest.TestCase): + + def test_message(self): + a = Message("Foo", None) + b = Message("Bar", None) + + self.assertLess(a, b) + + def test_message_delay(self): + a = Message("Foo", None, delay=1) + b = Message("Bar", None, delay=0) + + self.assertGreater(a, b) + + def test_message_wait(self): + a = Message("Foo", None) + b = Message("Foo", None) + a = Queue.delay(a, 1, delayfunc=lambda i: 0.5) + + self.assertGreater(a, b) + + +class TestQueue(unittest.TestCase): + + def test_queue(self): + q = Queue() + msgs = [Message("Foo", None) for _ in range(3)] + + for msg in msgs: + q.put(msg) + + self.assertEqual(q.size, 3) + + for msg in msgs: + self.assertEqual(q.get(), msg) + + self.assertEqual(q.size, 0) + + + def test_queue_full(self): + q = Queue(maxlen=1) + q.put(Message("Foo", None)) + + self.assertRaises(Full, q.put, Message("Bar", None)) + + def test_queue_empty(self): + q = Queue() + msg = Message("Foo", None) + + self.assertRaises(Empty, q.get) + q.put(msg) + q.get() + self.assertRaises(Empty, q.get) + + def test_retry(self): + q = Queue() + msg = Message("Foo", None) + + q.retry(msg) + self.assertRaises(Empty, q.get) + self.assertEqual(q.size, 1) + + def test_retry_timeout(self): + q = Queue(timeout=0) + msg = Message("Foo", None) + + self.assertRaises(Timeout, q.retry, msg) + + def test_requeue(self): + q = Queue() + msg = Message("Foo", None) + + q.put(msg) + q.requeue(q.get(), datetime.timedelta(seconds=1)) + + self.assertRaises(Empty, q.get) + self.assertEqual(q.size, 1)