database: add (Acquire|Release)Lock dbutils

This commit is contained in:
Jimmy Zelinskie 2018-11-27 13:59:07 -05:00
parent 504f0f3af3
commit 4fbeb9ced5
3 changed files with 56 additions and 52 deletions

View File

@ -15,6 +15,8 @@
package database package database
import ( import (
"time"
"github.com/deckarep/golang-set" "github.com/deckarep/golang-set"
) )
@ -304,3 +306,47 @@ func MergeLayers(l *Layer, new *Layer) *Layer {
return l return l
} }
// AcquireLock acquires a named global lock for a duration.
//
// If renewal is true, the lock is extended as long as the same owner is
// attempting to renew the lock.
func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}
defer tx.Rollback()
locked, t, err := tx.Lock(name, owner, duration, renewal)
if err != nil {
return false, time.Time{}
}
if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}
return locked, t
}
// ReleaseLock releases a named global lock.
func ReleaseLock(datastore Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}
defer tx.Rollback()
if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}

View File

@ -102,7 +102,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
if interrupted { if interrupted {
running = false running = false
} }
unlock(datastore, notification.Name, whoAmI) database.ReleaseLock(datastore, notification.Name, whoAmI)
done <- true done <- true
}() }()
@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
case <-done: case <-done:
break outer break outer
case <-time.After(notifierLockRefreshDuration): case <-time.After(notifierLockRefreshDuration):
lock(datastore, notification.Name, whoAmI, notifierLockDuration, true) database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
case <-stopper.Chan(): case <-stopper.Chan():
running = false running = false
break break
@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
} }
// Lock the notification. // Lock the notification.
if hasLock, _ := lock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock { if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
log.WithField(logNotiName, notification.Name).Info("found and locked a notification") log.WithField(logNotiName, notification.Name).Info("found and locked a notification")
return &notification return &notification
} }
@ -208,44 +208,3 @@ func markNotificationAsRead(datastore database.Datastore, name string) error {
} }
return tx.Commit() return tx.Commit()
} }
// unlock removes a lock with provided name, owner. Internally, it handles
// database transaction and catches error.
func unlock(datastore database.Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}
defer tx.Rollback()
if err := tx.Unlock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {
return
}
}
func lock(datastore database.Datastore, name string, owner string, duration time.Duration, renew bool) (bool, time.Time) {
// any error will cause the function to catch the error and return false.
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}
defer tx.Rollback()
locked, t, err := tx.Lock(name, owner, duration, renew)
if err != nil {
return false, time.Time{}
}
if locked {
if err := tx.Commit(); err != nil {
return false, time.Time{}
}
}
return locked, t
}

View File

@ -91,16 +91,14 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
log.WithField("lock identifier", whoAmI).Info("updater service started") log.WithField("lock identifier", whoAmI).Info("updater service started")
for { for {
var stop bool
// Determine if this is the first update and define the next update time. // Determine if this is the first update and define the next update time.
// The next update time is (last update time + interval) or now if this is the first update. // The next update time is (last update time + interval) or now if this is the first update.
nextUpdate := time.Now().UTC() nextUpdate := time.Now().UTC()
lastUpdate, firstUpdate, err := GetLastUpdateTime(datastore) lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore)
if err != nil { if err != nil {
log.WithError(err).Error("an error occurred while getting the last update time") log.WithError(err).Error("an error occurred while getting the last update time")
nextUpdate = nextUpdate.Add(config.Interval) nextUpdate = nextUpdate.Add(config.Interval)
} else if !firstUpdate { } else if !isFirstUpdate {
nextUpdate = lastUpdate.Add(config.Interval) nextUpdate = lastUpdate.Add(config.Interval)
} }
@ -108,29 +106,30 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
if nextUpdate.Before(time.Now().UTC()) { if nextUpdate.Before(time.Now().UTC()) {
// Attempt to get a lock on the the update. // Attempt to get a lock on the the update.
log.Debug("attempting to obtain update lock") log.Debug("attempting to obtain update lock")
hasLock, hasLockUntil := lock(datastore, updaterLockName, whoAmI, updaterLockDuration, false) hasLock, hasLockUntil := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
if hasLock { if hasLock {
// Launch update in a new go routine. // Launch update in a new go routine.
doneC := make(chan bool, 1) doneC := make(chan bool, 1)
go func() { go func() {
update(datastore, firstUpdate) update(datastore, isFirstUpdate)
doneC <- true doneC <- true
}() }()
var stop bool
for done := false; !done && !stop; { for done := false; !done && !stop; {
select { select {
case <-doneC: case <-doneC:
done = true done = true
case <-time.After(updaterLockRefreshDuration): case <-time.After(updaterLockRefreshDuration):
// Refresh the lock until the update is done. // Refresh the lock until the update is done.
lock(datastore, updaterLockName, whoAmI, updaterLockDuration, true) database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, true)
case <-st.Chan(): case <-st.Chan():
stop = true stop = true
} }
} }
// Unlock the updater. // Unlock the updater.
unlock(datastore, updaterLockName, whoAmI) database.ReleaseLock(datastore, updaterLockName, whoAmI)
if stop { if stop {
break break