commit
0be3c69e1d
@ -1,47 +1,68 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import time
|
||||
import functools
|
||||
|
||||
from isso.db import IssoDBException
|
||||
|
||||
class Guard:
|
||||
|
||||
class TooManyComments(IssoDBException):
|
||||
pass
|
||||
def __init__(self, db):
|
||||
|
||||
self.db = db
|
||||
self.conf = db.conf.section("guard")
|
||||
self.max_age = db.conf.getint("general", "max-age")
|
||||
|
||||
def check(func):
|
||||
def validate(self, uri, comment):
|
||||
|
||||
def dec(self, uri, c):
|
||||
if not self.conf.getboolean("enabled"):
|
||||
return True, ""
|
||||
|
||||
if not self.db.conf.getboolean("guard", "enabled"):
|
||||
return func(self, uri, c)
|
||||
for func in (self._limit, self._spam):
|
||||
valid, reason = func(uri, comment)
|
||||
if not valid:
|
||||
return False, reason
|
||||
return True, ""
|
||||
|
||||
# block more than two comments per minute
|
||||
@classmethod
|
||||
def ids(cls, rv):
|
||||
return [str(col[0]) for col in rv]
|
||||
|
||||
def _limit(self, uri, comment):
|
||||
|
||||
# block more than :param:`ratelimit` comments per minute
|
||||
rv = self.db.execute([
|
||||
'SELECT id FROM comments WHERE remote_addr = ? AND created > ?;'
|
||||
], (c["remote_addr"], time.time() + 60)).fetchall()
|
||||
'SELECT id FROM comments WHERE remote_addr = ? AND ? - created < 60;'
|
||||
], (comment["remote_addr"], time.time())).fetchall()
|
||||
|
||||
if len(rv) >= self.db.conf.getint("guard", "ratelimit"):
|
||||
raise TooManyComments
|
||||
if len(rv) >= self.conf.getint("ratelimit"):
|
||||
return False, "{0}: ratelimit exceeded ({1})".format(
|
||||
comment["remote_addr"], ', '.join(Guard.ids(rv)))
|
||||
|
||||
# block more than three comments as direct response to the post
|
||||
rv = self.db.execute([
|
||||
'SELECT id FROM comments WHERE remote_addr = ? AND parent IS NULL;'
|
||||
], (c["remote_addr"], )).fetchall()
|
||||
if comment["parent"] is None:
|
||||
rv = self.db.execute([
|
||||
'SELECT id FROM comments WHERE',
|
||||
' tid = (SELECT id FROM threads WHERE uri = ?)',
|
||||
'AND remote_addr = ?',
|
||||
'AND parent IS NULL;'
|
||||
], (uri, comment["remote_addr"])).fetchall()
|
||||
|
||||
if len(rv) >= 3:
|
||||
raise TooManyComments
|
||||
if len(rv) >= self.conf.getint("direct-reply"):
|
||||
return False, "%i direct responses to %s" % (len(rv), uri)
|
||||
|
||||
# block reply to own comment if the cookie is still available (max age)
|
||||
if "parent" in c:
|
||||
elif self.conf.getboolean("reply-to-self") == False:
|
||||
rv = self.db.execute([
|
||||
'SELECT id FROM comments WHERE remote_addr = ? AND id = ? AND ? - created < ?;'
|
||||
], (c["remote_addr"], c["parent"], time.time(),
|
||||
self.db.conf.getint("general", "max-age"))).fetchall()
|
||||
'SELECT id FROM comments WHERE'
|
||||
' remote_addr = ?',
|
||||
'AND id = ?',
|
||||
'AND ? - created < ?'
|
||||
], (comment["remote_addr"], comment["parent"],
|
||||
time.time(), self.max_age)).fetchall()
|
||||
|
||||
if len(rv) > 0:
|
||||
raise TooManyComments
|
||||
return False, "edit time frame is still open"
|
||||
|
||||
return func(self, uri, c)
|
||||
return True, ""
|
||||
|
||||
return dec
|
||||
def _spam(self, uri, comment):
|
||||
return True, ""
|
||||
|
@ -0,0 +1,31 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import json
|
||||
|
||||
|
||||
class FakeIP(object):
|
||||
|
||||
def __init__(self, app, ip):
|
||||
self.app = app
|
||||
self.ip = ip
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
environ['REMOTE_ADDR'] = self.ip
|
||||
return self.app(environ, start_response)
|
||||
|
||||
|
||||
class Dummy:
|
||||
|
||||
status = 200
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def read(self):
|
||||
return ''
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
pass
|
||||
|
||||
curl = lambda method, host, path: Dummy()
|
||||
loads = lambda data: json.loads(data.decode('utf-8'))
|
@ -0,0 +1,102 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import unittest
|
||||
|
||||
import os
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
from werkzeug.test import Client
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
from isso import Isso, core
|
||||
from isso.utils import http
|
||||
|
||||
from fixtures import curl, FakeIP
|
||||
http.curl = curl
|
||||
|
||||
|
||||
class TestGuard(unittest.TestCase):
|
||||
|
||||
data = json.dumps({"text": "Lorem ipsum."})
|
||||
|
||||
def setUp(self):
|
||||
self.path = tempfile.NamedTemporaryFile().name
|
||||
|
||||
def makeClient(self, ip, ratelimit=2, direct_reply=3, self_reply=False):
|
||||
|
||||
conf = core.Config.load(None)
|
||||
conf.set("general", "dbpath", self.path)
|
||||
conf.set("guard", "enabled", "true")
|
||||
conf.set("guard", "ratelimit", str(ratelimit))
|
||||
conf.set("guard", "direct-reply", str(direct_reply))
|
||||
conf.set("guard", "reply-to-self", "1" if self_reply else "0")
|
||||
|
||||
class App(Isso, core.Mixin):
|
||||
pass
|
||||
|
||||
app = App(conf)
|
||||
app.wsgi_app = FakeIP(app.wsgi_app, ip)
|
||||
|
||||
return Client(app, Response)
|
||||
|
||||
def testRateLimit(self):
|
||||
|
||||
bob = self.makeClient("127.0.0.1", 2)
|
||||
|
||||
for i in range(2):
|
||||
rv = bob.post('/new?uri=test', data=self.data)
|
||||
assert rv.status_code == 201
|
||||
|
||||
rv = bob.post('/new?uri=test', data=self.data)
|
||||
|
||||
assert rv.status_code == 403
|
||||
assert "ratelimit exceeded" in rv.get_data(as_text=True)
|
||||
|
||||
alice = self.makeClient("1.2.3.4", 2)
|
||||
for i in range(2):
|
||||
assert alice.post("/new?uri=test", data=self.data).status_code == 201
|
||||
|
||||
bob.application.db.execute([
|
||||
"UPDATE comments SET",
|
||||
" created = created - 60",
|
||||
"WHERE remote_addr = '127.0.0.0'"
|
||||
])
|
||||
|
||||
assert bob.post("/new?uri=test", data=self.data).status_code == 201
|
||||
|
||||
def testDirectReply(self):
|
||||
|
||||
client = self.makeClient("127.0.0.1", 15, 3)
|
||||
|
||||
for url in ("foo", "bar", "baz", "spam"):
|
||||
for _ in range(3):
|
||||
rv = client.post("/new?uri=%s" % url, data=self.data)
|
||||
assert rv.status_code == 201
|
||||
|
||||
for url in ("foo", "bar", "baz", "spam"):
|
||||
rv = client.post("/new?uri=%s" % url, data=self.data)
|
||||
|
||||
assert rv.status_code == 403
|
||||
assert "direct responses to" in rv.get_data(as_text=True)
|
||||
|
||||
def testSelfReply(self):
|
||||
|
||||
payload = lambda id: json.dumps({"text": "...", "parent": id})
|
||||
|
||||
client = self.makeClient("127.0.0.1", self_reply=False)
|
||||
assert client.post("/new?uri=test", data=self.data).status_code == 201
|
||||
assert client.post("/new?uri=test", data=payload(1)).status_code == 403
|
||||
|
||||
client.application.db.execute([
|
||||
"UPDATE comments SET",
|
||||
" created = created - ?",
|
||||
"WHERE id = 1"
|
||||
], (client.application.conf.getint("general", "max-age"), ))
|
||||
|
||||
assert client.post("/new?uri=test", data=payload(1)).status_code == 201
|
||||
|
||||
client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
|
||||
assert client.post("/new?uri=test", data=self.data).status_code == 201
|
||||
assert client.post("/new?uri=test", data=payload(1)).status_code == 201
|
||||
assert client.post("/new?uri=test", data=payload(2)).status_code == 201
|
Loading…
Reference in new issue