262 lines
6.3 KiB
Go
262 lines
6.3 KiB
Go
package pgsql
|
|
|
|
import (
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/lib/pq"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/coreos/clair/database"
|
|
"github.com/coreos/clair/pkg/commonerr"
|
|
)
|
|
|
|
func (tx *pgSession) UpsertAncestry(ancestry database.Ancestry, features []database.NamespacedFeature, processedBy database.Processors) error {
|
|
if ancestry.Name == "" {
|
|
log.Warning("Empty ancestry name is not allowed")
|
|
return commonerr.NewBadRequestError("could not insert an ancestry with empty name")
|
|
}
|
|
|
|
if len(ancestry.Layers) == 0 {
|
|
log.Warning("Empty ancestry is not allowed")
|
|
return commonerr.NewBadRequestError("could not insert an ancestry with 0 layers")
|
|
}
|
|
|
|
err := tx.deleteAncestry(ancestry.Name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var ancestryID int64
|
|
err = tx.QueryRow(insertAncestry, ancestry.Name).Scan(&ancestryID)
|
|
if err != nil {
|
|
if isErrUniqueViolation(err) {
|
|
return handleError("insertAncestry", errors.New("Other Go-routine is processing this ancestry (skip)."))
|
|
}
|
|
return handleError("insertAncestry", err)
|
|
}
|
|
|
|
err = tx.insertAncestryLayers(ancestryID, ancestry.Layers)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tx.insertAncestryFeatures(ancestryID, features)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.persistProcessors(persistAncestryLister,
|
|
"persistAncestryLister",
|
|
persistAncestryDetector,
|
|
"persistAncestryDetector",
|
|
ancestryID, processedBy)
|
|
}
|
|
|
|
func (tx *pgSession) FindAncestry(name string) (database.Ancestry, database.Processors, bool, error) {
|
|
ancestry := database.Ancestry{Name: name}
|
|
processed := database.Processors{}
|
|
|
|
var ancestryID int64
|
|
err := tx.QueryRow(searchAncestry, name).Scan(&ancestryID)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return ancestry, processed, false, nil
|
|
}
|
|
return ancestry, processed, false, handleError("searchAncestry", err)
|
|
}
|
|
|
|
ancestry.Layers, err = tx.findAncestryLayers(ancestryID)
|
|
if err != nil {
|
|
return ancestry, processed, false, err
|
|
}
|
|
|
|
processed.Detectors, err = tx.findProcessors(searchAncestryDetectors, "searchAncestryDetectors", "detector", ancestryID)
|
|
if err != nil {
|
|
return ancestry, processed, false, err
|
|
}
|
|
|
|
processed.Listers, err = tx.findProcessors(searchAncestryListers, "searchAncestryListers", "lister", ancestryID)
|
|
if err != nil {
|
|
return ancestry, processed, false, err
|
|
}
|
|
|
|
return ancestry, processed, true, nil
|
|
}
|
|
|
|
func (tx *pgSession) FindAncestryFeatures(name string) (database.AncestryWithFeatures, bool, error) {
|
|
var (
|
|
awf database.AncestryWithFeatures
|
|
ok bool
|
|
err error
|
|
)
|
|
awf.Ancestry, awf.ProcessedBy, ok, err = tx.FindAncestry(name)
|
|
if err != nil {
|
|
return awf, false, err
|
|
}
|
|
|
|
if !ok {
|
|
return awf, false, nil
|
|
}
|
|
|
|
rows, err := tx.Query(searchAncestryFeatures, name)
|
|
if err != nil {
|
|
return awf, false, handleError("searchAncestryFeatures", err)
|
|
}
|
|
|
|
for rows.Next() {
|
|
nf := database.NamespacedFeature{}
|
|
err := rows.Scan(&nf.Namespace.Name, &nf.Namespace.VersionFormat, &nf.Feature.Name, &nf.Feature.Version)
|
|
if err != nil {
|
|
return awf, false, handleError("searchAncestryFeatures", err)
|
|
}
|
|
nf.Feature.VersionFormat = nf.Namespace.VersionFormat
|
|
awf.Features = append(awf.Features, nf)
|
|
}
|
|
|
|
return awf, true, nil
|
|
}
|
|
|
|
func (tx *pgSession) deleteAncestry(name string) error {
|
|
result, err := tx.Exec(removeAncestry, name)
|
|
if err != nil {
|
|
return handleError("removeAncestry", err)
|
|
}
|
|
|
|
_, err = result.RowsAffected()
|
|
if err != nil {
|
|
return handleError("removeAncestry", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tx *pgSession) findProcessors(query, queryName, processorType string, id int64) ([]string, error) {
|
|
rows, err := tx.Query(query, id)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
log.Warning("No " + processorType + " are used")
|
|
return nil, nil
|
|
}
|
|
return nil, handleError(queryName, err)
|
|
}
|
|
|
|
var (
|
|
processors []string
|
|
processor string
|
|
)
|
|
|
|
for rows.Next() {
|
|
err := rows.Scan(&processor)
|
|
if err != nil {
|
|
return nil, handleError(queryName, err)
|
|
}
|
|
processors = append(processors, processor)
|
|
}
|
|
|
|
return processors, nil
|
|
}
|
|
|
|
func (tx *pgSession) findAncestryLayers(ancestryID int64) ([]database.Layer, error) {
|
|
rows, err := tx.Query(searchAncestryLayer, ancestryID)
|
|
if err != nil {
|
|
return nil, handleError("searchAncestryLayer", err)
|
|
}
|
|
layers := []database.Layer{}
|
|
for rows.Next() {
|
|
var layer database.Layer
|
|
err := rows.Scan(&layer.Hash)
|
|
if err != nil {
|
|
return nil, handleError("searchAncestryLayer", err)
|
|
}
|
|
layers = append(layers, layer)
|
|
}
|
|
return layers, nil
|
|
}
|
|
|
|
func (tx *pgSession) insertAncestryLayers(ancestryID int64, layers []database.Layer) error {
|
|
layerIDs := map[string]sql.NullInt64{}
|
|
for _, l := range layers {
|
|
layerIDs[l.Hash] = sql.NullInt64{}
|
|
}
|
|
|
|
layerHashes := []string{}
|
|
for hash := range layerIDs {
|
|
layerHashes = append(layerHashes, hash)
|
|
}
|
|
|
|
rows, err := tx.Query(searchLayerIDs, pq.Array(layerHashes))
|
|
if err != nil {
|
|
return handleError("searchLayerIDs", err)
|
|
}
|
|
|
|
for rows.Next() {
|
|
var (
|
|
layerID sql.NullInt64
|
|
layerName string
|
|
)
|
|
err := rows.Scan(&layerID, &layerName)
|
|
if err != nil {
|
|
return handleError("searchLayerIDs", err)
|
|
}
|
|
layerIDs[layerName] = layerID
|
|
}
|
|
|
|
notFound := []string{}
|
|
for hash, id := range layerIDs {
|
|
if !id.Valid {
|
|
notFound = append(notFound, hash)
|
|
}
|
|
}
|
|
|
|
if len(notFound) > 0 {
|
|
return handleError("searchLayerIDs", fmt.Errorf("Layer %s is not found in database", strings.Join(notFound, ",")))
|
|
}
|
|
|
|
//TODO(Sida): use bulk insert.
|
|
stmt, err := tx.Prepare(insertAncestryLayer)
|
|
if err != nil {
|
|
return handleError("insertAncestryLayer", err)
|
|
}
|
|
|
|
defer stmt.Close()
|
|
for index, layer := range layers {
|
|
_, err := stmt.Exec(ancestryID, index, layerIDs[layer.Hash].Int64)
|
|
if err != nil {
|
|
return handleError("insertAncestryLayer", commonerr.CombineErrors(err, stmt.Close()))
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (tx *pgSession) insertAncestryFeatures(ancestryID int64, features []database.NamespacedFeature) error {
|
|
featureIDs, err := tx.findNamespacedFeatureIDs(features)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
//TODO(Sida): use bulk insert.
|
|
stmtFeatures, err := tx.Prepare(insertAncestryFeature)
|
|
if err != nil {
|
|
return handleError("insertAncestryFeature", err)
|
|
}
|
|
|
|
defer stmtFeatures.Close()
|
|
|
|
for _, id := range featureIDs {
|
|
if !id.Valid {
|
|
return errors.New("requested namespaced feature is not in database")
|
|
}
|
|
|
|
_, err := stmtFeatures.Exec(ancestryID, id)
|
|
if err != nil {
|
|
return handleError("insertAncestryFeature", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|