refactor and migration to SQLAlchemy

* split db/view into model, controller and views
* use SQLAlchemy for comments, threads and preferences
This commit is contained in:
Martin Zimmermann 2014-07-21 10:33:03 +02:00
parent 7886c20aef
commit 7a3251ddfc
32 changed files with 1678 additions and 1607 deletions

View File

@ -218,7 +218,7 @@ enabled
purposes. purposes.
ratelimit ratelimit
limit to N new comments per minute. limit to N new comments per minute. Use -1 to disable rate limit.
direct-reply direct-reply
how many comments directly to the thread (prevent a simple how many comments directly to the thread (prevent a simple

View File

@ -44,16 +44,13 @@ import errno
import atexit import atexit
import logging import logging
import threading
import multiprocessing
from os.path import dirname, join from os.path import dirname, join
from argparse import ArgumentParser from argparse import ArgumentParser
from functools import partial, reduce from functools import partial, reduce
from itsdangerous import URLSafeTimedSerializer from itsdangerous import URLSafeTimedSerializer
from werkzeug.routing import Map from werkzeug.routing import Map, Rule, redirect
from werkzeug.exceptions import HTTPException, InternalServerError from werkzeug.exceptions import HTTPException, InternalServerError
from werkzeug.wsgi import SharedDataMiddleware from werkzeug.wsgi import SharedDataMiddleware
@ -70,10 +67,9 @@ try:
except ImportError: except ImportError:
uwsgi = None uwsgi = None
from isso import cache, config, db, migrate, wsgi, ext, views, queue from isso import cache, config, db, migrate, ext, queue, spam, views, wsgi
from isso.wsgi import origin, urlsplit from isso.wsgi import origin, urlsplit
from isso.utils import http, JSONRequest, html, hash from isso.utils import http, JSONRequest, html, hash
from isso.views import comments
from isso.ext.notifications import Stdout, SMTP from isso.ext.notifications import Stdout, SMTP
@ -87,63 +83,64 @@ logger = logging.getLogger("isso")
class Isso(object): class Isso(object):
def __init__(self, conf, cacheobj=None, connection=None, def __init__(self, conf, cacheobj=None, dbobj=None):
queueobj=None, shared=False):
if cacheobj is None: if cacheobj is None:
cacheobj = cache.Cache(1024) cacheobj = cache.Cache(1024)
if connection is None: if dbobj is None:
connection = db.SQLite3(":memory:") dbobj = db.Adapter("sqlite:///:memory:")
if queueobj is None:
queueobj = queue.Queue(1024)
self.conf = conf self.conf = conf
self.cache = cacheobj self.db = dbobj
self.connection = connection
self.queue = queueobj
self.db = db.Adapter(connection, conf) signer = URLSafeTimedSerializer(
self.signer = URLSafeTimedSerializer(self.db.preferences.get("session-key")) dbobj.preferences.get("session-key"))
self.markup = html.Markup( markup = html.Markup(
conf.getlist("markup", "options"), conf.getlist("markup", "options"),
conf.getlist("markup", "allowed-elements"), conf.getlist("markup", "allowed-elements"),
conf.getlist("markup", "allowed-attributes")) conf.getlist("markup", "allowed-attributes"))
self.hasher = hash.new( hasher = hash.new(
conf.get("hash", "algorithm"), conf.get("hash", "algorithm"),
conf.get("hash", "salt")) conf.get("hash", "salt"))
guard = spam.Guard(
dbobj,
conf.getboolean("guard", "enabled"),
conf.getint("guard", "ratelimit"),
conf.getint("guard", "direct-reply"),
conf.getboolean("guard", "reply-to-self"),
conf.getint("general", "max-age"))
if shared: urls = Map()
self.lock = multiprocessing.Lock() Isso.routes(
self.shared = True urls,
else: views.API(conf, cacheobj, dbobj, guard, hasher.uhash, markup, signer),
self.lock = threading.Lock() views.Info(conf))
self.shared = False
subscribers = [] self.urls = urls
for backend in conf.getlist("general", "notify"):
if backend == "stdout":
subscribers.append(Stdout(None))
elif backend in ("smtp", "SMTP"):
subscribers.append(SMTP(self))
else:
logger.warn("unknown notification backend '%s'", backend)
self.signal = ext.Signal(*subscribers) @classmethod
def routes(cls, urls, api, info):
self.urls = Map() for rule in [
Rule("/demo/", endpoint=lambda *z: redirect("/demo/index.html")),
Rule("/info", endpoint=info.show)
]:
urls.add(rule)
views.Info(self) for func, (method, rule) in [
comments.API(self, self.hasher) ('fetch', ('GET', '/')),
('new', ('POST', '/new')),
def render(self, text): ('count', ('POST', '/count')),
return self.markup.render(text) ('view', ('GET', '/id/<int:id>')),
('edit', ('PUT', '/id/<int:id>')),
def sign(self, obj): ('delete', ('DELETE', '/id/<int:id>')),
return self.signer.dumps(obj) ('moderate',('GET', '/id/<int:id>/<any(activate,delete):action>/<string:key>')),
('moderate',('POST', '/id/<int:id>/<any(activate,delete):action>/<string:key>')),
def unsign(self, obj, max_age=None): ('like', ('POST', '/id/<int:id>/like')),
return self.signer.loads(obj, max_age=max_age or self.conf.getint('general', 'max-age')) ('dislike', ('POST', '/id/<int:id>/dislike')),
]:
urls.add(Rule(rule, methods=[method], endpoint=getattr(api, func)))
def dispatch(self, request): def dispatch(self, request):
local.request = request local.request = request
@ -167,10 +164,6 @@ class Isso(object):
return InternalServerError() return InternalServerError()
else: else:
return response return response
finally:
# FIXME: always close connection but rather fix tests
if self.shared:
self.connection.close()
def wsgi_app(self, environ, start_response): def wsgi_app(self, environ, start_response):
response = self.dispatch(JSONRequest(environ)) response = self.dispatch(JSONRequest(environ))
@ -180,23 +173,24 @@ class Isso(object):
return self.wsgi_app(environ, start_response) return self.wsgi_app(environ, start_response)
def make_app(conf, shared=False): def make_app(conf):
connection = db.SQLite3(conf.get("general", "dbpath")) dbobj = db.Adapter(conf.get("general", "dbpath"))
if uwsgi is not None: if uwsgi is not None:
cacheobj = cache.uWSGICache(timeout=3600) cacheobj = cache.uWSGICache(timeout=3600)
else: else:
cacheobj = cache.SQLite3Cache(connection, threshold=2048) cacheobj = cache.SQLite3Cache(db.SQLite3("/dev/shm/isso"), threshold=2048)
jobs = queue.Jobs() jobs = queue.Jobs()
jobs.register("purge-db", db.Adapter(connection, conf), conf.getint("moderation", "purge-after")) jobs.register("db-purge", dbobj, conf.getint("moderation", "purge-after"))
queueobj = queue.Queue(1024) queueobj = queue.Queue(1024)
worker = queue.Worker(queueobj, jobs) worker = queue.Worker(queueobj, jobs)
atexit.register(worker.join, 0.25)
isso = Isso(conf, cacheobj, connection, queueobj, shared) isso = Isso(conf, cacheobj, dbobj)
atexit.register(worker.join, 0.25)
worker.start() worker.start()
# check HTTP server connection # check HTTP server connection
@ -212,7 +206,7 @@ def make_app(conf, shared=False):
wrapper = [local_manager.make_middleware] wrapper = [local_manager.make_middleware]
if isso.conf.getboolean("server", "profile"): if conf.getboolean("server", "profile"):
wrapper.append(partial(ProfilerMiddleware, wrapper.append(partial(ProfilerMiddleware,
sort_by=("cumulative", ), restrictions=("isso/(?!lib)", 10))) sort_by=("cumulative", ), restrictions=("isso/(?!lib)", 10)))
@ -223,7 +217,7 @@ def make_app(conf, shared=False):
})) }))
wrapper.append(partial(wsgi.CORSMiddleware, wrapper.append(partial(wsgi.CORSMiddleware,
origin=origin(isso.conf.getiter("general", "host")), origin=origin(conf.getiter("general", "host")),
allowed=("Origin", "Referer", "Content-Type"), allowed=("Origin", "Referer", "Content-Type"),
exposed=("X-Set-Cookie", "Date"))) exposed=("X-Set-Cookie", "Date")))

View File

View File

