refactor views and introduce an API for notifications
Keep Isso modular, not monolithic. Make it easy to integrate a web interface or add XMPP notifications. This refactorization includes minor bugfixes and changes: * CORS middleware did not work properly due to wrong unit tests * more type checks on JSON input * new detection for origin and public url, closes #28 * new activation and delete url (no redirect for old urls, but you can convert the old urls: copy hash after `/activate/` (or delete) and open `/id/<id of comment>/activate/<hash>` * move crypto.py to utils/ With this commit, SMTP is no longer automatically configured: add `notify = smtp` to the `[general]` section to use SMTP.
This commit is contained in:
parent
a442b8e0ee
commit
6e85c54a2e
@ -36,6 +36,7 @@ session key and hostname. Here are the default values for this section:
|
||||
host = http://localhost:8080/
|
||||
max-age = 15m
|
||||
session-key = ... # python: binascii.b2a_hex(os.urandom(24))
|
||||
notify =
|
||||
|
||||
dbpath
|
||||
file location to the SQLite3 database, highly recommended to change this
|
||||
@ -67,6 +68,10 @@ max-age
|
||||
time range that allows users to edit/remove their own comments. See
|
||||
:ref:`Appendum: Timedelta <appendum-timedelta>` for valid values.
|
||||
|
||||
notify
|
||||
Select notification backend for new comments. Currently, only SMTP
|
||||
is available.
|
||||
|
||||
|
||||
Moderation
|
||||
----------
|
||||
@ -131,8 +136,8 @@ SMTP
|
||||
----
|
||||
|
||||
Isso can notify you on new comments via SMTP. In the email notification, you
|
||||
also can moderate comments. If the server connection fails during startup, a
|
||||
null mailer is used.
|
||||
also can moderate (=activate or delete) comments. Don't forget to configure
|
||||
``notify = smtp`` in the general section.
|
||||
|
||||
.. code-block:: ini
|
||||
|
||||
|
@ -46,21 +46,26 @@ import logging
|
||||
from os.path import dirname, join
|
||||
from argparse import ArgumentParser
|
||||
|
||||
import misaka
|
||||
from itsdangerous import URLSafeTimedSerializer
|
||||
|
||||
from werkzeug.routing import Map, Rule
|
||||
from werkzeug.routing import Map
|
||||
from werkzeug.exceptions import HTTPException, InternalServerError
|
||||
|
||||
from werkzeug.wsgi import SharedDataMiddleware
|
||||
from werkzeug.local import Local, LocalManager
|
||||
from werkzeug.serving import run_simple, WSGIRequestHandler
|
||||
from werkzeug.contrib.fixers import ProxyFix
|
||||
|
||||
from isso import db, migrate, wsgi
|
||||
local = Local()
|
||||
local_manager = LocalManager([local])
|
||||
|
||||
from isso import db, migrate, wsgi, ext
|
||||
from isso.core import ThreadedMixin, uWSGIMixin, Config
|
||||
from isso.utils import parse, http, JSONRequest
|
||||
from isso.utils import parse, http, JSONRequest, origin
|
||||
from isso.views import comments
|
||||
|
||||
from isso.ext.notifications import Stdout, SMTP
|
||||
|
||||
logging.getLogger('werkzeug').setLevel(logging.ERROR)
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@ -72,20 +77,6 @@ logger = logging.getLogger("isso")
|
||||
class Isso(object):
|
||||
|
||||
salt = b"Eech7co8Ohloopo9Ol6baimi"
|
||||
urls = Map([
|
||||
Rule('/new', methods=['POST'], endpoint=comments.new),
|
||||
|
||||
Rule('/id/<int:id>', methods=['GET', 'PUT', 'DELETE'], endpoint=comments.single),
|
||||
Rule('/id/<int:id>/like', methods=['POST'], endpoint=comments.like),
|
||||
Rule('/id/<int:id>/dislike', methods=['POST'], endpoint=comments.dislike),
|
||||
|
||||
Rule('/', methods=['GET'], endpoint=comments.fetch),
|
||||
Rule('/count', methods=['GET'], endpoint=comments.count),
|
||||
Rule('/delete/<string:auth>', endpoint=comments.delete),
|
||||
Rule('/activate/<string:auth>', endpoint=comments.activate),
|
||||
|
||||
Rule('/check-ip', endpoint=comments.checkip)
|
||||
])
|
||||
|
||||
def __init__(self, conf):
|
||||
|
||||
@ -95,19 +86,30 @@ class Isso(object):
|
||||
|
||||
super(Isso, self).__init__(conf)
|
||||
|
||||
subscribers = []
|
||||
subscribers.append(Stdout(None))
|
||||
|
||||
if conf.get("general", "notify") == "smtp":
|
||||
subscribers.append(SMTP(self))
|
||||
|
||||
self.signal = ext.Signal(*subscribers)
|
||||
|
||||
self.urls = Map()
|
||||
self.api = comments.API(self)
|
||||
|
||||
def sign(self, obj):
|
||||
return self.signer.dumps(obj)
|
||||
|
||||
def unsign(self, obj, max_age=None):
|
||||
return self.signer.loads(obj, max_age=max_age or self.conf.getint('general', 'max-age'))
|
||||
|
||||
def markdown(self, text):
|
||||
return misaka.html(text, extensions=misaka.EXT_STRIKETHROUGH
|
||||
| misaka.EXT_SUPERSCRIPT | misaka.EXT_AUTOLINK
|
||||
| misaka.HTML_SKIP_HTML | misaka.HTML_SKIP_IMAGES | misaka.HTML_SAFELINK)
|
||||
|
||||
def dispatch(self, request):
|
||||
adapter = Isso.urls.bind_to_environ(request.environ)
|
||||
local.request = request
|
||||
|
||||
local.host = wsgi.host(request.environ)
|
||||
local.origin = origin(self.conf.getiter("general", "host"))(request.environ)
|
||||
|
||||
adapter = self.urls.bind_to_environ(request.environ)
|
||||
|
||||
try:
|
||||
handler, values = adapter.match()
|
||||
@ -115,7 +117,7 @@ class Isso(object):
|
||||
return e
|
||||
else:
|
||||
try:
|
||||
response = handler(self, request.environ, request, **values)
|
||||
response = handler(request.environ, request, **values)
|
||||
except HTTPException as e:
|
||||
return e
|
||||
except Exception:
|
||||
@ -125,7 +127,6 @@ class Isso(object):
|
||||
return response
|
||||
|
||||
def wsgi_app(self, environ, start_response):
|
||||
|
||||
response = self.dispatch(JSONRequest(environ))
|
||||
return response(environ, start_response)
|
||||
|
||||
@ -164,10 +165,11 @@ def make_app(conf=None):
|
||||
wsgi.SubURI(
|
||||
wsgi.CORSMiddleware(
|
||||
SharedDataMiddleware(
|
||||
ProfilerMiddleware(isso), {
|
||||
'/js': join(dirname(__file__), 'js/'),
|
||||
'/css': join(dirname(__file__), 'css/')}),
|
||||
list(isso.conf.getiter("general", "host")))))
|
||||
ProfilerMiddleware(
|
||||
local_manager.make_middleware(isso)), {
|
||||
'/js': join(dirname(__file__), 'js/'),
|
||||
'/css': join(dirname(__file__), 'css/')}),
|
||||
origin(isso.conf.getiter("general", "host")))))
|
||||
|
||||
return app
|
||||
|
||||
|
65
isso/core.py
65
isso/core.py
@ -9,9 +9,6 @@ import binascii
|
||||
import threading
|
||||
import logging
|
||||
|
||||
import socket
|
||||
import smtplib
|
||||
|
||||
from configparser import ConfigParser
|
||||
|
||||
try:
|
||||
@ -26,7 +23,6 @@ if PY2K:
|
||||
else:
|
||||
import _thread as thread
|
||||
|
||||
from isso import notify
|
||||
from isso.utils import parse
|
||||
from isso.compat import text_type as str
|
||||
|
||||
@ -35,6 +31,25 @@ from werkzeug.contrib.cache import NullCache, SimpleCache
|
||||
logger = logging.getLogger("isso")
|
||||
|
||||
|
||||
class Section:
|
||||
|
||||
def __init__(self, conf, section):
|
||||
self.conf = conf
|
||||
self.section = section
|
||||
|
||||
def get(self, key):
|
||||
return self.conf.get(self.section, key)
|
||||
|
||||
def getint(self, key):
|
||||
return self.conf.getint(self.section, key)
|
||||
|
||||
def getiter(self, key):
|
||||
return self.conf.getiter(self.section, key)
|
||||
|
||||
def getboolean(self, key):
|
||||
return self.conf.getboolean(self.section, key)
|
||||
|
||||
|
||||
class IssoParser(ConfigParser):
|
||||
"""
|
||||
Extended :class:`ConfigParser` to parse human-readable timedeltas
|
||||
@ -81,6 +96,9 @@ class IssoParser(ConfigParser):
|
||||
if item:
|
||||
yield item
|
||||
|
||||
def section(self, section):
|
||||
return Section(self, section)
|
||||
|
||||
|
||||
class Config:
|
||||
|
||||
@ -88,6 +106,7 @@ class Config:
|
||||
"[general]",
|
||||
"dbpath = /tmp/isso.db", "session-key = %r" % binascii.b2a_hex(os.urandom(24)),
|
||||
"host = http://localhost:8080/", "max-age = 15m",
|
||||
"notify = ",
|
||||
"[moderation]",
|
||||
"enabled = false",
|
||||
"purge-after = 30d",
|
||||
@ -128,18 +147,6 @@ class Config:
|
||||
return rv
|
||||
|
||||
|
||||
def SMTP(conf):
|
||||
|
||||
try:
|
||||
mailer = notify.SMTPMailer(conf)
|
||||
logger.info("connected to SMTP server")
|
||||
except (socket.error, smtplib.SMTPException):
|
||||
logger.warn("unable to connect to SMTP server")
|
||||
mailer = notify.NullMailer()
|
||||
|
||||
return mailer
|
||||
|
||||
|
||||
class Cache:
|
||||
"""Wrapper around werkzeug's cache class, to make it compatible to
|
||||
uWSGI's cache framework.
|
||||
@ -188,20 +195,8 @@ class ThreadedMixin(Mixin):
|
||||
if conf.getboolean("moderation", "enabled"):
|
||||
self.purge(conf.getint("moderation", "purge-after"))
|
||||
|
||||
self.mailer = SMTP(conf)
|
||||
self.cache = Cache(SimpleCache(threshold=1024, default_timeout=3600))
|
||||
|
||||
@threaded
|
||||
def notify(self, subject, body, retries=5):
|
||||
|
||||
for x in range(retries):
|
||||
try:
|
||||
self.mailer.sendmail(subject, body)
|
||||
except Exception:
|
||||
time.sleep(60)
|
||||
else:
|
||||
break
|
||||
|
||||
@threaded
|
||||
def purge(self, delta):
|
||||
while True:
|
||||
@ -246,18 +241,7 @@ class uWSGIMixin(Mixin):
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
uwsgi.unlock()
|
||||
|
||||
def spooler(args):
|
||||
try:
|
||||
self.mailer.sendmail(args["subject"].decode('utf-8'), args["body"].decode('utf-8'))
|
||||
except smtplib.SMTPConnectError:
|
||||
return uwsgi.SPOOL_RETRY
|
||||
else:
|
||||
return uwsgi.SPOOL_OK
|
||||
|
||||
uwsgi.spooler = spooler
|
||||
|
||||
self.lock = Lock()
|
||||
self.mailer = SMTP(conf)
|
||||
self.cache = uWSGICache
|
||||
|
||||
timedelta = conf.getint("moderation", "purge-after")
|
||||
@ -267,6 +251,3 @@ class uWSGIMixin(Mixin):
|
||||
|
||||
# run purge once
|
||||
purge(1)
|
||||
|
||||
def notify(self, subject, body, retries=5):
|
||||
uwsgi.spool({"subject": subject.encode('utf-8'), "body": body.encode('utf-8')})
|
||||
|
@ -1,12 +1,12 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
|
||||
class Thread(object):
|
||||
|
||||
def __init__(self, id, uri, title):
|
||||
self.id = id
|
||||
self.uri = uri
|
||||
self.title = title
|
||||
def Thread(id, uri, title):
|
||||
return {
|
||||
"id": id,
|
||||
"uri": uri,
|
||||
"title": title
|
||||
}
|
||||
|
||||
|
||||
class Threads(object):
|
||||
@ -27,3 +27,4 @@ class Threads(object):
|
||||
|
||||
def new(self, uri, title):
|
||||
self.db.execute("INSERT INTO threads (uri, title) VALUES (?, ?)", (uri, title))
|
||||
return self[uri]
|
||||
|
17
isso/ext/__init__.py
Normal file
17
isso/ext/__init__.py
Normal file
@ -0,0 +1,17 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
class Signal(object):
|
||||
|
||||
def __init__(self, *subscriber):
|
||||
self.subscriptions = defaultdict(list)
|
||||
|
||||
for sub in subscriber:
|
||||
for signal, func in sub:
|
||||
self.subscriptions[signal].append(func)
|
||||
|
||||
def __call__(self, origin, *args, **kwargs):
|
||||
for subscriber in self.subscriptions[origin]:
|
||||
subscriber(*args, **kwargs)
|
160
isso/ext/notifications.py
Normal file
160
isso/ext/notifications.py
Normal file
@ -0,0 +1,160 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import time
|
||||
import json
|
||||
|
||||
import socket
|
||||
import smtplib
|
||||
|
||||
from email.header import Header
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
import logging
|
||||
logger = logging.getLogger("isso")
|
||||
|
||||
try:
|
||||
import uwsgi
|
||||
except ImportError:
|
||||
uwsgi = None
|
||||
|
||||
from isso.compat import PY2K
|
||||
from isso import local
|
||||
|
||||
if PY2K:
|
||||
from thread import start_new_thread
|
||||
else:
|
||||
from _thread import start_new_thread
|
||||
|
||||
|
||||
class SMTP(object):
|
||||
|
||||
def __init__(self, isso):
|
||||
|
||||
self.isso = isso
|
||||
self.conf = isso.conf.section("smtp")
|
||||
|
||||
# test SMTP connectivity
|
||||
try:
|
||||
with self:
|
||||
logger.info("connected to SMTP server")
|
||||
except (socket.error, smtplib.SMTPException):
|
||||
logger.warn("unable to connect to SMTP server")
|
||||
|
||||
if uwsgi:
|
||||
def spooler(args):
|
||||
try:
|
||||
self._sendmail(args["subject"].decode("utf-8"),
|
||||
args["body"].decode("utf-8"))
|
||||
except smtplib.SMTPConnectError:
|
||||
return uwsgi.SPOOL_RETRY
|
||||
else:
|
||||
return uwsgi.SPOOL_OK
|
||||
|
||||
uwsgi.spooler = spooler
|
||||
|
||||
def __enter__(self):
|
||||
klass = (smtplib.SMTP_SSL if self.conf.getboolean('ssl') else smtplib.SMTP)
|
||||
self.client = klass(host=self.conf.get('host'), port=self.conf.getint('port'))
|
||||
|
||||
if self.conf.get('username') and self.conf.get('password'):
|
||||
self.client.login(self.conf.get('username'),
|
||||
self.conf.get('password'))
|
||||
|
||||
return self.client
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.client.quit()
|
||||
|
||||
def __iter__(self):
|
||||
yield "comments.new:after-save", self.notify
|
||||
|
||||
def format(self, thread, comment):
|
||||
|
||||
permalink = local("origin") + thread["uri"] + "#isso-%i" % comment["id"]
|
||||
|
||||
rv = []
|
||||
rv.append("%s schrieb:" % (comment["author"] or "Jemand"))
|
||||
rv.append("")
|
||||
rv.append(comment["text"])
|
||||
rv.append("")
|
||||
|
||||
if comment["website"]:
|
||||
rv.append("Webseite des Kommentators: %s" % comment["website"])
|
||||
|
||||
rv.append("IP Adresse: %s" % comment["remote_addr"])
|
||||
rv.append("Link zum Kommentar: %s" % permalink)
|
||||
rv.append("")
|
||||
|
||||
uri = local("host") + "/id/%i" % comment["id"]
|
||||
key = self.isso.sign(comment["id"])
|
||||
|
||||
rv.append("---")
|
||||
rv.append("Kommentar löschen: %s" % uri + "/delete/" + key)
|
||||
|
||||
if comment["mode"] == 2:
|
||||
rv.append("Kommentar freischalten: %s" % uri + "/activate/" + key)
|
||||
|
||||
return u'\n'.join(rv)
|
||||
|
||||
def notify(self, thread, comment):
|
||||
|
||||
body = self.format(thread, comment)
|
||||
|
||||
if uwsgi:
|
||||
uwsgi.spool({b"subject": thread["title"].encode("utf-8"),
|
||||
b"body": body.encode("utf-8")})
|
||||
else:
|
||||
start_new_thread(self._retry, (thread["title"], body))
|
||||
|
||||
def _sendmail(self, subject, body):
|
||||
|
||||
from_addr = self.conf.get("from")
|
||||
to_addr = self.conf.get("to")
|
||||
|
||||
msg = MIMEText(body, 'plain', 'utf-8')
|
||||
msg['From'] = "Ich schrei sonst! <%s>" % from_addr
|
||||
msg['To'] = to_addr
|
||||
msg['Subject'] = Header(subject, 'utf-8')
|
||||
|
||||
with self as con:
|
||||
con.sendmail(from_addr, to_addr, msg.as_string())
|
||||
|
||||
def _retry(self, subject, body):
|
||||
for x in range(5):
|
||||
try:
|
||||
self._sendmail(subject, body)
|
||||
except smtplib.SMTPConnectError:
|
||||
time.sleep(60)
|
||||
else:
|
||||
break
|
||||
|
||||
|
||||
class Stdout(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
pass
|
||||
|
||||
def __iter__(self):
|
||||
|
||||
yield "comments.new:new-thread", self._new_thread
|
||||
yield "comments.new:finish", self._new_comment
|
||||
yield "comments.edit", self._edit_comment
|
||||
yield "comments.delete", self._delete_comment
|
||||
yield "comments.activate", self._activate_comment
|
||||
|
||||
def _new_thread(self, thread):
|
||||
logger.info("new thread %(id)s: %(title)s" % thread)
|
||||
|
||||
def _new_comment(self, thread, comment):
|
||||
logger.info("comment created: %s", json.dumps(comment))
|
||||
|
||||
def _edit_comment(self, comment):
|
||||
logger.info('comment %i edited: %s', comment["id"], json.dumps(comment))
|
||||
|
||||
def _delete_comment(self, id):
|
||||
logger.info('comment %i deleted', id)
|
||||
|
||||
def _activate_comment(self, id):
|
||||
logger.info("comment %s activated" % id)
|
@ -1,83 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from smtplib import SMTP, SMTP_SSL
|
||||
from email.header import Header
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
|
||||
def format(comment, permalink, remote_addr, deletion_key, activation_key=None):
|
||||
|
||||
rv = []
|
||||
rv.append("%s schrieb:" % (comment["author"] or "Jemand"))
|
||||
rv.append("")
|
||||
rv.append(comment["text"])
|
||||
rv.append("")
|
||||
|
||||
if comment["website"]:
|
||||
rv.append("Webseite des Kommentators: %s" % comment["website"])
|
||||
|
||||
rv.append("IP Adresse: %s" % remote_addr)
|
||||
rv.append("Link zum Kommentar: %s" % permalink)
|
||||
|
||||
rv.append("")
|
||||
rv.append("---")
|
||||
rv.append("Kommentar löschen: %s" % deletion_key)
|
||||
|
||||
if activation_key:
|
||||
rv.append("Kommentar freischalten: %s" % activation_key)
|
||||
|
||||
return u'\n'.join(rv)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
"""
|
||||
Establish connection to SMTP server, optional with authentication, and
|
||||
close connection afterwards.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
|
||||
def __enter__(self):
|
||||
self.server = (SMTP_SSL if self.conf.getboolean('smtp', 'ssl') else SMTP)(
|
||||
host=self.conf.get('smtp', 'host'), port=self.conf.getint('smtp', 'port'))
|
||||
|
||||
if self.conf.get('smtp', 'username') and self.conf.get('smtp', 'password'):
|
||||
self.server.login(self.conf.get('smtp', 'username'),
|
||||
self.conf.get('smtp', 'password'))
|
||||
|
||||
return self.server
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.server.quit()
|
||||
|
||||
|
||||
class SMTPMailer(object):
|
||||
|
||||
def __init__(self, conf):
|
||||
|
||||
self.conf = conf
|
||||
self.from_addr = conf.get('smtp', 'from')
|
||||
self.to_addr = conf.get('smtp', 'to')
|
||||
|
||||
# test SMTP connectivity
|
||||
with Connection(self.conf):
|
||||
pass
|
||||
|
||||
def sendmail(self, subject, body):
|
||||
|
||||
msg = MIMEText(body, 'plain', 'utf-8')
|
||||
msg['From'] = "Ich schrei sonst! <%s>" % self.from_addr
|
||||
msg['To'] = self.to_addr
|
||||
msg['Subject'] = Header(subject, 'utf-8')
|
||||
|
||||
with Connection(self.conf) as con:
|
||||
con.sendmail(self.from_addr, self.to_addr, msg.as_string())
|
||||
|
||||
|
||||
class NullMailer(object):
|
||||
|
||||
def sendmail(self, subject, body):
|
||||
pass
|
@ -14,6 +14,7 @@ from string import ascii_letters, digits
|
||||
from werkzeug.wrappers import Request
|
||||
from werkzeug.exceptions import BadRequest
|
||||
|
||||
import misaka
|
||||
import ipaddress
|
||||
|
||||
|
||||
@ -103,3 +104,23 @@ class JSONRequest(Request):
|
||||
return json.loads(self.get_data(as_text=True))
|
||||
except ValueError:
|
||||
raise BadRequest('Unable to read JSON request')
|
||||
|
||||
|
||||
def markdown(text):
|
||||
return misaka.html(text, extensions= misaka.EXT_STRIKETHROUGH
|
||||
| misaka.EXT_SUPERSCRIPT | misaka.EXT_AUTOLINK
|
||||
| misaka.HTML_SKIP_HTML | misaka.HTML_SKIP_IMAGES | misaka.HTML_SAFELINK)
|
||||
|
||||
|
||||
def origin(hosts):
|
||||
|
||||
hosts = [x.rstrip("/") for x in hosts]
|
||||
|
||||
def func(environ):
|
||||
for host in hosts:
|
||||
if environ.get("HTTP_ORIGIN", None) == host:
|
||||
return host
|
||||
else:
|
||||
return hosts[0]
|
||||
|
||||
return func
|
||||
|
@ -20,7 +20,7 @@ class requires:
|
||||
self.type = type
|
||||
|
||||
def __call__(self, func):
|
||||
def dec(app, env, req, *args, **kwargs):
|
||||
def dec(cls, env, req, *args, **kwargs):
|
||||
|
||||
if self.param not in req.args:
|
||||
raise BadRequest("missing %s query" % self.param)
|
||||
@ -30,6 +30,6 @@ class requires:
|
||||
except TypeError:
|
||||
raise BadRequest("invalid type for %s, expected %s" % (self.param, self.type))
|
||||
|
||||
return func(app, env, req, *args, **kwargs)
|
||||
return func(cls, env, req, *args, **kwargs)
|
||||
|
||||
return dec
|
||||
|
@ -4,141 +4,186 @@ import cgi
|
||||
import json
|
||||
import time
|
||||
import hashlib
|
||||
import logging
|
||||
import functools
|
||||
|
||||
from itsdangerous import SignatureExpired, BadSignature
|
||||
|
||||
from werkzeug.http import dump_cookie
|
||||
from werkzeug.routing import Rule
|
||||
from werkzeug.wrappers import Response
|
||||
from werkzeug.exceptions import BadRequest, Forbidden, NotFound
|
||||
|
||||
from isso.compat import text_type as str
|
||||
|
||||
from isso import utils, notify, db
|
||||
from isso.utils import http, parse
|
||||
from isso import utils, local
|
||||
from isso.utils import http, parse, markdown
|
||||
from isso.utils.crypto import pbkdf2
|
||||
from isso.views import requires
|
||||
from isso.crypto import pbkdf2
|
||||
|
||||
logger = logging.getLogger("isso")
|
||||
|
||||
FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email', 'mode',
|
||||
'created', 'modified', 'likes', 'dislikes', 'hash'])
|
||||
|
||||
|
||||
@requires(str, 'uri')
|
||||
def new(app, environ, request, uri):
|
||||
def md5(text):
|
||||
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
||||
|
||||
data = request.get_json()
|
||||
|
||||
for field in set(data.keys()) - set(['text', 'author', 'website', 'email', 'parent']):
|
||||
data.pop(field)
|
||||
class JSON(Response):
|
||||
|
||||
if "text" not in data or data["text"] is None or len(data["text"]) < 3:
|
||||
raise BadRequest("no text given")
|
||||
def __init__(self, *args):
|
||||
return super(JSON, self).__init__(*args, content_type='application/json')
|
||||
|
||||
if "id" in data and not isinstance(data["id"], int):
|
||||
raise BadRequest("parent id must be an integer")
|
||||
|
||||
if len(data.get("email") or "") > 254:
|
||||
raise BadRequest("http://tools.ietf.org/html/rfc5321#section-4.5.3")
|
||||
class API(object):
|
||||
|
||||
for field in ("author", "email"):
|
||||
if data.get(field):
|
||||
data[field] = cgi.escape(data[field])
|
||||
FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email',
|
||||
'mode', 'created', 'modified', 'likes', 'dislikes', 'hash'])
|
||||
|
||||
data['mode'] = (app.conf.getboolean('moderation', 'enabled') and 2) or 1
|
||||
data['remote_addr'] = utils.anonymize(str(request.remote_addr))
|
||||
# comment fields, that can be submitted
|
||||
ACCEPT = set(['text', 'author', 'website', 'email', 'parent'])
|
||||
|
||||
with app.lock:
|
||||
if uri not in app.db.threads:
|
||||
for host in app.conf.getiter('general', 'host'):
|
||||
with http.curl('GET', host, uri) as resp:
|
||||
VIEWS = [
|
||||
('fetch', ('GET', '/')),
|
||||
('new', ('POST', '/new')),
|
||||
('count', ('GET', '/count')),
|
||||
('view', ('GET', '/id/<int:id>')),
|
||||
('edit', ('PUT', '/id/<int:id>')),
|
||||
('delete', ('DELETE', '/id/<int:id>')),
|
||||
('delete', ('GET', '/id/<int:id>/delete/<string:key>')),
|
||||
('like', ('POST', '/id/<int:id>/like')),
|
||||
('dislike', ('POST', '/id/<int:id>/dislike')),
|
||||
('checkip', ('GET', '/check-ip'))
|
||||
]
|
||||
|
||||
def __init__(self, isso):
|
||||
|
||||
self.isso = isso
|
||||
self.cache = isso.cache
|
||||
self.signal = isso.signal
|
||||
|
||||
self.conf = isso.conf.section("general")
|
||||
self.moderated = isso.conf.getboolean("moderation", "enabled")
|
||||
|
||||
self.threads = isso.db.threads
|
||||
self.comments = isso.db.comments
|
||||
|
||||
for (view, (method, path)) in self.VIEWS:
|
||||
isso.urls.add(
|
||||
Rule(path, methods=[method], endpoint=getattr(self, view)))
|
||||
|
||||
@classmethod
|
||||
def verify(cls, comment):
|
||||
|
||||
if "text" not in comment:
|
||||
return False, "text is missing"
|
||||
|
||||
if not isinstance(comment.get("parent"), (int, type(None))):
|
||||
return False, "parent must be an integer or null"
|
||||
|
||||
for key in ("text", "author", "website", "email"):
|
||||
if not isinstance(comment.get(key), (str, type(None))):
|
||||
return False, "%s must be a string or null" % key
|
||||
|
||||
if len(comment["text"]) < 3:
|
||||
return False, "text is too short (minimum length: 3)"
|
||||
|
||||
if len(comment.get("email") or "") > 254:
|
||||
return False, "http://tools.ietf.org/html/rfc5321#section-4.5.3"
|
||||
|
||||
return True, ""
|
||||
|
||||
@requires(str, 'uri')
|
||||
def new(self, environ, request, uri):
|
||||
|
||||
data = request.get_json()
|
||||
|
||||
for field in set(data.keys()) - API.ACCEPT:
|
||||
data.pop(field)
|
||||
|
||||
for key in ("author", "email", "website", "parent"):
|
||||
data.setdefault(key, None)
|
||||
|
||||
valid, reason = API.verify(data)
|
||||
if not valid:
|
||||
return BadRequest(reason)
|
||||
|
||||
for field in ("author", "email"):
|
||||
if data.get(field) is not None:
|
||||
data[field] = cgi.escape(data[field])
|
||||
|
||||
data['mode'] = 2 if self.moderated else 1
|
||||
data['remote_addr'] = utils.anonymize(str(request.remote_addr))
|
||||
|
||||
with self.isso.lock:
|
||||
if uri not in self.threads:
|
||||
with http.curl('GET', local("origin"), uri) as resp:
|
||||
if resp and resp.status == 200:
|
||||
title = parse.title(resp.read())
|
||||
break
|
||||
else:
|
||||
return NotFound('URI does not exist')
|
||||
|
||||
thread = self.threads.new(uri, title)
|
||||
self.signal("comments.new:new-thread", thread)
|
||||
else:
|
||||
return Response('URI does not exist', 404)
|
||||
thread = self.threads[uri]
|
||||
|
||||
app.db.threads.new(uri, title)
|
||||
logger.info('new thread: %s -> %s', uri, title)
|
||||
else:
|
||||
title = app.db.threads[uri].title
|
||||
# notify extensions that the new comment is about to save
|
||||
self.signal("comments.new:before-save", thread, data)
|
||||
|
||||
try:
|
||||
with app.lock:
|
||||
rv = app.db.comments.add(uri, data)
|
||||
except db.IssoDBException:
|
||||
raise Forbidden
|
||||
if data is None:
|
||||
raise Forbidden
|
||||
|
||||
host = list(app.conf.getiter('general', 'host'))[0].rstrip("/")
|
||||
href = host + uri + "#isso-%i" % rv["id"]
|
||||
with self.isso.lock:
|
||||
rv = self.comments.add(uri, data)
|
||||
|
||||
deletion = host + environ["SCRIPT_NAME"] + "/delete/" + app.sign(str(rv["id"]))
|
||||
activation = None
|
||||
# notify extension, that the new comment has been successfully saved
|
||||
self.signal("comments.new:after-save", thread, rv)
|
||||
|
||||
if app.conf.getboolean('moderation', 'enabled'):
|
||||
activation = host + environ["SCRIPT_NAME"] + "/activate/" + app.sign(str(rv["id"]))
|
||||
cookie = functools.partial(dump_cookie,
|
||||
value=self.isso.sign([rv["id"], md5(rv["text"])]),
|
||||
max_age=self.conf.getint('max-age'))
|
||||
|
||||
app.notify(title, notify.format(rv, href, utils.anonymize(str(request.remote_addr)),
|
||||
activation_key=activation, deletion_key=deletion))
|
||||
rv["text"] = markdown(rv["text"])
|
||||
rv["hash"] = str(pbkdf2(rv['email'] or rv['remote_addr'], self.isso.salt, 1000, 6))
|
||||
|
||||
# save checksum of text into cookie, so mallory can't modify/delete a comment, if
|
||||
# he add a comment, then removed it but not the signed cookie.
|
||||
checksum = hashlib.md5(rv["text"].encode('utf-8')).hexdigest()
|
||||
self.cache.set('hash', (rv['email'] or rv['remote_addr']).encode('utf-8'), rv['hash'])
|
||||
|
||||
rv["text"] = app.markdown(rv["text"])
|
||||
rv["hash"] = str(pbkdf2(rv['email'] or rv['remote_addr'], app.salt, 1000, 6))
|
||||
for key in set(rv.keys()) - API.FIELDS:
|
||||
rv.pop(key)
|
||||
|
||||
app.cache.set('hash', (rv['email'] or rv['remote_addr']).encode('utf-8'), rv['hash'])
|
||||
# success!
|
||||
self.signal("comments.new:finish", thread, rv)
|
||||
|
||||
for key in set(rv.keys()) - FIELDS:
|
||||
rv.pop(key)
|
||||
resp = JSON(json.dumps(rv), 202 if rv["mode"] == 2 else 201)
|
||||
resp.headers.add("Set-Cookie", cookie(str(rv["id"])))
|
||||
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
|
||||
return resp
|
||||
|
||||
# success!
|
||||
logger.info('comment created: %s', json.dumps(rv))
|
||||
def view(self, environ, request, id):
|
||||
|
||||
cookie = functools.partial(dump_cookie,
|
||||
value=app.sign([rv["id"], checksum]),
|
||||
max_age=app.conf.getint('general', 'max-age'))
|
||||
|
||||
resp = Response(json.dumps(rv), 202 if rv["mode"] == 2 else 201, content_type='application/json')
|
||||
resp.headers.add("Set-Cookie", cookie(str(rv["id"])))
|
||||
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
|
||||
return resp
|
||||
|
||||
|
||||
def single(app, environ, request, id):
|
||||
|
||||
if request.method == 'GET':
|
||||
rv = app.db.comments.get(id)
|
||||
rv = self.comments.get(id)
|
||||
if rv is None:
|
||||
raise NotFound
|
||||
|
||||
for key in set(rv.keys()) - FIELDS:
|
||||
for key in set(rv.keys()) - API.FIELDS:
|
||||
rv.pop(key)
|
||||
|
||||
if request.args.get('plain', '0') == '0':
|
||||
rv['text'] = app.markdown(rv['text'])
|
||||
rv['text'] = markdown(rv['text'])
|
||||
|
||||
return Response(json.dumps(rv), 200, content_type='application/json')
|
||||
|
||||
try:
|
||||
rv = app.unsign(request.cookies.get(str(id), ''))
|
||||
except (SignatureExpired, BadSignature):
|
||||
def edit(self, environ, request, id):
|
||||
|
||||
try:
|
||||
rv = app.unsign(request.cookies.get('admin', ''))
|
||||
rv = self.isso.unsign(request.cookies.get(str(id), ''))
|
||||
except (SignatureExpired, BadSignature):
|
||||
raise Forbidden
|
||||
|
||||
if rv[0] != id:
|
||||
raise Forbidden
|
||||
if rv[0] != id:
|
||||
raise Forbidden
|
||||
|
||||
# verify checksum, mallory might skip cookie deletion when he deletes a comment
|
||||
if rv[1] != hashlib.md5(app.db.comments.get(id)["text"].encode('utf-8')).hexdigest():
|
||||
raise Forbidden
|
||||
# verify checksum, mallory might skip cookie deletion when he deletes a comment
|
||||
if rv[1] != md5(self.comments.get(id)["text"]):
|
||||
raise Forbidden
|
||||
|
||||
if request.method == 'PUT':
|
||||
data = request.get_json()
|
||||
|
||||
if "text" not in data or data["text"] is None or len(data["text"]) < 3:
|
||||
@ -149,123 +194,121 @@ def single(app, environ, request, id):
|
||||
|
||||
data['modified'] = time.time()
|
||||
|
||||
with app.lock:
|
||||
rv = app.db.comments.update(id, data)
|
||||
with self.isso.lock:
|
||||
rv = self.comments.update(id, data)
|
||||
|
||||
for key in set(rv.keys()) - FIELDS:
|
||||
for key in set(rv.keys()) - API.FIELDS:
|
||||
rv.pop(key)
|
||||
|
||||
logger.info('comment %i edited: %s', id, json.dumps(rv))
|
||||
|
||||
checksum = hashlib.md5(rv["text"].encode('utf-8')).hexdigest()
|
||||
rv["text"] = app.markdown(rv["text"])
|
||||
self.signal("comments.edit", rv)
|
||||
|
||||
cookie = functools.partial(dump_cookie,
|
||||
value=app.sign([rv["id"], checksum]),
|
||||
max_age=app.conf.getint('general', 'max-age'))
|
||||
value=self.isso.sign([rv["id"], md5(rv["text"])]),
|
||||
max_age=self.conf.getint('max-age'))
|
||||
|
||||
resp = Response(json.dumps(rv), 200, content_type='application/json')
|
||||
rv["text"] = markdown(rv["text"])
|
||||
|
||||
resp = JSON(json.dumps(rv), 200)
|
||||
resp.headers.add("Set-Cookie", cookie(str(rv["id"])))
|
||||
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
|
||||
return resp
|
||||
|
||||
if request.method == 'DELETE':
|
||||
def delete(self, environ, request, id, key=None):
|
||||
|
||||
item = app.db.comments.get(id)
|
||||
app.cache.delete('hash', (item['email'] or item['remote_addr']).encode('utf-8'))
|
||||
try:
|
||||
rv = self.isso.unsign(request.cookies.get(str(id), ""))
|
||||
except (SignatureExpired, BadSignature):
|
||||
try:
|
||||
id = self.isso.unsign(key or "", max_age=2**32)
|
||||
except (BadSignature, SignatureExpired):
|
||||
raise Forbidden
|
||||
else:
|
||||
if rv[0] != id:
|
||||
raise Forbidden
|
||||
|
||||
rv = app.db.comments.delete(id)
|
||||
# verify checksum, mallory might skip cookie deletion when he deletes a comment
|
||||
if rv[1] != md5(self.comments.get(id)["text"]):
|
||||
raise Forbidden
|
||||
|
||||
item = self.comments.get(id)
|
||||
|
||||
if item is None:
|
||||
raise NotFound
|
||||
|
||||
self.cache.delete('hash', (item['email'] or item['remote_addr']).encode('utf-8'))
|
||||
|
||||
rv = self.comments.delete(id)
|
||||
if rv:
|
||||
for key in set(rv.keys()) - FIELDS:
|
||||
for key in set(rv.keys()) - API.FIELDS:
|
||||
rv.pop(key)
|
||||
|
||||
logger.info('comment %i deleted', id)
|
||||
self.signal("comments.delete", id)
|
||||
|
||||
resp = JSON(json.dumps(rv), 200)
|
||||
cookie = functools.partial(dump_cookie, expires=0, max_age=0)
|
||||
|
||||
resp = Response(json.dumps(rv), 200, content_type='application/json')
|
||||
resp.headers.add("Set-Cookie", cookie(str(id)))
|
||||
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % id))
|
||||
return resp
|
||||
|
||||
def activate(self, environ, request, _, key):
|
||||
|
||||
@requires(str, 'uri')
|
||||
def fetch(app, environ, request, uri):
|
||||
try:
|
||||
id = self.isso.unsign(key, max_age=2**32)
|
||||
except (BadSignature, SignatureExpired):
|
||||
raise Forbidden
|
||||
|
||||
rv = list(app.db.comments.fetch(uri))
|
||||
if not rv:
|
||||
raise NotFound
|
||||
with self.isso.lock:
|
||||
self.comments.activate(id)
|
||||
|
||||
for item in rv:
|
||||
self.signal("comments.activate", id)
|
||||
return Response("Yo", 200)
|
||||
|
||||
key = item['email'] or item['remote_addr']
|
||||
val = app.cache.get('hash', key.encode('utf-8'))
|
||||
@requires(str, 'uri')
|
||||
def fetch(self, environ, request, uri):
|
||||
|
||||
if val is None:
|
||||
val = str(pbkdf2(key, app.salt, 1000, 6))
|
||||
app.cache.set('hash', key.encode('utf-8'), val)
|
||||
rv = list(self.comments.fetch(uri))
|
||||
if not rv:
|
||||
raise NotFound
|
||||
|
||||
item['hash'] = val
|
||||
|
||||
for key in set(item.keys()) - FIELDS:
|
||||
item.pop(key)
|
||||
|
||||
if request.args.get('plain', '0') == '0':
|
||||
for item in rv:
|
||||
item['text'] = app.markdown(item['text'])
|
||||
|
||||
return Response(json.dumps(rv), 200, content_type='application/json')
|
||||
key = item['email'] or item['remote_addr']
|
||||
val = self.cache.get('hash', key.encode('utf-8'))
|
||||
|
||||
if val is None:
|
||||
val = str(pbkdf2(key, self.isso.salt, 1000, 6))
|
||||
self.cache.set('hash', key.encode('utf-8'), val)
|
||||
|
||||
def like(app, environ, request, id):
|
||||
item['hash'] = val
|
||||
|
||||
nv = app.db.comments.vote(True, id, utils.anonymize(str(request.remote_addr)))
|
||||
return Response(json.dumps(nv), 200)
|
||||
for key in set(item.keys()) - API.FIELDS:
|
||||
item.pop(key)
|
||||
|
||||
if request.args.get('plain', '0') == '0':
|
||||
for item in rv:
|
||||
item['text'] = markdown(item['text'])
|
||||
|
||||
def dislike(app, environ, request, id):
|
||||
return JSON(json.dumps(rv), 200)
|
||||
|
||||
nv = app.db.comments.vote(False, id, utils.anonymize(str(request.remote_addr)))
|
||||
return Response(json.dumps(nv), 200)
|
||||
def like(self, environ, request, id):
|
||||
|
||||
nv = self.comments.vote(True, id, utils.anonymize(str(request.remote_addr)))
|
||||
return Response(json.dumps(nv), 200)
|
||||
|
||||
@requires(str, 'uri')
|
||||
def count(app, environ, request, uri):
|
||||
def dislike(self, environ, request, id):
|
||||
|
||||
rv = app.db.comments.count(uri)[0]
|
||||
nv = self.comments.vote(False, id, utils.anonymize(str(request.remote_addr)))
|
||||
return Response(json.dumps(nv), 200)
|
||||
|
||||
if rv == 0:
|
||||
raise NotFound
|
||||
@requires(str, 'uri')
|
||||
def count(self, environ, request, uri):
|
||||
|
||||
return Response(json.dumps(rv), 200, content_type='application/json')
|
||||
rv = self.comments.count(uri)[0]
|
||||
|
||||
if rv == 0:
|
||||
raise NotFound
|
||||
|
||||
def activate(app, environ, request, auth):
|
||||
return JSON(json.dumps(rv), 200)
|
||||
|
||||
try:
|
||||
id = app.unsign(auth, max_age=2**32)
|
||||
except (BadSignature, SignatureExpired):
|
||||
raise Forbidden
|
||||
|
||||
with app.lock:
|
||||
app.db.comments.activate(id)
|
||||
|
||||
logger.info("comment %s activated" % id)
|
||||
return Response("Yo", 200)
|
||||
|
||||
def delete(app, environ, request, auth):
|
||||
|
||||
try:
|
||||
id = app.unsign(auth, max_age=2**32)
|
||||
except (BadSignature, SignatureExpired):
|
||||
raise Forbidden
|
||||
|
||||
with app.lock:
|
||||
app.db.comments.delete(id)
|
||||
|
||||
logger.info("comment %s deleted" % id)
|
||||
return Response("%s successfully removed" % id)
|
||||
|
||||
|
||||
def checkip(app, env, req):
|
||||
return Response(utils.anonymize(str(req.remote_addr)), 200)
|
||||
def checkip(self, env, req):
|
||||
return Response(utils.anonymize(str(req.remote_addr)), 200)
|
||||
|
42
isso/wsgi.py
42
isso/wsgi.py
@ -1,8 +1,36 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
try:
|
||||
from urllib import quote
|
||||
except ImportError:
|
||||
from urllib.parse import quote
|
||||
|
||||
from werkzeug.datastructures import Headers
|
||||
|
||||
|
||||
def host(environ):
|
||||
"""
|
||||
Reconstruct host from environment. A modified version
|
||||
of http://www.python.org/dev/peps/pep-0333/#url-reconstruction
|
||||
"""
|
||||
|
||||
url = environ['wsgi.url_scheme']+'://'
|
||||
|
||||
if environ.get('HTTP_HOST'):
|
||||
url += environ['HTTP_HOST']
|
||||
else:
|
||||
url += environ['SERVER_NAME']
|
||||
|
||||
if environ['wsgi.url_scheme'] == 'https':
|
||||
if environ['SERVER_PORT'] != '443':
|
||||
url += ':' + environ['SERVER_PORT']
|
||||
else:
|
||||
if environ['SERVER_PORT'] != '80':
|
||||
url += ':' + environ['SERVER_PORT']
|
||||
|
||||
return url + quote(environ.get('SCRIPT_NAME', ''))
|
||||
|
||||
|
||||
class SubURI(object):
|
||||
|
||||
def __init__(self, app):
|
||||
@ -22,23 +50,15 @@ class SubURI(object):
|
||||
|
||||
class CORSMiddleware(object):
|
||||
|
||||
def __init__(self, app, hosts):
|
||||
def __init__(self, app, origin):
|
||||
self.app = app
|
||||
self.hosts = hosts
|
||||
self.origin = origin
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
|
||||
def add_cors_headers(status, headers, exc_info=None):
|
||||
|
||||
for host in self.hosts:
|
||||
if environ.get("HTTP_ORIGIN", None) == host.rstrip("/"):
|
||||
origin = host.rstrip("/")
|
||||
break
|
||||
else:
|
||||
origin = host.rstrip("/")
|
||||
|
||||
headers = Headers(headers)
|
||||
headers.add("Access-Control-Allow-Origin", origin)
|
||||
headers.add("Access-Control-Allow-Origin", self.origin(environ))
|
||||
headers.add("Access-Control-Allow-Headers", "Origin, Content-Type")
|
||||
headers.add("Access-Control-Allow-Credentials", "true")
|
||||
headers.add("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE")
|
||||
|
@ -16,7 +16,7 @@ except ImportError:
|
||||
from werkzeug.test import Client
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
from isso import Isso, notify, views, core
|
||||
from isso import Isso, core
|
||||
from isso.utils import http
|
||||
from isso.views import comments
|
||||
|
||||
@ -250,7 +250,7 @@ class TestComments(unittest.TestCase):
|
||||
|
||||
rv = loads(rv.data)
|
||||
|
||||
for key in comments.FIELDS:
|
||||
for key in comments.API.FIELDS:
|
||||
rv.pop(key)
|
||||
|
||||
assert not any(rv.keys())
|
||||
|
@ -5,6 +5,7 @@ from werkzeug.test import Client
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
from isso.wsgi import CORSMiddleware
|
||||
from isso.utils import origin
|
||||
|
||||
|
||||
def hello_world(environ, start_response):
|
||||
@ -14,11 +15,11 @@ def hello_world(environ, start_response):
|
||||
|
||||
def test_simple_CORS():
|
||||
|
||||
app = CORSMiddleware(hello_world, hosts=[
|
||||
app = CORSMiddleware(hello_world, origin=origin([
|
||||
"https://example.tld/",
|
||||
"http://example.tld/",
|
||||
"http://example.tld",
|
||||
])
|
||||
]))
|
||||
|
||||
client = Client(app, Response)
|
||||
|
||||
@ -30,19 +31,19 @@ def test_simple_CORS():
|
||||
assert rv.headers["Access-Control-Allow-Methods"] == "GET, POST, PUT, DELETE"
|
||||
assert rv.headers["Access-Control-Expose-Headers"] == "X-Set-Cookie"
|
||||
|
||||
a = client.get("/", headers={"ORIGIN": "http://example.tld/"})
|
||||
a = client.get("/", headers={"ORIGIN": "http://example.tld"})
|
||||
assert a.headers["Access-Control-Allow-Origin"] == "http://example.tld"
|
||||
|
||||
b = client.get("/", headers={"ORIGIN": "http://example.tld"})
|
||||
assert a.headers["Access-Control-Allow-Origin"] == "http://example.tld"
|
||||
assert b.headers["Access-Control-Allow-Origin"] == "http://example.tld"
|
||||
|
||||
c = client.get("/", headers={"ORIGIN": "http://foo.other/"})
|
||||
assert a.headers["Access-Control-Allow-Origin"] == "http://example.tld"
|
||||
c = client.get("/", headers={"ORIGIN": "http://foo.other"})
|
||||
assert c.headers["Access-Control-Allow-Origin"] == "https://example.tld"
|
||||
|
||||
|
||||
def test_preflight_CORS():
|
||||
|
||||
app = CORSMiddleware(hello_world, hosts=["http://example.tld"])
|
||||
app = CORSMiddleware(hello_world, origin=origin(["http://example.tld"]))
|
||||
client = Client(app, Response)
|
||||
|
||||
rv = client.open(method="OPTIONS", path="/", headers={"ORIGIN": "http://example.tld"})
|
||||
|
@ -9,7 +9,7 @@ import unittest
|
||||
from werkzeug.test import Client
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
from isso import Isso, notify, utils, core
|
||||
from isso import Isso, core
|
||||
from isso.utils import http
|
||||
|
||||
class Dummy:
|
||||
|
Loading…
Reference in New Issue
Block a user