notifier/database: draft new notification system

pull/71/head
Quentin Machu 9 years ago committed by Jimmy Zelinskie
parent 5759af5bcf
commit c60d0054fa

@ -25,6 +25,7 @@ import (
"github.com/coreos/clair/api" "github.com/coreos/clair/api"
"github.com/coreos/clair/config" "github.com/coreos/clair/config"
"github.com/coreos/clair/database/pgsql" "github.com/coreos/clair/database/pgsql"
"github.com/coreos/clair/notifier"
"github.com/coreos/clair/updater" "github.com/coreos/clair/updater"
"github.com/coreos/clair/utils" "github.com/coreos/clair/utils"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
@ -46,8 +47,8 @@ func Boot(config *config.Config) {
defer db.Close() defer db.Close()
// Start notifier // Start notifier
// st.Begin() st.Begin()
// go notifier.Run(config.Notifier, st) go notifier.Run(config.Notifier, db, st)
// Start API // Start API
st.Begin() st.Begin()

@ -22,8 +22,11 @@ updater:
# Use 0 to disable the updater entirely. # Use 0 to disable the updater entirely.
interval: 2h interval: 2h
notifier: notifier:
# How many attempts will the notifier do when a notifier backend fails. # Number of attempts that the notifier does when a notification backend fails
# before it gives up temporarly and try to d
attempts: 3 attempts: 3
# After a notification has been sent
renotifyInterval: 2h
# Configuration for HTTP notifier # Configuration for HTTP notifier
http: http:
# Endpoint that will receive notifications with POST requests. # Endpoint that will receive notifications with POST requests.

@ -45,6 +45,7 @@ type UpdaterConfig struct {
// NotifierConfig is the configuration for the Notifier service and its registered notifiers. // NotifierConfig is the configuration for the Notifier service and its registered notifiers.
type NotifierConfig struct { type NotifierConfig struct {
Attempts int Attempts int
RenotifyInterval time.Duration
Params map[string]interface{} `yaml:",inline"` Params map[string]interface{} `yaml:",inline"`
} }
@ -71,6 +72,7 @@ var DefaultConfig = Config{
}, },
Notifier: &NotifierConfig{ Notifier: &NotifierConfig{
Attempts: 5, Attempts: 5,
RenotifyInterval: 2 * time.Hour,
}, },
} }

@ -48,10 +48,11 @@ type Datastore interface {
FindVulnerability(namespaceName, name string) (Vulnerability, error) FindVulnerability(namespaceName, name string) (Vulnerability, error)
// Notifications // Notifications
// InsertNotifications([]Notification) error CountAvailableNotifications() (int, error)
// FindNotificationToSend() (Notification, error) GetAvailableNotification(renotifyInterval time.Duration) (string, error)
// CountNotificationsToSend() (int, error) GetNotification(name string, limit, page int) (string, interface{}, error)
// MarkNotificationAsSent(id string) SetNotificationNotified(name string) error
DeleteNotification(name string) error
// Key/Value // Key/Value
InsertKeyValue(key, value string) error InsertKeyValue(key, value string) error

@ -69,3 +69,14 @@ type Vulnerability struct {
// is already about a specific Feature/FeatureVersion. // is already about a specific Feature/FeatureVersion.
FixedBy types.Version `json:",omitempty"` FixedBy types.Version `json:",omitempty"`
} }
type NewVulnerabilityNotification struct {
VulnerabilityID int
}
type NewVulnerabilityNotificationPage struct {
Vulnerability Vulnerability
Layers []Layer
}
// ...

