v3: Analyze layer content in parallel
This commit is contained in:
parent
2c7838eac7
commit
88f506918b
@ -15,7 +15,10 @@
|
|||||||
package v3
|
package v3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
@ -83,32 +86,47 @@ func (s *AncestryServer) PostAncestry(ctx context.Context, req *pb.PostAncestryR
|
|||||||
}
|
}
|
||||||
|
|
||||||
builder := clair.NewAncestryBuilder(clair.EnabledDetectors())
|
builder := clair.NewAncestryBuilder(clair.EnabledDetectors())
|
||||||
|
layerMap := map[string]*database.Layer{}
|
||||||
|
layerMapLock := sync.RWMutex{}
|
||||||
|
g, analyzerCtx := errgroup.WithContext(ctx)
|
||||||
|
for i := range req.Layers {
|
||||||
|
layer := req.Layers[i]
|
||||||
|
if _, ok := layerMap[layer.Hash]; !ok {
|
||||||
|
layerMap[layer.Hash] = nil
|
||||||
|
if layer == nil {
|
||||||
|
err := status.Error(codes.InvalidArgument, "ancestry layer is invalid")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if layer.GetHash() == "" {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "ancestry layer hash should not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if layer.GetPath() == "" {
|
||||||
|
return nil, status.Error(codes.InvalidArgument, "ancestry layer path should not be empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Go(func() error {
|
||||||
|
clairLayer, err := clair.AnalyzeLayer(analyzerCtx, s.Store, layer.Hash, req.Format, layer.Path, layer.Headers)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
layerMapLock.Lock()
|
||||||
|
layerMap[layer.Hash] = clairLayer
|
||||||
|
layerMapLock.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = g.Wait(); err != nil {
|
||||||
|
return nil, newRPCErrorWithClairError(codes.Internal, err)
|
||||||
|
}
|
||||||
|
|
||||||
for _, layer := range req.Layers {
|
for _, layer := range req.Layers {
|
||||||
if layer == nil {
|
builder.AddLeafLayer(layerMap[layer.Hash])
|
||||||
err := status.Error(codes.InvalidArgument, "ancestry layer is invalid")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if layer.GetHash() == "" {
|
|
||||||
return nil, status.Error(codes.InvalidArgument, "ancestry layer hash should not be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
if layer.GetPath() == "" {
|
|
||||||
return nil, status.Error(codes.InvalidArgument, "ancestry layer path should not be empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(sidac): make AnalyzeLayer to be async to ensure
|
|
||||||
// non-blocking downloads.
|
|
||||||
// We'll need to deal with two layers post by the same or different
|
|
||||||
// requests that may have the same hash. In that case, since every
|
|
||||||
// layer/feature/namespace is unique in the database, it may introduce
|
|
||||||
// deadlock.
|
|
||||||
clairLayer, err := clair.AnalyzeLayer(ctx, s.Store, layer.Hash, req.Format, layer.Path, layer.Headers)
|
|
||||||
if err != nil {
|
|
||||||
return nil, newRPCErrorWithClairError(codes.Internal, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
builder.AddLeafLayer(clairLayer)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := clair.SaveAncestry(s.Store, builder.Ancestry(req.AncestryName)); err != nil {
|
if err := clair.SaveAncestry(s.Store, builder.Ancestry(req.AncestryName)); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user