@ -18,7 +18,6 @@ package updater
import (
"encoding/json"
"fmt"
"math/rand"
"strconv"
"time"
@ -32,16 +31,18 @@ import (
)
const (
flagName = "updater"
notesFlagName = "updater/notes"
refreshLockDuration = time . Minute * 8
flagName = "updater/last"
notesFlagName = "updater/notes"
lockName = "updater"
lockDuration = refreshLockDuration + time . Minute * 2
refreshLockDuration = time . Minute * 8
)
var log = capnslog . NewPackageLogger ( "github.com/coreos/clair" , "updater" )
// Run updates the vulnerability database at regular intervals.
func Run ( config * config . UpdaterConfig , st * utils . Stopper ) {
func Run ( config * config . UpdaterConfig , datastore database . Datastore , st * utils . Stopper ) {
defer st . End ( )
// Do not run the updater if there is no config or if the interval is 0.
@ -62,7 +63,7 @@ func Run(config *config.UpdaterConfig, st *utils.Stopper) {
// occurs.
var nextUpdate time . Time
var stop bool
if lastUpdate := getLastUpdate ( ) ; ! lastUpdate . IsZero ( ) {
if lastUpdate := getLastUpdate ( datastore ) ; ! lastUpdate . IsZero ( ) {
nextUpdate = lastUpdate . Add ( config . Interval )
} else {
nextUpdate = time . Now ( ) . UTC ( )
@ -72,12 +73,12 @@ func Run(config *config.UpdaterConfig, st *utils.Stopper) {
if nextUpdate . Before ( time . Now ( ) . UTC ( ) ) {
// Attempt to get a lock on the the update.
log . Debug ( "attempting to obtain update lock" )
hasLock , hasLockUntil := data base. Lock ( flagName , lockDuration , whoAmI )
hasLock , hasLockUntil := data store. Lock ( lockName , whoAmI , lockDuration , false )
if hasLock {
// Launch update in a new go routine.
doneC := make ( chan bool , 1 )
go func ( ) {
Update ( )
Update ( datastore )
doneC <- true
} ( )
@ -87,21 +88,21 @@ func Run(config *config.UpdaterConfig, st *utils.Stopper) {
done = true
case <- time . After ( refreshLockDuration ) :
// Refresh the lock until the update is done.
data base. Lock ( flagName , lockDuration , whoAmI )
data store. Lock ( lockName , whoAmI , lockDuration , true )
case <- st . Chan ( ) :
stop = true
}
}
// Unlock the update.
data base. Unlock ( flag Name, whoAmI )
data store. Unlock ( lock Name, whoAmI )
if stop {
break
}
continue
} else {
lockOwner , lockExpiration , err := data base. FindLock ( flag Name)
lockOwner , lockExpiration , err := data store. FindLock ( lock Name)
if err != nil {
log . Debug ( "update lock is already taken" )
nextUpdate = hasLockUntil
@ -128,69 +129,50 @@ func Run(config *config.UpdaterConfig, st *utils.Stopper) {
// Update fetches all the vulnerabilities from the registered fetchers, upserts
// them into the database and then sends notifications.
func Update ( ) {
func Update ( datastore database . Datastore ) {
log . Info ( "updating vulnerabilities" )
// Fetch updates.
status , responses := fetch ( )
// Merge responses.
vulnerabilities , packages , flags , notes , err := mergeAndVerify ( responses )
if err != nil {
log . Errorf ( "an error occured when merging update responses: %s" , err )
return
}
responses = nil
status , vulnerabilities , flags , notes := fetch ( datastore )
// TODO(Quentin-M): Complete informations using NVD
// Insert packages.
log . Tracef ( "beginning insertion of %d packages for update" , len ( packages ) )
err = database . InsertPackages ( packages )
if err != nil {
log . Errorf ( "an error occured when inserting packages for update: %s" , err )
return
}
packages = nil
// Insert vulnerabilities.
log . Tracef ( "beginning insertion of %d vulnerabilities for update" , len ( vulnerabilities ) )
notifications, err := data ba se. InsertVulnerabilities ( vulnerabilities )
err := datastore . InsertVulnerabilities ( vulnerabilities )
if err != nil {
log . Errorf ( "an error occured when inserting vulnerabilities for update: %s" , err )
return
}
vulnerabilities = nil
// Insert notifications into the database.
err = database . InsertNotifications ( notifications , database . GetDefaultNotificationWrapper ( ) )
if err != nil {
log . Errorf ( "an error occured when inserting notifications for update: %s" , err )
return
}
notifications = nil
// Update flags and notes.
for flagName , flagValue := range flags {
data base. UpdateFlag ( flagName , flagValue )
datastore . InsertKeyValue ( flagName , flagValue )
}
database . UpdateFlag ( notesFlagName , notes )
bnotes , _ := json . Marshal ( notes )
datastore . InsertKeyValue ( notesFlagName , string ( bnotes ) )
// Update last successful update if every fetchers worked properly.
if status {
data base. UpdateFlag ( flagName , strconv . FormatInt ( time . Now ( ) . UTC ( ) . Unix ( ) , 10 ) )
data store. InsertKeyValue ( flagName , strconv . FormatInt ( time . Now ( ) . UTC ( ) . Unix ( ) , 10 ) )
}
log . Info ( "update finished" )
}
// fetch get data from the registered fetchers, in parallel.
func fetch ( ) ( status bool , responses [ ] * FetcherResponse ) {
func fetch ( datastore database . Datastore ) ( bool , [ ] database . Vulnerability , map [ string ] string , [ ] string ) {
var vulnerabilities [ ] database . Vulnerability
var notes [ ] string
status := true
flags := make ( map [ string ] string )
// Fetch updates in parallel.
status = true
var responseC = make ( chan * FetcherResponse , 0 )
for n , f := range fetchers {
go func ( name string , fetcher Fetcher ) {
response , err := fetcher . FetchUpdate ( )
response , err := fetcher . FetchUpdate ( datastore )
if err != nil {
log . Errorf ( "an error occured when fetching update '%s': %s." , name , err )
status = false
@ -206,93 +188,21 @@ func fetch() (status bool, responses []*FetcherResponse) {
for i := 0 ; i < len ( fetchers ) ; i ++ {
resp := <- responseC
if resp != nil {
responses = append ( responses , resp )
}
}
close ( responseC )
return
}
// merge put all the responses together (vulnerabilities, packages, flags, notes), ensure the
// uniqueness of vulnerabilities and packages and verify that every vulnerability's fixedInNodes
// have their corresponding package definition.
func mergeAndVerify ( responses [ ] * FetcherResponse ) ( svulnerabilities [ ] * database . Vulnerability , spackages [ ] * database . Package , flags map [ string ] string , snotes string , err error ) {
vulnerabilities := make ( map [ string ] * database . Vulnerability )
packages := make ( map [ string ] * database . Package )
flags = make ( map [ string ] string )
var notes [ ] string
// Merge responses.
for _ , response := range responses {
// Notes
notes = append ( notes , response . Notes ... )
// Flags
if response . FlagName != "" && response . FlagValue != "" {
flags [ response . FlagName ] = response . FlagValue
}
// Packages
for _ , p := range response . Packages {
node := p . GetNode ( )
if _ , ok := packages [ node ] ; ! ok {
packages [ node ] = p
}
}
// Vulnerabilities
for _ , v := range response . Vulnerabilities {
if vulnerability , ok := vulnerabilities [ v . ID ] ; ! ok {
vulnerabilities [ v . ID ] = v
} else {
mergeVulnerability ( vulnerability , v )
vulnerabilities = append ( vulnerabilities , resp . Vulnerabilities ... )
notes = append ( notes , resp . Notes ... )
if resp . FlagName != "" && resp . FlagValue != "" {
flags [ resp . FlagName ] = resp . FlagValue
}
}
}
// Verify that the packages used in the vulnerabilities are specified.
for _ , v := range vulnerabilities {
for _ , node := range v . FixedInNodes {
if _ , ok := packages [ node ] ; ! ok {
err = fmt . Errorf ( "vulnerability %s is fixed by an unspecified package" , v . ID )
return
}
}
}
// Convert data and return
for _ , v := range vulnerabilities {
svulnerabilities = append ( svulnerabilities , v )
}
for _ , p := range packages {
spackages = append ( spackages , p )
}
bnotes , _ := json . Marshal ( notes )
snotes = string ( bnotes )
return
}
// mergeVulnerability updates the target vulnerability structure using the specified one.
func mergeVulnerability ( target , source * database . Vulnerability ) {
if source . Link != "" {
target . Link = source . Link
}
if source . Description != "" {
target . Description = source . Description
}
if source . Priority . Compare ( target . Priority ) > 0 {
target . Priority = source . Priority
}
for _ , node := range source . FixedInNodes {
if ! utils . Contains ( node , target . FixedInNodes ) {
target . FixedInNodes = append ( target . FixedInNodes , node )
}
}
close ( responseC )
return status , vulnerabilities , flags , notes
}
// Healthcheck returns the health of the updater service.
func Healthcheck ( ) health . Status {
notes := getNotes ( )
func Healthcheck ( datastore database . Datastore ) health . Status {
notes := getNotes ( datastore )
return health . Status {
IsEssential : false ,
@ -301,14 +211,14 @@ func Healthcheck() health.Status {
LatestSuccessfulUpdate time . Time
Notes [ ] string ` json:",omitempty" `
} {
LatestSuccessfulUpdate : getLastUpdate ( ) ,
LatestSuccessfulUpdate : getLastUpdate ( datastore ) ,
Notes : notes ,
} ,
}
}
func getLastUpdate ( ) time . Time {
if lastUpdateTSS , err := data base. GetFlag Value( flagName ) ; err == nil && lastUpdateTSS != "" {
func getLastUpdate ( datastore database . Datastore ) time . Time {
if lastUpdateTSS , err := data store. GetKey Value( flagName ) ; err == nil && lastUpdateTSS != "" {
if lastUpdateTS , err := strconv . ParseInt ( lastUpdateTSS , 10 , 64 ) ; err == nil {
return time . Unix ( lastUpdateTS , 0 ) . UTC ( )
}
@ -316,8 +226,8 @@ func getLastUpdate() time.Time {
return time . Time { }
}
func getNotes ( ) ( notes [ ] string ) {
if jsonNotes , err := data base. GetFlag Value( notesFlagName ) ; err == nil && jsonNotes != "" {
func getNotes ( datastore database . Datastore ) ( notes [ ] string ) {
if jsonNotes , err := data store. GetKey Value( notesFlagName ) ; err == nil && jsonNotes != "" {
json . Unmarshal ( [ ] byte ( jsonNotes ) , notes )
}
return