@ -0,0 +1,289 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import re
import time
from sqlalchemy.sql import func, select, not_
from isso.spam import Guard
from isso.utils import Bloomfilter
from isso.models import Comment
from isso.compat import string_types
class Invalid(Exception):
pass
class Denied(Exception):
pass
class Validator(object):
# from Django appearently, looks good to me *duck*
__url_re = re.compile(
r'^'
r'(https?://)?'
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)'
r'$', re.IGNORECASE)
@classmethod
def isurl(cls, text):
return Validator.__url_re.match(text) is not None
@classmethod
def verify(cls, comment):
if not isinstance(comment["parent"], (int, type(None))):
return False, "parent must be an integer or null"
if not isinstance(comment["text"], string_types):
return False, "text must be a string"
if len(comment["text"].rstrip()) < 3:
return False, "text is too short (minimum length: 3)"
for key in ("author", "email", "website"):
if not isinstance(comment[key], (string_types, type(None))):
return False, "%s must be a string or null" % key
if len(comment["email"] or "") > 254:
return False, "http://tools.ietf.org/html/rfc5321#section-4.5.3"
if comment["website"]:
if len(comment["website"]) > 254:
return False, "arbitrary length limit"
if not Validator.isurl(comment["website"]):
return False, "Website not Django-conform"
return True, ""
class Controller(object):
def __init__(self, db, guard=None):
if guard is None:
guard = Guard(db, enabled=False)
self.db = db
self.guard = guard
@classmethod
def Comment(cls, rv):
return Comment(
id=rv[0], parent=rv[1], thread=rv[2], created=rv[3], modified=rv[4],
mode=rv[5], remote_addr=rv[6], text=rv[7], author=rv[8], email=rv[9],
website=rv[10], likes=rv[11], dislikes=rv[12], voters=Bloomfilter(bytearray(rv[13])))
def new(self, remote_addr, thread, obj, moderated=False):
obj.setdefault("text", "")
obj.setdefault("parent", None)
for field in ("email", "author", "website"):
obj.setdefault(field, None)
valid, reason = Validator.verify(obj)
if not valid:
raise Invalid(reason)
if self.guard.enabled:
valid, reason = self.guard.validate(remote_addr, thread, obj)
if not valid:
raise Denied(reason)
obj["id"] = None
obj["thread"] = thread.id
obj["mode"] = 2 if moderated else 1
obj["created"] = time.time()
obj["modified"] = None
obj["remote_addr"] = remote_addr
obj["likes"] = obj["dislikes"] = 0
obj["voters"] = Bloomfilter(iterable=[remote_addr])
if obj["parent"] is not None:
parent = self.get(obj["parent"])
if parent is None:
obj["parent"] = None
elif parent.parent: # normalize to max depth of 1
obj["parent"] = parent.parent
obj = Comment(**obj)
_id = self.db.engine.execute(self.db.comments.insert()
.values((obj.id, obj.parent, obj.thread, obj.created, obj.modified,
obj.mode, obj.remote_addr, obj.text, obj.author, obj.email,
obj.website, obj.likes, obj.dislikes, buffer(obj.voters.array)))
).inserted_primary_key[0]
return obj.new(id=_id)
def edit(self, _id, new):
obj = self.get(_id)
if not obj:
return None
new.setdefault("text", "")
new.setdefault("parent", None)
for field in ("email", "author", "website"):
new.setdefault(field, None)
valid, reason = Validator.verify(new)
if not valid:
raise Invalid(reason)
obj = obj.new(text=new["text"], author=new["author"], email=new["email"],
website=new["website"], modified=time.time())
self.db.engine.execute(self.db.comments.update()
.values(text=obj.text, author=obj.author, email=obj.email,
website=obj.website, modified=obj.modified)
.where(self.db.comments.c.id == _id))
return obj
def get(self, _id):
"""Retrieve comment with :param id: if any.
"""
rv = self.db.engine.execute(
self.db.comments.select(self.db.comments.c.id == _id)).fetchone()
if not rv:
return None
return Controller.Comment(rv)
def all(self, thread, mode=1, after=0, parent='any', order_by='id', limit=None):
stmt = (self.db.comments.select()
.where(self.db.comments.c.thread == thread.id)
.where(self.db.comments.c.mode.op("|")(mode) == self.db.comments.c.mode)
.where(self.db.comments.c.created > after))
if parent != 'any':
stmt = stmt.where(self.db.comments.c.parent == parent)
stmt.order_by(getattr(self.db.comments.c, order_by))
if limit:
stmt.limit(limit)
return list(map(Controller.Comment, self.db.engine.execute(stmt).fetchall()))
def vote(self, remote_addr, _id, like):
"""Vote with +1 or -1 on comment :param id:
Returns True on success (in either direction), False if :param
remote_addr: has already voted. A comment can not be voted by its
original author.
"""
obj = self.get(_id)
if obj is None:
return False
if remote_addr in obj.voters:
return False
if like:
obj = obj.new(likes=obj.likes + 1)
else:
obj = obj.new(dislikes=obj.dislikes + 1)
obj.voters.add(remote_addr)
self.db.engine.execute(self.db.comments.update()
.values(likes=obj.likes, dislikes=obj.dislikes,
voters=buffer(obj.voters.array))
.where(self.db.comments.c.id == _id))
return True
def like(self, remote_addr, _id):
return self.vote(remote_addr, _id, like=True)
def dislike(self, remote_addr, _id):
return self.vote(remote_addr, _id, like=False)
def delete(self, _id):
"""
Delete comment with :param id:
If the comment is referenced by another (not yet deleted) comment, the
comment is *not* removed, but cleared and flagged as deleted.
"""
refs = self.db.engine.execute(
self.db.comments.select(self.db.comments.c.id).where(
self.db.comments.c.parent == _id)).fetchone()
if refs is None:
self.db.engine.execute(
self.db.comments.delete(self.db.comments.c.id == _id))
self.db.engine.execute(
self.db.comments.delete()
.where(self.db.comments.c.mode.op("|")(4) == self.db.comments.c.mode)
.where(not_(self.db.comments.c.id.in_(
select([self.db.comments.c.parent]).where(
self.db.comments.c.parent != None)))))
return None
obj = self.get(_id)
obj = obj.new(text="", author=None, email=None, website=None, mode=4)
self.db.engine.execute(self.db.comments.update()
.values(text=obj.text, author=obj.email, website=obj.website, mode=obj.mode)
.where(self.db.comments.c.id == _id))
return obj
def count(self, *threads):
"""Retrieve comment count for :param threads:
"""
ids = [getattr(th, "id", None) for th in threads]
threads = dict(
self.db.engine.execute(
select([self.db.comments.c.thread, func.count(self.db.comments)])
.where(self.db.comments.c.thread.in_(ids))
.group_by(self.db.comments.c.thread)).fetchall())
return [threads.get(_id, 0) for _id in ids]
def reply_count(self, thread, mode=5, after=0):
rv = self.db.engine.execute(
select([self.db.comments.c.parent, func.count(self.db.comments)])
.where(self.db.comments.c.thread == thread.id)
# .where(self.db.comments.c.mode.op("|")(mode) == mode)
.where(self.db.comments.c.created)
.group_by(self.db.comments.c.parent)).fetchall()
return dict(rv)
def activate(self, _id):
"""Activate comment :param id: and return True on success.
"""
obj = self.get(_id)
if obj is None:
return False
i = self.db.engine.execute(self.db.comments.update()
.values(mode=1)
.where(self.db.comments.c.id == _id)
.where(self.db.comments.c.mode == 2)).rowcount
return i > 0
def prune(self, delta):
"""Remove comments still in moderation queue older than max-age.
"""
now = time.time()
self.db.engine.execute(
self.db.comments.delete()
.where(self.db.comments.c.mode == 2)
.where(now - self.db.comments.c.created > delta))
return

View File

@ -0,0 +1,36 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
from isso.models import Thread
class Controller(object):
def __init__(self, db):
self.db = db
def new(self, uri, title=None):
_id = self.db.engine.execute(
self.db.threads.insert().values(uri=uri, title=title)
).inserted_primary_key[0]
return Thread(_id, uri, title)
def get(self, uri):
rv = self.db.engine.execute(
self.db.threads.select(self.db.threads.c.uri == uri)).fetchone()
if not rv:
return None
return Thread(*rv)
def delete(self, uri):
thread = self.get(uri)
self.db.engine.execute(
self.db.comments.delete().where(self.db.comments.c.thread == thread.id))
self.db.engine.execute(
self.db.threads.delete().where(self.db.threads.c.id == thread.id))

View File

@ -4,6 +4,7 @@ from __future__ import unicode_literals
import logging import logging
import sqlite3 import sqlite3
import binascii
import operator import operator
import threading import threading
@ -11,12 +12,82 @@ import os.path
from collections import defaultdict from collections import defaultdict
from sqlalchemy import Table, Column, MetaData, create_engine
from sqlalchemy import ForeignKey, Integer, Float, String, LargeBinary
from sqlalchemy.sql import select
logger = logging.getLogger("isso") logger = logging.getLogger("isso")
from isso.db.comments import Comments
from isso.db.threads import Threads class Adapter(object):
from isso.db.spam import Guard
from isso.db.preferences import Preferences def __init__(self, db):
self.engine = create_engine(db, echo=False)
self.metadata = MetaData()
self.comments = Table("comments", self.metadata,
Column("id", Integer, primary_key=True),
Column("parent", Integer),
Column("thread", None, ForeignKey("threads.id")),
Column("created", Float),
Column("modified", Float),
Column("mode", Integer),
Column("remote_addr", String(48)), # XXX use a BigInt
Column("text", String(65535)),
Column("author", String(255)),
Column("email", String(255)),
Column("website", String(255)),
Column("likes", Integer),
Column("dislikes", Integer),
Column("voters", LargeBinary(256)))
self.threads = Table("threads", self.metadata,
Column("id", Integer, primary_key=True),
Column("uri", String(255), unique=True),
Column("title", String(255)))
preferences = Table("preferences", self.metadata,
Column("key", String(255), primary_key=True),
Column("value", String(255)))
self.metadata.create_all(self.engine)
self.preferences = Preferences(self.engine, preferences)
@property
def transaction(self):
return self.engine.begin()
class Preferences(object):
"""A simple key-value store using SQL.
"""
defaults = [
("session-key", binascii.b2a_hex(os.urandom(24))),
]
def __init__(self, engine, preferences):
self.engine = engine
self.preferences = preferences
for (key, value) in Preferences.defaults:
if self.get(key) is None:
self.set(key, value)
def get(self, key, default=None):
rv = self.engine.execute(
select([self.preferences.c.value])
.where(self.preferences.c.key == key)).fetchone()
if rv is None:
return default
return rv[0]
def set(self, key, value):
self.engine.execute(
self.preferences.insert().values(
key=key, value=value))
class Transaction(object): class Transaction(object):
@ -87,7 +158,7 @@ class SQLite3(object):
return self.connection.total_changes return self.connection.total_changes
class Adapter(object): class Foo(object):
"""DB-dependend wrapper around SQLite3. """DB-dependend wrapper around SQLite3.
Runs migration if `user_version` is older than `MAX_VERSION` and register Runs migration if `user_version` is older than `MAX_VERSION` and register

View File

