diff --git a/database/database.go b/database/database.go index 43ff4f3c..c7026a58 100644 --- a/database/database.go +++ b/database/database.go @@ -1,4 +1,4 @@ -// Copyright 2017 clair authors +// Copyright 2019 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -182,16 +182,22 @@ type Session interface { // FindKeyValue retrieves a value from the given key. FindKeyValue(key string) (value string, found bool, err error) - // Lock acquires or renews a lock in the database with the given name, owner - // and duration without blocking. After the specified duration, the lock - // expires if it hasn't already been unlocked in order to prevent a deadlock. + // AcquireLock acquires a brand new lock in the database with a given name + // for the given duration. // - // If the acquisition of a lock is not successful, expiration should be - // the time that existing lock expires. - Lock(name string, owner string, duration time.Duration, renew bool) (success bool, expiration time.Time, err error) + // A lock can only have one owner. + // This method should NOT block until a lock is acquired. + AcquireLock(name, owner string, duration time.Duration) (acquired bool, expiration time.Time, err error) - // Unlock releases an existing Lock. - Unlock(name, owner string) error + // ExtendLock extends an existing lock such that the lock will expire at the + // current time plus the provided duration. + // + // This method should return immediately with an error if the lock does not + // exist. + ExtendLock(name, owner string, duration time.Duration) (extended bool, expiration time.Time, err error) + + // ReleaseLock releases an existing lock. + ReleaseLock(name, owner string) error } // Datastore represents a persistent data store diff --git a/database/dbutil.go b/database/dbutil.go index dece4027..e7c85c8f 100644 --- a/database/dbutil.go +++ b/database/dbutil.go @@ -322,19 +322,14 @@ func MergeLayers(l *Layer, new *Layer) *Layer { } // AcquireLock acquires a named global lock for a duration. -// -// If renewal is true, the lock is extended as long as the same owner is -// attempting to renew the lock. -func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) { - // any error will cause the function to catch the error and return false. +func AcquireLock(datastore Datastore, name, owner string, duration time.Duration) (acquired bool, expiration time.Time) { tx, err := datastore.Begin() if err != nil { return false, time.Time{} } - defer tx.Rollback() - locked, t, err := tx.Lock(name, owner, duration, renewal) + locked, t, err := tx.AcquireLock(name, owner, duration) if err != nil { return false, time.Time{} } @@ -348,16 +343,38 @@ func AcquireLock(datastore Datastore, name, owner string, duration time.Duration return locked, t } +// ExtendLock extends the duration of an existing global lock for the given +// duration. +func ExtendLock(ds Datastore, name, whoami string, desiredLockDuration time.Duration) (extended bool, expiration time.Time) { + tx, err := ds.Begin() + if err != nil { + return false, time.Time{} + } + defer tx.Rollback() + + locked, expiration, err := tx.ExtendLock(name, whoami, desiredLockDuration) + if err != nil { + return false, time.Time{} + } + + if locked { + if err := tx.Commit(); err == nil { + return + } + } + + return false, time.Time{} +} + // ReleaseLock releases a named global lock. func ReleaseLock(datastore Datastore, name, owner string) { tx, err := datastore.Begin() if err != nil { return } - defer tx.Rollback() - if err := tx.Unlock(name, owner); err != nil { + if err := tx.ReleaseLock(name, owner); err != nil { return } if err := tx.Commit(); err != nil { diff --git a/database/mock.go b/database/mock.go index 2c909fdc..0d3e6edf 100644 --- a/database/mock.go +++ b/database/mock.go @@ -46,8 +46,9 @@ type MockSession struct { FctDeleteNotification func(name string) error FctUpdateKeyValue func(key, value string) error FctFindKeyValue func(key string) (string, bool, error) - FctLock func(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error) - FctUnlock func(name, owner string) error + FctAcquireLock func(name, owner string, duration time.Duration) (bool, time.Time, error) + FctExtendLock func(name, owner string, duration time.Duration) (bool, time.Time, error) + FctReleaseLock func(name, owner string) error } func (ms *MockSession) Commit() error { @@ -205,16 +206,23 @@ func (ms *MockSession) FindKeyValue(key string) (string, bool, error) { panic("required mock function not implemented") } -func (ms *MockSession) Lock(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error) { - if ms.FctLock != nil { - return ms.FctLock(name, owner, duration, renew) +func (ms *MockSession) AcquireLock(name, owner string, duration time.Duration) (bool, time.Time, error) { + if ms.FctAcquireLock != nil { + return ms.FctAcquireLock(name, owner, duration) } panic("required mock function not implemented") } -func (ms *MockSession) Unlock(name, owner string) error { - if ms.FctUnlock != nil { - return ms.FctUnlock(name, owner) +func (ms *MockSession) ExtendLock(name, owner string, duration time.Duration) (bool, time.Time, error) { + if ms.FctExtendLock != nil { + return ms.FctExtendLock(name, owner, duration) + } + panic("required mock function not implemented") +} + +func (ms *MockSession) ReleaseLock(name, owner string) error { + if ms.FctReleaseLock != nil { + return ms.FctReleaseLock(name, owner) } panic("required mock function not implemented") } diff --git a/database/pgsql/lock.go b/database/pgsql/lock.go index ddb916ce..5d9489c6 100644 --- a/database/pgsql/lock.go +++ b/database/pgsql/lock.go @@ -1,4 +1,4 @@ -// Copyright 2017 clair authors +// Copyright 2019 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,71 +19,77 @@ import ( "time" log "github.com/sirupsen/logrus" - - "github.com/coreos/clair/pkg/commonerr" ) const ( - soiLock = `INSERT INTO lock(name, owner, until) VALUES ($1, $2, $3)` searchLock = `SELECT until FROM Lock WHERE name = $1` updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2` removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2` removeLockExpired = `DELETE FROM LOCK WHERE until < CURRENT_TIMESTAMP` + + soiLock = `WITH new_lock AS ( + INSERT INTO lock (name, owner, until) + VALUES ( $1, $2, $3) + WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1) + RETURNING owner, until + ) + SELECT * FROM new_lock + UNION + SELECT owner, until FROM lock WHERE name = $1` ) var ( errLockNotFound = errors.New("lock is not in database") ) -// Lock tries to set a temporary lock in the database. -// -// Lock does not block, instead, it returns true and its expiration time -// is the lock has been successfully acquired or false otherwise. -func (tx *pgSession) Lock(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error) { - if name == "" || owner == "" || duration == 0 { - log.Warning("could not create an invalid lock") - return false, time.Time{}, commonerr.NewBadRequestError("Invalid Lock Parameters") +func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) { + if lockName == "" || whoami == "" || desiredDuration == 0 { + panic("invalid lock parameters") } - until := time.Now().Add(duration) - if renew { - defer observeQueryTime("Lock", "update", time.Now()) - // Renew lock. - r, err := tx.Exec(updateLock, name, owner, until) - if err != nil { - return false, until, handleError("updateLock", err) - } - - if n, err := r.RowsAffected(); err == nil { - return n > 0, until, nil - } - return false, until, handleError("updateLock", err) - } else if err := tx.pruneLocks(); err != nil { - return false, until, err + if err := tx.pruneLocks(); err != nil { + return false, time.Time{}, err } - // Lock. + var ( + desiredLockedUntil = time.Now().Add(desiredDuration) + + lockedUntil time.Time + lockOwner string + ) + defer observeQueryTime("Lock", "soiLock", time.Now()) - _, err := tx.Exec(soiLock, name, owner, until) + err := tx.QueryRow(soiLock, lockName, whoami, desiredLockedUntil).Scan(&lockOwner, &lockedUntil) + return lockOwner == whoami, lockedUntil, err +} + +func (tx *pgSession) ExtendLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) { + if lockName == "" || whoami == "" || desiredDuration == 0 { + panic("invalid lock parameters") + } + + desiredLockedUntil := time.Now().Add(desiredDuration) + + defer observeQueryTime("Lock", "update", time.Now()) + result, err := tx.Exec(updateLock, lockName, whoami, desiredLockedUntil) if err != nil { - if isErrUniqueViolation(err) { - // Return the existing locks expiration. - err := tx.QueryRow(searchLock, name).Scan(&until) - return false, until, handleError("searchLock", err) - } - return false, until, handleError("insertLock", err) + return false, time.Time{}, handleError("updateLock", err) } - return true, until, nil + + if numRows, err := result.RowsAffected(); err == nil { + // This is the only happy path. + return numRows > 0, desiredLockedUntil, nil + } + + return false, time.Time{}, handleError("updateLock", err) } -// Unlock unlocks a lock specified by its name if I own it -func (tx *pgSession) Unlock(name, owner string) error { +func (tx *pgSession) ReleaseLock(name, owner string) error { if name == "" || owner == "" { - return commonerr.NewBadRequestError("Invalid Lock Parameters") + panic("invalid lock parameters") } defer observeQueryTime("Unlock", "all", time.Now()) - _, err := tx.Exec(removeLock, name, owner) return err } diff --git a/database/pgsql/lock_test.go b/database/pgsql/lock_test.go index d2a7ebf3..87806ca3 100644 --- a/database/pgsql/lock_test.go +++ b/database/pgsql/lock_test.go @@ -1,4 +1,4 @@ -// Copyright 2016 clair authors +// Copyright 2019 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,8 +19,23 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func TestAcquireLockReturnsExistingLockDuration(t *testing.T) { + datastore, tx := openSessionForTest(t, "Lock", true) + defer datastore.Close() + + acquired, originalExpiration, err := tx.AcquireLock("test1", "owner1", time.Minute) + require.Nil(t, err) + require.True(t, acquired) + + acquired2, expiration, err := tx.AcquireLock("test1", "owner2", time.Hour) + require.Nil(t, err) + require.False(t, acquired2) + require.Equal(t, expiration, originalExpiration) +} + func TestLock(t *testing.T) { datastore, tx := openSessionForTest(t, "Lock", true) defer datastore.Close() @@ -28,52 +43,52 @@ func TestLock(t *testing.T) { var l bool // Create a first lock. - l, _, err := tx.Lock("test1", "owner1", time.Minute, false) + l, _, err := tx.AcquireLock("test1", "owner1", time.Minute) assert.Nil(t, err) assert.True(t, l) tx = restartSession(t, datastore, tx, true) // lock again by itself, the previous lock is not expired yet. - l, _, err = tx.Lock("test1", "owner1", time.Minute, false) + l, _, err = tx.AcquireLock("test1", "owner1", time.Minute) assert.Nil(t, err) assert.False(t, l) tx = restartSession(t, datastore, tx, false) // Try to renew the same lock with another owner. - l, _, err = tx.Lock("test1", "owner2", time.Minute, true) + l, _, err = tx.ExtendLock("test1", "owner2", time.Minute) assert.Nil(t, err) assert.False(t, l) tx = restartSession(t, datastore, tx, false) - l, _, err = tx.Lock("test1", "owner2", time.Minute, false) + l, _, err = tx.AcquireLock("test1", "owner2", time.Minute) assert.Nil(t, err) assert.False(t, l) tx = restartSession(t, datastore, tx, false) // Renew the lock. - l, _, err = tx.Lock("test1", "owner1", 2*time.Minute, true) + l, _, err = tx.ExtendLock("test1", "owner1", 2*time.Minute) assert.Nil(t, err) assert.True(t, l) tx = restartSession(t, datastore, tx, true) // Unlock and then relock by someone else. - err = tx.Unlock("test1", "owner1") + err = tx.ReleaseLock("test1", "owner1") assert.Nil(t, err) tx = restartSession(t, datastore, tx, true) - l, _, err = tx.Lock("test1", "owner2", time.Minute, false) + l, _, err = tx.AcquireLock("test1", "owner2", time.Minute) assert.Nil(t, err) assert.True(t, l) tx = restartSession(t, datastore, tx, true) // Create a second lock which is actually already expired ... - l, _, err = tx.Lock("test2", "owner1", -time.Minute, false) + l, _, err = tx.AcquireLock("test2", "owner1", -time.Minute) assert.Nil(t, err) assert.True(t, l) tx = restartSession(t, datastore, tx, true) // Take over the lock - l, _, err = tx.Lock("test2", "owner2", time.Minute, false) + l, _, err = tx.AcquireLock("test2", "owner2", time.Minute) assert.Nil(t, err) assert.True(t, l) tx = restartSession(t, datastore, tx, true) diff --git a/notifier.go b/notifier.go index 74cf91c4..69eb6058 100644 --- a/notifier.go +++ b/notifier.go @@ -1,4 +1,4 @@ -// Copyright 2017 clair authors +// Copyright 2019 clair authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop case <-done: break outer case <-time.After(notifierLockRefreshDuration): - database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true) + database.ExtendLock(datastore, notification.Name, whoAmI, notifierLockDuration) case <-stopper.Chan(): running = false break @@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA } // Lock the notification. - if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock { + if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration); hasLock { log.WithField(logNotiName, notification.Name).Info("found and locked a notification") return ¬ification } diff --git a/updater.go b/updater.go index 08582635..e0d29ad9 100644 --- a/updater.go +++ b/updater.go @@ -124,7 +124,7 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper if nextUpdate.Before(time.Now().UTC()) { // Attempt to get a lock on the update. log.Debug("attempting to obtain update lock") - acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false) + acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration) if lockExpiration.IsZero() { // Any failures to acquire the lock should instantly expire. var instantExpiration time.Duration @@ -167,7 +167,7 @@ func updateWhileRenewingLock(datastore database.Datastore, whoAmI string, isFirs for { select { case <-time.After(timeutil.FractionalDuration(0.9, refreshDuration)): - success, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration, true) + success, lockExpiration := database.ExtendLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration) if !success { return errors.New("failed to extend lock") }