Surround sqlite3 transactions with a lock to handle concurrency
This commit is contained in:
		@@ -5,7 +5,6 @@ import (
 | 
			
		||||
	"asink/util"
 | 
			
		||||
	"bytes"
 | 
			
		||||
	"code.google.com/p/goconf/conf"
 | 
			
		||||
	"database/sql"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"flag"
 | 
			
		||||
@@ -23,7 +22,7 @@ type AsinkGlobals struct {
 | 
			
		||||
	syncDir        string
 | 
			
		||||
	cacheDir       string
 | 
			
		||||
	tmpDir         string
 | 
			
		||||
	db             *sql.DB
 | 
			
		||||
	db             *AsinkDB
 | 
			
		||||
	storage        Storage
 | 
			
		||||
	server         string
 | 
			
		||||
	port           int
 | 
			
		||||
@@ -91,13 +90,13 @@ func main() {
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		event := <-fileUpdates
 | 
			
		||||
		ProcessEvent(globals, event)
 | 
			
		||||
		go ProcessEvent(globals, event)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	//add to database
 | 
			
		||||
	err := DatabaseAddEvent(globals.db, event)
 | 
			
		||||
	err := globals.db.DatabaseAddEvent(event)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -129,7 +128,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
		event.Status |= asink.CACHED
 | 
			
		||||
 | 
			
		||||
		//update database
 | 
			
		||||
		err = DatabaseUpdateEvent(globals.db, event)
 | 
			
		||||
		err = globals.db.DatabaseUpdateEvent(event)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -142,7 +141,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
		event.Status |= asink.UPLOADED
 | 
			
		||||
 | 
			
		||||
		//update database again
 | 
			
		||||
		err = DatabaseUpdateEvent(globals.db, event)
 | 
			
		||||
		err = globals.db.DatabaseUpdateEvent(event)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			panic(err)
 | 
			
		||||
		}
 | 
			
		||||
@@ -156,7 +155,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	event.Status |= asink.ON_SERVER
 | 
			
		||||
	err = DatabaseUpdateEvent(globals.db, event)
 | 
			
		||||
	err = globals.db.DatabaseUpdateEvent(event)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		panic(err) //TODO probably, definitely, none of these should panic
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -7,9 +7,15 @@ import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	_ "github.com/mattn/go-sqlite3"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) {
 | 
			
		||||
type AsinkDB struct {
 | 
			
		||||
	db *sql.DB
 | 
			
		||||
	lock sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetAndInitDB(config *conf.ConfigFile) (*AsinkDB, error) {
 | 
			
		||||
	dbLocation, err := config.GetString("local", "dblocation")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, errors.New("Error: database location not specified in config file.")
 | 
			
		||||
@@ -41,14 +47,25 @@ func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return db, nil
 | 
			
		||||
	ret := new(AsinkDB)
 | 
			
		||||
	ret.db = db
 | 
			
		||||
	return ret, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DatabaseAddEvent(db *sql.DB, e *asink.Event) error {
 | 
			
		||||
	tx, err := db.Begin()
 | 
			
		||||
func (adb *AsinkDB) DatabaseAddEvent(e *asink.Event) (err error) {
 | 
			
		||||
	adb.lock.Lock()
 | 
			
		||||
	tx, err := adb.db.Begin()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	//make sure the transaction gets rolled back on error, and the database gets unlocked
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			tx.Rollback()
 | 
			
		||||
		}
 | 
			
		||||
		adb.lock.Unlock()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
@@ -67,15 +84,24 @@ func DatabaseAddEvent(db *sql.DB, e *asink.Event) error {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error {
 | 
			
		||||
func (adb *AsinkDB) DatabaseUpdateEvent(e *asink.Event) (err error) {
 | 
			
		||||
	if !e.InDB {
 | 
			
		||||
		return errors.New("Attempting to update an event in the database which hasn't been previously added.")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tx, err := db.Begin()
 | 
			
		||||
	adb.lock.Lock()
 | 
			
		||||
	tx, err := adb.db.Begin()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	//make sure the transaction gets rolled back on error, and the database gets unlocked
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			tx.Rollback()
 | 
			
		||||
		}
 | 
			
		||||
		adb.lock.Unlock()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, e.Permissions, e.LocalId)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user