Restructured to have subpackages, added server communication

This commit is contained in:
2013-02-20 23:43:01 -05:00
parent b193814371
commit 3fc3e3a963
10 changed files with 201 additions and 35 deletions

201
client/asink.go Normal file
View File

@ -0,0 +1,201 @@
package main
import (
"asink"
"asink/util"
"bytes"
"code.google.com/p/goconf/conf"
"database/sql"
"encoding/json"
"flag"
"fmt"
"net/http"
"io/ioutil"
"os"
"os/user"
"path"
"strconv"
)
type AsinkGlobals struct {
configFileName string
syncDir string
cacheDir string
tmpDir string
db *sql.DB
storage Storage
server string
port int
}
var globals AsinkGlobals
func init() {
const config_usage = "Config File to use"
userHomeDir := "~"
u, err := user.Current()
if err == nil {
userHomeDir = u.HomeDir
}
flag.StringVar(&globals.configFileName, "config", path.Join(userHomeDir, ".asink", "config"), config_usage)
flag.StringVar(&globals.configFileName, "c", path.Join(userHomeDir, ".asink", "config"), config_usage+" (shorthand)")
}
func main() {
flag.Parse()
fmt.Println("config file:", globals.configFileName)
config, err := conf.ReadConfigFile(globals.configFileName)
if err != nil {
fmt.Println(err)
fmt.Println("Error reading config file at ", globals.configFileName, ". Does it exist?")
return
}
globals.storage, err = GetStorage(config)
if err != nil {
fmt.Println(err)
return
}
globals.syncDir, err = config.GetString("local", "syncdir")
globals.cacheDir, err = config.GetString("local", "cachedir")
globals.tmpDir, err = config.GetString("local", "tmpdir")
//make sure all the necessary directories exist
err = util.EnsureDirExists(globals.syncDir)
if err != nil {
panic(err)
}
err = util.EnsureDirExists(globals.cacheDir)
if err != nil {
panic(err)
}
err = util.EnsureDirExists(globals.tmpDir)
if err != nil {
panic(err)
}
//TODO FIXME REMOVEME
fmt.Println(globals.syncDir)
fmt.Println(globals.cacheDir)
fmt.Println(globals.tmpDir)
fmt.Println(globals.storage)
//TODO FIXME REMOVEME
globals.server, err = config.GetString("server", "host")
globals.port, err = config.GetInt("server", "port")
fileUpdates := make(chan *asink.Event)
go StartWatching(globals.syncDir, fileUpdates)
globals.db, err = GetAndInitDB(config)
if err != nil {
panic(err)
return
}
for {
event := <-fileUpdates
ProcessEvent(globals, event)
}
}
func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
//add to database
err := DatabaseAddEvent(globals.db, event)
if err != nil {
panic(err)
}
if event.IsUpdate() {
//copy to tmp
tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir)
if err != nil {
panic(err)
}
event.Status |= asink.COPIED_TO_TMP
//get the file's hash
hash, err := HashFile(tmpfilename)
event.Hash = hash
if err != nil {
panic(err)
}
event.Status |= asink.HASHED
//rename to local cache w/ filename=hash
err = os.Rename(tmpfilename, path.Join(globals.cacheDir, event.Hash))
if err != nil {
err := os.Remove(tmpfilename)
if err != nil {
panic(err)
}
}
event.Status |= asink.CACHED
//update database
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
}
//upload file to remote storage
err = globals.storage.Put(event.Path, event.Hash)
if err != nil {
panic(err)
}
event.Status |= asink.UPLOADED
//update database again
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
}
}
//finally, send it off to the server
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
//construct json payload
events := asink.EventList{
Events: []*asink.Event{event},
}
b, err := json.Marshal(events)
if err != nil {
panic(err)
}
fmt.Println(string(b))
//actually make the request
resp, err := http.Post(url, "application/json", bytes.NewReader(b))
if err != nil {
panic(err)
}
defer resp.Body.Close()
//check to make sure request succeeded
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
var apistatus asink.APIResponse
err = json.Unmarshal(body, &apistatus)
if err != nil {
panic(err) //TODO handle sensibly
}
if apistatus.Status != "success" {
panic("Status not success") //TODO handle sensibly
}
fmt.Println(apistatus)
event.Status |= asink.ON_SERVER
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
}
}

96
client/database.go Normal file
View File

