157 lines
4 KiB
Go
157 lines
4 KiB
Go
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||
|
|
||
|
package sse
|
||
|
|
||
|
import (
|
||
|
"encoding/base64"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// DefaultBufferSize size of the queue that holds the streams messages.
|
||
|
const DefaultBufferSize = 1024
|
||
|
|
||
|
// Server Is our main struct
|
||
|
type Server struct {
|
||
|
// Extra headers adding to the HTTP response to each client
|
||
|
Headers map[string]string
|
||
|
// Sets a ttl that prevents old events from being transmitted
|
||
|
EventTTL time.Duration
|
||
|
// Specifies the size of the message buffer for each stream
|
||
|
BufferSize int
|
||
|
// Encodes all data as base64
|
||
|
EncodeBase64 bool
|
||
|
// Splits an events data into multiple data: entries
|
||
|
SplitData bool
|
||
|
// Enables creation of a stream when a client connects
|
||
|
AutoStream bool
|
||
|
// Enables automatic replay for each new subscriber that connects
|
||
|
AutoReplay bool
|
||
|
|
||
|
// Specifies the function to run when client subscribe or un-subscribe
|
||
|
OnSubscribe func(streamID string, sub *Subscriber)
|
||
|
OnUnsubscribe func(streamID string, sub *Subscriber)
|
||
|
|
||
|
streams map[string]*Stream
|
||
|
muStreams sync.RWMutex
|
||
|
}
|
||
|
|
||
|
// New will create a server and setup defaults
|
||
|
func New() *Server {
|
||
|
return &Server{
|
||
|
BufferSize: DefaultBufferSize,
|
||
|
AutoStream: false,
|
||
|
AutoReplay: true,
|
||
|
streams: make(map[string]*Stream),
|
||
|
Headers: map[string]string{},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NewWithCallback will create a server and setup defaults with callback function
|
||
|
func NewWithCallback(onSubscribe, onUnsubscribe func(streamID string, sub *Subscriber)) *Server {
|
||
|
return &Server{
|
||
|
BufferSize: DefaultBufferSize,
|
||
|
AutoStream: false,
|
||
|
AutoReplay: true,
|
||
|
streams: make(map[string]*Stream),
|
||
|
Headers: map[string]string{},
|
||
|
OnSubscribe: onSubscribe,
|
||
|
OnUnsubscribe: onUnsubscribe,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Close shuts down the server, closes all of the streams and connections
|
||
|
func (s *Server) Close() {
|
||
|
s.muStreams.Lock()
|
||
|
defer s.muStreams.Unlock()
|
||
|
|
||
|
for id := range s.streams {
|
||
|
s.streams[id].close()
|
||
|
delete(s.streams, id)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// CreateStream will create a new stream and register it
|
||
|
func (s *Server) CreateStream(id string) *Stream {
|
||
|
s.muStreams.Lock()
|
||
|
defer s.muStreams.Unlock()
|
||
|
|
||
|
if s.streams[id] != nil {
|
||
|
return s.streams[id]
|
||
|
}
|
||
|
|
||
|
str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
|
||
|
str.run()
|
||
|
|
||
|
s.streams[id] = str
|
||
|
|
||
|
return str
|
||
|
}
|
||
|
|
||
|
// RemoveStream will remove a stream
|
||
|
func (s *Server) RemoveStream(id string) {
|
||
|
s.muStreams.Lock()
|
||
|
defer s.muStreams.Unlock()
|
||
|
|
||
|
if s.streams[id] != nil {
|
||
|
s.streams[id].close()
|
||
|
delete(s.streams, id)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// StreamExists checks whether a stream by a given id exists
|
||
|
func (s *Server) StreamExists(id string) bool {
|
||
|
return s.getStream(id) != nil
|
||
|
}
|
||
|
|
||
|
// Publish sends a mesage to every client in a streamID.
|
||
|
// If the stream's buffer is full, it blocks until the message is sent out to
|
||
|
// all subscribers (but not necessarily arrived the clients), or when the
|
||
|
// stream is closed.
|
||
|
func (s *Server) Publish(id string, event *Event) {
|
||
|
stream := s.getStream(id)
|
||
|
if stream == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-stream.quit:
|
||
|
case stream.event <- s.process(event):
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// TryPublish is the same as Publish except that when the operation would cause
|
||
|
// the call to be blocked, it simply drops the message and returns false.
|
||
|
// Together with a small BufferSize, it can be useful when publishing the
|
||
|
// latest message ASAP is more important than reliable delivery.
|
||
|
func (s *Server) TryPublish(id string, event *Event) bool {
|
||
|
stream := s.getStream(id)
|
||
|
if stream == nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case stream.event <- s.process(event):
|
||
|
return true
|
||
|
default:
|
||
|
return false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Server) getStream(id string) *Stream {
|
||
|
s.muStreams.RLock()
|
||
|
defer s.muStreams.RUnlock()
|
||
|
return s.streams[id]
|
||
|
}
|
||
|
|
||
|
func (s *Server) process(event *Event) *Event {
|
||
|
if s.EncodeBase64 {
|
||
|
output := make([]byte, base64.StdEncoding.EncodedLen(len(event.Data)))
|
||
|
base64.StdEncoding.Encode(output, event.Data)
|
||
|
event.Data = output
|
||
|
}
|
||
|
return event
|
||
|
}
|