client: Retrieve events sent to server

This commit is contained in:
Aaron Lindsay 2013-03-17 23:02:51 -04:00
parent 11bcf164c6
commit a4bce8a07b
3 changed files with 162 additions and 47 deletions

View File

@ -3,18 +3,12 @@ package main
import ( import (
"asink" "asink"
"asink/util" "asink/util"
"bytes"
"code.google.com/p/goconf/conf" "code.google.com/p/goconf/conf"
"encoding/json"
"errors"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net/http"
"os" "os"
"os/user" "os/user"
"path" "path"
"strconv"
) )
type AsinkGlobals struct { type AsinkGlobals struct {
@ -80,21 +74,27 @@ func main() {
globals.server, err = config.GetString("server", "host") globals.server, err = config.GetString("server", "host")
globals.port, err = config.GetInt("server", "port") globals.port, err = config.GetInt("server", "port")
fileUpdates := make(chan *asink.Event)
go StartWatching(globals.syncDir, fileUpdates)
globals.db, err = GetAndInitDB(config) globals.db, err = GetAndInitDB(config)
if err != nil { if err != nil {
panic(err) panic(err)
} }
//spawn goroutines to handle local events
localFileUpdates := make(chan *asink.Event)
go StartWatching(globals.syncDir, localFileUpdates)
//spawn goroutines to receive remote events
remoteFileUpdates := make(chan *asink.Event)
go GetEvents(globals, remoteFileUpdates)
go ProcessRemoteEvents(globals, remoteFileUpdates)
for { for {
event := <-fileUpdates event := <-localFileUpdates
go ProcessEvent(globals, event) go ProcessLocalEvent(globals, event)
} }
} }
func ProcessEvent(globals AsinkGlobals, event *asink.Event) { func ProcessLocalEvent(globals AsinkGlobals, event *asink.Event) {
//add to database //add to database
err := globals.db.DatabaseAddEvent(event) err := globals.db.DatabaseAddEvent(event)
if err != nil { if err != nil {
@ -124,6 +124,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
panic(err)
} }
event.Status |= asink.CACHED event.Status |= asink.CACHED
@ -161,40 +162,9 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
} }
} }
func SendEvent(globals AsinkGlobals, event *asink.Event) error { func ProcessRemoteEvents(globals AsinkGlobals, eventChan chan *asink.Event) {
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" for event := range eventChan {
fmt.Println(event)
//construct json payload //TODO actually download event, add it to the local database, and populate the local directory
events := asink.EventList{
Events: []*asink.Event{event},
} }
b, err := json.Marshal(events)
if err != nil {
return err
}
fmt.Println(string(b))
//actually make the request
resp, err := http.Post(url, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
defer resp.Body.Close()
//check to make sure request succeeded
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var apistatus asink.APIResponse
err = json.Unmarshal(body, &apistatus)
if err != nil {
return err
}
if apistatus.Status != asink.SUCCESS {
return errors.New("API response was not success")
}
return nil
} }

View File

@ -120,3 +120,25 @@ func (adb *AsinkDB) DatabaseUpdateEvent(e *asink.Event) (err error) {
return nil return nil
} }
//returns nil if no such event exists
func (adb *AsinkDB) DatabaseLatestRemoteEvent() (event *asink.Event, err error) {
adb.lock.Lock()
//make sure the database gets unlocked
defer adb.lock.Unlock()
rows, err := adb.db.Query("SELECT id, localid, type, status, path, hash, timestamp, permissions FROM events WHERE id > 0 ORDER BY id DESC LIMIT 1;")
if err != nil {
return nil, err
}
for rows.Next() {
event = new(asink.Event)
err = rows.Scan(&event.Id, &event.LocalId, &event.Type, &event.Status, &event.Path, &event.Hash, &event.Timestamp, &event.Permissions)
if err != nil {
return nil, err
}
return event, nil
}
return nil, nil
}

123
client/net.go Normal file
View File

@ -0,0 +1,123 @@
package main
import (
"asink"
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"
)
const MIN_ERROR_WAIT = 100 // 1/10 of a second
const MAX_ERROR_WAIT = 10000 // 10 seconds
func SendEvent(globals AsinkGlobals, event *asink.Event) error {
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
//construct json payload
events := asink.EventList{
Events: []*asink.Event{event},
}
b, err := json.Marshal(events)
if err != nil {
return err
}
fmt.Println(string(b))
//actually make the request
resp, err := http.Post(url, "application/json", bytes.NewReader(b))
if err != nil {
return err
}
defer resp.Body.Close()
//check to make sure request succeeded
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
var apistatus asink.APIResponse
err = json.Unmarshal(body, &apistatus)
if err != nil {
return err
}
if apistatus.Status != asink.SUCCESS {
return errors.New("API response was not success: " + apistatus.Explanation)
}
return nil
}
func GetEvents(globals AsinkGlobals, events chan *asink.Event) {
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
var successiveErrors uint = 0
errorWait := func(err error) {
fmt.Println(err)
var waitMilliseconds time.Duration = MIN_ERROR_WAIT << successiveErrors
if waitMilliseconds > MAX_ERROR_WAIT {
waitMilliseconds = MAX_ERROR_WAIT
}
time.Sleep(waitMilliseconds * time.Millisecond)
successiveErrors++
}
//query DB for latest remote event version number that we've seen locally
latestEvent, err := globals.db.DatabaseLatestRemoteEvent()
if err != nil {
panic(err)
}
for {
//query for events after latest_event
var fullUrl string
if latestEvent != nil {
fullUrl = url + strconv.FormatInt(latestEvent.Id + 1, 10)
} else {
fullUrl = url + "0"
}
resp, err := http.Get(fullUrl)
//if error, perform exponential backoff (with maximum timeout)
if err != nil {
if resp != nil {
resp.Body.Close()
}
errorWait(err)
continue
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close() //must be done after the last time resp is used
if err != nil {
errorWait(err)
continue
}
var apistatus asink.APIResponse
err = json.Unmarshal(body, &apistatus)
if err != nil {
errorWait(err)
continue
}
if apistatus.Status != asink.SUCCESS {
errorWait(err)
continue
}
for _, event := range apistatus.Events {
if latestEvent != nil && event.Id != latestEvent.Id + 1 {
break
}
events <- event
latestEvent = event
}
successiveErrors = 0
}
}