@ -1,234 +0,0 @@
# -*- encoding: utf-8 -*-
import time
from isso.utils import Bloomfilter
from isso.compat import buffer
class Comments:
"""Hopefully DB-independend SQL to store, modify and retrieve all
comment-related actions. Here's a short scheme overview:
| tid (thread id) | id (comment id) | parent | ... | voters | remote_addr |
+-----------------+-----------------+--------+-----+--------+-------------+
| 1 | 1 | null | ... | BLOB | 127.0.0.0 |
| 1 | 2 | 1 | ... | BLOB | 127.0.0.0 |
+-----------------+-----------------+--------+-----+--------+-------------+
The tuple (tid, id) is unique and thus primary key.
"""
fields = ['tid', 'id', 'parent', 'created', 'modified', 'mode', 'remote_addr',
'text', 'author', 'email', 'website', 'likes', 'dislikes', 'voters']
def __init__(self, db):
self.db = db
self.db.execute([
'CREATE TABLE IF NOT EXISTS comments (',
' tid REFERENCES threads(id), id INTEGER PRIMARY KEY, parent INTEGER,',
' created FLOAT NOT NULL, modified FLOAT, mode INTEGER, remote_addr VARCHAR,',
' text VARCHAR, author VARCHAR, email VARCHAR, website VARCHAR,',
' likes INTEGER DEFAULT 0, dislikes INTEGER DEFAULT 0, voters BLOB NOT NULL);'])
def add(self, uri, c):
"""
Add new comment to DB and return a mapping of :attribute:`fields` and
database values.
"""
if c.get("parent") is not None:
ref = self.get(c["parent"])
if ref.get("parent") is not None:
c["parent"] = ref["parent"]
self.db.execute([
'INSERT INTO comments (',
' tid, parent,'
' created, modified, mode, remote_addr,',
' text, author, email, website, voters )',
'SELECT',
' threads.id, ?,',
' ?, ?, ?, ?,',
' ?, ?, ?, ?, ?',
'FROM threads WHERE threads.uri = ?;'], (
c.get('parent'),
c.get('created') or time.time(), None, c["mode"], c['remote_addr'],
c['text'], c.get('author'), c.get('email'), c.get('website'), buffer(
Bloomfilter(iterable=[c['remote_addr']]).array),
uri)
)
return dict(zip(Comments.fields, self.db.execute(
'SELECT *, MAX(c.id) FROM comments AS c INNER JOIN threads ON threads.uri = ?',
(uri, )).fetchone()))
def activate(self, id):
"""
Activate comment id if pending.
"""
self.db.execute([
'UPDATE comments SET',
' mode=1',
'WHERE id=? AND mode=2'], (id, ))
def update(self, id, data):
"""
Update comment :param:`id` with values from :param:`data` and return
updated comment.
"""
self.db.execute([
'UPDATE comments SET',
','.join(key + '=' + '?' for key in data),
'WHERE id=?;'],
list(data.values()) + [id])
return self.get(id)
def get(self, id):
"""
Search for comment :param:`id` and return a mapping of :attr:`fields`
and values.
"""
rv = self.db.execute('SELECT * FROM comments WHERE id=?', (id, )).fetchone()
if rv:
return dict(zip(Comments.fields, rv))
return None
def fetch(self, uri, mode=5, after=0, parent='any', order_by='id', limit=None):
"""
Return comments for :param:`uri` with :param:`mode`.
"""
sql = [ 'SELECT comments.* FROM comments INNER JOIN threads ON',
' threads.uri=? AND comments.tid=threads.id AND (? | comments.mode) = ?',
' AND comments.created>?']
sql_args = [uri, mode, mode, after]
if parent != 'any':
if parent is None:
sql.append('AND comments.parent IS NULL')
else:
sql.append('AND comments.parent=?')
sql_args.append(parent)
sql.append('ORDER BY ? ASC')
sql_args.append(order_by)
if limit:
sql.append('LIMIT ?')
sql_args.append(limit)
rv = self.db.execute(sql, sql_args).fetchall()
for item in rv:
yield dict(zip(Comments.fields, item))
def _remove_stale(self):
sql = ('DELETE FROM',
' comments',
'WHERE',
' mode=4 AND id NOT IN (',
' SELECT',
' parent',
' FROM',
' comments',
' WHERE parent IS NOT NULL)')
while self.db.execute(sql).rowcount:
continue
def delete(self, id):
"""
Delete a comment. There are two distinctions: a comment is referenced
by another valid comment's parent attribute or stand-a-lone. In this
case the comment can't be removed without losing depending comments.
Hence, delete removes all visible data such as text, author, email,
website sets the mode field to 4.
In the second case this comment can be safely removed without any side
effects."""
refs = self.db.execute('SELECT * FROM comments WHERE parent=?', (id, )).fetchone()
if refs is None:
self.db.execute('DELETE FROM comments WHERE id=?', (id, ))
self._remove_stale()
return None
self.db.execute('UPDATE comments SET text=? WHERE id=?', ('', id))
self.db.execute('UPDATE comments SET mode=? WHERE id=?', (4, id))
for field in ('author', 'website'):
self.db.execute('UPDATE comments SET %s=? WHERE id=?' % field, (None, id))
self._remove_stale()
return self.get(id)
def vote(self, upvote, id, remote_addr):
"""+1 a given comment. Returns the new like count (may not change because
the creater can't vote on his/her own comment and multiple votes from the
same ip address are ignored as well)."""
rv = self.db.execute(
'SELECT likes, dislikes, voters FROM comments WHERE id=?', (id, )) \
.fetchone()
if rv is None:
return None
likes, dislikes, voters = rv
if likes + dislikes >= 142:
return {'likes': likes, 'dislikes': dislikes}
bf = Bloomfilter(bytearray(voters), likes + dislikes)
if remote_addr in bf:
return {'likes': likes, 'dislikes': dislikes}
bf.add(remote_addr)
self.db.execute([
'UPDATE comments SET',
' likes = likes + 1,' if upvote else 'dislikes = dislikes + 1,',
' voters = ?'
'WHERE id=?;'], (buffer(bf.array), id))
if upvote:
return {'likes': likes + 1, 'dislikes': dislikes}
return {'likes': likes, 'dislikes': dislikes + 1}
def reply_count(self, url, mode=5, after=0):
"""
Return comment count for main thread and all reply threads for one url.
"""
sql = ['SELECT comments.parent,count(*)',
'FROM comments INNER JOIN threads ON',
' threads.uri=? AND comments.tid=threads.id AND',
' (? | comments.mode = ?) AND',
' comments.created > ?',
'GROUP BY comments.parent']
return dict(self.db.execute(sql, [url, mode, mode, after]).fetchall())
def count(self, *urls):
"""
Return comment count for one ore more urls..
"""
threads = dict(self.db.execute([
'SELECT threads.uri, COUNT(comments.id) FROM comments',
'LEFT OUTER JOIN threads ON threads.id = tid AND comments.mode = 1',
'GROUP BY threads.uri'
]).fetchall())
return [threads.get(url, 0) for url in urls]
def purge(self, delta):
"""
Remove unmoderated comments older than :param:`delta`.
"""
self.db.execute([
'DELETE FROM comments WHERE mode = 2 AND ? - created > ?;'
], (time.time(), delta))
self._remove_stale()

View File

@ -1,38 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import os
import binascii
class Preferences:
defaults = [
("session-key", binascii.b2a_hex(os.urandom(24))),
]
def __init__(self, db):
self.db = db
self.db.execute([
'CREATE TABLE IF NOT EXISTS preferences (',
' key VARCHAR PRIMARY KEY, value VARCHAR',
');'])
for (key, value) in Preferences.defaults:
if self.get(key) is None:
self.set(key, value)
def get(self, key, default=None):
rv = self.db.execute(
'SELECT value FROM preferences WHERE key=?', (key, )).fetchone()
if rv is None:
return default
return rv[0]
def set(self, key, value):
self.db.execute(
'INSERT INTO preferences (key, value) VALUES (?, ?)', (key, value))

View File

@ -1,67 +0,0 @@
# -*- encoding: utf-8 -*-
import time
class Guard:
def __init__(self, db):
self.db = db
self.conf = db.conf.section("guard")
self.max_age = db.conf.getint("general", "max-age")
def validate(self, uri, comment):
if not self.conf.getboolean("enabled"):
return True, ""
for func in (self._limit, self._spam):
valid, reason = func(uri, comment)
if not valid:
return False, reason
return True, ""
@classmethod
def ids(cls, rv):
return [str(col[0]) for col in rv]
def _limit(self, uri, comment):
# block more than :param:`ratelimit` comments per minute
rv = self.db.execute([
'SELECT id FROM comments WHERE remote_addr = ? AND ? - created < 60;'
], (comment["remote_addr"], time.time())).fetchall()
if len(rv) >= self.conf.getint("ratelimit"):
return False, "{0}: ratelimit exceeded ({1})".format(
comment["remote_addr"], ', '.join(Guard.ids(rv)))
# block more than three comments as direct response to the post
if comment["parent"] is None:
rv = self.db.execute([
'SELECT id FROM comments WHERE',
' tid = (SELECT id FROM threads WHERE uri = ?)',
'AND remote_addr = ?',
'AND parent IS NULL;'
], (uri, comment["remote_addr"])).fetchall()
if len(rv) >= self.conf.getint("direct-reply"):
return False, "%i direct responses to %s" % (len(rv), uri)
elif self.conf.getboolean("reply-to-self") == False:
rv = self.db.execute([
'SELECT id FROM comments WHERE'
' remote_addr = ?',
'AND id = ?',
'AND ? - created < ?'
], (comment["remote_addr"], comment["parent"],
time.time(), self.max_age)).fetchall()
if len(rv) > 0:
return False, "edit time frame is still open"
return True, ""
def _spam(self, uri, comment):
return True, ""

View File

@ -1,30 +0,0 @@
# -*- encoding: utf-8 -*-
def Thread(id, uri, title):
return {
"id": id,
"uri": uri,
"title": title
}
class Threads(object):
def __init__(self, db):
self.db = db
self.db.execute([
'CREATE TABLE IF NOT EXISTS threads (',
' id INTEGER PRIMARY KEY, uri VARCHAR(256) UNIQUE, title VARCHAR(256))'])
def __contains__(self, uri):
return self.db.execute("SELECT title FROM threads WHERE uri=?", (uri, )) \
.fetchone() is not None
def __getitem__(self, uri):
return Thread(*self.db.execute("SELECT * FROM threads WHERE uri=?", (uri, )).fetchone())
def new(self, uri, title):
self.db.execute("INSERT INTO threads (uri, title) VALUES (?, ?)", (uri, title))
return self[uri]

45
isso/models/__init__.py Normal file
View File

@ -0,0 +1,45 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import operator
class Thread(object):
__slots__ = ['id', 'uri', 'title']
def __init__(self, id, uri, title=None):
self.id = id
self.uri = uri
self.title = title
class Comment(object):
__slots__ = [
'id', 'parent', 'thread', 'created', 'modified', 'mode', 'remote_addr',
'text', 'author', 'email', 'website', 'likes', 'dislikes', 'voters']
def __init__(self, **kwargs):
for attr in Comment.__slots__:
try:
setattr(self, attr, kwargs.pop(attr))
except KeyError:
raise TypeError("missing '{0}' argument".format(attr))
if kwargs:
raise TypeError("unexpected argument '{0}'".format(*kwargs.popitem()))
def new(self, **kwargs):
kw = dict(zip(Comment.__slots__, operator.attrgetter(*Comment.__slots__)(self)))
kw.update(kwargs)
return Comment(**kw)
@property
def moderated(self):
return self.mode | 2 == self.mode
@property
def deleted(self):
return self.mode | 4 == self.mode

View File

@ -209,22 +209,14 @@ class Cron(Task):
return return
class PurgeDB(Cron): from .tasks import db
def __init__(self, db, after):
super(PurgeDB, self).__init__(hours=1)
self.db = db
self.after = after
def run(self, data):
self.db.comments.purge(self.after)
class Jobs(dict): class Jobs(dict):
"""Obviously a poor man's factory""" """Obviously a poor man's factory"""
available = { available = {
"purge-db": PurgeDB "db-purge": db.Purge
} }
def __init__(self): def __init__(self):

View File

@ -0,0 +1,58 @@
# -*- encoding: utf-8 -*-
import abc
import datetime
class Task(object):
"""A task. Override :func:`run` with custom functionality. Tasks itself
may cause blocking I/O but should terminate.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def run(self, data):
return
class Cron(Task):
"""Crons are tasks which are re-scheduled after each execution."""
__metaclass__ = abc.ABCMeta
def __init__(self, *args, **kwargs):
self.timedelta = datetime.timedelta(*args, **kwargs)
def run(self, data):
return
from . import db, http, mail
class Jobs(dict):
"""Obviously a poor man's factory"""
available = {
"db-purge": db.Purge,
"http-fetch": http.Fetch,
"mail-send": mail.Send
}
def __init__(self):
super(Jobs, self).__init__()
def register(self, name, *args, **kwargs):
if name in self:
return
try:
cls = Jobs.available[name]
except KeyError:
raise RuntimeError("No such task '%s'" % name)
self[name] = cls(*args, **kwargs)
__all__ = ["Job", "Cron", "Jobs"]

16
isso/queue/tasks/db.py Normal file
View File

@ -0,0 +1,16 @@
# -*- encoding: utf-8 -*-
from isso.controllers import comments
from . import Cron
class Purge(Cron):
def __init__(self, db, after):
super(Purge, self).__init__(hours=1)
self.comments = comments.Controller(db)
self.after = after
def run(self, data):
self.comments.purge(self.after)

12
isso/queue/tasks/http.py Normal file
View File

@ -0,0 +1,12 @@
# -*- encoding: utf-8 -*-
from . import Task
class Fetch(Task):
def __init__(self, db):
self.db = db
def run(self, data):
raise NotImplemented

59
isso/queue/tasks/mail.py Normal file
View File

@ -0,0 +1,59 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import sys
import smtplib
from email.utils import formatdate
from email.header import Header
from email.mime.text import MIMEText
from . import Task
class Send(Task):
def __init__(self, section):
self.conf = section
self.client = None
def __enter__(self):
klass = (smtplib.SMTP_SSL if self.conf.get('security') == 'ssl' else smtplib.SMTP)
self.client = klass(host=self.conf.get('host'),
port=self.conf.getint('port'),
timeout=self.conf.getint('timeout'))
if self.conf.get('security') == 'starttls':
if sys.version_info >= (3, 4):
import ssl
self.client.starttls(context=ssl.create_default_context())
else:
self.client.starttls()
if self.conf.get('username') and self.conf.get('password'):
self.client.login(self.conf.get('username'),
self.conf.get('password'))
return self.client
def __exit__(self, exc_type, exc_val, exc_tb):
if self.client is not None:
self.client.quit()
self.client = None
def run(self, data):
subject = data["subject"]
body = data["body"]
from_addr = self.conf.get("from")
to_addr = self.conf.get("to")
msg = MIMEText(body, 'plain', 'utf-8')
msg['From'] = from_addr
msg['To'] = to_addr
msg['Date'] = formatdate(localtime=True)
msg['Subject'] = Header(subject, 'utf-8')
with self as con:
con.sendmail(from_addr, to_addr, msg.as_string())

