handle WP's query-string "pages" and variable WXR namespaces

Site links such as /?p=1234 are imported *as is* and may work in
Isso. Do not use a query-based URL structure for permalinks. Ever.

Also, depending on the pages you are going to export, the WXR XML namespace
may change from ../export/1.0/ to ../export/1.2/. Isso tries to import
any WXR 1.x.
This commit is contained in:
Martin Zimmermann 2014-05-02 13:06:06 +02:00
parent 333bba728b
commit 123ea26ca9
4 changed files with 111 additions and 23 deletions

View File

@ -68,7 +68,8 @@ on *Discussions* and select the *Export* tab. You'll receive an email with your
comments. Unfortunately, Disqus does not export up- and downvotes. comments. Unfortunately, Disqus does not export up- and downvotes.
To export comments from your previous WordPress installation, go to *Tools*, To export comments from your previous WordPress installation, go to *Tools*,
export your data. export your data. WordPress WXR import is quite new and may not work for you;
please report any failures.
Now import the XML dump: Now import the XML dump:

View File

@ -5,6 +5,7 @@ from __future__ import division, print_function
import sys import sys
import os import os
import io import io
import re
import textwrap import textwrap
from time import mktime, strptime, time from time import mktime, strptime, time
@ -145,6 +146,14 @@ class Disqus(object):
initial_indent=" ", subsequent_indent=" ")) initial_indent=" ", subsequent_indent=" "))
print("") print("")
@classmethod
def detect(cls, peek):
    """Recognise a Disqus export from the leading text of a dump.

    :param peek: text read from the beginning of the XML dump
    :return: the string ``"http://disqus.com"`` when `peek` contains the
             Disqus default-namespace declaration, ``None`` otherwise
    """
    marker = 'xmlns="http://disqus.com'
    return "http://disqus.com" if marker in peek else None
class WordPress(object): class WordPress(object):
@ -155,12 +164,23 @@ class WordPress(object):
self.xmlfile = xmlfile self.xmlfile = xmlfile
self.count = 0 self.count = 0
with io.open(xmlfile) as fp:
ns = WordPress.detect(fp.read(io.DEFAULT_BUFFER_SIZE))
if ns:
self.ns = "{" + ns + "}"
def insert(self, thread): def insert(self, thread):
path = urlparse(thread.find("link").text).path url = urlparse(thread.find("link").text)
path = url.path
if url.query:
path += "?" + url.query
self.db.threads.new(path, thread.find("title").text.strip()) self.db.threads.new(path, thread.find("title").text.strip())
comments = list(map(WordPress.Comment, thread.findall(WordPress.ns + "comment"))) comments = list(map(self.Comment, thread.findall(self.ns + "comment")))
comments.sort(key=lambda k: k["id"]) comments.sort(key=lambda k: k["id"])
remap = {} remap = {}
@ -188,31 +208,46 @@ class WordPress(object):
def migrate(self): def migrate(self):
tree = ElementTree.parse(self.xmlfile) tree = ElementTree.parse(self.xmlfile)
skip = 0
items = tree.findall("channel/item") items = tree.findall("channel/item")
progress = Progress(len(items)) progress = Progress(len(items))
for i, thread in enumerate(items): for i, thread in enumerate(items):
if thread.find("title").text is None or thread.find(self.ns + "comment") is None:
skip += 1
continue
progress.update(i, thread.find("title").text) progress.update(i, thread.find("title").text)
self.insert(thread) self.insert(thread)
progress.finish("{0} threads, {1} comments".format(len(items), self.count)) progress.finish("{0} threads, {1} comments".format(
len(items) - skip, self.count))
def Comment(self, el):
    """Turn one WXR comment element into a plain dict.

    Child elements are looked up under the namespace prefix stored in
    ``self.ns``.

    :param el: element whose children are the ``comment_*`` fields
    :return: dict with the keys ``text``, ``author``, ``email``,
             ``website``, ``remote_addr``, ``created``, ``mode``, ``id``
             and ``parent``
    """

    def field(tag):
        # Raw text of the namespaced child element.
        return el.find(self.ns + tag).text

    # Fields are read in a fixed order, one statement each.
    comment = {}
    comment["text"] = strip(field("comment_content"))
    comment["author"] = strip(field("comment_author"))
    comment["email"] = strip(field("comment_author_email"))
    comment["website"] = strip(field("comment_author_url"))
    comment["remote_addr"] = anonymize(strip(field("comment_author_IP")))

    # NOTE(review): mktime() interprets the parsed struct as local time,
    # while the field is named *_gmt -- confirm this is intended.
    posted = strptime(strip(field("comment_date_gmt")), "%Y-%m-%d %H:%M:%S")
    comment["created"] = mktime(posted)

    # "1" in comment_approved maps to mode 1, anything else to mode 2.
    if field("comment_approved") == "1":
        comment["mode"] = 1
    else:
        comment["mode"] = 2

    comment["id"] = int(field("comment_id"))
    # A parent id of 0 is falsy and therefore stored as None.
    comment["parent"] = int(field("comment_parent")) or None
    return comment