@ -141,6 +141,20 @@ CREATE TABLE IF NOT EXISTS Lock (
CREATE INDEX ON Lock (owner); CREATE INDEX ON Lock (owner);
-- -----------------------------------------------------
-- Table Notification
-- -----------------------------------------------------
CREATE TABLE IF NOT EXISTS Notification (
id SERIAL PRIMARY KEY,
name VARCHAR(64) NOT NULL UNIQUE,
kind VARCHAR(64) NOT NULL,
notified_at TIMESTAMP WITH TIME ZONE NULL,
deleted_at TIMESTAMP WITH TIME ZONE NULL,
data TEXT);
CREATE INDEX ON Notification (notified_at, deleted_at);
-- +goose Down -- +goose Down
DROP TABLE IF EXISTS Namespace, DROP TABLE IF EXISTS Namespace,
@ -152,5 +166,6 @@ DROP TABLE IF EXISTS Namespace,
Vulnerability_FixedIn_Feature, Vulnerability_FixedIn_Feature,
Vulnerability_Affects_FeatureVersion, Vulnerability_Affects_FeatureVersion,
KeyValue, KeyValue,
Lock Lock,
Notification
CASCADE; CASCADE;

@ -0,0 +1,103 @@
package pgsql
import (
"database/sql"
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/coreos/clair/database"
cerrors "github.com/coreos/clair/utils/errors"
"github.com/pborman/uuid"
)
// do it in tx so we won't insert/update a vuln without notification and vice-versa.
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) error {
kind := reflect.Indirect(reflect.ValueOf(notification)).Type()
data, err := json.Marshal(notification)
if err != nil {
tx.Rollback()
return cerrors.NewBadRequestError("could not marshal notification in insertNotification")
}
_, err = tx.Exec(getQuery("i_notification"), uuid.New(), kind, data)
if err != nil {
tx.Rollback()
return handleError("i_notification", err)
}
return nil
}
func (pgSQL *pgSQL) CountAvailableNotifications() (int, error) {
var count int
err := pgSQL.QueryRow(getQuery("c_notification_available")).Scan(&count)
if err != nil {
return 0, handleError("c_notification_available", err)
}
return count, nil
}
// Get one available notification (!locked && !deleted && (!notified || notified_but_timed-out)).
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (string, error) {
before := time.Now().Add(-renotifyInterval)
var name string
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&name)
if err != nil {
return "", handleError("s_notification_available", err)
}
return name, nil
}
func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (string, interface{}, error) {
var kind, data string
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&kind, &data)
if err != nil {
return "", struct{}{}, handleError("s_notification", err)
}
return constructNotification(kind, data, limit, page)
}
func constructNotification(kind, data string, limit, page int) (string, interface{}, error) {
switch kind {
case "NotificationNewVulnerability":
var notificationPage database.NewVulnerabilityNotificationPage
// TODO: Request database to fill NewVulnerabilityNotificationPage properly.
return kind, notificationPage, nil
default:
msg := fmt.Sprintf("could not construct notification, '%s' is an unknown notification type", kind)
return "", struct{}{}, cerrors.NewBadRequestError(msg)
}
}
func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
if _, err := pgSQL.Exec(getQuery("u_notification_notified"), name); err != nil {
return handleError("u_notification_notified", err)
}
return nil
}
func (pgSQL *pgSQL) DeleteNotification(name string) error {
result, err := pgSQL.Exec(getQuery("r_notification"), name)
if err != nil {
return handleError("r_notification", err)
}
affected, err := result.RowsAffected()
if err != nil {
return handleError("r_notification.RowsAffected()", err)
}
if affected <= 0 {
return cerrors.ErrNotFound
}
return nil
}

