diff --git a/client/asink.go b/client/asink.go index c9b46ee..c024e82 100644 --- a/client/asink.go +++ b/client/asink.go @@ -3,18 +3,12 @@ package main import ( "asink" "asink/util" - "bytes" "code.google.com/p/goconf/conf" - "encoding/json" - "errors" "flag" "fmt" - "io/ioutil" - "net/http" "os" "os/user" "path" - "strconv" ) type AsinkGlobals struct { @@ -80,21 +74,27 @@ func main() { globals.server, err = config.GetString("server", "host") globals.port, err = config.GetInt("server", "port") - fileUpdates := make(chan *asink.Event) - go StartWatching(globals.syncDir, fileUpdates) - globals.db, err = GetAndInitDB(config) if err != nil { panic(err) } + //spawn goroutines to handle local events + localFileUpdates := make(chan *asink.Event) + go StartWatching(globals.syncDir, localFileUpdates) + + //spawn goroutines to receive remote events + remoteFileUpdates := make(chan *asink.Event) + go GetEvents(globals, remoteFileUpdates) + go ProcessRemoteEvents(globals, remoteFileUpdates) + for { - event := <-fileUpdates - go ProcessEvent(globals, event) + event := <-localFileUpdates + go ProcessLocalEvent(globals, event) } } -func ProcessEvent(globals AsinkGlobals, event *asink.Event) { +func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) { //add to database err := globals.db.DatabaseAddEvent(event) if err != nil { @@ -124,6 +124,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { if err != nil { panic(err) } + panic(err) } event.Status |= asink.CACHED @@ -161,40 +162,9 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { } } -func SendEvent(globals AsinkGlobals, event *asink.Event) error { - url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" - - //construct json payload - events := asink.EventList{ - Events: []*asink.Event{event}, +func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) { + for event := range eventChan { + fmt.Println(event) + //TODO actually download event, add it to the local database, and populate the local directory } - b, err := json.Marshal(events) - if err != nil { - return err - } - fmt.Println(string(b)) - - //actually make the request - resp, err := http.Post(url, "application/json", bytes.NewReader(b)) - if err != nil { - return err - } - defer resp.Body.Close() - - //check to make sure request succeeded - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return err - } - - var apistatus asink.APIResponse - err = json.Unmarshal(body, &apistatus) - if err != nil { - return err - } - if apistatus.Status != asink.SUCCESS { - return errors.New("API response was not success") - } - - return nil } diff --git a/client/database.go b/client/database.go index 3c7baf9..b458f36 100644 --- a/client/database.go +++ b/client/database.go @@ -120,3 +120,25 @@ func (adb *AsinkDB) DatabaseUpdateEvent(e *asink.Event) (err error) { return nil } + +//returns nil if no such event exists +func (adb *AsinkDB) DatabaseLatestRemoteEvent() (event *asink.Event, err error) { + adb.lock.Lock() + //make sure the database gets unlocked + defer adb.lock.Unlock() + + rows, err := adb.db.Query("SELECT id, localid, type, status, path, hash, timestamp, permissions FROM events WHERE id > 0 ORDER BY id DESC LIMIT 1;") + if err != nil { + return nil, err + } + + for rows.Next() { + event = new(asink.Event) + err = rows.Scan(&event.Id, &event.LocalId, &event.Type, &event.Status, &event.Path, &event.Hash, &event.Timestamp, &event.Permissions) + if err != nil { + return nil, err + } + return event, nil + } + return nil, nil +} diff --git a/client/net.go b/client/net.go new file mode 100644 index 0000000..24b42ef --- /dev/null +++ b/client/net.go @@ -0,0 +1,123 @@ +package main + +import ( + "asink" + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strconv" + "time" +) + +const MIN_ERROR_WAIT = 100 // 1/10 of a second +const MAX_ERROR_WAIT = 10000 // 10 seconds + +func SendEvent(globals AsinkGlobals, event *asink.Event) error { + url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" + + //construct json payload + events := asink.EventList{ + Events: []*asink.Event{event}, + } + b, err := json.Marshal(events) + if err != nil { + return err + } + fmt.Println(string(b)) + + //actually make the request + resp, err := http.Post(url, "application/json", bytes.NewReader(b)) + if err != nil { + return err + } + defer resp.Body.Close() + + //check to make sure request succeeded + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + var apistatus asink.APIResponse + err = json.Unmarshal(body, &apistatus) + if err != nil { + return err + } + if apistatus.Status != asink.SUCCESS { + return errors.New("API response was not success: " + apistatus.Explanation) + } + + return nil +} + +func GetEvents(globals AsinkGlobals, events chan *asink.Event) { + url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" + var successiveErrors uint = 0 + + errorWait := func(err error) { + fmt.Println(err) + var waitMilliseconds time.Duration = MIN_ERROR_WAIT << successiveErrors + if waitMilliseconds > MAX_ERROR_WAIT { + waitMilliseconds = MAX_ERROR_WAIT + } + time.Sleep(waitMilliseconds * time.Millisecond) + successiveErrors++ + } + + //query DB for latest remote event version number that we've seen locally + latestEvent, err := globals.db.DatabaseLatestRemoteEvent() + if err != nil { + panic(err) + } + + for { + //query for events after latest_event + var fullUrl string + if latestEvent != nil { + fullUrl = url + strconv.FormatInt(latestEvent.Id + 1, 10) + } else { + fullUrl = url + "0" + } + resp, err := http.Get(fullUrl) + + //if error, perform exponential backoff (with maximum timeout) + if err != nil { + if resp != nil { + resp.Body.Close() + } + errorWait(err) + continue + } + + body, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() //must be done after the last time resp is used + if err != nil { + errorWait(err) + continue + } + + var apistatus asink.APIResponse + err = json.Unmarshal(body, &apistatus) + if err != nil { + errorWait(err) + continue + } + if apistatus.Status != asink.SUCCESS { + errorWait(err) + continue + } + + + for _, event := range apistatus.Events { + if latestEvent != nil && event.Id != latestEvent.Id + 1 { + break + } + events <- event + latestEvent = event + } + successiveErrors = 0 + } +}