From 793e4eb6b2f39fe38049c9fe710c8562699bec35 Mon Sep 17 00:00:00 2001 From: Aaron Lindsay Date: Thu, 5 Sep 2013 23:20:29 -0400 Subject: [PATCH] client: add batching of sending events to the server --- asink/client.go | 1 + asink/net.go | 60 +++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 57 insertions(+), 4 deletions(-) diff --git a/asink/client.go b/asink/client.go index 835809a..461bbdb 100644 --- a/asink/client.go +++ b/asink/client.go @@ -107,6 +107,7 @@ func StartClient(args []string) { go PathLocker(globals.db) //spawn goroutines to handle local events + go SendEvents(globals) localFileUpdates := make(chan *asink.Event) go StartWatching(globals.syncDir, localFileUpdates) diff --git a/asink/net.go b/asink/net.go index 7ed6622..7b51c02 100644 --- a/asink/net.go +++ b/asink/net.go @@ -19,6 +19,18 @@ import ( const MIN_ERROR_WAIT = 100 // 1/10 of a second const MAX_ERROR_WAIT = 10000 // 10 seconds +const MAX_SEND_AT_ONCE = 50 //maximum number of events to send to the server at once + +type sendEventRequest struct { + event *asink.Event + returnChan *chan error +} + +var sendEventsChan chan *sendEventRequest + +func init() { + sendEventsChan = make(chan *sendEventRequest) +} func AuthenticatedRequest(method, url, bodyType string, body io.Reader, username, password string) (*http.Response, error) { client := &http.Client{} @@ -39,14 +51,14 @@ func AuthenticatedPost(url, bodyType string, body io.Reader, username, password return AuthenticatedRequest("POST", url, bodyType, body, username, password) } -func SendEvent(globals AsinkGlobals, event *asink.Event) error { +func actuallySendEvents(globals AsinkGlobals, events []*asink.Event) error { url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" //construct json payload - events := asink.EventList{ - Events: []*asink.Event{event}, + eventStruct := asink.EventList{ + Events: events, } - b, err := json.Marshal(events) + b, err := json.Marshal(eventStruct) if err != nil { return err } @@ -76,6 +88,46 @@ func SendEvent(globals AsinkGlobals, event *asink.Event) error { return nil } +func SendEvents(globals AsinkGlobals) { + for { + //make arrays to hold events and return channels + events := make([]*asink.Event, 1) + returnChans := make([]*chan error, 1) + + //wait for the first event + request := <-sendEventsChan + events[0] = request.event + returnChans[0] = request.returnChan + + //get any other events that are currently available to send + possiblyMoreEvents := true + for possiblyMoreEvents && len(events) <= MAX_SEND_AT_ONCE { + select { + case request = <-sendEventsChan: + events = append(events, request.event) + returnChans = append(returnChans, request.returnChan) + default: + possiblyMoreEvents = false + } + } + + //send all these events + err := actuallySendEvents(globals, events) + + //send back any errors (or hopefully nil) to their respective channels + for _, c := range returnChans { + *c <- err + } + } +} + +func SendEvent(globals AsinkGlobals, event *asink.Event) error { + responseChan := make(chan error) + request := sendEventRequest{event, &responseChan} + sendEventsChan <- &request + return <-responseChan +} + func GetEvents(globals AsinkGlobals, events chan *asink.Event) { url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/" var successiveErrors uint = 0