Merge pull request #52 from Quentin-M/custom_notifiers

Custom notifiers
This commit is contained in:
Quentin Machu 2015-12-17 18:23:25 -05:00
commit 11475beb2c
7 changed files with 257 additions and 125 deletions

View File

@ -48,8 +48,7 @@ func Boot(config *config.Config) {
// Start notifier
st.Begin()
notifier := notifier.New(config.Notifier)
go notifier.Serve(st)
go notifier.Run(config.Notifier, st)
// Start API
st.Begin()

View File

@ -26,6 +26,7 @@ import (
"github.com/coreos/pkg/capnslog"
// Register components
_ "github.com/coreos/clair/notifier/notifiers"
_ "github.com/coreos/clair/updater/fetchers"
_ "github.com/coreos/clair/worker/detectors/os"
_ "github.com/coreos/clair/worker/detectors/packages"

View File

@ -29,7 +29,11 @@ updater:
# Use 0 to disable the updater entirely.
interval: 2h
notifier:
# HTTP endpoint that will receive notifications with POST requests.
# How many attempts will the notifier do when a notifier backend fails.
attempts: 3
# Configuration for HTTP notifier
http:
# Endpoint that will receive notifications with POST requests.
endpoint:
# Server name and path to certificates to call the endpoint securely with TLS and client certificate auth.
servername:

View File

@ -42,13 +42,10 @@ type UpdaterConfig struct {
Interval time.Duration
}
// NotifierConfig is the configuration for the Notifier service.
// NotifierConfig is the configuration for the Notifier service and its registered notifiers.
type NotifierConfig struct {
Endpoint string
ServerName string
CertFile string
KeyFile string
CAFile string
Attempts int
Params map[string]interface{} `yaml:",inline"`
}
// APIConfig is the configuration for the API service.
@ -72,6 +69,9 @@ var DefaultConfig = Config{
HealthPort: 6061,
Timeout: 900 * time.Second,
},
Notifier: &NotifierConfig{
Attempts: 5,
},
}
// Load is a shortcut to open a file, read it, and generate a Config.

View File

@ -2,10 +2,8 @@
This tool can send notifications to external services when specific events happen, such as vulnerability updates.
For now, it only supports transmitting them to an HTTP endpoint using POST requests, but it may be extended quite easily.
To enable the notification system, specify the following command-line arguments:
--notifier-type=http --notifier-http-url="http://your-notification-endpoint"
For now, it only supports transmitting them to an webhook endpoint using HTTP POST requests, but it can be extended quite easily by registering a new Notifier kind.
To enable the notification system, you simply have to specify the appropriate configuration. See the [example configuration](../config.example.yaml).
# Types of notifications

View File

@ -17,16 +17,10 @@
package notifier
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/coreos/pkg/capnslog"
"github.com/coreos/pkg/timeutil"
"github.com/pborman/uuid"
"github.com/coreos/clair/config"
@ -39,9 +33,9 @@ var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier")
const (
checkInterval = 5 * time.Minute
refreshLockDuration = time.Minute * 2
lockDuration = time.Minute*8 + refreshLockDuration
maxBackOff = 15 * time.Minute
)
// TODO(Quentin-M): Allow registering custom notification handlers.
@ -52,77 +46,84 @@ type Notification struct {
Content interface{}
}
// A Notifier dispatches notifications to an HTTP endpoint.
type Notifier struct {
lockIdentifier string
endpoint string
client *http.Client
var notifiers = make(map[string]Notifier)
// Notifier represents anything that can transmit notifications.
type Notifier interface {
// Configure attempts to initialize the notifier with the provided configuration.
// It returns whether the notifier is enabled or not.
Configure(*config.NotifierConfig) (bool, error)
// Send transmits the specified notification.
Send(notification *Notification) error
}
// New initializes a new Notifier from the specified configuration.
func New(config *config.NotifierConfig) *Notifier {
if config == nil {
return &Notifier{}
// RegisterNotifier makes a Fetcher available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
func RegisterNotifier(name string, n Notifier) {
if name == "" {
panic("notifier: could not register a Notifier with an empty name")
}
// Validate endpoint URL.
if _, err := url.Parse(config.Endpoint); err != nil {
log.Error("could not create a notifier with an invalid endpoint URL")
return &Notifier{}
if n == nil {
panic("notifier: could not register a nil Notifier")
}
// Initialize TLS.
tlsConfig, err := loadTLSClientConfig(config)
if err != nil {
log.Fatalf("could not initialize client cert authentication: %s\n", err)
}
if tlsConfig != nil {
log.Info("notifier configured with client certificate authentication")
if _, dup := notifiers[name]; dup {
panic("notifier: RegisterNotifier called twice for " + name)
}
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
notifiers[name] = n
}
return &Notifier{
lockIdentifier: uuid.New(),
endpoint: config.Endpoint,
client: httpClient,
}
}
// Serve starts the Notifier.
func (n *Notifier) Serve(stopper *utils.Stopper) {
// Run starts the Notifier service.
func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
defer stopper.End()
// Do not run the updater if the endpoint is empty.
if n.endpoint == "" {
log.Infof("notifier service is disabled.")
// Configure registered notifiers.
for notifierName, notifier := range notifiers {
if configured, err := notifier.Configure(config); configured {
log.Infof("notifier '%s' configured\n", notifierName)
} else {
delete(notifiers, notifierName)
if err != nil {
log.Errorf("could not configure notifier '%s': %s", notifierName, err)
}
}
}
// Do not run the updater if there is no notifier enabled.
if len(notifiers) == 0 {
log.Infof("notifier service is disabled")
return
}
whoAmI := uuid.New()
log.Infof("notifier service started. lock identifier: %s\n", whoAmI)
// Register healthchecker.
health.RegisterHealthchecker("notifier", n.Healthcheck)
health.RegisterHealthchecker("notifier", Healthcheck)
log.Infof("notifier service started. endpoint: %s. lock identifier: %s\n", n.endpoint, n.lockIdentifier)
for {
for running := true; running; {
// Find task.
// TODO(Quentin-M): Combine node and notification.
node, notification := n.findTask(stopper)
node, notification := findTask(whoAmI, stopper)
if node == "" && notification == nil {
// Interrupted while finding a task, Clair is stopping.
break
}
// Handle task.
done := make(chan bool, 1)
go func() {
if n.handleTask(node, notification) {
success, interrupted := handleTask(notification, stopper, config.Attempts)
if success {
database.MarkNotificationAsSent(node)
}
database.Unlock(node, n.lockIdentifier)
if interrupted {
running = false
}
database.Unlock(node, whoAmI)
done <- true
}()
@ -133,7 +134,7 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
case <-done:
break outer
case <-time.After(refreshLockDuration):
database.Lock(node, lockDuration, n.lockIdentifier)
database.Lock(node, lockDuration, whoAmI)
}
}
}
@ -141,7 +142,7 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
log.Info("notifier service stopped")
}
func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notification) {
func findTask(whoAmI string, stopper *utils.Stopper) (string, database.Notification) {
for {
// Find a notification to send.
node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper())
@ -158,84 +159,67 @@ func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notificati
}
// Lock the notification.
if hasLock, _ := database.Lock(node, lockDuration, n.lockIdentifier); hasLock {
if hasLock, _ := database.Lock(node, lockDuration, whoAmI); hasLock {
log.Infof("found and locked a notification: %s", notification.GetName())
return node, notification
}
}
}
func (n *Notifier) handleTask(node string, notification database.Notification) bool {
func handleTask(notification database.Notification, st *utils.Stopper, maxAttempts int) (bool, bool) {
// Get notification content.
// TODO(Quentin-M): Split big notifications.
notificationContent, err := notification.GetContent()
if err != nil {
log.Warningf("could not get content of notification '%s': %s", notification.GetName(), err)
return false
return false, false
}
// Create notification.
payload := Notification{
payload := &Notification{
Name: notification.GetName(),
Type: notification.GetType(),
Content: notificationContent,
}
// Marshal notification.
jsonPayload, err := json.Marshal(payload)
if err != nil {
log.Errorf("could not marshal content of notification '%s': %s", notification.GetName(), err)
return false
// Send notification.
for notifierName, notifier := range notifiers {
var attempts int
var backOff time.Duration
for {
// Max attempts exceeded.
if attempts >= maxAttempts {
log.Infof("giving up on sending notification '%s' to notifier '%s': max attempts exceeded (%d)\n", notification.GetName(), notifierName, maxAttempts)
return false, false
}
// Send notification.
resp, err := n.client.Post(n.endpoint, "application/json", bytes.NewBuffer(jsonPayload))
defer resp.Body.Close()
if err != nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
log.Errorf("could not send notification '%s': (%d) %s", notification.GetName(), resp.StatusCode, err)
return false
// Backoff.
if backOff > 0 {
log.Infof("waiting %v before retrying to send notification '%s' to notifier '%s' (Attempt %d / %d)\n", backOff, notification.GetName(), notifierName, attempts+1, maxAttempts)
if !st.Sleep(backOff) {
return false, true
}
}
// Send using the current notifier.
if err := notifier.Send(payload); err == nil {
// Send has been successful. Go to the next one.
break
}
// Send failed; increase attempts/backoff and retry.
log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.GetName(), notifierName, err)
backOff = timeutil.ExpBackoff(backOff, maxBackOff)
attempts++
}
}
log.Infof("successfully sent notification '%s'\n", notification.GetName())
return true
return true, false
}
// Healthcheck returns the health of the notifier service.
func (n *Notifier) Healthcheck() health.Status {
func Healthcheck() health.Status {
queueSize, err := database.CountNotificationsToSend()
return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}}
}
// loadTLSClientConfig initializes a *tls.Config using the given notifier
// configuration.
//
// If no certificates are given, (nil, nil) is returned.
// The CA certificate is optional and falls back to the system default.
func loadTLSClientConfig(cfg *config.NotifierConfig) (*tls.Config, error) {
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
var caCertPool *x509.CertPool
if cfg.CAFile != "" {
caCert, err := ioutil.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
ServerName: cfg.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
return tlsConfig, nil
}

View File

@ -0,0 +1,146 @@
// Copyright 2015 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package notifiers implements several kinds of notifier.Notifier
package notifiers
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"gopkg.in/yaml.v2"
"github.com/coreos/clair/config"
"github.com/coreos/clair/notifier"
)
// A WebhookNotifier dispatches notifications to a webhook endpoint.
type WebhookNotifier struct {
endpoint string
client *http.Client
}
// A WebhookNotifierConfiguration represents the configuration of a WebhookNotifier.
type WebhookNotifierConfiguration struct {
Endpoint string
ServerName string
CertFile string
KeyFile string
CAFile string
}
func init() {
notifier.RegisterNotifier("webhook", &WebhookNotifier{})
}
func (h *WebhookNotifier) Configure(config *config.NotifierConfig) (bool, error) {
// Get configuration
var httpConfig WebhookNotifierConfiguration
if config == nil {
return false, nil
}
if _, ok := config.Params["http"]; !ok {
return false, nil
}
yamlConfig, err := yaml.Marshal(config.Params["http"])
if err != nil {
return false, errors.New("invalid configuration")
}
err = yaml.Unmarshal(yamlConfig, &httpConfig)
if err != nil {
return false, errors.New("invalid configuration")
}
// Validate endpoint URL.
if httpConfig.Endpoint == "" {
return false, nil
}
if _, err := url.Parse(httpConfig.Endpoint); err != nil {
return false, errors.New("invalid endpoint URL")
}
h.endpoint = httpConfig.Endpoint
// Initialize TLS.
tlsConfig, err := loadTLSClientConfig(&httpConfig)
if err != nil {
return false, fmt.Errorf("could not initialize client cert auth: %s\n", err)
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
return true, nil
}
func (h *WebhookNotifier) Send(notification *notifier.Notification) error {
// Marshal notification.
jsonNotification, err := json.Marshal(notification)
if err != nil {
return fmt.Errorf("could not marshal: %s", err)
}
// Send notification via HTTP POST.
resp, err := h.client.Post(h.endpoint, "application/json", bytes.NewBuffer(jsonNotification))
if err != nil || resp == nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
if resp != nil {
return fmt.Errorf("(%d) %s", resp.StatusCode, err)
}
return err
}
defer resp.Body.Close()
return nil
}
// loadTLSClientConfig initializes a *tls.Config using the given WebhookNotifierConfiguration.
//
// If no certificates are given, (nil, nil) is returned.
// The CA certificate is optional and falls back to the system default.
func loadTLSClientConfig(cfg *WebhookNotifierConfiguration) (*tls.Config, error) {
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
var caCertPool *x509.CertPool
if cfg.CAFile != "" {
caCert, err := ioutil.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
ServerName: cfg.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
return tlsConfig, nil
}