database: reduce pruneLocks/Unlock transaction.
pruneLocks could create deadlocked transactions on PostgreSQL if multiple locks expired and pruneLocks is called by multiple instances. Also adds some logging.
This commit is contained in:
parent
8bb6c50f80
commit
b0142e1982
@ -68,20 +68,32 @@ func Lock(name string, duration time.Duration, owner string) (bool, time.Time) {
|
|||||||
func Unlock(name, owner string) {
|
func Unlock(name, owner string) {
|
||||||
pruneLocks()
|
pruneLocks()
|
||||||
|
|
||||||
t := cayley.NewTransaction()
|
unlocked := 0
|
||||||
it, _ := cayley.StartPath(store, name).Has("locked", "locked").Has("locked_by", owner).Save("locked_until", "locked_until").BuildIterator().Optimize()
|
it, _ := cayley.StartPath(store, name).Has("locked", "locked").Has("locked_by", owner).Save("locked_until", "locked_until").BuildIterator().Optimize()
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
|
|
||||||
for cayley.RawNext(it) {
|
for cayley.RawNext(it) {
|
||||||
tags := make(map[string]graph.Value)
|
tags := make(map[string]graph.Value)
|
||||||
it.TagResults(tags)
|
it.TagResults(tags)
|
||||||
|
|
||||||
|
t := cayley.NewTransaction()
|
||||||
t.RemoveQuad(cayley.Quad(name, "locked", "locked", ""))
|
t.RemoveQuad(cayley.Quad(name, "locked", "locked", ""))
|
||||||
t.RemoveQuad(cayley.Quad(name, "locked_until", store.NameOf(tags["locked_until"]), ""))
|
t.RemoveQuad(cayley.Quad(name, "locked_until", store.NameOf(tags["locked_until"]), ""))
|
||||||
t.RemoveQuad(cayley.Quad(name, "locked_by", owner, ""))
|
t.RemoveQuad(cayley.Quad(name, "locked_by", owner, ""))
|
||||||
}
|
err := store.ApplyTransaction(t)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed transaction (Unlock): %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
store.ApplyTransaction(t)
|
unlocked++
|
||||||
|
}
|
||||||
|
if it.Err() != nil {
|
||||||
|
log.Errorf("failed query in Unlock: %s", it.Err())
|
||||||
|
}
|
||||||
|
if unlocked > 1 {
|
||||||
|
// We should never see this, it would mean that our database doesn't ensure quad uniqueness
|
||||||
|
// and that the entire lock system is jeopardized.
|
||||||
|
log.Errorf("found inconsistency in Unlock: matched %d times a locked named: %s", unlocked, name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LockInfo returns the owner of a lock specified by its name and its
|
// LockInfo returns the owner of a lock specified by its name and its
|
||||||
@ -109,7 +121,6 @@ func pruneLocks() {
|
|||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// Delete every expired locks
|
// Delete every expired locks
|
||||||
tr := cayley.NewTransaction()
|
|
||||||
it, _ := cayley.StartPath(store, "locked").In("locked").Save("locked_until", "locked_until").Save("locked_by", "locked_by").BuildIterator().Optimize()
|
it, _ := cayley.StartPath(store, "locked").In("locked").Save("locked_until", "locked_until").Save("locked_by", "locked_by").BuildIterator().Optimize()
|
||||||
defer it.Close()
|
defer it.Close()
|
||||||
for cayley.RawNext(it) {
|
for cayley.RawNext(it) {
|
||||||
@ -123,12 +134,20 @@ func pruneLocks() {
|
|||||||
|
|
||||||
if now.Unix() > tt {
|
if now.Unix() > tt {
|
||||||
log.Debugf("Lock %s owned by %s has expired.", n, o)
|
log.Debugf("Lock %s owned by %s has expired.", n, o)
|
||||||
|
|
||||||
|
tr := cayley.NewTransaction()
|
||||||
tr.RemoveQuad(cayley.Quad(n, "locked", "locked", ""))
|
tr.RemoveQuad(cayley.Quad(n, "locked", "locked", ""))
|
||||||
tr.RemoveQuad(cayley.Quad(n, "locked_until", t, ""))
|
tr.RemoveQuad(cayley.Quad(n, "locked_until", t, ""))
|
||||||
tr.RemoveQuad(cayley.Quad(n, "locked_by", o, ""))
|
tr.RemoveQuad(cayley.Quad(n, "locked_by", o, ""))
|
||||||
|
err := store.ApplyTransaction(tr)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed transaction (pruneLocks): %s", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
store.ApplyTransaction(tr)
|
if it.Err() != nil {
|
||||||
|
log.Errorf("failed query in Unlock: %s", it.Err())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLockedNodes returns every nodes that are currently locked
|
// getLockedNodes returns every nodes that are currently locked
|
||||||
|
@ -35,7 +35,7 @@ func TestLock(t *testing.T) {
|
|||||||
l, _ = Lock("test1", time.Minute, "owner2")
|
l, _ = Lock("test1", time.Minute, "owner2")
|
||||||
assert.False(t, l)
|
assert.False(t, l)
|
||||||
// Renew the lock
|
// Renew the lock
|
||||||
l, _ = Lock("test1", time.Minute, "owner1")
|
l, _ = Lock("test1", 2*time.Minute, "owner1")
|
||||||
assert.True(t, l)
|
assert.True(t, l)
|
||||||
// Unlock and then relock by someone else
|
// Unlock and then relock by someone else
|
||||||
Unlock("test1", "owner1")
|
Unlock("test1", "owner1")
|
||||||
|
Loading…
Reference in New Issue
Block a user