remove uWSGI/Process/Threading mixin

* atexit is ignored by uWSGI and the worker thread can not exit
* queue API and naming is not final

Gunicorn ignores an error on shutdown:

    Exception SystemExit: 0 in <module 'threading' from
    '/usr/lib64/python2.7/threading.pyc'> ignored
This commit is contained in:
Martin Zimmermann 2014-06-28 18:03:42 +02:00
parent 10960ecf1e
commit 4f152d03ac
9 changed files with 111 additions and 149 deletions

View File

@ -41,8 +41,12 @@ if sys.argv[0].startswith("isso"):
import os
import errno
import atexit
import logging
import threading
import multiprocessing
from os.path import dirname, join
from argparse import ArgumentParser
from functools import partial, reduce
@ -66,8 +70,7 @@ try:
except ImportError:
uwsgi = None
from isso import cache, config, db, migrate, wsgi, ext, views
from isso.core import ThreadedMixin, ProcessMixin, uWSGIMixin
from isso import cache, config, db, migrate, wsgi, ext, views, queue
from isso.wsgi import origin, urlsplit
from isso.utils import http, JSONRequest, html, hash
from isso.views import comments
@ -85,16 +88,21 @@ logger = logging.getLogger("isso")
class Isso(object):
def __init__(self, conf, cacheobj=None, connection=None,
multiprocessing=False):
queueobj=None, shared=False):
if cacheobj is None:
cacheobj = cache.Cache(threshold=1024)
cacheobj = cache.Cache(1024)
if connection is None:
connection = db.SQLite3(":memory:")
if queueobj is None:
queueobj = queue.Queue(1024)
self.conf = conf
self.cache = cacheobj
self.connection = connection
self.queue = queueobj
self.db = db.Adapter(connection, conf)
self.signer = URLSafeTimedSerializer(self.db.preferences.get("session-key"))
self.markup = html.Markup(
@ -104,9 +112,13 @@ class Isso(object):
self.hasher = hash.new(
conf.get("hash", "algorithm"),
conf.get("hash", "salt"))
self.shared = True if multiprocessing else False
super(Isso, self).__init__(conf)
if shared:
self.lock = multiprocessing.Lock()
self.shared = True
else:
self.lock = threading.Lock()
self.shared = False
subscribers = []
for backend in conf.getlist("general", "notify"):
@ -168,29 +180,24 @@ class Isso(object):
return self.wsgi_app(environ, start_response)
def make_app(conf, multiprocessing=True):
def make_app(conf, shared=False):
connection = db.SQLite3(conf.get("general", "dbpath"))
cacheobj = cache.SQLite3Cache(connection, threshold=2048)
if multiprocessing:
if uwsgi is not None:
class App(Isso, uWSGIMixin):
pass
cacheobj = cache.uWSGICache(timeout=3600)
else:
class App(Isso, ProcessMixin):
pass
if uwsgi is not None:
cacheobj = cache.uWSGICache(timeout=3600)
else:
class App(Isso, ThreadedMixin):
pass
cacheobj = cache.SQLite3Cache(connection, threshold=2048)
isso = App(
conf,
cacheobj=cacheobj,
connection=connection,
multiprocessing=multiprocessing)
jobs = queue.Jobs()
jobs.register("purge-db", db.Adapter(connection, conf), conf.getint("moderation", "purge-after"))
queueobj = queue.Queue(1024)
worker = queue.Worker(queueobj, jobs)
atexit.register(worker.join, 0.25)
isso = Isso(conf, cacheobj, connection, queueobj, shared)
worker.start()
# check HTTP server connection
for host in conf.getiter("general", "host"):
@ -263,7 +270,7 @@ def main():
logger.error("No website(s) configured, Isso won't work.")
sys.exit(1)
app = make_app(conf, multiprocessing=False)
app = make_app(conf)
if conf.get("server", "listen").startswith("http://"):
host, port, _ = urlsplit(conf.get("server", "listen"))

View File

@ -5,7 +5,6 @@ from __future__ import absolute_import, unicode_literals
import time
from . import Base
from isso.db import SQLite3
class SQLite3Cache(Base):

View File

@ -1,84 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import print_function
import time
import logging
import threading
import multiprocessing
try:
import uwsgi
except ImportError:
uwsgi = None
from isso.compat import PY2K
if PY2K:
import thread
else:
import _thread as thread
logger = logging.getLogger("isso")
class Mixin(object):
def __init__(self, conf):
self.lock = threading.Lock()
def notify(self, subject, body, retries=5):
pass
def threaded(func):
"""
Decorator to execute each :param func: call in a separate thread.
"""
def dec(self, *args, **kwargs):
thread.start_new_thread(func, (self, ) + args, kwargs)
return dec
class ThreadedMixin(Mixin):
def __init__(self, conf):
super(ThreadedMixin, self).__init__(conf)
if conf.getboolean("moderation", "enabled"):
self.purge(conf.getint("moderation", "purge-after"))
@threaded
def purge(self, delta):
while True:
with self.lock:
self.db.comments.purge(delta)
time.sleep(delta)
class ProcessMixin(ThreadedMixin):
def __init__(self, conf):
super(ProcessMixin, self).__init__(conf)
self.lock = multiprocessing.Lock()
class uWSGIMixin(Mixin):
def __init__(self, conf):
super(uWSGIMixin, self).__init__(conf)
self.lock = multiprocessing.Lock()
timedelta = conf.getint("moderation", "purge-after")
purge = lambda signum: self.db.comments.purge(timedelta)
uwsgi.register_signal(1, "", purge)
uwsgi.add_timer(1, timedelta)
# run purge once
purge(1)

View File

@ -226,7 +226,7 @@ class Comments:
def purge(self, delta):
"""
Remove comments older than :param:`delta`.
Remove unmoderated comments older than :param:`delta`.
"""
self.db.execute([
'DELETE FROM comments WHERE mode = 2 AND ? - created > ?;'

View File

@ -3,6 +3,7 @@
from __future__ import unicode_literals, division
import abc
import json
import logging
import threading
@ -11,6 +12,7 @@ import bisect
import functools
import time
import datetime
try:
import queue
@ -18,6 +20,7 @@ except ImportError:
import Queue as queue
from isso.utils import total_seconds
from isso.compat import iteritems
logger = logging.getLogger("isso")
@ -59,10 +62,10 @@ class Queue(object):
:param maxlen: upperbound limit
:param timeout: maximum retry interval after which a :func:`retry` call
raises :exception:`Timeout` (defaults to 15 min)
raises :exception:`Timeout` (defaults to ~34 min)
"""
def __init__(self, maxlen=-1, timeout=900):
def __init__(self, maxlen=-1, timeout=2**10):
self.queue = []
self.maxlen = maxlen
self.timeout = timeout
@ -80,14 +83,16 @@ class Queue(object):
def get(self):
with self.mutex:
try:
msg = self.queue[0]
msg = self.queue.pop(0)
except IndexError:
raise queue.Empty
if msg.timestamp + msg.wait <= time.time():
return self.queue.pop(0)
return msg
raise queue.Empty
self.queue.insert(0, msg)
raise queue.Empty
def retry(self, msg):
self.put(Queue.delay(msg, self.timeout))
@ -109,6 +114,17 @@ class Queue(object):
class Worker(threading.Thread):
"""Thread that pulls data from the queue, does the actual work. If the queue
is empty, sleep for longer intervals (see :func:`wait` for details)
On startup, all recurring tasks are automatically queued with zero delay
to run at least once.
A task may throw :exception Retry: to indicate a expected failure (e.g.
network not reachable) and asking to retry later.
:param queue: a Queue
:param targets: a mapping of task names and the actual task objects"""
interval = 0.05
@ -119,6 +135,10 @@ class Worker(threading.Thread):
self.queue = queue
self.targets = targets
for name, target in iteritems(targets):
if isinstance(target, Cron):
queue.put(Message(name, None))
def run(self):
while self.alive:
try:
@ -126,8 +146,13 @@ class Worker(threading.Thread):
except queue.Empty:
Worker.wait(0.5)
else:
task = self.targets[payload.type]
task = self.targets.get(payload.type)
if task is None:
logger.warn("No such task '%s'", payload.type)
continue
try:
logger.debug("Executing {0} with '{1}'".format(
payload.type, json.dumps(payload.data)))
task.run(payload.data)
except Retry:
try:
@ -163,10 +188,6 @@ class Task(object):
__metaclass__ = abc.ABCMeta
@property
def id(self):
return threading.currentThread().ident
@abc.abstractmethod
def run(self, data):
return
@ -176,11 +197,45 @@ class Cron(Task):
__metaclass__ = abc.ABCMeta
def __init__(self, timedelta):
self.timedelta = timedelta
def __init__(self, *args, **kwargs):
self.timedelta = datetime.timedelta(*args, **kwargs)
def run(self, data):
return
__all__ = ["Full", "Empty", "Retry", "Timeout", "Message", "Queue", "Task", "Cron"]
class PurgeDB(Cron):
def __init__(self, db, after):
super(PurgeDB, self).__init__(hours=1)
self.db = db
self.after = after
def run(self, data):
self.db.comments.purge(self.after)
class Jobs(dict):
"""Obviously a poor man's factory"""
available = {
"purge-db": PurgeDB
}
def __init__(self):
super(Jobs, self).__init__()
def register(self, name, *args, **kwargs):
if name in self:
return
try:
cls = Jobs.available[name]
except KeyError:
raise RuntimeError("No such task '%s'" % name)
self[name] = cls(*args, **kwargs)
__all__ = ["Full", "Empty", "Retry", "Timeout", "Message", "Queue", "Targets",
"Task", "Cron"]

View File

@ -8,4 +8,5 @@ from isso import dist, config
application = make_app(
config.load(
os.path.join(dist.location, "isso", "defaults.ini"),
os.environ.get('ISSO_SETTINGS')))
os.environ.get('ISSO_SETTINGS')),
shared=True)

View File

@ -17,7 +17,7 @@ except ImportError:
from werkzeug.wrappers import Response
from isso import Isso, core, config, dist
from isso import Isso, config, dist
from isso.utils import http
from isso.views import comments
@ -34,10 +34,7 @@ class TestComments(unittest.TestCase):
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
class App(Isso, core.Mixin):
pass
self.app = App(conf)
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
@ -378,10 +375,7 @@ class TestModeratedComments(unittest.TestCase):
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
class App(Isso, core.Mixin):
pass
self.app = App(conf)
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
@ -405,10 +399,7 @@ class TestPurgeComments(unittest.TestCase):
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
class App(Isso, core.Mixin):
pass
self.app = App(conf)
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)

View File

@ -9,13 +9,12 @@ except ImportError:
import os
import json
import tempfile
from werkzeug import __version__
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, config, core, db, dist
from isso import Isso, config, db, dist
from isso.utils import http
from fixtures import curl, FakeIP
@ -44,10 +43,7 @@ class TestGuard(unittest.TestCase):
conf.set("guard", "direct-reply", str(direct_reply))
conf.set("guard", "reply-to-self", "1" if self_reply else "0")
class App(Isso, core.Mixin):
pass
app = App(conf, connection=self.connection)
app = Isso(conf, connection=self.connection)
app.wsgi_app = FakeIP(app.wsgi_app, ip)
return Client(app, Response)

View File

@ -11,7 +11,7 @@ except ImportError:
from werkzeug.wrappers import Response
from isso import Isso, cache, core, config, dist
from isso import Isso, config, dist
from isso.utils import http
from fixtures import curl, loads, FakeIP, JSONClient
@ -25,10 +25,7 @@ class TestVote(unittest.TestCase):
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
class App(Isso, core.Mixin):
pass
self.app = App(conf)
self.app = Isso(conf)
def makeClient(self, ip):
return JSONClient(FakeIP(self.app.wsgi_app, ip), Response)