add basic SQLite adapter + tests

This commit is contained in:
posativ 2012-10-16 19:11:59 +02:00
parent 0333f7efb0
commit 1199424a2b
3 changed files with 148 additions and 0 deletions

View File

@ -2,5 +2,22 @@
from werkzeug.wrappers import Response
class Comment(object):
fields = ['text', 'author', 'email', 'website', 'id', 'parent', 'timestamp']
def __init__(self, **kw):
for field in self.fields:
if field == 'text' and field not in kw:
raise ValueError('Comment needs at least text, but no text was provided.')
self.__dict__[field] = kw.get(field)
@property
def json(self):
return ''
def comment(app, environ, request, path, id=None):
return Response('', 200)

95
isso/db.py Normal file
View File

@ -0,0 +1,95 @@
import abc
import time
import sqlite3
from os.path import join
from isso.comments import Comment
class Abstract:
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def initialize(self, conf):
return
@abc.abstractmethod
def shutdown(self):
return
@abc.abstractmethod
def add(path, comment):
return
@abc.abstractmethod
def update(self, path, comment):
return
@abc.abstractmethod
def delete(self, path, comment):
return
@abc.abstractmethod
def retrieve(self, path, limit=20):
return
class SQLite(Abstract):
fields = [
'id', 'path', 'timestamp',
'text', 'author', 'email', 'website', 'parent'
]
def initialize(self, conf):
self.dbpath = join(conf['DATA_DIR'], 'comments.db')
with sqlite3.connect(self.dbpath) as con:
sql = ('main.comments (id INTEGER NOT NULL, path VARCHAR(255) NOT NULL,'
'timestamp FLOAT NOT NULL, text VARCHAR, author VARCHAR(64),'
'email VARCHAR(64), website VARCHAR(64), parent INTEGER,'
'PRIMARY KEY (id, path))')
con.execute("CREATE TABLE IF NOT EXISTS %s;" % sql)
# increment id if (id, path) is no longer unique
con.execute("""\
CREATE TRIGGER IF NOT EXISTS increment AFTER INSERT ON comments
BEGIN
UPDATE comments SET
id=(SELECT MAX(id)+1 FROM comments WHERE path=NEW.path)
WHERE rowid=NEW.rowid;
END;""")
def shutdown(self):
return
def query2comment(self, query):
return Comment(
text=query[3], author=query[4], email=query[5], website=query[6],
parent=query[7], timestamp=query[2], id=query[0]
)
def add(self, path, c):
with sqlite3.connect(self.dbpath) as con:
keys = ','.join(self.fields)
values = ','.join('?'*len(self.fields))
con.execute('INSERT INTO comments (%s) VALUES (%s);' % (keys, values),
[0, path, time.time(), c.text, c.author, c.email, c.website, c.parent])
def update(self, path, comment):
return
def delete(self, path, comment):
return
def retrieve(self, path, limit=20):
with sqlite3.connect(self.dbpath) as con:
rv = con.execute("SELECT * FROM comments WHERE path = '%s'" % path \
+ " ORDER BY id DESC;").fetchall()
for item in rv:
yield self.query2comment(item)

36
specs/test_db.py Normal file
View File

@ -0,0 +1,36 @@
import shutil
import tempfile
import unittest
from isso.comments import Comment
from isso.db import SQLite
class TestSQLite(unittest.TestCase):
def setUp(self):
self.path = tempfile.mkdtemp()
self.db = SQLite()
self.db.initialize({'DATA_DIR': self.path})
def test_add(self):
self.db.add('/', Comment(text='Foo'))
self.db.add('/', Comment(text='Bar'))
self.db.add('/path/', Comment(text='Baz'))
rv = list(self.db.retrieve('/'))
assert rv[0].id == 2
assert rv[0].text == 'Bar'
assert rv[1].id == 1
assert rv[1].text == 'Foo'
rv = list(self.db.retrieve('/path/'))
assert rv[0].id == 1
assert rv[0].text == 'Baz'
def tearDown(self):
shutil.rmtree(self.path)