isso/specs/test_guard.py

112 lines
3.5 KiB
Python
Raw Normal View History

2013-11-13 15:12:47 +00:00
# -*- encoding: utf-8 -*-
try:
import unittest2 as unittest
except ImportError:
import unittest
2013-11-13 15:12:47 +00:00
import json
import tempfile
2013-11-18 11:40:27 +00:00
from werkzeug import __version__
2013-11-13 15:12:47 +00:00
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, core
from isso.utils import http
from fixtures import curl, FakeIP
http.curl = curl
2013-11-18 11:40:27 +00:00
if __version__.startswith("0.8"):
class Response(Response):
def get_data(self, as_text=False):
return self.data.decode("utf-8")
2013-11-13 15:12:47 +00:00
class TestGuard(unittest.TestCase):
data = json.dumps({"text": "Lorem ipsum."})
def setUp(self):
self.path = tempfile.NamedTemporaryFile().name
def makeClient(self, ip, ratelimit=2, direct_reply=3, self_reply=False):
2013-11-13 15:12:47 +00:00
conf = core.Config.load(None)
conf.set("general", "dbpath", self.path)
conf.set("guard", "enabled", "true")
conf.set("guard", "ratelimit", str(ratelimit))
conf.set("guard", "direct-reply", str(direct_reply))
conf.set("guard", "reply-to-self", "1" if self_reply else "0")
2013-11-13 15:12:47 +00:00
class App(Isso, core.Mixin):
pass
app = App(conf)
app.wsgi_app = FakeIP(app.wsgi_app, ip)
return Client(app, Response)
def testRateLimit(self):
bob = self.makeClient("127.0.0.1", 2)
for i in range(2):
rv = bob.post('/new?uri=test', data=self.data)
self.assertEqual(rv.status_code, 201)
2013-11-13 15:12:47 +00:00
rv = bob.post('/new?uri=test', data=self.data)
self.assertEqual(rv.status_code, 403)
self.assertIn("ratelimit exceeded", rv.get_data(as_text=True))
2013-11-13 15:12:47 +00:00
alice = self.makeClient("1.2.3.4", 2)
for i in range(2):
self.assertEqual(alice.post("/new?uri=test", data=self.data).status_code, 201)
2013-11-13 15:12:47 +00:00
bob.application.db.execute([
"UPDATE comments SET",
" created = created - 60",
"WHERE remote_addr = '127.0.0.0'"
])
self.assertEqual(bob.post("/new?uri=test", data=self.data).status_code, 201)
2013-11-13 15:12:47 +00:00
def testDirectReply(self):
client = self.makeClient("127.0.0.1", 15, 3)
for url in ("foo", "bar", "baz", "spam"):
for _ in range(3):
rv = client.post("/new?uri=%s" % url, data=self.data)
self.assertEqual(rv.status_code, 201)
2013-11-13 15:12:47 +00:00
for url in ("foo", "bar", "baz", "spam"):
rv = client.post("/new?uri=%s" % url, data=self.data)
self.assertEqual(rv.status_code, 403)
self.assertIn("direct responses to", rv.get_data(as_text=True))
def testSelfReply(self):
payload = lambda id: json.dumps({"text": "...", "parent": id})
client = self.makeClient("127.0.0.1", self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 403)
client.application.db.execute([
"UPDATE comments SET",
" created = created - ?",
"WHERE id = 1"
], (client.application.conf.getint("general", "max-age"), ))
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201)
client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
self.assertEqual(client.post("/new?uri=test", data=self.data).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(1)).status_code, 201)
self.assertEqual(client.post("/new?uri=test", data=payload(2)).status_code, 201)