autopep8 the code and then flake8

pull/390/head
cclauss 6 years ago committed by Benoît Latinier
parent a68d392f9c
commit 7f55c81e08

@ -13,11 +13,12 @@ matrix:
env: TOX_ENV=wheezy env: TOX_ENV=wheezy
install: install:
- pip install -U pip - pip install -U pip
- pip install pyflakes tox - pip install flake8 tox
- sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm - sudo rm -rf /dev/shm && sudo ln -s /run/shm /dev/shm
script: script:
- tox -e $TOX_ENV - tox -e $TOX_ENV
- python -m pyflakes.__main__ $(git ls-files | grep -E "^isso/.+.py$" | grep -v "^isso/compat.py") - IGNORE=E226,E241,E265,E402,E501,E704
- flake8 . --count --ignore=${IGNORE} --max-line-length=127 --show-source --statistics
notifications: notifications:
irc: irc:
channels: channels:

@ -9,4 +9,3 @@ class IssoTranslator(HTMLTranslator):
if self.section_level == 1: if self.section_level == 1:
raise nodes.SkipNode raise nodes.SkipNode
HTMLTranslator.visit_title(self, node) HTMLTranslator.visit_title(self, node)

@ -200,22 +200,22 @@ htmlhelp_basename = 'Issodoc'
# -- Options for LaTeX output --------------------------------------------- # -- Options for LaTeX output ---------------------------------------------
latex_elements = { latex_elements = {
# The paper size ('letterpaper' or 'a4paper'). # The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper', #'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt'). # The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt', #'pointsize': '10pt',
# Additional stuff for the LaTeX preamble. # Additional stuff for the LaTeX preamble.
#'preamble': '', #'preamble': '',
} }
# Grouping the document tree into LaTeX files. List of tuples # Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, # (source start file, target name, title,
# author, documentclass [howto, manual, or own class]). # author, documentclass [howto, manual, or own class]).
latex_documents = [ latex_documents = [
('index', 'Isso.tex', u'Isso Documentation', ('index', 'Isso.tex', u'Isso Documentation',
u'Martin Zimmermann', 'manual'), u'Martin Zimmermann', 'manual'),
] ]
# The name of an image file (relative to this directory) to place at the top of # The name of an image file (relative to this directory) to place at the top of
@ -260,9 +260,9 @@ man_pages = [
# (source start file, target name, title, author, # (source start file, target name, title, author,
# dir menu entry, description, category) # dir menu entry, description, category)
texinfo_documents = [ texinfo_documents = [
('index', 'Isso', u'Isso Documentation', ('index', 'Isso', u'Isso Documentation',
u'Martin Zimmermann', 'Isso', 'a commenting server similar to Disqus', u'Martin Zimmermann', 'Isso', 'a commenting server similar to Disqus',
'Miscellaneous'), 'Miscellaneous'),
] ]
# Documents to append as an appendix to all manuals. # Documents to append as an appendix to all manuals.

@ -35,7 +35,8 @@ import sys
if sys.argv[0].startswith("isso"): if sys.argv[0].startswith("isso"):
try: try:
import gevent.monkey; gevent.monkey.patch_all() import gevent.monkey
gevent.monkey.patch_all()
except ImportError: except ImportError:
pass pass
@ -87,7 +88,8 @@ class Isso(object):
self.conf = conf self.conf = conf
self.db = db.SQLite3(conf.get('general', 'dbpath'), conf) self.db = db.SQLite3(conf.get('general', 'dbpath'), conf)
self.signer = URLSafeTimedSerializer(self.db.preferences.get("session-key")) self.signer = URLSafeTimedSerializer(
self.db.preferences.get("session-key"))
self.markup = html.Markup(conf.section('markup')) self.markup = html.Markup(conf.section('markup'))
self.hasher = hash.new(conf.section("hash")) self.hasher = hash.new(conf.section("hash"))
@ -122,7 +124,8 @@ class Isso(object):
local.request = request local.request = request
local.host = wsgi.host(request.environ) local.host = wsgi.host(request.environ)
local.origin = origin(self.conf.getiter("general", "host"))(request.environ) local.origin = origin(self.conf.getiter(
"general", "host"))(request.environ)
adapter = self.urls.bind_to_environ(request.environ) adapter = self.urls.bind_to_environ(request.environ)
@ -136,7 +139,8 @@ class Isso(object):
except HTTPException as e: except HTTPException as e:
return e return e
except Exception: except Exception:
logger.exception("%s %s", request.method, request.environ["PATH_INFO"]) logger.exception("%s %s", request.method,
request.environ["PATH_INFO"])
return InternalServerError() return InternalServerError()
else: else:
return response return response
@ -181,19 +185,19 @@ def make_app(conf=None, threading=True, multiprocessing=False, uwsgi=False):
if isso.conf.getboolean("server", "profile"): if isso.conf.getboolean("server", "profile"):
wrapper.append(partial(ProfilerMiddleware, wrapper.append(partial(ProfilerMiddleware,
sort_by=("cumulative", ), restrictions=("isso/(?!lib)", 10))) sort_by=("cumulative", ), restrictions=("isso/(?!lib)", 10)))
wrapper.append(partial(SharedDataMiddleware, exports={ wrapper.append(partial(SharedDataMiddleware, exports={
'/js': join(dirname(__file__), 'js/'), '/js': join(dirname(__file__), 'js/'),
'/css': join(dirname(__file__), 'css/'), '/css': join(dirname(__file__), 'css/'),
'/img': join(dirname(__file__), 'img/'), '/img': join(dirname(__file__), 'img/'),
'/demo': join(dirname(__file__), 'demo/') '/demo': join(dirname(__file__), 'demo/')
})) }))
wrapper.append(partial(wsgi.CORSMiddleware, wrapper.append(partial(wsgi.CORSMiddleware,
origin=origin(isso.conf.getiter("general", "host")), origin=origin(isso.conf.getiter("general", "host")),
allowed=("Origin", "Referer", "Content-Type"), allowed=("Origin", "Referer", "Content-Type"),
exposed=("X-Set-Cookie", "Date"))) exposed=("X-Set-Cookie", "Date")))
wrapper.extend([wsgi.SubURI, ProxyFix]) wrapper.extend([wsgi.SubURI, ProxyFix])
@ -208,9 +212,10 @@ def main():
parser = ArgumentParser(description="a blog comment hosting service") parser = ArgumentParser(description="a blog comment hosting service")
subparser = parser.add_subparsers(help="commands", dest="command") subparser = parser.add_subparsers(help="commands", dest="command")
parser.add_argument('--version', action='version', version='%(prog)s ' + dist.version) parser.add_argument('--version', action='version',
version='%(prog)s ' + dist.version)
parser.add_argument("-c", dest="conf", default="/etc/isso.conf", parser.add_argument("-c", dest="conf", default="/etc/isso.conf",
metavar="/etc/isso.conf", help="set configuration file") metavar="/etc/isso.conf", help="set configuration file")
imprt = subparser.add_parser('import', help="import Disqus XML export") imprt = subparser.add_parser('import', help="import Disqus XML export")
imprt.add_argument("dump", metavar="FILE") imprt.add_argument("dump", metavar="FILE")
@ -225,7 +230,8 @@ def main():
subparser.add_parser("run", help="run server") subparser.add_parser("run", help="run server")
args = parser.parse_args() args = parser.parse_args()
conf = config.load(join(dist.location, dist.project_name, "defaults.ini"), args.conf) conf = config.load(
join(dist.location, dist.project_name, "defaults.ini"), args.conf)
if args.command == "import": if args.command == "import":
conf.set("guard", "enabled", "off") conf.set("guard", "enabled", "off")

@ -12,11 +12,13 @@ except NameError: # Python 3
if not PY2K: if not PY2K:
buffer = memoryview buffer = memoryview
filter, map, zip = filter, map, zip filter, map, zip = filter, map, zip
iteritems = lambda dikt: iter(dikt.items()) # noqa: E731
def iteritems(dikt): return iter(dikt.items()) # noqa: E731
from functools import reduce from functools import reduce
else: else:
buffer = buffer buffer = buffer
from itertools import ifilter, imap, izip from itertools import ifilter, imap, izip
filter, map, zip = ifilter, imap, izip filter, map, zip = ifilter, imap, izip
iteritems = lambda dikt: dikt.iteritems() # noqa: E731
def iteritems(dikt): return dikt.iteritems() # noqa: E731
reduce = reduce reduce = reduce

@ -123,8 +123,8 @@ def new(options=None):
def load(default, user=None): def load(default, user=None):
# return set of (section, option) # return set of (section, option)
setify = lambda cp: set((section, option) for section in cp.sections() def setify(cp): return set((section, option) for section in cp.sections()
for option in cp.options(section)) for option in cp.options(section))
parser = new() parser = new()
parser.read(default) parser.read(default)

@ -122,7 +122,8 @@ class uWSGIMixin(Mixin):
self.cache = uWSGICache self.cache = uWSGICache
timedelta = conf.getint("moderation", "purge-after") timedelta = conf.getint("moderation", "purge-after")
purge = lambda signum: self.db.comments.purge(timedelta)
def purge(signum): return self.db.comments.purge(timedelta)
uwsgi.register_signal(1, "", purge) uwsgi.register_signal(1, "", purge)
uwsgi.add_timer(1, timedelta) uwsgi.add_timer(1, timedelta)

@ -98,10 +98,11 @@ class SQLite3:
# limit max. nesting level to 1 # limit max. nesting level to 1
if self.version == 2: if self.version == 2:
first = lambda rv: list(map(operator.itemgetter(0), rv)) def first(rv): return list(map(operator.itemgetter(0), rv))
with sqlite3.connect(self.path) as con: with sqlite3.connect(self.path) as con:
top = first(con.execute("SELECT id FROM comments WHERE parent IS NULL").fetchall()) top = first(con.execute(
"SELECT id FROM comments WHERE parent IS NULL").fetchall())
flattened = defaultdict(set) flattened = defaultdict(set)
for id in top: for id in top:
@ -109,13 +110,15 @@ class SQLite3:
ids = [id, ] ids = [id, ]
while ids: while ids:
rv = first(con.execute("SELECT id FROM comments WHERE parent=?", (ids.pop(), ))) rv = first(con.execute(
"SELECT id FROM comments WHERE parent=?", (ids.pop(), )))
ids.extend(rv) ids.extend(rv)
flattened[id].update(set(rv)) flattened[id].update(set(rv))
for id in flattened.keys(): for id in flattened.keys():
for n in flattened[id]: for n in flattened[id]:
con.execute("UPDATE comments SET parent=? WHERE id=?", (id, n)) con.execute(
"UPDATE comments SET parent=? WHERE id=?", (id, n))
con.execute('PRAGMA user_version = 3') con.execute('PRAGMA user_version = 3')
logger.info("%i rows changed", con.total_changes) logger.info("%i rows changed", con.total_changes)

@ -83,7 +83,7 @@ class Comments:
""" """
self.db.execute([ self.db.execute([
'UPDATE comments SET', 'UPDATE comments SET',
','.join(key + '=' + '?' for key in data), ','.join(key + '=' + '?' for key in data),
'WHERE id=?;'], 'WHERE id=?;'],
list(data.values()) + [id]) list(data.values()) + [id])
@ -94,7 +94,8 @@ class Comments:
Search for comment :param:`id` and return a mapping of :attr:`fields` Search for comment :param:`id` and return a mapping of :attr:`fields`
and values. and values.
""" """
rv = self.db.execute('SELECT * FROM comments WHERE id=?', (id, )).fetchone() rv = self.db.execute(
'SELECT * FROM comments WHERE id=?', (id, )).fetchone()
if rv: if rv:
return dict(zip(Comments.fields, rv)) return dict(zip(Comments.fields, rv))
@ -122,7 +123,7 @@ class Comments:
for f in fields_comments]) for f in fields_comments])
sql_threads_fields = ', '.join(['threads.' + f sql_threads_fields = ', '.join(['threads.' + f
for f in fields_threads]) for f in fields_threads])
sql = ['SELECT ' + sql_comments_fields + ', ' + \ sql = ['SELECT ' + sql_comments_fields + ', ' +
sql_threads_fields + ' ' sql_threads_fields + ' '
'FROM comments INNER JOIN threads ' 'FROM comments INNER JOIN threads '
'ON comments.tid=threads.id ' 'ON comments.tid=threads.id '
@ -162,9 +163,9 @@ class Comments:
""" """
Return comments for :param:`uri` with :param:`mode`. Return comments for :param:`uri` with :param:`mode`.
""" """
sql = [ 'SELECT comments.* FROM comments INNER JOIN threads ON', sql = ['SELECT comments.* FROM comments INNER JOIN threads ON',
' threads.uri=? AND comments.tid=threads.id AND (? | comments.mode) = ?', ' threads.uri=? AND comments.tid=threads.id AND (? | comments.mode) = ?',
' AND comments.created>?'] ' AND comments.created>?']
sql_args = [uri, mode, mode, after] sql_args = [uri, mode, mode, after]
@ -216,7 +217,8 @@ class Comments:
In the second case this comment can be safely removed without any side In the second case this comment can be safely removed without any side
effects.""" effects."""
refs = self.db.execute('SELECT * FROM comments WHERE parent=?', (id, )).fetchone() refs = self.db.execute(
'SELECT * FROM comments WHERE parent=?', (id, )).fetchone()
if refs is None: if refs is None:
self.db.execute('DELETE FROM comments WHERE id=?', (id, )) self.db.execute('DELETE FROM comments WHERE id=?', (id, ))
@ -226,7 +228,8 @@ class Comments:
self.db.execute('UPDATE comments SET text=? WHERE id=?', ('', id)) self.db.execute('UPDATE comments SET text=? WHERE id=?', ('', id))
self.db.execute('UPDATE comments SET mode=? WHERE id=?', (4, id)) self.db.execute('UPDATE comments SET mode=? WHERE id=?', (4, id))
for field in ('author', 'website'): for field in ('author', 'website'):
self.db.execute('UPDATE comments SET %s=? WHERE id=?' % field, (None, id)) self.db.execute('UPDATE comments SET %s=? WHERE id=?' %
field, (None, id))
self._remove_stale() self._remove_stale()
return self.get(id) return self.get(id)

@ -24,7 +24,7 @@ class Preferences:
def get(self, key, default=None): def get(self, key, default=None):
rv = self.db.execute( rv = self.db.execute(
'SELECT value FROM preferences WHERE key=?', (key, )).fetchone() 'SELECT value FROM preferences WHERE key=?', (key, )).fetchone()
if rv is None: if rv is None:
return default return default

@ -50,7 +50,7 @@ class Guard:
return False, "%i direct responses to %s" % (len(rv), uri) return False, "%i direct responses to %s" % (len(rv), uri)
# block replies to self unless :param:`reply-to-self` is enabled # block replies to self unless :param:`reply-to-self` is enabled
elif self.conf.getboolean("reply-to-self") == False: elif self.conf.getboolean("reply-to-self") is False:
rv = self.db.execute([ rv = self.db.execute([
'SELECT id FROM comments WHERE' 'SELECT id FROM comments WHERE'
' remote_addr = ?', ' remote_addr = ?',

@ -26,5 +26,6 @@ class Threads(object):
return Thread(*self.db.execute("SELECT * FROM threads WHERE uri=?", (uri, )).fetchone()) return Thread(*self.db.execute("SELECT * FROM threads WHERE uri=?", (uri, )).fetchone())
def new(self, uri, title): def new(self, uri, title):
self.db.execute("INSERT INTO threads (uri, title) VALUES (?, ?)", (uri, title)) self.db.execute(
"INSERT INTO threads (uri, title) VALUES (?, ?)", (uri, title))
return self[uri] return self[uri]

@ -25,7 +25,8 @@ class Dispatcher(DispatcherMiddleware):
def __init__(self, *confs): def __init__(self, *confs):
self.isso = {} self.isso = {}
default = os.path.join(dist.location, dist.project_name, "defaults.ini") default = os.path.join(
dist.location, dist.project_name, "defaults.ini")
for i, path in enumerate(confs): for i, path in enumerate(confs):
conf = config.load(default, path) conf = config.load(default, path)
@ -45,7 +46,8 @@ class Dispatcher(DispatcherMiddleware):
return super(Dispatcher, self).__call__(environ, start_response) return super(Dispatcher, self).__call__(environ, start_response)
def default(self, environ, start_response): def default(self, environ, start_response):
resp = Response("\n".join(self.isso.keys()), 404, content_type="text/plain") resp = Response("\n".join(self.isso.keys()),
404, content_type="text/plain")
return resp(environ, start_response) return resp(environ, start_response)

@ -58,7 +58,8 @@ class SMTP(object):
uwsgi.spooler = spooler uwsgi.spooler = spooler
def __enter__(self): def __enter__(self):
klass = (smtplib.SMTP_SSL if self.conf.get('security') == 'ssl' else smtplib.SMTP) klass = (smtplib.SMTP_SSL if self.conf.get(
'security') == 'ssl' else smtplib.SMTP)
self.client = klass(host=self.conf.get('host'), self.client = klass(host=self.conf.get('host'),
port=self.conf.getint('port'), port=self.conf.getint('port'),
timeout=self.conf.getint('timeout')) timeout=self.conf.getint('timeout'))
@ -104,7 +105,8 @@ class SMTP(object):
rv.write("User's URL: %s\n" % comment["website"]) rv.write("User's URL: %s\n" % comment["website"])
rv.write("IP address: %s\n" % comment["remote_addr"]) rv.write("IP address: %s\n" % comment["remote_addr"])
rv.write("Link to comment: %s\n" % (local("origin") + thread["uri"] + "#isso-%i" % comment["id"])) rv.write("Link to comment: %s\n" %
(local("origin") + thread["uri"] + "#isso-%i" % comment["id"]))
rv.write("\n") rv.write("\n")
uri = local("host") + "/id/%i" % comment["id"] uri = local("host") + "/id/%i" % comment["id"]
@ -173,7 +175,8 @@ class Stdout(object):
logger.info("comment created: %s", json.dumps(comment)) logger.info("comment created: %s", json.dumps(comment))
def _edit_comment(self, comment): def _edit_comment(self, comment):
logger.info('comment %i edited: %s', comment["id"], json.dumps(comment)) logger.info('comment %i edited: %s',
comment["id"], json.dumps(comment))
def _delete_comment(self, id): def _delete_comment(self, id):
logger.info('comment %i deleted', id) logger.info('comment %i deleted', id)

@ -30,6 +30,7 @@ from xml.etree import ElementTree
logger = logging.getLogger("isso") logger = logging.getLogger("isso")
def strip(val): def strip(val):
if isinstance(val, string_types): if isinstance(val, string_types):
return val.strip() return val.strip()
@ -82,7 +83,8 @@ class Disqus(object):
remap = dict() remap = dict()
if path not in self.db.threads: if path not in self.db.threads:
self.db.threads.new(path, thread.find(Disqus.ns + 'title').text.strip()) self.db.threads.new(path, thread.find(
Disqus.ns + 'title').text.strip())
for item in sorted(posts, key=lambda k: k['created']): for item in sorted(posts, key=lambda k: k['created']):
@ -112,9 +114,11 @@ class Disqus(object):
} }
if post.find(Disqus.ns + 'parent') is not None: if post.find(Disqus.ns + 'parent') is not None:
item['dsq:parent'] = post.find(Disqus.ns + 'parent').attrib.get(Disqus.internals + 'id') item['dsq:parent'] = post.find(
Disqus.ns + 'parent').attrib.get(Disqus.internals + 'id')
res[post.find('%sthread' % Disqus.ns).attrib.get(Disqus.internals + 'id')].append(item) res[post.find('%sthread' % Disqus.ns).attrib.get(
Disqus.internals + 'id')].append(item)
progress = Progress(len(tree.findall(Disqus.ns + 'thread'))) progress = Progress(len(tree.findall(Disqus.ns + 'thread')))
for i, thread in enumerate(tree.findall(Disqus.ns + 'thread')): for i, thread in enumerate(tree.findall(Disqus.ns + 'thread')):
@ -135,7 +139,8 @@ class Disqus(object):
progress.finish("{0} threads, {1} comments".format( progress.finish("{0} threads, {1} comments".format(
len(self.threads), len(self.comments))) len(self.threads), len(self.comments)))
orphans = set(map(lambda e: e.attrib.get(Disqus.internals + "id"), tree.findall(Disqus.ns + "post"))) - self.comments orphans = set(map(lambda e: e.attrib.get(Disqus.internals + "id"),
tree.findall(Disqus.ns + "post"))) - self.comments
if orphans and not self.threads: if orphans and not self.threads:
print("Isso couldn't import any thread, try again with --empty-id") print("Isso couldn't import any thread, try again with --empty-id")
elif orphans: elif orphans:
@ -258,22 +263,22 @@ def autodetect(peek):
def dispatch(type, db, dump, empty_id=False): def dispatch(type, db, dump, empty_id=False):
if db.execute("SELECT * FROM comments").fetchone(): if db.execute("SELECT * FROM comments").fetchone():
if input("Isso DB is not empty! Continue? [y/N]: ") not in ("y", "Y"): if input("Isso DB is not empty! Continue? [y/N]: ") not in ("y", "Y"):
raise SystemExit("Abort.") raise SystemExit("Abort.")
if type == "disqus": if type == "disqus":
cls = Disqus cls = Disqus
elif type == "wordpress": elif type == "wordpress":
cls = WordPress cls = WordPress
else: else:
with io.open(dump, encoding="utf-8") as fp: with io.open(dump, encoding="utf-8") as fp:
cls = autodetect(fp.read(io.DEFAULT_BUFFER_SIZE)) cls = autodetect(fp.read(io.DEFAULT_BUFFER_SIZE))
if cls is None: if cls is None:
raise SystemExit("Unknown format, abort.") raise SystemExit("Unknown format, abort.")
if cls is Disqus: if cls is Disqus:
cls = functools.partial(cls, empty_id=empty_id) cls = functools.partial(cls, empty_id=empty_id)
cls(db, dump).migrate() cls(db, dump).migrate()

@ -37,5 +37,7 @@ class Dummy:
pass pass
curl = lambda method, host, path: Dummy() def curl(method, host, path): return Dummy()
loads = lambda data: json.loads(data.decode('utf-8'))
def loads(data): return json.loads(data.decode('utf-8'))

@ -50,7 +50,8 @@ class TestComments(unittest.TestCase):
def testGet(self): def testGet(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'})) self.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Lorem ipsum ...'}))
r = self.get('/id/1') r = self.get('/id/1')
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
@ -61,7 +62,8 @@ class TestComments(unittest.TestCase):
def testCreate(self): def testCreate(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'})) rv = self.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Lorem ipsum ...'}))
self.assertEqual(rv.status_code, 201) self.assertEqual(rv.status_code, 201)
self.assertIn("Set-Cookie", rv.headers) self.assertIn("Set-Cookie", rv.headers)
@ -73,7 +75,8 @@ class TestComments(unittest.TestCase):
def textCreateWithNonAsciiText(self): def textCreateWithNonAsciiText(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Здравствуй, мир!'})) rv = self.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Здравствуй, мир!'}))
self.assertEqual(rv.status_code, 201) self.assertEqual(rv.status_code, 201)
rv = loads(rv.data) rv = loads(rv.data)
@ -105,14 +108,16 @@ class TestComments(unittest.TestCase):
def testCreateInvalidParent(self): def testCreateInvalidParent(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'})) self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1})) self.post('/new?uri=test',
invalid = self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 2})) data=json.dumps({'text': '...', 'parent': 1}))
invalid = self.post(
'/new?uri=test', data=json.dumps({'text': '...', 'parent': 2}))
self.assertEqual(loads(invalid.data)["parent"], 1) self.assertEqual(loads(invalid.data)["parent"], 1)
def testVerifyFields(self): def testVerifyFields(self):
verify = lambda comment: comments.API.verify(comment)[0] def verify(comment): return comments.API.verify(comment)[0]
# text is missing # text is missing
self.assertFalse(verify({})) self.assertFalse(verify({}))
@ -128,10 +133,12 @@ class TestComments(unittest.TestCase):
# email/website length # email/website length
self.assertTrue(verify({"text": "...", "email": "*"*254})) self.assertTrue(verify({"text": "...", "email": "*"*254}))
self.assertTrue(verify({"text": "...", "website": "google.de/" + "a"*128})) self.assertTrue(
verify({"text": "...", "website": "google.de/" + "a"*128}))
self.assertFalse(verify({"text": "...", "email": "*"*1024})) self.assertFalse(verify({"text": "...", "email": "*"*1024}))
self.assertFalse(verify({"text": "...", "website": "google.de/" + "*"*1024})) self.assertFalse(
verify({"text": "...", "website": "google.de/" + "*"*1024}))
# valid website url # valid website url
self.assertTrue(comments.isurl("example.tld")) self.assertTrue(comments.isurl("example.tld"))
@ -139,7 +146,8 @@ class TestComments(unittest.TestCase):
self.assertTrue(comments.isurl("https://example.tld")) self.assertTrue(comments.isurl("https://example.tld"))
self.assertTrue(comments.isurl("https://example.tld:1337/")) self.assertTrue(comments.isurl("https://example.tld:1337/"))
self.assertTrue(comments.isurl("https://example.tld:1337/foobar")) self.assertTrue(comments.isurl("https://example.tld:1337/foobar"))
self.assertTrue(comments.isurl("https://example.tld:1337/foobar?p=1#isso-thread")) self.assertTrue(comments.isurl(
"https://example.tld:1337/foobar?p=1#isso-thread"))
self.assertFalse(comments.isurl("ftp://example.tld/")) self.assertFalse(comments.isurl("ftp://example.tld/"))
self.assertFalse(comments.isurl("tel:+1234567890")) self.assertFalse(comments.isurl("tel:+1234567890"))
@ -149,7 +157,8 @@ class TestComments(unittest.TestCase):
def testGetInvalid(self): def testGetInvalid(self):
self.assertEqual(self.get('/?uri=%2Fpath%2F&id=123').status_code, 404) self.assertEqual(self.get('/?uri=%2Fpath%2F&id=123').status_code, 404)
self.assertEqual(self.get('/?uri=%2Fpath%2Fspam%2F&id=123').status_code, 404) self.assertEqual(
self.get('/?uri=%2Fpath%2Fspam%2F&id=123').status_code, 404)
self.assertEqual(self.get('/?uri=?uri=%foo%2F').status_code, 404) self.assertEqual(self.get('/?uri=?uri=%foo%2F').status_code, 404)
def testGetLimited(self): def testGetLimited(self):
@ -166,7 +175,8 @@ class TestComments(unittest.TestCase):
def testGetNested(self): def testGetNested(self):
self.post('/new?uri=test', data=json.dumps({'text': '...'})) self.post('/new?uri=test', data=json.dumps({'text': '...'}))
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1})) self.post('/new?uri=test',
data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1') r = self.get('/?uri=test&parent=1')
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
@ -178,7 +188,8 @@ class TestComments(unittest.TestCase):
self.post('/new?uri=test', data=json.dumps({'text': '...'})) self.post('/new?uri=test', data=json.dumps({'text': '...'}))
for i in range(20): for i in range(20):
self.post('/new?uri=test', data=json.dumps({'text': '...', 'parent': 1})) self.post('/new?uri=test',
data=json.dumps({'text': '...', 'parent': 1}))
r = self.get('/?uri=test&parent=1&limit=10') r = self.get('/?uri=test&parent=1&limit=10')
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
@ -188,7 +199,8 @@ class TestComments(unittest.TestCase):
def testUpdate(self): def testUpdate(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'})) self.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Lorem ipsum ...'}))
self.put('/id/1', data=json.dumps({ self.put('/id/1', data=json.dumps({
'text': 'Hello World', 'author': 'me', 'website': 'http://example.com/'})) 'text': 'Hello World', 'author': 'me', 'website': 'http://example.com/'}))
@ -203,7 +215,8 @@ class TestComments(unittest.TestCase):
def testDelete(self): def testDelete(self):
self.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Lorem ipsum ...'})) self.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Lorem ipsum ...'}))
r = self.delete('/id/1') r = self.delete('/id/1')
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
self.assertEqual(loads(r.data), None) self.assertEqual(loads(r.data), None)
@ -213,7 +226,8 @@ class TestComments(unittest.TestCase):
client = JSONClient(self.app, Response) client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'})) client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First', 'parent': 1})) client.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'First', 'parent': 1}))
r = client.delete('/id/1') r = client.delete('/id/1')
self.assertEqual(r.status_code, 200) self.assertEqual(r.status_code, 200)
@ -242,8 +256,10 @@ class TestComments(unittest.TestCase):
client = JSONClient(self.app, Response) client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'})) client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Second', 'parent': 1})) client.post('/new?uri=%2Fpath%2F',
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Third', 'parent': 1})) data=json.dumps({'text': 'Second', 'parent': 1}))
client.post('/new?uri=%2Fpath%2F',
data=json.dumps({'text': 'Third', 'parent': 1}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Last'})) client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Last'}))
client.delete('/id/1') client.delete('/id/1')
@ -261,10 +277,11 @@ class TestComments(unittest.TestCase):
for path in paths: for path in paths:
self.assertEqual(self.post('/new?' + urlencode({'uri': path}), self.assertEqual(self.post('/new?' + urlencode({'uri': path}),
data=json.dumps({'text': '...'})).status_code, 201) data=json.dumps({'text': '...'})).status_code, 201)
for i, path in enumerate(paths): for i, path in enumerate(paths):
self.assertEqual(self.get('/?' + urlencode({'uri': path})).status_code, 200) self.assertEqual(
self.get('/?' + urlencode({'uri': path})).status_code, 200)
self.assertEqual(self.get('/id/%i' % (i + 1)).status_code, 200) self.assertEqual(self.get('/id/%i' % (i + 1)).status_code, 200)
def testDeleteAndCreateByDifferentUsersButSamePostId(self): def testDeleteAndCreateByDifferentUsersButSamePostId(self):
@ -283,7 +300,8 @@ class TestComments(unittest.TestCase):
a = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Aaa"})) a = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Aaa"}))
b = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Bbb"})) b = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Bbb"}))
c = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "Ccc", "email": "..."})) c = self.post('/new?uri=%2Fpath%2F',
data=json.dumps({"text": "Ccc", "email": "..."}))
a = loads(a.data) a = loads(a.data)
b = loads(b.data) b = loads(b.data)
@ -295,7 +313,8 @@ class TestComments(unittest.TestCase):
def testVisibleFields(self): def testVisibleFields(self):
rv = self.post('/new?uri=%2Fpath%2F', data=json.dumps({"text": "...", "invalid": "field"})) rv = self.post('/new?uri=%2Fpath%2F',
data=json.dumps({"text": "...", "invalid": "field"}))
self.assertEqual(rv.status_code, 201) self.assertEqual(rv.status_code, 201)
rv = loads(rv.data) rv = loads(rv.data)
@ -333,7 +352,8 @@ class TestComments(unittest.TestCase):
for uri, count in iteritems(expected): for uri, count in iteritems(expected):
for _ in range(count): for _ in range(count):
self.post('/new?uri=%s' % uri, data=json.dumps({"text": "..."})) self.post('/new?uri=%s' %
uri, data=json.dumps({"text": "..."}))
rv = self.post('/count', data=json.dumps(list(expected.keys()))) rv = self.post('/count', data=json.dumps(list(expected.keys())))
self.assertEqual(loads(rv.data), list(expected.values())) self.assertEqual(loads(rv.data), list(expected.values()))
@ -362,22 +382,26 @@ class TestComments(unittest.TestCase):
self.post('/new?uri=%2F', data=json.dumps({"text": "..."})) self.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
# no header is fine (default for XHR) # no header is fine (default for XHR)
self.assertEqual(self.post('/id/1/dislike', content_type="").status_code, 200) self.assertEqual(
self.post('/id/1/dislike', content_type="").status_code, 200)
# x-www-form-urlencoded is definitely not RESTful # x-www-form-urlencoded is definitely not RESTful
self.assertEqual(self.post('/id/1/dislike', content_type=form).status_code, 403) self.assertEqual(
self.post('/id/1/dislike', content_type=form).status_code, 403)
self.assertEqual(self.post('/new?uri=%2F', data=json.dumps({"text": "..."}), self.assertEqual(self.post('/new?uri=%2F', data=json.dumps({"text": "..."}),
content_type=form).status_code, 403) content_type=form).status_code, 403)
# just for the record # just for the record
self.assertEqual(self.post('/id/1/dislike', content_type=js).status_code, 200) self.assertEqual(
self.post('/id/1/dislike', content_type=js).status_code, 200)
def testPreview(self): def testPreview(self):
response = self.post('/preview', data=json.dumps({'text': 'This is **mark***down*'})) response = self.post(
'/preview', data=json.dumps({'text': 'This is **mark***down*'}))
self.assertEqual(response.status_code, 200) self.assertEqual(response.status_code, 200)
rv = loads(response.data) rv = loads(response.data)
self.assertEqual(rv["text"], '<p>This is <strong>mark</strong><em>down</em></p>') self.assertEqual(
rv["text"], '<p>This is <strong>mark</strong><em>down</em></p>')
class TestModeratedComments(unittest.TestCase): class TestModeratedComments(unittest.TestCase):
@ -402,7 +426,8 @@ class TestModeratedComments(unittest.TestCase):
def testAddComment(self): def testAddComment(self):
rv = self.client.post('/new?uri=test', data=json.dumps({"text": "..."})) rv = self.client.post(
'/new?uri=test', data=json.dumps({"text": "..."}))
self.assertEqual(rv.status_code, 202) self.assertEqual(rv.status_code, 202)
self.assertEqual(self.client.get('/id/1').status_code, 200) self.assertEqual(self.client.get('/id/1').status_code, 200)

