refactor all the things (use werkzeug instead of NIH to handle WSGI)

Also: use ?uri=%2Fpath%2F as path indicator.
This commit is contained in:
Martin Zimmermann 2013-09-02 14:44:53 +02:00
parent 76d6d46521
commit dd4ba9263a
10 changed files with 230 additions and 460 deletions

82
README
View File

@ -1,79 +1,62 @@
Isso Ich schrei sonst
=======================
You love static blog generators (especially [Acrylamid][1] *cough*) and the
only option to interact with the community is [Disqus][2]. There's nothing
You love static blog generators (especially Acrylamid__ *cough*) and the
only option to interact with the community is Disqus__. There's nothing
wrong with it, but if you care about the privacy of your audience you are
better off with a comment system that is under your control. This is, were
Isso comes into play.
[1]: https://github.com/posativ/acrylamid
[2]: http://disqus.com/
.. __: https://github.com/posativ/acrylamid
.. __: http://disqus.com/
Current Status
--------------
- `nosetests specs/` ⇾ *Ran 17 tests in 0.491s*
- `(cd /path/static/html/ && isso)` ⇾ fire up your browser and visit `localhost:8080`
Features
--------
Features/Roadmap
----------------
...
- [x] lightweight backend (SQLite)
- [ ] transparent backend (JSON)
- [x] simple JSON API, hence comments are JavaScript-only
- [x] create comments and modify/delete within a time range as user
- [ ] Ping/Trackback support
- [x] simple admin interface
- [w] easy integration, similar to Disqus
- [ ] spam filtering using [http:bl][3]
[3]: https://www.projecthoneypot.org/
Roadmap
-------
Development
-----------
- lightweight SQLite backend
- create comments and modify/delete within a time range as user
- Ping/TrackBack™ support
- simple admin interface
- easy integration snippet, similar to Disqus
- spam filtering using `http:bl`__
*Note:* This project is proudly made with the Not Invented Here syndrome,
instead of `werkzeug` or `bottle` it uses about 150 lines to manage WSGI
and instead of JQuery it uses ender.js.
.. __: https://www.projecthoneypot.org/
You'll need [2.6 ≤ python ≤ 2.7][4], [ender.js][5] and [YUI Compressor][6].
Then run:
$ git clone https://github.com/posativ/isso.git && cd isso/
$ make init
$ make js
$ isso
Then go to <http://localhost:8080/static/post.html> and write a comment :-)
[4]: http://python.org
[5]: http://ender.no.de/
[6]: http://developer.yahoo.com/yui/compressor/
Installation
------------
Still a TODO, but later it's simply `easy_install isso`.
Migration from Disqus
---------------------
Go to [disqus.com](https://disqus.com/) and export your "forum" as XML. If you
use Firefox and you get a 403, try a webkit browser, Disqus did something very
weird with that download link. Next:
Go to `disqus.com <https://disqus.com/>`_ and export your "forum" as XML. If you
use Firefox and you get a 403, try a WebKit browser, Disqus did something very
weird with that download link. Next::
$ isso import /path/to/ur/dump.xml
That's it. Visit your admin page to see all threads. If it doesn't work for
you, please file in a bug report \*including\* your dump.
API
---
To fetch all comments for a path, run
To fetch all comments for `http://example.tld/foo-bar/`, run
$ curl http://example.org/comment/foo-bar/
::
$ curl http://example.tld/isso?uri=%2Ffoo-bar%2F
To write a comment, you have to POST a JSON dictionary with the following
key-value pairs. Text is mandatory otherwise you'll get a 400 Bad Request.
@ -81,7 +64,9 @@ You'll also get a 400 when your JSON is invalid.
Let's say you want to comment on /foo-bar/
$ curl http://example.org/comment/foo-bar/new -X POST -d \
::
$ curl http://example.tld/isso/new?uri=%2Ffoo-bar%2F -X POST -d \
'{
"text": "Lorem ipsum ...",
"name": "Hans", "email": "foo@bla.org", "website": "http://blog/log/"
@ -93,9 +78,9 @@ that cookie, it is cryptographically signed. If your cookie is outdated or
modified, you'll get a 403 Forbidden.
For each comment you'll post, you get an unique cookie. Let's try to remove
your comment:
your comment::
$ curl -X DELETE http://example.org/comment/foo-bar/1
$ curl -X DELETE 'http://example.tld/isso?uri=%2Ffoo-bar%2F&id=1'
If your comment has been referenced by another comment, your comment will be
cleared but not deleted to retain depending comments.
@ -103,6 +88,7 @@ cleared but not deleted to retain depending comments.
Alternatives
------------
- [Juvia](https://github.com/phusion/juvia) Ruby on Rails
- [Tildehash.com](http://www.tildehash.com/?article=why-im-reinventing-disqus) PHP
- [SO: Unobtrusive, self-hosted comments](http://stackoverflow.com/q/2053217)
- `talkatv <https://github.com/talkatv/talkatv>`_ Python
- `Juvia <https://github.com/phusion/juvia)>`_ Ruby on Rails
- `Tildehash.com <http://www.tildehash.com/?article=why-im-reinventing-disqus>`_ PHP
- `SO: Unobtrusive, self-hosted comments <http://stackoverflow.com/q/2053217>`_

View File

@ -16,81 +16,57 @@
#
# The views and conclusions contained in the software and documentation are
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of posativ <info@posativ.org>.
# policies, either expressed or implied, of Martin Zimmermann <info@posativ.org>.
#
# Isso a lightweight Disqus alternative
__version__ = '0.3'
from __future__ import print_function
import sys; reload(sys)
sys.setdefaultencoding('utf-8') # we only support UTF-8 and python 2.X :-)
import pkg_resources
dist = pkg_resources.get_distribution("isso")
import sys
import io
import os
import json
import locale
import traceback
from os.path import dirname, join
from optparse import OptionParser, make_option
import misaka
from itsdangerous import URLSafeTimedSerializer
from isso import admin, comment, db, migrate, wsgi
from isso.utils import determine, import_object, IssoEncoder
from werkzeug.routing import Map, Rule
from werkzeug.wrappers import Response, Request
from werkzeug.exceptions import HTTPException, NotFound, InternalServerError
# override default json :func:`dumps`.
_dumps = json.dumps
setattr(json, 'dumps', lambda obj, **kw: _dumps(obj, cls=IssoEncoder, **kw))
from werkzeug.wsgi import SharedDataMiddleware
from werkzeug.serving import run_simple
# set user's preferred locale, XXX conflicts with email.util.parse_date ... m(
# locale.setlocale(locale.LC_ALL, '')
from isso import comment, db, utils, migrate
url_map = Map([
Rule('/', methods=['HEAD', 'GET'], endpoint='comment.get'),
Rule('/', methods=['PUT', 'DELETE'], endpoint='comment.modify'),
Rule('/new', methods=['POST'], endpoint='comment.create'),
])
class Isso(object):
PRODUCTION = True
SECRET = 'secret'
SECRET_KEY = ',\x1e\xbaY\xbb\xdf\xe7@\x85\xe3\xd9\xb4A9\xe4G\xa6O'
MODERATION = False
SQLITE = None
HOST = 'http://localhost:8080/'
BASE_URL = 'http://localhost:8080/'
MAX_AGE = 15 * 60
PRODUCTION = False
HTTP_STATUS_CODES = {
200: 'Ok', 201: 'Created', 202: 'Accepted',
301: 'Moved Permanently', 304: 'Not Modified',
400: 'Bad Request', 404: 'Not Found', 403: 'Forbidden',
500: 'Internal Server Error',
}
def __init__(self, dbpath, secret, base_url, max_age):
def __init__(self, conf):
self.DBPATH = dbpath
self.BASE_URL = utils.normalize(base_url)
self.__dict__.update(dict((k, v) for k, v in conf.iteritems() if k.isupper()))
self.signer = URLSafeTimedSerializer(self.SECRET_KEY)
self.HOST = determine(self.HOST)
if self.SQLITE:
self.db = db.SQLite(self)
self.markup = import_object(conf.get('MARKUP', 'isso.markup.Markdown'))(conf)
self.adapter = map(
lambda r: (wsgi.Rule(r[0]), r[1], r[2] if isinstance(r[2], list) else [r[2]]), [
# moderation panel
('/admin/', admin.index, ['HEAD', 'GET', 'POST']),
# assets
('/<(static|js):directory>/<(.+?):path>', wsgi.static, ['HEAD', 'GET']),
# comment API, note that the client side quotes the URL, but this is
# actually unnecessary. PEP 333 aka WSGI always unquotes PATH_INFO.
('/1.0/<(.+?):path>/new', comment.create,'POST'),
('/1.0/<(.+?):path>/<(int):id>', comment.get, ['HEAD', 'GET']),
('/1.0/<(.+?):path>/<(int):id>', comment.modify, ['PUT', 'DELETE']),
('/1.0/<(.+?):path>/<(int):id>/approve', comment.approve, 'PUT'),
('/1.0/<(.+?):path>', comment.get, 'GET'),
])
self.db = db.SQLite(dbpath, moderation=False)
self.signer = URLSafeTimedSerializer(secret)
def sign(self, obj):
return self.signer.dumps(obj)
@ -98,105 +74,80 @@ class Isso(object):
def unsign(self, obj):
return self.signer.loads(obj, max_age=self.MAX_AGE)
def status(self, code):
return '%i %s' % (code, self.HTTP_STATUS_CODES[code])
def markdown(self, text):
return misaka.html(text, extensions=misaka.EXT_STRIKETHROUGH \
| misaka.EXT_SUPERSCRIPT | misaka.EXT_AUTOLINK \
| misaka.HTML_SKIP_HTML | misaka.HTML_SKIP_IMAGES | misaka.HTML_SAFELINK)
def dispatch(self, path, method):
@classmethod
def dumps(cls, obj, **kw):
return json.dumps(obj, cls=utils.IssoEncoder, **kw)
for rule, handler, methods in self.adapter:
if isinstance(methods, basestring):
methods = [methods, ]
if method not in methods:
continue
m = rule.match(path)
if m is not None:
return handler, m
def dispatch(self, request, start_response):
adapter = url_map.bind_to_environ(request.environ)
try:
endpoint, values = adapter.match()
if hasattr(endpoint, '__call__'):
handler = endpoint
else:
return (lambda app, environ, request, **kw: (404, 'Not Found', {}), {})
module, function = endpoint.split('.', 1)
handler = getattr(globals()[module], function)
return handler(self, request.environ, request, **values)
except NotFound as e:
return Response('Not Found', 404)
except HTTPException as e:
return e
except InternalServerError as e:
return Response(e, 500)
def wsgi(self, environ, start_response):
try:
request = wsgi.Request(environ)
handler, kwargs = self.dispatch(environ['PATH_INFO'], request.method)
code, body, headers = handler(self, environ, request, **kwargs)
if code == 404:
try:
code, body, headers = wsgi.sendfile(environ['PATH_INFO'], os.getcwd(), environ)
except (IOError, OSError):
try:
path = environ['PATH_INFO'].rstrip('/') + '/index.html'
code, body, headers = wsgi.sendfile(path, os.getcwd(), environ)
except (IOError, OSError):
pass
if request == 'HEAD':
body = ''
start_response(self.status(code), headers.items())
return body
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
traceback.print_exc(file=sys.stderr)
headers = [('Content-Type', 'text/html; charset=utf-8')]
start_response(self.status(500), headers)
return '<h1>' + self.status(500) + '</h1>'
def wsgi_app(self, environ, start_response):
response = self.dispatch(Request(environ), start_response)
return response(environ, start_response)
def __call__(self, environ, start_response):
return self.wsgi(environ, start_response)
class ReverseProxied(object):
def __init__(self, app, prefix=None):
self.app = app
self.prefix = prefix if prefix is not None else ''
def __call__(self, environ, start_response):
script_name = environ.get('HTTP_X_SCRIPT_NAME', self.prefix)
if script_name:
environ['SCRIPT_NAME'] = script_name
path_info = environ['PATH_INFO']
if path_info.startswith(script_name):
environ['PATH_INFO'] = path_info[len(script_name):]
scheme = environ.get('HTTP_X_SCHEME', '')
if scheme:
environ['wsgi.url_scheme'] = scheme
return self.app(environ, start_response)
return self.wsgi_app(environ, start_response)
def main():
options = [
make_option("--version", action="store_true", help="print version info and exit"),
make_option("--sqlite", dest="sqlite", metavar='FILE', default="/tmp/sqlite.db",
help="use SQLite3 database"),
make_option("--port", dest="port", default=8000, help="webserver port"),
make_option("--version", action="store_true",
help="print version info and exit"),
make_option("--dbpath", dest="dbpath", metavar='FILE', default=":memory:",
help="database location"),
make_option("--base-url", dest="base_url", default="http://localhost:8080/",
help="set base url for comments"),
make_option("--max-age", dest="max_age", default=15*60, type=int,
help="..."),
make_option("--host", dest="host", default="localhost",
help="webserver address"),
make_option("--port", dest="port", default=8080,
help="webserver port"),
]
parser = OptionParser(option_list=options)
options, args = parser.parse_args()
if options.version:
print 'isso', __version__
print('isso', dist.version)
sys.exit(0)
app = Isso({'SQLITE': options.sqlite, 'MODERATION': True})
isso = Isso(dbpath=options.dbpath, secret=utils.mksecret(12),
base_url=options.base_url, max_age=options.max_age)
if len(args) > 0 and args[0] == 'import':
if len(args) < 2:
print 'Usage: isso import FILE'
print('Usage: isso import FILE')
sys.exit(2)
with io.open(args[1], encoding='utf-8') as fp:
migrate.disqus(app.db, fp.read())
migrate.disqus(isso.db, args[1])
sys.exit(0)
from wsgiref.simple_server import make_server
httpd = make_server('127.0.0.1', 8080, app, server_class=wsgi.ThreadedWSGIServer)
httpd.serve_forever()
app = SharedDataMiddleware(isso.wsgi_app, {
'/static': join(dirname(__file__), 'static/')
})
print(' * Session Key:', isso.signer.secret_key)
run_simple(options.host, options.port, app, threaded=True)

View File

@ -8,95 +8,126 @@ import urllib
from itsdangerous import SignatureExpired, BadSignature
from isso import json, models, utils, wsgi
from werkzeug.wrappers import Response
from werkzeug.exceptions import abort
from isso import models, utils, requires
def create(app, environ, request, path):
class requires:
if app.PRODUCTION and not utils.urlexists(app.HOST, path):
return 400, 'URL does not exist', {}
def __init__(self, type, param):
self.param = param
self.type = type
def __call__(self, func):
def dec(app, env, req, *args, **kwargs):
if self.param not in req.args:
abort(400)
try:
kwargs[self.param] = self.type(req.args[self.param])
except TypeError:
abort(400)
return func(app, env, req, *args, **kwargs)
return dec
@requires(str, 'uri')
def create(app, environ, request, uri):
if app.PRODUCTION and not utils.urlexists(app.HOST, uri):
return Response('URI does not exist', 400)
try:
comment = models.Comment.fromjson(request.data)
except ValueError as e:
return 400, unicode(e), {}
return Response(unicode(e), 400)
for attr in 'author', 'email', 'website':
if getattr(comment, attr) is not None:
try:
setattr(comment, attr, cgi.escape(getattr(comment, attr)))
except AttributeError:
return 400, '', {}
Response('', 400)
try:
rv = app.db.add(path, comment)
rv = app.db.add(uri, comment)
except ValueError:
return 400, '', {}
abort(400) # FIXME: custom exception class, error descr
md5 = rv.md5
rv.text = app.markup.convert(rv.text)
rv.text = app.markdown(rv.text)
return 202 if rv.pending else 201, json.dumps(rv), {
'Content-Type': 'application/json',
'Set-Cookie': wsgi.setcookie('%s-%s' % (path, rv.id),
app.sign([path, rv.id, md5]), max_age=app.MAX_AGE, path='/')
}
resp = Response(app.dumps(rv), 202 if rv.pending else 201,
content_type='application/json')
resp.set_cookie('%s-%s' % (uri, rv.id), app.sign([uri, rv.id, md5]),
max_age=app.MAX_AGE, path='/')
return resp
def get(app, environ, request, path, id=None):
@requires(str, 'uri')
def get(app, environ, request, uri):
rv = list(app.db.retrieve(path)) if id is None else app.db.get(path, id)
id = request.args.get('id', None)
rv = list(app.db.retrieve(uri)) if id is None else app.db.get(uri, id)
if not rv:
return 404, '', {}
abort(404)
if request.args.get('plain', '0') == '0':
if request.args.get('plain', '1') == '0':
if isinstance(rv, list):
for item in rv:
item.text = app.markup.convert(item.text)
item.text = app.markdown(item.text)
else:
rv.text = app.markup.convert(rv.text)
rv.text = app.markdown(rv.text)
return 200, json.dumps(rv), {'Content-Type': 'application/json'}
return Response(app.dumps(rv), 200, content_type='application/json')
def modify(app, environ, request, path, id):
@requires(str, 'uri')
@requires(int, 'id')
def modify(app, environ, request, uri, id):
try:
rv = app.unsign(request.cookies.get('%s-%s' % (urllib.quote(path, ''), id), ''))
rv = app.unsign(request.cookies.get('%s-%s' % (uri, id), ''))
except (SignatureExpired, BadSignature) as e:
try:
rv = app.unsign(request.cookies.get('admin', ''))
except (SignatureExpired, BadSignature):
return 403, '', {}
abort(403)
# verify checksum, mallory might skip cookie deletion when he deletes a comment
if not (rv == '*' or rv[0:2] == [path, id] or app.db.get(path, id).md5 != rv[2]):
return 403, '', {}
if not (rv == '*' or rv[0:2] == [uri, id] or app.db.get(uri, id).md5 != rv[2]):
abort(403)
if request.method == 'PUT':
try:
rv = app.db.update(path, id, models.Comment.fromjson(request.data))
rv.text = app.markup.convert(rv.text)
return 200, json.dumps(rv), {'Content-Type': 'application/json'}
rv = app.db.update(uri, id, models.Comment.fromjson(request.data))
rv.text = app.markdown(rv.text)
return Response(app.dumps(rv), 200, content_type='application/json')
except ValueError as e:
return 400, unicode(e), {}
return Response(unicode(e), 400) # FIXME: custom exception and error descr
if request.method == 'DELETE':
rv = app.db.delete(path, id)
rv = app.db.delete(uri, id)
return 200, json.dumps(rv), {
'Content-Type': 'application/json',
'Set-Cookie': wsgi.setcookie(path + '-' + str(id), 'deleted', max_age=0, path='/')
}
resp = Response(app.dumps(rv), 200, content_type='application/json')
resp.delete_cookie(uri + '-' + str(id), path='/')
return resp
def approve(app, environ, request, path, id):
try:
if app.unsign(request.cookies.get('admin', '')) != '*':
return 403, '', {}
abort(403)
except (SignatureExpired, BadSignature):
return 403, '', {}
abort(403)
app.db.activate(path, id)
return 200, json.dumps(app.db.get(path, id)), {'Content-Type': 'application/json'}
return Response(app.dumps(app.db.get(path, id)), 200,
content_type='application/json')

View File

@ -15,7 +15,7 @@ class Abstract:
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, app):
def __init__(self, dbpath, moderation):
return
@abc.abstractmethod
@ -79,10 +79,10 @@ class SQLite(Abstract):
'text', 'author', 'email', 'website', 'parent', 'mode'
]
def __init__(self, app):
def __init__(self, dbpath, moderation):
self.dbpath = app.SQLITE
self.mode = 2 if app.MODERATION else 1
self.dbpath = dbpath
self.mode = 2 if moderation else 1
with sqlite3.connect(self.dbpath) as con:
sql = ('main.comments (path VARCHAR(255) NOT NULL, id INTEGER NOT NULL,'

View File

@ -1,28 +0,0 @@
# XXX: BBCode -- http://pypi.python.org/pypi/bbcode
try:
import misaka
except ImportError:
misaka = None # NOQA
class Markup:
def __init__(self, conf):
return
def convert(self, text):
return text
class Markdown(Markup):
def __init__(self, conf):
if misaka is None:
raise ImportError("Markdown requires 'misaka' lib!")
return
def convert(self, text):
return misaka.html(text, extensions = misaka.EXT_STRIKETHROUGH \
| misaka.EXT_SUPERSCRIPT | misaka.EXT_AUTOLINK \
| misaka.HTML_SKIP_HTML | misaka.HTML_SKIP_IMAGES | misaka.HTML_SAFELINK)

View File

@ -4,10 +4,14 @@
# License: BSD Style, 2 clauses. see isso/__init__.py
import json
import socket
import httplib
import random
import contextlib
from string import ascii_letters, digits
from urlparse import urlparse
from isso.models import Comment
@ -31,7 +35,7 @@ def urlexists(host, path):
return con.getresponse().status == 200
def determine(host):
def normalize(host):
"""Make `host` compatible with :py:mod:`httplib`."""
if not host.startswith(('http://', 'https://')):
@ -40,10 +44,5 @@ def determine(host):
return (rv.netloc + ':443') if rv.scheme == 'https' else rv.netloc
def import_object(name):
if '.' not in name:
return __import__(name)
parts = name.split('.')
obj = __import__('.'.join(parts[:-1]), None, None, [parts[-1]], 0)
return getattr(obj, parts[-1])
def mksecret(length):
return ''.join(random.choice(ascii_letters + digits) for x in range(length))

View File

@ -1,163 +0,0 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import io
import os
import re
import cgi
import wsgiref
import tempfile
import urlparse
import mimetypes
from time import gmtime, mktime, strftime, strptime, timezone
from email.utils import parsedate
from os.path import join, dirname, abspath
from urllib import quote
from Cookie import SimpleCookie
from SocketServer import ThreadingMixIn
from wsgiref.simple_server import WSGIServer
class Request(object):
"""A ``werkzeug.wrappers.Request``-like object but much less powerful.
Fits exactly the needs of Isso."""
def __init__(self, environ):
# from bottle.py Copyright 2012, Marcel Hellkamp, License: MIT.
maxread = max(0, int(environ['CONTENT_LENGTH'] or '0'))
stream = environ['wsgi.input']
body = tempfile.TemporaryFile(mode='w+b')
while maxread > 0:
part = stream.read(maxread)
if not part:
break
body.write(part)
maxread -= len(part)
self.body = body
self.body.seek(0)
self.environ = environ
self.query_string = self.environ['QUERY_STRING']
@property
def data(self):
self.body.seek(0)
return self.body.read()
@property
def method(self):
return self.environ['REQUEST_METHOD']
@property
def args(self):
return urlparse.parse_qs(self.environ['QUERY_STRING'])
@property
def form(self):
if self.environ['CONTENT_TYPE'] == 'application/x-www-form-urlencoded':
return cgi.FieldStorage(fp=self.body, environ=self.environ)
return dict()
@property
def cookies(self):
cookie = SimpleCookie(self.environ.get('HTTP_COOKIE', ''))
return {v.key: v.value for v in cookie.values()}
class Rule(str):
"""A quick and dirty approach to URL route creation. It uses the
following format:
- ``<(int):name>`` matches any integer, same for float
- ``<(.+?):path>`` matches any given regular expression
With ``Rule.match(url)`` you can test whether a route a) matches
and if so b) retrieve saved variables."""
repl = {'int': r'[0-9]+', 'float': r'\-?[0-9]+\.[0-9]+'}
def __init__(self, string):
first, last, rv = 0, -1, []
f = lambda m: '(?P<%s>%s)' % (m.group(2), self.repl.get(m.group(1), m.group(1)))
for i, c in enumerate(string):
if c == '<':
first = i
rv.append(re.escape(string[last+1:first]))
if c == '>':
last = i
if last > first:
rv.append(re.sub(r'<\(([^:]+)\):(\w+)>', f, string[first:last+1]))
first = last
rv.append(re.escape(string[last+1:]))
self.rule = re.compile('^' + ''.join(rv) + '$')
def match(self, obj):
match = re.match(self.rule, obj)
if not match: return
kwargs = match.groupdict()
for key, value in kwargs.items():
for type in int, float: # may convert false-positives
try:
kwargs[key] = type(value)
break
except ValueError:
pass
return kwargs
def sendfile(filename, root, environ):
"""Return file object if found. This function is heavily inspired by
bottles's `static_file` function and uses the same mechanism to avoid
access to e.g. `/etc/shadow`."""
headers = {}
root = abspath(root) + os.sep
filename = abspath(join(root, filename.strip('/\\')))
if not filename.startswith(root):
return 403, '', headers
mimetype, encoding = mimetypes.guess_type(filename)
if mimetype: headers['Content-Type'] = mimetype
if encoding: headers['Content-Encoding'] = encoding
stats = os.stat(filename)
headers['Content-Length'] = str(stats.st_size)
headers['Last-Modified'] = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime(stats.st_mtime))
ims = environ.get('HTTP_IF_MODIFIED_SINCE')
if ims:
ims = parsedate(ims)
if ims is not None and mktime(ims) - timezone >= stats.st_mtime:
headers['Date'] = strftime("%a, %d %b %Y %H:%M:%S GMT", gmtime())
return 304, '', headers
return 200, io.open(filename, 'rb'), headers
def static(app, environ, request, directory, path):
"""A view that returns the requested path from directory."""
try:
return sendfile(path, join(dirname(__file__), directory), environ)
except (OSError, IOError):
return 404, '', {}
def setcookie(name, value, **kwargs):
return '; '.join([quote(name, '') + '=' + quote(value, '')] +
[k.replace('_', '-') + '=' + str(v) for k, v in kwargs.iteritems()])
class ThreadedWSGIServer(ThreadingMixIn, WSGIServer):
pass

View File

@ -1,17 +1,11 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import sys
import re
from setuptools import setup, find_packages
version = re.search("__version__ = '([^']+)'",
open('isso/__init__.py').read()).group(1)
setup(
name='isso',
version=version,
version='0.1',
author='Martin Zimmermann',
author_email='info@posativ.org',
packages=find_packages(),
@ -29,7 +23,7 @@ setup(
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 2.7"
],
install_requires=['mako', 'itsdangerous'],
install_requires=['Jinja2>=2.7', 'werkzeug>=0.9', 'itsdangerous', 'misaka'],
entry_points={
'console_scripts':
['isso = isso:main'],

View File

@ -1,4 +1,5 @@
import json
import urllib
import tempfile
import unittest
@ -6,20 +7,19 @@ import unittest
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, json
from isso import Isso
from isso.models import Comment
def comment(**kw):
return Comment.fromjson(json.dumps(kw))
return Comment.fromjson(Isso.dumps(kw))
class TestComments(unittest.TestCase):
def setUp(self):
fd, self.path = tempfile.mkstemp()
self.app = Isso({'SQLITE': self.path, 'PRODUCTION': False,
'MARKUP': 'isso.markup.Markup'})
self.app = Isso(self.path, '...', '...', 15*60)
self.client = Client(self.app, Response)
self.get = lambda *x, **z: self.client.get(*x, **z)
@ -29,8 +29,8 @@ class TestComments(unittest.TestCase):
def testGet(self):
self.post('/1.0/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
r = self.get('/1.0/path/1')
self.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Lorem ipsum ...')))
r = self.get('/?uri=%2Fpath%2F&id=1')
assert r.status_code == 200
rv = json.loads(r.data)
@ -40,7 +40,7 @@ class TestComments(unittest.TestCase):
def testCreate(self):
rv = self.post('/1.0/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
rv = self.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Lorem ipsum ...')))
assert rv.status_code == 201
assert len(filter(lambda header: header[0] == 'Set-Cookie', rv.headers)) == 1
@ -49,14 +49,14 @@ class TestComments(unittest.TestCase):
assert not c.pending
assert not c.deleted
assert c.text == 'Lorem ipsum ...'
assert c.text == '<p>Lorem ipsum ...</p>\n'
def testCreateAndGetMultiple(self):
for i in range(20):
self.post('/1.0/path/new', data=json.dumps(comment(text='Spam')))
self.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Spam')))
r = self.get('/1.0/path')
r = self.get('/?uri=%2Fpath%2F')
assert r.status_code == 200
rv = json.loads(r.data)
@ -64,17 +64,17 @@ class TestComments(unittest.TestCase):
def testGetInvalid(self):
assert self.get('/1.0/path/123').status_code == 404
assert self.get('/1.0/path/spam/123').status_code == 404
assert self.get('/1.0/foo/').status_code == 404
assert self.get('/?uri=%2Fpath%2F&id=123').status_code == 404
assert self.get('/?uri=%2Fpath%2Fspam%2F&id=123').status_code == 404
assert self.get('/?uri=?uri=%foo%2F').status_code == 404
def testUpdate(self):
self.post('/1.0/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
self.put('/1.0/path/1', data=json.dumps(comment(
self.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Lorem ipsum ...')))
self.put('/?uri=%2Fpath%2F&id=1', data=Isso.dumps(comment(
text='Hello World', author='me', website='http://example.com/')))
r = self.get('/1.0/path/1')
r = self.get('/?uri=%2Fpath%2F&id=1&plain=1')
assert r.status_code == 200
rv = json.loads(r.data)
@ -85,45 +85,45 @@ class TestComments(unittest.TestCase):
def testDelete(self):
self.post('/1.0/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
r = self.delete('/1.0/path/1')
self.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Lorem ipsum ...')))
r = self.delete('/?uri=%2Fpath%2F&id=1')
assert r.status_code == 200
assert json.loads(r.data) == None
assert self.get('/1.0/path/1').status_code == 404
assert self.get('/?uri=%2Fpath%2F&id=1').status_code == 404
def testDeleteWithReference(self):
client = Client(self.app, Response)
client.post('/1.0/path/new', data=json.dumps(comment(text='First')))
client.post('/1.0/path/new', data=json.dumps(comment(text='First', parent=1)))
client.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='First')))
client.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='First', parent=1)))
r = client.delete('/1.0/path/1')
r = client.delete('/?uri=%2Fpath%2F&id=1')
assert r.status_code == 200
assert Comment(**json.loads(r.data)).deleted
assert self.get('/1.0/path/1').status_code == 200
assert self.get('/1.0/path/2').status_code == 200
assert self.get('/?uri=%2Fpath%2F&id=1').status_code == 200
assert self.get('/?uri=%2Fpath%2F&id=2').status_code == 200
def testPathVariations(self):
paths = ['/sub/path/', '/path.html', '/sub/path.html', '%2Fpath/%2F', '/']
paths = ['/sub/path/', '/path.html', '/sub/path.html', 'path', '/']
for path in paths:
assert self.post('/1.0/' + path + '/new',
data=json.dumps(comment(text='...'))).status_code == 201
assert self.post('/new?' + urllib.urlencode({'uri': path}),
data=Isso.dumps(comment(text='...'))).status_code == 201
for path in paths:
assert self.get('/1.0/' + path).status_code == 200
assert self.get('/1.0/' + path + '/1').status_code == 200
assert self.get('/?' + urllib.urlencode({'uri': path})).status_code == 200
assert self.get('/?' + urllib.urlencode({'uri': path, id: 1})).status_code == 200
def testDeleteAndCreateByDifferentUsersButSamePostId(self):
mallory = Client(self.app, Response)
mallory.post('/1.0/path/new', data=json.dumps(comment(text='Foo')))
mallory.delete('/1.0/path/1')
mallory.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Foo')))
mallory.delete('/?uri=%2Fpath%2F&id=1')
bob = Client(self.app, Response)
bob.post('/1.0/path/new', data=json.dumps(comment(text='Bar')))
bob.post('/new?uri=%2Fpath%2F', data=Isso.dumps(comment(text='Bar')))
assert mallory.delete('/1.0/path/1').status_code == 403
assert bob.delete('/1.0/path/1').status_code == 200
assert mallory.delete('/?uri=%2Fpath%2F&id=1').status_code == 403
assert bob.delete('/?uri=%2Fpath%2F&id=1').status_code == 200

View File

@ -19,7 +19,7 @@ class TestSQLite(unittest.TestCase):
def setUp(self):
fd, self.path = tempfile.mkstemp()
self.db = SQLite(isso.Isso({'SQLITE': self.path}))
self.db = SQLite(self.path, False)
def test_get(self):
@ -91,7 +91,7 @@ class TestSQLitePending(unittest.TestCase):
def setUp(self):
fd, self.path = tempfile.mkstemp()
self.db = SQLite(isso.Isso({'SQLITE': self.path, 'MODERATION': True}))
self.db = SQLite(self.path, True)
def test_retrieve(self):