clair/vendor/github.com/google/cayley/graph/leveldb/quadstore.go
2015-11-13 14:11:28 -05:00

515 lines
12 KiB
Go

// Copyright 2014 The Cayley Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package leveldb
import (
"bytes"
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"hash"
"sync"
"github.com/barakmich/glog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/google/cayley/graph"
"github.com/google/cayley/graph/iterator"
"github.com/google/cayley/quad"
)
func init() {
graph.RegisterQuadStore(QuadStoreType, graph.QuadStoreRegistration{
NewFunc: newQuadStore,
NewForRequestFunc: nil,
UpgradeFunc: nil,
InitFunc: createNewLevelDB,
IsPersistent: true,
})
}
const (
DefaultCacheSize = 2
DefaultWriteBufferSize = 20
QuadStoreType = "leveldb"
)
var (
hashPool = sync.Pool{
New: func() interface{} { return sha1.New() },
}
hashSize = sha1.Size
)
type Token []byte
func (t Token) Key() interface{} {
return string(t)
}
type QuadStore struct {
dbOpts *opt.Options
db *leveldb.DB
path string
open bool
size int64
horizon int64
writeopts *opt.WriteOptions
readopts *opt.ReadOptions
}
func createNewLevelDB(path string, _ graph.Options) error {
opts := &opt.Options{}
db, err := leveldb.OpenFile(path, opts)
if err != nil {
glog.Errorf("Error: could not create database: %v", err)
return err
}
defer db.Close()
qs := &QuadStore{}
qs.db = db
qs.writeopts = &opt.WriteOptions{
Sync: true,
}
qs.Close()
return nil
}
func newQuadStore(path string, options graph.Options) (graph.QuadStore, error) {
var qs QuadStore
var err error
qs.path = path
cacheSize := DefaultCacheSize
val, ok, err := options.IntKey("cache_size_mb")
if err != nil {
return nil, err
} else if ok {
cacheSize = val
}
qs.dbOpts = &opt.Options{
BlockCacheCapacity: cacheSize * opt.MiB,
}
qs.dbOpts.ErrorIfMissing = true
writeBufferSize := DefaultWriteBufferSize
val, ok, err = options.IntKey("writeBufferSize")
if err != nil {
return nil, err
} else if ok {
writeBufferSize = val
}
qs.dbOpts.WriteBuffer = writeBufferSize * opt.MiB
qs.writeopts = &opt.WriteOptions{
Sync: false,
}
qs.readopts = &opt.ReadOptions{}
db, err := leveldb.OpenFile(qs.path, qs.dbOpts)
if err != nil {
glog.Errorln("Error, could not open! ", err)
return nil, err
}
qs.db = db
glog.Infoln(qs.GetStats())
err = qs.getMetadata()
if err != nil {
return nil, err
}
return &qs, nil
}
func (qs *QuadStore) GetStats() string {
out := ""
stats, err := qs.db.GetProperty("leveldb.stats")
if err == nil {
out += fmt.Sprintln("Stats: ", stats)
}
out += fmt.Sprintln("Size: ", qs.size)
return out
}
func (qs *QuadStore) Size() int64 {
return qs.size
}
func (qs *QuadStore) Horizon() graph.PrimaryKey {
return graph.NewSequentialKey(qs.horizon)
}
func hashOf(s string) []byte {
h := hashPool.Get().(hash.Hash)
h.Reset()
defer hashPool.Put(h)
key := make([]byte, 0, hashSize)
h.Write([]byte(s))
key = h.Sum(key)
return key
}
func (qs *QuadStore) createKeyFor(d [4]quad.Direction, q quad.Quad) []byte {
key := make([]byte, 0, 2+(hashSize*4))
// TODO(kortschak) Remove dependence on String() method.
key = append(key, []byte{d[0].Prefix(), d[1].Prefix()}...)
key = append(key, hashOf(q.Get(d[0]))...)
key = append(key, hashOf(q.Get(d[1]))...)
key = append(key, hashOf(q.Get(d[2]))...)
key = append(key, hashOf(q.Get(d[3]))...)
return key
}
func (qs *QuadStore) createValueKeyFor(s string) []byte {
key := make([]byte, 0, 1+hashSize)
key = append(key, []byte("z")...)
key = append(key, hashOf(s)...)
return key
}
type IndexEntry struct {
quad.Quad
History []int64
}
// Short hand for direction permutations.
var (
spo = [4]quad.Direction{quad.Subject, quad.Predicate, quad.Object, quad.Label}
osp = [4]quad.Direction{quad.Object, quad.Subject, quad.Predicate, quad.Label}
pos = [4]quad.Direction{quad.Predicate, quad.Object, quad.Subject, quad.Label}
cps = [4]quad.Direction{quad.Label, quad.Predicate, quad.Subject, quad.Object}
)
func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOpts) error {
batch := &leveldb.Batch{}
resizeMap := make(map[string]int64)
sizeChange := int64(0)
for _, d := range deltas {
if d.Action != graph.Add && d.Action != graph.Delete {
return errors.New("leveldb: invalid action")
}
bytes, err := json.Marshal(d)
if err != nil {
return err
}
batch.Put(keyFor(d), bytes)
err = qs.buildQuadWrite(batch, d.Quad, d.ID.Int(), d.Action == graph.Add)
if err != nil {
if err == graph.ErrQuadExists && ignoreOpts.IgnoreDup {
continue
}
if err == graph.ErrQuadNotExist && ignoreOpts.IgnoreMissing {
continue
}
return err
}
delta := int64(1)
if d.Action == graph.Delete {
delta = int64(-1)
}
resizeMap[d.Quad.Subject] += delta
resizeMap[d.Quad.Predicate] += delta
resizeMap[d.Quad.Object] += delta
if d.Quad.Label != "" {
resizeMap[d.Quad.Label] += delta
}
sizeChange += delta
qs.horizon = d.ID.Int()
}
for k, v := range resizeMap {
if v != 0 {
err := qs.UpdateValueKeyBy(k, v, batch)
if err != nil {
return err
}
}
}
err := qs.db.Write(batch, qs.writeopts)
if err != nil {
glog.Error("could not write to DB for quadset.")
return err
}
qs.size += sizeChange
return nil
}
func keyFor(d graph.Delta) []byte {
key := make([]byte, 0, 19)
key = append(key, 'd')
key = append(key, []byte(fmt.Sprintf("%018x", d.ID.Int()))...)
return key
}
func (qs *QuadStore) buildQuadWrite(batch *leveldb.Batch, q quad.Quad, id int64, isAdd bool) error {
var entry IndexEntry
data, err := qs.db.Get(qs.createKeyFor(spo, q), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Error("could not access DB to prepare index: ", err)
return err
}
if err == nil {
// We got something.
err = json.Unmarshal(data, &entry)
if err != nil {
return err
}
} else {
entry.Quad = q
}
if isAdd && len(entry.History)%2 == 1 {
glog.Errorf("attempt to add existing quad %v: %#v", entry, q)
return graph.ErrQuadExists
}
if !isAdd && len(entry.History)%2 == 0 {
glog.Error("attempt to delete non-existent quad %v: %#c", entry, q)
return graph.ErrQuadNotExist
}
entry.History = append(entry.History, id)
bytes, err := json.Marshal(entry)
if err != nil {
glog.Errorf("could not write to buffer for entry %#v: %s", entry, err)
return err
}
batch.Put(qs.createKeyFor(spo, q), bytes)
batch.Put(qs.createKeyFor(osp, q), bytes)
batch.Put(qs.createKeyFor(pos, q), bytes)
if q.Get(quad.Label) != "" {
batch.Put(qs.createKeyFor(cps, q), bytes)
}
return nil
}
type ValueData struct {
Name string
Size int64
}
func (qs *QuadStore) UpdateValueKeyBy(name string, amount int64, batch *leveldb.Batch) error {
value := &ValueData{name, amount}
key := qs.createValueKeyFor(name)
b, err := qs.db.Get(key, qs.readopts)
// Error getting the node from the database.
if err != nil && err != leveldb.ErrNotFound {
glog.Errorf("Error reading Value %s from the DB.", name)
return err
}
// Node exists in the database -- unmarshal and update.
if b != nil && err != leveldb.ErrNotFound {
err = json.Unmarshal(b, value)
if err != nil {
glog.Errorf("Error: could not reconstruct value: %v", err)
return err
}
value.Size += amount
}
// Are we deleting something?
if value.Size <= 0 {
value.Size = 0
}
// Repackage and rewrite.
bytes, err := json.Marshal(&value)
if err != nil {
glog.Errorf("could not write to buffer for value %s: %s", name, err)
return err
}
if batch == nil {
qs.db.Put(key, bytes, qs.writeopts)
} else {
batch.Put(key, bytes)
}
return nil
}
func (qs *QuadStore) Close() {
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, qs.size)
if err == nil {
werr := qs.db.Put([]byte("__size"), buf.Bytes(), qs.writeopts)
if werr != nil {
glog.Error("could not write size before closing!")
}
} else {
glog.Errorf("could not convert size before closing!")
}
buf.Reset()
err = binary.Write(buf, binary.LittleEndian, qs.horizon)
if err == nil {
werr := qs.db.Put([]byte("__horizon"), buf.Bytes(), qs.writeopts)
if werr != nil {
glog.Error("could not write horizon before closing!")
}
} else {
glog.Errorf("could not convert horizon before closing!")
}
qs.db.Close()
qs.open = false
}
func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
var q quad.Quad
b, err := qs.db.Get(k.(Token), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Error("Error: could not get quad from DB.")
return quad.Quad{}
}
if err == leveldb.ErrNotFound {
// No harm, no foul.
return quad.Quad{}
}
err = json.Unmarshal(b, &q)
if err != nil {
glog.Error("Error: could not reconstruct quad.")
return quad.Quad{}
}
return q
}
func (qs *QuadStore) ValueOf(s string) graph.Value {
return Token(qs.createValueKeyFor(s))
}
func (qs *QuadStore) valueData(key []byte) ValueData {
var out ValueData
if glog.V(3) {
glog.V(3).Infof("%c %v", key[0], key)
}
b, err := qs.db.Get(key, qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Errorln("Error: could not get value from DB")
return out
}
if b != nil && err != leveldb.ErrNotFound {
err = json.Unmarshal(b, &out)
if err != nil {
glog.Errorln("Error: could not reconstruct value")
return ValueData{}
}
}
return out
}
func (qs *QuadStore) NameOf(k graph.Value) string {
if k == nil {
glog.V(2).Info("k was nil")
return ""
}
return qs.valueData(k.(Token)).Name
}
func (qs *QuadStore) SizeOf(k graph.Value) int64 {
if k == nil {
return 0
}
return int64(qs.valueData(k.(Token)).Size)
}
func (qs *QuadStore) getInt64ForKey(key string, empty int64) (int64, error) {
var out int64
b, err := qs.db.Get([]byte(key), qs.readopts)
if err != nil && err != leveldb.ErrNotFound {
glog.Errorln("could not read " + key + ": " + err.Error())
return 0, err
}
if err == leveldb.ErrNotFound {
// Must be a new database. Cool
return empty, nil
}
buf := bytes.NewBuffer(b)
err = binary.Read(buf, binary.LittleEndian, &out)
if err != nil {
glog.Errorln("Error: could not parse", key)
return 0, err
}
return out, nil
}
func (qs *QuadStore) getMetadata() error {
var err error
qs.size, err = qs.getInt64ForKey("__size", 0)
if err != nil {
return err
}
qs.horizon, err = qs.getInt64ForKey("__horizon", 0)
return err
}
func (qs *QuadStore) SizeOfPrefix(pre []byte) (int64, error) {
limit := make([]byte, len(pre))
copy(limit, pre)
end := len(limit) - 1
limit[end]++
ranges := make([]util.Range, 1)
ranges[0].Start = pre
ranges[0].Limit = limit
sizes, err := qs.db.SizeOf(ranges)
if err == nil {
return (int64(sizes[0]) >> 6) + 1, nil
}
return 0, nil
}
func (qs *QuadStore) QuadIterator(d quad.Direction, val graph.Value) graph.Iterator {
var prefix string
switch d {
case quad.Subject:
prefix = "sp"
case quad.Predicate:
prefix = "po"
case quad.Object:
prefix = "os"
case quad.Label:
prefix = "cp"
default:
panic("unreachable " + d.String())
}
return NewIterator(prefix, d, val, qs)
}
func (qs *QuadStore) NodesAllIterator() graph.Iterator {
return NewAllIterator("z", quad.Any, qs)
}
func (qs *QuadStore) QuadsAllIterator() graph.Iterator {
return NewAllIterator("po", quad.Predicate, qs)
}
func (qs *QuadStore) QuadDirection(val graph.Value, d quad.Direction) graph.Value {
v := val.(Token)
offset := PositionOf(v[0:2], d, qs)
if offset != -1 {
return Token(append([]byte("z"), v[offset:offset+hashSize]...))
}
return Token(qs.Quad(val).Get(d))
}
func compareBytes(a, b graph.Value) bool {
return bytes.Equal(a.(Token), b.(Token))
}
func (qs *QuadStore) FixedIterator() graph.FixedIterator {
return iterator.NewFixed(compareBytes)
}
func (qs *QuadStore) Type() string {
return QuadStoreType
}