@ -19,31 +19,37 @@ class CORSTest(unittest.TestCase):
def test_simple(self): def test_simple(self):
app = CORSMiddleware(hello_world, app = CORSMiddleware(hello_world,
origin=origin([ origin=origin([
"https://example.tld/", "https://example.tld/",
"http://example.tld/", "http://example.tld/",
]), ]),
allowed=("Foo", "Bar"), exposed=("Spam", )) allowed=("Foo", "Bar"), exposed=("Spam", ))
client = Client(app, Response) client = Client(app, Response)
rv = client.get("/", headers={"Origin": "https://example.tld"}) rv = client.get("/", headers={"Origin": "https://example.tld"})
self.assertEqual(rv.headers["Access-Control-Allow-Origin"], "https://example.tld") self.assertEqual(
self.assertEqual(rv.headers["Access-Control-Allow-Credentials"], "true") rv.headers["Access-Control-Allow-Origin"], "https://example.tld")
self.assertEqual(rv.headers["Access-Control-Allow-Methods"], "HEAD, GET, POST, PUT, DELETE") self.assertEqual(
self.assertEqual(rv.headers["Access-Control-Allow-Headers"], "Foo, Bar") rv.headers["Access-Control-Allow-Credentials"], "true")
self.assertEqual(
rv.headers["Access-Control-Allow-Methods"], "HEAD, GET, POST, PUT, DELETE")
self.assertEqual(
rv.headers["Access-Control-Allow-Headers"], "Foo, Bar")
self.assertEqual(rv.headers["Access-Control-Expose-Headers"], "Spam") self.assertEqual(rv.headers["Access-Control-Expose-Headers"], "Spam")
a = client.get("/", headers={"Origin": "http://example.tld"}) a = client.get("/", headers={"Origin": "http://example.tld"})
self.assertEqual(a.headers["Access-Control-Allow-Origin"], "http://example.tld") self.assertEqual(
a.headers["Access-Control-Allow-Origin"], "http://example.tld")
b = client.get("/", headers={"Origin": "http://example.tld"}) b = client.get("/", headers={"Origin": "http://example.tld"})
self.assertEqual(b.headers["Access-Control-Allow-Origin"], "http://example.tld") self.assertEqual(
b.headers["Access-Control-Allow-Origin"], "http://example.tld")
c = client.get("/", headers={"Origin": "http://foo.other"}) c = client.get("/", headers={"Origin": "http://foo.other"})
self.assertEqual(c.headers["Access-Control-Allow-Origin"], "https://example.tld") self.assertEqual(
c.headers["Access-Control-Allow-Origin"], "https://example.tld")
def test_preflight(self): def test_preflight(self):
@ -51,10 +57,12 @@ class CORSTest(unittest.TestCase):
allowed=("Foo", ), exposed=("Bar", )) allowed=("Foo", ), exposed=("Bar", ))
client = Client(app, Response) client = Client(app, Response)
rv = client.open(method="OPTIONS", path="/", headers={"Origin": "http://example.tld"}) rv = client.open(method="OPTIONS", path="/",
headers={"Origin": "http://example.tld"})
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
for hdr in ("Origin", "Headers", "Credentials", "Methods"): for hdr in ("Origin", "Headers", "Credentials", "Methods"):
self.assertIn("Access-Control-Allow-%s" % hdr, rv.headers) self.assertIn("Access-Control-Allow-%s" % hdr, rv.headers)
self.assertEqual(rv.headers["Access-Control-Allow-Origin"], "http://example.tld") self.assertEqual(
rv.headers["Access-Control-Allow-Origin"], "http://example.tld")

