Improve error handling by adding DISCARDED state for events
This commit is contained in:
parent
0c9b4cb057
commit
6bdb81834c
@ -109,15 +109,27 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
//copy to tmp
|
//copy to tmp
|
||||||
//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
|
//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
|
||||||
tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir)
|
tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir)
|
||||||
if err != nil && !util.ErrorFileNotFound(err) {
|
if err != nil {
|
||||||
|
//bail out if the file we are trying to upload already got deleted
|
||||||
|
if util.ErrorFileNotFound(err) {
|
||||||
|
event.Status |= asink.DISCARDED
|
||||||
|
return
|
||||||
|
}
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//try to collect the file's permissions
|
||||||
fileinfo, err := os.Stat(event.Path)
|
fileinfo, err := os.Stat(event.Path)
|
||||||
if err != nil && !util.ErrorFileNotFound(err) {
|
if err != nil {
|
||||||
|
//bail out if the file we are trying to upload already got deleted
|
||||||
|
if util.ErrorFileNotFound(err) {
|
||||||
|
event.Status |= asink.DISCARDED
|
||||||
|
return
|
||||||
|
}
|
||||||
panic(err)
|
panic(err)
|
||||||
|
} else {
|
||||||
|
event.Permissions = fileinfo.Mode()
|
||||||
}
|
}
|
||||||
event.Permissions = fileinfo.Mode()
|
|
||||||
|
|
||||||
//get the file's hash
|
//get the file's hash
|
||||||
hash, err := HashFile(tmpfilename)
|
hash, err := HashFile(tmpfilename)
|
||||||
@ -129,6 +141,7 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
//If the file didn't actually change, squash this event
|
//If the file didn't actually change, squash this event
|
||||||
if latestLocal != nil && event.Hash == latestLocal.Hash {
|
if latestLocal != nil && event.Hash == latestLocal.Hash {
|
||||||
os.Remove(tmpfilename)
|
os.Remove(tmpfilename)
|
||||||
|
event.Status |= asink.DISCARDED
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,6 +161,12 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
//if we're trying to delete a file that we thought was already deleted, there's no need to delete it again
|
||||||
|
if latestLocal != nil && latestLocal.IsDelete() {
|
||||||
|
event.Status |= asink.DISCARDED
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//finally, send it off to the server
|
//finally, send it off to the server
|
||||||
@ -155,16 +174,16 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) //TODO handle sensibly
|
panic(err) //TODO handle sensibly
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
||||||
latestLocal := LockPath(event.Path, true)
|
latestLocal := LockPath(event.Path, true)
|
||||||
defer UnlockPath(event)
|
defer UnlockPath(event)
|
||||||
|
|
||||||
//if we already have this event, or if it is older than our most recent event, bail out
|
//if we already have this event, or if it is older than our most recent event, bail out
|
||||||
if latestLocal != nil {
|
if latestLocal != nil {
|
||||||
if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
|
if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
|
||||||
UnlockPath(event)
|
event.Status |= asink.DISCARDED
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -226,7 +245,6 @@ func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
//TODO delete file hierarchy beneath this file if its the last one in its directory?
|
//TODO delete file hierarchy beneath this file if its the last one in its directory?
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Println(event)
|
|
||||||
//TODO make sure file being overwritten is either unchanged or already copied off and hashed
|
//TODO make sure file being overwritten is either unchanged or already copied off and hashed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -26,7 +26,6 @@ func SendEvent(globals AsinkGlobals, event *asink.Event) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fmt.Println(string(b))
|
|
||||||
|
|
||||||
//actually make the request
|
//actually make the request
|
||||||
resp, err := http.Post(url, "application/json", bytes.NewReader(b))
|
resp, err := http.Post(url, "application/json", bytes.NewReader(b))
|
||||||
|
@ -32,22 +32,25 @@ func PathLocker(db *AsinkDB) {
|
|||||||
select {
|
select {
|
||||||
case event = <-pathUnlockerChan:
|
case event = <-pathUnlockerChan:
|
||||||
if v, ok = m[event.Path]; ok != false {
|
if v, ok = m[event.Path]; ok != false {
|
||||||
if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) {
|
//only update status in data structures if the event hasn't been discarded
|
||||||
err := db.DatabaseAddEvent(event)
|
if event.Status&asink.DISCARDED == 0 {
|
||||||
if err != nil {
|
if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) {
|
||||||
panic(err)
|
err := db.DatabaseAddEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
//TODO batch database writes instead of doing one at a time
|
||||||
}
|
}
|
||||||
//TODO batch database writes instead of doing one at a time
|
v.latestEvent = event
|
||||||
}
|
}
|
||||||
v.latestEvent = event
|
|
||||||
if len(v.localWaiters) > 0 {
|
if len(v.localWaiters) > 0 {
|
||||||
c = v.localWaiters[0]
|
c = v.localWaiters[0]
|
||||||
v.localWaiters = v.localWaiters[1:]
|
v.localWaiters = v.localWaiters[1:]
|
||||||
c <- event
|
c <- v.latestEvent
|
||||||
} else if len(v.remoteWaiters) > 0 {
|
} else if len(v.remoteWaiters) > 0 {
|
||||||
c = v.remoteWaiters[0]
|
c = v.remoteWaiters[0]
|
||||||
v.remoteWaiters = v.remoteWaiters[1:]
|
v.remoteWaiters = v.remoteWaiters[1:]
|
||||||
c <- event
|
c <- v.latestEvent
|
||||||
} else {
|
} else {
|
||||||
v.locked = false
|
v.locked = false
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ func StartWatching(watchDir string, fileUpdates chan *asink.Event) {
|
|||||||
panic("Unknown fsnotify event type")
|
panic("Unknown fsnotify event type")
|
||||||
}
|
}
|
||||||
|
|
||||||
event.Status = asink.NOTICED
|
|
||||||
event.Path = ev.Name
|
event.Path = ev.Name
|
||||||
event.Timestamp = time.Now().UnixNano()
|
event.Timestamp = time.Now().UnixNano()
|
||||||
|
|
||||||
|
13
events.go
13
events.go
@ -16,17 +16,8 @@ const (
|
|||||||
type EventStatus uint32
|
type EventStatus uint32
|
||||||
|
|
||||||
const (
|
const (
|
||||||
//the state of the event on the local asink instance on which it originated:
|
//Local event status flags
|
||||||
NOTICED = 1 << iota //watcher.go has been notified that a file changed
|
DISCARDED = 1 << iota //event is to be discarded because it errored or is duplicate
|
||||||
COPIED_TO_TMP //temporary version saved off
|
|
||||||
HASHED //hash taken of tmp file
|
|
||||||
CACHED //tmp file renamed to its hash
|
|
||||||
UPLOADED //tmp file has been successfully uploaded to storage
|
|
||||||
ON_SERVER //server has been successfully notified of event
|
|
||||||
//the state of the event on the asink instance notified that it occurred elsewhere
|
|
||||||
NOTIFIED //we've been told a file has been changed remotely
|
|
||||||
DOWNLOADED //event has been downloaded and stored in the local file cache
|
|
||||||
SYNCED //everything has been done to ensure the affected file is up-to-date
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type Event struct {
|
type Event struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user