Move HTTP utilities into a separate module

This commit is contained in:
Martin Zimmermann 2013-10-24 14:34:45 +02:00
parent 48b4c9f9a5
commit 2db98d0d34
4 changed files with 89 additions and 94 deletions

View File

@ -2,103 +2,14 @@
from __future__ import division from __future__ import division
import socket
try:
import httplib
from urlparse import urlparse
except ImportError:
import http.client as httplib
from urllib.parse import urlparse
import random import random
import hashlib import hashlib
from string import ascii_letters, digits from string import ascii_letters, digits
from contextlib import closing
import html5lib
import ipaddress import ipaddress
def normalize(host):
    """Split a site address into a ``(hostname, port)`` pair.

    `host` may be given with or without a scheme; without one, ``https://``
    is assumed. The port defaults to 443 for https and to 80 otherwise; an
    explicit port in the address takes precedence in both cases, and it is
    never left inside the returned hostname.

    NOTE: `urlexists` and `heading` pick TLS by ``port == 443``, so an https
    address with a non-standard port is still contacted without TLS by them.
    """
    if not host.startswith(('http://', 'https://')):
        host = 'https://' + host

    rv = urlparse(host)

    name = rv.netloc
    # Cut off a trailing ":port", but leave a bare bracketed IPv6 literal
    # such as "[::1]" alone -- its colons are part of the address.
    if not name.endswith(']'):
        name = name.rsplit(':', 1)[0]

    default = 443 if rv.scheme == 'https' else 80
    return (name, rv.port or default)
def urlexists(host, path):
    """Return True if a HEAD request for `path` on `host` is answered with 200.

    `host` is split into a (hostname, port) pair by `normalize`; TLS is used
    when that port is 443. Any connection or protocol failure -- while
    sending the request or while reading the response -- is treated as
    "does not exist" and yields False.
    """
    host, port = normalize(host)
    http = httplib.HTTPSConnection if port == 443 else httplib.HTTPConnection

    with closing(http(host, port, timeout=3)) as con:
        try:
            con.request('HEAD', path)
            # getresponse() can fail as well (timeout, connection reset,
            # malformed status line), so it belongs inside the same guard.
            return con.getresponse().status == 200
        except (httplib.HTTPException, socket.error):
            return False
def heading(host, path):
    """Fetch `path` from `host` and return the text of the nearest <h1>.

    The page is parsed with html5lib and the search starts at the <div>
    whose id is "isso-thread": its own subtree is searched first, then the
    subtree of each ancestor in turn, until an <h1> element is found.

    Returns the stripped text content of that heading, "Untitled" when the
    page has no #isso-thread div, and "Untitled." when no <h1> is found on
    the way up to the document root.
    """
    host, port = normalize(host)
    http = httplib.HTTPSConnection if port == 443 else httplib.HTTPConnection

    with closing(http(host, port, timeout=15)) as con:
        con.request('GET', path)
        html = html5lib.parse(con.getresponse().read(), treebuilder="dom")

    assert html.lastChild.nodeName == "html"
    html = html.lastChild

    # aka getElementById; getAttribute() yields "" for a missing attribute
    # and, unlike NamedNodeMap.has_key(), is available on Python 2 and 3.
    el = [div for div in html.getElementsByTagName("div")
          if div.getAttribute("id") == "isso-thread"]

    if not el:
        return "Untitled"

    el = el[0]
    visited = []

    def recurse(node):
        # Depth-first search for the first <h1> below `node`, skipping
        # subtrees that were already searched from a lower starting point.
        for child in node.childNodes:
            if child.nodeType != child.ELEMENT_NODE:
                continue
            if child.nodeName.upper() == "H1":
                return child
            if child not in visited:
                found = recurse(child)
                # Continue with the remaining siblings when this subtree
                # holds no heading instead of giving up after the first.
                if found is not None:
                    return found
        return None

    def gettext(rv):
        # Yield every text fragment below `rv` in document order.
        for child in rv.childNodes:
            if child.nodeType == child.TEXT_NODE:
                yield child.nodeValue
            if child.nodeType == child.ELEMENT_NODE:
                for item in gettext(child):
                    yield item

    while el is not None:  # el.parentNode is None in the very end
        visited.append(el)
        rv = recurse(el)

        if rv is not None:
            return ''.join(gettext(rv)).strip()

        el = el.parentNode

    return "Untitled."
def anonymize(remote_addr): def anonymize(remote_addr):
try: try:
ipv4 = ipaddress.IPv4Address(remote_addr) ipv4 = ipaddress.IPv4Address(remote_addr)

82
isso/utils/http.py Normal file
View File

