add in-memory queue, worker and task skeleton
This commit is contained in:
parent
1e542e612a
commit
10960ecf1e
186
isso/queue/__init__.py
Normal file
186
isso/queue/__init__.py
Normal file
@ -0,0 +1,186 @@
|
|||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import unicode_literals, division
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import math
|
||||||
|
import bisect
|
||||||
|
import functools
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
try:
|
||||||
|
import queue
|
||||||
|
except ImportError:
|
||||||
|
import Queue as queue
|
||||||
|
|
||||||
|
from isso.utils import total_seconds
|
||||||
|
|
||||||
|
logger = logging.getLogger("isso")
|
||||||
|
|
||||||
|
Full = queue.Full
|
||||||
|
Empty = queue.Empty
|
||||||
|
|
||||||
|
|
||||||
|
class Retry(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Timeout(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@functools.total_ordering
|
||||||
|
class Message(object):
|
||||||
|
"""Queue payload sortable by time.
|
||||||
|
|
||||||
|
:param type: task type
|
||||||
|
:param data: task payload
|
||||||
|
:param delay: initial delay before the job gets executed
|
||||||
|
:param wait: subsequent delays for retrying
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, type, data, delay=0, wait=0):
|
||||||
|
self.type = type
|
||||||
|
self.data = data
|
||||||
|
|
||||||
|
self.wait = wait
|
||||||
|
self.timestamp = time.time() + delay
|
||||||
|
|
||||||
|
def __le__(self, other):
|
||||||
|
return self.timestamp + self.wait <= other.timestamp + other.wait
|
||||||
|
|
||||||
|
|
||||||
|
class Queue(object):
|
||||||
|
"""An in-memory queue with requeuing abilities.
|
||||||
|
|
||||||
|
:param maxlen: upperbound limit
|
||||||
|
:param timeout: maximum retry interval after which a :func:`retry` call
|
||||||
|
raises :exception:`Timeout` (defaults to 15 min)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, maxlen=-1, timeout=900):
|
||||||
|
self.queue = []
|
||||||
|
self.maxlen = maxlen
|
||||||
|
self.timeout = timeout
|
||||||
|
|
||||||
|
# lock destructive queue operations
|
||||||
|
self.mutex = threading.Lock()
|
||||||
|
|
||||||
|
def put(self, item):
|
||||||
|
with self.mutex:
|
||||||
|
if -1 < self.maxlen < len(self.queue) + 1:
|
||||||
|
raise queue.Full
|
||||||
|
|
||||||
|
bisect.insort(self.queue, item)
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
with self.mutex:
|
||||||
|
try:
|
||||||
|
msg = self.queue[0]
|
||||||
|
except IndexError:
|
||||||
|
raise queue.Empty
|
||||||
|
|
||||||
|
if msg.timestamp + msg.wait <= time.time():
|
||||||
|
return self.queue.pop(0)
|
||||||
|
|
||||||
|
raise queue.Empty
|
||||||
|
|
||||||
|
def retry(self, msg):
|
||||||
|
self.put(Queue.delay(msg, self.timeout))
|
||||||
|
|
||||||
|
def requeue(self, msg, timedelta):
|
||||||
|
self.put(Message(msg.type, msg.data, total_seconds(timedelta)))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def size(self):
|
||||||
|
with self.mutex:
|
||||||
|
return len(self.queue)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def delay(cls, msg, timeout, delayfunc=lambda i: max(1, i * 2)):
|
||||||
|
wait = delayfunc(msg.wait)
|
||||||
|
if wait >= timeout:
|
||||||
|
raise Timeout("Exceeded time limit of {0}".format(timeout))
|
||||||
|
return Message(msg.type, msg.data, 0, wait)
|
||||||
|
|
||||||
|
|
||||||
|
class Worker(threading.Thread):
|
||||||
|
|
||||||
|
interval = 0.05
|
||||||
|
|
||||||
|
def __init__(self, queue, targets):
|
||||||
|
super(Worker, self).__init__()
|
||||||
|
|
||||||
|
self.alive = True
|
||||||
|
self.queue = queue
|
||||||
|
self.targets = targets
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while self.alive:
|
||||||
|
try:
|
||||||
|
payload = self.queue.get()
|
||||||
|
except queue.Empty:
|
||||||
|
Worker.wait(0.5)
|
||||||
|
else:
|
||||||
|
task = self.targets[payload.type]
|
||||||
|
try:
|
||||||
|
task.run(payload.data)
|
||||||
|
except Retry:
|
||||||
|
try:
|
||||||
|
self.queue.retry(payload)
|
||||||
|
except Timeout:
|
||||||
|
logger.exception("Uncaught exception while retrying "
|
||||||
|
"%s.run", task)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Uncaught exception while executing "
|
||||||
|
"%s.run", task)
|
||||||
|
finally:
|
||||||
|
if isinstance(task, Cron):
|
||||||
|
self.queue.requeue(payload, task.timedelta)
|
||||||
|
|
||||||
|
def join(self, timeout=None):
|
||||||
|
self.alive = False
|
||||||
|
super(Worker, self).join(timeout)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def wait(cls, seconds):
|
||||||
|
"""Sleep for :param seconds: but split into :var interval: sleeps to
|
||||||
|
be interruptable.
|
||||||
|
"""
|
||||||
|
f, i = math.modf(seconds / Worker.interval)
|
||||||
|
|
||||||
|
for x in range(int(i)):
|
||||||
|
time.sleep(Worker.interval)
|
||||||
|
|
||||||
|
time.sleep(f * Worker.interval)
|
||||||
|
|
||||||
|
|
||||||
|
class Task(object):
|
||||||
|
|
||||||
|
__metaclass__ = abc.ABCMeta
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self):
|
||||||
|
return threading.currentThread().ident
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def run(self, data):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
class Cron(Task):
|
||||||
|
|
||||||
|
__metaclass__ = abc.ABCMeta
|
||||||
|
|
||||||
|
def __init__(self, timedelta):
|
||||||
|
self.timedelta = timedelta
|
||||||
|
|
||||||
|
def run(self, data):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["Full", "Empty", "Retry", "Timeout", "Message", "Queue", "Task", "Cron"]
|
91
isso/tests/test_queue.py
Normal file
91
isso/tests/test_queue.py
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
|
try:
|
||||||
|
import unittest2 as unittest
|
||||||
|
except ImportError:
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from isso.queue import Message, Queue, Full, Empty, Timeout
|
||||||
|
|
||||||
|
|
||||||
|
class TestMessage(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_message(self):
|
||||||
|
a = Message("Foo", None)
|
||||||
|
b = Message("Bar", None)
|
||||||
|
|
||||||
|
self.assertLess(a, b)
|
||||||
|
|
||||||
|
def test_message_delay(self):
|
||||||
|
a = Message("Foo", None, delay=1)
|
||||||
|
b = Message("Bar", None, delay=0)
|
||||||
|
|
||||||
|
self.assertGreater(a, b)
|
||||||
|
|
||||||
|
def test_message_wait(self):
|
||||||
|
a = Message("Foo", None)
|
||||||
|
b = Message("Foo", None)
|
||||||
|
a = Queue.delay(a, 1, delayfunc=lambda i: 0.5)
|
||||||
|
|
||||||
|
self.assertGreater(a, b)
|
||||||
|
|
||||||
|
|
||||||
|
class TestQueue(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_queue(self):
|
||||||
|
q = Queue()
|
||||||
|
msgs = [Message("Foo", None) for _ in range(3)]
|
||||||
|
|
||||||
|
for msg in msgs:
|
||||||
|
q.put(msg)
|
||||||
|
|
||||||
|
self.assertEqual(q.size, 3)
|
||||||
|
|
||||||
|
for msg in msgs:
|
||||||
|
self.assertEqual(q.get(), msg)
|
||||||
|
|
||||||
|
self.assertEqual(q.size, 0)
|
||||||
|
|
||||||
|
|
||||||
|
def test_queue_full(self):
|
||||||
|
q = Queue(maxlen=1)
|
||||||
|
q.put(Message("Foo", None))
|
||||||
|
|
||||||
|
self.assertRaises(Full, q.put, Message("Bar", None))
|
||||||
|
|
||||||
|
def test_queue_empty(self):
|
||||||
|
q = Queue()
|
||||||
|
msg = Message("Foo", None)
|
||||||
|
|
||||||
|
self.assertRaises(Empty, q.get)
|
||||||
|
q.put(msg)
|
||||||
|
q.get()
|
||||||
|
self.assertRaises(Empty, q.get)
|
||||||
|
|
||||||
|
def test_retry(self):
|
||||||
|
q = Queue()
|
||||||
|
msg = Message("Foo", None)
|
||||||
|
|
||||||
|
q.retry(msg)
|
||||||
|
self.assertRaises(Empty, q.get)
|
||||||
|
self.assertEqual(q.size, 1)
|
||||||
|
|
||||||
|
def test_retry_timeout(self):
|
||||||
|
q = Queue(timeout=0)
|
||||||
|
msg = Message("Foo", None)
|
||||||
|
|
||||||
|
self.assertRaises(Timeout, q.retry, msg)
|
||||||
|
|
||||||
|
def test_requeue(self):
|
||||||
|
q = Queue()
|
||||||
|
msg = Message("Foo", None)
|
||||||
|
|
||||||
|
q.put(msg)
|
||||||
|
q.requeue(q.get(), datetime.timedelta(seconds=1))
|
||||||
|
|
||||||
|
self.assertRaises(Empty, q.get)
|
||||||
|
self.assertEqual(q.size, 1)
|
Loading…
Reference in New Issue
Block a user