commit
1f0bc1ea5f
@ -15,7 +15,6 @@
|
|||||||
package pgsql
|
package pgsql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
@ -108,34 +107,29 @@ func testGenRandomVulnerabilityAndNamespacedFeature(t *testing.T, store database
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestConcurrency(t *testing.T) {
|
func TestConcurrency(t *testing.T) {
|
||||||
store, err := openDatabaseForTest("Concurrency", false)
|
store, cleanup := createTestPgSQL(t, "concurrency")
|
||||||
if !assert.Nil(t, err) {
|
defer cleanup()
|
||||||
t.FailNow()
|
|
||||||
}
|
|
||||||
defer store.Close()
|
|
||||||
start := time.Now()
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(100)
|
// there's a limit on the number of concurrent connections in the pool
|
||||||
for i := 0; i < 100; i++ {
|
wg.Add(30)
|
||||||
|
for i := 0; i < 30; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
nsNamespaces := genRandomNamespaces(t, 100)
|
nsNamespaces := genRandomNamespaces(t, 100)
|
||||||
tx, err := store.Begin()
|
tx, err := store.Begin()
|
||||||
if !assert.Nil(t, err) {
|
require.Nil(t, err)
|
||||||
t.FailNow()
|
require.Nil(t, tx.PersistNamespaces(nsNamespaces))
|
||||||
}
|
require.Nil(t, tx.Commit())
|
||||||
assert.Nil(t, tx.PersistNamespaces(nsNamespaces))
|
|
||||||
tx.Commit()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
fmt.Println("total", time.Since(start))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCaching(t *testing.T) {
|
func TestCaching(t *testing.T) {
|
||||||
store, err := openDatabaseForTest("Caching", false)
|
store, cleanup := createTestPgSQL(t, "caching")
|
||||||
require.Nil(t, err)
|
defer cleanup()
|
||||||
defer store.Close()
|
|
||||||
|
|
||||||
nsFeatures, vulnerabilities := testGenRandomVulnerabilityAndNamespacedFeature(t, store)
|
nsFeatures, vulnerabilities := testGenRandomVulnerabilityAndNamespacedFeature(t, store)
|
||||||
tx, err := store.Begin()
|
tx, err := store.Begin()
|
||||||
@ -145,8 +139,6 @@ func TestCaching(t *testing.T) {
|
|||||||
require.Nil(t, tx.Commit())
|
require.Nil(t, tx.Commit())
|
||||||
|
|
||||||
tx, err = store.Begin()
|
tx, err = store.Begin()
|
||||||
require.Nil(t, tx.Commit())
|
|
||||||
|
|
||||||
require.Nil(t, tx.InsertVulnerabilities(vulnerabilities))
|
require.Nil(t, tx.InsertVulnerabilities(vulnerabilities))
|
||||||
require.Nil(t, tx.Commit())
|
require.Nil(t, tx.Commit())
|
||||||
|
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
package pgsql
|
package pgsql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@ -25,23 +24,20 @@ const (
|
|||||||
searchLock = `SELECT until FROM Lock WHERE name = $1`
|
searchLock = `SELECT until FROM Lock WHERE name = $1`
|
||||||
updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2`
|
updateLock = `UPDATE Lock SET until = $3 WHERE name = $1 AND owner = $2`
|
||||||
removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2`
|
removeLock = `DELETE FROM Lock WHERE name = $1 AND owner = $2`
|
||||||
removeLockExpired = `DELETE FROM LOCK WHERE until < CURRENT_TIMESTAMP`
|
removeLockExpired = `DELETE FROM LOCK WHERE until < $1`
|
||||||
|
|
||||||
soiLock = `WITH new_lock AS (
|
soiLock = `
|
||||||
|
WITH new_lock AS (
|
||||||
INSERT INTO lock (name, owner, until)
|
INSERT INTO lock (name, owner, until)
|
||||||
VALUES ( $1, $2, $3)
|
SELECT CAST ($1 AS TEXT), CAST ($2 AS TEXT), CAST ($3 AS TIMESTAMP)
|
||||||
WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1)
|
WHERE NOT EXISTS (SELECT id FROM lock WHERE name = $1)
|
||||||
RETURNING owner, until
|
RETURNING owner, until
|
||||||
)
|
)
|
||||||
SELECT * FROM new_lock
|
SELECT * FROM new_lock
|
||||||
UNION
|
UNION
|
||||||
SELECT owner, until FROM lock WHERE name = $1`
|
SELECT owner, until FROM lock WHERE name = $1`
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
errLockNotFound = errors.New("lock is not in database")
|
|
||||||
)
|
|
||||||
|
|
||||||
func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) {
|
func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.Duration) (bool, time.Time, error) {
|
||||||
if lockName == "" || whoami == "" || desiredDuration == 0 {
|
if lockName == "" || whoami == "" || desiredDuration == 0 {
|
||||||
panic("invalid lock parameters")
|
panic("invalid lock parameters")
|
||||||
@ -52,7 +48,7 @@ func (tx *pgSession) AcquireLock(lockName, whoami string, desiredDuration time.D
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
desiredLockedUntil = time.Now().Add(desiredDuration)
|
desiredLockedUntil = time.Now().UTC().Add(desiredDuration)
|
||||||
|
|
||||||
lockedUntil time.Time
|
lockedUntil time.Time
|
||||||
lockOwner string
|
lockOwner string
|
||||||
@ -98,7 +94,7 @@ func (tx *pgSession) ReleaseLock(name, owner string) error {
|
|||||||
func (tx *pgSession) pruneLocks() error {
|
func (tx *pgSession) pruneLocks() error {
|
||||||
defer observeQueryTime("pruneLocks", "all", time.Now())
|
defer observeQueryTime("pruneLocks", "all", time.Now())
|
||||||
|
|
||||||
if r, err := tx.Exec(removeLockExpired); err != nil {
|
if r, err := tx.Exec(removeLockExpired, time.Now().UTC()); err != nil {
|
||||||
return handleError("removeLockExpired", err)
|
return handleError("removeLockExpired", err)
|
||||||
} else if affected, err := r.RowsAffected(); err != nil {
|
} else if affected, err := r.RowsAffected(); err != nil {
|
||||||
return handleError("removeLockExpired", err)
|
return handleError("removeLockExpired", err)
|
||||||
|
@ -23,8 +23,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestAcquireLockReturnsExistingLockDuration(t *testing.T) {
|
func TestAcquireLockReturnsExistingLockDuration(t *testing.T) {
|
||||||
datastore, tx := openSessionForTest(t, "Lock", true)
|
tx, cleanup := createTestPgSessionWithFixtures(t, "Lock")
|
||||||
defer datastore.Close()
|
defer cleanup()
|
||||||
|
|
||||||
acquired, originalExpiration, err := tx.AcquireLock("test1", "owner1", time.Minute)
|
acquired, originalExpiration, err := tx.AcquireLock("test1", "owner1", time.Minute)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
@ -51,7 +51,7 @@ func TestLock(t *testing.T) {
|
|||||||
// lock again by itself, the previous lock is not expired yet.
|
// lock again by itself, the previous lock is not expired yet.
|
||||||
l, _, err = tx.AcquireLock("test1", "owner1", time.Minute)
|
l, _, err = tx.AcquireLock("test1", "owner1", time.Minute)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
assert.False(t, l)
|
assert.True(t, l) // acquire lock no-op when owner already has the lock.
|
||||||
tx = restartSession(t, datastore, tx, false)
|
tx = restartSession(t, datastore, tx, false)
|
||||||
|
|
||||||
// Try to renew the same lock with another owner.
|
// Try to renew the same lock with another owner.
|
||||||
|
@ -70,8 +70,6 @@ func genTemplateDatabase(name string, loadFixture bool) (sourceURL string, dbNam
|
|||||||
}
|
}
|
||||||
|
|
||||||
if loadFixture {
|
if loadFixture {
|
||||||
log.Info("pgsql: loading fixtures")
|
|
||||||
|
|
||||||
d, err := ioutil.ReadFile(fixturePath)
|
d, err := ioutil.ReadFile(fixturePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -219,8 +217,6 @@ func generateTestConfig(testName string, loadFixture bool, manageLife bool) data
|
|||||||
source = fmt.Sprintf(sourceEnv, dbName)
|
source = fmt.Sprintf(sourceEnv, dbName)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("pagination key for current test: %s", testPaginationKey.String())
|
|
||||||
|
|
||||||
return database.RegistrableComponentConfig{
|
return database.RegistrableComponentConfig{
|
||||||
Options: map[string]interface{}{
|
Options: map[string]interface{}{
|
||||||
"source": source,
|
"source": source,
|
||||||
@ -254,7 +250,6 @@ func openSessionForTest(t *testing.T, name string, loadFixture bool) (*pgSQL, *p
|
|||||||
t.FailNow()
|
t.FailNow()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("transaction pagination key: '%s'", tx.(*pgSession).key.String())
|
|
||||||
return store, tx.(*pgSession)
|
return store, tx.(*pgSession)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user