clair/worker.go
Sida Chen 5d725e67b0 Replace Ancestry with AncestryWithContent struct in database models
As one of the steps to simplify the codebase, the AncestryWithContent
struct is renamed to Ancestry, and the old Ancestry struct is removed. This
makes the PostAncestry request slower.
2018-09-10 12:48:23 -04:00

619 lines
18 KiB
Go

// Copyright 2017 clair authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clair
import (
"errors"
"regexp"
"sync"
log "github.com/sirupsen/logrus"
"github.com/coreos/clair/database"
"github.com/coreos/clair/ext/featurefmt"
"github.com/coreos/clair/ext/featurens"
"github.com/coreos/clair/ext/imagefmt"
"github.com/coreos/clair/pkg/commonerr"
"github.com/coreos/clair/pkg/strutil"
)
// logLayerName is the log field key under which a layer's hash is reported.
const logLayerName = "layer"

var (
	// ErrUnsupported is the error that should be raised when an OS or package
	// manager is not supported.
	ErrUnsupported = commonerr.NewBadRequestError("worker: OS and/or package manager are not supported")

	// ErrParentUnknown is the error that should be raised when a parent layer
	// has yet to be processed for the current layer.
	ErrParentUnknown = commonerr.NewBadRequestError("worker: parent layer is unknown, it must be processed first")
)

// urlParametersRegexp matches a single `?key=value` or `&key=value` query
// parameter; cleanURL uses it to strip parameters from a path before the path
// is written to the log.
var urlParametersRegexp = regexp.MustCompile(`(\?|\&)([^=]+)\=([^ &]+)`)

