# -*- encoding: utf-8 -*-
"""Tests for the spam guard: rate limit, direct-reply and self-reply rules."""

import unittest
import json
import os
import tempfile

from werkzeug import __version__
from werkzeug.test import Client
from werkzeug.wrappers import Response

from isso import Isso, core
from isso.utils import http

from fixtures import curl, FakeIP

# Replace isso's HTTP helper with the fixture implementation
# (presumably so the tests never perform real network requests -- confirm
# against fixtures.curl).
http.curl = curl

if __version__.startswith("0.8"):
    class Response(Response):
        """Response shim that provides ``get_data`` on werkzeug 0.8.x."""

        def get_data(self, as_text=False):
            # NOTE: `as_text` is accepted for signature compatibility only;
            # the body is always returned decoded as UTF-8 text.
            return self.data.decode("utf-8")


class TestGuard(unittest.TestCase):
    """Exercise the ``[guard]`` settings through the ``/new`` endpoint."""

    # JSON body of a minimal top-level comment, shared by all tests.
    data = json.dumps({"text": "Lorem ipsum."})

    def setUp(self):
        # Only the *name* is kept: the NamedTemporaryFile object is discarded
        # right away, and the application creates its database at this path.
        self.path = tempfile.NamedTemporaryFile().name

    def tearDown(self):
        # Remove the database written during the test so repeated runs do not
        # pile up files in the temp directory.  Best effort: the file may not
        # exist if the test failed before the application was created.
        try:
            os.unlink(self.path)
        except OSError:
            pass

    def makeClient(self, ip, ratelimit=2, direct_reply=3, self_reply=False):
        """Return a werkzeug test client for a guard-enabled application.

        All clients created within one test share the database at
        ``self.path``.

        :param ip: address handed to the ``FakeIP`` WSGI wrapper (presumably
            reported to the application as the remote address -- confirm
            against the fixture).
        :param ratelimit: value written to ``guard.ratelimit``.
        :param direct_reply: value written to ``guard.direct-reply``.
        :param self_reply: when true, ``guard.reply-to-self`` is set to "1",
            otherwise to "0".
        """
        conf = core.Config.load(None)
        conf.set("general", "dbpath", self.path)
        conf.set("guard", "enabled", "true")
        conf.set("guard", "ratelimit", str(ratelimit))
        conf.set("guard", "direct-reply", str(direct_reply))
        conf.set("guard", "reply-to-self", "1" if self_reply else "0")

        class App(Isso, core.Mixin):
            pass

        app = App(conf)
        app.wsgi_app = FakeIP(app.wsgi_app, ip)

        return Client(app, Response)

    def testRateLimit(self):
        """A third comment from one address is rejected until older ones age."""
        bob = self.makeClient("127.0.0.1", 2)

        for _ in range(2):
            rv = bob.post('/new?uri=test', data=self.data)
            assert rv.status_code == 201

        rv = bob.post('/new?uri=test', data=self.data)

        assert rv.status_code == 403
        assert "ratelimit exceeded" in rv.get_data(as_text=True)

        # A client with a different address is not affected by bob's posts.
        alice = self.makeClient("1.2.3.4", 2)
        for _ in range(2):
            assert alice.post("/new?uri=test", data=self.data).status_code == 201

        # Back-date bob's comments by 60 (presumably seconds).  The filter
        # uses 127.0.0.0 rather than 127.0.0.1 -- NOTE(review): presumably the
        # application stores an anonymised address; confirm before changing.
        bob.application.db.execute([
            "UPDATE comments SET",
            "    created = created - 60",
            "WHERE remote_addr = '127.0.0.0'"
        ])

        assert bob.post("/new?uri=test", data=self.data).status_code == 201

    def testDirectReply(self):
        """Only ``direct-reply`` top-level comments per thread are accepted."""
        # The rate limit (15) is above the 12 accepted posts, so the 403s
        # asserted below are not caused by the rate limit.
        client = self.makeClient("127.0.0.1", 15, 3)

        for url in ("foo", "bar", "baz", "spam"):
            for _ in range(3):
                rv = client.post("/new?uri=%s" % url, data=self.data)
                assert rv.status_code == 201

        # A fourth comment on each thread must be refused.
        for url in ("foo", "bar", "baz", "spam"):
            rv = client.post("/new?uri=%s" % url, data=self.data)

            assert rv.status_code == 403
            assert "direct responses to" in rv.get_data(as_text=True)

    def testSelfReply(self):
        """Replying to one's own recent comment is refused when disabled."""

        def payload(parent_id):
            # JSON body of a reply to the comment with id `parent_id`.
            return json.dumps({"text": "...", "parent": parent_id})

        client = self.makeClient("127.0.0.1", self_reply=False)
        assert client.post("/new?uri=test", data=self.data).status_code == 201
        assert client.post("/new?uri=test", data=payload(1)).status_code == 403

        # Back-date comment 1 by the configured ``general.max-age``; after
        # that the same client may reply to it.
        client.application.db.execute([
            "UPDATE comments SET",
            "    created = created - ?",
            "WHERE id = 1"
        ], (client.application.conf.getint("general", "max-age"), ))

        assert client.post("/new?uri=test", data=payload(1)).status_code == 201

        # A client with a different address may reply to the existing
        # comments (ids 1 and 2) straight away.
        client = self.makeClient("128.0.0.1", ratelimit=3, self_reply=False)
        assert client.post("/new?uri=test", data=self.data).status_code == 201
        assert client.post("/new?uri=test", data=payload(1)).status_code == 201
        assert client.post("/new?uri=test", data=payload(2)).status_code == 201