@ -66,10 +66,10 @@ class TestDBMigration(unittest.TestCase):
tree = { tree = {
1: None, 1: None,
2: None, 2: None,
3: 2, 3: 2,
4: 3, 4: 3,
7: 3, 7: 3,
5: 2, 5: 2,
6: None 6: None
} }
@ -91,7 +91,8 @@ class TestDBMigration(unittest.TestCase):
" dislikes INTEGER DEFAULT 0," " dislikes INTEGER DEFAULT 0,"
" voters BLOB)") " voters BLOB)")
con.execute("INSERT INTO threads (uri, title) VALUES (?, ?)", ("/", "Test")) con.execute(
"INSERT INTO threads (uri, title) VALUES (?, ?)", ("/", "Test"))
for (id, parent) in iteritems(tree): for (id, parent) in iteritems(tree):
con.execute("INSERT INTO comments (" con.execute("INSERT INTO comments ("
" id, parent, created)" " id, parent, created)"
@ -108,13 +109,14 @@ class TestDBMigration(unittest.TestCase):
flattened = list(iteritems({ flattened = list(iteritems({
1: None, 1: None,
2: None, 2: None,
3: 2, 3: 2,
4: 2, 4: 2,
5: 2, 5: 2,
6: None, 6: None,
7: 2 7: 2
})) }))
with sqlite3.connect(self.path) as con: with sqlite3.connect(self.path) as con:
rv = con.execute("SELECT id, parent FROM comments ORDER BY created").fetchall() rv = con.execute(
"SELECT id, parent FROM comments ORDER BY created").fetchall()
self.assertEqual(flattened, rv) self.assertEqual(flattened, rv)

@ -68,7 +68,8 @@ class TestGuard(unittest.TestCase):
alice = self.makeClient("1.2.3.4", 2) alice = self.makeClient("1.2.3.4", 2)
for i in range(2): for i in range(2):
self.assertEqual(alice.post("/new?uri=test", data=self.data).status_code, 201) self.assertEqual(alice.post(
"/new?uri=test", data=self.data).status_code, 201)
bob.application.db.execute([ bob.application.db.execute([
"UPDATE comments SET", "UPDATE comments SET",
@ -76,7 +77,8 @@ class TestGuard(unittest.TestCase):
"WHERE remote_addr = '127.0.0.0'" "WHERE remote_addr = '127.0.0.0'"
]) ])
self.assertEqual(bob.post("/new?uri=test", data=self.data).status_code, 201) self.assertEqual(
bob.post("/new?uri=test", data=self.data).status_code, 201)
def testDirectReply(self): def testDirectReply(self):
@ -95,11 +97,13 @@ class TestGuard(unittest.TestCase):
def testSelfReply(self): def testSelfReply(self):
payload = lambda id: json.dumps({"text": "...", "parent": id}) def payload(id): return json.dumps({"text": "...", "parent": id})
client = self.makeClient("127.0.0.1", self_reply=False) client = self.makeClient("127.0.0.1", self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201) self.assertEqual(client.post(
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 403) "/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post(
"/new?uri=test", data=payload(1)).status_code, 403)
client.application.db.execute([ client.application.db.execute([
"UPDATE comments SET", "UPDATE comments SET",
@ -107,39 +111,55 @@ class TestGuard(unittest.TestCase):
"WHERE id = 1" "WHERE id = 1"
], (client.application.conf.getint("general", "max-age"), )) ], (client.application.conf.getint("general", "max-age"), ))
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201) self.assertEqual(client.post(
"/new?uri=test", data=payload(1)).status_code, 201)
client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False) client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201) self.assertEqual(client.post(
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201) "/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(2)).status_code, 201) self.assertEqual(client.post(
"/new?uri=test", data=payload(1)).status_code, 201)
self.assertEqual(client.post(
"/new?uri=test", data=payload(2)).status_code, 201)
def testRequireEmail(self): def testRequireEmail(self):
payload = lambda email: json.dumps({"text": "...", "email": email}) def payload(email): return json.dumps({"text": "...", "email": email})
client = self.makeClient("127.0.0.1", ratelimit=4, require_email=False) client = self.makeClient("127.0.0.1", ratelimit=4, require_email=False)
client_strict = self.makeClient("127.0.0.2", ratelimit=4, require_email=True) client_strict = self.makeClient(
"127.0.0.2", ratelimit=4, require_email=True)
# if we don't require email # if we don't require email
self.assertEqual(client.post("/new?uri=test", data=payload("")).status_code, 201) self.assertEqual(client.post(
self.assertEqual(client.post("/new?uri=test", data=payload("test@me.more")).status_code, 201) "/new?uri=test", data=payload("")).status_code, 201)
self.assertEqual(client.post(
"/new?uri=test", data=payload("test@me.more")).status_code, 201)
# if we do require email # if we do require email
self.assertEqual(client_strict.post("/new?uri=test", data=payload("")).status_code, 403) self.assertEqual(client_strict.post(
self.assertEqual(client_strict.post("/new?uri=test", data=payload("test@me.more")).status_code, 201) "/new?uri=test", data=payload("")).status_code, 403)
self.assertEqual(client_strict.post(
"/new?uri=test", data=payload("test@me.more")).status_code, 201)
def testRequireAuthor(self): def testRequireAuthor(self):
payload = lambda author: json.dumps({"text": "...", "author": author}) def payload(author): return json.dumps(
{"text": "...", "author": author})
client = self.makeClient("127.0.0.1", ratelimit=4, require_author=False) client = self.makeClient(
client_strict = self.makeClient("127.0.0.2", ratelimit=4, require_author=True) "127.0.0.1", ratelimit=4, require_author=False)
client_strict = self.makeClient(
"127.0.0.2", ratelimit=4, require_author=True)
# if we don't require author # if we don't require author
self.assertEqual(client.post("/new?uri=test", data=payload("")).status_code, 201) self.assertEqual(client.post(
self.assertEqual(client.post("/new?uri=test", data=payload("pipo author")).status_code, 201) "/new?uri=test", data=payload("")).status_code, 201)
self.assertEqual(client.post(
"/new?uri=test", data=payload("pipo author")).status_code, 201)
# if we do require author # if we do require author
self.assertEqual(client_strict.post("/new?uri=test", data=payload("")).status_code, 403) self.assertEqual(client_strict.post(
self.assertEqual(client_strict.post("/new?uri=test", data=payload("pipo author")).status_code, 201) "/new?uri=test", data=payload("")).status_code, 403)
self.assertEqual(client_strict.post(
"/new?uri=test", data=payload("pipo author")).status_code, 201)

