Merge pull request #723 from jzelinskie/lock-tx

database: make locks SOI & add Extend method
master
Jimmy Zelinskie 5 years ago committed by GitHub
commit 4fa03d1c78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,4 @@
// Copyright 2017 clair authors
// Copyright 2019 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -182,16 +182,22 @@ type Session interface {
// FindKeyValue retrieves a value from the given key.
FindKeyValue(key string) (value string, found bool, err error)
// Lock acquires or renews a lock in the database with the given name, owner
// and duration without blocking. After the specified duration, the lock
// expires if it hasn't already been unlocked in order to prevent a deadlock.
// AcquireLock acquires a brand new lock in the database with a given name
// for the given duration.
//
// If the acquisition of a lock is not successful, expiration should be
// the time that existing lock expires.
Lock(name string, owner string, duration time.Duration, renew bool) (success bool, expiration time.Time, err error)
// A lock can only have one owner.
// This method should NOT block until a lock is acquired.
AcquireLock(name, owner string, duration time.Duration) (acquired bool, expiration time.Time, err error)
// Unlock releases an existing Lock.
Unlock(name, owner string) error
// ExtendLock extends an existing lock such that the lock will expire at the
// current time plus the provided duration.
//
// This method should return immediately with an error if the lock does not
// exist.
ExtendLock(name, owner string, duration time.Duration) (extended bool, expiration time.Time, err error)
// ReleaseLock releases an existing lock.
ReleaseLock(name, owner string) error
}
// Datastore represents a persistent data store

@ -322,19 +322,14 @@ func MergeLayers(l *Layer, new *Layer) *Layer {
}
// AcquireLock acquires a named global lock for a duration.
//
// If renewal is true, the lock is extended as long as the same owner is
// attempting to renew the lock.
func AcquireLock(datastore Datastore, name, owner string, duration time.Duration, renewal bool) (success bool, expiration time.Time) {
// any error will cause the function to catch the error and return false.
func AcquireLock(datastore Datastore, name, owner string, duration time.Duration) (acquired bool, expiration time.Time) {
tx, err := datastore.Begin()
if err != nil {
return false, time.Time{}
}
defer tx.Rollback()
locked, t, err := tx.Lock(name, owner, duration, renewal)
locked, t, err := tx.AcquireLock(name, owner, duration)
if err != nil {
return false, time.Time{}
}
@ -348,16 +343,38 @@ func AcquireLock(datastore Datastore, name, owner string, duration time.Duration
return locked, t
}
// ExtendLock extends the duration of an existing global lock for the given
// duration.
func ExtendLock(ds Datastore, name, whoami string, desiredLockDuration time.Duration) (extended bool, expiration time.Time) {
tx, err := ds.Begin()
if err != nil {
return false, time.Time{}
}
defer tx.Rollback()
locked, expiration, err := tx.ExtendLock(name, whoami, desiredLockDuration)
if err != nil {
return false, time.Time{}
}
if locked {
if err := tx.Commit(); err == nil {
return
}
}
return false, time.Time{}
}
// ReleaseLock releases a named global lock.
func ReleaseLock(datastore Datastore, name, owner string) {
tx, err := datastore.Begin()
if err != nil {
return
}
defer tx.Rollback()
if err := tx.Unlock(name, owner); err != nil {
if err := tx.ReleaseLock(name, owner); err != nil {
return
}
if err := tx.Commit(); err != nil {

@ -46,8 +46,9 @@ type MockSession struct {
FctDeleteNotification func(name string) error
FctUpdateKeyValue func(key, value string) error
FctFindKeyValue func(key string) (string, bool, error)
FctLock func(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error)
FctUnlock func(name, owner string) error
FctAcquireLock func(name, owner string, duration time.Duration) (bool, time.Time, error)
FctExtendLock func(name, owner string, duration time.Duration) (bool, time.Time, error)
FctReleaseLock func(name, owner string) error
}
func (ms *MockSession) Commit() error {
@ -205,16 +206,23 @@ func (ms *MockSession) FindKeyValue(key string) (string, bool, error) {
panic("required mock function not implemented")
}
func (ms *MockSession) Lock(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error) {
if ms.FctLock != nil {
return ms.FctLock(name, owner, duration, renew)
func (ms *MockSession) AcquireLock(name, owner string, duration time.Duration) (bool, time.Time, error) {
if ms.FctAcquireLock != nil {
return ms.FctAcquireLock(name, owner, duration)
}
panic("required mock function not implemented")
}
func (ms *MockSession) Unlock(name, owner string) error {
if ms.FctUnlock != nil {
return ms.FctUnlock(name, owner)
func (ms *MockSession) ExtendLock(name, owner string, duration time.Duration) (bool, time.Time, error) {
if ms.FctExtendLock != nil {
return ms.FctExtendLock(name, owner, duration)
}
panic("required mock function not implemented")
}
func (ms *MockSession) ReleaseLock(name, owner string) error {
if ms.FctReleaseLock != nil {
return ms.FctReleaseLock(name, owner)
}
panic("required mock function not implemented")
}

@ -1,4 +1,4 @@
// Copyright 2017 clair authors
// Copyright 2019 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -19,71 +19,77 @@ import (
"time"
log "github.com/sirupsen/logrus"
"github.com/coreos/clair/pkg/commonerr"
)
const (
soiLock = `INSERT INTO lock(name, owner, until) VALUES ($1, $2, $3)`
searchLock = `SELECT until FROM Lock WHERE name = $1`
updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2`
removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2`
removeLockExpired = `DELETE FROM LOCK WHERE until < CURRENT_TIMESTAMP`
soiLock = `WITH new_lock AS (
INSERT INTO lock (name, owner, until)
VALUES ( $1, $2, $3)
WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1)
RETURNING owner, until
)
SELECT * FROM new_lock
UNION
SELECT owner, until FROM lock WHERE name = $1`
)
var (
errLockNotFound = errors.New("lock is not in database")
)
// Lock tries to set a temporary lock in the database.
//
// Lock does not block, instead, it returns true and its expiration time
// is the lock has been successfully acquired or false otherwise.
func (tx *pgSession) Lock(name string, owner string, duration time.Duration, renew bool) (bool, time.Time, error) {
if name == "" || owner == "" || duration == 0 {
log.Warning("could not create an invalid lock")
return false, time.Time{}, commonerr.NewBadRequestError("Invalid Lock Parameters")
func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) {
if lockName == "" || whoami == "" || desiredDuration == 0 {
panic("invalid lock parameters")
}
until := time.Now().Add(duration)
if renew {
defer observeQueryTime("Lock", "update", time.Now())
// Renew lock.
r, err := tx.Exec(updateLock, name, owner, until)
if err != nil {
return false, until, handleError("updateLock", err)
}
if n, err := r.RowsAffected(); err == nil {
return n > 0, until, nil
}
return false, until, handleError("updateLock", err)
} else if err := tx.pruneLocks(); err != nil {
return false, until, err
if err := tx.pruneLocks(); err != nil {
return false, time.Time{}, err
}
// Lock.
var (
desiredLockedUntil = time.Now().Add(desiredDuration)
lockedUntil time.Time
lockOwner string
)
defer observeQueryTime("Lock", "soiLock", time.Now())
_, err := tx.Exec(soiLock, name, owner, until)
err := tx.QueryRow(soiLock, lockName, whoami, desiredLockedUntil).Scan(&lockOwner, &lockedUntil)
return lockOwner == whoami, lockedUntil, err
}
func (tx *pgSession) ExtendLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) {
if lockName == "" || whoami == "" || desiredDuration == 0 {
panic("invalid lock parameters")
}
desiredLockedUntil := time.Now().Add(desiredDuration)
defer observeQueryTime("Lock", "update", time.Now())
result, err := tx.Exec(updateLock, lockName, whoami, desiredLockedUntil)
if err != nil {
if isErrUniqueViolation(err) {
// Return the existing locks expiration.
err := tx.QueryRow(searchLock, name).Scan(&until)
return false, until, handleError("searchLock", err)
}
return false, until, handleError("insertLock", err)
return false, time.Time{}, handleError("updateLock", err)
}
return true, until, nil
if numRows, err := result.RowsAffected(); err == nil {
// This is the only happy path.
return numRows > 0, desiredLockedUntil, nil
}
return false, time.Time{}, handleError("updateLock", err)
}
// Unlock unlocks a lock specified by its name if I own it
func (tx *pgSession) Unlock(name, owner string) error {
func (tx *pgSession) ReleaseLock(name, owner string) error {
if name == "" || owner == "" {
return commonerr.NewBadRequestError("Invalid Lock Parameters")
panic("invalid lock parameters")
}
defer observeQueryTime("Unlock", "all", time.Now())
_, err := tx.Exec(removeLock, name, owner)
return err
}

@ -1,4 +1,4 @@
// Copyright 2016 clair authors
// Copyright 2019 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -19,8 +19,23 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestAcquireLockReturnsExistingLockDuration(t *testing.T) {
datastore, tx := openSessionForTest(t, "Lock", true)
defer datastore.Close()
acquired, originalExpiration, err := tx.AcquireLock("test1", "owner1", time.Minute)
require.Nil(t, err)
require.True(t, acquired)
acquired2, expiration, err := tx.AcquireLock("test1", "owner2", time.Hour)
require.Nil(t, err)
require.False(t, acquired2)
require.Equal(t, expiration, originalExpiration)
}
func TestLock(t *testing.T) {
datastore, tx := openSessionForTest(t, "Lock", true)
defer datastore.Close()
@ -28,52 +43,52 @@ func TestLock(t *testing.T) {
var l bool
// Create a first lock.
l, _, err := tx.Lock("test1", "owner1", time.Minute, false)
l, _, err := tx.AcquireLock("test1", "owner1", time.Minute)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)
// lock again by itself, the previous lock is not expired yet.
l, _, err = tx.Lock("test1", "owner1", time.Minute, false)
l, _, err = tx.AcquireLock("test1", "owner1", time.Minute)
assert.Nil(t, err)
assert.False(t, l)
tx = restartSession(t, datastore, tx, false)
// Try to renew the same lock with another owner.
l, _, err = tx.Lock("test1", "owner2", time.Minute, true)
l, _, err = tx.ExtendLock("test1", "owner2", time.Minute)
assert.Nil(t, err)
assert.False(t, l)
tx = restartSession(t, datastore, tx, false)
l, _, err = tx.Lock("test1", "owner2", time.Minute, false)
l, _, err = tx.AcquireLock("test1", "owner2", time.Minute)
assert.Nil(t, err)
assert.False(t, l)
tx = restartSession(t, datastore, tx, false)
// Renew the lock.
l, _, err = tx.Lock("test1", "owner1", 2*time.Minute, true)
l, _, err = tx.ExtendLock("test1", "owner1", 2*time.Minute)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)
// Unlock and then relock by someone else.
err = tx.Unlock("test1", "owner1")
err = tx.ReleaseLock("test1", "owner1")
assert.Nil(t, err)
tx = restartSession(t, datastore, tx, true)
l, _, err = tx.Lock("test1", "owner2", time.Minute, false)
l, _, err = tx.AcquireLock("test1", "owner2", time.Minute)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)
// Create a second lock which is actually already expired ...
l, _, err = tx.Lock("test2", "owner1", -time.Minute, false)
l, _, err = tx.AcquireLock("test2", "owner1", -time.Minute)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)
// Take over the lock
l, _, err = tx.Lock("test2", "owner2", time.Minute, false)
l, _, err = tx.AcquireLock("test2", "owner2", time.Minute)
assert.Nil(t, err)
assert.True(t, l)
tx = restartSession(t, datastore, tx, true)

@ -1,4 +1,4 @@
// Copyright 2017 clair authors
// Copyright 2019 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -113,7 +113,7 @@ func RunNotifier(config *notification.Config, datastore database.Datastore, stop
case <-done:
break outer
case <-time.After(notifierLockRefreshDuration):
database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, true)
database.ExtendLock(datastore, notification.Name, whoAmI, notifierLockDuration)
case <-stopper.Chan():
running = false
break
@ -141,7 +141,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
}
// Lock the notification.
if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration, false); hasLock {
if hasLock, _ := database.AcquireLock(datastore, notification.Name, whoAmI, notifierLockDuration); hasLock {
log.WithField(logNotiName, notification.Name).Info("found and locked a notification")
return &notification
}

@ -124,7 +124,7 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
if nextUpdate.Before(time.Now().UTC()) {
// Attempt to get a lock on the update.
log.Debug("attempting to obtain update lock")
acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration)
if lockExpiration.IsZero() {
// Any failures to acquire the lock should instantly expire.
var instantExpiration time.Duration
@ -167,7 +167,7 @@ func updateWhileRenewingLock(datastore database.Datastore, whoAmI string, isFirs
for {
select {
case <-time.After(timeutil.FractionalDuration(0.9, refreshDuration)):
success, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration, true)
success, lockExpiration := database.ExtendLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration)
if !success {
return errors.New("failed to extend lock")
}

Loading…
Cancel
Save