You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
clair/vendor/gopkg.in/mgo.v2/txn/flusher.go

986 lines
26 KiB

9 years ago
package txn
import (
"fmt"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
)
func flush(r *Runner, t *transaction) error {
f := &flusher{
Runner: r,
goal: t,
goalKeys: make(map[docKey]bool),
queue: make(map[docKey][]token),
debugId: debugPrefix(),
}
for _, dkey := range f.goal.docKeys() {
f.goalKeys[dkey] = true
}
return f.run()
}
type flusher struct {
*Runner
goal *transaction
goalKeys map[docKey]bool
queue map[docKey][]token
debugId string
}
func (f *flusher) run() (err error) {
if chaosEnabled {
defer f.handleChaos(&err)
}
f.debugf("Processing %s", f.goal)
seen := make(map[bson.ObjectId]*transaction)
if err := f.recurse(f.goal, seen); err != nil {
return err
}
if f.goal.done() {
return nil
}
// Sparse workloads will generally be managed entirely by recurse.
// Getting here means one or more transactions have dependencies
// and perhaps cycles.
// Build successors data for Tarjan's sort. Must consider
// that entries in txn-queue are not necessarily valid.
successors := make(map[bson.ObjectId][]bson.ObjectId)
ready := true
for _, dqueue := range f.queue {
NextPair:
for i := 0; i < len(dqueue); i++ {
pred := dqueue[i]
predid := pred.id()
predt := seen[predid]
if predt == nil || predt.Nonce != pred.nonce() {
continue
}
predsuccids, ok := successors[predid]
if !ok {
successors[predid] = nil
}
for j := i + 1; j < len(dqueue); j++ {
succ := dqueue[j]
succid := succ.id()
succt := seen[succid]
if succt == nil || succt.Nonce != succ.nonce() {
continue
}
if _, ok := successors[succid]; !ok {
successors[succid] = nil
}
// Found a valid pred/succ pair.
i = j - 1
for _, predsuccid := range predsuccids {
if predsuccid == succid {
continue NextPair
}
}
successors[predid] = append(predsuccids, succid)
if succid == f.goal.Id {
// There are still pre-requisites to handle.
ready = false
}
continue NextPair
}
}
}
f.debugf("Queues: %v", f.queue)
f.debugf("Successors: %v", successors)
if ready {
f.debugf("Goal %s has no real pre-requisites", f.goal)
return f.advance(f.goal, nil, true)
}
// Robert Tarjan's algorithm for detecting strongly-connected
// components is used for topological sorting and detecting
// cycles at once. The order in which transactions are applied
// in commonly affected documents must be a global agreement.
sorted := tarjanSort(successors)
if debugEnabled {
f.debugf("Tarjan output: %v", sorted)
}
pull := make(map[bson.ObjectId]*transaction)
for i := len(sorted) - 1; i >= 0; i-- {
scc := sorted[i]
f.debugf("Flushing %v", scc)
if len(scc) == 1 {
pull[scc[0]] = seen[scc[0]]
}
for _, id := range scc {
if err := f.advance(seen[id], pull, true); err != nil {
return err
}
}
if len(scc) > 1 {
for _, id := range scc {
pull[id] = seen[id]
}
}
}
return nil
}
func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error {
seen[t.Id] = t
err := f.advance(t, nil, false)
if err != errPreReqs {
return err
}
for _, dkey := range t.docKeys() {
for _, dtt := range f.queue[dkey] {
id := dtt.id()
if seen[id] != nil {
continue
}
qt, err := f.load(id)
if err != nil {
return err
}
err = f.recurse(qt, seen)
if err != nil {
return err
}
}
}
return nil
}
func (f *flusher) advance(t *transaction, pull map[bson.ObjectId]*transaction, force bool) error {
for {
switch t.State {
case tpreparing, tprepared:
revnos, err := f.prepare(t, force)
if err != nil {
return err
}
if t.State != tprepared {
continue
}
if err = f.assert(t, revnos, pull); err != nil {
return err
}
if t.State != tprepared {
continue
}
if err = f.checkpoint(t, revnos); err != nil {
return err
}
case tapplying:
return f.apply(t, pull)
case taborting:
return f.abortOrReload(t, nil, pull)
case tapplied, taborted:
return nil
default:
panic(fmt.Errorf("transaction in unknown state: %q", t.State))
}
}
panic("unreachable")
}
type stash string
const (
stashStable stash = ""
stashInsert stash = "insert"
stashRemove stash = "remove"
)
type txnInfo struct {
Queue []token `bson:"txn-queue"`
Revno int64 `bson:"txn-revno,omitempty"`
Insert bson.ObjectId `bson:"txn-insert,omitempty"`
Remove bson.ObjectId `bson:"txn-remove,omitempty"`
}
type stashState string
const (
stashNew stashState = ""
stashInserting stashState = "inserting"
)
var txnFields = bson.D{{"txn-queue", 1}, {"txn-revno", 1}, {"txn-remove", 1}, {"txn-insert", 1}}
var errPreReqs = fmt.Errorf("transaction has pre-requisites and force is false")
// prepare injects t's id onto txn-queue for all affected documents
// and collects the current txn-queue and txn-revno values during
// the process. If the prepared txn-queue indicates that there are
// pre-requisite transactions to be applied and the force parameter
// is false, errPreReqs will be returned. Otherwise, the current
// tip revision numbers for all the documents are returned.
func (f *flusher) prepare(t *transaction, force bool) (revnos []int64, err error) {
if t.State != tpreparing {
return f.rescan(t, force)
}
f.debugf("Preparing %s", t)
// dkeys being sorted means stable iteration across all runners. This
// isn't strictly required, but reduces the chances of cycles.
dkeys := t.docKeys()
revno := make(map[docKey]int64)
info := txnInfo{}
tt := tokenFor(t)
NextDoc:
for _, dkey := range dkeys {
change := mgo.Change{
Update: bson.D{{"$addToSet", bson.D{{"txn-queue", tt}}}},
ReturnNew: true,
}
c := f.tc.Database.C(dkey.C)
cquery := c.FindId(dkey.Id).Select(txnFields)
RetryDoc:
change.Upsert = false
chaos("")
if _, err := cquery.Apply(change, &info); err == nil {
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
} else {
// Handle remove in progress before preparing it.
if err := f.loadAndApply(info.Remove); err != nil {
return nil, err
}
goto RetryDoc
}
} else if err != mgo.ErrNotFound {
return nil, err
}
// Document missing. Use stash collection.
change.Upsert = true
chaos("")
_, err := f.sc.FindId(dkey).Apply(change, &info)
if err != nil {
return nil, err
}
if info.Insert != "" {
// Handle insert in progress before preparing it.
if err := f.loadAndApply(info.Insert); err != nil {
return nil, err
}
goto RetryDoc
}
// Must confirm stash is still in use and is the same one
// prepared, since applying a remove overwrites the stash.
docFound := false
stashFound := false
if err = c.FindId(dkey.Id).Select(txnFields).One(&info); err == nil {
docFound = true
} else if err != mgo.ErrNotFound {
return nil, err
} else if err = f.sc.FindId(dkey).One(&info); err == nil {
stashFound = true
if info.Revno == 0 {
// Missing revno in the stash only happens when it
// has been upserted, in which case it defaults to -1.
// Txn-inserted documents get revno -1 while in the stash
// for the first time, and -revno-1 == 2 when they go live.
info.Revno = -1
}
} else if err != mgo.ErrNotFound {
return nil, err
}
if docFound && info.Remove == "" || stashFound && info.Insert == "" {
for _, dtt := range info.Queue {
if dtt != tt {
continue
}
// Found tt properly prepared.
if stashFound {
f.debugf("[B] Prepared document %v on stash with revno %d and queue: %v", dkey, info.Revno, info.Queue)
} else {
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
continue NextDoc
}
}
// The stash wasn't valid and tt got overwriten. Try again.
f.unstashToken(tt, dkey)
goto RetryDoc
}
// Save the prepared nonce onto t.
nonce := tt.nonce()
qdoc := bson.D{{"_id", t.Id}, {"s", tpreparing}}
udoc := bson.D{{"$set", bson.D{{"s", tprepared}, {"n", nonce}}}}
chaos("set-prepared")
err = f.tc.Update(qdoc, udoc)
if err == nil {
t.State = tprepared
t.Nonce = nonce
} else if err == mgo.ErrNotFound {
f.debugf("Can't save nonce of %s: LOST RACE", tt)
if err := f.reload(t); err != nil {
return nil, err
} else if t.State == tpreparing {
panic("can't save nonce yet transaction is still preparing")
} else if t.State != tprepared {
return t.Revnos, nil
}
tt = t.token()
} else if err != nil {
return nil, err
}
prereqs, found := f.hasPreReqs(tt, dkeys)
if !found {
// Must only happen when reloading above.
return f.rescan(t, force)
} else if prereqs && !force {
f.debugf("Prepared queue with %s [has prereqs & not forced].", tt)
return nil, errPreReqs
}
revnos = assembledRevnos(t.Ops, revno)
if !prereqs {
f.debugf("Prepared queue with %s [no prereqs]. Revnos: %v", tt, revnos)
} else {
f.debugf("Prepared queue with %s [forced] Revnos: %v", tt, revnos)
}
return revnos, nil
}
func (f *flusher) unstashToken(tt token, dkey docKey) error {
qdoc := bson.D{{"_id", dkey}, {"txn-queue", tt}}
udoc := bson.D{{"$pull", bson.D{{"txn-queue", tt}}}}
chaos("")
if err := f.sc.Update(qdoc, udoc); err == nil {
chaos("")
err = f.sc.Remove(bson.D{{"_id", dkey}, {"txn-queue", bson.D{}}})
} else if err != mgo.ErrNotFound {
return err
}
return nil
}
func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error) {
f.debugf("Rescanning %s", t)
if t.State != tprepared {
panic(fmt.Errorf("rescanning transaction in invalid state: %q", t.State))
}
// dkeys being sorted means stable iteration across all
// runners. This isn't strictly required, but reduces the chances
// of cycles.
dkeys := t.docKeys()
tt := t.token()
if !force {
prereqs, found := f.hasPreReqs(tt, dkeys)
if found && prereqs {
// Its state is already known.
return nil, errPreReqs
}
}
revno := make(map[docKey]int64)
info := txnInfo{}
for _, dkey := range dkeys {
const retries = 3
retry := -1
RetryDoc:
retry++
c := f.tc.Database.C(dkey.C)
if err := c.FindId(dkey.Id).Select(txnFields).One(&info); err == mgo.ErrNotFound {
// Document is missing. Look in stash.
chaos("")
if err := f.sc.FindId(dkey).One(&info); err == mgo.ErrNotFound {
// Stash also doesn't exist. Maybe someone applied it.
if err := f.reload(t); err != nil {
return nil, err
} else if t.State != tprepared {
return t.Revnos, err
}
// Not applying either.
if retry < retries {
// Retry since there might be an insert/remove race.
goto RetryDoc
}
// Neither the doc nor the stash seem to exist.
return nil, fmt.Errorf("cannot find document %v for applying transaction %s", dkey, t)
} else if err != nil {
return nil, err
}
// Stash found.
if info.Insert != "" {
// Handle insert in progress before assuming ordering is good.
if err := f.loadAndApply(info.Insert); err != nil {
return nil, err
}
goto RetryDoc
}
if info.Revno == 0 {
// Missing revno in the stash means -1.
info.Revno = -1
}
} else if err != nil {
return nil, err
} else if info.Remove != "" {
// Handle remove in progress before assuming ordering is good.
if err := f.loadAndApply(info.Remove); err != nil {
return nil, err
}
goto RetryDoc
}
revno[dkey] = info.Revno
found := false
for _, id := range info.Queue {
if id == tt {
found = true
break
}
}
f.queue[dkey] = info.Queue
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
// 1) The transaction was applied and popped by someone else. This is
// the common case.
// 2) We've read an out-of-date queue from the stash. This can happen
// when someone else was paused for a long while preparing another
// transaction for this document, and improperly upserted to the
// stash when unpaused (after someone else inserted the document).
// This is rare but possible.
// 3) There's an actual bug somewhere, or outside interference. Worst
// possible case.
f.debugf("Rescanned document %v misses %s in queue: %v", dkey, tt, info.Queue)
err := f.reload(t)
if t.State == tpreparing || t.State == tprepared {
if retry < retries {
// Case 2.
goto RetryDoc
}
// Case 3.
return nil, fmt.Errorf("cannot find transaction %s in queue for document %v", t, dkey)
}
// Case 1.
return t.Revnos, err
}
}
prereqs, found := f.hasPreReqs(tt, dkeys)
if !found {
panic("rescanning loop guarantees that this can't happen")
} else if prereqs && !force {
f.debugf("Rescanned queue with %s: has prereqs, not forced", tt)
return nil, errPreReqs
}
revnos = assembledRevnos(t.Ops, revno)
if !prereqs {
f.debugf("Rescanned queue with %s: no prereqs, revnos: %v", tt, revnos)
} else {
f.debugf("Rescanned queue with %s: has prereqs, forced, revnos: %v", tt, revnos)
}
return revnos, nil
}
func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {
revnos := make([]int64, len(ops))
for i, op := range ops {
dkey := op.docKey()
revnos[i] = revno[dkey]
drevno := revno[dkey]
switch {
case op.Insert != nil && drevno < 0:
revno[dkey] = -drevno + 1
case op.Update != nil && drevno >= 0:
revno[dkey] = drevno + 1
case op.Remove && drevno >= 0:
revno[dkey] = -drevno - 1
}
}
return revnos
}
func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
found = true
NextDoc:
for _, dkey := range dkeys {
for _, dtt := range f.queue[dkey] {
if dtt == tt {
continue NextDoc
} else if dtt.id() != tt.id() {
prereqs = true
}
}
found = false
}
return
}
func (f *flusher) reload(t *transaction) error {
var newt transaction
query := f.tc.FindId(t.Id)
query.Select(bson.D{{"s", 1}, {"n", 1}, {"r", 1}})
if err := query.One(&newt); err != nil {
return fmt.Errorf("failed to reload transaction: %v", err)
}
t.State = newt.State
t.Nonce = newt.Nonce
t.Revnos = newt.Revnos
f.debugf("Reloaded %s: %q", t, t.State)
return nil
}
func (f *flusher) loadAndApply(id bson.ObjectId) error {
t, err := f.load(id)
if err != nil {
return err
}
return f.advance(t, nil, true)
}
// assert verifies that all assertions in t match the content that t
// will be applied upon. If an assertion fails, the transaction state
// is changed to aborted.
func (f *flusher) assert(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) error {
f.debugf("Asserting %s with revnos %v", t, revnos)
if t.State != tprepared {
panic(fmt.Errorf("asserting transaction in invalid state: %q", t.State))
}
qdoc := make(bson.D, 3)
revno := make(map[docKey]int64)
for i, op := range t.Ops {
dkey := op.docKey()
if _, ok := revno[dkey]; !ok {
revno[dkey] = revnos[i]
}
if op.Assert == nil {
continue
}
if op.Assert == DocMissing {
if revnos[i] >= 0 {
return f.abortOrReload(t, revnos, pull)
}
continue
}
if op.Insert != nil {
return fmt.Errorf("Insert can only Assert txn.DocMissing", op.Assert)
}
// if revnos[i] < 0 { abort }?
qdoc = append(qdoc[:0], bson.DocElem{"_id", op.Id})
if op.Assert != DocMissing {
var revnoq interface{}
if n := revno[dkey]; n == 0 {
revnoq = bson.D{{"$exists", false}}
} else {
revnoq = n
}
// XXX Add tt to the query here, once we're sure it's all working.
// Not having it increases the chances of breaking on bad logic.
qdoc = append(qdoc, bson.DocElem{"txn-revno", revnoq})
if op.Assert != DocExists {
qdoc = append(qdoc, bson.DocElem{"$or", []interface{}{op.Assert}})
}
}
c := f.tc.Database.C(op.C)
if err := c.Find(qdoc).Select(bson.D{{"_id", 1}}).One(nil); err == mgo.ErrNotFound {
// Assertion failed or someone else started applying.
return f.abortOrReload(t, revnos, pull)
} else if err != nil {
return err
}
}
f.debugf("Asserting %s succeeded", t)
return nil
}
func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.ObjectId]*transaction) (err error) {
f.debugf("Aborting or reloading %s (was %q)", t, t.State)
if t.State == tprepared {
qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
udoc := bson.D{{"$set", bson.D{{"s", taborting}}}}
chaos("set-aborting")
if err = f.tc.Update(qdoc, udoc); err == nil {
t.State = taborting
} else if err == mgo.ErrNotFound {
if err = f.reload(t); err != nil || t.State != taborting {
f.debugf("Won't abort %s. Reloaded state: %q", t, t.State)
return err
}
} else {
return err
}
} else if t.State != taborting {
panic(fmt.Errorf("aborting transaction in invalid state: %q", t.State))
}
if len(revnos) > 0 {
if pull == nil {
pull = map[bson.ObjectId]*transaction{t.Id: t}
}
seen := make(map[docKey]bool)
for i, op := range t.Ops {
dkey := op.docKey()
if seen[op.docKey()] {
continue
}
seen[dkey] = true
pullAll := tokensToPull(f.queue[dkey], pull, "")
if len(pullAll) == 0 {
continue
}
udoc := bson.D{{"$pullAll", bson.D{{"txn-queue", pullAll}}}}
chaos("")
if revnos[i] < 0 {
err = f.sc.UpdateId(dkey, udoc)
} else {
c := f.tc.Database.C(dkey.C)
err = c.UpdateId(dkey.Id, udoc)
}
if err != nil && err != mgo.ErrNotFound {
return err
}
}
}
udoc := bson.D{{"$set", bson.D{{"s", taborted}}}}
chaos("set-aborted")
if err := f.tc.UpdateId(t.Id, udoc); err != nil && err != mgo.ErrNotFound {
return err
}
t.State = taborted
f.debugf("Aborted %s", t)
return nil
}
func (f *flusher) checkpoint(t *transaction, revnos []int64) error {
var debugRevnos map[docKey][]int64
if debugEnabled {
debugRevnos = make(map[docKey][]int64)
for i, op := range t.Ops {
dkey := op.docKey()
debugRevnos[dkey] = append(debugRevnos[dkey], revnos[i])
}
f.debugf("Ready to apply %s. Saving revnos %v", t, debugRevnos)
}
// Save in t the txn-revno values the transaction must run on.
qdoc := bson.D{{"_id", t.Id}, {"s", tprepared}}
udoc := bson.D{{"$set", bson.D{{"s", tapplying}, {"r", revnos}}}}
chaos("set-applying")
err := f.tc.Update(qdoc, udoc)
if err == nil {
t.State = tapplying
t.Revnos = revnos
f.debugf("Ready to apply %s. Saving revnos %v: DONE", t, debugRevnos)
} else if err == mgo.ErrNotFound {
f.debugf("Ready to apply %s. Saving revnos %v: LOST RACE", t, debugRevnos)
return f.reload(t)
}
return nil
}
func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) error {
f.debugf("Applying transaction %s", t)
if t.State != tapplying {
panic(fmt.Errorf("applying transaction in invalid state: %q", t.State))
}
if pull == nil {
pull = map[bson.ObjectId]*transaction{t.Id: t}
}
logRevnos := append([]int64(nil), t.Revnos...)
logDoc := bson.D{{"_id", t.Id}}
tt := tokenFor(t)
for i := range t.Ops {
op := &t.Ops[i]
dkey := op.docKey()
dqueue := f.queue[dkey]
revno := t.Revnos[i]
var opName string
if debugEnabled {
opName = op.name()
f.debugf("Applying %s op %d (%s) on %v with txn-revno %d", t, i, opName, dkey, revno)
}
c := f.tc.Database.C(op.C)
qdoc := bson.D{{"_id", dkey.Id}, {"txn-revno", revno}, {"txn-queue", tt}}
if op.Insert != nil {
qdoc[0].Value = dkey
if revno == -1 {
qdoc[1].Value = bson.D{{"$exists", false}}
}
} else if revno == 0 {
// There's no document with revno 0. The only way to see it is
// when an existent document participates in a transaction the
// first time. Txn-inserted documents get revno -1 while in the
// stash for the first time, and -revno-1 == 2 when they go live.
qdoc[1].Value = bson.D{{"$exists", false}}
}
pullAll := tokensToPull(dqueue, pull, tt)
var d bson.D
var outcome string
var err error
switch {
case op.Update != nil:
if revno < 0 {
err = mgo.ErrNotFound
f.debugf("Won't try to apply update op; negative revision means the document is missing or stashed")
} else {
newRevno := revno + 1
logRevnos[i] = newRevno
if d, err = objToDoc(op.Update); err != nil {
return err
}
if d, err = addToDoc(d, "$pullAll", bson.D{{"txn-queue", pullAll}}); err != nil {
return err
}
if d, err = addToDoc(d, "$set", bson.D{{"txn-revno", newRevno}}); err != nil {
return err
}
chaos("")
err = c.Update(qdoc, d)
}
case op.Remove:
if revno < 0 {
err = mgo.ErrNotFound
} else {
newRevno := -revno - 1
logRevnos[i] = newRevno
nonce := newNonce()
stash := txnInfo{}
change := mgo.Change{
Update: bson.D{{"$push", bson.D{{"n", nonce}}}},
Upsert: true,
ReturnNew: true,
}
if _, err = f.sc.FindId(dkey).Apply(change, &stash); err != nil {
return err
}
change = mgo.Change{
Update: bson.D{{"$set", bson.D{{"txn-remove", t.Id}}}},
ReturnNew: true,
}
var info txnInfo
if _, err = c.Find(qdoc).Apply(change, &info); err == nil {
// The document still exists so the stash previously
// observed was either out of date or necessarily
// contained the token being applied.
f.debugf("Marked document %v to be removed on revno %d with queue: %v", dkey, info.Revno, info.Queue)
updated := false
if !hasToken(stash.Queue, tt) {
var set, unset bson.D
if revno == 0 {
// Missing revno in stash means -1.
set = bson.D{{"txn-queue", info.Queue}}
unset = bson.D{{"n", 1}, {"txn-revno", 1}}
} else {
set = bson.D{{"txn-queue", info.Queue}, {"txn-revno", newRevno}}
unset = bson.D{{"n", 1}}
}
qdoc := bson.D{{"_id", dkey}, {"n", nonce}}
udoc := bson.D{{"$set", set}, {"$unset", unset}}
if err = f.sc.Update(qdoc, udoc); err == nil {
updated = true
} else if err != mgo.ErrNotFound {
return err
}
}
if updated {
f.debugf("Updated stash for document %v with revno %d and queue: %v", dkey, newRevno, info.Queue)
} else {
f.debugf("Stash for document %v was up-to-date", dkey)
}
err = c.Remove(qdoc)
}
}
case op.Insert != nil:
if revno >= 0 {
err = mgo.ErrNotFound
} else {
newRevno := -revno + 1
logRevnos[i] = newRevno
if d, err = objToDoc(op.Insert); err != nil {
return err
}
change := mgo.Change{
Update: bson.D{{"$set", bson.D{{"txn-insert", t.Id}}}},
ReturnNew: true,
}
chaos("")
var info txnInfo
if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil {
f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue)
d = setInDoc(d, bson.D{{"_id", op.Id}, {"txn-revno", newRevno}, {"txn-queue", info.Queue}})
// Unlikely yet unfortunate race in here if this gets seriously
// delayed. If someone inserts+removes meanwhile, this will
// reinsert, and there's no way to avoid that while keeping the
// collection clean or compromising sharding. applyOps can solve
// the former, but it can't shard (SERVER-1439).
chaos("insert")
err = c.Insert(d)
if err == nil || mgo.IsDup(err) {
if err == nil {
f.debugf("New document %v inserted with revno %d and queue: %v", dkey, info.Revno, info.Queue)
} else {
f.debugf("Document %v already existed", dkey)
}
chaos("")
if err = f.sc.Remove(qdoc); err == nil {
f.debugf("Stash for document %v removed", dkey)
}
}
}
}
case op.Assert != nil:
// Pure assertion. No changes to apply.
}
if err == nil {
outcome = "DONE"
} else if err == mgo.ErrNotFound || mgo.IsDup(err) {
outcome = "MISS"
err = nil
} else {
outcome = err.Error()
}
if debugEnabled {
f.debugf("Applying %s op %d (%s) on %v with txn-revno %d: %s", t, i, opName, dkey, revno, outcome)
}
if err != nil {
return err
}
if f.lc != nil && op.isChange() {
// Add change to the log document.
var dr bson.D
for li := range logDoc {
elem := &logDoc[li]
if elem.Name == op.C {
dr = elem.Value.(bson.D)
break
}
}
if dr == nil {
logDoc = append(logDoc, bson.DocElem{op.C, bson.D{{"d", []interface{}{}}, {"r", []int64{}}}})
dr = logDoc[len(logDoc)-1].Value.(bson.D)
}
dr[0].Value = append(dr[0].Value.([]interface{}), op.Id)
dr[1].Value = append(dr[1].Value.([]int64), logRevnos[i])
}
}
t.State = tapplied
if f.lc != nil {
// Insert log document into the changelog collection.
f.debugf("Inserting %s into change log", t)
err := f.lc.Insert(logDoc)
if err != nil && !mgo.IsDup(err) {
return err
}
}
// It's been applied, so errors are ignored here. It's fine for someone
// else to win the race and mark it as applied, and it's also fine for
// it to remain pending until a later point when someone will perceive
// it has been applied and mark it at such.
f.debugf("Marking %s as applied", t)
chaos("set-applied")
f.tc.Update(bson.D{{"_id", t.Id}, {"s", tapplying}}, bson.D{{"$set", bson.D{{"s", tapplied}}}})
return nil
}
func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
var result []token
for j := len(dqueue) - 1; j >= 0; j-- {
dtt := dqueue[j]
if dtt == dontPull {
continue
}
if _, ok := pull[dtt.id()]; ok {
// It was handled before and this is a leftover invalid
// nonce in the queue. Cherry-pick it out.
result = append(result, dtt)
}
}
return result
}
func objToDoc(obj interface{}) (d bson.D, err error) {
data, err := bson.Marshal(obj)
if err != nil {
return nil, err
}
err = bson.Unmarshal(data, &d)
if err != nil {
return nil, err
}
return d, err
}
func addToDoc(doc bson.D, key string, add bson.D) (bson.D, error) {
for i := range doc {
elem := &doc[i]
if elem.Name != key {
continue
}
if old, ok := elem.Value.(bson.D); ok {
elem.Value = append(old, add...)
return doc, nil
} else {
return nil, fmt.Errorf("invalid %q value in change document: %#v", key, elem.Value)
}
}
return append(doc, bson.DocElem{key, add}), nil
}
func setInDoc(doc bson.D, set bson.D) bson.D {
dlen := len(doc)
NextS:
for s := range set {
sname := set[s].Name
for d := 0; d < dlen; d++ {
if doc[d].Name == sname {
doc[d].Value = set[s].Value
continue NextS
}
}
doc = append(doc, set[s])
}
return doc
}
func hasToken(tokens []token, tt token) bool {
for _, ttt := range tokens {
if ttt == tt {
return true
}
}
return false
}
func (f *flusher) debugf(format string, args ...interface{}) {
if !debugEnabled {
return
}
debugf(f.debugId+format, args...)
}