@ -0,0 +1,96 @@
package main
import (
"asink"
"code.google.com/p/goconf/conf"
"database/sql"
"errors"
_ "github.com/mattn/go-sqlite3"
"strconv"
)
func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) {
dbLocation, err := config.GetString("local", "dblocation")
if err != nil {
return nil, errors.New("Error: database location not specified in config file.")
}
db, err := sql.Open("sqlite3", dbLocation)
if err != nil {
return nil, err
}
//make sure the events table is created
tx, err := db.Begin()
if err != nil {
return nil, err
}
rows, err := tx.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='events';")
if err != nil {
return nil, err
}
if !rows.Next() {
//if this is false, it means no rows were returned
tx.Exec("CREATE TABLE events (id INTEGER, localid INTEGER PRIMARY KEY ASC, type INTEGER, status INTEGER, path TEXT, hash TEXT, timestamp INTEGER, permissions INTEGER);")
// tx.Exec("CREATE INDEX IF NOT EXISTS localididx on events (localid)")
tx.Exec("CREATE INDEX IF NOT EXISTS ididx on events (id);")
tx.Exec("CREATE INDEX IF NOT EXISTS pathidx on events (path);")
}
err = tx.Commit()
if err != nil {
return nil, err
}
return db, nil
}
func DatabaseAddEvent(db *sql.DB, e *asink.Event) error {
tx, err := db.Begin()
if err != nil {
return err
}
result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0)
if err != nil {
return err
}
id, err := result.LastInsertId()
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}
e.LocalId = id
e.InDB = true
return nil
}
func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error {
if !e.InDB {
return errors.New("Attempting to update an event in the database which hasn't been previously added.")
}
tx, err := db.Begin()
if err != nil {
return err
}
result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0, e.LocalId)
if err != nil {
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows != 1 {
return errors.New("Updated " + strconv.Itoa(int(rows)) + " row(s) when intending to update 1 event row.")
}
err = tx.Commit()
if err != nil {
return err
}
return nil
}

25
client/hash.go Normal file
View File

@ -0,0 +1,25 @@
package main
import (
"crypto/sha256"
"fmt"
"io"
"os"
)
func HashFile(filename string) (string, error) {
hashfn := sha256.New()
infile, err := os.Open(filename)
if err != nil {
return "", err
}
defer infile.Close()
_, err = io.Copy(hashfn, infile)
if err != nil {
return "", err
}
return fmt.Sprintf("%x", hashfn.Sum(nil)), nil
}

32
client/storage.go Normal file
View File

@ -0,0 +1,32 @@
package main
import (
"code.google.com/p/goconf/conf"
"errors"
)
type Storage interface {
Put(filename string, hash string) error
Get(filename string, hash string) error
}
func GetStorage(config *conf.ConfigFile) (Storage, error) {
storageMethod, err := config.GetString("storage", "method")
if err != nil {
return nil, errors.New("Error: storage method not specified in config file.")
}
var storage Storage
switch storageMethod {
case "local":
storage, err = NewLocalStorage(config)
if err != nil {
return nil, err
}
default:
return nil, errors.New("Error: storage method '" + storageMethod + "' not implemented.")
}
return storage, nil
}

73
client/storage_local.go Normal file
View File

@ -0,0 +1,73 @@
package main
import (
"asink/util"
"code.google.com/p/goconf/conf"
"errors"
"io"
"os"
"path"
)
type LocalStorage struct {
storageDir string
tmpSubdir string
}
func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) {
storageDir, err := config.GetString("storage", "dir")
if err != nil {
return nil, errors.New("Error: LocalStorage indicated in config file, but lacking local storage directory ('dir = some/dir').")
}
ls := new(LocalStorage)
ls.storageDir = storageDir
ls.tmpSubdir = path.Join(storageDir, ".asink-tmpdir")
//make sure the base directory and tmp subdir exist
err = util.EnsureDirExists(ls.storageDir)
if err != nil {
return nil, err
}
err = util.EnsureDirExists(ls.tmpSubdir)
if err != nil {
return nil, err
}
return ls, nil
}
func (ls *LocalStorage) Put(filename string, hash string) (e error) {
tmpfile, err := util.CopyToTmp(filename, ls.tmpSubdir)
if err != nil {
return err
}
err = os.Rename(tmpfile, path.Join(ls.storageDir, hash))
if err != nil {
err := os.Remove(tmpfile)
if err != nil {
return err
}
}
return nil
}
func (ls *LocalStorage) Get(filename string, hash string) error {
infile, err := os.Open(path.Join(ls.storageDir, hash))
if err != nil {
return err
}
defer infile.Close()
outfile, err := os.Open(filename)
if err != nil {
return err
}
defer outfile.Close()
_, err = io.Copy(outfile, infile)
return err
}

64
client/watcher.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"asink"
"github.com/howeyc/fsnotify"
"os"
"path/filepath"
"time"
)
func StartWatching(watchDir string, fileUpdates chan *asink.Event) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
panic("Failed to create fsnotify watcher")
}
//function called by filepath.Walk to start watching a directory and all subdirectories
watchDirFn := func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
err = watcher.Watch(path)
if err != nil {
panic("Failed to watch " + path)
}
}
return nil
}
//processes all the fsnotify events into asink events
go func() {
for {
select {
case ev := <-watcher.Event:
//if a directory was created, begin recursively watching all its subdirectories
if fi, err := os.Stat(ev.Name); err == nil && fi.IsDir() {
if ev.IsCreate() {
filepath.Walk(ev.Name, watchDirFn)
}
continue
}
event := new(asink.Event)
if ev.IsCreate() || ev.IsModify() {
event.Type = asink.UPDATE
} else if ev.IsDelete() || ev.IsRename() {
event.Type = asink.DELETE
} else {
panic("Unknown fsnotify event type")
}
event.Status = asink.NOTICED
event.Path = ev.Name
event.Timestamp = time.Now().UnixNano()
fileUpdates <- event
case err := <-watcher.Error:
panic(err)
}
}
}()
//start watching the directory passed in
filepath.Walk(watchDir, watchDirFn)
}