@ -0,0 +1,82 @@
# -*- encoding: utf-8 -*-
import socket
from contextlib import closing
try:
import httplib
except ImportError:
import http.client as httplib
import html5lib
from isso.utils import parse
def urlexists(host, path):
    """Return True if a HEAD request for `path` on `host` is answered with 200.

    `host` is split into a (hostname, port) pair by `parse.host`; TLS is
    used when that port is 443. Any connection or protocol failure -- while
    sending the request or while reading the response -- is treated as
    "does not exist" and yields False.
    """
    host, port = parse.host(host)
    http = httplib.HTTPSConnection if port == 443 else httplib.HTTPConnection

    with closing(http(host, port, timeout=3)) as con:
        try:
            con.request('HEAD', path)
            # getresponse() can fail as well (timeout, connection reset,
            # malformed status line), so it belongs inside the same guard.
            return con.getresponse().status == 200
        except (httplib.HTTPException, socket.error):
            return False
def heading(host, path):
    """Fetch `path` from `host` and return the text of the nearest <h1>.

    The page is parsed with html5lib and the search starts at the <div>
    whose id is "isso-thread": its own subtree is searched first, then the
    subtree of each ancestor in turn, until an <h1> element is found.

    Returns the stripped text content of that heading, "Untitled" when the
    page has no #isso-thread div, and "Untitled." when no <h1> is found on
    the way up to the document root.
    """
    host, port = parse.host(host)
    http = httplib.HTTPSConnection if port == 443 else httplib.HTTPConnection

    with closing(http(host, port, timeout=15)) as con:
        con.request('GET', path)
        html = html5lib.parse(con.getresponse().read(), treebuilder="dom")

    assert html.lastChild.nodeName == "html"
    html = html.lastChild

    # aka getElementById; getAttribute() yields "" for a missing attribute
    # and, unlike NamedNodeMap.has_key(), is available on Python 2 and 3.
    el = [div for div in html.getElementsByTagName("div")
          if div.getAttribute("id") == "isso-thread"]

    if not el:
        return "Untitled"

    el = el[0]
    visited = []

    def recurse(node):
        # Depth-first search for the first <h1> below `node`, skipping
        # subtrees that were already searched from a lower starting point.
        for child in node.childNodes:
            if child.nodeType != child.ELEMENT_NODE:
                continue
            if child.nodeName.upper() == "H1":
                return child
            if child not in visited:
                found = recurse(child)
                # Continue with the remaining siblings when this subtree
                # holds no heading instead of giving up after the first.
                if found is not None:
                    return found
        return None

    def gettext(rv):
        # Yield every text fragment below `rv` in document order.
        for child in rv.childNodes:
            if child.nodeType == child.TEXT_NODE:
                yield child.nodeValue
            if child.nodeType == child.ELEMENT_NODE:
                for item in gettext(child):
                    yield item

    while el is not None:  # el.parentNode is None in the very end
        visited.append(el)
        rv = recurse(el)

        if rv is not None:
            return ''.join(gettext(rv)).strip()

        el = el.parentNode

    return "Untitled."

View File

@ -15,6 +15,7 @@ from werkzeug.exceptions import abort, BadRequest
from isso.compat import text_type as str from isso.compat import text_type as str
from isso import utils, notify, db from isso import utils, notify, db
from isso.utils import http
from isso.crypto import pbkdf2 from isso.crypto import pbkdf2
FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email', 'mode', FIELDS = set(['id', 'parent', 'text', 'author', 'website', 'email', 'mode',
@ -46,7 +47,7 @@ class requires:
@requires(str, 'uri') @requires(str, 'uri')
def new(app, environ, request, uri): def new(app, environ, request, uri):
if uri not in app.db.threads and not utils.urlexists(app.conf.get('general', 'host'), uri): if uri not in app.db.threads and not http.urlexists(app.conf.get('general', 'host'), uri):
return Response('URI does not exist', 404) return Response('URI does not exist', 404)
try: try:
@ -72,7 +73,7 @@ def new(app, environ, request, uri):
with app.lock: with app.lock:
if uri not in app.db.threads: if uri not in app.db.threads:
app.db.threads.new(uri, utils.heading(app.conf.get('general', 'host'), uri)) app.db.threads.new(uri, http.heading(app.conf.get('general', 'host'), uri))
title = app.db.threads[uri].title title = app.db.threads[uri].title
try: try:

View File

@ -14,11 +14,12 @@ except ImportError:
from werkzeug.test import Client from werkzeug.test import Client
from werkzeug.wrappers import Response from werkzeug.wrappers import Response
from isso import Isso, notify, utils, views, core from isso import Isso, notify, views, core
from isso.utils import http
from isso.views import comment from isso.views import comment
utils.heading = lambda *args: "Untitled." http.heading = lambda *args: "Untitled."
utils.urlexists = lambda *args: True http.urlexists = lambda *args: True
loads = lambda data: json.loads(data.decode('utf-8')) loads = lambda data: json.loads(data.decode('utf-8'))