another approach to fix #40 (return 403 on false Content-Type)

When an attacker uses a <form> to downvote a comment, the browser
*should* add a `Content-Type: ...` header with three possible values:

    * application/x-www-form-urlencoded
    * multipart/form-data
    * text/plain

If the header is not sent or requests `application/json`, the
request is not forged (XHR is restricted by CORS separately).
This commit is contained in:
Martin Zimmermann 2013-12-04 23:19:51 +01:00
parent 1db06bbf39
commit 232e2fb474
5 changed files with 69 additions and 16 deletions

View File

@ -85,6 +85,7 @@ define(["q"], function(Q) {
try {
xhr.open(method, url, true);
xhr.withCredentials = true;
xhr.setRequestHeader("Content-Type", "application/json");
if (method === "GET") {
xhr.setRequestHeader("X-Origin", window.location.origin);

View File

@ -31,6 +31,30 @@ class JSON(Response):
return super(JSON, self).__init__(*args, content_type='application/json')
def xhr(func):
"""A decorator to check for CSRF on POST/PUT/DELETE using a <form>
element and JS to execute automatically (see #40 for a proof-of-concept).
When an attacker uses a <form> to downvote a comment, the browser *should*
add a `Content-Type: ...` header with three possible values:
* application/x-www-form-urlencoded
* multipart/form-data
* text/plain
If the header is not sent or requests `application/json`, the request is
not forged (XHR is restricted by CORS separately).
"""
def dec(self, env, req, *args, **kwargs):
if req.content_type and not req.content_type.startswith("application/json"):
raise Forbidden("CSRF")
return func(self, env, req, *args, **kwargs)
return dec
class API(object):
FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email',
@ -91,6 +115,7 @@ class API(object):
return True, ""
@xhr
@requires(str, 'uri')
def new(self, environ, request, uri):
@ -174,6 +199,7 @@ class API(object):
return Response(json.dumps(rv), 200, content_type='application/json')
@xhr
def edit(self, environ, request, id):
try:
@ -217,6 +243,7 @@ class API(object):
resp.headers.add("X-Set-Cookie", cookie("isso-%i" % rv["id"]))
return resp
@xhr
def delete(self, environ, request, id, key=None):
try:
@ -319,11 +346,13 @@ class API(object):
return JSON(json.dumps(rv), 200)
@xhr
def like(self, environ, request, id):
nv = self.comments.vote(True, id, utils.anonymize(str(request.remote_addr)))
return Response(json.dumps(nv), 200)
@xhr
def dislike(self, environ, request, id):
nv = self.comments.vote(False, id, utils.anonymize(str(request.remote_addr)))

View File

@ -2,6 +2,7 @@
import json
from werkzeug.test import Client
class FakeIP(object):
@ -14,6 +15,13 @@ class FakeIP(object):
return self.app(environ, start_response)
class JSONClient(Client):
def open(self, *args, **kwargs):
kwargs.setdefault('content_type', 'application/json')
return super(JSONClient, self).open(*args, **kwargs)
class Dummy:
status = 200

View File

@ -12,14 +12,13 @@ try:
except ImportError:
from urllib import urlencode
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, core
from isso.utils import http
from isso.views import comments
from fixtures import curl, loads, FakeIP
from fixtures import curl, loads, FakeIP, JSONClient
http.curl = curl
@ -37,7 +36,7 @@ class TestComments(unittest.TestCase):
self.app = App(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = Client(self.app, Response)
self.client = JSONClient(self.app, Response)
self.get = self.client.get
self.put = self.client.put
self.post = self.client.post
@ -139,7 +138,7 @@ class TestComments(unittest.TestCase):
def testDeleteWithReference(self):
client = Client(self.app, Response)
client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First', 'parent': 1}))
@ -166,7 +165,7 @@ class TestComments(unittest.TestCase):
--- [ comment 4, ref 2 ]
[ comment 5 ]
"""
client = Client(self.app, Response)
client = JSONClient(self.app, Response)
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'First'}))
client.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Second', 'parent': 1}))
@ -199,11 +198,11 @@ class TestComments(unittest.TestCase):
def testDeleteAndCreateByDifferentUsersButSamePostId(self):
mallory = Client(self.app, Response)
mallory = JSONClient(self.app, Response)
mallory.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Foo'}))
mallory.delete('/id/1')
bob = Client(self.app, Response)
bob = JSONClient(self.app, Response)
bob.post('/new?uri=%2Fpath%2F', data=json.dumps({'text': 'Bar'}))
assert mallory.delete('/id/1').status_code == 403
@ -269,10 +268,27 @@ class TestComments(unittest.TestCase):
def testDeleteCommentRemovesThread(self):
rv = self.client.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
assert '/' in self.app.db.threads
self.client.delete('/id/1')
assert '/' not in self.app.db.threads
rv = self.client.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
assert '/' in self.app.db.threads
self.client.delete('/id/1')
assert '/' not in self.app.db.threads
def testCSRF(self):
js = "application/json"
form = "application/x-www-form-urlencoded"
self.post('/new?uri=%2F', data=json.dumps({"text": "..."}))
# no header is fine (default for XHR)
assert self.post('/id/1/dislike', content_type="").status_code == 200
# x-www-form-urlencoded is definitely not RESTful
assert self.post('/id/1/dislike', content_type=form).status_code == 403
assert self.post('/new?uri=%2F', data=json.dumps({"text": "..."}),
content_type=form).status_code == 403
# just for the record
assert self.post('/id/1/dislike', content_type=js).status_code == 200
class TestModeratedComments(unittest.TestCase):
@ -289,7 +305,7 @@ class TestModeratedComments(unittest.TestCase):
self.app = App(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = Client(self.app, Response)
self.client = JSONClient(self.app, Response)
def tearDown(self):
os.unlink(self.path)
@ -320,7 +336,7 @@ class TestPurgeComments(unittest.TestCase):
self.app = App(conf)
self.app.wsgi_app = FakeIP(self.app.wsgi_app, "192.168.1.1")
self.client = Client(self.app, Response)
self.client = JSONClient(self.app, Response)
def testPurgeDoesNoHarm(self):
self.client.post('/new?uri=test', data=json.dumps({"text": "..."}))

View File

@ -6,13 +6,12 @@ import json
import tempfile
import unittest
from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, core
from isso.utils import http
from fixtures import curl, loads, FakeIP
from fixtures import curl, loads, FakeIP, JSONClient
http.curl = curl
@ -33,7 +32,7 @@ class TestVote(unittest.TestCase):
app = App(conf)
app.wsgi_app = FakeIP(app.wsgi_app, ip)
return Client(app, Response)
return JSONClient(app, Response)
def testZeroLikes(self):