ext: lock all drivers

This commit is contained in:
Jimmy Zelinskie 2017-01-13 02:33:19 -05:00
parent 78cef02fda
commit f9b319089d
6 changed files with 109 additions and 43 deletions

View File

@ -21,6 +21,8 @@
package notification
import (
"sync"
"github.com/coreos/pkg/capnslog"
"github.com/coreos/clair/config"
@ -30,8 +32,8 @@ import (
var (
log = capnslog.NewPackageLogger("github.com/coreos/clair", "ext/notification")
// Senders is the list of registered Senders.
Senders = make(map[string]Sender)
sendersM sync.RWMutex
senders = make(map[string]Sender)
)
// Sender represents anything that can transmit notifications.
@ -46,8 +48,8 @@ type Sender interface {
// RegisterSender makes a Sender available by the provided name.
//
// If RegisterSender is called twice with the same name, the name is blank, or
// if the provided Sender is nil, this function panics.
// If called twice with the same name, the name is blank, or if the provided
// Sender is nil, this function panics.
func RegisterSender(name string, s Sender) {
if name == "" {
panic("notification: could not register a Sender with an empty name")
@ -57,9 +59,33 @@ func RegisterSender(name string, s Sender) {
panic("notification: could not register a nil Sender")
}
if _, dup := Senders[name]; dup {
sendersM.Lock()
defer sendersM.Unlock()
if _, dup := senders[name]; dup {
panic("notification: RegisterSender called twice for " + name)
}
Senders[name] = s
senders[name] = s
}
// Senders returns the list of the registered Senders.
func Senders() map[string]Sender {
sendersM.RLock()
defer sendersM.RUnlock()
ret := make(map[string]Sender)
for k, v := range senders {
ret[k] = v
}
return ret
}
// UnregisterSender removes a Sender with a particular name from the list.
func UnregisterSender(name string) {
sendersM.Lock()
defer sendersM.Unlock()
delete(senders, name)
}

View File

@ -63,18 +63,20 @@ type Parser interface {
// if the provided Parser is nil, this function panics.
func RegisterParser(name string, p Parser) {
if name == "" {
panic("Could not register a Parser with an empty name")
panic("versionfmt: could not register a Parser with an empty name")
}
if p == nil {
panic("Could not register a nil Parser")
panic("versionfmt: could not register a nil Parser")
}
parsersM.Lock()
defer parsersM.Unlock()
if _, alreadyExists := parsers[name]; alreadyExists {
panic("Parser '" + name + "' is already registered")
if _, dup := parsers[name]; dup {
panic("versionfmt: RegisterParser called twice for " + name)
}
parsers[name] = p
}

View File

@ -17,12 +17,16 @@
package vulnmdsrc
import (
"sync"
"github.com/coreos/clair/database"
"github.com/coreos/clair/utils/types"
)
// Appenders is the list of registered Appenders.
var Appenders = make(map[string]Appender)
var (
appendersM sync.RWMutex
appenders = make(map[string]Appender)
)
// AppendFunc is the type of a callback provided to an Appender.
type AppendFunc func(metadataKey string, metadata interface{}, severity types.Priority)
@ -49,20 +53,37 @@ type Appender interface {
}
// RegisterAppender makes an Appender available by the provided name.
// If Register is called twice with the same name or if driver is nil,
// it panics.
//
// If called twice with the same name, the name is blank, or if the provided
// Appender is nil, this function panics.
func RegisterAppender(name string, a Appender) {
if name == "" {
panic("updater: could not register an Appender with an empty name")
panic("vulnmdsrc: could not register an Appender with an empty name")
}
if a == nil {
panic("vulnmdsrc: could not register a nil Appender")
}
if _, dup := Appenders[name]; dup {
appendersM.Lock()
defer appendersM.Unlock()
if _, dup := appenders[name]; dup {
panic("vulnmdsrc: RegisterAppender called twice for " + name)
}
Appenders[name] = a
appenders[name] = a
}
// Appenders returns the list of the registered Appenders.
func Appenders() map[string]Appender {
appendersM.RLock()
defer appendersM.RUnlock()
ret := make(map[string]Appender)
for k, v := range appenders {
ret[k] = v
}
return ret
}

View File

@ -18,21 +18,30 @@ package vulnsrc
import (
"errors"
"sync"
"github.com/coreos/clair/database"
)
var (
// Updaters is the list of registered Updaters.
Updaters = make(map[string]Updater)
// ErrFilesystem is returned when a fetcher fails to interact with the local filesystem.
ErrFilesystem = errors.New("vulnsrc: something went wrong when interacting with the fs")
// ErrGitFailure is returned when a fetcher fails to interact with git.
ErrGitFailure = errors.New("vulnsrc: something went wrong when interacting with git")
updatersM sync.RWMutex
updaters = make(map[string]Updater)
)
// UpdateResponse represents the sum of results of an update.
type UpdateResponse struct {
FlagName string
FlagValue string
Notes []string
Vulnerabilities []database.Vulnerability
}
// Updater represents anything that can fetch vulnerabilities and insert them
// into a Clair datastore.
type Updater interface {
@ -44,18 +53,10 @@ type Updater interface {
Clean()
}
// UpdateResponse represents the sum of results of an update.
type UpdateResponse struct {
FlagName string
FlagValue string
Notes []string
Vulnerabilities []database.Vulnerability
}
// RegisterUpdater makes an Updater available by the provided name.
//
// If RegisterUpdater is called twice with the same name, the name is blank, or
// if the provided Updater is nil, this function panics.
// If called twice with the same name, the name is blank, or if the provided
// Updater is nil, this function panics.
func RegisterUpdater(name string, u Updater) {
if name == "" {
panic("vulnsrc: could not register an Updater with an empty name")
@ -65,9 +66,25 @@ func RegisterUpdater(name string, u Updater) {
panic("vulnsrc: could not register a nil Updater")
}
if _, dup := Updaters[name]; dup {
updatersM.Lock()
defer updatersM.Unlock()
if _, dup := updaters[name]; dup {
panic("vulnsrc: RegisterUpdater called twice for " + name)
}
Updaters[name] = u
updaters[name] = u
}
// Updaters returns the list of the registered Updaters.
func Updaters() map[string]Updater {
updatersM.RLock()
defer updatersM.RUnlock()
ret := make(map[string]Updater)
for k, v := range updaters {
ret[k] = v
}
return ret
}

View File

@ -63,11 +63,11 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
defer stopper.End()
// Configure registered notifiers.
for senderName, sender := range notification.Senders {
for senderName, sender := range notification.Senders() {
if configured, err := sender.Configure(config); configured {
log.Infof("sender '%s' configured\n", senderName)
} else {
delete(notification.Senders, senderName)
notification.UnregisterSender(senderName)
if err != nil {
log.Errorf("could not configure notifier '%s': %s", senderName, err)
}
@ -75,7 +75,7 @@ func Run(config *config.NotifierConfig, datastore database.Datastore, stopper *u
}
// Do not run the updater if there is no notifier enabled.
if len(notification.Senders) == 0 {
if len(notification.Senders()) == 0 {
log.Infof("notifier service is disabled")
return
}
@ -149,7 +149,7 @@ func findTask(datastore database.Datastore, renotifyInterval time.Duration, whoA
func handleTask(n database.VulnerabilityNotification, st *utils.Stopper, maxAttempts int) (bool, bool) {
// Send notification.
for senderName, sender := range notification.Senders {
for senderName, sender := range notification.Senders() {
var attempts int
var backOff time.Duration
for {

View File

@ -152,10 +152,10 @@ func Run(config *config.UpdaterConfig, datastore database.Datastore, st *utils.S
}
// Clean resources.
for _, appenders := range vulnmdsrc.Appenders {
for _, appenders := range vulnmdsrc.Appenders() {
appenders.Clean()
}
for _, updaters := range vulnsrc.Updaters {
for _, updaters := range vulnsrc.Updaters() {
updaters.Clean()
}
@ -215,7 +215,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st
// Fetch updates in parallel.
log.Info("fetching vulnerability updates")
var responseC = make(chan *vulnsrc.UpdateResponse, 0)
for n, u := range vulnsrc.Updaters {
for n, u := range vulnsrc.Updaters() {
go func(name string, u vulnsrc.Updater) {
response, err := u.Update(datastore)
if err != nil {
@ -231,7 +231,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st
}
// Collect results of updates.
for i := 0; i < len(vulnsrc.Updaters); i++ {
for i := 0; i < len(vulnsrc.Updaters()); i++ {
resp := <-responseC
if resp != nil {
vulnerabilities = append(vulnerabilities, doVulnerabilitiesNamespacing(resp.Vulnerabilities)...)
@ -248,7 +248,7 @@ func fetch(datastore database.Datastore) (bool, []database.Vulnerability, map[st
// Add metadata to the specified vulnerabilities using the registered MetadataFetchers, in parallel.
func addMetadata(datastore database.Datastore, vulnerabilities []database.Vulnerability) []database.Vulnerability {
if len(vulnmdsrc.Appenders) == 0 {
if len(vulnmdsrc.Appenders()) == 0 {
return vulnerabilities
}
@ -264,9 +264,9 @@ func addMetadata(datastore database.Datastore, vulnerabilities []database.Vulner
}
var wg sync.WaitGroup
wg.Add(len(vulnmdsrc.Appenders))
wg.Add(len(vulnmdsrc.Appenders()))
for n, a := range vulnmdsrc.Appenders {
for n, a := range vulnmdsrc.Appenders() {
go func(name string, appender vulnmdsrc.Appender) {
defer wg.Done()