From a33050637b4b28f947eb8256cd48ee35d2fe5bfe Mon Sep 17 00:00:00 2001 From: Sida Chen Date: Wed, 6 Mar 2019 16:26:31 -0500 Subject: [PATCH] pgsql: Move extra logic in pgsql.go to util folder - pgsql/util/error.go contains error handling logic - pgsql/page/page.go contains the page struct - pgsql/monitoring contains the prometheus logic - pgsql/pgsession contains the pgsession struct --- database/pgsql/monitoring/prometheus.go | 67 +++++++++ database/pgsql/page/page.go | 24 ++++ database/pgsql/pgsession.go | 184 ++++++++++++++++++++++++ database/pgsql/pgsession_test.go | 56 ++++++++ database/pgsql/pgsql.go | 101 ------------- database/pgsql/util/error.go | 63 ++++++++ 6 files changed, 394 insertions(+), 101 deletions(-) create mode 100644 database/pgsql/monitoring/prometheus.go create mode 100644 database/pgsql/page/page.go create mode 100644 database/pgsql/pgsession.go create mode 100644 database/pgsql/pgsession_test.go create mode 100644 database/pgsql/util/error.go diff --git a/database/pgsql/monitoring/prometheus.go b/database/pgsql/monitoring/prometheus.go new file mode 100644 index 00000000..ccd1970b --- /dev/null +++ b/database/pgsql/monitoring/prometheus.go @@ -0,0 +1,67 @@ +// Copyright 2019 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitoring + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + PromErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_errors_total", + Help: "Number of errors that PostgreSQL requests generated.", + }, []string{"request"}) + + PromCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_cache_hits_total", + Help: "Number of cache hits that the PostgreSQL backend did.", + }, []string{"object"}) + + PromCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "clair_pgsql_cache_queries_total", + Help: "Number of cache queries that the PostgreSQL backend did.", + }, []string{"object"}) + + PromQueryDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "clair_pgsql_query_duration_milliseconds", + Help: "Time it takes to execute the database query.", + }, []string{"query", "subquery"}) + + PromConcurrentLockVAFV = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "clair_pgsql_concurrent_lock_vafv_total", + Help: "Number of transactions trying to hold the exclusive Vulnerability_Affects_Feature lock.", + }) +) + +func init() { + prometheus.MustRegister(PromErrorsTotal) + prometheus.MustRegister(PromCacheHitsTotal) + prometheus.MustRegister(PromCacheQueriesTotal) + prometheus.MustRegister(PromQueryDurationMilliseconds) + prometheus.MustRegister(PromConcurrentLockVAFV) +} + +// monitoring.ObserveQueryTime computes the time elapsed since `start` to represent the +// query time. +// 1. `query` is a pgSession function name. +// 2. `subquery` is a specific query or a batched query. +// 3. `start` is the time right before query is executed. +func ObserveQueryTime(query, subquery string, start time.Time) { + PromQueryDurationMilliseconds. + WithLabelValues(query, subquery). + Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)) +} diff --git a/database/pgsql/page/page.go b/database/pgsql/page/page.go new file mode 100644 index 00000000..3ec17e8e --- /dev/null +++ b/database/pgsql/page/page.go @@ -0,0 +1,24 @@ +// Copyright 2019 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package page + +// Page is the representation of a page for the Postgres schema. +type Page struct { + // StartID is the ID being used as the basis for pagination across database + // results. It is used to search for an ancestry with ID >= StartID. + // + // StartID is required to be unique to every ancestry and always increasing. + StartID int64 +} diff --git a/database/pgsql/pgsession.go b/database/pgsql/pgsession.go new file mode 100644 index 00000000..ed443ccc --- /dev/null +++ b/database/pgsql/pgsession.go @@ -0,0 +1,184 @@ +// Copyright 2019 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsql + +import ( + "database/sql" + "time" + + "github.com/coreos/clair/database/pgsql/vulnerability" + + "github.com/coreos/clair/database" + "github.com/coreos/clair/database/pgsql/ancestry" + "github.com/coreos/clair/database/pgsql/detector" + "github.com/coreos/clair/database/pgsql/feature" + "github.com/coreos/clair/database/pgsql/layer" + "github.com/coreos/clair/database/pgsql/lock" + "github.com/coreos/clair/database/pgsql/namespace" + "github.com/coreos/clair/database/pgsql/notification" + "github.com/coreos/clair/pkg/pagination" +) + +type pgSession struct { + *sql.Tx + + key pagination.Key +} + +func (tx *pgSession) Commit() error { + return tx.Tx.Commit() +} + +// Rollback drops changes to datastore. +// +// Rollback call after Commit does no-op. +func (tx *pgSession) Rollback() error { + return tx.Tx.Rollback() +} + +// UpsertAncestry inserts or replaces an ancestry and its namespaced +// features and processors used to scan the ancestry. +func (tx *pgSession) UpsertAncestry(a database.Ancestry) error { + return ancestry.UpsertAncestry(tx.Tx, a) +} + +// FindAncestry retrieves an ancestry with all detected +// namespaced features. If the ancestry is not found, return false. +func (tx *pgSession) FindAncestry(name string) (database.Ancestry, bool, error) { + return ancestry.FindAncestry(tx.Tx, name) +} + +// PersistDetector inserts a slice of detectors if not in the database. +func (tx *pgSession) PersistDetectors(detectors []database.Detector) error { + return detector.PersistDetectors(tx.Tx, detectors) +} + +// PersistFeatures inserts a set of features if not in the database. +func (tx *pgSession) PersistFeatures(features []database.Feature) error { + return feature.PersistFeatures(tx.Tx, features) +} + +// PersistNamespacedFeatures inserts a set of namespaced features if not in +// the database. +func (tx *pgSession) PersistNamespacedFeatures(features []database.NamespacedFeature) error { + return feature.PersistNamespacedFeatures(tx.Tx, features) +} + +// CacheAffectedNamespacedFeatures relates the namespaced features with the +// vulnerabilities affecting these features. +// +// NOTE(Sida): it's not necessary for every database implementation and so +// this function may have a better home. +func (tx *pgSession) CacheAffectedNamespacedFeatures(features []database.NamespacedFeature) error { + return vulnerability.CacheAffectedNamespacedFeatures(tx.Tx, features) +} + +// FindAffectedNamespacedFeatures retrieves a set of namespaced features +// with affecting vulnerabilities. +func (tx *pgSession) FindAffectedNamespacedFeatures(features []database.NamespacedFeature) ([]database.NullableAffectedNamespacedFeature, error) { + return vulnerability.FindAffectedNamespacedFeatures(tx.Tx, features) +} + +// PersistNamespaces inserts a set of namespaces if not in the database. +func (tx *pgSession) PersistNamespaces(namespaces []database.Namespace) error { + return namespace.PersistNamespaces(tx.Tx, namespaces) +} + +// PersistLayer appends a layer's content in the database. +func (tx *pgSession) PersistLayer(hash string, features []database.LayerFeature, namespaces []database.LayerNamespace, detectedBy []database.Detector) error { + return layer.PersistLayer(tx.Tx, hash, features, namespaces, detectedBy) +} + +func (tx *pgSession) FindLayer(hash string) (database.Layer, bool, error) { + return layer.FindLayer(tx.Tx, hash) +} + +// InsertVulnerabilities inserts a set of UNIQUE vulnerabilities with +// affected features into database, assuming that all vulnerabilities +// provided are NOT in database and all vulnerabilities' namespaces are +// already in the database. +func (tx *pgSession) InsertVulnerabilities(vulns []database.VulnerabilityWithAffected) error { + return vulnerability.InsertVulnerabilities(tx.Tx, vulns) +} + +// FindVulnerability retrieves a set of Vulnerabilities with affected +// features. +func (tx *pgSession) FindVulnerabilities(ids []database.VulnerabilityID) ([]database.NullableVulnerability, error) { + return vulnerability.FindVulnerabilities(tx.Tx, ids) +} + +// DeleteVulnerability removes a set of Vulnerabilities assuming that the +// requested vulnerabilities are in the database. +func (tx *pgSession) DeleteVulnerabilities(ids []database.VulnerabilityID) error { + return vulnerability.DeleteVulnerabilities(tx.Tx, ids) +} + +// InsertVulnerabilityNotifications inserts a set of unique vulnerability +// notifications into datastore, assuming that they are not in the database. +func (tx *pgSession) InsertVulnerabilityNotifications(notifications []database.VulnerabilityNotification) error { + return notification.InsertVulnerabilityNotifications(tx.Tx, notifications) +} + +func (tx *pgSession) FindNewNotification(notifiedBefore time.Time) (hook database.NotificationHook, found bool, err error) { + return notification.FindNewNotification(tx.Tx, notifiedBefore) +} + +func (tx *pgSession) FindVulnerabilityNotification(name string, limit int, oldVulnerabilityPage pagination.Token, newVulnerabilityPage pagination.Token) (noti database.VulnerabilityNotificationWithVulnerable, found bool, err error) { + return notification.FindVulnerabilityNotification(tx.Tx, name, limit, oldVulnerabilityPage, newVulnerabilityPage, tx.key) +} + +// MarkNotificationAsRead marks a Notification as notified now, assuming +// the requested notification is in the database. +func (tx *pgSession) MarkNotificationAsRead(name string) error { + return notification.MarkNotificationAsRead(tx.Tx, name) +} + +// DeleteNotification removes a Notification in the database. +func (tx *pgSession) DeleteNotification(name string) error { + return notification.DeleteNotification(tx.Tx, name) +} + +// UpdateKeyValue stores or updates a simple key/value pair. +func (tx *pgSession) UpdateKeyValue(key, value string) error { + return lock.UpdateKeyValue(tx.Tx, key, value) +} + +// FindKeyValue retrieves a value from the given key. +func (tx *pgSession) FindKeyValue(key string) (value string, found bool, err error) { + return lock.FindKeyValue(tx.Tx, key) +} + +// AcquireLock acquires a brand new lock in the database with a given name +// for the given duration. +// +// A lock can only have one owner. +// This method should NOT block until a lock is acquired. +func (tx *pgSession) AcquireLock(name, owner string, duration time.Duration) (acquired bool, expiration time.Time, err error) { + return lock.AcquireLock(tx.Tx, name, owner, duration) +} + +// ExtendLock extends an existing lock such that the lock will expire at the +// current time plus the provided duration. +// +// This method should return immediately with an error if the lock does not +// exist. +func (tx *pgSession) ExtendLock(name, owner string, duration time.Duration) (extended bool, expiration time.Time, err error) { + return lock.ExtendLock(tx.Tx, name, owner, duration) +} + +// ReleaseLock releases an existing lock. +func (tx *pgSession) ReleaseLock(name, owner string) error { + return lock.ReleaseLock(tx.Tx, name, owner) +} diff --git a/database/pgsql/pgsession_test.go b/database/pgsql/pgsession_test.go new file mode 100644 index 00000000..8f991882 --- /dev/null +++ b/database/pgsql/pgsession_test.go @@ -0,0 +1,56 @@ +// Copyright 2019 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pgsql + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/coreos/clair/database/pgsql/namespace" + "github.com/coreos/clair/database/pgsql/testutil" +) + +const ( + numVulnerabilities = 100 + numFeatures = 100 +) + +func TestConcurrency(t *testing.T) { + db, cleanup := testutil.CreateTestDB(t, "concurrency") + defer cleanup() + + var wg sync.WaitGroup + // there's a limit on the number of concurrent connections in the pool + wg.Add(30) + for i := 0; i < 30; i++ { + go func() { + defer wg.Done() + nsNamespaces := testutil.GenRandomNamespaces(t, 100) + tx, err := db.Begin() + if err != nil { + panic(err) + } + + assert.Nil(t, namespace.PersistNamespaces(tx, nsNamespaces)) + if err := tx.Commit(); err != nil { + panic(err) + } + }() + } + + wg.Wait() +} diff --git a/database/pgsql/pgsql.go b/database/pgsql/pgsql.go index 4b23e014..0d0919f2 100644 --- a/database/pgsql/pgsql.go +++ b/database/pgsql/pgsql.go @@ -21,13 +21,10 @@ import ( "io/ioutil" "net/url" "strings" - "time" "gopkg.in/yaml.v2" "github.com/hashicorp/golang-lru" - "github.com/lib/pq" - "github.com/prometheus/client_golang/prometheus" "github.com/remind101/migrate" log "github.com/sirupsen/logrus" @@ -37,50 +34,10 @@ import ( "github.com/coreos/clair/pkg/pagination" ) -var ( - promErrorsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "clair_pgsql_errors_total", - Help: "Number of errors that PostgreSQL requests generated.", - }, []string{"request"}) - - promCacheHitsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "clair_pgsql_cache_hits_total", - Help: "Number of cache hits that the PostgreSQL backend did.", - }, []string{"object"}) - - promCacheQueriesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "clair_pgsql_cache_queries_total", - Help: "Number of cache queries that the PostgreSQL backend did.", - }, []string{"object"}) - - promQueryDurationMilliseconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "clair_pgsql_query_duration_milliseconds", - Help: "Time it takes to execute the database query.", - }, []string{"query", "subquery"}) - - promConcurrentLockVAFV = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "clair_pgsql_concurrent_lock_vafv_total", - Help: "Number of transactions trying to hold the exclusive Vulnerability_Affects_Feature lock.", - }) -) - func init() { - prometheus.MustRegister(promErrorsTotal) - prometheus.MustRegister(promCacheHitsTotal) - prometheus.MustRegister(promCacheQueriesTotal) - prometheus.MustRegister(promQueryDurationMilliseconds) - prometheus.MustRegister(promConcurrentLockVAFV) - database.Register("pgsql", openDatabase) } -// pgSessionCache is the session's cache, which holds the pgSQL's cache and the -// individual session's cache. Only when session.Commit is called, all the -// changes to pgSQL cache will be applied. -type pgSessionCache struct { - c *lru.ARCCache -} - type pgSQL struct { *sql.DB @@ -88,12 +45,6 @@ type pgSQL struct { config Config } -type pgSession struct { - *sql.Tx - - key pagination.Key -} - // Begin initiates a transaction to database. // // The expected transaction isolation level in this implementation is "Read @@ -109,10 +60,6 @@ func (pgSQL *pgSQL) Begin() (database.Session, error) { }, nil } -func (tx *pgSession) Commit() error { - return tx.Tx.Commit() -} - // Close closes the database and destroys if ManageDatabaseLifecycle has been specified in // the configuration. func (pgSQL *pgSQL) Close() { @@ -131,15 +78,6 @@ func (pgSQL *pgSQL) Ping() bool { return pgSQL.DB.Ping() == nil } -// Page is the representation of a page for the Postgres schema. -type Page struct { - // StartID is the ID being used as the basis for pagination across database - // results. It is used to search for an ancestry with ID >= StartID. - // - // StartID is required to be unique to every ancestry and always increasing. - StartID int64 -} - // Config is the configuration that is used by openDatabase. type Config struct { Source string @@ -313,42 +251,3 @@ func dropDatabase(source, dbName string) error { return nil } - -// handleError logs an error with an extra description and masks the error if it's an SQL one. -// The function ensures we never return plain SQL errors and leak anything. -// The function should be used for every database query error. -func handleError(desc string, err error) error { - if err == nil { - return nil - } - - if err == sql.ErrNoRows { - return commonerr.ErrNotFound - } - - log.WithError(err).WithField("Description", desc).Error("database: handled database error") - promErrorsTotal.WithLabelValues(desc).Inc() - - if _, o := err.(*pq.Error); o || err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") { - return database.ErrBackendException - } - - return err -} - -// isErrUniqueViolation determines is the given error is a unique contraint violation. -func isErrUniqueViolation(err error) bool { - pqErr, ok := err.(*pq.Error) - return ok && pqErr.Code == "23505" -} - -// observeQueryTime computes the time elapsed since `start` to represent the -// query time. -// 1. `query` is a pgSession function name. -// 2. `subquery` is a specific query or a batched query. -// 3. `start` is the time right before query is executed. -func observeQueryTime(query, subquery string, start time.Time) { - promQueryDurationMilliseconds. - WithLabelValues(query, subquery). - Observe(float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)) -} diff --git a/database/pgsql/util/error.go b/database/pgsql/util/error.go new file mode 100644 index 00000000..de724c93 --- /dev/null +++ b/database/pgsql/util/error.go @@ -0,0 +1,63 @@ +// Copyright 2019 clair authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "database/sql" + "strings" + + "github.com/coreos/clair/database" + "github.com/coreos/clair/database/pgsql/monitoring" + "github.com/coreos/clair/pkg/commonerr" + "github.com/lib/pq" + "github.com/sirupsen/logrus" +) + +// IsErrUniqueViolation determines is the given error is a unique contraint violation. +func IsErrUniqueViolation(err error) bool { + pqErr, ok := err.(*pq.Error) + return ok && pqErr.Code == "23505" +} + +// HandleError logs an error with an extra description and masks the error if it's an SQL one. +// The function ensures we never return plain SQL errors and leak anything. +// The function should be used for every database query error. +func HandleError(desc string, err error) error { + if err == nil { + return nil + } + + if err == sql.ErrNoRows { + return commonerr.ErrNotFound + } + + if pqErr, ok := err.(*pq.Error); ok { + if pqErr.Fatal() { + panic(pqErr) + } + + if pqErr.Code == "42601" { + panic("invalid query: " + desc + ", info: " + err.Error()) + } + } + + logrus.WithError(err).WithField("Description", desc).Error("database: handled database error") + monitoring.PromErrorsTotal.WithLabelValues(desc).Inc() + if _, o := err.(*pq.Error); o || err == sql.ErrTxDone || strings.HasPrefix(err.Error(), "sql:") { + return database.ErrBackendException + } + + return err +}