diff --git a/client/asink.go b/client/asink.go index b01b191..fae7dd3 100644 --- a/client/asink.go +++ b/client/asink.go @@ -80,6 +80,9 @@ func main() { panic(err) } + //spawn goroutine to handle locking file paths + go PathLocker(globals.db) + //spawn goroutines to handle local events localFileUpdates := make(chan *asink.Event) go StartWatching(globals.syncDir, localFileUpdates) @@ -96,14 +99,15 @@ func main() { } func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { - //add to database - err := globals.db.DatabaseAddEvent(event) - if err != nil { - panic(err) + latestLocal := LockPath(event.Path, true) + defer UnlockPath(event) + if latestLocal != nil { + event.Predecessor = latestLocal.Hash } if event.IsUpdate() { //copy to tmp + //TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir) if err != nil { panic(err) @@ -129,44 +133,41 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { } event.Status |= asink.CACHED - //update database - err = globals.db.DatabaseUpdateEvent(event) - if err != nil { - panic(err) - } - //upload file to remote storage err = globals.storage.Put(event.Path, event.Hash) if err != nil { panic(err) } event.Status |= asink.UPLOADED - - //update database again - err = globals.db.DatabaseUpdateEvent(event) - if err != nil { - panic(err) - } - } //finally, send it off to the server - err = SendEvent(globals, event) + err := SendEvent(globals, event) if err != nil { panic(err) //TODO handle sensibly } event.Status |= asink.ON_SERVER - err = globals.db.DatabaseUpdateEvent(event) - if err != nil { - panic(err) //TODO probably, definitely, none of these should panic - } } func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { - //TODO check to see if event originated locally. If it did, stop processing it. + latestLocal := LockPath(event.Path, true) + defer UnlockPath(event) + //if we already have this event, or if it is older than our most recent event, bail out + if latestLocal != nil { + if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) { + UnlockPath(event) + return + } + + if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash { + panic("conflict") + //TODO handle conflict + } + } + //Download event - if event.IsUpdate() { + if event.IsUpdate() && (latestLocal == nil || event.Hash != latestLocal.Hash) { outfile, err := ioutil.TempFile(globals.tmpDir, "asink") if err != nil { panic(err) //TODO handle sensibly @@ -191,7 +192,6 @@ func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) { fmt.Println(event) //TODO make sure file being overwritten is either unchanged or already copied off and hashed - //TODO add event to the local database, and populate the local directory } func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) { diff --git a/client/database.go b/client/database.go index 960b74e..c48097c 100644 --- a/client/database.go +++ b/client/database.go @@ -141,3 +141,24 @@ func (adb *AsinkDB) DatabaseLatestEventForPath(path string) (event *asink.Event, return event, nil } } + +//returns nil if no such event exists +func (adb *AsinkDB) DatabaseLatestRemoteEvent() (event *asink.Event, err error) { + adb.lock.Lock() + //make sure the database gets unlocked + defer adb.lock.Unlock() + + row := adb.db.QueryRow("SELECT id, localid, type, status, path, hash, predecessor, timestamp, permissions FROM events WHERE id > 0 ORDER BY id DESC LIMIT 1;") + + event = new(asink.Event) + err = row.Scan(&event.Id, &event.LocalId, &event.Type, &event.Status, &event.Path, &event.Hash, &event.Predecessor, &event.Timestamp, &event.Permissions) + + switch { + case err == sql.ErrNoRows: + return nil, nil + case err != nil: + return nil, err + default: + return event, nil + } +} diff --git a/client/path_map.go b/client/path_map.go new file mode 100644 index 0000000..416b618 --- /dev/null +++ b/client/path_map.go @@ -0,0 +1,93 @@ +package main + +import ( + "asink" +) + +type pathMapRequest struct { + path string + local bool /*is this event local (true) or remote(false)?*/ + responseChan chan *asink.Event +} + +type pathMapValue struct { + latestEvent *asink.Event + locked bool + localWaiters []chan *asink.Event + remoteWaiters []chan *asink.Event +} + +var pathLockerChan = make(chan *pathMapRequest) +var pathUnlockerChan = make(chan *asink.Event) + +func PathLocker(db *AsinkDB) { + var event *asink.Event + var request *pathMapRequest + var v *pathMapValue + var ok bool + var c chan *asink.Event + m := make(map[string]*pathMapValue) + + for { + select { + case event = <-pathUnlockerChan: + if v, ok = m[event.Path]; ok != false { + if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) { + err := db.DatabaseAddEvent(event) + if err != nil { + panic(err) + } + //TODO batch database writes instead of doing one at a time + } + v.latestEvent = event + if len(v.localWaiters) > 0 { + c = v.localWaiters[0] + v.localWaiters = v.localWaiters[1:] + c <- event + } else if len(v.remoteWaiters) > 0 { + c = v.remoteWaiters[0] + v.remoteWaiters = v.remoteWaiters[1:] + c <- event + } else { + v.locked = false + } + } + case request = <-pathLockerChan: + v, ok = m[request.path] + //allocate pathMapValue object if it doesn't exist + if !ok { + v = new(pathMapValue) + m[request.path] = v + } + if v.locked { + if request.local { + v.localWaiters = append(v.localWaiters, request.responseChan) + } else { + v.remoteWaiters = append(v.remoteWaiters, request.responseChan) + } + } else { + v.locked = true + event, err := db.DatabaseLatestEventForPath(request.path) + if err != nil { + panic(err) + } + request.responseChan <- event + } + } + } +} + +//locks the path to ensure nothing else inside asink is mucking with that file. +//'local' determines the precedence of the lock - all local lock requesters will +//be served before any remote requesters. +//The previous event for this path is returned, nil is returned if no previous event exists +func LockPath(path string, local bool) (currentEvent *asink.Event) { + c := make(chan *asink.Event) + pathLockerChan <- &pathMapRequest{path, local, c} + return <-c +} + +//unlocks the path, storing the updated event back to the database +func UnlockPath(event *asink.Event) { + pathUnlockerChan <- event +}