use itsdangerous

This commit is contained in:
posativ 2012-10-17 22:20:02 +02:00
parent 759b304cb5
commit 05336728f4
3 changed files with 38 additions and 12 deletions

View File

@ -1,7 +1,8 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
#
# Copyright 2012 posativ <info@posativ.org>. All rights reserved.
# Copyright 2012, Martin Zimmermann <info@posativ.org>.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@ -17,12 +18,14 @@
# those of the authors and should not be interpreted as representing official
# policies, either expressed or implied, of posativ <info@posativ.org>.
#
# lightweight Disqus alternative
# Isso a lightweight Disqus alternative
__version__ = '0.1'
import json
from itsdangerous import URLSafeTimedSerializer
from werkzeug.routing import Map, Rule
from werkzeug.serving import run_simple
from werkzeug.wrappers import Request, Response
@ -32,7 +35,7 @@ from isso import admin, comment, db, utils
_dumps = json.dumps
setattr(json, 'dumps', lambda obj: _dumps(obj, cls=utils.IssoEncoder))
setattr(json, 'dumps', lambda obj, **kw: _dumps(obj, cls=utils.IssoEncoder, **kw))
url = lambda path, endpoint, methods: Rule(path, endpoint=endpoint, methods=methods)
@ -54,13 +57,22 @@ class Isso:
MODERATION = False
SQLITE = None
MAX_AGE = 15*60
def __init__(self, conf):
self.__dict__.update(dict((k, v) for k, v in conf.iteritems() if k.isupper()))
self.signer = URLSafeTimedSerializer(self.SECRET_KEY)
if self.SQLITE:
self.db = db.SQLite(self)
def sign(self, obj):
return self.signer.dumps(obj)
def unsign(self, obj):
return self.signer.loads(obj, max_age=self.MAX_AGE)
def dispatch(self, request, start_response):
adapter = url_map.bind_to_environ(request.environ)
try:

View File

@ -2,6 +2,8 @@
from werkzeug.wrappers import Response
from werkzeug.exceptions import abort
from itsdangerous import SignatureExpired, BadSignature
from isso import json, models
@ -12,7 +14,9 @@ def create(app, environ, request, path):
except ValueError as e:
return Response(unicode(e), 400)
return Response(json.dumps(rv), 201, content_type='application/json')
response = Response(json.dumps(rv), 201, content_type='application/json')
response.set_cookie('session', app.signer.dumps([path, rv.id]), max_age=app.MAX_AGE)
return response
def get(app, environ, request, path, id=None):
@ -25,6 +29,14 @@ def get(app, environ, request, path, id=None):
def modify(app, environ, request, path, id):
try:
rv = app.unsign(request.cookies.get('session', ''))
except (SignatureExpired, BadSignature):
return abort(403)
if not (rv[0] == '*' or rv == [path, id]):
abort(401)
if request.method == 'PUT':
try:
rv = app.db.update(path, id, models.Comment.fromjson(request.data))

View File

@ -15,15 +15,16 @@ def comment(**kw):
class TestComments(unittest.TestCase):
get = lambda self, *x, **z: Client(self.app, Response).get(*x, **z)
put = lambda self, *x, **z: Client(self.app, Response).put(*x, **z)
post = lambda self, *x, **z: Client(self.app, Response).post(*x, **z)
delete = lambda self, *x, **z: Client(self.app, Response).delete(*x, **z)
def setUp(self):
fd, self.path = tempfile.mkstemp()
self.app = Isso({'SQLITE': self.path})
self.client = Client(self.app, Response)
self.get = lambda *x, **z: self.client.get(*x, **z)
self.put = lambda *x, **z: self.client.put(*x, **z)
self.post = lambda *x, **z: self.client.post(*x, **z)
self.delete = lambda *x, **z: self.client.delete(*x, **z)
def testGet(self):
self.post('/comment/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
@ -40,7 +41,7 @@ class TestComments(unittest.TestCase):
rv = self.post('/comment/path/new', data=json.dumps(comment(text='Lorem ipsum ...')))
assert rv.status_code == 201
# XXX assert cookie
assert len(filter(lambda header: header[0] == 'Set-Cookie', rv.headers)) == 1
c = Comment.fromjson(rv.data)
@ -91,10 +92,11 @@ class TestComments(unittest.TestCase):
def testDeleteWithReference(self):
self.post('/comment/path/new', data=json.dumps(comment(text='First')))
client = Client(self.app, Response)
resp = client.post('/comment/path/new', data=json.dumps(comment(text='First')))
self.post('/comment/path/new', data=json.dumps(comment(text='Second', parent=1)))
r = self.delete('/comment/path/1')
r = client.delete('/comment/path/1')
assert r.status_code == 200
assert Comment(**json.loads(r.data)).deleted