improve doctests and refactor title extraction

legacy/0.2
Martin Zimmermann 11 years ago
parent 89b1ca8846
commit cb40c7ca42

@ -4,16 +4,17 @@ import sys
PY2K = sys.version_info[0] == 2
if not PY2K:
# iterkeys = lambda d: iter(d.keys())
# iteritems = lambda d: iter(d.items())
map, zip, filter = map, zip, filter
text_type = str
string_types = (str, )
buffer = memoryview
else:
# iterkeys = lambda d: d.iterkeys()
# iteritems = lambda d: d.iteritems()
from itertools import imap, izip, ifilter
map, zip, filter = imap, izip, ifilter
text_type = unicode
string_types = (str, unicode)

@ -117,6 +117,9 @@ class Mixin(object):
def threaded(func):
"""
Decorator to execute each :param func: call in a separate thread.
"""
def dec(self, *args, **kwargs):
thread.start_new_thread(func, (self, ) + args, kwargs)

@ -15,10 +15,10 @@ def anonymize(remote_addr):
Anonymize IPv4 and IPv6 :param remote_addr: to /24 (zero'd)
and /48 (zero'd).
>>> anonymize(u'12.34.56.78')
u'12.34.56.0'
>>> anonymize(u'1234:5678:90ab:cdef:fedc:ba09:8765:4321')
u'1234:5678:90ab:0000:0000:0000:0000:0000'
>>> anonymize(u'12.34.56.78') # doctest: +IGNORE_UNICODE
'12.34.56.0'
>>> anonymize(u'1234:5678:90ab:cdef:fedc:ba09:8765:4321') # doctest: +IGNORE_UNICODE
'1234:5678:90ab:0000:0000:0000:0000:0000'
"""
try:
ipv4 = ipaddress.IPv4Address(remote_addr)

@ -9,74 +9,17 @@ try:
except ImportError:
import http.client as httplib
import html5lib
from isso.utils import parse
def urlexists(host, path):
def curl(method, host, path, timeout=3):
host, port, ssl = parse.host(host)
http = httplib.HTTPSConnection if ssl else httplib.HTTPConnection
with closing(http(host, port, timeout=3)) as con:
with closing(http(host, port, timeout=timeout)) as con:
try:
con.request('HEAD', path)
con.request(method, path)
except (httplib.HTTPException, socket.error):
return False
return con.getresponse().status == 200
def heading(host, path):
"""Connect to `host`, GET path and start from #isso-thread to search for
a possible heading (h1). Returns `None` if nothing found."""
host, port, ssl = parse.host(host)
http = httplib.HTTPSConnection if ssl else httplib.HTTPConnection
with closing(http(host, port, timeout=15)) as con:
con.request('GET', path)
html = html5lib.parse(con.getresponse().read(), treebuilder="dom")
assert html.lastChild.nodeName == "html"
html = html.lastChild
# aka getElementById
el = list(filter(lambda i: i.attributes["id"].value == "isso-thread",
filter(lambda i: i.attributes.has_key("id"), html.getElementsByTagName("div"))))
if not el:
return "Untitled"
el = el[0]
visited = []
def recurse(node):
for child in node.childNodes:
if child.nodeType != child.ELEMENT_NODE:
continue
if child.nodeName.upper() == "H1":
return child
if child not in visited:
return recurse(child)
def gettext(rv):
for child in rv.childNodes:
if child.nodeType == child.TEXT_NODE:
yield child.nodeValue
if child.nodeType == child.ELEMENT_NODE:
for item in gettext(child):
yield item
while el is not None: # el.parentNode is None in the very end
visited.append(el)
rv = recurse(el)
if rv:
return ''.join(gettext(rv)).strip()
el = el.parentNode
return "Untitled."
return None
return con.getresponse()

@ -4,11 +4,21 @@ from __future__ import print_function
import re
import datetime
from itertools import chain
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
import html5lib
from isso.compat import map, filter, PY2K
if PY2K: # http://bugs.python.org/issue12984
from xml.dom.minidom import NamedNodeMap
NamedNodeMap.__contains__ = lambda self, key: self.has_key(key)
def timedelta(value):
"""
@ -63,3 +73,78 @@ def host(name):
if rv.scheme == 'https' and rv.port is None:
return (rv.netloc, 443, True)
return (rv.netloc.rsplit(':')[0], rv.port or 80, rv.scheme == 'https')
def title(data, default=u"Untitled."):
"""
Extract <h1> title from web page. The title is *probably* the text node,
which is the nearest H1 node in context to an element with the `isso-thread` id.
>>> title("asdf") # doctest: +IGNORE_UNICODE
u'Untitled.'
>>> title('''
... <html>
... <head>
... <title>Foo!</title>
... </head>
... <body>
... <header>
... <h1>generic website title.</h1>
... <h2>subtile title.</h2>
... </header>
... <article>
... <header>
... <h1>Can you find me?</h1>
... </header>
... <section id="isso-thread">
... </section>
... </article>
... </body>
... </html>''') # doctest: +IGNORE_UNICODE
u'Can you find me?'
"""
html = html5lib.parse(data, treebuilder="dom")
assert html.lastChild.nodeName == "html"
html = html.lastChild
# aka getElementById
el = list(filter(lambda i: i.attributes["id"].value == "isso-thread",
filter(lambda i: "id" in i.attributes,
chain(*map(html.getElementsByTagName, ("div", "section"))))))
if not el:
return default
el = el[0]
visited = []
def recurse(node):
for child in node.childNodes:
if child.nodeType != child.ELEMENT_NODE:
continue
if child.nodeName.upper() == "H1":
return child
if child not in visited:
return recurse(child)
def gettext(rv):
for child in rv.childNodes:
if child.nodeType == child.TEXT_NODE:
yield child.nodeValue
if child.nodeType == child.ELEMENT_NODE:
for item in gettext(child):
yield item
while el is not None: # el.parentNode is None in the very end
visited.append(el)
rv = recurse(el)
if rv:
return ''.join(gettext(rv)).strip()
el = el.parentNode
return default

@ -6,7 +6,6 @@ import time
import hashlib
import logging
import sqlite3
import logging
from itsdangerous import SignatureExpired, BadSignature
@ -16,7 +15,7 @@ from werkzeug.exceptions import abort, BadRequest
from isso.compat import text_type as str
from isso import utils, notify, db
from isso.utils import http
from isso.utils import http, parse
from isso.crypto import pbkdf2
logger = logging.getLogger("isso")
@ -50,9 +49,6 @@ class requires:
@requires(str, 'uri')
def new(app, environ, request, uri):
if uri not in app.db.threads and not http.urlexists(app.conf.get('general', 'host'), uri):
return Response('URI does not exist', 404)
try:
data = json.loads(request.get_data().decode('utf-8'))
except ValueError:
@ -74,11 +70,23 @@ def new(app, environ, request, uri):
data['mode'] = (app.conf.getboolean('moderation', 'enabled') and 2) or 1
data['remote_addr'] = utils.anonymize(str(request.remote_addr))
# extract site's <h1> title
if uri not in app.db.threads:
for host in app.conf.getiter('general', 'host'):
resp = http.curl('HEAD', host, uri)
if resp and resp.status == 200:
title = parse.title(resp.read())
break
else:
return Response('URI does not exist', 404)
else:
title = app.db.threads[uri].title
with app.lock:
if uri not in app.db.threads:
app.db.threads.new(uri, http.heading(app.conf.get('general', 'host'), uri))
logger.info('new thread: %s -> %s', uri, app.db.threads[uri].title)
title = app.db.threads[uri].title
app.db.threads.new(uri, title)
logger.info('new thread: %s -> %s', uri, title)
try:
with app.lock:

@ -18,8 +18,14 @@ from isso import Isso, notify, views, core
from isso.utils import http
from isso.views import comment
http.heading = lambda *args: "Untitled."
http.urlexists = lambda *args: True
class Dummy:
status = 200
def read(self):
return ''
http.curl = lambda method, host, path: Dummy()
loads = lambda data: json.loads(data.decode('utf-8'))

@ -10,9 +10,16 @@ from werkzeug.test import Client
from werkzeug.wrappers import Response
from isso import Isso, notify, utils, core
from isso.utils import http
utils.heading = lambda *args: "Untitled."
utils.urlexists = lambda *args: True
class Dummy:
status = 200
def read(self):
return ''
http.curl = lambda method, host, path: Dummy()
loads = lambda data: json.loads(data.decode('utf-8'))

@ -15,6 +15,7 @@ deps =
deps =
nose
ipaddress
doctest-ignore-unicode
commands=
nosetests --with-doctest isso/
nosetests --with-doctest --with-doctest-ignore-unicode isso/
nosetests specs/

Loading…
Cancel
Save