isso/specs/test_guard.py

109 lines
3.3 KiB
Python
Raw Normal View History

2013-11-13 15:12:47 +00:00
# -*- encoding: utf-8 -*-
import unittest
import json
import tempfile
2013-11-18 11:40:27 +00:00
from werkzeug import __version__
2013-11-13 15:12:47 +00:00
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, core
from isso.utils import http
from fixtures import curl, FakeIP
# Swap isso's ``http.curl`` helper for the ``curl`` object from the test
# fixtures, so code that looks up ``http.curl`` at call time gets the fixture.
# NOTE(review): presumably this keeps the tests from making real HTTP
# requests -- confirm against fixtures.curl.
http.curl = curl
2013-11-18 11:40:27 +00:00
if __version__.startswith("0.8"):
    # Only for werkzeug 0.8.x: shadow the imported ``Response`` with a
    # subclass that offers ``get_data``.
    # NOTE(review): presumably werkzeug 0.8 responses lack ``get_data``
    # (the tests below call it) -- confirm against the werkzeug changelog.
    class Response(Response):
        """Response wrapper providing ``get_data`` on werkzeug 0.8."""

        def get_data(self, as_text=False):
            """Return the response body.

            :param as_text: when true, return the body decoded as UTF-8;
                otherwise return ``self.data`` unchanged.
            """
            # Honor ``as_text`` instead of always decoding, so the shim
            # behaves like the method it stands in for.
            if as_text:
                return self.data.decode("utf-8")
            return self.data
2013-11-13 15:12:47 +00:00
class TestGuard(unittest.TestCase):
    """Tests for the ``guard`` settings: rate limit, direct replies and
    replies to one's own comment.

    Each test builds its clients with :meth:`makeClient`; all clients created
    within one test share the database file prepared by :meth:`setUp`.
    """

    # JSON request body of a minimal top-level comment.
    data = json.dumps({"text": "Lorem ipsum."})

    def setUp(self):
        """Create a private, empty database file for the test."""
        import os  # local import: only setUp/tearDown need it

        # mkstemp() atomically creates the file and leaves it in place, so the
        # path stays reserved for this test. NamedTemporaryFile().name returns
        # the path of a file that is deleted as soon as the temporary object
        # is discarded.
        fd, self.path = tempfile.mkstemp()
        os.close(fd)

    def tearDown(self):
        """Remove the database file created in :meth:`setUp`."""
        import os

        if os.path.exists(self.path):
            os.unlink(self.path)

    def makeClient(self, ip, ratelimit=2, direct_reply=3, self_reply=False):
        """Return a werkzeug test ``Client`` for a freshly configured app.

        :param ip: address handed to the ``FakeIP`` wrapper around the app's
            ``wsgi_app`` (presumably reported as the client address -- see
            fixtures.FakeIP).
        :param ratelimit: value written to ``guard/ratelimit``.
        :param direct_reply: value written to ``guard/direct-reply``.
        :param self_reply: written to ``guard/reply-to-self`` as "1" or "0".
        """
        conf = core.Config.load(None)
        conf.set("general", "dbpath", self.path)
        conf.set("guard", "enabled", "true")
        conf.set("guard", "ratelimit", str(ratelimit))
        conf.set("guard", "direct-reply", str(direct_reply))
        conf.set("guard", "reply-to-self", "1" if self_reply else "0")

        class App(Isso, core.Mixin):
            pass

        app = App(conf)
        app.wsgi_app = FakeIP(app.wsgi_app, ip)

        return Client(app, Response)

    def testRateLimit(self):
        """A client is rejected with 403 once it exceeds its rate limit."""
        bob = self.makeClient("127.0.0.1", 2)

        for _ in range(2):
            rv = bob.post('/new?uri=test', data=self.data)
            assert rv.status_code == 201

        # The third comment exceeds the limit of two.
        rv = bob.post('/new?uri=test', data=self.data)

        assert rv.status_code == 403
        assert "ratelimit exceeded" in rv.get_data(as_text=True)

        # A client with another address is not affected by bob's comments.
        alice = self.makeClient("1.2.3.4", 2)
        for _ in range(2):
            assert alice.post("/new?uri=test", data=self.data).status_code == 201

        # Move bob's comments 60 units into the past; afterwards he may post
        # again.
        # NOTE(review): the address below differs from bob's 127.0.0.1 in the
        # last octet -- presumably isso stores anonymized addresses; confirm
        # before "fixing" it.
        bob.application.db.execute([
            "UPDATE comments SET",
            " created = created - 60",
            "WHERE remote_addr = '127.0.0.0'"
        ])

        assert bob.post("/new?uri=test", data=self.data).status_code == 201

    def testDirectReply(self):
        """Only ``direct-reply`` top-level comments per thread are accepted."""
        # Rate limit of 15 so the 16 requests below are not cut short by it
        # before the direct-reply check can answer.
        client = self.makeClient("127.0.0.1", 15, 3)

        for url in ("foo", "bar", "baz", "spam"):
            for _ in range(3):
                rv = client.post("/new?uri=%s" % url, data=self.data)
                assert rv.status_code == 201

        # A fourth top-level comment on any of the threads is refused.
        for url in ("foo", "bar", "baz", "spam"):
            rv = client.post("/new?uri=%s" % url, data=self.data)

            assert rv.status_code == 403
            assert "direct responses to" in rv.get_data(as_text=True)

    def testSelfReply(self):
        """With ``reply-to-self`` off, replying to one's own comment gives 403
        until the comment has been aged by ``general/max-age``."""

        def payload(parent):
            # JSON body of a reply to the comment with id `parent`.
            return json.dumps({"text": "...", "parent": parent})

        client = self.makeClient("127.0.0.1", self_reply=False)
        assert client.post("/new?uri=test", data=self.data).status_code == 201
        assert client.post("/new?uri=test", data=payload(1)).status_code == 403

        # Age comment 1 by ``max-age``; the author may then reply to it.
        client.application.db.execute([
            "UPDATE comments SET",
            " created = created - ?",
            "WHERE id = 1"
        ], (client.application.conf.getint("general", "max-age"), ))

        assert client.post("/new?uri=test", data=payload(1)).status_code == 201

        # A client with a different address may reply to both comments
        # written above.
        client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
        assert client.post("/new?uri=test", data=self.data).status_code == 201
        assert client.post("/new?uri=test", data=payload(1)).status_code == 201
        assert client.post("/new?uri=test", data=payload(2)).status_code == 201