@classmethod @classmethod
def Comment(cls, el): def detect(cls, peek):
return {
"text": strip(el.find(WordPress.ns + "comment_content").text), m = re.search("http://wordpress.org/export/1\.\d/", peek)
"author": strip(el.find(WordPress.ns + "comment_author").text), if m:
"email": strip(el.find(WordPress.ns + "comment_author_email").text), return m.group(0)
"website": strip(el.find(WordPress.ns + "comment_author_url").text),
"remote_addr": anonymize( return None
strip(el.find(WordPress.ns + "comment_author_IP").text)),
"created": mktime(strptime(
strip(el.find(WordPress.ns + "comment_date_gmt").text),
"%Y-%m-%d %H:%M:%S")),
"mode": 1 if el.find(WordPress.ns + "comment_approved").text == "1" else 2,
"id": int(el.find(WordPress.ns + "comment_id").text),
"parent": int(el.find(WordPress.ns + "comment_parent").text) or None
}
def dispatch(type, db, dump): def dispatch(type, db, dump):
@ -223,12 +258,12 @@ def dispatch(type, db, dump):
if type is None: if type is None:
with io.open(dump) as fp: with io.open(dump) as fp:
peek = fp.read(2048) peek = fp.read(io.DEFAULT_BUFFER_SIZE)
if 'xmlns:wp="%s"' % WordPress.ns[1:-1] in peek: if WordPress.detect(peek):
type = "wordpress" type = "wordpress"
if '<disqus xmlns=' in peek: if Disqus.detect(peek):
type = "disqus" type = "disqus"
if type == "wordpress": if type == "wordpress":

View File

@ -1,5 +1,7 @@
# -*- encoding: utf-8 -*- # -*- encoding: utf-8 -*-
from __future__ import unicode_literals
try: try:
import unittest2 as unittest import unittest2 as unittest
except ImportError: except ImportError:
@ -49,7 +51,11 @@ class TestMigration(unittest.TestCase):
self.assertEqual(db.threads["/2014/test/"]["title"], "Hello, World!") self.assertEqual(db.threads["/2014/test/"]["title"], "Hello, World!")
self.assertEqual(db.threads["/2014/test/"]["id"], 1) self.assertEqual(db.threads["/2014/test/"]["id"], 1)
self.assertEqual(len(db.execute("SELECT id FROM comments").fetchall()), 6) self.assertEqual(db.threads["/?p=4"]["title"], "...")
self.assertEqual(db.threads["/?p=4"]["id"], 2)
self.assertEqual(len(db.execute("SELECT id FROM threads").fetchall()), 2)
self.assertEqual(len(db.execute("SELECT id FROM comments").fetchall()), 7)
first = db.comments.get(1) first = db.comments.get(1)
self.assertEqual(first["author"], "Ohai") self.assertEqual(first["author"], "Ohai")
@ -66,3 +72,24 @@ class TestMigration(unittest.TestCase):
last = db.comments.get(6) last = db.comments.get(6)
self.assertEqual(last["author"], "Letzter :/") self.assertEqual(last["author"], "Letzter :/")
self.assertEqual(last["parent"], None) self.assertEqual(last["parent"], None)
def test_detection(self):
    """Check namespace detection for WordPress and Disqus dump headers."""
    # WXR header with a placeholder for the export version.
    wxr_header = """\
<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"
xmlns:content="http://purl.org/rss/1.0/modules/content/"
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:wp="http://wordpress.org/export/%s/">"""

    # A non-numeric version must not be detected.
    rejected = WordPress.detect(wxr_header % "invalid")
    self.assertEqual(rejected, None)

    # Every 1.x version yields its full namespace URL.
    for version in ("1.0", "1.1", "1.2", "1.3"):
        expected = "http://wordpress.org/export/%s/" % version
        detected = WordPress.detect(wxr_header % version)
        self.assertEqual(detected, expected)

    disqus_header = '''\
<?xml version="1.0"?>
<disqus xmlns="http://disqus.com"
xmlns:dsq="http://disqus.com/disqus-internals"'''
    self.assertIsNotNone(Disqus.detect(disqus_header))

View File

@ -115,5 +115,30 @@
<wp:comment_user_id>1</wp:comment_user_id> <wp:comment_user_id>1</wp:comment_user_id>
</wp:comment> </wp:comment>
</item> </item>
<!-- handle ?p=X urls -->
<item>
<title>...</title>
<link>http://example.tld/?p=4</link>
<wp:comment>
<wp:comment_id>11</wp:comment_id>
<wp:comment_author><![CDATA[Anonymous]]></wp:comment_author>
<wp:comment_author_email>info@posativ.org
</wp:comment_author_email>
<wp:comment_author_url></wp:comment_author_url>
<wp:comment_author_IP>::ffff:86.56.63.0</wp:comment_author_IP>
<wp:comment_date>2014-04-29 15:21:56</wp:comment_date>
<wp:comment_date_gmt>2014-04-29 15:21:57</wp:comment_date_gmt>
<wp:comment_content><![CDATA[...]]></wp:comment_content>
<wp:comment_approved>1</wp:comment_approved>
<wp:comment_type></wp:comment_type>
<wp:comment_parent>0</wp:comment_parent>
<wp:comment_user_id>1</wp:comment_user_id>
</wp:comment>
</item>
<item>
<title>No comments</title>
<link>http://example.tld/?p=6</link>
</item>
</channel> </channel>
</rss> </rss>