@ -64,7 +64,8 @@ class TestHTML(unittest.TestCase):
sanitizer = html.Sanitizer(elements=[], attributes=[]) sanitizer = html.Sanitizer(elements=[], attributes=[])
examples = [ examples = [
('Look: <img src="..." />', 'Look: '), ('Look: <img src="..." />', 'Look: '),
('<a href="http://example.org/">Ha</a>', '<a href="http://example.org/">Ha</a>'), ('<a href="http://example.org/">Ha</a>',
'<a href="http://example.org/">Ha</a>'),
('<a href="sms:+1234567890">Ha</a>', '<a>Ha</a>'), ('<a href="sms:+1234567890">Ha</a>', '<a>Ha</a>'),
('<p style="visibility: hidden;">Test</p>', '<p>Test</p>'), ('<p style="visibility: hidden;">Test</p>', '<p>Test</p>'),
('<script>alert("Onoe")</script>', 'alert("Onoe")')] ('<script>alert("Onoe")</script>', 'alert("Onoe")')]

@ -29,7 +29,8 @@ class TestMigration(unittest.TestCase):
db = SQLite3(xxx.name, conf) db = SQLite3(xxx.name, conf)
Disqus(db, xml).migrate() Disqus(db, xml).migrate()
self.assertEqual(len(db.execute("SELECT id FROM comments").fetchall()), 2) self.assertEqual(
len(db.execute("SELECT id FROM comments").fetchall()), 2)
self.assertEqual(db.threads["/"]["title"], "Hello, World!") self.assertEqual(db.threads["/"]["title"], "Hello, World!")
self.assertEqual(db.threads["/"]["id"], 1) self.assertEqual(db.threads["/"]["id"], 1)
@ -57,8 +58,10 @@ class TestMigration(unittest.TestCase):
self.assertEqual(db.threads["/?p=4"]["title"], "...") self.assertEqual(db.threads["/?p=4"]["title"], "...")
self.assertEqual(db.threads["/?p=4"]["id"], 2) self.assertEqual(db.threads["/?p=4"]["id"], 2)
self.assertEqual(len(db.execute("SELECT id FROM threads").fetchall()), 2) self.assertEqual(
self.assertEqual(len(db.execute("SELECT id FROM comments").fetchall()), 7) len(db.execute("SELECT id FROM threads").fetchall()), 2)
self.assertEqual(
len(db.execute("SELECT id FROM comments").fetchall()), 7)
first = db.comments.get(1) first = db.comments.get(1)
self.assertEqual(first["author"], "Ohai") self.assertEqual(first["author"], "Ohai")