@ -180,6 +180,29 @@ func init() {
queries["f_featureversion_by_feature"] = ` queries["f_featureversion_by_feature"] = `
SELECT id, version FROM FeatureVersion WHERE feature_id = $1` SELECT id, version FROM FeatureVersion WHERE feature_id = $1`
// notification.go
queries["i_notification"] = `INSERT INTO Notification(name, kind, data) VALUES($1, $2, $3)`
queries["r_notification"] = `UPDATE Notification SET deleted_at = CURRENT_TIMESTAMP`
queries["c_notification_available"] = `
SELECT COUNT(name) FROM Notification
FROM Notification
WHERE notified_at = NULL
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`
queries["s_notification_available"] = `
SELECT name FROM Notification
FROM Notification
WHERE notified_at = NULL OR notified_at < $1
AND name NOT IN (SELECT name FROM Lock)
ORDER BY Random()
LIMIT 1`
queries["s_notification"] = `SELECT data FROM Notification WHERE name = $1`
// complex_test.go // complex_test.go
queries["s_complextest_featureversion_affects"] = ` queries["s_complextest_featureversion_affects"] = `
SELECT v.name SELECT v.name

@ -183,6 +183,12 @@ func (pgSQL *pgSQL) insertVulnerability(vulnerability database.Vulnerability) er
tx.Rollback() tx.Rollback()
return handleError("i_vulnerability", err) return handleError("i_vulnerability", err)
} }
// Create NewVulnerabilityNotification.
notification := database.NewVulnerabilityNotification{VulnerabilityID: vulnerability.ID}
if err := pgSQL.insertNotification(tx, notification); err != nil {
return err
}
} else { } else {
// Update vulnerability // Update vulnerability
if vulnerability.Description != existingVulnerability.Description || if vulnerability.Description != existingVulnerability.Description ||

@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
// Package notifier fetches notifications from the database and sends them // Package notifier fetches notifications from the database and sends their names
// to the specified remote handler. // to the specified remote handler, inviting the third party to actively query the API about it.
package notifier package notifier
import ( import (
@ -27,6 +27,7 @@ import (
"github.com/coreos/clair/database" "github.com/coreos/clair/database"
"github.com/coreos/clair/health" "github.com/coreos/clair/health"
"github.com/coreos/clair/utils" "github.com/coreos/clair/utils"
cerrors "github.com/coreos/clair/utils/errors"
) )
var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier") var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier")
@ -38,8 +39,6 @@ const (
maxBackOff = 15 * time.Minute maxBackOff = 15 * time.Minute
) )
// TODO(Quentin-M): Allow registering custom notification handlers.
// A Notification represents the structure of the notifications that are sent by a Notifier. // A Notification represents the structure of the notifications that are sent by a Notifier.
type Notification struct { type Notification struct {
Name, Type string Name, Type string
@ -53,8 +52,8 @@ type Notifier interface {
// Configure attempts to initialize the notifier with the provided configuration. // Configure attempts to initialize the notifier with the provided configuration.
// It returns whether the notifier is enabled or not. // It returns whether the notifier is enabled or not.
Configure(*config.NotifierConfig) (bool, error) Configure(*config.NotifierConfig) (bool, error)
// Send transmits the specified notification. // Send transmits the specified notification name.
Send(notification *Notification) error Send(notificationName string) error
} }
// RegisterNotifier makes a Fetcher available by the provided name. // RegisterNotifier makes a Fetcher available by the provided name.
@ -77,7 +76,7 @@ func RegisterNotifier(name string, n Notifier) {
} }
// Run starts the Notifier service. // Run starts the Notifier service.
func Run(config *config.NotifierConfig, stopper *utils.Stopper) { func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *utils.Stopper) {
defer stopper.End() defer stopper.End()
// Configure registered notifiers. // Configure registered notifiers.
@ -107,8 +106,8 @@ func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
for running := true; running; { for running := true; running; {
// Find task. // Find task.
// TODO(Quentin-M): Combine node and notification. // TODO(Quentin-M): Combine node and notification.
node, notification := findTask(whoAmI, stopper) notificationName := findTask(datastore, config.RenotifyInterval, whoAmI, stopper)
if node == "" && notification == nil { if notificationName == "" {
// Interrupted while finding a task, Clair is stopping. // Interrupted while finding a task, Clair is stopping.
break break
} }
@ -116,14 +115,14 @@ func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
// Handle task. // Handle task.
done := make(chan bool, 1) done := make(chan bool, 1)
go func() { go func() {
success, interrupted := handleTask(notification, stopper, config.Attempts) success, interrupted := handleTask(notificationName, stopper, config.Attempts)
if success { if success {
database.MarkNotificationAsSent(node) datastore.SetNotificationNotified(notificationName)
} }
if interrupted { if interrupted {
running = false running = false
} }
database.Unlock(node, whoAmI) datastore.Unlock(notificationName, whoAmI)
done <- true done <- true
}() }()
@ -134,7 +133,7 @@ func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
case <-done: case <-done:
break outer break outer
case <-time.After(refreshLockDuration): case <-time.After(refreshLockDuration):
database.Lock(node, lockDuration, whoAmI) datastore.Lock(notificationName, whoAmI, lockDuration, true)
} }
} }
} }
@ -142,46 +141,33 @@ func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
log.Info("notifier service stopped") log.Info("notifier service stopped")
} }
func findTask(whoAmI string, stopper *utils.Stopper) (string, database.Notification) { func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoAmI string, stopper *utils.Stopper) string {
for { for {
// Find a notification to send. // Find a notification to send.
node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper()) notificationName, err := datastore.GetAvailableNotification(renotifyInterval)
if err != nil { if err != nil {
log.Warningf("could not get notification to send: %s", err) // There is no notification or an error occured.
} if err != cerrors.ErrNotFound {
log.Warningf("could not get notification to send: %s", err)
}
// No notification or error: wait. // Wait.
if notification == nil || err != nil {
if !stopper.Sleep(checkInterval) { if !stopper.Sleep(checkInterval) {
return "", nil return ""
} }
continue continue
} }
// Lock the notification. // Lock the notification.
if hasLock, _ := database.Lock(node, lockDuration, whoAmI); hasLock { if hasLock, _ := datastore.Lock(notificationName, whoAmI, lockDuration, false); hasLock {
log.Infof("found and locked a notification: %s", notification.GetName()) log.Infof("found and locked a notification: %s", notificationName)
return node, notification return notificationName
} }
} }
} }
func handleTask(notification database.Notification, st *utils.Stopper, maxAttempts int) (bool, bool) { func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bool, bool) {
// Get notification content.
// TODO(Quentin-M): Split big notifications.
notificationContent, err := notification.GetContent()
if err != nil {
log.Warningf("could not get content of notification '%s': %s", notification.GetName(), err)
return false, false
}
// Create notification.
payload := &Notification{
Name: notification.GetName(),
Type: notification.GetType(),
Content: notificationContent,
}
// Send notification. // Send notification.
for notifierName, notifier := range notifiers { for notifierName, notifier := range notifiers {
var attempts int var attempts int
@ -189,37 +175,37 @@ func handleTask(notification database.Notification, st *utils.Stopper, maxAttemp
for { for {
// Max attempts exceeded. // Max attempts exceeded.
if attempts >= maxAttempts { if attempts >= maxAttempts {
log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notification.GetName(), notifierName, maxAttempts) log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notificationName, notifierName, maxAttempts)
return false, false return false, false
} }
// Backoff. // Backoff.
if backOff > 0 { if backOff > 0 {
log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notification.GetName(), notifierName, attempts+1, maxAttempts) log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notificationName, notifierName, attempts+1, maxAttempts)
if !st.Sleep(backOff) { if !st.Sleep(backOff) {
return false, true return false, true
} }
} }
// Send using the current notifier. // Send using the current notifier.
if err := notifier.Send(payload); err == nil { if err := notifier.Send(notificationName); err != nil {
// Send has been successful. Go to the next one. // Send failed; increase attempts/backoff and retry.
break log.Errorf("could not send notification '%s' to notifier '%s': %s", notificationName, notifierName, err)
backOff = timeutil.ExpBackoff(backOff, maxBackOff)
attempts++
} }
// Send failed; increase attempts/backoff and retry. // Send has been successful. Go to the next notifier.
log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.GetName(), notifierName, err) break
backOff = timeutil.ExpBackoff(backOff, maxBackOff)
attempts++
} }
} }
log.Infof("successfully sent notification '%s'\n", notification.GetName()) log.Infof("successfully sent notification '%s'\n", notificationName)
return true, false return true, false
} }
// Healthcheck returns the health of the notifier service. // Healthcheck returns the health of the notifier service.
func Healthcheck() health.Status { func Healthcheck(datastore database.Datastore) health.Status {
queueSize, err := database.CountNotificationsToSend() queueSize, err := datastore.CountAvailableNotifications()
return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}} return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}}
} }

Loading…
Cancel
Save