154 lines
3.7 KiB
Go
154 lines
3.7 KiB
Go
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||
|
|
||
|
package sse
|
||
|
|
||
|
import (
|
||
|
"net/url"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
)
|
||
|
|
||
|
// Stream ...
|
||
|
type Stream struct {
|
||
|
ID string
|
||
|
event chan *Event
|
||
|
quit chan struct{}
|
||
|
quitOnce sync.Once
|
||
|
register chan *Subscriber
|
||
|
deregister chan *Subscriber
|
||
|
subscribers []*Subscriber
|
||
|
Eventlog EventLog
|
||
|
subscriberCount int32
|
||
|
// Enables replaying of eventlog to newly added subscribers
|
||
|
AutoReplay bool
|
||
|
isAutoStream bool
|
||
|
|
||
|
// Specifies the function to run when client subscribe or un-subscribe
|
||
|
OnSubscribe func(streamID string, sub *Subscriber)
|
||
|
OnUnsubscribe func(streamID string, sub *Subscriber)
|
||
|
}
|
||
|
|
||
|
// newStream returns a new stream
|
||
|
func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream {
|
||
|
return &Stream{
|
||
|
ID: id,
|
||
|
AutoReplay: replay,
|
||
|
subscribers: make([]*Subscriber, 0),
|
||
|
isAutoStream: isAutoStream,
|
||
|
register: make(chan *Subscriber),
|
||
|
deregister: make(chan *Subscriber),
|
||
|
event: make(chan *Event, buffSize),
|
||
|
quit: make(chan struct{}),
|
||
|
Eventlog: make(EventLog, 0),
|
||
|
OnSubscribe: onSubscribe,
|
||
|
OnUnsubscribe: onUnsubscribe,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (str *Stream) run() {
|
||
|
go func(str *Stream) {
|
||
|
for {
|
||
|
select {
|
||
|
// Add new subscriber
|
||
|
case subscriber := <-str.register:
|
||
|
str.subscribers = append(str.subscribers, subscriber)
|
||
|
if str.AutoReplay {
|
||
|
str.Eventlog.Replay(subscriber)
|
||
|
}
|
||
|
|
||
|
// Remove closed subscriber
|
||
|
case subscriber := <-str.deregister:
|
||
|
i := str.getSubIndex(subscriber)
|
||
|
if i != -1 {
|
||
|
str.removeSubscriber(i)
|
||
|
}
|
||
|
|
||
|
if str.OnUnsubscribe != nil {
|
||
|
go str.OnUnsubscribe(str.ID, subscriber)
|
||
|
}
|
||
|
|
||
|
// Publish event to subscribers
|
||
|
case event := <-str.event:
|
||
|
if str.AutoReplay {
|
||
|
str.Eventlog.Add(event)
|
||
|
}
|
||
|
for i := range str.subscribers {
|
||
|
str.subscribers[i].connection <- event
|
||
|
}
|
||
|
|
||
|
// Shutdown if the server closes
|
||
|
case <-str.quit:
|
||
|
// remove connections
|
||
|
str.removeAllSubscribers()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}(str)
|
||
|
}
|
||
|
|
||
|
func (str *Stream) close() {
|
||
|
str.quitOnce.Do(func() {
|
||
|
close(str.quit)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func (str *Stream) getSubIndex(sub *Subscriber) int {
|
||
|
for i := range str.subscribers {
|
||
|
if str.subscribers[i] == sub {
|
||
|
return i
|
||
|
}
|
||
|
}
|
||
|
return -1
|
||
|
}
|
||
|
|
||
|
// addSubscriber will create a new subscriber on a stream
|
||
|
func (str *Stream) addSubscriber(eventid int, url *url.URL) *Subscriber {
|
||
|
atomic.AddInt32(&str.subscriberCount, 1)
|
||
|
sub := &Subscriber{
|
||
|
eventid: eventid,
|
||
|
quit: str.deregister,
|
||
|
connection: make(chan *Event, 64),
|
||
|
URL: url,
|
||
|
}
|
||
|
|
||
|
if str.isAutoStream {
|
||
|
sub.removed = make(chan struct{}, 1)
|
||
|
}
|
||
|
|
||
|
str.register <- sub
|
||
|
|
||
|
if str.OnSubscribe != nil {
|
||
|
go str.OnSubscribe(str.ID, sub)
|
||
|
}
|
||
|
|
||
|
return sub
|
||
|
}
|
||
|
|
||
|
func (str *Stream) removeSubscriber(i int) {
|
||
|
atomic.AddInt32(&str.subscriberCount, -1)
|
||
|
close(str.subscribers[i].connection)
|
||
|
if str.subscribers[i].removed != nil {
|
||
|
str.subscribers[i].removed <- struct{}{}
|
||
|
close(str.subscribers[i].removed)
|
||
|
}
|
||
|
str.subscribers = append(str.subscribers[:i], str.subscribers[i+1:]...)
|
||
|
}
|
||
|
|
||
|
func (str *Stream) removeAllSubscribers() {
|
||
|
for i := 0; i < len(str.subscribers); i++ {
|
||
|
close(str.subscribers[i].connection)
|
||
|
if str.subscribers[i].removed != nil {
|
||
|
str.subscribers[i].removed <- struct{}{}
|
||
|
close(str.subscribers[i].removed)
|
||
|
}
|
||
|
}
|
||
|
atomic.StoreInt32(&str.subscriberCount, 0)
|
||
|
str.subscribers = str.subscribers[:0]
|
||
|
}
|
||
|
|
||
|
func (str *Stream) getSubscriberCount() int {
|
||
|
return int(atomic.LoadInt32(&str.subscriberCount))
|
||
|
}
|