@ -12,7 +12,8 @@ class TestUtils(unittest.TestCase):
examples = [ examples = [
(u'12.34.56.78', u'12.34.56.0'), (u'12.34.56.78', u'12.34.56.0'),
(u'1234:5678:90ab:cdef:fedc:ba09:8765:4321', u'1234:5678:90ab:0000:0000:0000:0000:0000'), (u'1234:5678:90ab:cdef:fedc:ba09:8765:4321',
u'1234:5678:90ab:0000:0000:0000:0000:0000'),
(u'::ffff:127.0.0.1', u'127.0.0.0')] (u'::ffff:127.0.0.1', u'127.0.0.0')]
for (addr, anonymized) in examples: for (addr, anonymized) in examples:
@ -56,4 +57,4 @@ class TestParse(unittest.TestCase):
"""), ('test', 'Test')) """), ('test', 'Test'))
self.assertEqual(parse.thread('<section id="isso-thread" data-isso-id="Fuu.">'), self.assertEqual(parse.thread('<section id="isso-thread" data-isso-id="Fuu.">'),
('Fuu.', 'Untitled.')) ('Fuu.', 'Untitled.'))

@ -33,7 +33,8 @@ class TestHasher(unittest.TestCase):
class TestPBKDF2(unittest.TestCase): class TestPBKDF2(unittest.TestCase):
def test_default(self): def test_default(self):
pbkdf2 = PBKDF2(iterations=1000) # original setting (and still default) # original setting (and still default)
pbkdf2 = PBKDF2(iterations=1000)
self.assertEqual(pbkdf2.uhash(""), "42476aafe2e4") self.assertEqual(pbkdf2.uhash(""), "42476aafe2e4")
def test_different_salt(self): def test_different_salt(self):

