Merge pull request #4 from Quentin-M/reduce_tx

database: reduce InsertPackages transaction and ensure locking order.
This commit is contained in:
Quentin Machu 2015-11-13 18:11:03 -05:00
commit 8bb6c50f80

View File

@ -32,8 +32,6 @@ const (
FieldPackageVersion = "version" FieldPackageVersion = "version"
FieldPackageNextVersion = "nextVersion" FieldPackageNextVersion = "nextVersion"
FieldPackagePreviousVersion = "previousVersion" FieldPackagePreviousVersion = "previousVersion"
insertPackagesBatchSize = 5
) )
var FieldPackageAll = []string{FieldPackageOS, FieldPackageName, FieldPackageVersion, FieldPackageNextVersion, FieldPackagePreviousVersion} var FieldPackageAll = []string{FieldPackageOS, FieldPackageName, FieldPackageVersion, FieldPackageNextVersion, FieldPackagePreviousVersion}
@ -123,24 +121,11 @@ func InsertPackages(packageParameters []*Package) error {
} }
} }
// Create required data structures
t := cayley.NewTransaction()
packagesInTransaction := 0
cachedPackagesByBranch := make(map[string]map[string]*Package)
// Iterate over all the packages we need to insert // Iterate over all the packages we need to insert
for _, packageParameter := range packageParameters { for _, packageParameter := range packageParameters {
branch := packageParameter.Branch() t := cayley.NewTransaction()
// Is the package already existing ? // Is the package already existing ?
if _, branchExistsLocally := cachedPackagesByBranch[branch]; branchExistsLocally {
if pkg, _ := cachedPackagesByBranch[branch][packageParameter.Key()]; pkg != nil {
packageParameter.Node = pkg.Node
continue
}
} else {
cachedPackagesByBranch[branch] = make(map[string]*Package)
}
pkg, err := FindOnePackage(packageParameter.OS, packageParameter.Name, packageParameter.Version, []string{}) pkg, err := FindOnePackage(packageParameter.OS, packageParameter.Name, packageParameter.Version, []string{})
if err != nil && err != cerrors.ErrNotFound { if err != nil && err != cerrors.ErrNotFound {
return err return err
@ -155,9 +140,6 @@ func InsertPackages(packageParameters []*Package) error {
if err != nil { if err != nil {
return err return err
} }
for _, p := range cachedPackagesByBranch[branch] {
branchPackages = append(branchPackages, p)
}
if len(branchPackages) == 0 { if len(branchPackages) == 0 {
// The branch does not exist yet // The branch does not exist yet
@ -171,7 +153,6 @@ func InsertPackages(packageParameters []*Package) error {
Version: types.MaxVersion, Version: types.MaxVersion,
} }
endPackage.Node = endPackage.GetNode() endPackage.Node = endPackage.GetNode()
cachedPackagesByBranch[branch][endPackage.Key()] = endPackage
t.AddQuad(cayley.Quad(endPackage.Node, FieldIs, FieldPackageIsValue, "")) t.AddQuad(cayley.Quad(endPackage.Node, FieldIs, FieldPackageIsValue, ""))
t.AddQuad(cayley.Quad(endPackage.Node, FieldPackageOS, endPackage.OS, "")) t.AddQuad(cayley.Quad(endPackage.Node, FieldPackageOS, endPackage.OS, ""))
@ -188,7 +169,6 @@ func InsertPackages(packageParameters []*Package) error {
Version: packageParameter.Version, Version: packageParameter.Version,
} }
newPackage.Node = newPackage.GetNode() newPackage.Node = newPackage.GetNode()
cachedPackagesByBranch[branch][newPackage.Key()] = newPackage
t.AddQuad(cayley.Quad(newPackage.Node, FieldIs, FieldPackageIsValue, "")) t.AddQuad(cayley.Quad(newPackage.Node, FieldIs, FieldPackageIsValue, ""))
t.AddQuad(cayley.Quad(newPackage.Node, FieldPackageOS, newPackage.OS, "")) t.AddQuad(cayley.Quad(newPackage.Node, FieldPackageOS, newPackage.OS, ""))
@ -206,7 +186,6 @@ func InsertPackages(packageParameters []*Package) error {
Version: types.MinVersion, Version: types.MinVersion,
} }
startPackage.Node = startPackage.GetNode() startPackage.Node = startPackage.GetNode()
cachedPackagesByBranch[branch][startPackage.Key()] = startPackage
t.AddQuad(cayley.Quad(startPackage.Node, FieldIs, FieldPackageIsValue, "")) t.AddQuad(cayley.Quad(startPackage.Node, FieldIs, FieldPackageIsValue, ""))
t.AddQuad(cayley.Quad(startPackage.Node, FieldPackageOS, startPackage.OS, "")) t.AddQuad(cayley.Quad(startPackage.Node, FieldPackageOS, startPackage.OS, ""))
@ -230,7 +209,6 @@ func InsertPackages(packageParameters []*Package) error {
// Create the package // Create the package
newPackage := &Package{OS: packageParameter.OS, Name: packageParameter.Name, Version: packageParameter.Version} newPackage := &Package{OS: packageParameter.OS, Name: packageParameter.Name, Version: packageParameter.Version}
newPackage.Node = "package:" + utils.Hash(newPackage.Key()) newPackage.Node = "package:" + utils.Hash(newPackage.Key())
cachedPackagesByBranch[branch][newPackage.Key()] = newPackage
packageParameter.Node = newPackage.Node packageParameter.Node = newPackage.Node
t.AddQuad(cayley.Quad(newPackage.Node, FieldIs, FieldPackageIsValue, "")) t.AddQuad(cayley.Quad(newPackage.Node, FieldIs, FieldPackageIsValue, ""))
@ -273,23 +251,7 @@ func InsertPackages(packageParameters []*Package) error {
t.AddQuad(cayley.Quad(newPackage.Node, FieldPackageNextVersion, succ.Node, "")) t.AddQuad(cayley.Quad(newPackage.Node, FieldPackageNextVersion, succ.Node, ""))
} }
packagesInTransaction = packagesInTransaction + 1
// Apply transaction // Apply transaction
if packagesInTransaction >= insertPackagesBatchSize {
if err := store.ApplyTransaction(t); err != nil {
log.Errorf("failed transaction (InsertPackages): %s", err)
return ErrTransaction
}
t = cayley.NewTransaction()
cachedPackagesByBranch = make(map[string]map[string]*Package)
packagesInTransaction = 0
}
}
// Apply transaction
if packagesInTransaction > 0 {
if err := store.ApplyTransaction(t); err != nil { if err := store.ApplyTransaction(t); err != nil {
log.Errorf("failed transaction (InsertPackages): %s", err) log.Errorf("failed transaction (InsertPackages): %s", err)
return ErrTransaction return ErrTransaction