@ -17,6 +17,8 @@
package updater
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"
@ -30,23 +32,12 @@ import (
const (
flagName = "updater"
notesFlagName = "updater/notes"
refreshLockDuration = time . Minute * 8
lockDuration = refreshLockDuration + time . Minute * 2
// healthMaxConsecutiveLocalFailures defines the number of times the updater
// can fail before we should tag it as unhealthy
healthMaxConsecutiveLocalFailures = 5
)
var (
log = capnslog . NewPackageLogger ( "github.com/coreos/clair" , "updater" )
healthLatestSuccessfulUpdate time . Time
healthLockOwner string
healthIdentifier string
healthConsecutiveLocalFailures int
healthNotes [ ] string
)
var log = capnslog . NewPackageLogger ( "github.com/coreos/clair" , "updater" )
func init ( ) {
health . RegisterHealthchecker ( "updater" , Healthcheck )
@ -63,19 +54,17 @@ func Run(interval time.Duration, st *utils.Stopper) {
}
whoAmI := uuid . New ( )
healthIdentifier = whoAmI
log . Infof ( "updater service started. lock identifier: %s" , whoAmI )
for {
// Set the next update time to (last update time + interval) or now if there
// is no last update time stored in database (first update) or if an error
// occurs
nextUpdate := time . Now ( ) . UTC ( )
if lastUpdateTSS , err := database . GetFlagValue ( flagName ) ; err == nil && lastUpdateTSS != "" {
if lastUpdateTS , err := strconv . ParseInt ( lastUpdateTSS , 10 , 64 ) ; err == nil {
healthLatestSuccessfulUpdate = time . Unix ( lastUpdateTS , 0 )
nextUpdate = time . Unix ( lastUpdateTS , 0 ) . Add ( interval )
}
var nextUpdate time . Time
if lastUpdate := getLastUpdate ( ) ; ! lastUpdate . IsZero ( ) {
nextUpdate = lastUpdate . Add ( interval )
} else {
nextUpdate = time . Now ( ) . UTC ( )
}
// If the next update timer is in the past, then try to update.
@ -84,8 +73,6 @@ func Run(interval time.Duration, st *utils.Stopper) {
log . Debug ( "attempting to obtain update lock" )
hasLock , hasLockUntil := database . Lock ( flagName , lockDuration , whoAmI )
if hasLock {
healthLockOwner = healthIdentifier
// Launch update in a new go routine.
doneC := make ( chan bool , 1 )
go func ( ) {
@ -105,6 +92,8 @@ func Run(interval time.Duration, st *utils.Stopper) {
// Unlock the update.
database . Unlock ( flagName , whoAmI )
continue
} else {
lockOwner , lockExpiration , err := database . LockInfo ( flagName )
if err != nil {
@ -113,7 +102,6 @@ func Run(interval time.Duration, st *utils.Stopper) {
} else {
log . Debugf ( "update lock is already taken by %s until %v" , lockOwner , lockExpiration )
nextUpdate = lockExpiration
healthLockOwner = lockOwner
}
}
}
@ -137,8 +125,62 @@ func Run(interval time.Duration, st *utils.Stopper) {
func Update ( ) {
log . Info ( "updating vulnerabilities" )
// Fetch updates.
status , responses := fetch ( )
// Merge responses.
vulnerabilities , packages , flags , notes , err := mergeAndVerify ( responses )
if err != nil {
log . Errorf ( "an error occured when merging update responses: %s" , err )
return
}
responses = nil
// TODO(Quentin-M): Complete informations using NVD
// Insert packages.
log . Tracef ( "beginning insertion of %d packages for update" , len ( packages ) )
err = database . InsertPackages ( packages )
if err != nil {
log . Errorf ( "an error occured when inserting packages for update: %s" , err )
return
}
packages = nil
// Insert vulnerabilities.
log . Tracef ( "beginning insertion of %d vulnerabilities for update" , len ( vulnerabilities ) )
notifications , err := database . InsertVulnerabilities ( vulnerabilities )
if err != nil {
log . Errorf ( "an error occured when inserting vulnerabilities for update: %s" , err )
return
}
vulnerabilities = nil
// Insert notifications into the database.
err = database . InsertNotifications ( notifications , database . GetDefaultNotificationWrapper ( ) )
if err != nil {
log . Errorf ( "an error occured when inserting notifications for update: %s" , err )
return
}
notifications = nil
// Update flags and notes.
for flagName , flagValue := range flags {
database . UpdateFlag ( flagName , flagValue )
}
database . UpdateFlag ( notesFlagName , notes )
// Update last successful update if every fetchers worked properly.
if status {
database . UpdateFlag ( flagName , strconv . FormatInt ( time . Now ( ) . UTC ( ) . Unix ( ) , 10 ) )
}
log . Info ( "update finished" )
}
// fetch get data from the registered fetchers, in parallel.
func fetch ( ) ( status bool , responses [ ] * FetcherResponse ) {
// Fetch updates in parallel.
var status = true
status = true
var responseC = make ( chan * FetcherResponse , 0 )
for n , f := range fetchers {
go func ( name string , fetcher Fetcher ) {
@ -155,129 +197,122 @@ func Update() {
}
// Collect results of updates.
var responses [ ] * FetcherResponse
var notes [ ] string
for i := 0 ; i < len ( fetchers ) ; {
select {
case resp := <- responseC :
if resp != nil {
responses = append ( responses , resp )
notes = append ( notes , resp . Notes ... )
}
i ++
for i := 0 ; i < len ( fetchers ) ; i ++ {
resp := <- responseC
if resp != nil {
responses = append ( responses , resp )
}
}
close ( responseC )
return
}
// TODO(Quentin-M): Merge responses together
// TODO(Quentin-M): Complete informations using NVD
// merge put all the responses together (vulnerabilities, packages, flags, notes), ensure the
// uniqueness of vulnerabilities and packages and verify that every vulnerability's fixedInNodes
// have their corresponding package definition.
func mergeAndVerify ( responses [ ] * FetcherResponse ) ( svulnerabilities [ ] * database . Vulnerability , spackages [ ] * database . Package , flags map [ string ] string , snotes string , err error ) {
vulnerabilities := make ( map [ string ] * database . Vulnerability )
packages := make ( map [ string ] * database . Package )
flags = make ( map [ string ] string )
var notes [ ] string
// Store flags out of the response struct.
flags := make ( map [ string ] string )
// Merge responses.
for _ , response := range responses {
// Notes
notes = append ( notes , response . Notes ... )
// Flags
if response . FlagName != "" && response . FlagValue != "" {
flags [ response . FlagName ] = response . FlagValue
}
}
// Update health notes.
healthNotes = notes
// Build list of packages.
var packages [ ] * database . Package
for _ , response := range responses {
// Packages
for _ , p := range response . Packages {
node := p . GetNode ( )
if _ , ok := packages [ node ] ; ! ok {
packages [ node ] = p
}
}
// Vulnerabilities
for _ , v := range response . Vulnerabilities {
packages = append ( packages , v . FixedIn ... )
if vulnerability , ok := vulnerabilities [ v . ID ] ; ! ok {
vulnerabilities [ v . ID ] = v
} else {
mergeVulnerability ( vulnerability , v )
}
}
}
// Insert packages into the database.
log . Tracef ( "beginning insertion of %d packages for update" , len ( packages ) )
t := time . Now ( )
err := database . InsertPackages ( packages )
log . Tracef ( "inserting %d packages took %v" , len ( packages ) , time . Since ( t ) )
if err != nil {
log . Errorf ( "an error occured when inserting packages for update: %s" , err )
updateHealth ( false )
return
}
packages = nil
// Build a list of vulnerabilties.
var vulnerabilities [ ] * database . Vulnerability
for _ , response := range responses {
for _ , v := range response . Vulnerabilities {
var packageNodes [ ] string
for _ , pkg := range v . FixedIn {
packageNodes = append ( packageNodes , pkg . Node )
// Verify that the packages used in the vulnerabilities are specified.
for _ , v := range vulnerabilities {
for _ , node := range v . FixedInNodes {
if _ , ok := packages [ node ] ; ! ok {
err = fmt . Errorf ( "vulnerability %s is fixed by an unspecified package" , v . ID )
return
}
vulnerabilities = append ( vulnerabilities , & database . Vulnerability { ID : v . ID , Link : v . Link , Priority : v . Priority , Description : v . Description , FixedInNodes : packageNodes } )
}
}
responses = nil
// Insert vulnerabilities into the database.
log . Tracef ( "beginning insertion of %d vulnerabilities for update" , len ( vulnerabilities ) )
t = time . Now ( )
notifications , err := database . InsertVulnerabilities ( vulnerabilities )
log . Tracef ( "inserting %d vulnerabilities took %v" , len ( vulnerabilities ) , time . Since ( t ) )
if err != nil {
log . Errorf ( "an error occured when inserting vulnerabilities for update: %s" , err )
updateHealth ( false )
return
// Convert data and return
for _ , v := range vulnerabilities {
svulnerabilities = append ( svulnerabilities , v )
}
vulnerabilities = nil
// Insert notifications into the database.
err = database . InsertNotifications ( notifications , database . GetDefaultNotificationWrapper ( ) )
if err != nil {
log . Errorf ( "an error occured when inserting notifications for update: %s" , err )
updateHealth ( false )
return
for _ , p := range packages {
spackages = append ( spackages , p )
}
notifications = nil
// Update flags in the database.
for flagName , flagValue := range flags {
database . UpdateFlag ( flagName , flagValue )
}
bnotes , _ := json . Marshal ( notes )
snotes = string ( bnotes )
// Update health depending on the status of the fetchers.
updateHealth ( status )
if status {
now := time . Now ( ) . UTC ( )
database . UpdateFlag ( flagName , strconv . FormatInt ( now . Unix ( ) , 10 ) )
healthLatestSuccessfulUpdate = now
}
log . Info ( "update finished" )
return
}
func updateHealth ( s bool ) {
if s == false {
healthConsecutiveLocalFailures ++
} else {
healthConsecutiveLocalFailures = 0
// mergeVulnerability updates the target vulnerability structure using the specified one.
func mergeVulnerability ( target , source * database . Vulnerability ) {
if source . Link != "" {
target . Link = source . Link
}
if source . Description != "" {
target . Description = source . Description
}
if source . Priority . Compare ( target . Priority ) > 0 {
target . Priority = source . Priority
}
for _ , node := range source . FixedInNodes {
if ! utils . Contains ( node , target . FixedInNodes ) {
target . FixedInNodes = append ( target . FixedInNodes , node )
}
}
}
// Healthcheck returns the health of the updater service.
func Healthcheck ( ) health . Status {
notes := getNotes ( )
return health . Status {
IsEssential : false ,
IsHealthy : healthConsecutiveLocalFailures < healthMaxConsecutiveLocalFailures ,
IsHealthy : len ( notes ) == 0 ,
Details : struct {
HealthIdentifier string
HealthLockOwner string
LatestSuccessfulUpdate time . Time
ConsecutiveLocalFailures int
Notes [ ] string ` json:",omitempty" `
LatestSuccessfulUpdate time . Time
Notes [ ] string ` json:",omitempty" `
} {
HealthIdentifier : healthIdentifier ,
HealthLockOwner : healthLockOwner ,
LatestSuccessfulUpdate : healthLatestSuccessfulUpdate ,
ConsecutiveLocalFailures : healthConsecutiveLocalFailures ,
Notes : healthNotes ,
LatestSuccessfulUpdate : getLastUpdate ( ) ,
Notes : notes ,
} ,
}
}
func getLastUpdate ( ) time . Time {
if lastUpdateTSS , err := database . GetFlagValue ( flagName ) ; err == nil && lastUpdateTSS != "" {
if lastUpdateTS , err := strconv . ParseInt ( lastUpdateTSS , 10 , 64 ) ; err == nil {
return time . Unix ( lastUpdateTS , 0 ) . UTC ( )
}
}
return time . Time { }
}
func getNotes ( ) ( notes [ ] string ) {
if jsonNotes , err := database . GetFlagValue ( notesFlagName ) ; err == nil && jsonNotes != "" {
json . Unmarshal ( [ ] byte ( jsonNotes ) , notes )
}
return
}