From ad0531acc7614cf6fa68d9ce7c66ff293832dfcf Mon Sep 17 00:00:00 2001 From: Quentin Machu Date: Fri, 22 Jan 2016 15:59:46 -0500 Subject: [PATCH] notifier/database: refactor notification system and add initial Prometheus support --- database/database.go | 5 +- database/models.go | 37 +++++---- database/pgsql/feature.go | 4 + .../migrations/20151222113213_Initial.sql | 15 ++-- database/pgsql/namespace.go | 2 + database/pgsql/notification.go | 83 ++++++++++--------- database/pgsql/pgsql.go | 39 +++++++-- database/pgsql/queries.go | 23 +++-- database/pgsql/vulnerability.go | 2 +- notifier/notifier.go | 65 ++++++++------- notifier/notifiers/webhook.go | 7 +- 11 files changed, 157 insertions(+), 125 deletions(-) diff --git a/database/database.go b/database/database.go index 46a0e5e8..38173099 100644 --- a/database/database.go +++ b/database/database.go @@ -48,9 +48,8 @@ type Datastore interface { DeleteVulnerability(namespaceName, name string) error // Notifications - CountAvailableNotifications() (int, error) - GetAvailableNotification(renotifyInterval time.Duration) (string, error) - GetNotification(name string, limit, page int) (string, interface{}, error) + GetAvailableNotification(renotifyInterval time.Duration) (VulnerabilityNotification, error) // Does not fill old/new Vulnerabilities. + GetNotification(name string, limit, page int) (VulnerabilityNotification, error) SetNotificationNotified(name string) error DeleteNotification(name string) error diff --git a/database/models.go b/database/models.go index 2b033756..300d9f95 100644 --- a/database/models.go +++ b/database/models.go @@ -14,7 +14,11 @@ package database -import "github.com/coreos/clair/utils/types" +import ( + "time" + + "github.com/coreos/clair/utils/types" +) // ID is only meant to be used by database implementations and should never be used for anything else. 
type Model struct { @@ -25,10 +29,10 @@ type Layer struct { Model Name string - EngineVersion int `json:",omitempty"` - Parent *Layer `json:",omitempty"` - Namespace *Namespace `json:",omitempty"` - Features []FeatureVersion `json:",omitempty"` + EngineVersion int + Parent *Layer + Namespace *Namespace + Features []FeatureVersion } type Namespace struct { @@ -42,7 +46,6 @@ type Feature struct { Name string Namespace Namespace - // FixedBy map[types.Version]Vulnerability // <<-- WRONG. } type FeatureVersion struct { @@ -50,7 +53,7 @@ type FeatureVersion struct { Feature Feature Version types.Version - AffectedBy []Vulnerability `json:",omitempty"` + AffectedBy []Vulnerability } type Vulnerability struct { @@ -62,21 +65,21 @@ type Vulnerability struct { Link string Severity types.Priority - FixedIn []FeatureVersion `json:",omitempty"` - //Affects []FeatureVersion + FixedIn []FeatureVersion + LayersIntroducingVulnerability []Layer // For output purposes. Only make sense when the vulnerability // is already about a specific Feature/FeatureVersion. FixedBy types.Version `json:",omitempty"` } -type NewVulnerabilityNotification struct { - VulnerabilityID int -} +type VulnerabilityNotification struct { + Name string -type NewVulnerabilityNotificationPage struct { - Vulnerability Vulnerability - Layers []Layer -} + Created time.Time + Notified time.Time + Deleted time.Time -// ... 
+ OldVulnerability *Vulnerability + NewVulnerability Vulnerability +} diff --git a/database/pgsql/feature.go b/database/pgsql/feature.go index f8a08ba4..84c2cc10 100644 --- a/database/pgsql/feature.go +++ b/database/pgsql/feature.go @@ -28,8 +28,10 @@ func (pgSQL *pgSQL) insertFeature(feature database.Feature) (int, error) { } if pgSQL.cache != nil { + promCacheQueriesTotal.WithLabelValues("feature").Inc() id, found := pgSQL.cache.Get("feature:" + feature.Namespace.Name + ":" + feature.Name) if found { + promCacheHitsTotal.WithLabelValues("feature").Inc() return id.(int), nil } } @@ -60,9 +62,11 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion) } if pgSQL.cache != nil { + promCacheQueriesTotal.WithLabelValues("featureversion").Inc() id, found := pgSQL.cache.Get("featureversion:" + featureVersion.Feature.Namespace.Name + ":" + featureVersion.Feature.Name + ":" + featureVersion.Version.String()) if found { + promCacheHitsTotal.WithLabelValues("featureversion").Inc() return id.(int), nil } } diff --git a/database/pgsql/migrations/20151222113213_Initial.sql b/database/pgsql/migrations/20151222113213_Initial.sql index a9f4032a..86a891cc 100644 --- a/database/pgsql/migrations/20151222113213_Initial.sql +++ b/database/pgsql/migrations/20151222113213_Initial.sql @@ -143,17 +143,18 @@ CREATE INDEX ON Lock (owner); -- ----------------------------------------------------- --- Table Notification +-- Table VulnerabilityNotification -- ----------------------------------------------------- -CREATE TABLE IF NOT EXISTS Notification ( +CREATE TABLE IF NOT EXISTS Vulnerability_Notification ( id SERIAL PRIMARY KEY, name VARCHAR(64) NOT NULL UNIQUE, - kind VARCHAR(64) NOT NULL, + created_at TIMESTAMP WITH TIME ZONE, notified_at TIMESTAMP WITH TIME ZONE NULL, deleted_at TIMESTAMP WITH TIME ZONE NULL, - data TEXT); + old_vulnerability TEXT, + new_vulnerability TEXT); -CREATE INDEX ON Notification (notified_at, deleted_at); +CREATE INDEX ON 
Vulnerability_Notification (notified_at); -- +goose Down @@ -165,7 +166,7 @@ DROP TABLE IF EXISTS Namespace, Vulnerability, Vulnerability_FixedIn_Feature, Vulnerability_Affects_FeatureVersion, + Vulnerability_Notification, KeyValue, - Lock, - Notification + Lock CASCADE; diff --git a/database/pgsql/namespace.go b/database/pgsql/namespace.go index f807efba..5d7b406e 100644 --- a/database/pgsql/namespace.go +++ b/database/pgsql/namespace.go @@ -25,7 +25,9 @@ func (pgSQL *pgSQL) insertNamespace(namespace database.Namespace) (int, error) { } if pgSQL.cache != nil { + promCacheQueriesTotal.WithLabelValues("namespace").Inc() if id, found := pgSQL.cache.Get("namespace:" + namespace.Name); found { + promCacheHitsTotal.WithLabelValues("namespace").Inc() return id.(int), nil } } diff --git a/database/pgsql/notification.go b/database/pgsql/notification.go index 847e8f97..ed98c17a 100644 --- a/database/pgsql/notification.go +++ b/database/pgsql/notification.go @@ -3,8 +3,6 @@ package pgsql import ( "database/sql" "encoding/json" - "fmt" - "reflect" "time" "github.com/coreos/clair/database" @@ -13,15 +11,22 @@ import ( ) // do it in tx so we won't insert/update a vuln without notification and vice-versa. -func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) error { - kind := reflect.Indirect(reflect.ValueOf(notification)).Type().String() - data, err := json.Marshal(notification) +// name and created doesn't matter. +func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification database.VulnerabilityNotification) error { + // Marshal old and new Vulnerabilities. 
+ oldVulnerability, err := json.Marshal(notification.OldVulnerability) if err != nil { tx.Rollback() - return cerrors.NewBadRequestError("could not marshal notification in insertNotification") + return cerrors.NewBadRequestError("could not marshal old Vulnerability in insertNotification") + } + newVulnerability, err := json.Marshal(notification.NewVulnerability) + if err != nil { + tx.Rollback() + return cerrors.NewBadRequestError("could not marshal new Vulnerability in insertNotification") } - _, err = tx.Exec(getQuery("i_notification"), uuid.New(), kind, data) + // Insert Notification. + _, err = tx.Exec(getQuery("i_notification"), uuid.New(), oldVulnerability, newVulnerability) if err != nil { tx.Rollback() return handleError("i_notification", err) @@ -30,51 +35,47 @@ func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) err return nil } -func (pgSQL *pgSQL) CountAvailableNotifications() (int, error) { - var count int - err := pgSQL.QueryRow(getQuery("c_notification_available")).Scan(&count) - if err != nil { - return 0, handleError("c_notification_available", err) - } - - return count, nil -} - -// Get one available notification (!locked && !deleted && (!notified || notified_but_timed-out)). -func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (string, error) { +// Get one available notification name (!locked && !deleted && (!notified || notified_but_timed-out)). +// Does not fill new/old vuln. 
+func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (database.VulnerabilityNotification, error) { before := time.Now().Add(-renotifyInterval) - var name string - err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&name) + var notification database.VulnerabilityNotification + err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&notification.Name, + &notification.Created, &notification.Notified, &notification.Deleted) if err != nil { - return "", handleError("s_notification_available", err) + return notification, handleError("s_notification_available", err) } - return name, nil + return notification, nil } -func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (string, interface{}, error) { - var kind, data string - err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&kind, &data) +func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (database.VulnerabilityNotification, error) { + // Get Notification. + var notification database.VulnerabilityNotification + var oldVulnerability []byte + var newVulnerability []byte + + err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&notification.Name, + &notification.Created, &notification.Notified, &notification.Deleted, &newVulnerability, + &oldVulnerability) if err != nil { - return "", struct{}{}, handleError("s_notification", err) + return notification, handleError("s_notification", err) } - return constructNotification(kind, data, limit, page) -} - -func constructNotification(kind, data string, limit, page int) (string, interface{}, error) { - switch kind { - case "NotificationNewVulnerability": - var notificationPage database.NewVulnerabilityNotificationPage - - // TODO: Request database to fill NewVulnerabilityNotificationPage properly. 
- - return kind, notificationPage, nil - default: - msg := fmt.Sprintf("could not construct notification, '%s' is an unknown notification type", kind) - return "", struct{}{}, cerrors.NewBadRequestError(msg) + // Unmarshal old and new Vulnerabilities. + err = json.Unmarshal(oldVulnerability, notification.OldVulnerability) + if err != nil { + return notification, cerrors.NewBadRequestError("could not unmarshal old Vulnerability in GetNotification") } + err = json.Unmarshal(newVulnerability, &notification.NewVulnerability) + if err != nil { + return notification, cerrors.NewBadRequestError("could not unmarshal new Vulnerability in GetNotification") + } + + // TODO(Quentin-M): Fill LayersIntroducingVulnerability. + + return notification, nil } func (pgSQL *pgSQL) SetNotificationNotified(name string) error { diff --git a/database/pgsql/pgsql.go b/database/pgsql/pgsql.go index ca4bc5a8..22bef59a 100644 --- a/database/pgsql/pgsql.go +++ b/database/pgsql/pgsql.go @@ -31,9 +31,32 @@ import ( "github.com/hashicorp/golang-lru" "github.com/lib/pq" "github.com/pborman/uuid" + "github.com/prometheus/client_golang/prometheus" ) -var log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql") +var ( + log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql") + + promErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_errors_total", + Help: "Number of errors that PostgreSQL requests generates.", + }, []string{"request"}) + + promCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_cache_hits_total", + Help: "Number of cache hits that the PostgreSQL backend does.", + }, []string{"object"}) + + promCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_cache_queries_total", + Help: "Number of cache queries that the PostgreSQL backend does.", + }, []string{"object"}) +) + +func init() { + prometheus.MustRegister(promCacheHitsTotal) + 
prometheus.MustRegister(promCacheQueriesTotal) +} type pgSQL struct { *sql.DB @@ -198,17 +221,17 @@ func OpenForTest(name string, withTestData bool) (*pgSQLTest, error) { // handleError logs an error with an extra description and masks the error if it's an SQL one. // This ensures we never return plain SQL errors and leak anything. func handleError(desc string, err error) error { - if _, ok := err.(*pq.Error); ok { - log.Errorf("%s: %v", desc, err) - return database.ErrBackendException - } else if err == sql.ErrNoRows { + if err == sql.ErrNoRows { return cerrors.ErrNotFound - } else if err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") { - log.Errorf("%s: %v", desc, err) - return database.ErrBackendException } log.Errorf("%s: %v", desc, err) + promErrorsTotal.WithLabelValues(desc).Inc() + + if _, o := err.(*pq.Error); o || err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") { + return database.ErrBackendException + } + return err } diff --git a/database/pgsql/queries.go b/database/pgsql/queries.go index 64b90969..7716b661 100644 --- a/database/pgsql/queries.go +++ b/database/pgsql/queries.go @@ -186,27 +186,24 @@ func init() { AND name = $2` // notification.go - queries["i_notification"] = `INSERT INTO Notification(name, kind, data) VALUES($1, $2, $3)` + queries["i_notification"] = ` + INSERT INTO Vulnerability_Notification(name, created_at, old_vulnerability, new_vulnerability) + VALUES($1, CURRENT_TIMESTAMP, $2, $3)` - queries["r_notification"] = `UPDATE Notification SET deleted_at = CURRENT_TIMESTAMP` - - queries["c_notification_available"] = ` - SELECT COUNT(name) FROM Notification - FROM Notification - WHERE notified_at = NULL - AND name NOT IN (SELECT name FROM Lock) - ORDER BY Random() - LIMIT 1` + queries["r_notification"] = `UPDATE Vulnerability_Notification SET deleted_at = CURRENT_TIMESTAMP` queries["s_notification_available"] = ` - SELECT name FROM Notification - FROM Notification + SELECT name, created_at, notified_at, deleted_at 
+ FROM Vulnerability_Notification WHERE notified_at = NULL OR notified_at < $1 AND name NOT IN (SELECT name FROM Lock) ORDER BY Random() LIMIT 1` - queries["s_notification"] = `SELECT data FROM Notification WHERE name = $1` + queries["s_notification"] = ` + SELECT name, created_at, notified_at, deleted_at, old_vulnerability, new_vulnerability + FROM Vulnerability_Notification + WHERE name = $1` // complex_test.go queries["s_complextest_featureversion_affects"] = ` diff --git a/database/pgsql/vulnerability.go b/database/pgsql/vulnerability.go index 52bbcabe..ee3b54d0 100644 --- a/database/pgsql/vulnerability.go +++ b/database/pgsql/vulnerability.go @@ -185,7 +185,7 @@ func (pgSQL *pgSQL) insertVulnerability(vulnerability database.Vulnerability) er } // Create NewVulnerabilityNotification. - notification := database.NewVulnerabilityNotification{VulnerabilityID: vulnerability.ID} + notification := database.VulnerabilityNotification{NewVulnerability: vulnerability} if err := pgSQL.insertNotification(tx, notification); err != nil { return err } diff --git a/notifier/notifier.go b/notifier/notifier.go index 711eca28..0590198d 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package notifier fetches notifications from the database and sends their names -// to the specified remote handler, inviting the third party to actively query the API about it. +// Package notifier fetches notifications from the database and informs the specified remote handler +// about their existences, inviting the third party to actively query the API about it. 
package notifier import ( @@ -22,16 +22,14 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/timeutil" "github.com/pborman/uuid" + "github.com/prometheus/client_golang/prometheus" "github.com/coreos/clair/config" "github.com/coreos/clair/database" - "github.com/coreos/clair/health" "github.com/coreos/clair/utils" cerrors "github.com/coreos/clair/utils/errors" ) -var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier") - const ( checkInterval = 5 * time.Minute refreshLockDuration = time.Minute * 2 @@ -39,21 +37,24 @@ const ( maxBackOff = 15 * time.Minute ) -// A Notification represents the structure of the notifications that are sent by a Notifier. -type Notification struct { - Name, Type string - Content interface{} -} +var ( + log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier") -var notifiers = make(map[string]Notifier) + notifiers = make(map[string]Notifier) + + promNotifierLatencySeconds = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "clair_notifier_latency_seconds", + Help: "Time it takes to send a notification after it's been created.", + }) +) // Notifier represents anything that can transmit notifications. type Notifier interface { // Configure attempts to initialize the notifier with the provided configuration. // It returns whether the notifier is enabled or not. Configure(*config.NotifierConfig) (bool, error) - // Send transmits the specified notification name. - Send(notificationName string) error + // Send informs the existence of the specified notification. + Send(notification database.VulnerabilityNotification) error } // RegisterNotifier makes a Fetcher available by the provided name. @@ -102,9 +103,8 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u for running := true; running; { // Find task. - // TODO(Quentin-M): Combine node and notification. 
- notificationName := findTask(datastore, config.RenotifyInterval, whoAmI, stopper) - if notificationName == "" { + notification := findTask(datastore, config.RenotifyInterval, whoAmI, stopper) + if notification == nil { // Interrupted while finding a task, Clair is stopping. break } @@ -112,14 +112,15 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u // Handle task. done := make(chan bool, 1) go func() { - success, interrupted := handleTask(notificationName, stopper, config.Attempts) + success, interrupted := handleTask(*notification, stopper, config.Attempts) if success { - datastore.SetNotificationNotified(notificationName) + promNotifierLatencySeconds.Set(float64(time.Now().Sub(notification.Created))) + datastore.SetNotificationNotified(notification.Name) } if interrupted { running = false } - datastore.Unlock(notificationName, whoAmI) + datastore.Unlock(notification.Name, whoAmI) done <- true }() @@ -130,7 +131,7 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u case <-done: break outer case <-time.After(refreshLockDuration): - datastore.Lock(notificationName, whoAmI, lockDuration, true) + datastore.Lock(notification.Name, whoAmI, lockDuration, true) } } } @@ -138,10 +139,10 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u log.Info("notifier service stopped") } -func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoAmI string, stopper *utils.Stopper) string { +func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoAmI string, stopper *utils.Stopper) *database.VulnerabilityNotification { for { // Find a notification to send. - notificationName, err := datastore.GetAvailableNotification(renotifyInterval) + notification, err := datastore.GetAvailableNotification(renotifyInterval) if err != nil { // There is no notification or an error occured. 
if err != cerrors.ErrNotFound { @@ -150,21 +151,21 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA // Wait. if !stopper.Sleep(checkInterval) { - return "" + return nil } continue } // Lock the notification. - if hasLock, _ := datastore.Lock(notificationName, whoAmI, lockDuration, false); hasLock { - log.Infof("found and locked a notification: %s", notificationName) - return notificationName + if hasLock, _ := datastore.Lock(notification.Name, whoAmI, lockDuration, false); hasLock { + log.Infof("found and locked a notification: %s", notification.Name) + return &notification } } } -func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bool, bool) { +func handleTask(notification database.VulnerabilityNotification, st *utils.Stopper, maxAttempts int) (bool, bool) { // Send notification. for notifierName, notifier := range notifiers { var attempts int @@ -172,22 +173,22 @@ func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bo for { // Max attempts exceeded. if attempts >= maxAttempts { - log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notificationName, notifierName, maxAttempts) + log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notification.Name, notifierName, maxAttempts) return false, false } // Backoff. if backOff > 0 { - log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notificationName, notifierName, attempts+1, maxAttempts) + log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notification.Name, notifierName, attempts+1, maxAttempts) if !st.Sleep(backOff) { return false, true } } // Send using the current notifier. 
- if err := notifier.Send(notificationName); err != nil { + if err := notifier.Send(notification); err != nil { // Send failed; increase attempts/backoff and retry. - log.Errorf("could not send notification '%s' to notifier '%s': %s", notificationName, notifierName, err) + log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.Name, notifierName, err) backOff = timeutil.ExpBackoff(backOff, maxBackOff) attempts++ } @@ -197,6 +198,6 @@ func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bo } } - log.Infof("successfully sent notification '%s'\n", notificationName) + log.Infof("successfully sent notification '%s'\n", notification.Name) return true, false } diff --git a/notifier/notifiers/webhook.go b/notifier/notifiers/webhook.go index f9bc56fc..cc7d15ef 100644 --- a/notifier/notifiers/webhook.go +++ b/notifier/notifiers/webhook.go @@ -29,6 +29,7 @@ import ( "gopkg.in/yaml.v2" "github.com/coreos/clair/config" + "github.com/coreos/clair/database" "github.com/coreos/clair/notifier" ) @@ -48,7 +49,7 @@ type WebhookNotifierConfiguration struct { } func init() { - //notifier.RegisterNotifier("webhook", &WebhookNotifier{}) + notifier.RegisterNotifier("webhook", &WebhookNotifier{}) } func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error) { @@ -92,9 +93,9 @@ func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error) return true, nil } -func (h *WebhookNotifier) Send(notification *notifier.Notification) error { +func (h *WebhookNotifier) Send(notification database.VulnerabilityNotification) error { // Marshal notification. - jsonNotification, err := json.Marshal(notification) + jsonNotification, err := json.Marshal(notification.Name) if err != nil { return fmt.Errorf("could not marshal: %s", err) }