From 11bcf164c652e429b222b49a1e4c5e23163c3515 Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Sun, 10 Mar 2013 23:13:30 -0400 Subject: [PATCH] Surround sqlite3 transactions with a lock to handle concurrency --- client/asink.go | 13 ++++++------- client/database.go | 38 ++++++++++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/client/asink.go b/client/asink.go index 9b7cd86..c9b46ee 100644 --- a/client/asink.go +++ b/client/asink.go @@ -5,7 +5,6 @@ import ( "asink/util" "bytes" "code.google.com/p/goconf/conf" - "database/sql" "encoding/json" "errors" "flag" @@ -23,7 +22,7 @@ type AsinkGlobals struct { syncDir string cacheDir string tmpDir string - db *sql.DB + db *AsinkDB storage Storage server string port int @@ -91,13 +90,13 @@ func main() { for { event := <-fileUpdates - ProcessEvent(globals, event) + go ProcessEvent(globals, event) } } func ProcessEvent(globals AsinkGlobals, event *asink.Event) { //add to database - err := DatabaseAddEvent(globals.db, event) + err := globals.db.DatabaseAddEvent(event) if err != nil { panic(err) } @@ -129,7 +128,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { event.Status |= asink.CACHED //update database - err = DatabaseUpdateEvent(globals.db, event) + err = globals.db.DatabaseUpdateEvent(event) if err != nil { panic(err) } @@ -142,7 +141,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { event.Status |= asink.UPLOADED //update database again - err = DatabaseUpdateEvent(globals.db, event) + err = globals.db.DatabaseUpdateEvent(event) if err != nil { panic(err) } @@ -156,7 +155,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { } event.Status |= asink.ON_SERVER - err = DatabaseUpdateEvent(globals.db, event) + err = globals.db.DatabaseUpdateEvent(event) if err != nil { panic(err) //TODO probably, definitely, none of these should panic } diff --git a/client/database.go b/client/database.go index e4eefb1..3c7baf9 100644 --- a/client/database.go +++ b/client/database.go @@ -7,9 +7,15 @@ import ( "errors" _ "github.com/mattn/go-sqlite3" "strconv" + "sync" ) -func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) { +type AsinkDB struct { + db *sql.DB + lock sync.Mutex +} + +func GetAndInitDB(config *conf.ConfigFile) (*AsinkDB, error) { dbLocation, err := config.GetString("local", "dblocation") if err != nil { return nil, errors.New("Error: database location not specified in config file.") @@ -41,14 +47,25 @@ func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) { return nil, err } - return db, nil + ret := new(AsinkDB) + ret.db = db + return ret, nil } -func DatabaseAddEvent(db *sql.DB, e *asink.Event) error { - tx, err := db.Begin() +func (adb *AsinkDB) DatabaseAddEvent(e *asink.Event) (err error) { + adb.lock.Lock() + tx, err := adb.db.Begin() if err != nil { return err } + //make sure the transaction gets rolled back on error, and the database gets unlocked + defer func() { + if err != nil { + tx.Rollback() + } + adb.lock.Unlock() + }() + result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions) if err != nil { return err @@ -67,15 +84,24 @@ func DatabaseAddEvent(db *sql.DB, e *asink.Event) error { return nil } -func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error { +func (adb *AsinkDB) DatabaseUpdateEvent(e *asink.Event) (err error) { if !e.InDB { return errors.New("Attempting to update an event in the database which hasn't been previously added.") } - tx, err := db.Begin() + adb.lock.Lock() + tx, err := adb.db.Begin() if err != nil { return err } + //make sure the transaction gets rolled back on error, and the database gets unlocked + defer func() { + if err != nil { + tx.Rollback() + } + adb.lock.Unlock() + }() + result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions, e.LocalId) if err != nil { return err