@ -37,13 +37,15 @@ class TestVote(unittest.TestCase):
def testZeroLikes(self): def testZeroLikes(self):
rv = self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."})) rv = self.makeClient("127.0.0.1").post(
"/new?uri=test", data=json.dumps({"text": "..."}))
self.assertEqual(loads(rv.data)['likes'], 0) self.assertEqual(loads(rv.data)['likes'], 0)
self.assertEqual(loads(rv.data)['dislikes'], 0) self.assertEqual(loads(rv.data)['dislikes'], 0)
def testSingleLike(self): def testSingleLike(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."})) self.makeClient("127.0.0.1").post(
"/new?uri=test", data=json.dumps({"text": "..."}))
rv = self.makeClient("0.0.0.0").post("/id/1/like") rv = self.makeClient("0.0.0.0").post("/id/1/like")
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
@ -60,7 +62,8 @@ class TestVote(unittest.TestCase):
def testMultipleLikes(self): def testMultipleLikes(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."})) self.makeClient("127.0.0.1").post(
"/new?uri=test", data=json.dumps({"text": "..."}))
for num in range(15): for num in range(15):
rv = self.makeClient("1.2.%i.0" % num).post('/id/1/like') rv = self.makeClient("1.2.%i.0" % num).post('/id/1/like')
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
@ -73,7 +76,8 @@ class TestVote(unittest.TestCase):
def testTooManyLikes(self): def testTooManyLikes(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."})) self.makeClient("127.0.0.1").post(
"/new?uri=test", data=json.dumps({"text": "..."}))
for num in range(256): for num in range(256):
rv = self.makeClient("1.2.%i.0" % num).post('/id/1/like') rv = self.makeClient("1.2.%i.0" % num).post('/id/1/like')
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
@ -84,7 +88,8 @@ class TestVote(unittest.TestCase):
self.assertEqual(loads(rv.data)["likes"], num + 1) self.assertEqual(loads(rv.data)["likes"], num + 1)
def testDislike(self): def testDislike(self):
self.makeClient("127.0.0.1").post("/new?uri=test", data=json.dumps({"text": "..."})) self.makeClient("127.0.0.1").post(
"/new?uri=test", data=json.dumps({"text": "..."}))
rv = self.makeClient("1.2.3.4").post('/id/1/dislike') rv = self.makeClient("1.2.3.4").post('/id/1/dislike')
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)

