diff --git a/database/pgsql/complex_test.go b/database/pgsql/complex_test.go index af35d1f9..5b42ccfa 100644 --- a/database/pgsql/complex_test.go +++ b/database/pgsql/complex_test.go @@ -15,7 +15,6 @@ package pgsql import ( - "fmt" "math/rand" "runtime" "strconv" @@ -108,34 +107,29 @@ func testGenRandomVulnerabilityAndNamespacedFeature(t *testing.T, store database } func TestConcurrency(t *testing.T) { - store, err := openDatabaseForTest("Concurrency", false) - if !assert.Nil(t, err) { - t.FailNow() - } - defer store.Close() - start := time.Now() + store, cleanup := createTestPgSQL(t, "concurrency") + defer cleanup() + var wg sync.WaitGroup - wg.Add(100) - for i := 0; i < 100; i++ { + // there's a limit on the number of concurrent connections in the pool + wg.Add(30) + for i := 0; i < 30; i++ { go func() { defer wg.Done() nsNamespaces := genRandomNamespaces(t, 100) tx, err := store.Begin() - if !assert.Nil(t, err) { - t.FailNow() - } - assert.Nil(t, tx.PersistNamespaces(nsNamespaces)) - tx.Commit() + require.Nil(t, err) + require.Nil(t, tx.PersistNamespaces(nsNamespaces)) + require.Nil(t, tx.Commit()) }() } + wg.Wait() - fmt.Println("total", time.Since(start)) } func TestCaching(t *testing.T) { - store, err := openDatabaseForTest("Caching", false) - require.Nil(t, err) - defer store.Close() + store, cleanup := createTestPgSQL(t, "caching") + defer cleanup() nsFeatures, vulnerabilities := testGenRandomVulnerabilityAndNamespacedFeature(t, store) tx, err := store.Begin() @@ -145,8 +139,6 @@ func TestCaching(t *testing.T) { require.Nil(t, tx.Commit()) tx, err = store.Begin() - require.Nil(t, tx.Commit()) - require.Nil(t, tx.InsertVulnerabilities(vulnerabilities)) require.Nil(t, tx.Commit()) diff --git a/database/pgsql/lock.go b/database/pgsql/lock.go index 5d9489c6..0fd73f5b 100644 --- a/database/pgsql/lock.go +++ b/database/pgsql/lock.go @@ -15,7 +15,6 @@ package pgsql import ( - "errors" "time" log "github.com/sirupsen/logrus" @@ -25,23 +24,20 @@ const ( searchLock = `SELECT until FROM Lock WHERE name = $1` updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2` removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2` - removeLockExpired = `DELETE FROM LOCK WHERE until < CURRENT_TIMESTAMP` + removeLockExpired = `DELETE FROM LOCK WHERE until < $1` - soiLock = `WITH new_lock AS ( + soiLock = ` + WITH new_lock AS ( INSERT INTO lock (name, owner, until) - VALUES ( $1, $2, $3) - WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1) - RETURNING owner, until + SELECT CAST ($1 AS TEXT), CAST ($2 AS TEXT), CAST ($3 AS TIMESTAMP) + WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1) + RETURNING owner, until ) SELECT * FROM new_lock UNION SELECT owner, until FROM lock WHERE name = $1` ) -var ( - errLockNotFound = errors.New("lock is not in database") -) - func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) { if lockName == "" || whoami == "" || desiredDuration == 0 { panic("invalid lock parameters") @@ -52,7 +48,7 @@ func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.D } var ( - desiredLockedUntil = time.Now().Add(desiredDuration) + desiredLockedUntil = time.Now().UTC().Add(desiredDuration) lockedUntil time.Time lockOwner string @@ -98,7 +94,7 @@ func (tx *pgSession) ReleaseLock(name, owner string) error { func (tx *pgSession) pruneLocks() error { defer observeQueryTime("pruneLocks", "all", time.Now()) - if r, err := tx.Exec(removeLockExpired); err != nil { + if r, err := tx.Exec(removeLockExpired, time.Now().UTC()); err != nil { return handleError("removeLockExpired", err) } else if affected, err := r.RowsAffected(); err != nil { return handleError("removeLockExpired", err) diff --git a/database/pgsql/lock_test.go b/database/pgsql/lock_test.go index 87806ca3..538961b6 100644 --- a/database/pgsql/lock_test.go +++ b/database/pgsql/lock_test.go @@ -23,8 +23,8 @@ import ( ) func TestAcquireLockReturnsExistingLockDuration(t *testing.T) { - datastore, tx := openSessionForTest(t, "Lock", true) - defer datastore.Close() + tx, cleanup := createTestPgSessionWithFixtures(t, "Lock") + defer cleanup() acquired, originalExpiration, err := tx.AcquireLock("test1", "owner1", time.Minute) require.Nil(t, err) @@ -51,7 +51,7 @@ func TestLock(t *testing.T) { // lock again by itself, the previous lock is not expired yet. l, _, err = tx.AcquireLock("test1", "owner1", time.Minute) assert.Nil(t, err) - assert.False(t, l) + assert.True(t, l) // acquire lock no-op when owner already has the lock. tx = restartSession(t, datastore, tx, false) // Try to renew the same lock with another owner. diff --git a/database/pgsql/pgsql_test.go b/database/pgsql/pgsql_test.go index 43be4f65..b79f0d98 100644 --- a/database/pgsql/pgsql_test.go +++ b/database/pgsql/pgsql_test.go @@ -70,8 +70,6 @@ func genTemplateDatabase(name string, loadFixture bool) (sourceURL string, dbNam } if loadFixture { - log.Info("pgsql: loading fixtures") - d, err := ioutil.ReadFile(fixturePath) if err != nil { panic(err) @@ -219,8 +217,6 @@ func generateTestConfig(testName string, loadFixture bool, manageLife bool) data source = fmt.Sprintf(sourceEnv, dbName) } - log.Infof("pagination key for current test: %s", testPaginationKey.String()) - return database.RegistrableComponentConfig{ Options: map[string]interface{}{ "source": source, @@ -254,7 +250,6 @@ func openSessionForTest(t *testing.T, name string, loadFixture bool) (*pgSQL, *p t.FailNow() } - log.Infof("transaction pagination key: '%s'", tx.(*pgSession).key.String()) return store, tx.(*pgSession) }