@ -24,150 +24,172 @@ import (
"time"
"github.com/coreos/pkg/capnslog"
"github.com/coreos/pkg/timeutil"
"github.com/pborman/uuid"
"github.com/coreos/clair/database"
cerrors "github.com/coreos/clair/utils/errors"
"github.com/coreos/clair/health"
"github.com/coreos/clair/utils"
"github.com/pborman/uuid "
httputils "github.com/coreos/clair/utils/http "
)
// A Notifier dispatches notifications
type Notifier interface {
Run ( * utils . Stopper )
}
var log = capnslog . NewPackageLogger ( "github.com/coreos/clair" , "notifier" )
const (
maxBackOff = 5 * time . Minute
checkInterval = 5 * time . Second
checkInterval = 5 * time . Minute
refreshLock Anticip ation = time . Minute * 2
lockDuration = time . Minute * 8 + refreshLock Anticip ation
refreshLock Dur ation = time . Minute * 2
lockDuration = time . Minute * 8 + refreshLock Dur ation
)
// A HTTPNotifier dispatches notifications to an HTTP endpoint with POST requests
type HTTPNotifier struct {
url string
// A Notification represents the structure of the notifications that are sent by a Notifier.
type Notification struct {
Name , Type string
Content interface { }
}
// NewHTTPNotifier initializes a new HTTPNotifier
func NewHTTPNotifier ( URL string ) ( * HTTPNotifier , error ) {
if _ , err := url . Parse ( URL ) ; err != nil {
return nil , cerrors . NewBadRequestError ( "could not create a notifier with an invalid URL" )
}
notifier := & HTTPNotifier { url : URL }
health . RegisterHealthchecker ( "notifier" , notifier . Healthcheck )
// A Notifier dispatches notifications to an HTTP endpoint.
type Notifier struct {
lockIdentifier string
endpoint string
client * http . Client
}
return notifier , nil
// Config represents the configuration of a Notifier.
// The certificates are optionnals and enables client certificate authentification.
type Config struct {
Endpoint string
CertFile , KeyFile , CAFile string
}
// Run pops notifications from the database, lock them, send them, mark them as
// send and unlock them
//
// It uses an exponential backoff when POST requests fail
func ( notifier * HTTPNotifier ) Run ( st * utils . Stopper ) {
defer st . End ( )
// New initializes a new Notifier from the specified configuration.
func New ( cfg Config ) * Notifier {
if _ , err := url . Parse ( cfg . Endpoint ) ; err != nil {
log . Fatal ( "could not create a notifier with an invalid endpoint URL" )
}
// Initialize TLS
tlsConfig , err := httputils . LoadTLSClientConfig ( cfg . CertFile , cfg . KeyFile , cfg . CAFile )
if err != nil {
log . Fatalf ( "could not initialize client cert authentification: %s\n" , err )
}
if tlsConfig != nil {
log . Info ( "notifier configured with client certificate authentification" )
}
whoAmI := uuid . New ( )
log . Infof ( "HTTP notifier started. URL: %s. Lock Identifier: %s" , notifier . url , whoAmI )
httpClient := & http . Client {
Transport : & http . Transport {
TLSClientConfig : tlsConfig ,
} ,
}
for {
node , notification , err := database . FindOneNotificationToSend ( database . GetDefaultNotificationWrapper ( ) )
if notification == nil || err != nil {
if err != nil {
log . Warningf ( "could not get notification to send: %s." , err )
}
return & Notifier {
lockIdentifier: uuid . New ( ) ,
endpoint : cfg . Endpoint ,
client : httpClient ,
}
}
if ! st . Sleep ( checkInterval ) {
break
}
// Serve starts the Notifier.
func ( n * Notifier ) Serve ( stopper * utils . Stopper ) {
defer stopper . End ( )
health . RegisterHealthchecker ( "notifier" , n . Healthcheck )
continue
}
log . Infof ( "notifier service started. endpoint: %s. lock identifier: %s\n" , n . endpoint , n . lockIdentifier )
// Try to lock the notification
hasLock , hasLockUntil := database . Lock ( node , lockDuration , whoAmI )
if ! hasLock {
continue
for {
// Find task.
// TODO(Quentin-M): Combine node and notification.
node , notification := n . findTask ( stopper )
if node == "" && notification == nil {
break
}
for backOff := time . Duration ( 0 ) ; ; backOff = timeutil . ExpBackoff ( backOff , maxBackOff ) {
// Backoff, it happens when an error occurs during the communication
// with the notification endpoint
if backOff > 0 {
// Renew lock before going to sleep if necessary
if time . Now ( ) . Add ( backOff ) . After ( hasLockUntil . Add ( - refreshLockAnticipation ) ) {
hasLock , hasLockUntil = database . Lock ( node , lockDuration , whoAmI )
if ! hasLock {
log . Warning ( "lost lock ownership, aborting" )
break
}
}
// Sleep
if ! st . Sleep ( backOff ) {
return
}
// Handle task.
done := make ( chan bool , 1 )
go func ( ) {
if n . handleTask ( node , notification ) {
database . MarkNotificationAsSent ( node )
}
// Get notification content
content , err := notification . GetContent ( )
if err != nil {
log . Warningf ( "could not get content of notification '%s': %s" , notification . GetName ( ) , err . Error ( ) )
break
database . Unlock ( node , n . lockIdentifier )
done <- true
} ( )
// Refresh task lock until done.
outer :
for {
select {
case <- done :
break outer
case <- time . After ( refreshLockDuration ) :
database . Lock ( node , lockDuration , n . lockIdentifier )
}
}
}
// Marshal the notification content
jsonContent , err := json . Marshal ( struct {
Name , Type string
Content interface { }
} {
Name : notification . GetName ( ) ,
Type : notification . GetType ( ) ,
Content : content ,
} )
if err != nil {
log . Errorf ( "could not marshal content of notification '%s': %s" , notification . GetName ( ) , err . Error ( ) )
break
}
log . Info ( "notifier service stopped" )
}
// Post notification
req , _ := http . NewRequest ( "POST" , notifier . url , bytes . NewBuffer ( jsonContent ) )
req . Header . Set ( "Content-Type" , "application/json" )
func ( n * Notifier ) findTask ( stopper * utils . Stopper ) ( string , database . Notification ) {
for {
// Find a notification to send.
node , notification , err := database . FindOneNotificationToSend ( database . GetDefaultNotificationWrapper ( ) )
if err != nil {
log . Warningf ( "could not get notification to send: %s" , err )
}
client := & http . Client { }
res , err := client . Do ( req )
if err != nil {
log . Warningf ( "could not post notification '%s': %s" , notification . GetName ( ) , err . Error ( ) )
continue
// No notification or error: wait.
if notification == nil || err != nil {
if ! stopper . Sleep ( checkInterval ) {
return "" , nil
}
res . Body . Close ( )
continue
}
if res . StatusCode != 200 && res . StatusCode != 201 {
log . Warningf ( "could not post notification '%s': got status code %d" , notification . GetName ( ) , res . StatusCode )
continue
}
// Lock the notification.
if hasLock , _ := database . Lock ( node , lockDuration , n . lockIdentifier ) ; hasLock {
log . Infof ( "found and locked a notification: %s" , notification . GetName ( ) )
return node , notification
}
}
}
func ( n * Notifier ) handleTask ( node string , notification database . Notification ) bool {
// Get notification content.
// TODO(Quentin-M): Split big notifications.
notificationContent , err := notification . GetContent ( )
if err != nil {
log . Warningf ( "could not get content of notification '%s': %s" , notification . GetName ( ) , err )
return false
}
// Mark the notification as sent
database . MarkNotificationAsSent ( node )
// Create notification.
payload := Notification {
Name : notification . GetName ( ) ,
Type : notification . GetType ( ) ,
Content : notificationContent ,
}
log . Infof ( "sent notification '%s' successfully" , notification . GetName ( ) )
break
}
// Marshal notification.
jsonPayload , err := json . Marshal ( payload )
if err != nil {
log . Errorf ( "could not marshal content of notification '%s': %s" , notification . GetName ( ) , err )
return false
}
if hasLock {
database . Unlock ( node , whoAmI )
}
// Send notification.
resp , err := n . client . Post ( n . endpoint , "application/json" , bytes . NewBuffer ( jsonPayload ) )
defer resp . Body . Close ( )
if err != nil || ( resp . StatusCode != 200 && resp . StatusCode != 201 ) {
log . Errorf ( "could not send notification '%s': (%d) %s" , notification . GetName ( ) , resp . StatusCode , err )
return false
}
log . Info ( "HTTP notifier stopped" )
log . Infof ( "successfully sent notification '%s'\n" , notification . GetName ( ) )
return true
}
// Healthcheck returns the health of the notifier service
func ( n otifier * HTTP Notifier) Healthcheck ( ) health . Status {
// Healthcheck returns the health of the notifier service .
func ( n * Notifier) Healthcheck ( ) health . Status {
queueSize , err := database . CountNotificationsToSend ( )
return health . Status { IsEssential : false , IsHealthy : err == nil , Details : struct { QueueSize int } { QueueSize : queueSize } }
}