View File

@ -10,5 +10,4 @@ from isso import dist, config
application = make_app( application = make_app(
config.load( config.load(
os.path.join(dist.location, "isso", "defaults.ini"), os.path.join(dist.location, "isso", "defaults.ini"),
os.environ.get('ISSO_SETTINGS')), os.environ.get('ISSO_SETTINGS')))
shared=True)

56
isso/spam/__init__.py Normal file
View File

@ -0,0 +1,56 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import time
from sqlalchemy.sql import select, func
class Guard(object):
def __init__(self, db, enabled, ratelimit=2, direct_reply=3,
reply_to_self=False, max_age=15*60):
self.db = db
self.enabled = enabled
self.ratelimit = ratelimit
self.direct_reply = direct_reply
self.reply_to_self = reply_to_self
self.max_age = max_age
def validate(self, remote_addr, thread, comment):
now = time.time()
# block more than :param:`ratelimit` comments per minute
count = self.db.engine.execute(
select([func.count(self.db.comments)])
.where(self.db.comments.c.remote_addr == remote_addr)
.where(now - self.db.comments.c.created < 60)).fetchone()[0]
if count >= self.ratelimit > -1:
return False, "%s: ratelimit exceeded" % remote_addr
# block more than three comments as direct response to the post
if comment["parent"] is None:
count = self.db.engine.execute(
select([func.count(self.db.comments)])
.where(self.db.comments.c.thread == thread.id)
.where(self.db.comments.c.remote_addr == remote_addr)
.where(self.db.comments.c.parent == None)).fetchone()[0]
if count >= self.direct_reply:
return False, "only {0} direct response(s) to {1}".format(
count, thread.uri)
elif not self.reply_to_self:
count = self.db.engine.execute(
select([func.count(self.db.comments)])
.where(self.db.comments.c.remote_addr == remote_addr)
.where(now - self.db.comments.c.created < self.max_age)).fetchone()[0]
if count > 0:
return False, "editing frame is still open"
return True, ""

View File

@ -0,0 +1,236 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import time
import unittest
from isso import db
from isso.models import Thread
from isso.controllers import comments
IP = "127.0.0.1"
TH = Thread(1, "/")
class TestValidator(unittest.TestCase):
def test_validator(self):
default = dict(zip(["parent", "text", "author", "email", "website"], [None]*5))
new = lambda **z: dict(default, **z)
verify = lambda data: comments.Validator.verify(data)[0]
# invalid types
self.assertFalse(verify(new(text=None)))
self.assertFalse(verify(new(parent="xxx")))
for key in ("author", "website", "email"):
self.assertFalse(verify(new(text="...", **{key: 3.14})))
# text too short and/or blank
for text in ("", "\n\n\n"):
self.assertFalse(verify(new(text=text)))
# email/website length
self.assertTrue(verify(new(text="...", email="*"*254)))
self.assertTrue(verify(new(text="...", website="google.de/" + "a"*128)))
self.assertFalse(verify(new(text="...", email="*"*1024)))
self.assertFalse(verify(new(text="...", website="google.de/" + "*"*1024)))
# invalid url
self.assertFalse(verify(new(text="...", website="spam")))
def test_isurl(self):
isurl = comments.Validator.isurl
# valid website url
self.assertTrue(isurl("example.tld"))
self.assertTrue(isurl("http://example.tld"))
self.assertTrue(isurl("https://example.tld"))
self.assertTrue(isurl("https://example.tld:1337/"))
self.assertTrue(isurl("https://example.tld:1337/foobar"))
self.assertTrue(isurl("https://example.tld:1337/foobar?p=1#isso-thread"))
self.assertFalse(isurl("ftp://example.tld/"))
self.assertFalse(isurl("tel:+1234567890"))
self.assertFalse(isurl("+1234567890"))
self.assertFalse(isurl("spam"))
class TestController(unittest.TestCase):
def setUp(self):
self.controller = comments.Controller(db.Adapter("sqlite:///:memory:"))
def test_new(self):
obj = self.controller.new(IP, TH, dict(text="Здравствуй, мир!"))
self.assertEqual(obj.id, 1)
self.assertEqual(obj.text, "Здравствуй, мир!")
self.assertLess(obj.created, time.time())
self.assertIn(IP, obj.voters)
self.assertFalse(obj.moderated)
self.assertFalse(obj.deleted)
sec = self.controller.new(IP, TH, dict(text="Ohai"), moderated=True)
self.assertEqual(sec.id, 2)
self.assertEqual(sec.text, "Ohai")
self.assertTrue(sec.moderated)
self.assertFalse(sec.deleted)
self.assertRaises(comments.Invalid, self.controller.new, IP, TH, dict())
def test_create_invalid_parent(self):
a = self.controller.new(IP, TH, dict(text="..."))
b = self.controller.new(IP, TH, dict(text="...", parent=a.id))
c = self.controller.new(IP, TH, dict(text="...", parent=b.id))
# automatic insertion to a maximum nesting level of 1
self.assertEqual(c.parent, b.parent)
# remove invalid reference
d = self.controller.new(IP, TH, dict(text="...", parent=42))
self.assertIsNone(d.parent)
def test_edit(self):
a = self.controller.new(IP, TH, dict(text="Hello!"))
z = self.controller.new(IP, TH, dict(text="Dummy"))
a = self.controller.edit(a.id, dict(
text="Hello, World!", author="Hans", email="123", website="http://example.tld/"))
b = self.controller.get(a.id)
self.assertEqual(a.text, "Hello, World!")
self.assertEqual(a.author, "Hans")
self.assertEqual(a.email, "123")
self.assertEqual(a.website, "http://example.tld/")
for attr in ("text", "author", "email", "website"):
self.assertEqual(getattr(a, attr), getattr(b, attr))
self.assertEqual(self.controller.get(z.id).text, z.text)
# edit invalid data
self.assertRaises(comments.Invalid, self.controller.edit, a.id, dict(text=""))
# edit invalid comment
self.assertIsNone(self.controller.edit(23, dict(text="...")))
def test_get(self):
obj = self.controller.get(23)
self.assertIsNone(obj)
self.controller.new(IP, TH, dict(text="..."))
obj = self.controller.get(1)
self.assertEqual(obj.id, 1)
self.assertEqual(obj.thread, 1)
def test_all(self):
foo, bar = Thread(0, "/foo"), Thread(1, "/bar")
args = ("One", "Two", "three")
for text in args:
self.controller.new(IP, foo, dict(text=text))
# control group
self.controller.new(IP, bar, dict(text="..."))
rv = self.controller.all(foo)
self.assertEqual(len(rv), 3)
for text, obj in zip(args, rv):
self.assertEqual(text, obj.text)
def test_delete(self):
for _ in range(3):
self.controller.new(IP, TH, dict(text="..."))
for n in range(3):
self.controller.delete(n + 1)
self.assertIsNone(self.controller.get(n+1))
# delete invalid comment
self.assertIsNone(self.controller.delete(23))
def test_delete_nested(self):
p = self.controller.new(IP, TH, dict(text="parent"))
c1 = self.controller.new(IP, TH, dict(text="child", parent=p.id))
c2 = self.controller.new(IP, TH, dict(text="child", parent=p.id))
self.controller.delete(p.id)
p = self.controller.get(p.id)
self.assertIsNotNone(p)
self.assertTrue(p.deleted)
self.assertEqual(p.text, "")
self.controller.delete(c1.id)
self.assertIsNone(self.controller.get(c1.id))
self.assertIsNotNone(self.controller.get(c2.id))
self.assertIsNotNone(self.controller.get(p.id))
self.controller.delete(c2.id)
self.assertIsNone(self.controller.get(p.id))
def test_count(self):
threads = [Thread(0, "a"), None, Thread(1, "c")]
counter = [1, 0, 2]
self.assertEqual(self.controller.count(*threads), [0, 0, 0])
for thread, count in zip(threads, counter):
if thread is None:
continue
for _ in range(count):
self.controller.new(IP, thread, dict(text="..."))
self.assertEqual(self.controller.count(*threads), counter)
def test_votes(self):
author = "127.0.0.1"
foo, bar = "1.2.3.4", "1.3.3.7"
c = self.controller.new(author, TH, dict(text="..."))
self.assertEqual(c.likes, 0)
self.assertEqual(c.dislikes, 0)
# author can not vote on own comment
self.assertFalse(self.controller.like(author, c.id))
# but others can (at least once)
self.assertTrue(self.controller.like(foo, c.id))
self.assertTrue(self.controller.dislike(bar, c.id))
self.assertFalse(self.controller.like(foo, c.id))
self.assertFalse(self.controller.like(bar, c.id))
c = self.controller.get(c.id)
self.assertEqual(c.likes, 1)
self.assertEqual(c.dislikes, 1)
# vote a non-existent comment
self.assertFalse(self.controller.like(foo, 23))
def test_activation(self):
c = self.controller.new(IP, TH, dict(text="..."), moderated=True)
self.assertEqual(len(self.controller.all(TH)), 0)
self.assertTrue(self.controller.activate(c.id))
self.assertFalse(self.controller.activate(c.id))
self.assertEqual(len(self.controller.all(TH)), 1)
# invalid comment returns False
self.assertFalse(self.controller.activate(23))
def test_prune(self):
c = self.controller.new(IP, TH, dict(text="..."), moderated=True)
self.controller.prune(42)
self.assertIsNotNone(self.controller.get(c.id))
self.controller.prune(0)
self.assertIsNone(self.controller.get(c.id))

View File

@ -0,0 +1,56 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import unittest
from isso import db
from isso.controllers import comments, threads
class TestController(unittest.TestCase):
def setUp(self):
_db = db.Adapter("sqlite:///:memory:")
self.comments = comments.Controller(_db)
self.threads = threads.Controller(_db)
def test_new(self):
thread = self.threads.new("/", None)
self.assertEqual(thread.id, 1)
self.assertEqual(thread.uri, "/")
self.assertEqual(thread.title, None)
def test_get(self):
self.assertIsNone(self.threads.get("/"))
th = self.threads.get(self.threads.new("/", None).uri)
self.assertIsNotNone(th)
self.assertEqual(th.id, 1)
self.assertEqual(th.uri, "/")
self.assertEqual(th.title, None)
def test_delete(self):
th = self.threads.new("/", None)
self.threads.delete(th.uri)
self.assertIsNone(self.threads.get(th.uri))
def test_delete_removes_comments(self):
th = self.threads.new("/", None)
cg = self.threads.new("/control/group", None)
for _ in range(3):
self.comments.new("127.0.0.1", th, dict(text="..."))
self.comments.new("127.0.0.1", cg, dict(text="..."))
self.assertEqual(self.comments.count(th), [3])
self.assertEqual(self.comments.count(cg), [3])
# now remove the thread
self.threads.delete(th.uri)
self.assertEqual(self.comments.count(th), [0])
self.assertEqual(self.comments.count(cg), [3])

View File

