From 1199424a2b44d5245e1fbfef0ad11469068b9ff6 Mon Sep 17 00:00:00 2001 From: posativ Date: Tue, 16 Oct 2012 19:11:59 +0200 Subject: [PATCH] add basic SQLite adapter + tests --- isso/comments.py | 17 +++++++++ isso/db.py | 95 ++++++++++++++++++++++++++++++++++++++++++++++++ specs/test_db.py | 36 ++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 isso/db.py create mode 100644 specs/test_db.py diff --git a/isso/comments.py b/isso/comments.py index 7e569c8..368ce23 100644 --- a/isso/comments.py +++ b/isso/comments.py @@ -2,5 +2,22 @@ from werkzeug.wrappers import Response +class Comment(object): + + fields = ['text', 'author', 'email', 'website', 'id', 'parent', 'timestamp'] + + def __init__(self, **kw): + + for field in self.fields: + if field == 'text' and field not in kw: + raise ValueError('Comment needs at least text, but no text was provided.') + self.__dict__[field] = kw.get(field) + + + @property + def json(self): + return '' + + def comment(app, environ, request, path, id=None): return Response('', 200) diff --git a/isso/db.py b/isso/db.py new file mode 100644 index 0000000..2038b09 --- /dev/null +++ b/isso/db.py @@ -0,0 +1,95 @@ + +import abc +import time +import sqlite3 + +from os.path import join + +from isso.comments import Comment + + +class Abstract: + + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def initialize(self, conf): + return + + @abc.abstractmethod + def shutdown(self): + return + + @abc.abstractmethod + def add(path, comment): + return + + @abc.abstractmethod + def update(self, path, comment): + return + + @abc.abstractmethod + def delete(self, path, comment): + return + + @abc.abstractmethod + def retrieve(self, path, limit=20): + return + + +class SQLite(Abstract): + + fields = [ + 'id', 'path', 'timestamp', + 'text', 'author', 'email', 'website', 'parent' + ] + + def initialize(self, conf): + + self.dbpath = join(conf['DATA_DIR'], 'comments.db') + + with sqlite3.connect(self.dbpath) as con: + sql = ('main.comments (id INTEGER NOT NULL, path VARCHAR(255) NOT NULL,' + 'timestamp FLOAT NOT NULL, text VARCHAR, author VARCHAR(64),' + 'email VARCHAR(64), website VARCHAR(64), parent INTEGER,' + 'PRIMARY KEY (id, path))') + con.execute("CREATE TABLE IF NOT EXISTS %s;" % sql) + + # increment id if (id, path) is no longer unique + con.execute("""\ + CREATE TRIGGER IF NOT EXISTS increment AFTER INSERT ON comments + BEGIN + UPDATE comments SET + id=(SELECT MAX(id)+1 FROM comments WHERE path=NEW.path) + WHERE rowid=NEW.rowid; + END;""") + + def shutdown(self): + return + + def query2comment(self, query): + return Comment( + text=query[3], author=query[4], email=query[5], website=query[6], + parent=query[7], timestamp=query[2], id=query[0] + ) + + def add(self, path, c): + with sqlite3.connect(self.dbpath) as con: + keys = ','.join(self.fields) + values = ','.join('?'*len(self.fields)) + con.execute('INSERT INTO comments (%s) VALUES (%s);' % (keys, values), + [0, path, time.time(), c.text, c.author, c.email, c.website, c.parent]) + + def update(self, path, comment): + return + + def delete(self, path, comment): + return + + def retrieve(self, path, limit=20): + with sqlite3.connect(self.dbpath) as con: + rv = con.execute("SELECT * FROM comments WHERE path = '%s'" % path \ + + " ORDER BY id DESC;").fetchall() + + for item in rv: + yield self.query2comment(item) diff --git a/specs/test_db.py b/specs/test_db.py new file mode 100644 index 0000000..f00b4bb --- /dev/null +++ b/specs/test_db.py @@ -0,0 +1,36 @@ + +import shutil +import tempfile +import unittest + +from isso.comments import Comment +from isso.db import SQLite + + +class TestSQLite(unittest.TestCase): + + def setUp(self): + + self.path = tempfile.mkdtemp() + self.db = SQLite() + self.db.initialize({'DATA_DIR': self.path}) + + def test_add(self): + + self.db.add('/', Comment(text='Foo')) + self.db.add('/', Comment(text='Bar')) + self.db.add('/path/', Comment(text='Baz')) + + rv = list(self.db.retrieve('/')) + assert rv[0].id == 2 + assert rv[0].text == 'Bar' + + assert rv[1].id == 1 + assert rv[1].text == 'Foo' + + rv = list(self.db.retrieve('/path/')) + assert rv[0].id == 1 + assert rv[0].text == 'Baz' + + def tearDown(self): + shutil.rmtree(self.path)