From 20a126c84ae8fad5f9a9ee3bc042866407d48308 Mon Sep 17 00:00:00 2001 From: Quentin Machu Date: Mon, 23 Nov 2015 23:43:33 -0500 Subject: [PATCH] notifier: Refactor and add client certificate authentification support. Fixes #23 --- main.go | 29 +++-- notifier/notifier.go | 252 +++++++++++++++++++++++-------------------- utils/http/http.go | 55 ++++++++++ 3 files changed, 205 insertions(+), 131 deletions(-) create mode 100644 utils/http/http.go diff --git a/main.go b/main.go index cfaccbd3..c6ace87d 100644 --- a/main.go +++ b/main.go @@ -45,8 +45,10 @@ var ( cfgDbPath = kingpin.Flag("db-path", "Path to the database to use").String() // Notifier configuration - cfgNotifierType = kingpin.Flag("notifier-type", "Type of the notifier to use").Default("none").Enum("none", "http") - cfgNotifierHTTPURL = kingpin.Flag("notifier-http-url", "URL that will receive POST notifications").String() + cfgNotifierEndpoint = kingpin.Flag("notifier-endpoint", "URL that will receive POST notifications").String() + cfgNotifierCertFile = kingpin.Flag("notifier-cert-file", "Path to TLS Cert file").ExistingFile() + cfgNotifierKeyFile = kingpin.Flag("notifier-key-file", "Path to TLS Key file").ExistingFile() + cfgNotifierCAFile = kingpin.Flag("notifier-ca-file", "Path to CA for verifying TLS client certs").ExistingFile() // Updater configuration cfgUpdateInterval = kingpin.Flag("update-interval", "Frequency at which the vulnerability updater will run. Use 0 to disable the updater entirely.").Default("1h").Duration() @@ -75,10 +77,6 @@ func main() { kingpin.Errorf("required flag --db-path not provided, try --help") os.Exit(1) } - if *cfgNotifierType == "http" && *cfgNotifierHTTPURL == "" { - kingpin.Errorf("required flag --notifier-http-url not provided, try --help") - os.Exit(1) - } // Initialize error/logging system logLevel, err := capnslog.ParseLevel(strings.ToUpper(*cfgLogLevel)) @@ -110,17 +108,16 @@ func main() { defer database.Close() // Start notifier - var notifierService notifier.Notifier - switch *cfgNotifierType { - case "http": - notifierService, err = notifier.NewHTTPNotifier(*cfgNotifierHTTPURL) - if err != nil { - log.Fatalf("could not initialize HTTP notifier: %s", err) - } - } - if notifierService != nil { + if len(*cfgNotifierEndpoint) > 0 { + notifier := notifier.New(notifier.Config{ + Endpoint: *cfgNotifierEndpoint, + CertFile: *cfgNotifierCertFile, + KeyFile: *cfgNotifierKeyFile, + CAFile: *cfgNotifierCAFile, + }) + st.Begin() - go notifierService.Run(st) + go notifier.Serve(st) } // Start Main API and Health API diff --git a/notifier/notifier.go b/notifier/notifier.go index 6793098d..dc4995ea 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -24,150 +24,172 @@ import ( "time" "github.com/coreos/pkg/capnslog" - "github.com/coreos/pkg/timeutil" + "github.com/pborman/uuid" + "github.com/coreos/clair/database" - cerrors "github.com/coreos/clair/utils/errors" "github.com/coreos/clair/health" "github.com/coreos/clair/utils" - "github.com/pborman/uuid" + httputils "github.com/coreos/clair/utils/http" ) -// A Notifier dispatches notifications -type Notifier interface { - Run(*utils.Stopper) -} - var log = capnslog.NewPackageLogger("github.com/coreos/clair", "notifier") const ( - maxBackOff = 5 * time.Minute - checkInterval = 5 * time.Second + checkInterval = 5 * time.Minute - refreshLockAnticipation = time.Minute * 2 - lockDuration = time.Minute*8 + refreshLockAnticipation + refreshLockDuration = time.Minute * 2 + lockDuration = time.Minute*8 + refreshLockDuration ) -// A HTTPNotifier dispatches notifications to an HTTP endpoint with POST requests -type HTTPNotifier struct { - url string +// A Notification represents the structure of the notifications that are sent by a Notifier. +type Notification struct { + Name, Type string + Content interface{} } -// NewHTTPNotifier initializes a new HTTPNotifier -func NewHTTPNotifier(URL string) (*HTTPNotifier, error) { - if _, err := url.Parse(URL); err != nil { - return nil, cerrors.NewBadRequestError("could not create a notifier with an invalid URL") +// A Notifier dispatches notifications to an HTTP endpoint. +type Notifier struct { + lockIdentifier string + endpoint string + client *http.Client +} + +// Config represents the configuration of a Notifier. +// The certificates are optionnals and enables client certificate authentification. +type Config struct { + Endpoint string + CertFile, KeyFile, CAFile string +} + +// New initializes a new Notifier from the specified configuration. +func New(cfg Config) *Notifier { + if _, err := url.Parse(cfg.Endpoint); err != nil { + log.Fatal("could not create a notifier with an invalid endpoint URL") } - notifier := &HTTPNotifier{url: URL} - health.RegisterHealthchecker("notifier", notifier.Healthcheck) + // Initialize TLS + tlsConfig, err := httputils.LoadTLSClientConfig(cfg.CertFile, cfg.KeyFile, cfg.CAFile) + if err != nil { + log.Fatalf("could not initialize client cert authentification: %s\n", err) + } + if tlsConfig != nil { + log.Info("notifier configured with client certificate authentification") + } - return notifier, nil + httpClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsConfig, + }, + } + + return &Notifier{ + lockIdentifier: uuid.New(), + endpoint: cfg.Endpoint, + client: httpClient, + } } -// Run pops notifications from the database, lock them, send them, mark them as -// send and unlock them -// -// It uses an exponential backoff when POST requests fail -func (notifier *HTTPNotifier) Run(st *utils.Stopper) { - defer st.End() +// Serve starts the Notifier. +func (n *Notifier) Serve(stopper *utils.Stopper) { + defer stopper.End() + health.RegisterHealthchecker("notifier", n.Healthcheck) - whoAmI := uuid.New() - log.Infof("HTTP notifier started. URL: %s. Lock Identifier: %s", notifier.url, whoAmI) + log.Infof("notifier service started. endpoint: %s. lock identifier: %s\n", n.endpoint, n.lockIdentifier) for { - node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper()) - if notification == nil || err != nil { - if err != nil { - log.Warningf("could not get notification to send: %s.", err) - } - - if !st.Sleep(checkInterval) { - break - } - - continue - } - - // Try to lock the notification - hasLock, hasLockUntil := database.Lock(node, lockDuration, whoAmI) - if !hasLock { - continue - } - - for backOff := time.Duration(0); ; backOff = timeutil.ExpBackoff(backOff, maxBackOff) { - // Backoff, it happens when an error occurs during the communication - // with the notification endpoint - if backOff > 0 { - // Renew lock before going to sleep if necessary - if time.Now().Add(backOff).After(hasLockUntil.Add(-refreshLockAnticipation)) { - hasLock, hasLockUntil = database.Lock(node, lockDuration, whoAmI) - if !hasLock { - log.Warning("lost lock ownership, aborting") - break - } - } - - // Sleep - if !st.Sleep(backOff) { - return - } - } - - // Get notification content - content, err := notification.GetContent() - if err != nil { - log.Warningf("could not get content of notification '%s': %s", notification.GetName(), err.Error()) - break - } - - // Marshal the notification content - jsonContent, err := json.Marshal(struct { - Name, Type string - Content interface{} - }{ - Name: notification.GetName(), - Type: notification.GetType(), - Content: content, - }) - if err != nil { - log.Errorf("could not marshal content of notification '%s': %s", notification.GetName(), err.Error()) - break - } - - // Post notification - req, _ := http.NewRequest("POST", notifier.url, bytes.NewBuffer(jsonContent)) - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{} - res, err := client.Do(req) - if err != nil { - log.Warningf("could not post notification '%s': %s", notification.GetName(), err.Error()) - continue - } - res.Body.Close() - - if res.StatusCode != 200 && res.StatusCode != 201 { - log.Warningf("could not post notification '%s': got status code %d", notification.GetName(), res.StatusCode) - continue - } - - // Mark the notification as sent - database.MarkNotificationAsSent(node) - - log.Infof("sent notification '%s' successfully", notification.GetName()) + // Find task. + // TODO(Quentin-M): Combine node and notification. + node, notification := n.findTask(stopper) + if node == "" && notification == nil { break } - if hasLock { - database.Unlock(node, whoAmI) + // Handle task. + done := make(chan bool, 1) + go func() { + if n.handleTask(node, notification) { + database.MarkNotificationAsSent(node) + } + database.Unlock(node, n.lockIdentifier) + done <- true + }() + + // Refresh task lock until done. + outer: + for { + select { + case <-done: + break outer + case <-time.After(refreshLockDuration): + database.Lock(node, lockDuration, n.lockIdentifier) + } } } - log.Info("HTTP notifier stopped") + log.Info("notifier service stopped") } -// Healthcheck returns the health of the notifier service -func (notifier *HTTPNotifier) Healthcheck() health.Status { +func (n *Notifier) findTask(stopper *utils.Stopper) (string, database.Notification) { + for { + // Find a notification to send. + node, notification, err := database.FindOneNotificationToSend(database.GetDefaultNotificationWrapper()) + if err != nil { + log.Warningf("could not get notification to send: %s", err) + } + + // No notification or error: wait. + if notification == nil || err != nil { + if !stopper.Sleep(checkInterval) { + return "", nil + } + continue + } + + // Lock the notification. + if hasLock, _ := database.Lock(node, lockDuration, n.lockIdentifier); hasLock { + log.Infof("found and locked a notification: %s", notification.GetName()) + return node, notification + } + } +} + +func (n *Notifier) handleTask(node string, notification database.Notification) bool { + // Get notification content. + // TODO(Quentin-M): Split big notifications. + notificationContent, err := notification.GetContent() + if err != nil { + log.Warningf("could not get content of notification '%s': %s", notification.GetName(), err) + return false + } + + // Create notification. + payload := Notification{ + Name: notification.GetName(), + Type: notification.GetType(), + Content: notificationContent, + } + + // Marshal notification. + jsonPayload, err := json.Marshal(payload) + if err != nil { + log.Errorf("could not marshal content of notification '%s': %s", notification.GetName(), err) + return false + } + + // Send notification. + resp, err := n.client.Post(n.endpoint, "application/json", bytes.NewBuffer(jsonPayload)) + defer resp.Body.Close() + if err != nil || (resp.StatusCode != 200 && resp.StatusCode != 201) { + log.Errorf("could not send notification '%s': (%d) %s", notification.GetName(), resp.StatusCode, err) + return false + } + + log.Infof("successfully sent notification '%s'\n", notification.GetName()) + return true +} + +// Healthcheck returns the health of the notifier service. +func (n *Notifier) Healthcheck() health.Status { queueSize, err := database.CountNotificationsToSend() return health.Status{IsEssential: false, IsHealthy: err == nil, Details: struct{ QueueSize int }{QueueSize: queueSize}} } diff --git a/utils/http/http.go b/utils/http/http.go new file mode 100644 index 00000000..6250d733 --- /dev/null +++ b/utils/http/http.go @@ -0,0 +1,55 @@ +// Copyright 2015 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package http provides utility functions for HTTP servers and clients. +package http + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" +) + +// LoadTLSClientConfig initializes a *tls.Config using the given certificates and private key, that +// can be used to communicate with a server using client certificate authentificate. +// +// If no certificates are given, a nil *tls.Config is returned. +// The CA certificate is optionnal, the system defaults are used if not provided. +func LoadTLSClientConfig(certFile, keyFile, caFile string) (*tls.Config, error) { + if len(certFile) == 0 || len(keyFile) == 0 { + return nil, nil + } + + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + + var caCertPool *x509.CertPool + if len(caFile) > 0 { + caCert, err := ioutil.ReadFile(caFile) + if err != nil { + return nil, err + } + caCertPool = x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + } + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + return tlsConfig, nil +}