Add database support, basic client-side event processing

This commit is contained in:
Aaron Lindsay 2013-02-18 20:44:00 -05:00
parent 6975ef035c
commit b193814371
6 changed files with 242 additions and 65 deletions

116
asink.go
View File

@ -2,13 +2,24 @@ package main
import ( import (
"code.google.com/p/goconf/conf" "code.google.com/p/goconf/conf"
"database/sql"
"flag" "flag"
"fmt" "fmt"
"os"
"os/user" "os/user"
"path" "path"
) )
var configFileName string type AsinkGlobals struct {
configFileName string
syncDir string
cacheDir string
tmpDir string
db *sql.DB
storage Storage
}
var globals AsinkGlobals
func init() { func init() {
const config_usage = "Config File to use" const config_usage = "Config File to use"
@ -19,50 +30,121 @@ func init() {
userHomeDir = u.HomeDir userHomeDir = u.HomeDir
} }
flag.StringVar(&configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage) flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
flag.StringVar(&configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)") flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
} }
func main() { func main() {
flag.Parse() flag.Parse()
fmt.Println("config file:", configFileName) fmt.Println("config file:", globals.configFileName)
config, err := conf.ReadConfigFile(configFileName) config, err := conf.ReadConfigFile(globals.configFileName)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
fmt.Println("Error reading config file at ", configFileName, ". Does it exist?") fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
return return
} }
storage, err := GetStorage(config) globals.storage, err = GetStorage(config)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
} }
syncdir, err := config.GetString("local", "syncdir") globals.syncDir, err = config.GetString("local", "syncdir")
cachedir, err := config.GetString("local", "cachedir") globals.cacheDir, err = config.GetString("local", "cachedir")
globals.tmpDir, err = config.GetString("local", "tmpdir")
fmt.Println(syncdir) //make sure all the necessary directories exist
fmt.Println(cachedir) err = ensureDirExists(globals.syncDir)
fmt.Println(storage) if err != nil {
panic(err)
}
err = ensureDirExists(globals.cacheDir)
if err != nil {
panic(err)
}
err = ensureDirExists(globals.tmpDir)
if err != nil {
panic(err)
}
//TODO FIXME REMOVEME
fmt.Println(globals.syncDir)
fmt.Println(globals.cacheDir)
fmt.Println(globals.tmpDir)
fmt.Println(globals.storage)
//TODO FIXME REMOVEME
fileUpdates := make(chan *Event) fileUpdates := make(chan *Event)
go StartWatching(syncdir, fileUpdates) go StartWatching(globals.syncDir, fileUpdates)
globals.db, err = GetAndInitDB(config)
if err != nil {
panic(err)
return
}
for { for {
event := <-fileUpdates event := <-fileUpdates
ProcessEvent(storage, event) ProcessEvent(globals, event)
} }
} }
func ProcessEvent(storage Storage, event *Event) { func ProcessEvent(globals AsinkGlobals, event *Event) {
fmt.Println(event) //add to database
err := DatabaseAddEvent(globals.db, event)
if err != nil {
panic(err)
}
if event.IsUpdate() { if event.IsUpdate() {
err := storage.Put(event.Path, event.Hash) //copy to tmp
tmpfilename, err := copyToTmp(event.Path, globals.tmpDir)
if err != nil {
panic(err)
}
event.Status |= COPIED_TO_TMP
//get the file's hash
hash, err := HashFile(tmpfilename)
event.Hash = hash
if err != nil {
panic(err)
}
event.Status |= HASHED
//rename to local cache w/ filename=hash
err = os.Rename(tmpfilename, path.Join(globals.cacheDir, event.Hash))
if err != nil {
err := os.Remove(tmpfilename)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
event.Status |= CACHED
//update database
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
}
//upload file to remote storage
err = globals.storage.Put(event.Path, event.Hash)
if err != nil {
panic(err)
}
event.Status |= UPLOADED
//update database again
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
}
}
fmt.Println(event)
//TODO notify server of new file
} }

95
database.go Normal file
View File

