136 lines
3.1 KiB
Go
136 lines
3.1 KiB
Go
|
package bolt
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Batch calls fn as part of a batch. It behaves similar to Update,
|
||
|
// except:
|
||
|
//
|
||
|
// 1. concurrent Batch calls can be combined into a single Bolt
|
||
|
// transaction.
|
||
|
//
|
||
|
// 2. the function passed to Batch may be called multiple times,
|
||
|
// regardless of whether it returns error or not.
|
||
|
//
|
||
|
// This means that Batch function side effects must be idempotent and
|
||
|
// take permanent effect only after a successful return is seen in
|
||
|
// caller.
|
||
|
//
|
||
|
// Batch is only useful when there are multiple goroutines calling it.
|
||
|
func (db *DB) Batch(fn func(*Tx) error) error {
|
||
|
errCh := make(chan error, 1)
|
||
|
|
||
|
db.batchMu.Lock()
|
||
|
if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
|
||
|
// There is no existing batch, or the existing batch is full; start a new one.
|
||
|
db.batch = &batch{
|
||
|
db: db,
|
||
|
}
|
||
|
db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
|
||
|
}
|
||
|
db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
|
||
|
if len(db.batch.calls) >= db.MaxBatchSize {
|
||
|
// wake up batch, it's ready to run
|
||
|
go db.batch.trigger()
|
||
|
}
|
||
|
db.batchMu.Unlock()
|
||
|
|
||
|
err := <-errCh
|
||
|
if err == trySolo {
|
||
|
err = db.Update(fn)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
type call struct {
|
||
|
fn func(*Tx) error
|
||
|
err chan<- error
|
||
|
}
|
||
|
|
||
|
type batch struct {
|
||
|
db *DB
|
||
|
timer *time.Timer
|
||
|
start sync.Once
|
||
|
calls []call
|
||
|
}
|
||
|
|
||
|
// trigger runs the batch if it hasn't already been run.
|
||
|
func (b *batch) trigger() {
|
||
|
b.start.Do(b.run)
|
||
|
}
|
||
|
|
||
|
// run performs the transactions in the batch and communicates results
|
||
|
// back to DB.Batch.
|
||
|
func (b *batch) run() {
|
||
|
b.db.batchMu.Lock()
|
||
|
b.timer.Stop()
|
||
|
// Make sure no new work is added to this batch, but don't break
|
||
|
// other batches.
|
||
|
if b.db.batch == b {
|
||
|
b.db.batch = nil
|
||
|
}
|
||
|
b.db.batchMu.Unlock()
|
||
|
|
||
|
retry:
|
||
|
for len(b.calls) > 0 {
|
||
|
var failIdx = -1
|
||
|
err := b.db.Update(func(tx *Tx) error {
|
||
|
for i, c := range b.calls {
|
||
|
if err := safelyCall(c.fn, tx); err != nil {
|
||
|
failIdx = i
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
})
|
||
|
|
||
|
if failIdx >= 0 {
|
||
|
// take the failing transaction out of the batch. it's
|
||
|
// safe to shorten b.calls here because db.batch no longer
|
||
|
// points to us, and we hold the mutex anyway.
|
||
|
c := b.calls[failIdx]
|
||
|
b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
|
||
|
// tell the submitter re-run it solo, continue with the rest of the batch
|
||
|
c.err <- trySolo
|
||
|
continue retry
|
||
|
}
|
||
|
|
||
|
// pass success, or bolt internal errors, to all callers
|
||
|
for _, c := range b.calls {
|
||
|
if c.err != nil {
|
||
|
c.err <- err
|
||
|
}
|
||
|
}
|
||
|
break retry
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// trySolo is a special sentinel error value used for signaling that a
// transaction function should be re-run on its own. It is sent by
// batch.run and consumed by DB.Batch; it should never be seen by callers.
var trySolo = errors.New("batch function returned an error and should be re-run solo")
|
||
|
|
||
|
// panicked is an error that carries the value recovered from a panic.
type panicked struct {
	reason interface{} // value returned by recover()
}

// Error implements the error interface. If the recovered value is itself
// an error, its message is returned unchanged; otherwise the value is
// formatted as "panic: <value>".
func (p panicked) Error() string {
	if err, ok := p.reason.(error); ok {
		return err.Error()
	}
	return fmt.Sprintf("panic: %v", p.reason)
}
|
||
|
|
||
|
func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
|
||
|
defer func() {
|
||
|
if p := recover(); p != nil {
|
||
|
err = panicked{p}
|
||
|
}
|
||
|
}()
|
||
|
return fn(tx)
|
||
|
}
|