server: add long-polling support

This commit is contained in:
Aaron Lindsay 2013-03-09 22:47:49 -05:00
parent 94823e104d
commit 1ddbcb3636
3 changed files with 89 additions and 2 deletions

View File

@ -158,7 +158,7 @@ func ProcessEvent(globals AsinkGlobals, event *asink.Event) {
event.Status |= asink.ON_SERVER
err = DatabaseUpdateEvent(globals.db, event)
if err != nil {
panic(err)
panic(err) //TODO probably, definitely, none of these should panic
}
}

76
server/longpolling.go Normal file
View File

@ -0,0 +1,76 @@
package main
import (
"asink"
"sync"
"time"
)
type LongPollGroup struct {
channels []*chan *asink.Event
lock sync.Mutex
}
type PollingManager struct {
lock sync.RWMutex
groups map[string]*LongPollGroup
}
var pm *PollingManager
func init() {
pm = new(PollingManager)
pm.groups = make(map[string]*LongPollGroup)
}
func addPoller(uid string, channel *chan *asink.Event) {
pm.lock.RLock()
group := pm.groups[uid]
if group != nil {
group.lock.Lock()
pm.lock.RUnlock()
group.channels = append(group.channels, channel)
group.lock.Unlock()
} else {
pm.lock.RUnlock()
pm.lock.Lock()
group = new(LongPollGroup)
group.channels = append(group.channels, channel)
pm.groups[uid] = group
pm.lock.Unlock()
}
//set timer to call function after one minute
timeout := time.Duration(1)*time.Minute
time.AfterFunc(timeout, func() {
group.lock.Lock()
for i, c := range group.channels {
if c == channel {
copy(group.channels[i:], group.channels[i+1:])
group.channels = group.channels[:len(group.channels)-1]
break
}
}
group.lock.Unlock()
close(*channel)
})
}
func broadcastToPollers(uid string, event *asink.Event) {
//store off the long polling group we're trying to send to and remove
//it from PollingManager.groups
pm.lock.Lock()
group := pm.groups[uid]
pm.groups[uid] = nil
pm.lock.Unlock()
//send event down each of group's channels
if group != nil {
group.lock.Lock()
for _, c := range group.channels {
*c <- event
}
group.lock.Unlock()
}
}

View File

@ -12,6 +12,7 @@ import (
"strconv"
)
//global variables
var eventsRegexp *regexp.Regexp
var port int = 8080
var db *sql.DB
@ -80,7 +81,15 @@ func getEvents(w http.ResponseWriter, r *http.Request, nextEvent uint64) {
return
}
//TODO long-poll here if events is empty
//long-poll if events is empty
if len(events) == 0 {
c := make(chan *asink.Event)
addPoller("aclindsa", &c) //TODO support more than one user
e, ok := <-c
if ok {
events = append(events, e)
}
}
}
func putEvents(w http.ResponseWriter, r *http.Request) {
@ -119,6 +128,8 @@ func putEvents(w http.ResponseWriter, r *http.Request) {
for _, event := range events.Events {
DatabaseAddEvent(db, event)
}
broadcastToPollers("aclindsa", events.Events[0]) //TODO support more than one user
}
func eventHandler(w http.ResponseWriter, r *http.Request) {