// Processors contains the names of the namespace detectors and feature listers
// enabled in this instance of Clair.
//
// Processors are initialized during booting and configured in the
// configuration file.
var Processors database.Processors
// WorkerConfig lists the namespace detectors and feature listers to enable,
// as read from the `namespace_detectors` and `feature_listers` YAML keys.
type WorkerConfig struct {
EnabledDetectors []string `yaml:"namespace_detectors"`
EnabledListers []string `yaml:"feature_listers"`
}
// LayerRequest represents all information necessary to download and process a
// layer.
type LayerRequest struct {
// Hash identifies the layer; it is the key used to look up and persist the
// layer in the datastore.
Hash string
// Path is the location handed to imagefmt.Extract. It may be a URL; query
// parameters are stripped (cleanURL) before it is logged.
Path string
// Headers are forwarded to imagefmt.Extract — presumably HTTP headers used
// when fetching Path (TODO confirm).
Headers map[string]string
}
// partialLayer stores the content detected in one layer by the `processedBy`
// processors during a single processing pass. It holds only what was detected
// in this pass, not everything already stored for the layer.
type partialLayer struct {
// hash identifies the layer the content belongs to.
hash string
// processedBy are the detectors/listers that were run to produce this result.
processedBy database.Processors
namespaces []database.Namespace
features []database.Feature
// err is the error returned by detectContent for this layer, if any.
err error
}
// processRequest stores parameters used for processing layers.
type processRequest struct {
// request describes where to fetch the layer from.
request LayerRequest
// notProcessedBy is the set of enabled processors that have not yet
// processed this layer; these are the processors that will be run on it.
notProcessedBy database.Processors
}
// cleanURL strips every `?key=value` / `&key=value` parameter from str so the
// resulting location can be logged without leaking query-string values.
func cleanURL(str string) string {
	// The replacement is empty, so literal and expanding replacement agree.
	return urlParametersRegexp.ReplaceAllLiteralString(str, "")
}
// processRequests runs detectContent concurrently (one goroutine per request)
// for every request in toDetect and merges the results.
//
// It returns the distinct namespaces and distinct features found across all
// requests, plus the per-layer partial results keyed by layer hash. If any
// request fails, the combined error is returned and the other results are
// nil. An error is also returned if two requests target the same layer hash.
func processRequests(imageFormat string, toDetect []processRequest) ([]database.Namespace, []database.Feature, map[string]partialLayer, error) {
	var wg sync.WaitGroup
	results := make([]partialLayer, len(toDetect))
	for i := range toDetect {
		wg.Add(1)
		// Each goroutine writes only to its own results[i], so no locking is
		// required; wg.Wait below publishes the writes to this goroutine.
		go func(req *processRequest, res *partialLayer) {
			defer wg.Done()
			res.hash = req.request.Hash
			res.processedBy = req.notProcessedBy
			res.namespaces, res.features, res.err = detectContent(imageFormat, req.request.Hash, req.request.Path, req.request.Headers, req.notProcessedBy)
		}(&toDetect[i], &results[i])
	}
	wg.Wait()

	errs := make([]error, 0, len(results))
	for _, r := range results {
		errs = append(errs, r.err)
	}
	if err := commonerr.CombineErrors(errs...); err != nil {
		return nil, nil, nil, err
	}

	distinctNS := map[database.Namespace]struct{}{}
	distinctF := map[database.Feature]struct{}{}
	updates := make(map[string]partialLayer, len(results))
	for _, r := range results {
		if _, dup := updates[r.hash]; dup {
			return nil, nil, nil, errors.New("Duplicated updates is not allowed")
		}
		updates[r.hash] = r
		for _, ns := range r.namespaces {
			distinctNS[ns] = struct{}{}
		}
		for _, f := range r.features {
			distinctF[f] = struct{}{}
		}
	}

	namespaces := make([]database.Namespace, 0, len(distinctNS))
	for ns := range distinctNS {
		namespaces = append(namespaces, ns)
	}
	features := make([]database.Feature, 0, len(distinctF))
	for f := range distinctF {
		features = append(features, f)
	}
	return namespaces, features, updates, nil
}
// getLayer looks up the layer identified by req.Hash and decides whether it
// still needs processing.
//
//   - Unknown layer: its hash is persisted (and committed), and a request to
//     process it with every enabled processor is returned alongside an empty
//     layer carrying only the hash.
//   - Known layer missing some enabled processors: the stored layer is returned
//     with a request for just those processors.
//   - Known layer processed by everything enabled: preq is nil.
func getLayer(datastore database.Datastore, req LayerRequest) (layer database.LayerWithContent, preq *processRequest, err error) {
	tx, err := datastore.Begin()
	if err != nil {
		return layer, nil, err
	}
	defer tx.Rollback()

	layer, found, err := tx.FindLayerWithContent(req.Hash)
	if err != nil {
		return layer, nil, err
	}

	if found {
		missing := getNotProcessedBy(layer.ProcessedBy)
		if len(missing.Detectors) == 0 && len(missing.Listers) == 0 {
			return layer, nil, nil
		}
		return layer, &processRequest{request: req, notProcessedBy: missing}, nil
	}

	// First time this hash is seen: register it before scanning.
	if err = tx.PersistLayer(req.Hash); err != nil {
		return layer, nil, err
	}
	if err = tx.Commit(); err != nil {
		return layer, nil, err
	}

	fresh := database.LayerWithContent{}
	fresh.Hash = req.Hash
	return fresh, &processRequest{request: req, notProcessedBy: Processors}, nil
}
// processLayers makes sure every layer in requests exists in the datastore and
// has been scanned by all processors enabled in this instance, persisting any
// newly detected namespaces, features and layer content.
//
// It returns one LayerWithContent per request, in request order; a hash that
// appears several times in requests appears several times in the result.
func processLayers(datastore database.Datastore, imageFormat string, requests []LayerRequest) ([]database.LayerWithContent, error) {
	stored := make(map[string]database.LayerWithContent, len(requests))
	pending := make([]processRequest, 0, len(requests))
	for _, req := range requests {
		if _, seen := stored[req.Hash]; seen {
			continue
		}
		layer, preq, err := getLayer(datastore, req)
		if err != nil {
			return nil, err
		}
		stored[req.Hash] = layer
		if preq != nil {
			pending = append(pending, *preq)
		}
	}

	namespaces, features, detected, err := processRequests(imageFormat, pending)
	if err != nil {
		return nil, err
	}

	// Store what this pass detected: namespaces and features first, then the
	// per-layer content that refers to them.
	if err = persistNamespaces(datastore, namespaces); err != nil {
		return nil, err
	}
	if err = persistFeatures(datastore, features); err != nil {
		return nil, err
	}
	for _, partial := range detected {
		if err = persistPartialLayer(datastore, partial); err != nil {
			return nil, err
		}
	}

	// NOTE(Sida): The full layers are computed using partially processed
	// layers in the current database session. If other Clair instances are
	// changing some of these layers concurrently, the result may differ,
	// especially when those instances use different processors.
	complete := make([]database.LayerWithContent, 0, len(requests))
	for _, req := range requests {
		layer := stored[req.Hash]
		if partial, ok := detected[req.Hash]; ok {
			layer = combineLayers(layer, partial)
		}
		complete = append(complete, layer)
	}
	return complete, nil
}
// persistPartialLayer stores the namespaces and features detected for one
// layer, together with the processors that detected them, in a single
// transaction.
func persistPartialLayer(datastore database.Datastore, layer partialLayer) error {
	tx, beginErr := datastore.Begin()
	if beginErr != nil {
		return beginErr
	}
	defer tx.Rollback()

	if persistErr := tx.PersistLayerContent(layer.hash, layer.namespaces, layer.features, layer.processedBy); persistErr != nil {
		return persistErr
	}
	return tx.Commit()
}
// persistFeatures stores features in a single transaction.
func persistFeatures(datastore database.Datastore, features []database.Feature) error {
	tx, beginErr := datastore.Begin()
	if beginErr != nil {
		return beginErr
	}
	defer tx.Rollback()

	if persistErr := tx.PersistFeatures(features); persistErr != nil {
		return persistErr
	}
	return tx.Commit()
}
// persistNamespaces stores namespaces in a single transaction.
func persistNamespaces(datastore database.Datastore, namespaces []database.Namespace) error {
	tx, beginErr := datastore.Begin()
	if beginErr != nil {
		return beginErr
	}
	defer tx.Rollback()

	if persistErr := tx.PersistNamespaces(namespaces); persistErr != nil {
		return persistErr
	}
	return tx.Commit()
}
// combineLayers merges the content already known for layer with the content
// newly detected in partial, dropping duplicate features and namespaces.
//
// The result's ProcessedBy is layer.ProcessedBy extended with the detectors
// and listers of partial.processedBy that it does not already contain. The
// order of the returned Features and Namespaces is unspecified (map order).
// The slices inside layer are never written to.
func combineLayers(layer database.LayerWithContent, partial partialLayer) database.LayerWithContent {
	mapF := make(map[database.Feature]struct{}, len(layer.Features)+len(partial.features))
	mapNS := make(map[database.Namespace]struct{}, len(layer.Namespaces)+len(partial.namespaces))
	for _, f := range layer.Features {
		mapF[f] = struct{}{}
	}
	for _, f := range partial.features {
		mapF[f] = struct{}{}
	}
	for _, ns := range layer.Namespaces {
		mapNS[ns] = struct{}{}
	}
	for _, ns := range partial.namespaces {
		mapNS[ns] = struct{}{}
	}

	features := make([]database.Feature, 0, len(mapF))
	for f := range mapF {
		features = append(features, f)
	}
	namespaces := make([]database.Namespace, 0, len(mapNS))
	for ns := range mapNS {
		namespaces = append(namespaces, ns)
	}

	// `layer` is passed by value, but its slices still share backing arrays
	// with the caller's copy. Capping capacity at length (full slice
	// expression) forces append to allocate instead of writing into that
	// shared array.
	processedBy := layer.ProcessedBy
	detectors := layer.ProcessedBy.Detectors
	listers := layer.ProcessedBy.Listers
	processedBy.Detectors = append(detectors[:len(detectors):len(detectors)], strutil.CompareStringLists(partial.processedBy.Detectors, layer.ProcessedBy.Detectors)...)
	processedBy.Listers = append(listers[:len(listers):len(listers)], strutil.CompareStringLists(partial.processedBy.Listers, layer.ProcessedBy.Listers)...)

	return database.LayerWithContent{
		Layer: database.Layer{
			Hash:        layer.Hash,
			ProcessedBy: processedBy,
		},
		Features:   features,
		Namespaces: namespaces,
	}
}
// isAncestryProcessed reports whether the ancestry called name is stored and
// has already been processed by every detector and lister enabled in this
// instance. A missing ancestry reports false with a nil error.
func isAncestryProcessed(datastore database.Datastore, name string) (bool, error) {
	tx, err := datastore.Begin()
	if err != nil {
		return false, err
	}
	defer tx.Rollback()

	ancestry, found, err := tx.FindAncestry(name)
	switch {
	case err != nil:
		return false, err
	case !found:
		return false, nil
	}

	missing := getNotProcessedBy(ancestry.ProcessedBy)
	fullyProcessed := len(missing.Detectors) == 0 && len(missing.Listers) == 0
	return fullyProcessed, nil
}
// ProcessAncestry downloads and scans the ancestry called name unless it has
// already been scanned by every processor enabled in this instance of Clair.
//
// Both name and imageFormat must be non-empty; otherwise a bad-request error
// is returned. The layers in layerRequest are processed and stored first, then
// the ancestry itself is computed and stored.
func ProcessAncestry(datastore database.Datastore, imageFormat, name string, layerRequest []LayerRequest) error {
	switch {
	case name == "":
		return commonerr.NewBadRequestError("could not process a layer which does not have a name")
	case imageFormat == "":
		return commonerr.NewBadRequestError("could not process a layer which does not have a format")
	}

	processed, err := isAncestryProcessed(datastore, name)
	if err != nil {
		return err
	}
	if processed {
		log.WithField("ancestry", name).Debug("Ancestry is processed")
		return nil
	}

	layers, err := processLayers(datastore, imageFormat, layerRequest)
	if err != nil {
		return err
	}

	commonProcessors, err := getProcessors(layers)
	if err != nil {
		return err
	}
	return processAncestry(datastore, name, layers, commonProcessors)
}
// getNamespacedFeatures concatenates, in layer order, the namespaced features
// detected in each layer. The result is never nil.
func getNamespacedFeatures(layers []database.AncestryLayer) []database.NamespacedFeature {
	total := 0
	for i := range layers {
		total += len(layers[i].DetectedFeatures)
	}

	all := make([]database.NamespacedFeature, 0, total)
	for i := range layers {
		all = append(all, layers[i].DetectedFeatures...)
	}
	return all
}
// processAncestry builds the ancestry called name from its already-processed
// layers and stores it.
//
// It works out which namespaced features each layer introduces, persists those
// namespaced features, and then upserts the ancestry with commonProcessors
// recorded as its ProcessedBy.
func processAncestry(datastore database.Datastore, name string, layers []database.LayerWithContent, commonProcessors database.Processors) error {
	var (
		ancestry database.Ancestry
		err      error
	)
	ancestry.Name = name
	ancestry.ProcessedBy = commonProcessors
	ancestry.Layers, err = computeAncestryLayers(layers, commonProcessors)
	if err != nil {
		return err
	}

	ancestryFeatures := getNamespacedFeatures(ancestry.Layers)
	log.WithFields(log.Fields{
		"ancestry":           name,
		"number of features": len(ancestryFeatures),
		// Report the processors recorded on this ancestry, not the global set.
		"processed by":     commonProcessors,
		"number of layers": len(ancestry.Layers),
	}).Debug("compute ancestry features")

	if err = persistNamespacedFeatures(datastore, ancestryFeatures); err != nil {
		return err
	}

	tx, err := datastore.Begin()
	if err != nil {
		return err
	}
	// Deferred like the other persist helpers in this file, so the transaction
	// is released on every return path (including a failed Commit).
	defer tx.Rollback()

	if err = tx.UpsertAncestry(ancestry); err != nil {
		return err
	}
	return tx.Commit()
}
// persistNamespacedFeatures stores features and then calls
// CacheAffectedNamespacedFeatures on them, each step in its own transaction.
// The second transaction is started only after the first has committed.
func persistNamespacedFeatures(datastore database.Datastore, features []database.NamespacedFeature) error {
	// Each step owns its transaction and releases it with a deferred Rollback,
	// matching the other persist helpers in this file.
	persist := func() error {
		tx, err := datastore.Begin()
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err := tx.PersistNamespacedFeatures(features); err != nil {
			return err
		}
		return tx.Commit()
	}
	cache := func() error {
		tx, err := datastore.Begin()
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if err := tx.CacheAffectedNamespacedFeatures(features); err != nil {
			return err
		}
		return tx.Commit()
	}

	if err := persist(); err != nil {
		return err
	}
	return cache()
}
// getProcessors returns the processors shared by all layers.
//
// The first layer's detectors and listers are taken as the common set. Every
// other layer must include all of them, which is checked by comparing the
// size of the intersection. Otherwise an error is returned. An empty input
// yields empty Processors.
func getProcessors(layers []database.LayerWithContent) (database.Processors, error) {
	if len(layers) == 0 {
		return database.Processors{}, nil
	}

	commonDetectors := layers[0].ProcessedBy.Detectors
	commonListers := layers[0].ProcessedBy.Listers
	for _, l := range layers[1:] {
		// Distinct names: the intersection is only used for the size check and
		// does not replace the common set.
		sharedDetectors := strutil.CompareStringListsInBoth(commonDetectors, l.ProcessedBy.Detectors)
		sharedListers := strutil.CompareStringListsInBoth(commonListers, l.ProcessedBy.Listers)
		if len(sharedDetectors) != len(commonDetectors) || len(sharedListers) != len(commonListers) {
			// This error might be triggered because of multiple workers are
			// processing the same instance with different processors.
			// TODO(sidchen): Once the features can be associated with
			// Detectors/Listers, we can support dynamically generating ancestry's
			// detector/lister based on the layers.
			return database.Processors{}, errors.New("processing layers with different Clair instances is currently unsupported")
		}
	}

	return database.Processors{
		Detectors: commonDetectors,
		Listers:   commonListers,
	}, nil
}
// introducedFeature is a namespaced feature together with the index (into the
// layers passed to computeAncestryLayers) of the layer that introduced it.
type introducedFeature struct {
feature database.NamespacedFeature
layerIndex int
}
// computeAncestryLayers turns the layers of an ancestry into AncestryLayers.
// Each namespaced feature is attributed to the layer that introduced it. Layers
// must be ordered so that every layer's parents precede it.
//
// Rules, applied layer by layer:
//   - a feature gets the most recently seen namespace (from this layer or an
//     earlier one) with the same version format; a feature with no such
//     namespace is an error;
//   - a feature whose name and version already exist in the set inherited from
//     earlier layers keeps its original introducing layer;
//   - a layer that lists at least one feature of a version format replaces the
//     inherited feature set for that format.
//
// Only the features still present after the last layer are attributed. Their
// order within DetectedFeatures is unspecified.
func computeAncestryLayers(layers []database.LayerWithContent, commonProcessors database.Processors) ([]database.AncestryLayer, error) {
	// TODO(sidchen): Once the features are linked to specific processor, we
	// will use commonProcessors to filter out the features for this ancestry.

	// version format -> namespace
	nsByFormat := map[string]database.Namespace{}
	// version format -> "name:version" -> feature visible after the layers seen so far
	visible := map[string]map[string]introducedFeature{}
	result := make([]database.AncestryLayer, 0, len(layers))

	for idx := range layers {
		current := layers[idx]
		result = append(result, database.AncestryLayer{
			Layer:            current.Layer,
			DetectedFeatures: []database.NamespacedFeature{},
		})

		// Precondition: nsByFormat and visible hold the union of all parents.
		for _, ns := range current.Namespaces {
			nsByFormat[ns.VersionFormat] = ns
		}

		// version format -> "name:version" -> feature, for this layer only
		layerFeatures := map[string]map[string]introducedFeature{}
		for _, f := range current.Features {
			ns, found := nsByFormat[f.VersionFormat]
			if !found {
				return nil, errors.New("No corresponding version format")
			}

			byKey, exists := layerFeatures[f.VersionFormat]
			if !exists {
				byKey = map[string]introducedFeature{}
				layerFeatures[f.VersionFormat] = byKey
			}

			key := f.Name + ":" + f.Version
			// Indexing a missing (nil) inner map is safe and reports !inherited.
			if parent, inherited := visible[f.VersionFormat][key]; inherited {
				byKey[key] = parent
				continue
			}
			byKey[key] = introducedFeature{
				feature: database.NamespacedFeature{
					Feature:   f,
					Namespace: ns,
				},
				layerIndex: idx,
			}
		}

		// NOTE(Sida): the feature map of a version format is replaced only when
		// this layer has at least one feature with that version format. This
		// does not distinguish "feature file removed" vs "all detectable
		// features removed from that file" vs "feature file not changed".
		//
		// One way to differentiate (feature file removed or not changed) vs
		// all detectable features removed is to pass in the file status.
		for vf, byKey := range layerFeatures {
			visible[vf] = byKey
		}
	}

	for _, byKey := range visible {
		for _, intro := range byKey {
			dst := &result[intro.layerIndex]
			dst.DetectedFeatures = append(dst.DetectedFeatures, intro.feature)
		}
	}
	return result, nil
}
// getNotProcessedBy returns the detectors and listers enabled in this Clair
// instance (Processors) that are absent from processedBy, i.e. the processors
// that still have to be run.
func getNotProcessedBy(processedBy database.Processors) database.Processors {
	var missing database.Processors
	missing.Listers = strutil.CompareStringLists(Processors.Listers, processedBy.Listers)
	missing.Detectors = strutil.CompareStringLists(Processors.Detectors, processedBy.Detectors)
	return missing
}
// detectContent extracts the files required by the processors in toProcess
// from the layer at path (via imagefmt.Extract with imageFormat and headers).
// It then runs those namespace detectors and feature listers on the files.
//
// name identifies the layer and is used only in log messages. On error, both
// returned slices are nil.
func detectContent(imageFormat, name, path string, headers map[string]string, toProcess database.Processors) (namespaces []database.Namespace, featureVersions []database.Feature, err error) {
	log.WithFields(log.Fields{"Hash": name}).Debug("Process Layer")

	// This function runs concurrently (one goroutine per layer). Capping the
	// first slice's capacity forces append to allocate a fresh array rather
	// than writing into a slice RequiredFilenames might share between calls.
	listerFiles := featurefmt.RequiredFilenames(toProcess.Listers)
	totalRequiredFiles := append(listerFiles[:len(listerFiles):len(listerFiles)], featurens.RequiredFilenames(toProcess.Detectors)...)

	files, err := imagefmt.Extract(imageFormat, path, headers, totalRequiredFiles)
	if err != nil {
		log.WithError(err).WithFields(log.Fields{
			logLayerName: name,
			"path":       cleanURL(path),
		}).Error("failed to extract data from path")
		return nil, nil, err
	}

	namespaces, err = featurens.Detect(files, toProcess.Detectors)
	if err != nil {
		return nil, nil, err
	}
	// Previously this tested featureVersions, which is always empty at this
	// point, so the namespace count was never logged.
	if len(namespaces) > 0 {
		log.WithFields(log.Fields{logLayerName: name, "count": len(namespaces)}).Debug("detected layer namespaces")
	}

	featureVersions, err = featurefmt.ListFeatures(files, toProcess.Listers)
	if err != nil {
		return nil, nil, err
	}
	if len(featureVersions) > 0 {
		log.WithFields(log.Fields{logLayerName: name, "count": len(featureVersions)}).Debug("detected layer features")
	}

	return namespaces, featureVersions, nil
}