From b193814371b505b844b872826cfcc5cf1104eb98 Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Mon, 18 Feb 2013 20:44:00 -0500 Subject: [PATCH] Add database support, basic client-side event processing --- asink.go | 116 ++++++++++++++++++++++++++++++++++++++++------- database.go | 95 ++++++++++++++++++++++++++++++++++++++ events.go | 3 ++ storage_local.go | 39 +--------------- util.go | 44 ++++++++++++++++++ watcher.go | 10 ---- 6 files changed, 242 insertions(+), 65 deletions(-) create mode 100644 database.go create mode 100644 util.go diff --git a/asink.go b/asink.go index bcc38c1..030528c 100644 --- a/asink.go +++ b/asink.go @@ -2,13 +2,24 @@ package main import ( "code.google.com/p/goconf/conf" + "database/sql" "flag" "fmt" + "os" "os/user" "path" ) -var configFileName string +type AsinkGlobals struct { + configFileName string + syncDir string + cacheDir string + tmpDir string + db *sql.DB + storage Storage +} + +var globals AsinkGlobals func init() { const config_usage = "Config File to use" @@ -19,50 +30,121 @@ func init() { userHomeDir = u.HomeDir } - flag.StringVar(&configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) - flag.StringVar(&configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") + flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) + flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") } func main() { flag.Parse() - fmt.Println("config file:", configFileName) + fmt.Println("config file:", globals.configFileName) - config, err := conf.ReadConfigFile(configFileName) + config, err := conf.ReadConfigFile(globals.configFileName) if err != nil { fmt.Println(err) - fmt.Println("Error reading config file at ", configFileName, ". Does it exist?") + fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?") return } - storage, err := GetStorage(config) + globals.storage, err = GetStorage(config) if err != nil { fmt.Println(err) return } - syncdir, err := config.GetString("local", "syncdir") - cachedir, err := config.GetString("local", "cachedir") + globals.syncDir, err = config.GetString("local", "syncdir") + globals.cacheDir, err = config.GetString("local", "cachedir") + globals.tmpDir, err = config.GetString("local", "tmpdir") - fmt.Println(syncdir) - fmt.Println(cachedir) - fmt.Println(storage) + //make sure all the necessary directories exist + err = ensureDirExists(globals.syncDir) + if err != nil { + panic(err) + } + err = ensureDirExists(globals.cacheDir) + if err != nil { + panic(err) + } + err = ensureDirExists(globals.tmpDir) + if err != nil { + panic(err) + } + + //TODO FIXME REMOVEME + fmt.Println(globals.syncDir) + fmt.Println(globals.cacheDir) + fmt.Println(globals.tmpDir) + fmt.Println(globals.storage) + //TODO FIXME REMOVEME fileUpdates := make(chan *Event) - go StartWatching(syncdir, fileUpdates) + go StartWatching(globals.syncDir, fileUpdates) + + globals.db, err = GetAndInitDB(config) + if err != nil { + panic(err) + return + } for { event := <-fileUpdates - ProcessEvent(storage, event) + ProcessEvent(globals, event) } } -func ProcessEvent(storage Storage, event *Event) { - fmt.Println(event) +func ProcessEvent(globals AsinkGlobals, event *Event) { + //add to database + err := DatabaseAddEvent(globals.db, event) + if err != nil { + panic(err) + } if event.IsUpdate() { - err := storage.Put(event.Path, event.Hash) + //copy to tmp + tmpfilename, err := copyToTmp(event.Path, globals.tmpDir) if err != nil { panic(err) } + event.Status |= COPIED_TO_TMP + + //get the file's hash + hash, err := HashFile(tmpfilename) + event.Hash = hash + if err != nil { + panic(err) + } + event.Status |= HASHED + + //rename to local cache w/ filename=hash + err = os.Rename(tmpfilename, path.Join(globals.cacheDir, event.Hash)) + if err != nil { + err := os.Remove(tmpfilename) + if err != nil { + panic(err) + } + } + event.Status |= CACHED + + //update database + err = DatabaseUpdateEvent(globals.db, event) + if err != nil { + panic(err) + } + + //upload file to remote storage + err = globals.storage.Put(event.Path, event.Hash) + if err != nil { + panic(err) + } + event.Status |= UPLOADED + + //update database again + err = DatabaseUpdateEvent(globals.db, event) + if err != nil { + panic(err) + } + } + fmt.Println(event) + + //TODO notify server of new file } diff --git a/database.go b/database.go new file mode 100644 index 0000000..d1c444e --- /dev/null +++ b/database.go @@ -0,0 +1,95 @@ +package main + +import ( + "code.google.com/p/goconf/conf" + "database/sql" + "errors" + _ "github.com/mattn/go-sqlite3" + "strconv" +) + +func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) { + dbLocation, err := config.GetString("local", "dblocation") + if err != nil { + return nil, errors.New("Error: database location not specified in config file.") + } + + db, err := sql.Open("sqlite3", dbLocation) + if err != nil { + return nil, err + } + + //make sure the events table is created + tx, err := db.Begin() + if err != nil { + return nil, err + } + rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';") + if err != nil { + return nil, err + } + if !rows.Next() { + //if this is false, it means no rows were returned + tx.Exec("CREATE TABLE events (id INTEGER, localid INTEGER PRIMARY KEY ASC, type INTEGER, status INTEGER, path TEXT, hash TEXT, timestamp INTEGER, permissions INTEGER);") + // tx.Exec("CREATE INDEX IF NOT EXISTS localididx on events (localid)") + tx.Exec("CREATE INDEX IF NOT EXISTS ididx on events (id);") + tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);") + } + err = tx.Commit() + if err != nil { + return nil, err + } + + return db, nil +} + +func DatabaseAddEvent(db *sql.DB, e *Event) error { + tx, err := db.Begin() + if err != nil { + return err + } + result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0) + if err != nil { + return err + } + id, err := result.LastInsertId() + if err != nil { + return err + } + err = tx.Commit() + if err != nil { + return err + } + + e.LocalId = id + e.InDB = true + return nil +} + +func DatabaseUpdateEvent(db *sql.DB, e *Event) error { + if !e.InDB { + return errors.New("Attempting to update an event in the database which hasn't been previously added.") + } + + tx, err := db.Begin() + if err != nil { + return err + } + result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0, e.LocalId) + if err != nil { + return err + } + rows, err := result.RowsAffected() + if err != nil { + return err + } + if rows != 1 { + return errors.New("Updated " + strconv.Itoa(int(rows)) + " row(s) when intending to update 1 event row.") + } + err = tx.Commit() + if err != nil { + return err + } + + return nil +} diff --git a/events.go b/events.go index 47b7f21..5042800 100644 --- a/events.go +++ b/events.go @@ -25,11 +25,14 @@ const ( ) type Event struct { + Id int64 + LocalId int64 Type EventType Status EventStatus Path string Hash string Timestamp time.Time + InDB bool //defaults to false } func (e Event) IsUpdate() bool { diff --git a/storage_local.go b/storage_local.go index 9187fa7..7648966 100644 --- a/storage_local.go +++ b/storage_local.go @@ -4,7 +4,6 @@ import ( "code.google.com/p/goconf/conf" "errors" "io" - "io/ioutil" "os" "path" ) @@ -14,21 +13,6 @@ type LocalStorage struct { tmpSubdir string } -func ensureDirExists(dir string) error { - _, err := os.Lstat(dir) - if err != nil { - fi, err := os.Lstat(path.Dir(dir)) - if err != nil { - return err - } - err = os.Mkdir(dir, fi.Mode().Perm()) - if err != nil { - return err - } - } - return nil -} - func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) { storageDir, err := config.GetString("storage", "dir") if err != nil { @@ -52,29 +36,8 @@ func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) { return ls, nil } -func (ls *LocalStorage) copyToTmp(src string) (string, error) { - infile, err := os.Open(src) - if err != nil { - return "", err - } - defer infile.Close() - - outfile, err := ioutil.TempFile(ls.tmpSubdir, "asink") - if err != nil { - return "", err - } - defer outfile.Close() - - _, err = io.Copy(outfile, infile) - if err != nil { - return "", err - } - - return outfile.Name(), nil -} - func (ls *LocalStorage) Put(filename string, hash string) (e error) { - tmpfile, err := ls.copyToTmp(filename) + tmpfile, err := copyToTmp(filename, ls.tmpSubdir) if err != nil { return err } diff --git a/util.go b/util.go new file mode 100644 index 0000000..0232d52 --- /dev/null +++ b/util.go @@ -0,0 +1,44 @@ +package main + +import ( + "io" + "io/ioutil" + "os" + "path" +) + +func ensureDirExists(dir string) error { + _, err := os.Lstat(dir) + if err != nil { + fi, err := os.Lstat(path.Dir(dir)) + if err != nil { + return err + } + err = os.Mkdir(dir, fi.Mode().Perm()) + if err != nil { + return err + } + } + return nil +} + +func copyToTmp(src string, tmpdir string) (string, error) { + infile, err := os.Open(src) + if err != nil { + return "", err + } + defer infile.Close() + + outfile, err := ioutil.TempFile(tmpdir, "asink") + if err != nil { + return "", err + } + defer outfile.Close() + + _, err = io.Copy(outfile, infile) + if err != nil { + return "", err + } + + return outfile.Name(), nil +} diff --git a/watcher.go b/watcher.go index c1e88f2..4e56149 100644 --- a/watcher.go +++ b/watcher.go @@ -50,16 +50,6 @@ func StartWatching(watchDir string, fileUpdates chan *Event) { event.Path = ev.Name event.Timestamp = time.Now() - if event.IsUpdate() { - event.Hash, err = HashFile(ev.Name) - if err != nil { - panic("file deleted already?") - continue - } - } else { - event.Hash = "" - } - fileUpdates <- event case err := <-watcher.Error: