Add path map and convert asink client event processing to that model
This commit is contained in:
parent
cfc45fec71
commit
2569f4cfbf
@ -80,6 +80,9 @@ func main() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//spawn goroutine to handle locking file paths
|
||||||
|
go PathLocker(globals.db)
|
||||||
|
|
||||||
//spawn goroutines to handle local events
|
//spawn goroutines to handle local events
|
||||||
localFileUpdates := make(chan *asink.Event)
|
localFileUpdates := make(chan *asink.Event)
|
||||||
go StartWatching(globals.syncDir, localFileUpdates)
|
go StartWatching(globals.syncDir, localFileUpdates)
|
||||||
@ -96,14 +99,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
||||||
//add to database
|
latestLocal := LockPath(event.Path, true)
|
||||||
err := globals.db.DatabaseAddEvent(event)
|
defer UnlockPath(event)
|
||||||
if err != nil {
|
if latestLocal != nil {
|
||||||
panic(err)
|
event.Predecessor = latestLocal.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
if event.IsUpdate() {
|
if event.IsUpdate() {
|
||||||
//copy to tmp
|
//copy to tmp
|
||||||
|
//TODO upload in chunks and check modification times to make sure it hasn't been changed instead of copying the whole thing off
|
||||||
tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir)
|
tmpfilename, err := util.CopyToTmp(event.Path, globals.tmpDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -129,44 +133,41 @@ func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
}
|
}
|
||||||
event.Status |= asink.CACHED
|
event.Status |= asink.CACHED
|
||||||
|
|
||||||
//update database
|
|
||||||
err = globals.db.DatabaseUpdateEvent(event)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
//upload file to remote storage
|
//upload file to remote storage
|
||||||
err = globals.storage.Put(event.Path, event.Hash)
|
err = globals.storage.Put(event.Path, event.Hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
event.Status |= asink.UPLOADED
|
event.Status |= asink.UPLOADED
|
||||||
|
|
||||||
//update database again
|
|
||||||
err = globals.db.DatabaseUpdateEvent(event)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//finally, send it off to the server
|
//finally, send it off to the server
|
||||||
err = SendEvent(globals, event)
|
err := SendEvent(globals, event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) //TODO handle sensibly
|
panic(err) //TODO handle sensibly
|
||||||
}
|
}
|
||||||
|
|
||||||
event.Status |= asink.ON_SERVER
|
event.Status |= asink.ON_SERVER
|
||||||
err = globals.db.DatabaseUpdateEvent(event)
|
|
||||||
if err != nil {
|
|
||||||
panic(err) //TODO probably, definitely, none of these should panic
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
||||||
//TODO check to see if event originated locally. If it did, stop processing it.
|
latestLocal := LockPath(event.Path, true)
|
||||||
|
defer UnlockPath(event)
|
||||||
|
//if we already have this event, or if it is older than our most recent event, bail out
|
||||||
|
if latestLocal != nil {
|
||||||
|
if event.Timestamp < latestLocal.Timestamp || event.IsSameEvent(latestLocal) {
|
||||||
|
UnlockPath(event)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if latestLocal.Hash != event.Predecessor && latestLocal.Hash != event.Hash {
|
||||||
|
panic("conflict")
|
||||||
|
//TODO handle conflict
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//Download event
|
//Download event
|
||||||
if event.IsUpdate() {
|
if event.IsUpdate() && (latestLocal == nil || event.Hash != latestLocal.Hash) {
|
||||||
outfile, err := ioutil.TempFile(globals.tmpDir, "asink")
|
outfile, err := ioutil.TempFile(globals.tmpDir, "asink")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) //TODO handle sensibly
|
panic(err) //TODO handle sensibly
|
||||||
@ -191,7 +192,6 @@ func ProcessRemoteEvent(globals AsinkGlobals, event *asink.Event) {
|
|||||||
|
|
||||||
fmt.Println(event)
|
fmt.Println(event)
|
||||||
//TODO make sure file being overwritten is either unchanged or already copied off and hashed
|
//TODO make sure file being overwritten is either unchanged or already copied off and hashed
|
||||||
//TODO add event to the local database, and populate the local directory
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
|
func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
|
||||||
|
@ -141,3 +141,24 @@ func (adb *AsinkDB) DatabaseLatestEventForPath(path string) (event *asink.Event,
|
|||||||
return event, nil
|
return event, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//returns nil if no such event exists
|
||||||
|
func (adb *AsinkDB) DatabaseLatestRemoteEvent() (event *asink.Event, err error) {
|
||||||
|
adb.lock.Lock()
|
||||||
|
//make sure the database gets unlocked
|
||||||
|
defer adb.lock.Unlock()
|
||||||
|
|
||||||
|
row := adb.db.QueryRow("SELECT id, localid, type, status, path, hash, predecessor, timestamp, permissions FROM events WHERE id > 0 ORDER BY id DESC LIMIT 1;")
|
||||||
|
|
||||||
|
event = new(asink.Event)
|
||||||
|
err = row.Scan(&event.Id, &event.LocalId, &event.Type, &event.Status, &event.Path, &event.Hash, &event.Predecessor, &event.Timestamp, &event.Permissions)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case err == sql.ErrNoRows:
|
||||||
|
return nil, nil
|
||||||
|
case err != nil:
|
||||||
|
return nil, err
|
||||||
|
default:
|
||||||
|
return event, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
93
client/path_map.go
Normal file
93
client/path_map.go
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"asink"
|
||||||
|
)
|
||||||
|
|
||||||
|
type pathMapRequest struct {
|
||||||
|
path string
|
||||||
|
local bool /*is this event local (true) or remote(false)?*/
|
||||||
|
responseChan chan *asink.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
type pathMapValue struct {
|
||||||
|
latestEvent *asink.Event
|
||||||
|
locked bool
|
||||||
|
localWaiters []chan *asink.Event
|
||||||
|
remoteWaiters []chan *asink.Event
|
||||||
|
}
|
||||||
|
|
||||||
|
var pathLockerChan = make(chan *pathMapRequest)
|
||||||
|
var pathUnlockerChan = make(chan *asink.Event)
|
||||||
|
|
||||||
|
func PathLocker(db *AsinkDB) {
|
||||||
|
var event *asink.Event
|
||||||
|
var request *pathMapRequest
|
||||||
|
var v *pathMapValue
|
||||||
|
var ok bool
|
||||||
|
var c chan *asink.Event
|
||||||
|
m := make(map[string]*pathMapValue)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event = <-pathUnlockerChan:
|
||||||
|
if v, ok = m[event.Path]; ok != false {
|
||||||
|
if v.latestEvent == nil || !v.latestEvent.IsSameEvent(event) {
|
||||||
|
err := db.DatabaseAddEvent(event)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
//TODO batch database writes instead of doing one at a time
|
||||||
|
}
|
||||||
|
v.latestEvent = event
|
||||||
|
if len(v.localWaiters) > 0 {
|
||||||
|
c = v.localWaiters[0]
|
||||||
|
v.localWaiters = v.localWaiters[1:]
|
||||||
|
c <- event
|
||||||
|
} else if len(v.remoteWaiters) > 0 {
|
||||||
|
c = v.remoteWaiters[0]
|
||||||
|
v.remoteWaiters = v.remoteWaiters[1:]
|
||||||
|
c <- event
|
||||||
|
} else {
|
||||||
|
v.locked = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case request = <-pathLockerChan:
|
||||||
|
v, ok = m[request.path]
|
||||||
|
//allocate pathMapValue object if it doesn't exist
|
||||||
|
if !ok {
|
||||||
|
v = new(pathMapValue)
|
||||||
|
m[request.path] = v
|
||||||
|
}
|
||||||
|
if v.locked {
|
||||||
|
if request.local {
|
||||||
|
v.localWaiters = append(v.localWaiters, request.responseChan)
|
||||||
|
} else {
|
||||||
|
v.remoteWaiters = append(v.remoteWaiters, request.responseChan)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
v.locked = true
|
||||||
|
event, err := db.DatabaseLatestEventForPath(request.path)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
request.responseChan <- event
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//locks the path to ensure nothing else inside asink is mucking with that file.
|
||||||
|
//'local' determines the precedence of the lock - all local lock requesters will
|
||||||
|
//be served before any remote requesters.
|
||||||
|
//The previous event for this path is returned, nil is returned if no previous event exists
|
||||||
|
func LockPath(path string, local bool) (currentEvent *asink.Event) {
|
||||||
|
c := make(chan *asink.Event)
|
||||||
|
pathLockerChan <- &pathMapRequest{path, local, c}
|
||||||
|
return <-c
|
||||||
|
}
|
||||||
|
|
||||||
|
//unlocks the path, storing the updated event back to the database
|
||||||
|
func UnlockPath(event *asink.Event) {
|
||||||
|
pathUnlockerChan <- event
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user