parent
3905e84af1
commit
560e73cc0a
@ -1,277 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2012, Martin Zimmermann <info@posativ.org>. All rights reserved.
|
||||
# License: BSD Style, 2 clauses. see isso/__init__.py
|
||||
|
||||
import abc
|
||||
import time
|
||||
import sqlite3
|
||||
|
||||
from isso.utils import Bloomfilter
|
||||
from isso.models import Comment
|
||||
|
||||
|
||||
class Abstract:
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def __init__(self, dbpath, moderation):
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def add(self, path, comment):
|
||||
"""Add a new comment to the database. Returns a Comment object."""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def activate(self, path, id):
|
||||
"""Activate comment id if pending and return comment for (path, id)."""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def update(self, path, id, comment):
|
||||
"""
|
||||
Update an existing comment, but only writeable fields such as text,
|
||||
author, email, website and parent. This method should set the modified
|
||||
field to the current time.
|
||||
"""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete(self, path, id):
|
||||
"""
|
||||
Delete a comment. There are two distinctions: a comment is referenced
|
||||
by another valid comment's parent attribute or stand-a-lone. In this
|
||||
case the comment can't be removed without losing depending comments.
|
||||
Hence, delete removes all visible data such as text, author, email,
|
||||
website sets the mode field to 4.
|
||||
|
||||
In the second case this comment can be safely removed without any side
|
||||
effects."""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def retrieve(self, path, mode):
|
||||
"""
|
||||
Return all comments for `path` with `mode`.
|
||||
"""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def recent(self, mode=7, limit=None):
|
||||
"""
|
||||
Return most recent comments with `mode`. If `limit` is None, return
|
||||
*all* comments that are currently stored, otherwise limit by `limit`.
|
||||
"""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def like(self, path, id, remote_addr):
|
||||
"""+1 a given comment. Returns the new like count (may not change because
|
||||
the creater can't vote on his/her own comment and multiple votes from the
|
||||
same ip address are ignored as well)."""
|
||||
return
|
||||
|
||||
@abc.abstractmethod
|
||||
def count(self, path=None, mode=1):
|
||||
"""return count of comments for path (optional) and mode (defaults to
|
||||
visible, not deleted or moderated comments)."""
|
||||
return
|
||||
|
||||
|
||||
class Threads(object):
|
||||
|
||||
def __init__(self, dbpath):
|
||||
|
||||
self.dbpath = dbpath
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
sql = ('main.threads (path VARCHAR(255) NOT NULL, title TEXT'
|
||||
'PRIMARY KEY path)')
|
||||
con.execute("CREATE TABLE IF NOT EXISTS %s;" % sql)
|
||||
|
||||
def get(self, path):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
rv = con.execute("SELECT title FROM threads WHERE path=?", (path,)).fetchone()
|
||||
if rv is not None:
|
||||
return rv[0]
|
||||
return None
|
||||
|
||||
def add(self, path, title):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
con.execute("INSERT INTO threads (path, title) VALUES (?, ?)", (path, title))
|
||||
return title
|
||||
|
||||
|
||||
class SQLite(Abstract):
|
||||
"""A basic :class:`Abstract` implementation using SQLite3. All comments
|
||||
share a single database. The tuple (id, path) acts as unique identifier
|
||||
for a comment. Multiple comments per path (= that is the URI to your blog
|
||||
post) are ordered by that id."""
|
||||
|
||||
fields = [
|
||||
'path', 'id', 'created', 'modified',
|
||||
'text', 'author', 'hash', 'website', 'parent', 'mode', 'voters'
|
||||
]
|
||||
|
||||
def __init__(self, dbpath, moderation):
|
||||
|
||||
self.dbpath = dbpath
|
||||
self.mode = 2 if moderation else 1
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
sql = ('main.comments (path VARCHAR(255) NOT NULL, id INTEGER NOT NULL,'
|
||||
'created FLOAT NOT NULL, modified FLOAT, text VARCHAR,'
|
||||
'author VARCHAR(64), hash VARCHAR(32), website VARCHAR(64),'
|
||||
'parent INTEGER, mode INTEGER,'
|
||||
'likes INTEGER DEFAULT 0, dislikes INTEGER DEFAULT 0, voters BLOB NOT NULL,'
|
||||
'PRIMARY KEY (id, path))')
|
||||
con.execute("CREATE TABLE IF NOT EXISTS %s;" % sql)
|
||||
|
||||
# increment id if (id, path) is no longer unique
|
||||
con.execute("""\
|
||||
CREATE TRIGGER IF NOT EXISTS increment AFTER INSERT ON comments
|
||||
BEGIN
|
||||
UPDATE comments SET
|
||||
id=(SELECT MAX(id)+1 FROM comments WHERE path=NEW.path)
|
||||
WHERE rowid=NEW.rowid;
|
||||
END;""")
|
||||
|
||||
self.threads = Threads(self.dbpath)
|
||||
|
||||
def query2comment(self, query):
|
||||
if query is None:
|
||||
return None
|
||||
|
||||
return Comment(
|
||||
text=query[4], author=query[5], hash=query[6], website=query[7], parent=query[8],
|
||||
path=query[0], id=query[1], created=query[2], modified=query[3], mode=query[9],
|
||||
votes=query[10]
|
||||
)
|
||||
|
||||
def add(self, uri, c, remote_addr):
|
||||
voters = buffer(Bloomfilter(iterable=[remote_addr]).array)
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
keys = ','.join(self.fields)
|
||||
values = ','.join('?' * len(self.fields))
|
||||
con.execute('INSERT INTO comments (%s) VALUES (%s);' % (keys, values), (
|
||||
uri, 0, c['created'] or time.time(), None, c["text"], c["author"],
|
||||
c["hash"], c["website"], c["parent"], self.mode, voters)
|
||||
)
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
return self.query2comment(
|
||||
con.execute('SELECT *, MAX(id) FROM comments WHERE path=?;', (uri, )).fetchone())
|
||||
|
||||
def activate(self, path, id):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
con.execute("UPDATE comments SET mode=1 WHERE path=? AND id=? AND mode=2", (path, id))
|
||||
return self.get(path, id)
|
||||
|
||||
def update(self, path, id, values):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
for key, value in values.iteritems():
|
||||
con.execute('UPDATE comments SET %s=? WHERE path=? AND id=?;' % key,
|
||||
(value, path, id))
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
con.execute('UPDATE comments SET modified=? WHERE path=? AND id=?',
|
||||
(time.time(), path, id))
|
||||
return self.get(path, id)
|
||||
|
||||
def get(self, path, id):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
return self.query2comment(con.execute(
|
||||
'SELECT * FROM comments WHERE path=? AND id=?;', (path, id)).fetchone())
|
||||
|
||||
def _remove_stale(self, con, path):
|
||||
|
||||
sql = ('DELETE FROM',
|
||||
' comments',
|
||||
'WHERE',
|
||||
' path=? AND mode=4 AND id NOT IN (',
|
||||
' SELECT',
|
||||
' parent',
|
||||
' FROM',
|
||||
' comments',
|
||||
' WHERE path=? AND parent IS NOT NULL)')
|
||||
|
||||
while con.execute(' '.join(sql), (path, path)).rowcount:
|
||||
continue
|
||||
|
||||
def delete(self, path, id):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
sql = 'SELECT * FROM comments WHERE path=? AND parent=?'
|
||||
refs = con.execute(sql, (path, id)).fetchone()
|
||||
|
||||
if refs is None:
|
||||
con.execute('DELETE FROM comments WHERE path=? AND id=?', (path, id))
|
||||
self._remove_stale(con, path)
|
||||
return None
|
||||
|
||||
con.execute('UPDATE comments SET text=? WHERE path=? AND id=?', ('', path, id))
|
||||
con.execute('UPDATE comments SET mode=? WHERE path=? AND id=?', (4, path, id))
|
||||
for field in ('author', 'website'):
|
||||
con.execute('UPDATE comments SET %s=? WHERE path=? AND id=?' % field,
|
||||
(None, path, id))
|
||||
|
||||
self._remove_stale(con, path)
|
||||
|
||||
return self.get(path, id)
|
||||
|
||||
def like(self, path, id, remote_addr):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
rv = con.execute("SELECT likes, dislikes, voters FROM comments" \
|
||||
+ " WHERE path=? AND id=?", (path, id)).fetchone()
|
||||
|
||||
likes, dislikes, voters = rv
|
||||
if likes + dislikes >= 142:
|
||||
return likes
|
||||
|
||||
bf = Bloomfilter(bytearray(voters), likes + dislikes)
|
||||
if remote_addr in bf:
|
||||
return likes
|
||||
|
||||
bf.add(remote_addr)
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
con.execute("UPDATE comments SET likes = likes + 1 WHERE path=? AND id=?", (path, id))
|
||||
con.execute("UPDATE comments SET voters = ? WHERE path=? AND id=?", (
|
||||
buffer(bf.array), path, id))
|
||||
|
||||
return likes + 1
|
||||
|
||||
def count(self, path=None, mode=1):
|
||||
|
||||
if path is not None:
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
return con.execute("SELECT COUNT(*) FROM comments WHERE path=?" \
|
||||
+ " AND (? | mode) = ?", (path, mode, mode)).fetchone()
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
return con.execute("SELECT COUNT(*) FROM comments WHERE" \
|
||||
+ "(? | mode) = ?", (path, mode, mode)).fetchone()
|
||||
|
||||
def retrieve(self, path, mode=5):
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
rv = con.execute("SELECT * FROM comments WHERE path=? AND (? | mode) = ?" \
|
||||
+ " ORDER BY id ASC;", (path, mode, mode)).fetchall()
|
||||
|
||||
for item in rv:
|
||||
yield self.query2comment(item)
|
||||
|
||||
def recent(self, mode=7, limit=None):
|
||||
|
||||
sql = 'SELECT * FROM comments WHERE (? | mode) = ? ORDER BY created DESC'
|
||||
args = [mode, mode]
|
||||
|
||||
if limit:
|
||||
sql += ' LIMIT ?'
|
||||
args.append(limit)
|
||||
|
||||
with sqlite3.connect(self.dbpath) as con:
|
||||
rv = con.execute(sql + ';', args).fetchall()
|
||||
|
||||
for item in rv:
|
||||
yield self.query2comment(item)
|
||||
|
@ -0,0 +1,27 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import sqlite3
|
||||
|
||||
from isso.db.comments import Comments
|
||||
from isso.db.threads import Threads
|
||||
|
||||
|
||||
class SQLite3:
|
||||
|
||||
connection = None
|
||||
|
||||
def __init__(self, path, moderation=False):
|
||||
|
||||
self.path = path
|
||||
self.mode = 2 if moderation else 1
|
||||
|
||||
self.threads = Threads(self)
|
||||
self.comments = Comments(self)
|
||||
|
||||
def execute(self, sql, args=()):
|
||||
|
||||
if isinstance(sql, (list, tuple)):
|
||||
sql = ' '.join(sql)
|
||||
|
||||
with sqlite3.connect(self.path) as con:
|
||||
return con.execute(sql, args)
|
@ -0,0 +1,179 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
import time
|
||||
import sqlite3
|
||||
|
||||
from isso.utils import Bloomfilter
|
||||
|
||||
|
||||
class Comments:
|
||||
"""Hopefully DB-independend SQL to store, modify and retrieve all
|
||||
comment-related actions. Here's a short scheme overview:
|
||||
|
||||
| tid (thread id) | cid (comment id) | parent | ... | likes | remote_addr |
|
||||
+-----------------+------------------+--------+-----+-------+-------------+
|
||||
| 1 | 1 | null | ... | BLOB | 127.0.0.0 |
|
||||
| 1 | 2 | 1 | ... | BLOB | 127.0.0.0 |
|
||||
+-----------------+------------------+--------+-----+-------+-------------+
|
||||
|
||||
The tuple (tid, cid) is unique and thus primary key.
|
||||
"""
|
||||
|
||||
fields = ['tid', 'id', 'parent', 'created', 'modified', 'mode', 'remote_addr',
|
||||
'text', 'author', 'email', 'website', 'likes', 'dislikes', 'voters']
|
||||
|
||||
def __init__(self, db):
|
||||
|
||||
self.db = db
|
||||
self.db.execute([
|
||||
'CREATE TABLE IF NOT EXISTS comments (',
|
||||
' tid REFERENCES threads(id), id INTEGER PRIMARY KEY, parent INTEGER,',
|
||||
' created FLOAT NOT NULL, modified FLOAT, mode INTEGER, remote_addr VARCHAR,',
|
||||
' text VARCHAR, author VARCHAR, email VARCHAR, website VARCHAR,',
|
||||
' likes INTEGER DEFAULT 0, dislikes INTEGER DEFAULT 0, voters BLOB NOT NULL);'])
|
||||
|
||||
def add(self, uri, c):
|
||||
"""
|
||||
Add a new comment to the database and return public fields as dict.
|
||||
Initializes voter bloom array with provided :param:`remote_addr` and
|
||||
adds a new thread to the `main.threads` table.
|
||||
"""
|
||||
self.db.execute([
|
||||
'INSERT INTO comments (',
|
||||
' tid, parent,'
|
||||
' created, modified, mode, remote_addr,',
|
||||
' text, author, email, website, voters )',
|
||||
'SELECT',
|
||||
' threads.id, ?,',
|
||||
' ?, ?, ?, ?,',
|
||||
' ?, ?, ?, ?, ?',
|
||||
'FROM threads WHERE threads.uri = ?;'], (
|
||||
c.get('parent'),
|
||||
c.get('created') or time.time(), None, self.db.mode, c['remote_addr'],
|
||||
c['text'], c.get('author'), c.get('email'), c.get('website'), buffer(
|
||||
Bloomfilter(iterable=[c['remote_addr']]).array),
|
||||
uri)
|
||||
)
|
||||
|
||||
return dict(zip(Comments.fields, self.db.execute(
|
||||
'SELECT *, MAX(c.id) FROM comments AS c INNER JOIN threads ON threads.uri = ?',
|
||||
(uri, )).fetchone()))
|
||||
|
||||
# def activate(self, path, id):
|
||||
# """Activate comment id if pending and return comment for (path, id)."""
|
||||
# with sqlite3.connect(self.dbpath) as con:
|
||||
# con.execute("UPDATE comments SET mode=1 WHERE path=? AND id=? AND mode=2", (path, id))
|
||||
# return self.get(path, id)
|
||||
|
||||
def update(self, id, data):
|
||||
"""
|
||||
Update an existing comment, but only writeable fields such as text,
|
||||
author, email, website and parent. This method should set the modified
|
||||
field to the current time.
|
||||
"""
|
||||
|
||||
self.db.execute([
|
||||
'UPDATE comments SET',
|
||||
','.join(key + '=' + '?' for key in data),
|
||||
'WHERE id=?;'],
|
||||
data.values() + [id])
|
||||
|
||||
return self.get(id)
|
||||
|
||||
def get(self, id):
|
||||
|
||||
rv = self.db.execute('SELECT * FROM comments WHERE id=?', (id, )).fetchone()
|
||||
if rv:
|
||||
return dict(zip(Comments.fields, rv))
|
||||
|
||||
return None
|
||||
|
||||
def fetch(self, uri, mode=5):
|
||||
"""
|
||||
Return all comments for `path` with `mode`.
|
||||
"""
|
||||
rv = self.db.execute([
|
||||
'SELECT comments.* FROM comments INNER JOIN threads ON',
|
||||
' threads.uri=? AND comments.tid=threads.id AND (? | comments.mode) = ?'
|
||||
'ORDER BY id ASC;'], (uri, mode, mode)).fetchall()
|
||||
|
||||
for item in rv:
|
||||
yield dict(zip(Comments.fields, item))
|
||||
|
||||
def _remove_stale(self):
|
||||
|
||||
sql = ('DELETE FROM',
|
||||
' comments',
|
||||
'WHERE',
|
||||
' mode=4 AND id NOT IN (',
|
||||
' SELECT',
|
||||
' parent',
|
||||
' FROM',
|
||||
' comments',
|
||||
' WHERE parent IS NOT NULL)')
|
||||
|
||||
while self.db.execute(sql).rowcount:
|
||||
continue
|
||||
|
||||
def delete(self, id):
|
||||
"""
|
||||
Delete a comment. There are two distinctions: a comment is referenced
|
||||
by another valid comment's parent attribute or stand-a-lone. In this
|
||||
case the comment can't be removed without losing depending comments.
|
||||
Hence, delete removes all visible data such as text, author, email,
|
||||
website sets the mode field to 4.
|
||||
|
||||
In the second case this comment can be safely removed without any side
|
||||
effects."""
|
||||
|
||||
refs = self.db.execute('SELECT * FROM comments WHERE parent=?', (id, )).fetchone()
|
||||
|
||||
if refs is None:
|
||||
self.db.execute('DELETE FROM comments WHERE id=?', (id, ))
|
||||
self._remove_stale()
|
||||
return None
|
||||
|
||||
self.db.execute('UPDATE comments SET text=? WHERE id=?', ('', id))
|
||||
self.db.execute('UPDATE comments SET mode=? WHERE id=?', (4, id))
|
||||
for field in ('author', 'website'):
|
||||
self.db.execute('UPDATE comments SET %s=? WHERE id=?' % field, (None, id))
|
||||
|
||||
self._remove_stale()
|
||||
return self.get(id)
|
||||
|
||||
def like(self, id, remote_addr):
|
||||
"""+1 a given comment. Returns the new like count (may not change because
|
||||
the creater can't vote on his/her own comment and multiple votes from the
|
||||
same ip address are ignored as well)."""
|
||||
|
||||
rv = self.db.execute(
|
||||
'SELECT likes, dislikes, voters FROM comments WHERE id=?', (id, )) \
|
||||
.fetchone()
|
||||
|
||||
if rv is None:
|
||||
return 0
|
||||
|
||||
likes, dislikes, voters = rv
|
||||
if likes + dislikes >= 142:
|
||||
return likes
|
||||
|
||||
bf = Bloomfilter(bytearray(voters), likes + dislikes)
|
||||
if remote_addr in bf:
|
||||
return likes
|
||||
|
||||
bf.add(remote_addr)
|
||||
self.db.execute([
|
||||
'UPDATE comments SET',
|
||||
' likes = likes + 1, voters = ?',
|
||||
'WHERE id=?;'], (buffer(bf.array), id))
|
||||
|
||||
return likes + 1
|
||||
|
||||
def count(self, uri):
|
||||
"""
|
||||
return count of comments for uri.
|
||||
"""
|
||||
return self.db.execute([
|
||||
'SELECT COUNT(comments.id) FROM comments INNER JOIN threads ON',
|
||||
' threads.uri=? AND comments.tid=threads.id AND comments.mode=1;'],
|
||||
(uri, )).fetchone()
|
@ -0,0 +1,29 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
|
||||
|
||||
class Thread(object):
|
||||
|
||||
def __init__(self, id, uri, title):
|
||||
self.id = id
|
||||
self.uri = uri
|
||||
self.title = title
|
||||
|
||||
|
||||
class Threads(object):
|
||||
|
||||
def __init__(self, db):
|
||||
|
||||
self.db = db
|
||||
self.db.execute([
|
||||
'CREATE TABLE IF NOT EXISTS threads (',
|
||||
' id INTEGER PRIMARY KEY, uri VARCHAR(256) UNIQUE, title VARCHAR(256))'])
|
||||
|
||||
def __contains__(self, uri):
|
||||
return self.db.execute("SELECT title FROM threads WHERE uri=?", (uri, )) \
|
||||
.fetchone() is not None
|
||||
|
||||
def __getitem__(self, uri):
|
||||
return Thread(*self.db.execute("SELECT * FROM threads WHERE uri=?", (uri, )).fetchone())
|
||||
|
||||
def new(self, uri, title):
|
||||
self.db.execute("INSERT INTO threads (uri, title) VALUES (?, ?)", (uri, title))
|
@ -1,63 +0,0 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2012, Martin Zimmermann <info@posativ.org>. All rights reserved.
|
||||
# License: BSD Style, 2 clauses. see isso/__init__.py
|
||||
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import hashlib
|
||||
|
||||
|
||||
class Comment(object):
|
||||
"""This class represents a regular comment. It needs at least a text
|
||||
field, all other fields are optional (or automatically set by the
|
||||
database driver.
|
||||
|
||||
The field `mode` has a special meaning:
|
||||
|
||||
1: normal
|
||||
2: in moderation queue
|
||||
4: deleted
|
||||
|
||||
You can query for them like with UNIX permission bits, so you get both
|
||||
normal and queued using MODE=3.
|
||||
"""
|
||||
|
||||
fields = ["text", "author", "website", "votes", "hash", "parent", "mode", "id",
|
||||
"created", "modified"]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
|
||||
self.values = {}
|
||||
|
||||
for key in Comment.fields:
|
||||
self.values[key] = kwargs.get(key, None)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.values[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.values[key] = value
|
||||
|
||||
def iteritems(self):
|
||||
for key in Comment.fields:
|
||||
yield key, self.values[key]
|
||||
|
||||
@property
|
||||
def pending(self):
|
||||
return self.values["mode"] == 2
|
||||
|
||||
@property
|
||||
def deleted(self):
|
||||
return self.values["mode"] == 4
|
||||
|
||||
@property
|
||||
def md5(self):
|
||||
hv = hashlib.md5()
|
||||
|
||||
for key, value in self.iteritems():
|
||||
if key == "parent" or value is None:
|
||||
continue
|
||||
hv.update(unicode(self.values.get(key, "")).encode("utf-8", "replace")) # XXX
|
||||
|
||||
return hv.hexdigest()
|
@ -1,103 +0,0 @@
|
||||
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from isso.models import Comment
|
||||
from isso.db import SQLite
|
||||
|
||||
|
||||
class TestSQLite(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
fd, self.path = tempfile.mkstemp()
|
||||
self.db = SQLite(self.path, False)
|
||||
|
||||
def test_get(self):
|
||||
|
||||
rv = self.db.add('/', Comment(text='Spam'), '')
|
||||
c = self.db.get('/', rv["id"])
|
||||
|
||||
assert c["id"] == 1
|
||||
assert c["text"] == 'Spam'
|
||||
|
||||
def test_add(self):
|
||||
|
||||
self.db.add('/', Comment(text='Foo'), '')
|
||||
self.db.add('/', Comment(text='Bar'), '')
|
||||
self.db.add('/path/', Comment(text='Baz'), '')
|
||||
|
||||
rv = list(self.db.retrieve('/'))
|
||||
assert rv[0]["id"] == 1
|
||||
assert rv[0]["text"] == 'Foo'
|
||||
|
||||
assert rv[1]["id"] == 2
|
||||
assert rv[1]["text"] == 'Bar'
|
||||
|
||||
rv = list(self.db.retrieve('/path/'))
|
||||
assert rv[0]["id"] == 1
|
||||
assert rv[0]["text"] == 'Baz'
|
||||
|
||||
def test_add_return(self):
|
||||
|
||||
self.db.add('/', Comment(text='1'), '')
|
||||
self.db.add('/', Comment(text='2'), '')
|
||||
|
||||
assert self.db.add('/path/', Comment(text='1'), '')["id"] == 1
|
||||
|
||||
def test_update(self):
|
||||
|
||||
rv = self.db.add('/', Comment(text='Foo'), '')
|
||||
time.sleep(0.1)
|
||||
rv = self.db.update('/', rv["id"], {"text": "Bla"})
|
||||
c = self.db.get('/', rv["id"])
|
||||
|
||||
assert c["id"] == 1
|
||||
assert c["text"] == 'Bla'
|
||||
assert c["created"] < c["modified"]
|
||||
|
||||
def test_delete(self):
|
||||
|
||||
rv = self.db.add('/', Comment(
|
||||
text='F**CK', author='P*NIS', website='http://somebadhost.org/'), '')
|
||||
assert self.db.delete('/', rv["id"]) == None
|
||||
|
||||
def test_recent(self):
|
||||
|
||||
self.db.add('/path/', Comment(text='2'), '')
|
||||
|
||||
for x in range(5):
|
||||
self.db.add('/', Comment(text='%i' % (x+1)), '')
|
||||
|
||||
assert len(list(self.db.recent(mode=7))) == 6
|
||||
assert len(list(self.db.recent(mode=7, limit=5))) == 5
|
||||
|
||||
def tearDown(self):
|
||||
os.unlink(self.path)
|
||||
|
||||
|
||||
class TestSQLitePending(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
||||
fd, self.path = tempfile.mkstemp()
|
||||
self.db = SQLite(self.path, True)
|
||||
|
||||
def test_retrieve(self):
|
||||
|
||||
self.db.add('/', Comment(text='Foo'), '')
|
||||
assert len(list(self.db.retrieve('/'))) == 0
|
||||
|
||||
def test_activate(self):
|
||||
|
||||
self.db.add('/', Comment(text='Foo'), '')
|
||||
self.db.add('/', Comment(text='Bar'), '')
|
||||
self.db.activate('/', 2)
|
||||
|
||||
assert len(list(self.db.retrieve('/'))) == 1
|
||||
assert len(list(self.db.retrieve('/', mode=3))) == 2
|
||||
|
||||
def tearDown(self):
|
||||
os.unlink(self.path)
|
Loading…
Reference in new issue