diff --git a/api.go b/api.go new file mode 100644 index 0000000..3a79b37 --- /dev/null +++ b/api.go @@ -0,0 +1,6 @@ +package asink + +type APIResponse struct { + Status string //may be 'error' or 'success' + Explanation string +} diff --git a/asink.go b/client/asink.go similarity index 61% rename from asink.go rename to client/asink.go index 030528c..b8b0ddb 100644 --- a/asink.go +++ b/client/asink.go @@ -1,13 +1,20 @@ package main import ( + "asink" + "asink/util" + "bytes" "code.google.com/p/goconf/conf" "database/sql" + "encoding/json" "flag" "fmt" + "net/http" + "io/ioutil" "os" "os/user" "path" + "strconv" ) type AsinkGlobals struct { @@ -17,6 +24,8 @@ type AsinkGlobals struct { tmpDir string db *sql.DB storage Storage + server string + port int } var globals AsinkGlobals @@ -56,15 +65,15 @@ func main() { globals.tmpDir, err = config.GetString("local", "tmpdir") //make sure all the necessary directories exist - err = ensureDirExists(globals.syncDir) + err = util.EnsureDirExists(globals.syncDir) if err != nil { panic(err) } - err = ensureDirExists(globals.cacheDir) + err = util.EnsureDirExists(globals.cacheDir) if err != nil { panic(err) } - err = ensureDirExists(globals.tmpDir) + err = util.EnsureDirExists(globals.tmpDir) if err != nil { panic(err) } @@ -76,7 +85,10 @@ func main() { fmt.Println(globals.storage) //TODO FIXME REMOVEME - fileUpdates := make(chan *Event) + globals.server, err = config.GetString("server", "host") + globals.port, err = config.GetInt("server", "port") + + fileUpdates := make(chan *asink.Event) go StartWatching(globals.syncDir, fileUpdates) globals.db, err = GetAndInitDB(config) @@ -91,7 +103,7 @@ func main() { } } -func ProcessEvent(globals AsinkGlobals, event *Event) { +func ProcessEvent(globals AsinkGlobals, event *asink.Event) { //add to database err := DatabaseAddEvent(globals.db, event) if err != nil { @@ -100,11 +112,11 @@ func ProcessEvent(globals AsinkGlobals, event *Event) { if event.IsUpdate() { //copy to tmp - tmpfilename, err := copyToTmp(event.Path, globals.tmpDir) + tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir) if err != nil { panic(err) } - event.Status |= COPIED_TO_TMP + event.Status |= asink.COPIED_TO_TMP //get the file's hash hash, err := HashFile(tmpfilename) @@ -112,7 +124,7 @@ func ProcessEvent(globals AsinkGlobals, event *Event) { if err != nil { panic(err) } - event.Status |= HASHED + event.Status |= asink.HASHED //rename to local cache w/ filename=hash err = os.Rename(tmpfilename, path.Join(globals.cacheDir, event.Hash)) @@ -122,7 +134,7 @@ func ProcessEvent(globals AsinkGlobals, event *Event) { panic(err) } } - event.Status |= CACHED + event.Status |= asink.CACHED //update database err = DatabaseUpdateEvent(globals.db, event) @@ -135,7 +147,7 @@ func ProcessEvent(globals AsinkGlobals, event *Event) { if err != nil { panic(err) } - event.Status |= UPLOADED + event.Status |= asink.UPLOADED //update database again err = DatabaseUpdateEvent(globals.db, event) @@ -144,7 +156,46 @@ func ProcessEvent(globals AsinkGlobals, event *Event) { } } - fmt.Println(event) - //TODO notify server of new file + //finally, send it off to the server + url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" + + //construct json payload + events := asink.EventList{ + Events: []*asink.Event{event}, + } + b, err := json.Marshal(events) + if err != nil { + panic(err) + } + fmt.Println(string(b)) + + //actually make the request + resp, err := http.Post(url, "application/json", bytes.NewReader(b)) + if err != nil { + panic(err) + } + defer resp.Body.Close() + + //check to make sure request succeeded + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + panic(err) + } + + var apistatus asink.APIResponse + err = json.Unmarshal(body, &apistatus) + if err != nil { + panic(err) //TODO handle sensibly + } + if apistatus.Status != "success" { + panic("Status not success") //TODO handle sensibly + } + fmt.Println(apistatus) + + event.Status |= asink.ON_SERVER + err = DatabaseUpdateEvent(globals.db, event) + if err != nil { + panic(err) + } } diff --git a/database.go b/client/database.go similarity index 89% rename from database.go rename to client/database.go index d1c444e..efebe3e 100644 --- a/database.go +++ b/client/database.go @@ -1,6 +1,7 @@ package main import ( + "asink" "code.google.com/p/goconf/conf" "database/sql" "errors" @@ -43,12 +44,12 @@ func GetAndInitDB(config *conf.ConfigFile) (*sql.DB, error) { return db, nil } -func DatabaseAddEvent(db *sql.DB, e *Event) error { +func DatabaseAddEvent(db *sql.DB, e *asink.Event) error { tx, err := db.Begin() if err != nil { return err } - result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0) + result, err := tx.Exec("INSERT INTO events (id, type, status, path, hash, timestamp, permissions) VALUES (?,?,?,?,?,?,?);", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0) if err != nil { return err } @@ -66,7 +67,7 @@ func DatabaseAddEvent(db *sql.DB, e *Event) error { return nil } -func DatabaseUpdateEvent(db *sql.DB, e *Event) error { +func DatabaseUpdateEvent(db *sql.DB, e *asink.Event) error { if !e.InDB { return errors.New("Attempting to update an event in the database which hasn't been previously added.") } @@ -75,7 +76,7 @@ func DatabaseUpdateEvent(db *sql.DB, e *Event) error { if err != nil { return err } - result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp.UnixNano(), 0, e.LocalId) + result, err := tx.Exec("UPDATE events SET id=?, type=?, status=?, path=?, hash=?, timestamp=?, permissions=? WHERE localid=?;", e.Id, e.Type, e.Status, e.Path, e.Hash, e.Timestamp, 0, e.LocalId) if err != nil { return err } diff --git a/hash.go b/client/hash.go similarity index 100% rename from hash.go rename to client/hash.go diff --git a/storage.go b/client/storage.go similarity index 100% rename from storage.go rename to client/storage.go diff --git a/storage_local.go b/client/storage_local.go similarity index 89% rename from storage_local.go rename to client/storage_local.go index 7648966..2c99c23 100644 --- a/storage_local.go +++ b/client/storage_local.go @@ -1,6 +1,7 @@ package main import ( + "asink/util" "code.google.com/p/goconf/conf" "errors" "io" @@ -24,11 +25,11 @@ func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) { ls.tmpSubdir = path.Join(storageDir, ".asink-tmpdir") //make sure the base directory and tmp subdir exist - err = ensureDirExists(ls.storageDir) + err = util.EnsureDirExists(ls.storageDir) if err != nil { return nil, err } - err = ensureDirExists(ls.tmpSubdir) + err = util.EnsureDirExists(ls.tmpSubdir) if err != nil { return nil, err } @@ -37,7 +38,7 @@ func NewLocalStorage(config *conf.ConfigFile) (*LocalStorage, error) { } func (ls *LocalStorage) Put(filename string, hash string) (e error) { - tmpfile, err := copyToTmp(filename, ls.tmpSubdir) + tmpfile, err := util.CopyToTmp(filename, ls.tmpSubdir) if err != nil { return err } diff --git a/watcher.go b/client/watcher.go similarity index 82% rename from watcher.go rename to client/watcher.go index 4e56149..c4b9f86 100644 --- a/watcher.go +++ b/client/watcher.go @@ -1,13 +1,14 @@ package main import ( + "asink" "github.com/howeyc/fsnotify" "os" "path/filepath" "time" ) -func StartWatching(watchDir string, fileUpdates chan *Event) { +func StartWatching(watchDir string, fileUpdates chan *asink.Event) { watcher, err := fsnotify.NewWatcher() if err != nil { panic("Failed to create fsnotify watcher") @@ -37,18 +38,18 @@ func StartWatching(watchDir string, fileUpdates chan *Event) { continue } - event := new(Event) + event := new(asink.Event) if ev.IsCreate() || ev.IsModify() { - event.Type = UPDATE + event.Type = asink.UPDATE } else if ev.IsDelete() || ev.IsRename() { - event.Type = DELETE + event.Type = asink.DELETE } else { panic("Unknown fsnotify event type") } - event.Status = NOTICED + event.Status = asink.NOTICED event.Path = ev.Name - event.Timestamp = time.Now() + event.Timestamp = time.Now().UnixNano() fileUpdates <- event diff --git a/events.go b/events.go index 5042800..02848ba 100644 --- a/events.go +++ b/events.go @@ -1,8 +1,4 @@ -package main - -import ( - "time" -) +package asink //event type type EventType uint32 @@ -31,8 +27,8 @@ type Event struct { Status EventStatus Path string Hash string - Timestamp time.Time - InDB bool //defaults to false + Timestamp int64 + InDB bool `json:"-"` //defaults to false. Omitted from json marshalling. } func (e Event) IsUpdate() bool { @@ -42,3 +38,7 @@ func (e Event) IsUpdate() bool { func (e Event) IsDelete() bool { return e.Type&DELETE == DELETE } + +type EventList struct { + Events []*Event +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..7890125 --- /dev/null +++ b/server/server.go @@ -0,0 +1,106 @@ +package main + +import ( + "asink" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "net/http" + "regexp" + "strconv" +) + +var eventsRegexp *regexp.Regexp + +var port int = 8080 +func init() { + const port_usage = "Port on which to serve HTTP API" + + flag.IntVar(&port, "port", 8080, port_usage) + flag.IntVar(&port, "p", 8080, port_usage+" (shorthand)") + + eventsRegexp = regexp.MustCompile("^/events/([0-9]+)$") +} + +func main() { + flag.Parse() + + http.HandleFunc("/", rootHandler) + http.HandleFunc("/events", eventHandler) + http.HandleFunc("/events/", eventHandler) + + //TODO replace with http://golang.org/pkg/net/http/#ListenAndServeTLS + err := http.ListenAndServe(fmt.Sprintf(":%d", port), nil) + if err != nil { + fmt.Println(err) + } +} + +func rootHandler(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "You're probably looking for /events/") +} + +func getEvents(w http.ResponseWriter, r *http.Request, nextEvent uint64) { + fmt.Fprintf(w, strconv.FormatUint(nextEvent, 10)) +} + +func putEvents(w http.ResponseWriter, r *http.Request) { + var events asink.EventList + var error_occurred bool = false + var error_message string = "" + defer func() { + var apiresponse asink.APIResponse + if error_occurred { + apiresponse = asink.APIResponse{ + Status: "error", + Explanation: error_message, + } + } else { + apiresponse = asink.APIResponse{ + Status: "success", + } + } + b, err := json.Marshal(apiresponse) + if err != nil { + panic(err) + } + w.Write(b) + }() + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + error_message = err.Error() + return + } + err = json.Unmarshal(body, &events) + if err != nil { + error_message = err.Error() + return + } + for _, event := range events.Events { + fmt.Println(event) + } +} + +func eventHandler(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" { + //if GET, return any events later than (and including) the event id passed in + if sm := eventsRegexp.FindStringSubmatch(r.RequestURI); sm != nil { + i, err := strconv.ParseUint(sm[1], 10, 64) + if err != nil { + //TODO display error message here instead + fmt.Printf("ERROR parsing " + sm[1] + "\n") + getEvents(w, r, 0) + } else { + getEvents(w, r, i) + } + } else { + getEvents(w, r, 0) + } + } else if r.Method == "POST" { + putEvents(w, r) + } else { + fmt.Fprintf(w, "You f-ed up.") + } +} diff --git a/util.go b/util/util.go similarity index 83% rename from util.go rename to util/util.go index 0232d52..0316361 100644 --- a/util.go +++ b/util/util.go @@ -1,4 +1,4 @@ -package main +package util import ( "io" @@ -7,7 +7,7 @@ import ( "path" ) -func ensureDirExists(dir string) error { +func EnsureDirExists(dir string) error { _, err := os.Lstat(dir) if err != nil { fi, err := os.Lstat(path.Dir(dir)) @@ -22,7 +22,7 @@ func ensureDirExists(dir string) error { return nil } -func copyToTmp(src string, tmpdir string) (string, error) { +func CopyToTmp(src string, tmpdir string) (string, error) { infile, err := os.Open(src) if err != nil { return "", err