notifier/database: refactor notification system and add initial Prometheus support
This commit is contained in:
parent
b8b7be3f81
commit
ad0531acc7
@ -48,9 +48,8 @@ type Datastore interface {
|
|||||||
DeleteVulnerability(namespaceName, name string) error
|
DeleteVulnerability(namespaceName, name string) error
|
||||||
|
|
||||||
// Notifications
|
// Notifications
|
||||||
CountAvailableNotifications() (int, error)
|
GetAvailableNotification(renotifyInterval time.Duration) (VulnerabilityNotification, error) // Does not fill old/new Vulnerabilities.
|
||||||
GetAvailableNotification(renotifyInterval time.Duration) (string, error)
|
GetNotification(name string, limit, page int) (VulnerabilityNotification, error)
|
||||||
GetNotification(name string, limit, page int) (string, interface{}, error)
|
|
||||||
SetNotificationNotified(name string) error
|
SetNotificationNotified(name string) error
|
||||||
DeleteNotification(name string) error
|
DeleteNotification(name string) error
|
||||||
|
|
||||||
|
@ -14,7 +14,11 @@
|
|||||||
|
|
||||||
package database
|
package database
|
||||||
|
|
||||||
import "github.com/coreos/clair/utils/types"
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/coreos/clair/utils/types"
|
||||||
|
)
|
||||||
|
|
||||||
// ID is only meant to be used by database implementations and should never be used for anything else.
|
// ID is only meant to be used by database implementations and should never be used for anything else.
|
||||||
type Model struct {
|
type Model struct {
|
||||||
@ -25,10 +29,10 @@ type Layer struct {
|
|||||||
Model
|
Model
|
||||||
|
|
||||||
Name string
|
Name string
|
||||||
EngineVersion int `json:",omitempty"`
|
EngineVersion int
|
||||||
Parent *Layer `json:",omitempty"`
|
Parent *Layer
|
||||||
Namespace *Namespace `json:",omitempty"`
|
Namespace *Namespace
|
||||||
Features []FeatureVersion `json:",omitempty"`
|
Features []FeatureVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
type Namespace struct {
|
type Namespace struct {
|
||||||
@ -42,7 +46,6 @@ type Feature struct {
|
|||||||
|
|
||||||
Name string
|
Name string
|
||||||
Namespace Namespace
|
Namespace Namespace
|
||||||
// FixedBy map[types.Version]Vulnerability // <<-- WRONG.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type FeatureVersion struct {
|
type FeatureVersion struct {
|
||||||
@ -50,7 +53,7 @@ type FeatureVersion struct {
|
|||||||
|
|
||||||
Feature Feature
|
Feature Feature
|
||||||
Version types.Version
|
Version types.Version
|
||||||
AffectedBy []Vulnerability `json:",omitempty"`
|
AffectedBy []Vulnerability
|
||||||
}
|
}
|
||||||
|
|
||||||
type Vulnerability struct {
|
type Vulnerability struct {
|
||||||
@ -62,21 +65,21 @@ type Vulnerability struct {
|
|||||||
Link string
|
Link string
|
||||||
Severity types.Priority
|
Severity types.Priority
|
||||||
|
|
||||||
FixedIn []FeatureVersion `json:",omitempty"`
|
FixedIn []FeatureVersion
|
||||||
//Affects []FeatureVersion
|
LayersIntroducingVulnerability []Layer
|
||||||
|
|
||||||
// For output purposes. Only make sense when the vulnerability
|
// For output purposes. Only make sense when the vulnerability
|
||||||
// is already about a specific Feature/FeatureVersion.
|
// is already about a specific Feature/FeatureVersion.
|
||||||
FixedBy types.Version `json:",omitempty"`
|
FixedBy types.Version `json:",omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NewVulnerabilityNotification struct {
|
type VulnerabilityNotification struct {
|
||||||
VulnerabilityID int
|
Name string
|
||||||
}
|
|
||||||
|
|
||||||
type NewVulnerabilityNotificationPage struct {
|
Created time.Time
|
||||||
Vulnerability Vulnerability
|
Notified time.Time
|
||||||
Layers []Layer
|
Deleted time.Time
|
||||||
}
|
|
||||||
|
|
||||||
// ...
|
OldVulnerability *Vulnerability
|
||||||
|
NewVulnerability Vulnerability
|
||||||
|
}
|
||||||
|
@ -28,8 +28,10 @@ func (pgSQL *pgSQL) insertFeature(feature database.Feature) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pgSQL.cache != nil {
|
if pgSQL.cache != nil {
|
||||||
|
promCacheQueriesTotal.WithLabelValues("feature").Inc()
|
||||||
id, found := pgSQL.cache.Get("feature:" + feature.Namespace.Name + ":" + feature.Name)
|
id, found := pgSQL.cache.Get("feature:" + feature.Namespace.Name + ":" + feature.Name)
|
||||||
if found {
|
if found {
|
||||||
|
promCacheHitsTotal.WithLabelValues("feature").Inc()
|
||||||
return id.(int), nil
|
return id.(int), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -60,9 +62,11 @@ func (pgSQL *pgSQL) insertFeatureVersion(featureVersion database.FeatureVersion)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pgSQL.cache != nil {
|
if pgSQL.cache != nil {
|
||||||
|
promCacheQueriesTotal.WithLabelValues("featureversion").Inc()
|
||||||
id, found := pgSQL.cache.Get("featureversion:" + featureVersion.Feature.Namespace.Name + ":" +
|
id, found := pgSQL.cache.Get("featureversion:" + featureVersion.Feature.Namespace.Name + ":" +
|
||||||
featureVersion.Feature.Name + ":" + featureVersion.Version.String())
|
featureVersion.Feature.Name + ":" + featureVersion.Version.String())
|
||||||
if found {
|
if found {
|
||||||
|
promCacheHitsTotal.WithLabelValues("featureversion").Inc()
|
||||||
return id.(int), nil
|
return id.(int), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -143,17 +143,18 @@ CREATE INDEX ON Lock (owner);
|
|||||||
|
|
||||||
|
|
||||||
-- -----------------------------------------------------
|
-- -----------------------------------------------------
|
||||||
-- Table Notification
|
-- Table VulnerabilityNotification
|
||||||
-- -----------------------------------------------------
|
-- -----------------------------------------------------
|
||||||
CREATE TABLE IF NOT EXISTS Notification (
|
CREATE TABLE IF NOT EXISTS Vulnerability_Notification (
|
||||||
id SERIAL PRIMARY KEY,
|
id SERIAL PRIMARY KEY,
|
||||||
name VARCHAR(64) NOT NULL UNIQUE,
|
name VARCHAR(64) NOT NULL UNIQUE,
|
||||||
kind VARCHAR(64) NOT NULL,
|
created_at TIMESTAMP WITH TIME ZONE,
|
||||||
notified_at TIMESTAMP WITH TIME ZONE NULL,
|
notified_at TIMESTAMP WITH TIME ZONE NULL,
|
||||||
deleted_at TIMESTAMP WITH TIME ZONE NULL,
|
deleted_at TIMESTAMP WITH TIME ZONE NULL,
|
||||||
data TEXT);
|
old_vulnerability TEXT,
|
||||||
|
new_vulnerability TEXT);
|
||||||
|
|
||||||
CREATE INDEX ON Notification (notified_at, deleted_at);
|
CREATE INDEX ON Vulnerability_Notification (notified_at);
|
||||||
|
|
||||||
-- +goose Down
|
-- +goose Down
|
||||||
|
|
||||||
@ -165,7 +166,7 @@ DROP TABLE IF EXISTS Namespace,
|
|||||||
Vulnerability,
|
Vulnerability,
|
||||||
Vulnerability_FixedIn_Feature,
|
Vulnerability_FixedIn_Feature,
|
||||||
Vulnerability_Affects_FeatureVersion,
|
Vulnerability_Affects_FeatureVersion,
|
||||||
|
Vulnerability_Notification,
|
||||||
KeyValue,
|
KeyValue,
|
||||||
Lock,
|
Lock
|
||||||
Notification
|
|
||||||
CASCADE;
|
CASCADE;
|
||||||
|
@ -25,7 +25,9 @@ func (pgSQL *pgSQL) insertNamespace(namespace database.Namespace) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if pgSQL.cache != nil {
|
if pgSQL.cache != nil {
|
||||||
|
promCacheQueriesTotal.WithLabelValues("namespace").Inc()
|
||||||
if id, found := pgSQL.cache.Get("namespace:" + namespace.Name); found {
|
if id, found := pgSQL.cache.Get("namespace:" + namespace.Name); found {
|
||||||
|
promCacheHitsTotal.WithLabelValues("namespace").Inc()
|
||||||
return id.(int), nil
|
return id.(int), nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,8 +3,6 @@ package pgsql
|
|||||||
import (
|
import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"reflect"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coreos/clair/database"
|
"github.com/coreos/clair/database"
|
||||||
@ -13,15 +11,22 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// do it in tx so we won't insert/update a vuln without notification and vice-versa.
|
// do it in tx so we won't insert/update a vuln without notification and vice-versa.
|
||||||
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) error {
|
// name and created doesn't matter.
|
||||||
kind := reflect.Indirect(reflect.ValueOf(notification)).Type().String()
|
func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification database.VulnerabilityNotification) error {
|
||||||
data, err := json.Marshal(notification)
|
// Marshal old and new Vulnerabilities.
|
||||||
|
oldVulnerability, err := json.Marshal(notification.OldVulnerability)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return cerrors.NewBadRequestError("could not marshal notification in insertNotification")
|
return cerrors.NewBadRequestError("could not marshal old Vulnerability in insertNotification")
|
||||||
|
}
|
||||||
|
newVulnerability, err := json.Marshal(notification.NewVulnerability)
|
||||||
|
if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
return cerrors.NewBadRequestError("could not marshal new Vulnerability in insertNotification")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.Exec(getQuery("i_notification"), uuid.New(), kind, data)
|
// Insert Notification.
|
||||||
|
_, err = tx.Exec(getQuery("i_notification"), uuid.New(), oldVulnerability, newVulnerability)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return handleError("i_notification", err)
|
return handleError("i_notification", err)
|
||||||
@ -30,51 +35,47 @@ func (pgSQL *pgSQL) insertNotification(tx *sql.Tx, notification interface{}) err
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgSQL *pgSQL) CountAvailableNotifications() (int, error) {
|
// Get one available notification name (!locked && !deleted && (!notified || notified_but_timed-out)).
|
||||||
var count int
|
// Does not fill new/old vuln.
|
||||||
err := pgSQL.QueryRow(getQuery("c_notification_available")).Scan(&count)
|
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (database.VulnerabilityNotification, error) {
|
||||||
if err != nil {
|
|
||||||
return 0, handleError("c_notification_available", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return count, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get one available notification (!locked && !deleted && (!notified || notified_but_timed-out)).
|
|
||||||
func (pgSQL *pgSQL) GetAvailableNotification(renotifyInterval time.Duration) (string, error) {
|
|
||||||
before := time.Now().Add(-renotifyInterval)
|
before := time.Now().Add(-renotifyInterval)
|
||||||
|
|
||||||
var name string
|
var notification database.VulnerabilityNotification
|
||||||
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(&name)
|
err := pgSQL.QueryRow(getQuery("s_notification_available"), before).Scan(¬ification.Name,
|
||||||
|
¬ification.Created, ¬ification.Notified, ¬ification.Deleted)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", handleError("s_notification_available", err)
|
return notification, handleError("s_notification_available", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return name, nil
|
return notification, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (string, interface{}, error) {
|
func (pgSQL *pgSQL) GetNotification(name string, limit, page int) (database.VulnerabilityNotification, error) {
|
||||||
var kind, data string
|
// Get Notification.
|
||||||
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(&kind, &data)
|
var notification database.VulnerabilityNotification
|
||||||
|
var oldVulnerability []byte
|
||||||
|
var newVulnerability []byte
|
||||||
|
|
||||||
|
err := pgSQL.QueryRow(getQuery("s_notification"), name).Scan(¬ification.Name,
|
||||||
|
¬ification.Created, ¬ification.Notified, ¬ification.Deleted, &newVulnerability,
|
||||||
|
&oldVulnerability)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", struct{}{}, handleError("s_notification", err)
|
return notification, handleError("s_notification", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return constructNotification(kind, data, limit, page)
|
// Unmarshal old and new Vulnerabilities.
|
||||||
|
err = json.Unmarshal(oldVulnerability, notification.OldVulnerability)
|
||||||
|
if err != nil {
|
||||||
|
return notification, cerrors.NewBadRequestError("could not unmarshal old Vulnerability in GetNotification")
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(newVulnerability, ¬ification.NewVulnerability)
|
||||||
|
if err != nil {
|
||||||
|
return notification, cerrors.NewBadRequestError("could not unmarshal new Vulnerability in GetNotification")
|
||||||
}
|
}
|
||||||
|
|
||||||
func constructNotification(kind, data string, limit, page int) (string, interface{}, error) {
|
// TODO(Quentin-M): Fill LayersIntroducingVulnerability.
|
||||||
switch kind {
|
|
||||||
case "NotificationNewVulnerability":
|
|
||||||
var notificationPage database.NewVulnerabilityNotificationPage
|
|
||||||
|
|
||||||
// TODO: Request database to fill NewVulnerabilityNotificationPage properly.
|
return notification, nil
|
||||||
|
|
||||||
return kind, notificationPage, nil
|
|
||||||
default:
|
|
||||||
msg := fmt.Sprintf("could not construct notification, '%s' is an unknown notification type", kind)
|
|
||||||
return "", struct{}{}, cerrors.NewBadRequestError(msg)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
|
func (pgSQL *pgSQL) SetNotificationNotified(name string) error {
|
||||||
|
@ -31,9 +31,32 @@ import (
|
|||||||
"github.com/hashicorp/golang-lru"
|
"github.com/hashicorp/golang-lru"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
"github.com/pborman/uuid"
|
"github.com/pborman/uuid"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql")
|
var (
|
||||||
|
log = capnslog.NewPackageLogger("github.com/coreos/clair", "pgsql")
|
||||||
|
|
||||||
|
promErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "clair_pgsql_errors_total",
|
||||||
|
Help: "Number of errors that PostgreSQL requests generates.",
|
||||||
|
}, []string{"request"})
|
||||||
|
|
||||||
|
promCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "clair_pgsql_cache_hits_total",
|
||||||
|
Help: "Number of cache hits that the PostgreSQL backend does.",
|
||||||
|
}, []string{"object"})
|
||||||
|
|
||||||
|
promCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||||
|
Name: "clair_pgsql_cache_queries_total",
|
||||||
|
Help: "Number of cache queries that the PostgreSQL backend does.",
|
||||||
|
}, []string{"object"})
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
prometheus.MustRegister(promCacheHitsTotal)
|
||||||
|
prometheus.MustRegister(promCacheQueriesTotal)
|
||||||
|
}
|
||||||
|
|
||||||
type pgSQL struct {
|
type pgSQL struct {
|
||||||
*sql.DB
|
*sql.DB
|
||||||
@ -198,17 +221,17 @@ func OpenForTest(name string, withTestData bool) (*pgSQLTest, error) {
|
|||||||
// handleError logs an error with an extra description and masks the error if it's an SQL one.
|
// handleError logs an error with an extra description and masks the error if it's an SQL one.
|
||||||
// This ensures we never return plain SQL errors and leak anything.
|
// This ensures we never return plain SQL errors and leak anything.
|
||||||
func handleError(desc string, err error) error {
|
func handleError(desc string, err error) error {
|
||||||
if _, ok := err.(*pq.Error); ok {
|
if err == sql.ErrNoRows {
|
||||||
log.Errorf("%s: %v", desc, err)
|
|
||||||
return database.ErrBackendException
|
|
||||||
} else if err == sql.ErrNoRows {
|
|
||||||
return cerrors.ErrNotFound
|
return cerrors.ErrNotFound
|
||||||
} else if err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") {
|
|
||||||
log.Errorf("%s: %v", desc, err)
|
|
||||||
return database.ErrBackendException
|
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Errorf("%s: %v", desc, err)
|
log.Errorf("%s: %v", desc, err)
|
||||||
|
promErrorsTotal.WithLabelValues(desc).Inc()
|
||||||
|
|
||||||
|
if _, o := err.(*pq.Error); o || err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") {
|
||||||
|
return database.ErrBackendException
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,27 +186,24 @@ func init() {
|
|||||||
AND name = $2`
|
AND name = $2`
|
||||||
|
|
||||||
// notification.go
|
// notification.go
|
||||||
queries["i_notification"] = `INSERT INTO Notification(name, kind, data) VALUES($1, $2, $3)`
|
queries["i_notification"] = `
|
||||||
|
INSERT INTO Vulnerability_Notification(name, created_at, old_vulnerability, new_vulnerability)
|
||||||
|
VALUES($1, CURRENT_TIMESTAMP, $2, $3)`
|
||||||
|
|
||||||
queries["r_notification"] = `UPDATE Notification SET deleted_at = CURRENT_TIMESTAMP`
|
queries["r_notification"] = `UPDATE Vulnerability_Notification SET deleted_at = CURRENT_TIMESTAMP`
|
||||||
|
|
||||||
queries["c_notification_available"] = `
|
|
||||||
SELECT COUNT(name) FROM Notification
|
|
||||||
FROM Notification
|
|
||||||
WHERE notified_at = NULL
|
|
||||||
AND name NOT IN (SELECT name FROM Lock)
|
|
||||||
ORDER BY Random()
|
|
||||||
LIMIT 1`
|
|
||||||
|
|
||||||
queries["s_notification_available"] = `
|
queries["s_notification_available"] = `
|
||||||
SELECT name FROM Notification
|
SELECT name, created_at, notified_at, deleted_at
|
||||||
FROM Notification
|
FROM Vulnerability_Notification
|
||||||
WHERE notified_at = NULL OR notified_at < $1
|
WHERE notified_at = NULL OR notified_at < $1
|
||||||
AND name NOT IN (SELECT name FROM Lock)
|
AND name NOT IN (SELECT name FROM Lock)
|
||||||
ORDER BY Random()
|
ORDER BY Random()
|
||||||
LIMIT 1`
|
LIMIT 1`
|
||||||
|
|
||||||
queries["s_notification"] = `SELECT data FROM Notification WHERE name = $1`
|
queries["s_notification"] = `
|
||||||
|
SELECT name, created_at, notified_at, deleted_at, old_vulnerability, new_vulnerability
|
||||||
|
FROM Vulnerability_Notification
|
||||||
|
WHERE name = $1`
|
||||||
|
|
||||||
// complex_test.go
|
// complex_test.go
|
||||||
queries["s_complextest_featureversion_affects"] = `
|
queries["s_complextest_featureversion_affects"] = `
|
||||||
|
@ -185,7 +185,7 @@ func (pgSQL *pgSQL) insertVulnerability(vulnerability database.Vulnerability) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create NewVulnerabilityNotification.
|
// Create NewVulnerabilityNotification.
|
||||||
notification := database.NewVulnerabilityNotification{VulnerabilityID: vulnerability.ID}
|
notification := database.VulnerabilityNotification{NewVulnerability: vulnerability}
|
||||||
if err := pgSQL.insertNotification(tx, notification); err != nil {
|
if err := pgSQL.insertNotification(tx, notification); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -12,8 +12,8 @@
|
|||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
// Package notifier fetches notifications from the database and sends their names
|
// Package notifier fetches notifications from the database and informs the specified remote handler
|
||||||
// to the specified remote handler, inviting the third party to actively query the API about it.
|
// about their existences, inviting the third party to actively query the API about it.
|
||||||
package notifier
|
package notifier
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -22,16 +22,14 @@ import (
|
|||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
"github.com/coreos/pkg/timeutil"
|
"github.com/coreos/pkg/timeutil"
|
||||||
"github.com/pborman/uuid"
|
"github.com/pborman/uuid"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
"github.com/coreos/clair/config"
|
"github.com/coreos/clair/config"
|
||||||
"github.com/coreos/clair/database"
|
"github.com/coreos/clair/database"
|
||||||
"github.com/coreos/clair/health"
|
|
||||||
"github.com/coreos/clair/utils"
|
"github.com/coreos/clair/utils"
|
||||||
cerrors "github.com/coreos/clair/utils/errors"
|
cerrors "github.com/coreos/clair/utils/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier")
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
checkInterval = 5 * time.Minute
|
checkInterval = 5 * time.Minute
|
||||||
refreshLockDuration = time.Minute * 2
|
refreshLockDuration = time.Minute * 2
|
||||||
@ -39,21 +37,24 @@ const (
|
|||||||
maxBackOff = 15 * time.Minute
|
maxBackOff = 15 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Notification represents the structure of the notifications that are sent by a Notifier.
|
var (
|
||||||
type Notification struct {
|
log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier")
|
||||||
Name, Type string
|
|
||||||
Content interface{}
|
|
||||||
}
|
|
||||||
|
|
||||||
var notifiers = make(map[string]Notifier)
|
notifiers = make(map[string]Notifier)
|
||||||
|
|
||||||
|
promNotifierLatencySeconds = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
|
Name: "clair_notifier_latency_seconds",
|
||||||
|
Help: "Time it takes to send a notification after it's been created.",
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
// Notifier represents anything that can transmit notifications.
|
// Notifier represents anything that can transmit notifications.
|
||||||
type Notifier interface {
|
type Notifier interface {
|
||||||
// Configure attempts to initialize the notifier with the provided configuration.
|
// Configure attempts to initialize the notifier with the provided configuration.
|
||||||
// It returns whether the notifier is enabled or not.
|
// It returns whether the notifier is enabled or not.
|
||||||
Configure(*config.NotifierConfig) (bool, error)
|
Configure(*config.NotifierConfig) (bool, error)
|
||||||
// Send transmits the specified notification name.
|
// Send informs the existence of the specified notification.
|
||||||
Send(notificationName string) error
|
Send(notification database.VulnerabilityNotification) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterNotifier makes a Fetcher available by the provided name.
|
// RegisterNotifier makes a Fetcher available by the provided name.
|
||||||
@ -102,9 +103,8 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
|
|||||||
|
|
||||||
for running := true; running; {
|
for running := true; running; {
|
||||||
// Find task.
|
// Find task.
|
||||||
// TODO(Quentin-M): Combine node and notification.
|
notification := findTask(datastore, config.RenotifyInterval, whoAmI, stopper)
|
||||||
notificationName := findTask(datastore, config.RenotifyInterval, whoAmI, stopper)
|
if notification == nil {
|
||||||
if notificationName == "" {
|
|
||||||
// Interrupted while finding a task, Clair is stopping.
|
// Interrupted while finding a task, Clair is stopping.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -112,14 +112,15 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
|
|||||||
// Handle task.
|
// Handle task.
|
||||||
done := make(chan bool, 1)
|
done := make(chan bool, 1)
|
||||||
go func() {
|
go func() {
|
||||||
success, interrupted := handleTask(notificationName, stopper, config.Attempts)
|
success, interrupted := handleTask(*notification, stopper, config.Attempts)
|
||||||
if success {
|
if success {
|
||||||
datastore.SetNotificationNotified(notificationName)
|
promNotifierLatencySeconds.Set(float64(time.Now().Sub(notification.Created)))
|
||||||
|
datastore.SetNotificationNotified(notification.Name)
|
||||||
}
|
}
|
||||||
if interrupted {
|
if interrupted {
|
||||||
running = false
|
running = false
|
||||||
}
|
}
|
||||||
datastore.Unlock(notificationName, whoAmI)
|
datastore.Unlock(notification.Name, whoAmI)
|
||||||
done <- true
|
done <- true
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -130,7 +131,7 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
|
|||||||
case <-done:
|
case <-done:
|
||||||
break outer
|
break outer
|
||||||
case <-time.After(refreshLockDuration):
|
case <-time.After(refreshLockDuration):
|
||||||
datastore.Lock(notificationName, whoAmI, lockDuration, true)
|
datastore.Lock(notification.Name, whoAmI, lockDuration, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -138,10 +139,10 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
|
|||||||
log.Info("notifier service stopped")
|
log.Info("notifier service stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoAmI string, stopper *utils.Stopper) string {
|
func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoAmI string, stopper *utils.Stopper) *database.VulnerabilityNotification {
|
||||||
for {
|
for {
|
||||||
// Find a notification to send.
|
// Find a notification to send.
|
||||||
notificationName, err := datastore.GetAvailableNotification(renotifyInterval)
|
notification, err := datastore.GetAvailableNotification(renotifyInterval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// There is no notification or an error occured.
|
// There is no notification or an error occured.
|
||||||
if err != cerrors.ErrNotFound {
|
if err != cerrors.ErrNotFound {
|
||||||
@ -150,21 +151,21 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
|
|||||||
|
|
||||||
// Wait.
|
// Wait.
|
||||||
if !stopper.Sleep(checkInterval) {
|
if !stopper.Sleep(checkInterval) {
|
||||||
return ""
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock the notification.
|
// Lock the notification.
|
||||||
if hasLock, _ := datastore.Lock(notificationName, whoAmI, lockDuration, false); hasLock {
|
if hasLock, _ := datastore.Lock(notification.Name, whoAmI, lockDuration, false); hasLock {
|
||||||
log.Infof("found and locked a notification: %s", notificationName)
|
log.Infof("found and locked a notification: %s", notification.Name)
|
||||||
return notificationName
|
return ¬ification
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bool, bool) {
|
func handleTask(notification database.VulnerabilityNotification, st *utils.Stopper, maxAttempts int) (bool, bool) {
|
||||||
// Send notification.
|
// Send notification.
|
||||||
for notifierName, notifier := range notifiers {
|
for notifierName, notifier := range notifiers {
|
||||||
var attempts int
|
var attempts int
|
||||||
@ -172,22 +173,22 @@ func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bo
|
|||||||
for {
|
for {
|
||||||
// Max attempts exceeded.
|
// Max attempts exceeded.
|
||||||
if attempts >= maxAttempts {
|
if attempts >= maxAttempts {
|
||||||
log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notificationName, notifierName, maxAttempts)
|
log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notification.Name, notifierName, maxAttempts)
|
||||||
return false, false
|
return false, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backoff.
|
// Backoff.
|
||||||
if backOff > 0 {
|
if backOff > 0 {
|
||||||
log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notificationName, notifierName, attempts+1, maxAttempts)
|
log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notification.Name, notifierName, attempts+1, maxAttempts)
|
||||||
if !st.Sleep(backOff) {
|
if !st.Sleep(backOff) {
|
||||||
return false, true
|
return false, true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send using the current notifier.
|
// Send using the current notifier.
|
||||||
if err := notifier.Send(notificationName); err != nil {
|
if err := notifier.Send(notification); err != nil {
|
||||||
// Send failed; increase attempts/backoff and retry.
|
// Send failed; increase attempts/backoff and retry.
|
||||||
log.Errorf("could not send notification '%s' to notifier '%s': %s", notificationName, notifierName, err)
|
log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.Name, notifierName, err)
|
||||||
backOff = timeutil.ExpBackoff(backOff, maxBackOff)
|
backOff = timeutil.ExpBackoff(backOff, maxBackOff)
|
||||||
attempts++
|
attempts++
|
||||||
}
|
}
|
||||||
@ -197,6 +198,6 @@ func handleTask(notificationName string, st *utils.Stopper, maxAttempts int) (bo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("successfully sent notification '%s'\n", notificationName)
|
log.Infof("successfully sent notification '%s'\n", notification.Name)
|
||||||
return true, false
|
return true, false
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@ import (
|
|||||||
"gopkg.in/yaml.v2"
|
"gopkg.in/yaml.v2"
|
||||||
|
|
||||||
"github.com/coreos/clair/config"
|
"github.com/coreos/clair/config"
|
||||||
|
"github.com/coreos/clair/database"
|
||||||
"github.com/coreos/clair/notifier"
|
"github.com/coreos/clair/notifier"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -48,7 +49,7 @@ type WebhookNotifierConfiguration struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
//notifier.RegisterNotifier("webhook", &WebhookNotifier{})
|
notifier.RegisterNotifier("webhook", &WebhookNotifier{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error) {
|
func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error) {
|
||||||
@ -92,9 +93,9 @@ func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error)
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *WebhookNotifier) Send(notification *notifier.Notification) error {
|
func (h *WebhookNotifier) Send(notification database.VulnerabilityNotification) error {
|
||||||
// Marshal notification.
|
// Marshal notification.
|
||||||
jsonNotification, err := json.Marshal(notification)
|
jsonNotification, err := json.Marshal(notification.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal: %s", err)
|
return fmt.Errorf("could not marshal: %s", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user