@ -1,41 +0,0 @@
# -*- encoding: utf-8 -*-
import json
from werkzeug.test import Client
class FakeIP(object):
def __init__(self, app, ip):
self.app = app
self.ip = ip
def __call__(self, environ, start_response):
environ['REMOTE_ADDR'] = self.ip
return self.app(environ, start_response)
class JSONClient(Client):
def open(self, *args, **kwargs):
kwargs.setdefault('content_type', 'application/json')
return super(JSONClient, self).open(*args, **kwargs)
class Dummy:
status = 200
def __enter__(self):
return self
def read(self):
return ''
def __exit__(self, exc_type, exc_val, exc_tb):
pass
curl = lambda method, host, path: Dummy()
loads = lambda data: json.loads(data.decode('utf-8'))

View File

@ -1,416 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import os
import json
import unittest
try:
from urllib.parse import urlencode
except ImportError:
from urllib import urlencode
from werkzeug.wrappers import Response
from isso import Isso, config, dist
from isso.utils import http
from isso.views import comments
from isso.compat import iteritems
from fixtures import curl, loads, FakeIP, JSONClient
http.curl = curl
class TestComments(unittest.TestCase):
def setUp(self):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
self.get = self.client.get
self.put = self.client.put
self.post = self.client.post
self.delete = self.client.delete
def testGet(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
r = self.get('/id/1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(rv['id'], 1)
self.assertEqual(rv['text'], '<p>Lorem ipsum ...</p>')
def testCreate(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
self.assertEqual(rv.status_code, 201)
self.assertIn("Set-Cookie", rv.headers)
rv = loads(rv.data)
self.assertEqual(rv["mode"], 1)
self.assertEqual(rv["text"], '<p>Lorem ipsum ...</p>')
def textCreateWithNonAsciiText(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Здравствуй, мир!'}))
self.assertEqual(rv.status_code, 201)
rv = loads(rv.data)
self.assertEqual(rv["mode"], 1)
self.assertEqual(rv["text"], '<p>Здравствуй, мир!</p>')
def testCreateMultiple(self):
a = self.post('/new?uri=test', data=json.dumps({'text': '...'}))
b = self.post('/new?uri=test', data=json.dumps({'text': '...'}))
c = self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.assertEqual(loads(a.data)["id"], 1)
self.assertEqual(loads(b.data)["id"], 2)
self.assertEqual(loads(c.data)["id"], 3)
def testCreateAndGetMultiple(self):
for i in range(20):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Spam'}))
r = self.get('/?uri=%2Fpath%2F')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 20)
def testCreateInvalidParent(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1}))
invalid = self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 2}))
self.assertEqual(loads(invalid.data)["parent"], 1)
def testVerifyFields(self):
verify = lambda comment: comments.API.verify(comment)[0]
# text is missing
self.assertFalse(verify({}))
# invalid types
self.assertFalse(verify({"text": "...", "parent": "xxx"}))
for key in ("author", "website", "email"):
self.assertFalse(verify({"text": True, key: 3.14}))
# text too short and/or blank
for text in ("", "\n\n\n"):
self.assertFalse(verify({"text": text}))
# email/website length
self.assertTrue(verify({"text": "...", "email": "*"*254}))
self.assertTrue(verify({"text": "...", "website": "google.de/" + "a"*128}))
self.assertFalse(verify({"text": "...", "email": "*"*1024}))
self.assertFalse(verify({"text": "...", "website": "google.de/" + "*"*1024}))
# valid website url
self.assertTrue(comments.isurl("example.tld"))
self.assertTrue(comments.isurl("http://example.tld"))
self.assertTrue(comments.isurl("https://example.tld"))
self.assertTrue(comments.isurl("https://example.tld:1337/"))
self.assertTrue(comments.isurl("https://example.tld:1337/foobar"))
self.assertTrue(comments.isurl("https://example.tld:1337/foobar?p=1#isso-thread"))
self.assertFalse(comments.isurl("ftp://example.tld/"))
self.assertFalse(comments.isurl("tel:+1234567890"))
self.assertFalse(comments.isurl("+1234567890"))
self.assertFalse(comments.isurl("spam"))
def testGetInvalid(self):
self.assertEqual(self.get('/?uri=%2Fpath%2F&id=123').status_code, 404)
self.assertEqual(self.get('/?uri=%2Fpath%2Fspam%2F&id=123').status_code, 404)
self.assertEqual(self.get('/?uri=?uri=%foo%2F').status_code, 404)
def testGetLimited(self):
for i in range(20):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
r = self.get('/?uri=test&limit=10')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 10)
def testGetNested(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 1)
def testGetLimitedNested(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
for i in range(20):
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1&limit=10')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 10)
def testUpdate(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
self.put('/id/1', data=json.dumps({
'text': 'Hello World', 'author': 'me', 'website': 'http://example.com/'}))
r = self.get('/id/1?plain=1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(rv['text'], 'Hello World')
self.assertEqual(rv['author'], 'me')
self.assertEqual(rv['website'], 'http://example.com/')
self.assertIn('modified', rv)
def testDelete(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
r = self.delete('/id/1')
self.assertEqual(r.status_code, 200)
self.assertEqual(loads(r.data), None)
self.assertEqual(self.get('/id/1').status_code, 404)
def testDeleteWithReference(self):
client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First', 'parent': 1}))
r = client.delete('/id/1')
self.assertEqual(r.status_code, 200)
self.assertEqual(loads(r.data)['mode'], 4)
self.assertIn('/path/', self.app.db.threads)
data = loads(client.get("/?uri=%2Fpath%2F").data)
self.assertEqual(data["total_replies"], 1)
self.assertEqual(self.get('/?uri=%2Fpath%2F&id=1').status_code, 200)
self.assertEqual(self.get('/?uri=%2Fpath%2F&id=2').status_code, 200)
r = client.delete('/id/2')
self.assertEqual(self.get('/?uri=%2Fpath%2F').status_code, 404)
self.assertNotIn('/path/', self.app.db.threads)
def testDeleteWithMultipleReferences(self):
"""
[ comment 1 ]
|
--- [ comment 2, ref 1 ]
|
--- [ comment 3, ref 1 ]
[ comment 4 ]
"""
client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Second', 'parent': 1}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Third', 'parent': 1}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Last'}))
client.delete('/id/1')
self.assertEqual(self.get('/?uri=%2Fpath%2F').status_code, 200)
client.delete('/id/2')
self.assertEqual(self.get('/?uri=%2Fpath%2F').status_code, 200)
client.delete('/id/3')
self.assertEqual(self.get('/?uri=%2Fpath%2F').status_code, 200)
client.delete('/id/4')
self.assertEqual(self.get('/?uri=%2Fpath%2F').status_code, 404)
def testPathVariations(self):
paths = ['/sub/path/', '/path.html', '/sub/path.html', 'path', '/']
for path in paths:
self.assertEqual(self.post('/new?' + urlencode({'uri': path}),
data=json.dumps({'text': '...'})).status_code, 201)
for i, path in enumerate(paths):
self.assertEqual(self.get('/?' + urlencode({'uri': path})).status_code, 200)
self.assertEqual(self.get('/id/%i' % (i + 1)).status_code, 200)
def testDeleteAndCreateByDifferentUsersButSamePostId(self):
mallory = JSONClient(self.app, Response)
mallory.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Foo'}))
mallory.delete('/id/1')
bob = JSONClient(self.app, Response)
bob.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Bar'}))
self.assertEqual(mallory.delete('/id/1').status_code, 403)
self.assertEqual(bob.delete('/id/1').status_code, 200)
def testHash(self):
a = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Aaa"}))
b = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Bbb"}))
c = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Ccc", "email": "..."}))
a = loads(a.data)
b = loads(b.data)
c = loads(c.data)
self.assertNotEqual(a['hash'], '192.168.1.1')
self.assertEqual(a['hash'], b['hash'])
self.assertNotEqual(a['hash'], c['hash'])
def testVisibleFields(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "...", "invalid": "field"}))
self.assertEqual(rv.status_code, 201)
rv = loads(rv.data)
for key in comments.API.FIELDS:
rv.pop(key)
self.assertListEqual(list(rv.keys()), [])
def testCounts(self):
self.assertEqual(self.get('/count?uri=%2Fpath%2F').status_code, 404)
self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "..."}))
rv = self.get('/count?uri=%2Fpath%2F')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data), 1)
for x in range(3):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "..."}))
rv = self.get('/count?uri=%2Fpath%2F')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data), 4)
for x in range(4):
self.delete('/id/%i' % (x + 1))
rv = self.get('/count?uri=%2Fpath%2F')
self.assertEqual(rv.status_code, 404)
def testMultipleCounts(self):
expected = {'a': 1, 'b': 2, 'c': 0}
for uri, count in iteritems(expected):
for _ in range(count):
self.post('/new?uri=%s' % uri, data=json.dumps({"text": "..."}))
rv = self.post('/count', data=json.dumps(list(expected.keys())))
self.assertEqual(loads(rv.data), list(expected.values()))
def testModify(self):
self.post('/new?uri=test', data=json.dumps({"text": "Tpyo"}))
self.put('/id/1', data=json.dumps({"text": "Tyop"}))
self.assertEqual(loads(self.get('/id/1').data)["text"], "<p>Tyop</p>")
self.put('/id/1', data=json.dumps({"text": "Typo"}))
self.assertEqual(loads(self.get('/id/1').data)["text"], "<p>Typo</p>")
def testDeleteCommentRemovesThread(self):
self.client.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
self.assertIn('/', self.app.db.threads)
self.client.delete('/id/1')
self.assertNotIn('/', self.app.db.threads)
def testCSRF(self):
js = "application/json"
form = "application/x-www-form-urlencoded"
self.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
# no header is fine (default for XHR)
self.assertEqual(self.post('/id/1/dislike', content_type="").status_code, 200)
# x-www-form-urlencoded is definitely not RESTful
self.assertEqual(self.post('/id/1/dislike', content_type=form).status_code, 403)
self.assertEqual(self.post('/new?uri=%2F', data=json.dumps({"text": "..."}),
content_type=form).status_code, 403)
# just for the record
self.assertEqual(self.post('/id/1/dislike', content_type=js).status_code, 200)
class TestModeratedComments(unittest.TestCase):
def setUp(self):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("moderation", "enabled", "true")
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
def testAddComment(self):
rv = self.client.post('/new?uri=test', data=json.dumps({"text": "..."}))
self.assertEqual(rv.status_code, 202)
self.assertEqual(self.client.get('/id/1').status_code, 200)
self.assertEqual(self.client.get('/?uri=test').status_code, 404)
self.app.db.comments.activate(1)
self.assertEqual(self.client.get('/?uri=test').status_code, 200)
class TestPurgeComments(unittest.TestCase):
def setUp(self):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("moderation", "enabled", "true")
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
def testPurgeDoesNoHarm(self):
self.client.post('/new?uri=test', data=json.dumps({"text": "..."}))
self.app.db.comments.activate(1)
self.app.db.comments.purge(0)
self.assertEqual(self.client.get('/?uri=test').status_code, 200)
def testPurgeWorks(self):
self.client.post('/new?uri=test', data=json.dumps({"text": "..."}))
self.app.db.comments.purge(0)
self.assertEqual(self.client.get('/id/1').status_code, 404)
self.client.post('/new?uri=test', data=json.dumps({"text": "..."}))
self.app.db.comments.purge(3600)
self.assertEqual(self.client.get('/id/1').status_code, 200)

View File

