notifier: Allow custom notifiers to be registered.

This commit is contained in:
Quentin Machu 2015-12-15 11:36:06 -05:00
parent 7ee1481e92
commit 3ff8bfaa93
7 changed files with 217 additions and 120 deletions

View File

@ -48,8 +48,7 @@ func Boot(config *config.Config) {
// Start notifier // Start notifier
st.Begin() st.Begin()
notifier := notifier.New(config.Notifier) go notifier.Run(config.Notifier, st)
go notifier.Serve(st)
// Start API // Start API
st.Begin() st.Begin()

View File

@ -26,6 +26,7 @@ import (
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
// Register components // Register components
_ "github.com/coreos/clair/notifier/notifiers"
_ "github.com/coreos/clair/updater/fetchers" _ "github.com/coreos/clair/updater/fetchers"
_ "github.com/coreos/clair/worker/detectors/os" _ "github.com/coreos/clair/worker/detectors/os"
_ "github.com/coreos/clair/worker/detectors/packages" _ "github.com/coreos/clair/worker/detectors/packages"

View File

@ -29,7 +29,9 @@ updater:
# Use 0 to disable the updater entirely. # Use 0 to disable the updater entirely.
interval: 2h interval: 2h
notifier: notifier:
# HTTP endpoint that will receive notifications with POST requests. # Configuration for HTTP notifier
http:
# Endpoint that will receive notifications with POST requests.
endpoint: endpoint:
# Server name and path to certificates to call the endpoint securely with TLS and client certificate auth. # Server name and path to certificates to call the endpoint securely with TLS and client certificate auth.
servername: servername:

View File

@ -42,13 +42,9 @@ type UpdaterConfig struct {
Interval time.Duration Interval time.Duration
} }
// NotifierConfig is the configuration for the Notifier service. // NotifierConfig is the configuration for the Notifier service and its registered notifiers.
type NotifierConfig struct { type NotifierConfig struct {
Endpoint string Params map[string]interface{} `yaml:",inline"`
ServerName string
CertFile string
KeyFile string
CAFile string
} }
// APIConfig is the configuration for the API service. // APIConfig is the configuration for the API service.

View File

@ -2,10 +2,8 @@
This tool can send notifications to external services when specific events happen, such as vulnerability updates. This tool can send notifications to external services when specific events happen, such as vulnerability updates.
For now, it only supports transmitting them to an HTTP endpoint using POST requests, but it may be extended quite easily. For now, it only supports transmitting them to an HTTP endpoint using POST requests, but it can be extended quite easily by registering a new Notifier kind.
To enable the notification system, specify the following command-line arguments: To enable the notification system, you simply have to specify the appropriate configuration. See the [example configuration](../config.example.yaml).
--notifier-type=http --notifier-http-url="http://your-notification-endpoint"
# Types of notifications # Types of notifications

View File

@ -17,13 +17,6 @@
package notifier package notifier
import ( import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"time" "time"
"github.com/coreos/pkg/capnslog" "github.com/coreos/pkg/capnslog"
@ -52,66 +45,68 @@ type Notification struct {
Content interface{} Content interface{}
} }
// A Notifier dispatches notifications to an HTTP endpoint. var notifiers = make(map[string]Notifier)
type Notifier struct {
lockIdentifier string // Notifier represents anything that can transmit notifications.
endpoint string type Notifier interface {
client *http.Client // Configure attempts to initialize the notifier with the provided configuration.
// It returns whether the notifier is enabled or not.
Configure(*config.NotifierConfig) (bool, error)
// Send transmits the specified notification.
Send(notification *Notification) error
} }
// New initializes a new Notifier from the specified configuration. // RegisterNotifier makes a Fetcher available by the provided name.
func New(config *config.NotifierConfig) *Notifier { // If Register is called twice with the same name or if driver is nil,
if config == nil { // it panics.
return &Notifier{} func RegisterNotifier(name string, n Notifier) {
if name == "" {
panic("notifier: could not register a Notifier with an empty name")
} }
// Validate endpoint URL. if n == nil {
if _, err := url.Parse(config.Endpoint); err != nil { panic("notifier: could not register a nil Notifier")
log.Error("could not create a notifier with an invalid endpoint URL")
return &Notifier{}
} }
// Initialize TLS. if _, dup := notifiers[name]; dup {
tlsConfig, err := loadTLSClientConfig(config) panic("notifier: RegisterNotifier called twice for " + name)
if err != nil {
log.Fatalf("could not initialize client cert authentication: %s\n", err)
}
if tlsConfig != nil {
log.Info("notifier configured with client certificate authentication")
} }
httpClient := &http.Client{ notifiers[name] = n
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
} }
return &Notifier{ // Run starts the Notifier service.
lockIdentifier: uuid.New(), func Run(config *config.NotifierConfig, stopper *utils.Stopper) {
endpoint: config.Endpoint,
client: httpClient,
}
}
// Serve starts the Notifier.
func (n *Notifier) Serve(stopper *utils.Stopper) {
defer stopper.End() defer stopper.End()
// Do not run the updater if the endpoint is empty. // Configure registered notifiers.
if n.endpoint == "" { for notifierName, notifier := range notifiers {
log.Infof("notifier service is disabled.") if configured, err := notifier.Configure(config); configured {
log.Infof("notifier '%s' configured\n", notifierName)
} else {
delete(notifiers, notifierName)
if err != nil {
log.Errorf("could not configure notifier '%s': %s", notifierName, err)
}
}
}
// Do not run the updater if there is no notifier enabled.
if len(notifiers) == 0 {
log.Infof("notifier service is disabled")
return return
} }
// Register healthchecker. whoAmI := uuid.New()
health.RegisterHealthchecker("notifier", n.Healthcheck) log.Infof("notifier service started. lock identifier: %s\n", whoAmI)
log.Infof("notifier service started. endpoint: %s. lock identifier: %s\n", n.endpoint, n.lockIdentifier) // Register healthchecker.
health.RegisterHealthchecker("notifier", Healthcheck)
for { for {
// Find task. // Find task.
// TODO(Quentin-M): Combine node and notification. // TODO(Quentin-M): Combine node and notification.
node, notification := n.findTask(stopper) node, notification := findTask(whoAmI, stopper)
if node == "" && notification == nil { if node == "" && notification == nil {
break break
} }
@ -119,10 +114,10 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
// Handle task. // Handle task.
done := make(chan bool, 1) done := make(chan bool, 1)
go func() { go func() {
if n.handleTask(node, notification) { if handleTask(notification) {
database.MarkNotificationAsSent(node) database.MarkNotificationAsSent(node)
} }
database.Unlock(node, n.lockIdentifier) database.Unlock(node, whoAmI)
done <- true done <- true
}() }()
@ -133,7 +128,7 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
case <-done: case <-done:
break outer break outer
case <-time.After(refreshLockDuration): case <-time.After(refreshLockDuration):
database.Lock(node, lockDuration, n.lockIdentifier) database.Lock(node, lockDuration, whoAmI)
} }
} }
} }
@ -141,7 +136,7 @@ func (n *Notifier) Serve(stopper *utils.Stopper) {
log.Info("notifier service stopped") log.Info("notifier service stopped")
} }
func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notification) { func findTask(whoAmI string, stopper *utils.Stopper) (string, database.Notification) {
for { for {
// Find a notification to send. // Find a notification to send.
node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper()) node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper())
@ -158,14 +153,14 @@ func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notificati
} }
// Lock the notification. // Lock the notification.
if hasLock, _ := database.Lock(node, lockDuration, n.lockIdentifier); hasLock { if hasLock, _ := database.Lock(node, lockDuration, whoAmI); hasLock {
log.Infof("found and locked a notification: %s", notification.GetName()) log.Infof("found and locked a notification: %s", notification.GetName())
return node, notification return node, notification
} }
} }
} }
func (n *Notifier) handleTask(node string, notification database.Notification) bool { func handleTask(notification database.Notification) bool {
// Get notification content. // Get notification content.
// TODO(Quentin-M): Split big notifications. // TODO(Quentin-M): Split big notifications.
notificationContent, err := notification.GetContent() notificationContent, err := notification.GetContent()
@ -175,25 +170,19 @@ func (n *Notifier) handleTask(node string, notification database.Notification) b
} }
// Create notification. // Create notification.
payload := Notification{ payload := &Notification{
Name: notification.GetName(), Name: notification.GetName(),
Type: notification.GetType(), Type: notification.GetType(),
Content: notificationContent, Content: notificationContent,
} }
// Marshal notification. // Send notification.
jsonPayload, err := json.Marshal(payload) // TODO(Quentin-M): Backoff / MaxRetries
if err != nil { for notifierName, notifier := range notifiers {
log.Errorf("could not marshal content of notification '%s': %s", notification.GetName(), err) if err := notifier.Send(payload); err != nil {
log.Errorf("could not send notification '%s' to notifier '%s': %s", notification.GetName(), notifierName, err)
return false return false
} }
// Send notification.
resp, err := n.client.Post(n.endpoint, "application/json", bytes.NewBuffer(jsonPayload))
defer resp.Body.Close()
if err != nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
log.Errorf("could not send notification '%s': (%d) %s", notification.GetName(), resp.StatusCode, err)
return false
} }
log.Infof("successfully sent notification '%s'\n", notification.GetName()) log.Infof("successfully sent notification '%s'\n", notification.GetName())
@ -201,41 +190,7 @@ func (n *Notifier) handleTask(node string, notification database.Notification) b
} }
// Healthcheck returns the health of the notifier service. // Healthcheck returns the health of the notifier service.
func (n *Notifier) Healthcheck() health.Status { func Healthcheck() health.Status {
queueSize, err := database.CountNotificationsToSend() queueSize, err := database.CountNotificationsToSend()
return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}} return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}}
} }
// loadTLSClientConfig initializes a *tls.Config using the given notifier
// configuration.
//
// If no certificates are given, (nil, nil) is returned.
// The CA certificate is optional and falls back to the system default.
func loadTLSClientConfig(cfg *config.NotifierConfig) (*tls.Config, error) {
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
var caCertPool *x509.CertPool
if cfg.CAFile != "" {
caCert, err := ioutil.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
ServerName: cfg.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
return tlsConfig, nil
}

146
notifier/notifiers/http.go Normal file
View File

@ -0,0 +1,146 @@
// Copyright 2015 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package notifiers implements several kinds of notifier.Notifier
package notifiers
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"gopkg.in/yaml.v2"
"github.com/coreos/clair/config"
"github.com/coreos/clair/notifier"
)
// A HTTP notifier dispatches notifications to an HTTP endpoint.
type HTTP struct {
endpoint string
client *http.Client
}
// A HTTPConfiguration represents the configuration of an HTTP notifier.
type HTTPConfiguration struct {
Endpoint string
ServerName string
CertFile string
KeyFile string
CAFile string
}
func init() {
notifier.RegisterNotifier("http", &HTTP{})
}
func (h *HTTP) Configure(config *config.NotifierConfig) (bool, error) {
// Get configuration
var httpConfig HTTPConfiguration
if config == nil {
return false, nil
}
if _, ok := config.Params["http"]; !ok {
return false, nil
}
yamlConfig, err := yaml.Marshal(config.Params["http"])
if err != nil {
return false, errors.New("invalid configuration")
}
err = yaml.Unmarshal(yamlConfig, &httpConfig)
if err != nil {
return false, errors.New("invalid configuration")
}
// Validate endpoint URL.
if httpConfig.Endpoint == "" {
return false, nil
}
if _, err := url.Parse(httpConfig.Endpoint); err != nil {
return false, errors.New("invalid endpoint URL")
}
h.endpoint = httpConfig.Endpoint
// Initialize TLS.
tlsConfig, err := loadTLSClientConfig(&httpConfig)
if err != nil {
return false, fmt.Errorf("could not initialize client cert auth: %s\n", err)
}
h.client = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
return true, nil
}
func (h *HTTP) Send(notification *notifier.Notification) error {
// Marshal notification.
jsonNotification, err := json.Marshal(notification)
if err != nil {
return fmt.Errorf("could not marshal: %s", err)
}
// Send notification over HTTP.
resp, err := h.client.Post(h.endpoint, "application/json", bytes.NewBuffer(jsonNotification))
if err != nil || resp == nil || (resp.StatusCode != 200 && resp.StatusCode != 201) {
if resp != nil {
return fmt.Errorf("(%d) %s", resp.StatusCode, err)
}
return err
}
defer resp.Body.Close()
return nil
}
// loadTLSClientConfig initializes a *tls.Config using the given HTTPConfiguration.
//
// If no certificates are given, (nil, nil) is returned.
// The CA certificate is optional and falls back to the system default.
func loadTLSClientConfig(cfg *HTTPConfiguration) (*tls.Config, error) {
if cfg.CertFile == "" || cfg.KeyFile == "" {
return nil, nil
}
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, err
}
var caCertPool *x509.CertPool
if cfg.CAFile != "" {
caCert, err := ioutil.ReadFile(cfg.CAFile)
if err != nil {
return nil, err
}
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
}
tlsConfig := &tls.Config{
ServerName: cfg.ServerName,
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
return tlsConfig, nil
}