diff --git a/updater.go b/updater.go index 6bded33a..23e46ddd 100644 --- a/updater.go +++ b/updater.go @@ -15,20 +15,22 @@ package clair import ( + "context" + "errors" "fmt" - "math/rand" "strconv" "sync" "time" - "github.com/pborman/uuid" - "github.com/prometheus/client_golang/prometheus" - log "github.com/sirupsen/logrus" - "github.com/coreos/clair/database" "github.com/coreos/clair/ext/vulnmdsrc" "github.com/coreos/clair/ext/vulnsrc" "github.com/coreos/clair/pkg/stopper" + "github.com/coreos/clair/pkg/timeutil" + "github.com/pborman/uuid" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" ) const ( @@ -76,9 +78,8 @@ type vulnerabilityChange struct { new *database.VulnerabilityWithAffected } -// RunUpdater begins a process that updates the vulnerability database at -// regular intervals. -func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) { +/* +func runUpdaterOld(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) { defer st.End() // Do not run the updater if there is no config or if the interval is 0. @@ -168,23 +169,129 @@ func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper log.Info("updater service stopped") } +*/ -// sleepUpdater sleeps the updater for an approximate duration, but remains -// able to be cancelled by a stopper. -func sleepUpdater(approxWakeup time.Time, st *stopper.Stopper) (stopped bool) { - waitUntil := approxWakeup.Add(time.Duration(rand.ExpFloat64()/0.5) * time.Second) - log.WithField("scheduled time", waitUntil).Debug("updater sleeping") - if !waitUntil.Before(time.Now().UTC()) { - if !st.Sleep(waitUntil.Sub(time.Now())) { - return true +// RunUpdater begins a process that updates the vulnerability database at +// regular intervals. +func RunUpdater(config *UpdaterConfig, datastore database.Datastore, st *stopper.Stopper) { + defer st.End() + + // Do not run the updater if there is no config or if the interval is 0. + if config == nil || config.Interval == 0 || len(config.EnabledUpdaters) == 0 { + log.Info("updater service is disabled.") + return + } + + // Clean up any resources the updater left behind. + defer func() { + for _, appenders := range vulnmdsrc.Appenders() { + appenders.Clean() + } + for _, updaters := range vulnsrc.Updaters() { + updaters.Clean() + } + + log.Info("updater service stopped") + }() + + // Create a new unique identity for tracking who owns global locks. + whoAmI := uuid.New() + log.WithField("owner", whoAmI).Info("updater service started") + + sleepDuration := updaterSleepBetweenLoopsDuration + for { + // Determine if this is the first update and define the next update time. + // The next update time is (last update time + interval) or now if this is the first update. + nextUpdate := time.Now().UTC() + lastUpdate, isFirstUpdate, err := GetLastUpdateTime(datastore) + if err != nil { + log.WithError(err).Error("an error occurred while getting the last update time") + nextUpdate = nextUpdate.Add(config.Interval) + } + + log.WithFields(log.Fields{ + "firstUpdate": isFirstUpdate, + "nextUpdate": nextUpdate, + }).Debug("fetched last update time") + if !isFirstUpdate { + nextUpdate = lastUpdate.Add(config.Interval) + } + + // If the next update timer is in the past, then try to update. + if nextUpdate.Before(time.Now().UTC()) { + // Attempt to get a lock on the the update. + log.Debug("attempting to obtain update lock") + acquiredLock, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockDuration, false) + if lockExpiration.IsZero() { + // Any failures to acquire the lock should instantly expire. + var instantExpiration time.Duration + sleepDuration = instantExpiration + } + + if acquiredLock { + sleepDuration, err = updateWhileRenewingLock(datastore, whoAmI, isFirstUpdate, st) + if err != nil { + if err == errReceivedStopSignal { + return + } + log.WithError(err).Debug("failed to acquired lock") + sleepDuration = timeutil.ExpBackoff(sleepDuration, config.Interval) + } + } else { + sleepDuration = updaterSleepBetweenLoopsDuration + } + } else { + sleepDuration = time.Until(nextUpdate) + } + + if stopped := timeutil.ApproxSleep(time.Now().Add(sleepDuration), st); stopped { + return } } - return false +} + +var errReceivedStopSignal = errors.New("stopped") + +func updateWhileRenewingLock(datastore database.Datastore, whoAmI string, isFirstUpdate bool, st *stopper.Stopper) (sleepDuration time.Duration, err error) { + g, ctx := errgroup.WithContext(context.Background()) + g.Go(func() error { + // todo handle ctx + return update(ctx, datastore, isFirstUpdate) + }) + + g.Go(func() error { + var refreshDuration = updaterLockRefreshDuration + for { + select { + case <-time.After(timeutil.FractionalDuration(0.9, refreshDuration)): + success, lockExpiration := database.AcquireLock(datastore, updaterLockName, whoAmI, updaterLockRefreshDuration, true) + if !success { + return errors.New("failed to extend lock") + } + refreshDuration = time.Until(lockExpiration) + case <-ctx.Done(): + database.ReleaseLock(datastore, updaterLockName, whoAmI) + return nil + } + } + }) + + g.Go(func() error { + select { + case <-st.Chan(): + return errReceivedStopSignal + case <-ctx.Done(): + return nil + } + }) + + err = g.Wait() + return } // update fetches all the vulnerabilities from the registered fetchers, updates // vulnerabilities, and updater flags, and logs notes from updaters. -func update(datastore database.Datastore, firstUpdate bool) { +func update(ctx context.Context, datastore database.Datastore, firstUpdate bool) error { defer setUpdaterDuration(time.Now()) log.Info("updating vulnerabilities") @@ -209,7 +316,7 @@ func update(datastore database.Datastore, firstUpdate bool) { if err := database.PersistNamespacesAndCommit(datastore, namespaces); err != nil { log.WithError(err).Error("Unable to insert namespaces") - return + return err } changes, err := updateVulnerabilities(datastore, vulnerabilities) @@ -222,21 +329,21 @@ func update(datastore database.Datastore, firstUpdate bool) { if err != nil { log.WithError(err).Error("Unable to update vulnerabilities") - return + return err } if !firstUpdate { err = createVulnerabilityNotifications(datastore, changes) if err != nil { log.WithError(err).Error("Unable to create notifications") - return + return err } } err = updateUpdaterFlags(datastore, flags) if err != nil { log.WithError(err).Error("Unable to update updater flags") - return + return err } for _, note := range notes { @@ -248,11 +355,12 @@ func update(datastore database.Datastore, firstUpdate bool) { err = setLastUpdateTime(datastore) if err != nil { log.WithError(err).Error("Unable to set last update time") - return + return err } } log.Info("update finished") + return nil } func setUpdaterDuration(start time.Time) {