@ -0,0 +1,95 @@
package main
import (
"code.google.com/p/goconf/conf"
"database/sql"
"errors"
_ "github.com/mattn/go-sqlite3"
"strconv"
)
func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) {
dbLocation, err := config.GetString("local", "dblocation")
if err != nil {
return nil, errors.New("Error: database location not specified in config file.")
}
db, err := sql.Open("sqlite3", dbLocation)
if err != nil {
return nil, err
}
//make sure the events table is created
tx, err := db.Begin()
if err != nil {
return nil, err
}
rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';")
if err != nil {
return nil, err
}
if !rows.Next() {
//if this is false, it means no rows were returned
tx.Exec("CREATE TABLE events (id INTEGER, localid INTEGER PRIMARY KEY ASC, type INTEGER, status INTEGER, path TEXT, hash TEXT, timestamp INTEGER, permissions INTEGER);")
// tx.Exec("CREATE INDEX IF NOT EXISTS localididx on events (localid)")
tx.Exec("CREATE INDEX IF NOT EXISTS ididx on events (id);")
tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);")
}
err = tx.Commit()
if err != nil {
return nil, err
}
return db, nil
}
func DatabaseAddEvent(db *sql.DB, e *Event) error {
tx, err := db.Begin()
if err != nil {
return err
}
result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0)
if err != nil {
return err
}
id, err := result.LastInsertId()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
e.LocalId = id
e.InDB = true
return nil
}
func DatabaseUpdateEvent(db *sql.DB, e *Event) error {
if !e.InDB {
return errors.New("Attempting to update an event in the database which hasn't been previously added.")
}
tx, err := db.Begin()
if err != nil {
return err
}
result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0, e.LocalId)
if err != nil {
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows != 1 {
return errors.New("Updated " + strconv.Itoa(int(rows)) + " row(s) when intending to update 1 event row.")
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}

View File

@ -25,11 +25,14 @@ const (
) )
type Event struct { type Event struct {
Id int64
LocalId int64
Type EventType Type EventType
Status EventStatus Status EventStatus
Path string Path string
Hash string Hash string
Timestamp time.Time Timestamp time.Time
InDB bool //defaults to false
} }
func (e Event) IsUpdate() bool { func (e Event) IsUpdate() bool {

View File

@ -4,7 +4,6 @@ import (
"code.google.com/p/goconf/conf" "code.google.com/p/goconf/conf"
"errors" "errors"
"io" "io"
"io/ioutil"
"os" "os"
"path" "path"
) )
@ -14,21 +13,6 @@ type LocalStorage struct {
tmpSubdir string tmpSubdir string
} }
func ensureDirExists(dir string) error {
_, err := os.Lstat(dir)
if err != nil {
fi, err := os.Lstat(path.Dir(dir))
if err != nil {
return err
}
err = os.Mkdir(dir, fi.Mode().Perm())
if err != nil {
return err
}
}
return nil
}
func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) { func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) {
storageDir, err := config.GetString("storage", "dir") storageDir, err := config.GetString("storage", "dir")
if err != nil { if err != nil {
@ -52,29 +36,8 @@ func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) {
return ls, nil return ls, nil
} }
func (ls *LocalStorage) copyToTmp(src string) (string, error) {
infile, err := os.Open(src)
if err != nil {
return "", err
}
defer infile.Close()
outfile, err := ioutil.TempFile(ls.tmpSubdir, "asink")
if err != nil {
return "", err
}
defer outfile.Close()
_, err = io.Copy(outfile, infile)
if err != nil {
return "", err
}
return outfile.Name(), nil
}
func (ls *LocalStorage) Put(filename string, hash string) (e error) { func (ls *LocalStorage) Put(filename string, hash string) (e error) {
tmpfile, err := ls.copyToTmp(filename) tmpfile, err := copyToTmp(filename, ls.tmpSubdir)
if err != nil { if err != nil {
return err return err
} }

44
util.go Normal file
View File

@ -0,0 +1,44 @@
package main
import (
"io"
"io/ioutil"
"os"
"path"
)
func ensureDirExists(dir string) error {
_, err := os.Lstat(dir)
if err != nil {
fi, err := os.Lstat(path.Dir(dir))
if err != nil {
return err
}
err = os.Mkdir(dir, fi.Mode().Perm())
if err != nil {
return err
}
}
return nil
}
func copyToTmp(src string, tmpdir string) (string, error) {
infile, err := os.Open(src)
if err != nil {
return "", err
}
defer infile.Close()
outfile, err := ioutil.TempFile(tmpdir, "asink")
if err != nil {
return "", err
}
defer outfile.Close()
_, err = io.Copy(outfile, infile)
if err != nil {
return "", err
}
return outfile.Name(), nil
}

View File

@ -50,16 +50,6 @@ func StartWatching(watchDir string, fileUpdates chan *Event) {
event.Path = ev.Name event.Path = ev.Name
event.Timestamp = time.Now() event.Timestamp = time.Now()
if event.IsUpdate() {
event.Hash, err = HashFile(ev.Name)
if err != nil {
panic("file deleted already?")
continue
}
} else {
event.Hash = ""
}
fileUpdates <- event fileUpdates <- event
case err := <-watcher.Error: case err := <-watcher.Error: