updater: refactor to use errgroup

This addresses a race condition and makes this code much more
understandable.
This commit is contained in:
Jimmy Zelinskie 2019-01-07 14:20:31 -05:00
parent 399deab100
commit 6c5be7e1c6

View File

@ -15,20 +15,22 @@
package clair package clair
import ( import (
"context"
"errors"
"fmt" "fmt"
"math/rand"
"strconv" "strconv"
"sync" "sync"
"time" "time"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/coreos/clair/database" "github.com/coreos/clair/database"
"github.com/coreos/clair/ext/vulnmdsrc" "github.com/coreos/clair/ext/vulnmdsrc"
"github.com/coreos/clair/ext/vulnsrc" "github.com/coreos/clair/ext/vulnsrc"
"github.com/coreos/clair/pkg/stopper" "github.com/coreos/clair/pkg/stopper"
"github.com/coreos/clair/pkg/timeutil"
"github.com/pborman/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
) )
const ( const (
@ -76,9 +78,8 @@ type vulnerabilityChange struct {
new *database.VulnerabilityWithAffected new *database.VulnerabilityWithAffected
} }
// RunUpdater begins a process that updates the vulnerability database at /*
// regular intervals. func runUpdaterOld(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
defer st.End() defer st.End()
// Do not run the updater if there is no config or if the interval is 0. // Do not run the updater if there is no config or if the interval is 0.
@ -168,23 +169,129 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper
log.Info("updater service stopped") log.Info("updater service stopped")
} }
*/
// sleepUpdater sleeps the updater for an approximate duration, but remains // RunUpdater begins a process that updates the vulnerability database at
// able to be cancelled by a stopper. // regular intervals.
func sleepUpdater(approxWakeup time.Time, st *stopper.Stopper) (stopped bool) { func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) {
waitUntil := approxWakeup.Add(time.Duration(rand.ExpFloat64()/0.5) * time.Second) defer st.End()
log.WithField("scheduled time", waitUntil).Debug("updater sleeping")
if !waitUntil.Before(time.Now().UTC()) { // Do not run the updater if there is no config or if the interval is 0.
if !st.Sleep(waitUntil.Sub(time.Now())) { if config == nil || config.Interval == 0 || len(config.EnabledUpdaters) == 0 {
return true log.Info("updater service is disabled.")
return
}
// Clean up any resources the updater left behind.
defer func() {
for _, appenders := range vulnmdsrc.Appenders() {
appenders.Clean()
}
for _, updaters := range vulnsrc.Updaters() {
updaters.Clean()
}
log.Info("updater service stopped")
}()
// Create a new unique identity for tracking who owns global locks.
whoAmI := uuid.New()
log.WithField("owner", whoAmI).Info("updater service started")
sleepDuration := updaterSleepBetweenLoopsDuration
for {
// Determine if this is the first update and define the next update time.
// The next update time is (last update time + interval) or now if this is the first update.
nextUpdate := time.Now().UTC()
lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore)
if err != nil {
log.WithError(err).Error("an error occurred while getting the last update time")
nextUpdate = nextUpdate.Add(config.Interval)
}
log.WithFields(log.Fields{
"firstUpdate": isFirstUpdate,
"nextUpdate": nextUpdate,
}).Debug("fetched last update time")
if !isFirstUpdate {
nextUpdate = lastUpdate.Add(config.Interval)
}
// If the next update timer is in the past, then try to update.
if nextUpdate.Before(time.Now().UTC()) {
// Attempt to get a lock on the the update.
log.Debug("attempting to obtain update lock")
acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false)
if lockExpiration.IsZero() {
// Any failures to acquire the lock should instantly expire.
var instantExpiration time.Duration
sleepDuration = instantExpiration
}
if acquiredLock {
sleepDuration, err = updateWhileRenewingLock(datastore, whoAmI, isFirstUpdate, st)
if err != nil {
if err == errReceivedStopSignal {
return
}
log.WithError(err).Debug("failed to acquired lock")
sleepDuration = timeutil.ExpBackoff(sleepDuration, config.Interval)
}
} else {
sleepDuration = updaterSleepBetweenLoopsDuration
}
} else {
sleepDuration = time.Until(nextUpdate)
}
if stopped := timeutil.ApproxSleep(time.Now().Add(sleepDuration), st); stopped {
return
} }
} }
return false }
// errReceivedStopSignal is returned by updateWhileRenewingLock when the
// stopper's channel fires while it is running; RunUpdater checks for it and
// exits instead of treating it as an update failure.
var errReceivedStopSignal = errors.New("stopped")
func updateWhileRenewingLock(datastore database.Datastore, whoAmI string, isFirstUpdate bool, st *stopper.Stopper) (sleepDuration time.Duration, err error) {
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
// todo handle ctx
return update(ctx, datastore, isFirstUpdate)
})
g.Go(func() error {
var refreshDuration = updaterLockRefreshDuration
for {
select {
case <-time.After(timeutil.FractionalDuration(0.9, refreshDuration)):
success, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration, true)
if !success {
return errors.New("failed to extend lock")
}
refreshDuration = time.Until(lockExpiration)
case <-ctx.Done():
database.ReleaseLock(datastore, updaterLockName, whoAmI)
return nil
}
}
})
g.Go(func() error {
select {
case <-st.Chan():
return errReceivedStopSignal
case <-ctx.Done():
return nil
}
})
err = g.Wait()
return
} }
// update fetches all the vulnerabilities from the registered fetchers, updates // update fetches all the vulnerabilities from the registered fetchers, updates
// vulnerabilities, and updater flags, and logs notes from updaters. // vulnerabilities, and updater flags, and logs notes from updaters.
func update(datastore database.Datastore, firstUpdate bool) { func update(ctx context.Context, datastore database.Datastore, firstUpdate bool) error {
defer setUpdaterDuration(time.Now()) defer setUpdaterDuration(time.Now())
log.Info("updating vulnerabilities") log.Info("updating vulnerabilities")
@ -209,7 +316,7 @@ func update(datastore database.Datastore, firstUpdate bool) {
if err := database.PersistNamespacesAndCommit(datastore, namespaces); err != nil { if err := database.PersistNamespacesAndCommit(datastore, namespaces); err != nil {
log.WithError(err).Error("Unable to insert namespaces") log.WithError(err).Error("Unable to insert namespaces")
return return err
} }
changes, err := updateVulnerabilities(datastore, vulnerabilities) changes, err := updateVulnerabilities(datastore, vulnerabilities)
@ -222,21 +329,21 @@ func update(datastore database.Datastore, firstUpdate bool) {
if err != nil { if err != nil {
log.WithError(err).Error("Unable to update vulnerabilities") log.WithError(err).Error("Unable to update vulnerabilities")
return return err
} }
if !firstUpdate { if !firstUpdate {
err = createVulnerabilityNotifications(datastore, changes) err = createVulnerabilityNotifications(datastore, changes)
if err != nil { if err != nil {
log.WithError(err).Error("Unable to create notifications") log.WithError(err).Error("Unable to create notifications")
return return err
} }
} }
err = updateUpdaterFlags(datastore, flags) err = updateUpdaterFlags(datastore, flags)
if err != nil { if err != nil {
log.WithError(err).Error("Unable to update updater flags") log.WithError(err).Error("Unable to update updater flags")
return return err
} }
for _, note := range notes { for _, note := range notes {
@ -248,11 +355,12 @@ func update(datastore database.Datastore, firstUpdate bool) {
err = setLastUpdateTime(datastore) err = setLastUpdateTime(datastore)
if err != nil { if err != nil {
log.WithError(err).Error("Unable to set last update time") log.WithError(err).Error("Unable to set last update time")
return return err
} }
} }
log.Info("update finished") log.Info("update finished")
return nil
} }
func setUpdaterDuration(start time.Time) { func setUpdaterDuration(start time.Time) {