1
0
Fork 0

client: add batching of sending events to the server

This commit is contained in:
Aaron Lindsay 2013-09-05 23:20:29 -04:00
parent 65e692b701
commit 793e4eb6b2
2 changed files with 57 additions and 4 deletions

View File

@ -107,6 +107,7 @@ func StartClient(args []string) {
go PathLocker(globals.db)
//spawn goroutines to handle local events
go SendEvents(globals)
localFileUpdates := make(chan *asink.Event)
go StartWatching(globals.syncDir, localFileUpdates)

View File

@ -19,6 +19,18 @@ import (
const MIN_ERROR_WAIT = 100 // 1/10 of a second
const MAX_ERROR_WAIT = 10000 // 10 seconds
const MAX_SEND_AT_ONCE = 50 //maximum number of events to send to the server at once
type sendEventRequest struct {
event *asink.Event
returnChan *chan error
}
var sendEventsChan chan *sendEventRequest
func init() {
sendEventsChan = make(chan *sendEventRequest)
}
func AuthenticatedRequest(method, url, bodyType string, body io.Reader, username, password string) (*http.Response, error) {
client := &http.Client{}
@ -39,14 +51,14 @@ func AuthenticatedPost(url, bodyType string, body io.Reader, username, password
return AuthenticatedRequest("POST", url, bodyType, body, username, password)
}
func SendEvent(globals AsinkGlobals, event *asink.Event) error {
func actuallySendEvents(globals AsinkGlobals, events []*asink.Event) error {
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
//construct json payload
events := asink.EventList{
Events: []*asink.Event{event},
eventStruct := asink.EventList{
Events: events,
}
b, err := json.Marshal(events)
b, err := json.Marshal(eventStruct)
if err != nil {
return err
}
@ -76,6 +88,46 @@ func SendEvent(globals AsinkGlobals, event *asink.Event) error {
return nil
}
func SendEvents(globals AsinkGlobals) {
for {
//make arrays to hold events and return channels
events := make([]*asink.Event, 1)
returnChans := make([]*chan error, 1)
//wait for the first event
request := <-sendEventsChan
events[0] = request.event
returnChans[0] = request.returnChan
//get any other events that are currently available to send
possiblyMoreEvents := true
for possiblyMoreEvents && len(events) <= MAX_SEND_AT_ONCE {
select {
case request = <-sendEventsChan:
events = append(events, request.event)
returnChans = append(returnChans, request.returnChan)
default:
possiblyMoreEvents = false
}
}
//send all these events
err := actuallySendEvents(globals, events)
//send back any errors (or hopefully nil) to their respective channels
for _, c := range returnChans {
*c <- err
}
}
}
func SendEvent(globals AsinkGlobals, event *asink.Event) error {
responseChan := make(chan error)
request := sendEventRequest{event, &responseChan}
sendEventsChan <- &request
return <-responseChan
}
func GetEvents(globals AsinkGlobals, events chan *asink.Event) {
url := "http://" + globals.server + ":" + strconv.Itoa(int(globals.port)) + "/events/"
var successiveErrors uint = 0