simplify import format detection

pull/101/head
Martin Zimmermann 10 years ago
parent 3f50db4174
commit 91e63c7e5f

@ -6,6 +6,7 @@ import sys
import os import os
import io import io
import re import re
import logging
import textwrap import textwrap
from time import mktime, strptime, time from time import mktime, strptime, time
@ -26,6 +27,7 @@ except ImportError:
from xml.etree import ElementTree from xml.etree import ElementTree
logger = logging.getLogger("isso")
def strip(val): def strip(val):
if isinstance(val, string_types): if isinstance(val, string_types):
@ -146,14 +148,6 @@ class Disqus(object):
initial_indent=" ", subsequent_indent=" ")) initial_indent=" ", subsequent_indent=" "))
print("") print("")
@classmethod
def detect(cls, peek):
if 'xmlns="http://disqus.com' in peek:
return "http://disqus.com"
return None
class WordPress(object): class WordPress(object):
@ -164,11 +158,13 @@ class WordPress(object):
self.xmlfile = xmlfile self.xmlfile = xmlfile
self.count = 0 self.count = 0
with io.open(xmlfile) as fp: for line in io.open(xmlfile):
ns = WordPress.detect(fp.read(io.DEFAULT_BUFFER_SIZE)) m = WordPress.detect(line)
if m:
if ns: self.ns = WordPress.ns.replace("1.0", m.group(1))
self.ns = "{" + ns + "}" break
else:
logger.warn("No WXR namespace found, assuming 1.0")
def insert(self, thread): def insert(self, thread):
@ -242,12 +238,19 @@ class WordPress(object):
@classmethod @classmethod
def detect(cls, peek): def detect(cls, peek):
return re.compile("http://wordpress.org/export/(1\.\d)/").search(peek)
m = re.search("http://wordpress.org/export/1\.\d/", peek) def autodetect(peek):
if m:
return m.group(0)
return None if 'xmlns="http://disqus.com' in peek:
return Disqus
m = WordPress.detect(peek)
if m:
return WordPress
return None
def dispatch(type, db, dump): def dispatch(type, db, dump):
@ -255,20 +258,15 @@ def dispatch(type, db, dump):
if input("Isso DB is not empty! Continue? [y/N]: ") not in ("y", "Y"): if input("Isso DB is not empty! Continue? [y/N]: ") not in ("y", "Y"):
raise SystemExit("Abort.") raise SystemExit("Abort.")
if type is None: if type == "disqus":
cls = Disqus
elif type == "wordpress":
cls = WordPress
else:
with io.open(dump) as fp: with io.open(dump) as fp:
peek = fp.read(io.DEFAULT_BUFFER_SIZE) cls = autodetect(fp.read(io.DEFAULT_BUFFER_SIZE))
if WordPress.detect(peek): if cls is None:
type = "wordpress"
if Disqus.detect(peek):
type = "disqus"
if type == "wordpress":
WordPress(db, dump).migrate()
elif type == "disqus":
Disqus(db, dump).migrate()
else:
raise SystemExit("Unknown format, abort.") raise SystemExit("Unknown format, abort.")
cls(db, dump).migrate()

@ -13,7 +13,7 @@ from os.path import join, dirname
from isso.core import Config from isso.core import Config
from isso.db import SQLite3 from isso.db import SQLite3
from isso.migrate import Disqus, WordPress from isso.migrate import Disqus, WordPress, autodetect
class TestMigration(unittest.TestCase): class TestMigration(unittest.TestCase):
@ -82,14 +82,13 @@ class TestMigration(unittest.TestCase):
xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:wp="http://wordpress.org/export/%s/">""" xmlns:wp="http://wordpress.org/export/%s/">"""
self.assertEqual(WordPress.detect(wp % "invalid"), None) self.assertEqual(autodetect(wp % "foo"), None)
for version in ("1.0", "1.1", "1.2", "1.3"): for version in ("1.0", "1.1", "1.2", "1.3"):
self.assertEqual(WordPress.detect(wp % version), self.assertEqual(autodetect(wp % version), WordPress)
"http://wordpress.org/export/%s/" % version)
dq = '''\ dq = '''\
<?xml version="1.0"?> <?xml version="1.0"?>
<disqus xmlns="http://disqus.com" <disqus xmlns="http://disqus.com"
xmlns:dsq="http://disqus.com/disqus-internals"''' xmlns:dsq="http://disqus.com/disqus-internals"'''
self.assertIsNotNone(Disqus.detect(dq)) self.assertEqual(autodetect(dq), Disqus)

@ -141,4 +141,4 @@
<link>http://example.tld/?p=6</link> <link>http://example.tld/?p=6</link>
</item> </item>
</channel> </channel>
</rss> </rss>

Loading…
Cancel
Save