@ -1,107 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import unittest
import os
import json
from werkzeug import __version__
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, config, db, dist
from isso.utils import http
from fixtures import curl, FakeIP
http.curl = curl
if __version__.startswith("0.8"):
class Response(Response):
def get_data(self, as_text=False):
return self.data.decode("utf-8")
class TestGuard(unittest.TestCase):
data = json.dumps({"text": "Lorem ipsum."})
def setUp(self):
self.connection = db.SQLite3(":memory:")
def makeClient(self, ip, ratelimit=2, direct_reply=3, self_reply=False):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("hash", "algorithm", "none")
conf.set("guard", "enabled", "true")
conf.set("guard", "ratelimit", str(ratelimit))
conf.set("guard", "direct-reply", str(direct_reply))
conf.set("guard", "reply-to-self", "1" if self_reply else "0")
app = Isso(conf, connection=self.connection)
app.wsgi_app = FakeIP(app.wsgi_app, ip)
return Client(app, Response)
def testRateLimit(self):
bob = self.makeClient("127.0.0.1", 2)
for i in range(2):
rv = bob.post('/new?uri=test', data=self.data)
self.assertEqual(rv.status_code, 201)
rv = bob.post('/new?uri=test', data=self.data)
self.assertEqual(rv.status_code, 403)
self.assertIn("ratelimit exceeded", rv.get_data(as_text=True))
alice = self.makeClient("1.2.3.4", 2)
for i in range(2):
self.assertEqual(alice.post("/new?uri=test", data=self.data).status_code, 201)
bob.application.db.execute([
"UPDATE comments SET",
" created = created - 60",
"WHERE remote_addr = '127.0.0.0'"
])
self.assertEqual(bob.post("/new?uri=test", data=self.data).status_code, 201)
def testDirectReply(self):
client = self.makeClient("127.0.0.1", 15, 3)
for url in ("foo", "bar", "baz", "spam"):
for _ in range(3):
rv = client.post("/new?uri=%s" % url, data=self.data)
self.assertEqual(rv.status_code, 201)
for url in ("foo", "bar", "baz", "spam"):
rv = client.post("/new?uri=%s" % url, data=self.data)
self.assertEqual(rv.status_code, 403)
self.assertIn("direct responses to", rv.get_data(as_text=True))
def testSelfReply(self):
payload = lambda id: json.dumps({"text": "...", "parent": id})
client = self.makeClient("127.0.0.1", self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 403)
client.application.db.execute([
"UPDATE comments SET",
" created = created - ?",
"WHERE id = 1"
], (client.application.conf.getint("general", "max-age"), ))
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201)
client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(2)).status_code, 201)

92
isso/tests/test_spam.py Normal file
View File

@ -0,0 +1,92 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import unittest
from isso import db, spam
from isso.models import Thread
from isso.controllers import comments
bob = "127.0.0.1"
alice = "127.0.0.2"
comment = dict(text="...")
class TestGuard(unittest.TestCase):
def setUp(self):
self.db = db.Adapter("sqlite:///:memory:")
def test_ratelimit(self):
thread = Thread(0, "/")
guard = spam.Guard(self.db, enabled=True, ratelimit=2)
controller = comments.Controller(self.db, guard)
for _ in range(2):
controller.new(bob, thread, comment)
try:
controller.new(bob, thread, comment)
except Exception as ex:
self.assertIsInstance(ex, comments.Denied)
self.assertIn("ratelimit exceeded", ex.message)
else:
self.assertTrue(False)
# check if Alice can still comment
for _ in range(2):
controller.new(alice, thread, comment)
# 60 seconds are gone now
self.db.engine.execute(
self.db.comments.update().values(
created=self.db.comments.c.created - 60
).where(self.db.comments.c.remote_addr == bob))
controller.new(bob, thread, comment)
def test_direct_reply(self):
threads = [Thread(0, "/foo"), Thread(1, "/bar")]
guard = spam.Guard(self.db, enabled=True, ratelimit=-1, direct_reply=3)
controller = comments.Controller(self.db, guard)
for thread in threads:
for _ in range(3):
controller.new(bob, thread, comment)
for thread in threads:
try:
controller.new(bob, thread, comment)
except Exception as ex:
self.assertIsInstance(ex, comments.Denied)
self.assertIn("direct response", ex.message)
else:
self.assertTrue(False)
def test_self_reply(self):
thread = Thread(0, "/")
guard = spam.Guard(self.db, enabled=True, reply_to_self=False)
controller = comments.Controller(self.db, guard)
ref = controller.new(bob, thread, comment).id
try:
controller.new(bob, thread, dict(text="...", parent=ref))
except Exception as ex:
self.assertIsInstance(ex, comments.Denied)
self.assertIn("editing frame is still open", ex.message)
else:
self.assertTrue(False)
# fast-forward `max-age` seconds
self.db.engine.execute(
self.db.comments.update().values(
created=self.db.comments.c.created - guard.max_age
).where(self.db.comments.c.id == ref))
controller.new(bob, thread, dict(text="...", parent=ref))

View File

