diff --git a/database/dbutil.go b/database/dbutil.go index 474c6a9e..31d48503 100644 --- a/database/dbutil.go +++ b/database/dbutil.go @@ -15,6 +15,8 @@ package database import ( + "time" + "github.com/deckarep/golang-set" ) @@ -304,3 +306,47 @@ func MergeLayers(l *Layer, new *Layer) *Layer { return l } + +// AcquireLock acquires a named global lock for a duration. +// +// If renewal is true, the lock is extended as long as the same owner is +// attempting to renew the lock. +func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) { + // any error will cause the function to catch the error and return false. + tx, err := datastore.Begin() + if err != nil { + return false, time.Time{} + } + + defer tx.Rollback() + + locked, t, err := tx.Lock(name, owner, duration, renewal) + if err != nil { + return false, time.Time{} + } + + if locked { + if err := tx.Commit(); err != nil { + return false, time.Time{} + } + } + + return locked, t +} + +// ReleaseLock releases a named global lock. +func ReleaseLock(datastore Datastore, name, owner string) { + tx, err := datastore.Begin() + if err != nil { + return + } + + defer tx.Rollback() + + if err := tx.Unlock(name, owner); err != nil { + return + } + if err := tx.Commit(); err != nil { + return + } +} diff --git a/notifier.go b/notifier.go index 9f78977f..74cf91c4 100644 --- a/notifier.go +++ b/notifier.go @@ -102,7 +102,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop if interrupted { running = false } - unlock(datastore, notification.Name, whoAmI) + database.ReleaseLock(datastore, notification.Name, whoAmI) done <- true }() @@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop case <-done: break outer case <-time.After(notifierLockRefreshDuration): - lock(datastore, notification.Name, whoAmI, notifierLockDuration, true) + database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true) case <-stopper.Chan(): running = false break @@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA } // Lock the notification. - if hasLock, _ := lock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock { + if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock { log.WithField(logNotiName, notification.Name).Info("found and locked a notification") return ¬ification } @@ -208,44 +208,3 @@ func markNotificationAsRead(datastore database.Datastore, name string) error { } return tx.Commit() } - -// unlock removes a lock with provided name, owner. Internally, it handles -// database transaction and catches error. -func unlock(datastore database.Datastore, name, owner string) { - tx, err := datastore.Begin() - if err != nil { - return - } - - defer tx.Rollback() - - if err := tx.Unlock(name, owner); err != nil { - return - } - if err := tx.Commit(); err != nil { - return - } -} - -func lock(datastore database.Datastore, name string, owner string, duration time.Duration, renew bool) (bool, time.Time) { - // any error will cause the function to catch the error and return false. - tx, err := datastore.Begin() - if err != nil { - return false, time.Time{} - } - - defer tx.Rollback() - - locked, t, err := tx.Lock(name, owner, duration, renew) - if err != nil { - return false, time.Time{} - } - - if locked { - if err := tx.Commit(); err != nil { - return false, time.Time{} - } - } - - return locked, t -} diff --git a/updater.go b/updater.go index bd092a9b..1ab7700d 100644 --- a/updater.go +++ b/updater.go @@ -91,16 +91,14 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper log.WithField("lock identifier", whoAmI).Info("updater service started") for { - var stop bool - // Determine if this is the first update and define the next update time. // The next update time is (last update time + interval) or now if this is the first update. nextUpdate := time.Now().UTC() - lastUpdate, firstUpdate, err := GetLastUpdateTime(datastore) + lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore) if err != nil { log.WithError(err).Error("an error occurred while getting the last update time") nextUpdate = nextUpdate.Add(config.Interval) - } else if !firstUpdate { + } else if !isFirstUpdate { nextUpdate = lastUpdate.Add(config.Interval) } @@ -108,29 +106,30 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper if nextUpdate.Before(time.Now().UTC()) { // Attempt to get a lock on the the update. log.Debug("attempting to obtain update lock") - hasLock, hasLockUntil := lock(datastore, updaterLockName, whoAmI, updaterLockDuration, false) + hasLock, hasLockUntil := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false) if hasLock { // Launch update in a new go routine. doneC := make(chan bool, 1) go func() { - update(datastore, firstUpdate) + update(datastore, isFirstUpdate) doneC <- true }() + var stop bool for done := false; !done && !stop; { select { case <-doneC: done = true case <-time.After(updaterLockRefreshDuration): // Refresh the lock until the update is done. - lock(datastore, updaterLockName, whoAmI, updaterLockDuration, true) + database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, true) case <-st.Chan(): stop = true } } // Unlock the updater. - unlock(datastore, updaterLockName, whoAmI) + database.ReleaseLock(datastore, updaterLockName, whoAmI) if stop { break