diff --git a/client/asink.go b/client/asink.go index b27b21a..9b7cd86 100644 --- a/client/asink.go +++ b/client/asink.go @@ -158,7 +158,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) { event.Status |= asink.ON_SERVER err = DatabaseUpdateEvent(globals.db, event) if err != nil { - panic(err) + panic(err) //TODO probably, definitely, none of these should panic } } diff --git a/server/longpolling.go b/server/longpolling.go new file mode 100644 index 0000000..6355590 --- /dev/null +++ b/server/longpolling.go @@ -0,0 +1,76 @@ +package main + +import ( + "asink" + "sync" + "time" +) + +type LongPollGroup struct { + channels []*chan *asink.Event + lock sync.Mutex +} + +type PollingManager struct { + lock sync.RWMutex + groups map[string]*LongPollGroup +} + +var pm *PollingManager + +func init() { + pm = new(PollingManager) + pm.groups = make(map[string]*LongPollGroup) +} + +func addPoller(uid string, channel *chan *asink.Event) { + pm.lock.RLock() + + group := pm.groups[uid] + if group != nil { + group.lock.Lock() + pm.lock.RUnlock() + group.channels = append(group.channels, channel) + group.lock.Unlock() + } else { + pm.lock.RUnlock() + pm.lock.Lock() + group = new(LongPollGroup) + group.channels = append(group.channels, channel) + pm.groups[uid] = group + pm.lock.Unlock() + } + + //set timer to call function after one minute + timeout := time.Duration(1)*time.Minute + time.AfterFunc(timeout, func() { + group.lock.Lock() + for i, c := range group.channels { + if c == channel { + copy(group.channels[i:], group.channels[i+1:]) + group.channels = group.channels[:len(group.channels)-1] + break + } + } + group.lock.Unlock() + close(*channel) + }) +} + +func broadcastToPollers(uid string, event *asink.Event) { + //store off the long polling group we're trying to send to and remove + //it from PollingManager.groups + pm.lock.Lock() + group := pm.groups[uid] + pm.groups[uid] = nil + pm.lock.Unlock() + + //send event down each of group's channels + if group != nil { + group.lock.Lock() + for _, c := range group.channels { + *c <- event + } + group.lock.Unlock() + } +} diff --git a/server/server.go b/server/server.go index 050735d..eed0761 100644 --- a/server/server.go +++ b/server/server.go @@ -12,6 +12,7 @@ import ( "strconv" ) +//global variables var eventsRegexp *regexp.Regexp var port int = 8080 var db *sql.DB @@ -80,7 +81,15 @@ func getEvents(w http.ResponseWriter, r *http.Request, nextEvent uint64) { return } - //TODO long-poll here if events is empty + //long-poll if events is empty + if len(events) == 0 { + c := make(chan *asink.Event) + addPoller("aclindsa", &c) //TODO support more than one user + e, ok := <-c + if ok { + events = append(events, e) + } + } } func putEvents(w http.ResponseWriter, r *http.Request) { @@ -119,6 +128,8 @@ func putEvents(w http.ResponseWriter, r *http.Request) { for _, event := range events.Events { DatabaseAddEvent(db, event) } + + broadcastToPollers("aclindsa", events.Events[0]) //TODO support more than one user } func eventHandler(w http.ResponseWriter, r *http.Request) {