@ -1,72 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import unittest
import os
import json
from werkzeug.wrappers import Response
from isso import Isso, config, dist
from isso.utils import http
from fixtures import curl, loads, FakeIP, JSONClient
http.curl = curl
class TestVote(unittest.TestCase):
def setUp(self):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
self.app = Isso(conf)
def makeClient(self, ip):
return JSONClient(FakeIP(self.app.wsgi_app, ip), Response)
def testZeroLikes(self):
rv = self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."}))
self.assertEqual(loads(rv.data)['likes'], 0)
self.assertEqual(loads(rv.data)['dislikes'], 0)
def testSingleLike(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."}))
rv = self.makeClient("0.0.0.0").post("/id/1/like")
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data)["likes"], 1)
def testSelfLike(self):
bob = self.makeClient("127.0.0.1")
bob.post("/new?uri=test", data=json.dumps({"text": "..."}))
rv = bob.post('/id/1/like')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data)["likes"], 0)
def testMultipleLikes(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."}))
for num in range(15):
rv = self.makeClient("1.2.%i.0" % num).post('/id/1/like')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data)["likes"], num + 1)
def testVoteOnNonexistentComment(self):
rv = self.makeClient("1.2.3.4").post('/id/1/like')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data), None)
def testDislike(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."}))
rv = self.makeClient("1.2.3.4").post('/id/1/dislike')
self.assertEqual(rv.status_code, 200)
self.assertEqual(loads(rv.data)['likes'], 0)
self.assertEqual(loads(rv.data)['dislikes'], 1)

View File

@ -0,0 +1,175 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import os
import json
import unittest
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, config, dist
from isso.utils import http
class FakeIP(object):
def __init__(self, app, ip):
self.app = app
self.ip = ip
def __call__(self, environ, start_response):
environ['REMOTE_ADDR'] = self.ip
return self.app(environ, start_response)
class JSONClient(Client):
def open(self, *args, **kwargs):
kwargs.setdefault('content_type', 'application/json')
return super(JSONClient, self).open(*args, **kwargs)
class Dummy(object):
status = 200
def __enter__(self):
return self
def read(self):
return ''
def __exit__(self, exc_type, exc_val, exc_tb):
pass
curl = lambda method, host, path: Dummy()
loads = lambda data: json.loads(data.decode('utf-8'))
http.curl = curl
class TestComments(unittest.TestCase):
def setUp(self):
conf = config.load(os.path.join(dist.location, "isso", "defaults.ini"))
conf.set("general", "dbpath", "sqlite:///:memory:")
conf.set("guard", "enabled", "off")
conf.set("hash", "algorithm", "none")
self.app = Isso(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = JSONClient(self.app, Response)
self.get = self.client.get
self.put = self.client.put
self.post = self.client.post
self.delete = self.client.delete
# done (except Markup)
def testGet(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
r = self.get('/id/1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(rv['id'], 1)
self.assertEqual(rv['text'], '<p>Lorem ipsum ...</p>')
# done (except Set-Cookie)
def testCreate(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
self.assertEqual(rv.status_code, 201)
self.assertIn("Set-Cookie", rv.headers)
self.assertIn("X-Set-Cookie", rv.headers)
rv = loads(rv.data)
self.assertEqual(rv["mode"], 1)
self.assertEqual(rv["text"], '<p>Lorem ipsum ...</p>')
def testGetLimited(self):
for i in range(20):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
r = self.get('/?uri=test&limit=10')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 10)
def testGetNested(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 1)
def testGetLimitedNested(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'}))
for i in range(20):
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1&limit=10')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(len(rv['replies']), 10)
# done (except plain)
def testUpdate(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'}))
self.put('/id/1', data=json.dumps({
'text': 'Hello World', 'author': 'me', 'website': 'http://example.com/'}))
r = self.get('/id/1?plain=1')
self.assertEqual(r.status_code, 200)
rv = loads(r.data)
self.assertEqual(rv['text'], 'Hello World')
self.assertEqual(rv['author'], 'me')
self.assertEqual(rv['website'], 'http://example.com/')
self.assertIn('modified', rv)
def testDeleteAndCreateByDifferentUsersButSamePostId(self):
mallory = JSONClient(self.app, Response)
mallory.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Foo'}))
mallory.delete('/id/1')
bob = JSONClient(self.app, Response)
bob.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Bar'}))
self.assertEqual(mallory.delete('/id/1').status_code, 403)
self.assertEqual(bob.delete('/id/1').status_code, 200)
def testCSRF(self):
js = "application/json"
form = "application/x-www-form-urlencoded"
self.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
# no header is fine (default for XHR)
self.assertEqual(self.post('/id/1/dislike', content_type="").status_code, 200)
# x-www-form-urlencoded is definitely not RESTful
self.assertEqual(self.post('/id/1/dislike', content_type=form).status_code, 403)
self.assertEqual(self.post('/new?uri=%2F', data=json.dumps({"text": "..."}),
content_type=form).status_code, 403)
# just for the record
self.assertEqual(self.post('/id/1/dislike', content_type=js).status_code, 200)

View File

@ -53,8 +53,6 @@ class Bloomfilter(object):
>>> for i in range(256): >>> for i in range(256):
... bf.add("1.2.%i.4" % i) ... bf.add("1.2.%i.4" % i)
... ...
>>> len(bf)
256
>>> "1.2.3.4" in bf >>> "1.2.3.4" in bf
True True
>>> "127.0.0.1" in bf >>> "127.0.0.1" in bf
@ -64,9 +62,8 @@ class Bloomfilter(object):
http://code.activestate.com/recipes/577684-bloom-filter/ http://code.activestate.com/recipes/577684-bloom-filter/
""" """
def __init__(self, array=None, elements=0, iterable=()): def __init__(self, array=None, iterable=()):
self.array = array or bytearray(256) self.array = array or bytearray(256)
self.elements = elements
self.k = 11 self.k = 11
self.m = len(self.array) * 8 self.m = len(self.array) * 8
@ -82,14 +79,10 @@ class Bloomfilter(object):
def add(self, key): def add(self, key):
for i in self.get_probes(key): for i in self.get_probes(key):
self.array[i//8] |= 2 ** (i%8) self.array[i//8] |= 2 ** (i%8)
self.elements += 1
def __contains__(self, key): def __contains__(self, key):
return all(self.array[i//8] & (2 ** (i % 8)) for i in self.get_probes(key)) return all(self.array[i//8] & (2 ** (i % 8)) for i in self.get_probes(key))
def __len__(self):
return self.elements
class JSONRequest(Request): class JSONRequest(Request):

View File

@ -2,20 +2,10 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import pkg_resources
dist = pkg_resources.get_distribution("isso")
import json
from werkzeug.wrappers import Response
from werkzeug.routing import Rule
from werkzeug.exceptions import BadRequest from werkzeug.exceptions import BadRequest
from isso import local
from isso.compat import text_type as str
class requires(object):
class requires:
"""Verify that the request URL contains and can parse the parameter. """Verify that the request URL contains and can parse the parameter.
.. code-block:: python .. code-block:: python
@ -46,20 +36,7 @@ class requires:
return dec return dec
from .api import API
from .info import Info
class Info(object): __all__ = ["requires", "API", "Info"]
def __init__(self, isso):
self.moderation = isso.conf.getboolean("moderation", "enabled")
isso.urls.add(Rule('/info', endpoint=self.show))
def show(self, environ, request):
rv = {
"version": dist.version,
"host": str(local("host")),
"origin": str(local("origin")),
"moderation": self.moderation,
}
return Response(json.dumps(rv), 200, content_type="application/json")

376
isso/views/api.py Normal file
View File

@ -0,0 +1,376 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import cgi
import functools
from itsdangerous import SignatureExpired, BadSignature
from werkzeug.http import dump_cookie
from werkzeug.wrappers import Response
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from isso.compat import text_type as str, string_types
from isso import utils
from isso.utils import JSONResponse as JSON
from isso.views import requires
from isso.utils.hash import sha1
from isso.controllers import threads, comments
def normalize(url):
if not url.startswith(("http://", "https://")):
return "http://" + url
return url
def xhr(func):
"""A decorator to check for CSRF on POST/PUT/DELETE using a <form>
element and JS to execute automatically (see #40 for a proof-of-concept).
When an attacker uses a <form> to downvote a comment, the browser *should*
add a `Content-Type: ...` header with three possible values:
* application/x-www-form-urlencoded
* multipart/form-data
* text/plain
If the header is not sent or requests `application/json`, the request is
not forged (XHR is restricted by CORS separately).
"""
def dec(self, env, req, *args, **kwargs):
if req.content_type and not req.content_type.startswith("application/json"):
raise Forbidden("CSRF")
return func(self, env, req, *args, **kwargs)
return dec
class API(object):
# comment fields, that can be submitted
ACCEPT = set(['text', 'author', 'email', 'website', 'parent'])
def __init__(self, conf, cache, db, guard, hash, markup, signer):
self.conf = conf
self.db = db
self.cache = cache
self.hash = hash
self.markup = markup
self.threads = threads.Controller(db)
self.comments = comments.Controller(db, guard)
self.max_age = conf.getint("general", "max-age")
self.moderated = conf.getboolean("moderation", "enabled")
self.sign = signer.dumps
self.load = functools.partial(signer.loads, max_age=self.max_age)
def serialize(self, comment, markup=True):
_id = str(comment.id)
obj = {
"id": comment.id, "parent": comment.parent,
"mode": comment.mode,
"created": comment.created, "modified": comment.modified,
"text": comment.text, "author": comment.author,
"email": comment.email, "website": comment.website,
"likes": comment.likes, "dislikes": comment.dislikes}
if markup:
html = self.cache.get("text", _id)
if html is None:
html = self.markup.render(comment.text)
self.cache.set("text", _id, html)
obj["text"] = html
hash = self.cache.get("hash", _id)
if hash is None:
hash = self.hash(comment.email or comment.remote_addr)
self.cache.set("hash", _id, hash)
obj["hash"] = hash
return obj
@xhr
@requires(str, 'uri')
def new(self, environ, request, uri):
data = request.get_json()
if not isinstance(data, dict):
raise BadRequest(400, "request data is not an object")
for field in set(data.keys()) - API.ACCEPT:
data.pop(field)
for field in ("author", "email", "website"):
if isinstance(data.get(field, None), string_types):
data[field] = cgi.escape(data[field])
if isinstance(data.get("website", None), string_types):
data["website"] = normalize(data["website"])
remote_addr = utils.anonymize(str(request.remote_addr))
with self.db.transaction:
thread = self.threads.get(uri)
if thread is None:
thread = self.threads.new(uri)
try:
comment = self.comments.new(remote_addr, thread, data,
moderated=self.moderated)
except comments.Invalid as ex:
raise BadRequest(ex.message)
except comments.Denied as ex:
raise Forbidden(ex.message)
# TODO queue new thread, send notification
_id = str(comment.id)
signature = self.sign([comment.id, sha1(comment.text)])
resp = JSON(
self.serialize(comment),
202 if comment.moderated == 2 else 201)
resp.headers.add("Set-Cookie", dump_cookie(_id, signature))
resp.headers.add("X-Set-Cookie", dump_cookie("isso-" + _id, signature))
return resp
def view(self, environ, request, id):
comment = self.comments.get(id)
if comment is None:
raise NotFound
markup = request.args.get('plain', '0') == '0'
return JSON(self.serialize(comment, markup), 200)
@xhr
def edit(self, environ, request, id):
try:
rv = self.load(request.cookies.get(str(id), ""))
except (SignatureExpired, BadSignature):
raise Forbidden
if rv[0] != id:
raise Forbidden
comment = self.comments.get(id)
if comment is None:
raise NotFound
# verify checksum, mallory might skip cookie deletion when
# he deletes a comment
if rv[1] != sha1(comment.text):
raise Forbidden
data = request.get_json()
if not isinstance(data, dict):
raise BadRequest(400, "request data is not an object")
for field in set(data.keys()) - API.ACCEPT:
data.pop(field)
with self.db.transaction:
comment = self.comments.edit(id, data)
_id = str(comment.id)
signature = self.sign([comment.id, sha1(comment.text)])
self.cache.delete("text", _id)
self.cache.delete("hash", _id)
resp = JSON(self.serialize(comment), 200)
resp.headers.add("Set-Cookie", dump_cookie(_id, signature))
resp.headers.add("X-Set-Cookie", dump_cookie("isso-" + _id, signature))
return resp
@xhr
def delete(self, environ, request, id, key=None):
try:
rv = self.load(request.cookies.get(str(id), ""))
except (SignatureExpired, BadSignature):
raise Forbidden
if rv[0] != id:
raise Forbidden
comment = self.comments.get(id)
if comment is None:
raise NotFound
if rv[1] != sha1(comment.text):
raise Forbidden
_id = str(comment.id)
self.cache.delete("text", _id)
self.cache.delete("hash", _id)
with self.db.transaction:
comment = self.comments.delete(id)
cookie = functools.partial(dump_cookie, expires=0, max_age=0)
resp = JSON(self.serialize(comment) if comment else None, 200)
resp.headers.add("Set-Cookie", cookie(_id))
resp.headers.add("X-Set-Cookie", cookie("isso-" + _id))
return resp
def moderate(self, environ, request, id, action, key):
try:
id = self.load(key, max_age=2**32)
except (BadSignature, SignatureExpired):
raise Forbidden
comment = self.comments.get(id)
if comment is None:
raise NotFound
if request.method == "GET":
modal = (
"<!DOCTYPE html>"
"<html>"
"<head>"
"<script>"
" if (confirm('{0}: Are you sure?')) {"
" xhr = new XMLHttpRequest;"
" xhr.open('POST', window.location.href);"
" xhr.send(null);"
" }"
"</script>".format(action.capitalize()))
return Response(modal, 200, content_type="text/html")
if action == "activate":
with self.db.transaction:
self.comments.activate(id)
else:
with self.db.transaction:
self.comments.delete(id)
self.cache.delete("text", str(comment.id))
self.cache.delete("hash", str(comment.id))
return Response("Ok", 200)
# FIXME move logic into controller
@requires(str, 'uri')
def fetch(self, environ, request, uri):
thread = self.threads.get(uri)
if thread is None:
raise NotFound
args = {
'thread': thread,
'after': request.args.get('after', 0)
}
try:
args['limit'] = int(request.args.get('limit'))
except TypeError:
args['limit'] = None
except ValueError:
return BadRequest("limit should be integer")
if request.args.get('parent') is not None:
try:
args['parent'] = int(request.args.get('parent'))
root_id = args['parent']
except ValueError:
return BadRequest("parent should be integer")
else:
args['parent'] = None
root_id = None
reply_counts = self.comments.reply_count(thread, after=args['after'])
if args['limit'] == 0:
root_list = []
else:
root_list = list(self.comments.all(**args))
if not root_list:
raise NotFound
if root_id not in reply_counts:
reply_counts[root_id] = 0
try:
nested_limit = int(request.args.get('nested_limit'))
except TypeError:
nested_limit = None
except ValueError:
return BadRequest("nested_limit should be integer")
rv = {
'id' : root_id,
'total_replies' : reply_counts[root_id],
'hidden_replies' : reply_counts[root_id] - len(root_list),
'replies' : self._process_fetched_list(root_list)
}
# We are only checking for one level deep comments
if root_id is None:
for comment in rv['replies']:
if comment['id'] in reply_counts:
comment['total_replies'] = reply_counts[comment['id']]
if nested_limit is not None:
if nested_limit > 0:
args['parent'] = comment['id']
args['limit'] = nested_limit
replies = list(self.comments.all(**args))
else:
replies = []
else:
args['parent'] = comment['id']
replies = list(self.comments.all(**args))
else:
comment['total_replies'] = 0
replies = []
comment['hidden_replies'] = comment['total_replies'] - len(replies)
comment['replies'] = self._process_fetched_list(replies)
return JSON(rv, 200)
def _process_fetched_list(self, fetched_list):
return map(self.serialize, fetched_list)
@xhr
def like(self, environ, request, id):
remote_addr = utils.anonymize(str(request.remote_addr))
if not self.comments.like(remote_addr, id):
raise BadRequest
return Response("Ok", 200)
@xhr
def dislike(self, environ, request, id):
remote_addr = utils.anonymize(str(request.remote_addr))
if not self.comments.dislike(remote_addr, id):
raise BadRequest
return Response("Ok", 200)
def count(self, environ, request):
data = request.get_json()
if not isinstance(data, list) and not all(isinstance(x, str) for x in data):
raise BadRequest("JSON must be a list of URLs")
th = [self.threads.get(uri) for uri in data]
return JSON(self.comments.count(*th), 200)

View File

@ -1,486 +0,0 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import re
import cgi
import time
import functools
from itsdangerous import SignatureExpired, BadSignature
from werkzeug.http import dump_cookie
from werkzeug.wsgi import get_current_url
from werkzeug.utils import redirect
from werkzeug.routing import Rule
from werkzeug.wrappers import Response
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
from isso.compat import text_type as str
from isso import utils, local
from isso.utils import http, parse, JSONResponse as JSON
from isso.views import requires
from isso.utils.hash import sha1
# from Django appearently, looks good to me *duck*
__url_re = re.compile(
r'^'
r'(https?://)?'
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain...
r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)'
r'$', re.IGNORECASE)
def isurl(text):
return __url_re.match(text) is not None
def normalize(url):
if not url.startswith(("http://", "https://")):
return "http://" + url
return url
def xhr(func):
"""A decorator to check for CSRF on POST/PUT/DELETE using a <form>
element and JS to execute automatically (see #40 for a proof-of-concept).
When an attacker uses a <form> to downvote a comment, the browser *should*
add a `Content-Type: ...` header with three possible values:
* application/x-www-form-urlencoded
* multipart/form-data
* text/plain
If the header is not sent or requests `application/json`, the request is
not forged (XHR is restricted by CORS separately).
"""
def dec(self, env, req, *args, **kwargs):
if req.content_type and not req.content_type.startswith("application/json"):
raise Forbidden("CSRF")
return func(self, env, req, *args, **kwargs)
return dec
class API(object):
FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email',
'mode', 'created', 'modified', 'likes', 'dislikes', 'hash'])
# comment fields, that can be submitted
ACCEPT = set(['text', 'author', 'website', 'email', 'parent'])
VIEWS = [
('fetch', ('GET', '/')),
('new', ('POST', '/new')),
('count', ('GET', '/count')),
('counts', ('POST', '/count')),
('view', ('GET', '/id/<int:id>')),
('edit', ('PUT', '/id/<int:id>')),
('delete', ('DELETE', '/id/<int:id>')),
('moderate',('GET', '/id/<int:id>/<any(activate,delete):action>/<string:key>')),
('moderate',('POST', '/id/<int:id>/<any(activate,delete):action>/<string:key>')),
('like', ('POST', '/id/<int:id>/like')),
('dislike', ('POST', '/id/<int:id>/dislike')),
('demo', ('GET', '/demo'))
]
def __init__(self, isso, hasher):
self.isso = isso
self.hash = hasher.uhash
self.cache = isso.cache
self.signal = isso.signal
self.conf = isso.conf.section("general")
self.moderated = isso.conf.getboolean("moderation", "enabled")
self.guard = isso.db.guard
self.threads = isso.db.threads
self.comments = isso.db.comments
for (view, (method, path)) in self.VIEWS:
isso.urls.add(
Rule(path, methods=[method], endpoint=getattr(self, view)))
@classmethod
def verify(cls, comment):
if "text" not in comment:
return False, "text is missing"
if not isinstance(comment.get("parent"), (int, type(None))):
return False, "parent must be an integer or null"
for key in ("text", "author", "website", "email"):
if not isinstance(comment.get(key), (str, type(None))):
return False, "%s must be a string or null" % key
if len(comment["text"].rstrip()) < 3:
return False, "text is too short (minimum length: 3)"
if len(comment.get("email") or "") > 254:
return False, "http://tools.ietf.org/html/rfc5321#section-4.5.3"
if comment.get("website"):
if len(comment["website"]) > 254:
return False, "arbitrary length limit"
if not isurl(comment["website"]):
return False, "Website not Django-conform"
return True, ""
@xhr
@requires(str, 'uri')
def new(self, environ, request, uri):
data = request.get_json()
for field in set(data.keys()) - API.ACCEPT:
data.pop(field)
for key in ("author", "email", "website", "parent"):
data.setdefault(key, None)
valid, reason = API.verify(data)
if not valid:
return BadRequest(reason)
for field in ("author", "email", "website"):
if data.get(field) is not None:
data[field] = cgi.escape(data[field])
if data.get("website"):
data["website"] = normalize(data["website"])
data['mode'] = 2 if self.moderated else 1
data['remote_addr'] = utils.anonymize(str(request.remote_addr))
with self.isso.lock:
if uri not in self.threads:
with http.curl('GET', local("origin"), uri) as resp:
if resp and resp.status == 200:
uri, title = parse.thread(resp.read(), id=uri)
else:
return NotFound('URI does not exist')
thread = self.threads.new(uri, title)
self.signal("comments.new:new-thread", thread)
else:
thread = self.threads[uri]
# notify extensions that the new comment is about to save
self.signal("comments.new:before-save", thread, data)
valid, reason = self.guard.validate(uri, data)
if not valid:
self.signal("comments.new:guard", reason)
raise Forbidden(reason)
with self.isso.lock:
rv = self.comments.add(uri, data)
# notify extension, that the new comment has been successfully saved
self.signal("comments.new:after-save", thread, rv)
cookie = functools.partial(dump_cookie,
value=self.isso.sign([rv["id"], sha1(rv["text"])]),
max_age=self.conf.getint('max-age'))
rv["text"] = self.isso.render(rv["text"])
rv["hash"] = self.hash(rv['email'] or rv['remote_addr'])
self.cache.set('hash', (rv['email'] or rv['remote_addr']), rv['hash'])
for key in set(rv.keys()) - API.FIELDS:
rv.pop(key)
# success!
self.signal("comments.new:finish", thread, rv)
resp = JSON(rv, 202 if rv["mode"] == 2 else 201)
resp.headers.add("Set-Cookie", cookie(str(rv["id"])))
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
return resp
def view(self, environ, request, id):
rv = self.comments.get(id)
if rv is None:
raise NotFound
for key in set(rv.keys()) - API.FIELDS:
rv.pop(key)
if request.args.get('plain', '0') == '0':
rv['text'] = self.isso.render(rv['text'])
return JSON(rv, 200)
@xhr
def edit(self, environ, request, id):
try:
rv = self.isso.unsign(request.cookies.get(str(id), ''))
except (SignatureExpired, BadSignature):
raise Forbidden
if rv[0] != id:
raise Forbidden
# verify checksum, mallory might skip cookie deletion when he deletes a comment
if rv[1] != sha1(self.comments.get(id)["text"]):
raise Forbidden
data = request.get_json()
if "text" not in data or data["text"] is None or len(data["text"]) < 3:
raise BadRequest("no text given")
for key in set(data.keys()) - set(["text", "author", "website"]):
data.pop(key)
data['modified'] = time.time()
with self.isso.lock:
rv = self.comments.update(id, data)
for key in set(rv.keys()) - API.FIELDS:
rv.pop(key)
self.signal("comments.edit", rv)
cookie = functools.partial(dump_cookie,
value=self.isso.sign([rv["id"], sha1(rv["text"])]),
max_age=self.conf.getint('max-age'))
rv["text"] = self.isso.render(rv["text"])
self.cache.set("text", str(id), rv["text"])
resp = JSON(rv, 200)
resp.headers.add("Set-Cookie", cookie(str(rv["id"])))
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
return resp
@xhr
def delete(self, environ, request, id, key=None):
try:
rv = self.isso.unsign(request.cookies.get(str(id), ""))
except (SignatureExpired, BadSignature):
raise Forbidden
else:
if rv[0] != id:
raise Forbidden
# verify checksum, mallory might skip cookie deletion when he deletes a comment
if rv[1] != sha1(self.comments.get(id)["text"]):
raise Forbidden
item = self.comments.get(id)
if item is None:
raise NotFound
self.cache.delete('hash', (item['email'] or item['remote_addr']))
self.cache.delete('text', str(item['id']))
with self.isso.lock:
rv = self.comments.delete(id)
if rv:
for key in set(rv.keys()) - API.FIELDS:
rv.pop(key)
self.signal("comments.delete", id)
resp = JSON(rv, 200)
cookie = functools.partial(dump_cookie, expires=0, max_age=0)
resp.headers.add("Set-Cookie", cookie(str(id)))
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % id))
return resp
def moderate(self, environ, request, id, action, key):
try:
id = self.isso.unsign(key, max_age=2**32)
except (BadSignature, SignatureExpired):
raise Forbidden
item = self.comments.get(id)
if item is None:
raise NotFound
if request.method == "GET":
modal = (
"<!DOCTYPE html>"
"<html>"
"<head>"
"<script>"
" if (confirm('%s: Are you sure?')) {"
" xhr = new XMLHttpRequest;"
" xhr.open('POST', window.location.href);"
" xhr.send(null);"
" }"
"</script>" % action.capitalize())
return Response(modal, 200, content_type="text/html")
if action == "activate":
with self.isso.lock:
self.comments.activate(id)
self.signal("comments.activate", id)
else:
with self.isso.lock:
self.comments.delete(id)
self.cache.delete('hash', (item['email'] or item['remote_addr']))
self.signal("comments.delete", id)
return Response("Yo", 200)
@requires(str, 'uri')
def fetch(self, environ, request, uri):
args = {
'uri': uri,
'after': request.args.get('after', 0)
}
try:
args['limit'] = int(request.args.get('limit'))
except TypeError:
args['limit'] = None
except ValueError:
return BadRequest("limit should be integer")
if request.args.get('parent') is not None:
try:
args['parent'] = int(request.args.get('parent'))
root_id = args['parent']
except ValueError:
return BadRequest("parent should be integer")
else:
args['parent'] = None
root_id = None
# FIXME inverse logic
plain = request.args.get('plain', '0') == '0'
reply_counts = self.comments.reply_count(uri, after=args['after'])
if args['limit'] == 0:
root_list = []
else:
root_list = list(self.comments.fetch(**args))
if not root_list:
raise NotFound
if root_id not in reply_counts:
reply_counts[root_id] = 0
try:
nested_limit = int(request.args.get('nested_limit'))
except TypeError:
nested_limit = None
except ValueError:
return BadRequest("nested_limit should be integer")
rv = {
'id' : root_id,
'total_replies' : reply_counts[root_id],
'hidden_replies' : reply_counts[root_id] - len(root_list),
'replies' : self._process_fetched_list(root_list, plain)
}
# We are only checking for one level deep comments
if root_id is None:
for comment in rv['replies']:
if comment['id'] in reply_counts:
comment['total_replies'] = reply_counts[comment['id']]
if nested_limit is not None:
if nested_limit > 0:
args['parent'] = comment['id']
args['limit'] = nested_limit
replies = list(self.comments.fetch(**args))
else:
replies = []
else:
args['parent'] = comment['id']
replies = list(self.comments.fetch(**args))
else:
comment['total_replies'] = 0
replies = []
comment['hidden_replies'] = comment['total_replies'] - len(replies)
comment['replies'] = self._process_fetched_list(replies, plain)
return JSON(rv, 200)
def _process_fetched_list(self, fetched_list, plain=False):
for item in fetched_list:
key = item['email'] or item['remote_addr']
val = self.cache.get('hash', key)
if val is None:
val = self.hash(key)
self.cache.set('hash', key, val)
item['hash'] = val
for key in set(item.keys()) - API.FIELDS:
item.pop(key)
if plain:
for item in fetched_list:
key = str(item['id'])
val = self.cache.get('text', key)
if val is None:
val = self.isso.render(item['text'])
self.cache.set('text', key, val)
item['text'] = val
return fetched_list
@xhr
def like(self, environ, request, id):
nv = self.comments.vote(True, id, utils.anonymize(str(request.remote_addr)))
return JSON(nv, 200)
@xhr
def dislike(self, environ, request, id):
nv = self.comments.vote(False, id, utils.anonymize(str(request.remote_addr)))
return JSON(nv, 200)
# TODO: remove someday (replaced by :func:`counts`)
@requires(str, 'uri')
def count(self, environ, request, uri):
rv = self.comments.count(uri)[0]
if rv == 0:
raise NotFound
return JSON(rv, 200)
def counts(self, environ, request):
data = request.get_json()
if not isinstance(data, list) and not all(isinstance(x, str) for x in data):
raise BadRequest("JSON must be a list of URLs")
return JSON(self.comments.count(*data), 200)
def demo(self, env, req):
return redirect(get_current_url(env) + '/index.html')

30
isso/views/info.py Normal file
View File

@ -0,0 +1,30 @@
# -*- encoding: utf-8 -*-
from __future__ import unicode_literals
import pkg_resources
dist = pkg_resources.get_distribution("isso")
import json
from werkzeug.wrappers import Response
from isso import local
from isso.compat import text_type as str
class Info(object):
def __init__(self, conf):
self.moderation = conf.getboolean("moderation", "enabled")
def show(self, environ, request):
rv = {
"version": dist.version,
"host": str(local("host")),
"origin": str(local("origin")),
"moderation": self.moderation,
}
return Response(json.dumps(rv), 200, content_type="application/json")

View File

@ -115,7 +115,7 @@ timeout = 10
# enable guard, recommended in production. Not useful for debugging purposes. # enable guard, recommended in production. Not useful for debugging purposes.
enabled = true enabled = true
# limit to N new comments per minute. # limit to N new comments per minute. Use -1 to disable rate limit.
ratelimit = 2 ratelimit = 2
# how many comments directly to the thread (prevent a simple while true; do # how many comments directly to the thread (prevent a simple while true; do