From 6bdb81834c03b80bdb28b0684e64657942f8dc08 Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Wed, 14 Aug 2013 07:26:52 -0400 Subject: [PATCH] Improve error handling by adding DISCARDED state for events --- client/asink.go | 30 ++++++++++++++++++++++++------ client/net.go | 1 - client/path_map.go | 19 +++++++++++-------- client/watcher.go | 1 - events.go | 13 ++----------- 5 files changed, 37 insertions(+), 27 deletions(-) diff --git a/client/asink.go b/client/asink.go index 55ac099..5640433 100644 --- a/client/asink.go +++ b/client/asink.go @@ -109,15 +109,27 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { //copy to tmp //TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir) - if err != nil && !util.ErrorFileNotFound(err) { + if err != nil { + //bail out if the file we are trying to upload already got deleted + if util.ErrorFileNotFound(err) { + event.Status |= asink.DISCARDED + return + } panic(err) } + //try to collect the file's permissions fileinfo, err := os.Stat(event.Path) - if err != nil && !util.ErrorFileNotFound(err) { + if err != nil { + //bail out if the file we are trying to upload already got deleted + if util.ErrorFileNotFound(err) { + event.Status |= asink.DISCARDED + return + } panic(err) + } else { + event.Permissions = fileinfo.Mode() } - event.Permissions = fileinfo.Mode() //get the file's hash hash, err := HashFile(tmpfilename) @@ -129,6 +141,7 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { //If the file didn't actually change, squash this event if latestLocal != nil && event.Hash == latestLocal.Hash { os.Remove(tmpfilename) + event.Status |= asink.DISCARDED return } @@ -148,6 +161,12 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { if err != nil { panic(err) } + } else { + //if we're trying to delete a file that we thought was already deleted, there's no need to delete it again + if latestLocal != nil && latestLocal.IsDelete() { + event.Status |= asink.DISCARDED + return + } } //finally, send it off to the server @@ -155,16 +174,16 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { if err != nil { panic(err) //TODO handle sensibly } - } func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { latestLocal := LockPath(event.Path, true) defer UnlockPath(event) + //if we already have this event, or if it is older than our most recent event, bail out if latestLocal != nil { if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) { - UnlockPath(event) + event.Status |= asink.DISCARDED return } @@ -226,7 +245,6 @@ func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { //TODO delete file hierarchy beneath this file if its the last one in its directory? } - fmt.Println(event) //TODO make sure file being overwritten is either unchanged or already copied off and hashed } diff --git a/client/net.go b/client/net.go index 05c58e5..7289bee 100644 --- a/client/net.go +++ b/client/net.go @@ -26,7 +26,6 @@ func SendEvent(globals AsinkGlobals, event *asink.Event) error { if err != nil { return err } - fmt.Println(string(b)) //actually make the request resp, err := http.Post(url, "application/json", bytes.NewReader(b)) diff --git a/client/path_map.go b/client/path_map.go index 416b618..55c110a 100644 --- a/client/path_map.go +++ b/client/path_map.go @@ -32,22 +32,25 @@ func PathLocker(db *AsinkDB) { select { case event = <-pathUnlockerChan: if v, ok = m[event.Path]; ok != false { - if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) { - err := db.DatabaseAddEvent(event) - if err != nil { - panic(err) + //only update status in data structures if the event hasn't been discarded + if event.Status&asink.DISCARDED == 0 { + if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) { + err := db.DatabaseAddEvent(event) + if err != nil { + panic(err) + } + //TODO batch database writes instead of doing one at a time } - //TODO batch database writes instead of doing one at a time + v.latestEvent = event } - v.latestEvent = event if len(v.localWaiters) > 0 { c = v.localWaiters[0] v.localWaiters = v.localWaiters[1:] - c <- event + c <- v.latestEvent } else if len(v.remoteWaiters) > 0 { c = v.remoteWaiters[0] v.remoteWaiters = v.remoteWaiters[1:] - c <- event + c <- v.latestEvent } else { v.locked = false } diff --git a/client/watcher.go b/client/watcher.go index c4b9f86..1433b85 100644 --- a/client/watcher.go +++ b/client/watcher.go @@ -47,7 +47,6 @@ func StartWatching(watchDir string, fileUpdates chan *asink.Event) { panic("Unknown fsnotify event type") } - event.Status = asink.NOTICED event.Path = ev.Name event.Timestamp = time.Now().UnixNano() diff --git a/events.go b/events.go index 7b37e83..4837895 100644 --- a/events.go +++ b/events.go @@ -16,17 +16,8 @@ const ( type EventStatus uint32 const ( - //the state of the event on the local asink instance on which it originated: - NOTICED = 1 << iota //watcher.go has been notified that a file changed - COPIED_TO_TMP //temporary version saved off - HASHED //hash taken of tmp file - CACHED //tmp file renamed to its hash - UPLOADED //tmp file has been successfully uploaded to storage - ON_SERVER //server has been successfully notified of event - //the state of the event on the asink instance notified that it occurred elsewhere - NOTIFIED //we've been told a file has been changed remotely - DOWNLOADED //event has been downloaded and stored in the local file cache - SYNCED //everything has been done to ensure the affected file is up-to-date + //Local event status flags + DISCARDED = 1 << iota //event is to be discarded because it errored or is duplicate ) type Event struct {