@ -89,11 +89,11 @@ class Bloomfilter:
def add(self, key): def add(self, key):
for i in self.get_probes(key): for i in self.get_probes(key):
self.array[i//8] |= 2 ** (i%8) self.array[i//8] |= 2 ** (i % 8)
self.elements += 1 self.elements += 1
def __contains__(self, key): def __contains__(self, key):
return all(self.array[i//8] & (2 ** (i%8)) for i in self.get_probes(key)) return all(self.array[i//8] & (2 ** (i % 8)) for i in self.get_probes(key))
def __len__(self): def __len__(self):
return self.elements return self.elements
@ -117,6 +117,7 @@ def render_template(template_name, **context):
'..', 'templates') '..', 'templates')
jinja_env = Environment(loader=FileSystemLoader(template_path), jinja_env = Environment(loader=FileSystemLoader(template_path),
autoescape=True) autoescape=True)
def datetimeformat(value): def datetimeformat(value):
return datetime.fromtimestamp(value).strftime('%H:%M / %d-%m-%Y') return datetime.fromtimestamp(value).strftime('%H:%M / %d-%m-%Y')

@ -73,7 +73,7 @@ class Hash(object):
class PBKDF2(Hash): class PBKDF2(Hash):
def __init__(self, salt=None, iterations=1000, dklen=6, func="sha1"): def __init__(self, salt=None, iterations=1000, dklen=6, func="sha1"):
super(PBKDF2, self).__init__(salt) super(PBKDF2, self).__init__(salt)
@ -109,4 +109,4 @@ def new(conf):
return Hash(salt, algorithm) return Hash(salt, algorithm)
sha1 = Hash(func="sha1").uhash sha1 = Hash(func="sha1").uhash

@ -54,7 +54,8 @@ def sanitize(tokenizer, document):
builder = "simpletree" builder = "simpletree"
stream = html5lib.treewalkers.getTreeWalker(builder)(domtree) stream = html5lib.treewalkers.getTreeWalker(builder)(domtree)
serializer = HTMLSerializer(quote_attr_values=True, omit_optional_tags=False) serializer = HTMLSerializer(
quote_attr_values=True, omit_optional_tags=False)
return serializer.render(stream) return serializer.render(stream)

@ -15,7 +15,7 @@ from isso.compat import map, filter, PY2K
if PY2K: # http://bugs.python.org/issue12984 if PY2K: # http://bugs.python.org/issue12984
from xml.dom.minidom import NamedNodeMap from xml.dom.minidom import NamedNodeMap
NamedNodeMap.__contains__ = lambda self, key: self.has_key(key) NamedNodeMap.__contains__ = lambda self, key: self.has_key(key) # noqa
def thread(data, default=u"Untitled.", id=None): def thread(data, default=u"Untitled.", id=None):
@ -31,8 +31,8 @@ def thread(data, default=u"Untitled.", id=None):
# aka getElementById, but limited to div and section tags # aka getElementById, but limited to div and section tags
el = list(filter(lambda i: i.attributes["id"].value == "isso-thread", el = list(filter(lambda i: i.attributes["id"].value == "isso-thread",
filter(lambda i: "id" in i.attributes, filter(lambda i: "id" in i.attributes,
chain(*map(html.getElementsByTagName, ("div", "section")))))) chain(*map(html.getElementsByTagName, ("div", "section"))))))
if not el: if not el:
return id, default return id, default

@ -40,7 +40,8 @@ class requires:
try: try:
kwargs[self.param] = self.type(req.args[self.param]) kwargs[self.param] = self.type(req.args[self.param])
except TypeError: except TypeError:
raise BadRequest("invalid type for %s, expected %s" % (self.param, self.type)) raise BadRequest("invalid type for %s, expected %s" %
(self.param, self.type))
return func(cls, env, req, *args, **kwargs) return func(cls, env, req, *args, **kwargs)

@ -29,7 +29,8 @@ from isso.utils.hash import sha1
__url_re = re.compile( __url_re = re.compile(
r'^' r'^'
r'(https?://)?' r'(https?://)?'
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... # domain...
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'
r'localhost|' # localhost... r'localhost|' # localhost...
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip
r'(?::\d+)?' # optional port r'(?::\d+)?' # optional port
@ -62,12 +63,12 @@ def xhr(func):
not forged (XHR is restricted by CORS separately). not forged (XHR is restricted by CORS separately).
""" """
""" """
@apiDefine csrf @apiDefine csrf
@apiHeader {string="application/json"} Content-Type @apiHeader {string="application/json"} Content-Type
The content type must be set to `application/json` to prevent CSRF attacks. The content type must be set to `application/json` to prevent CSRF attacks.
""" """
def dec(self, env, req, *args, **kwargs): def dec(self, env, req, *args, **kwargs):
if req.content_type and not req.content_type.startswith("application/json"): if req.content_type and not req.content_type.startswith("application/json"):
@ -93,8 +94,8 @@ class API(object):
('view', ('GET', '/id/<int:id>')), ('view', ('GET', '/id/<int:id>')),
('edit', ('PUT', '/id/<int:id>')), ('edit', ('PUT', '/id/<int:id>')),
('delete', ('DELETE', '/id/<int:id>')), ('delete', ('DELETE', '/id/<int:id>')),
('moderate',('GET', '/id/<int:id>/<any(edit,activate,delete):action>/<string:key>')), ('moderate', ('GET', '/id/<int:id>/<any(edit,activate,delete):action>/<string:key>')),
('moderate',('POST', '/id/<int:id>/<any(edit,activate,delete):action>/<string:key>')), ('moderate', ('POST', '/id/<int:id>/<any(edit,activate,delete):action>/<string:key>')),
('like', ('POST', '/id/<int:id>/like')), ('like', ('POST', '/id/<int:id>/like')),
('dislike', ('POST', '/id/<int:id>/dislike')), ('dislike', ('POST', '/id/<int:id>/dislike')),
('demo', ('GET', '/demo')), ('demo', ('GET', '/demo')),
@ -280,13 +281,15 @@ class API(object):
self.signal("comments.new:after-save", thread, rv) self.signal("comments.new:after-save", thread, rv)
cookie = functools.partial(dump_cookie, cookie = functools.partial(dump_cookie,
value=self.isso.sign([rv["id"], sha1(rv["text"])]), value=self.isso.sign(
max_age=self.conf.getint('max-age')) [rv["id"], sha1(rv["text"])]),
max_age=self.conf.getint('max-age'))
rv["text"] = self.isso.render(rv["text"]) rv["text"] = self.isso.render(rv["text"])
rv["hash"] = self.hash(rv['email'] or rv['remote_addr']) rv["hash"] = self.hash(rv['email'] or rv['remote_addr'])
self.cache.set('hash', (rv['email'] or rv['remote_addr']).encode('utf-8'), rv['hash']) self.cache.set(
'hash', (rv['email'] or rv['remote_addr']).encode('utf-8'), rv['hash'])
for key in set(rv.keys()) - API.FIELDS: for key in set(rv.keys()) - API.FIELDS:
rv.pop(key) rv.pop(key)
@ -326,6 +329,7 @@ class API(object):
"likes": 1 "likes": 1
} }
""" """
def view(self, environ, request, id): def view(self, environ, request, id):
rv = self.comments.get(id) rv = self.comments.get(id)
@ -409,8 +413,9 @@ class API(object):
self.signal("comments.edit", rv) self.signal("comments.edit", rv)
cookie = functools.partial(dump_cookie, cookie = functools.partial(dump_cookie,
value=self.isso.sign([rv["id"], sha1(rv["text"])]), value=self.isso.sign(
max_age=self.conf.getint('max-age')) [rv["id"], sha1(rv["text"])]),
max_age=self.conf.getint('max-age'))
rv["text"] = self.isso.render(rv["text"]) rv["text"] = self.isso.render(rv["text"])
@ -454,7 +459,8 @@ class API(object):
if item is None: if item is None:
raise NotFound raise NotFound
self.cache.delete('hash', (item['email'] or item['remote_addr']).encode('utf-8')) self.cache.delete(
'hash', (item['email'] or item['remote_addr']).encode('utf-8'))
with self.isso.lock: with self.isso.lock:
rv = self.comments.delete(id) rv = self.comments.delete(id)
@ -505,6 +511,7 @@ class API(object):
@apiSuccessExample Using POST: @apiSuccessExample Using POST:
Yo Yo
""" """
def moderate(self, environ, request, id, action, key): def moderate(self, environ, request, id, action, key):
try: try:
id = self.isso.unsign(key, max_age=2**32) id = self.isso.unsign(key, max_age=2**32)
@ -547,11 +554,11 @@ class API(object):
else: else:
with self.isso.lock: with self.isso.lock:
self.comments.delete(id) self.comments.delete(id)
self.cache.delete('hash', (item['email'] or item['remote_addr']).encode('utf-8')) self.cache.delete(
'hash', (item['email'] or item['remote_addr']).encode('utf-8'))
self.signal("comments.delete", id) self.signal("comments.delete", id)
return Response("Yo", 200) return Response("Yo", 200)
""" """
@api {get} / get comments @api {get} / get comments
@apiGroup Thread @apiGroup Thread
@ -681,10 +688,10 @@ class API(object):
return BadRequest("nested_limit should be integer") return BadRequest("nested_limit should be integer")
rv = { rv = {
'id' : root_id, 'id': root_id,
'total_replies' : reply_counts[root_id], 'total_replies': reply_counts[root_id],
'hidden_replies' : reply_counts[root_id] - len(root_list), 'hidden_replies': reply_counts[root_id] - len(root_list),
'replies' : self._process_fetched_list(root_list, plain) 'replies': self._process_fetched_list(root_list, plain)
} }
# We are only checking for one level deep comments # We are only checking for one level deep comments
if root_id is None: if root_id is None:
@ -705,7 +712,8 @@ class API(object):
comment['total_replies'] = 0 comment['total_replies'] = 0
replies = [] replies = []
comment['hidden_replies'] = comment['total_replies'] - len(replies) comment['hidden_replies'] = comment['total_replies'] - \
len(replies)
comment['replies'] = self._process_fetched_list(replies, plain) comment['replies'] = self._process_fetched_list(replies, plain)
return JSON(rv, 200) return JSON(rv, 200)
@ -762,7 +770,8 @@ class API(object):
@xhr @xhr
def like(self, environ, request, id): def like(self, environ, request, id):
nv = self.comments.vote(True, id, utils.anonymize(str(request.remote_addr))) nv = self.comments.vote(
True, id, utils.anonymize(str(request.remote_addr)))
return JSON(nv, 200) return JSON(nv, 200)
""" """
@ -788,7 +797,8 @@ class API(object):
@xhr @xhr
def dislike(self, environ, request, id): def dislike(self, environ, request, id):
nv = self.comments.vote(False, id, utils.anonymize(str(request.remote_addr))) nv = self.comments.vote(
False, id, utils.anonymize(str(request.remote_addr)))
return JSON(nv, 200) return JSON(nv, 200)
# TODO: remove someday (replaced by :func:`counts`) # TODO: remove someday (replaced by :func:`counts`)
@ -814,6 +824,7 @@ class API(object):
@apiSuccessExample Counts of 5 threads: @apiSuccessExample Counts of 5 threads:
[2, 18, 4, 0, 3] [2, 18, 4, 0, 3]
""" """
def counts(self, environ, request): def counts(self, environ, request):
data = request.get_json() data = request.get_json()
@ -838,10 +849,11 @@ class API(object):
data = req.form data = req.form
password = self.isso.conf.get("general", "admin_password") password = self.isso.conf.get("general", "admin_password")
if data['password'] and data['password'] == password: if data['password'] and data['password'] == password:
response = redirect(get_current_url(env, host_only=True) + '/admin') response = redirect(get_current_url(
env, host_only=True) + '/admin')
cookie = functools.partial(dump_cookie, cookie = functools.partial(dump_cookie,
value=self.isso.sign({"logged": True}), value=self.isso.sign({"logged": True}),
expires=datetime.now() + timedelta(1)) expires=datetime.now() + timedelta(1))
response.headers.add("Set-Cookie", cookie("admin-session")) response.headers.add("Set-Cookie", cookie("admin-session"))
response.headers.add("X-Set-Cookie", cookie("isso-admin-session")) response.headers.add("X-Set-Cookie", cookie("isso-admin-session"))
return response return response

@ -136,11 +136,14 @@ class CORSMiddleware(object):
headers = Headers(headers) headers = Headers(headers)
headers.add("Access-Control-Allow-Origin", self.origin(environ)) headers.add("Access-Control-Allow-Origin", self.origin(environ))
headers.add("Access-Control-Allow-Credentials", "true") headers.add("Access-Control-Allow-Credentials", "true")
headers.add("Access-Control-Allow-Methods", ", ".join(self.methods)) headers.add("Access-Control-Allow-Methods",
", ".join(self.methods))
if self.allowed: if self.allowed:
headers.add("Access-Control-Allow-Headers", ", ".join(self.allowed)) headers.add("Access-Control-Allow-Headers",
", ".join(self.allowed))
if self.exposed: if self.exposed:
headers.add("Access-Control-Expose-Headers", ", ".join(self.exposed)) headers.add("Access-Control-Expose-Headers",
", ".join(self.exposed))
return start_response(status, headers.to_list(), exc_info) return start_response(status, headers.to_list(), exc_info)
if environ.get("REQUEST_METHOD") == "OPTIONS": if environ.get("REQUEST_METHOD") == "OPTIONS":

Loading…
Cancel
Save