Added function to receive chats from live stream
This commit is contained in:
parent
42ff91e9f9
commit
651f63d306
238
chat.go
238
chat.go
|
@ -3,14 +3,18 @@ package rumblelivestreamlib
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/csv"
|
"encoding/csv"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/r3labs/sse/v2"
|
||||||
"github.com/tylertravisty/go-utils/random"
|
"github.com/tylertravisty/go-utils/random"
|
||||||
|
"gopkg.in/cenkalti/backoff.v1"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ChatInfo struct {
|
type ChatInfo struct {
|
||||||
|
@ -19,11 +23,25 @@ type ChatInfo struct {
|
||||||
ChannelID int
|
ChannelID int
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ci *ChatInfo) Url() string {
|
func (ci *ChatInfo) MessageUrl() string {
|
||||||
return fmt.Sprintf("%s/chat/%s/message", ci.UrlPrefix, ci.ChatID)
|
return fmt.Sprintf("%s/chat/%s/message", ci.UrlPrefix, ci.ChatID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) streamChatInfo() (*ChatInfo, error) {
|
func (ci *ChatInfo) StreamUrl() string {
|
||||||
|
return fmt.Sprintf("%s/chat/api/chat/%s/stream", ci.UrlPrefix, ci.ChatID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) ChatInfo() error {
|
||||||
|
ci, err := c.getChatInfo()
|
||||||
|
if err != nil {
|
||||||
|
return pkgErr("error getting chat info", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.chatInfo = ci
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) getChatInfo() (*ChatInfo, error) {
|
||||||
if c.StreamUrl == "" {
|
if c.StreamUrl == "" {
|
||||||
return nil, fmt.Errorf("stream url is empty")
|
return nil, fmt.Errorf("stream url is empty")
|
||||||
}
|
}
|
||||||
|
@ -97,9 +115,15 @@ func (c *Client) Chat(asChannel bool, message string) error {
|
||||||
return pkgErr("", fmt.Errorf("http client is nil"))
|
return pkgErr("", fmt.Errorf("http client is nil"))
|
||||||
}
|
}
|
||||||
|
|
||||||
chatInfo, err := c.streamChatInfo()
|
// chatInfo, err := c.streamChatInfo()
|
||||||
|
// if err != nil {
|
||||||
|
// return pkgErr("error getting stream chat info", err)
|
||||||
|
// }
|
||||||
|
if c.chatInfo == nil {
|
||||||
|
err := c.ChatInfo()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pkgErr("error getting stream chat info", err)
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
requestID, err := random.String(32)
|
requestID, err := random.String(32)
|
||||||
|
@ -117,7 +141,7 @@ func (c *Client) Chat(asChannel bool, message string) error {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if asChannel {
|
if asChannel {
|
||||||
body.Data.ChannelID = &chatInfo.ChannelID
|
body.Data.ChannelID = &c.chatInfo.ChannelID
|
||||||
}
|
}
|
||||||
|
|
||||||
bodyB, err := json.Marshal(body)
|
bodyB, err := json.Marshal(body)
|
||||||
|
@ -125,7 +149,7 @@ func (c *Client) Chat(asChannel bool, message string) error {
|
||||||
return pkgErr("error marshaling request body into json", err)
|
return pkgErr("error marshaling request body into json", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.httpClient.Post(chatInfo.Url(), "application/json", bytes.NewReader(bodyB))
|
resp, err := c.httpClient.Post(c.chatInfo.MessageUrl(), "application/json", bytes.NewReader(bodyB))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return pkgErr("http Post request returned error", err)
|
return pkgErr("http Post request returned error", err)
|
||||||
}
|
}
|
||||||
|
@ -147,3 +171,205 @@ func (c *Client) Chat(asChannel bool, message string) error {
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ChatStream struct {
|
||||||
|
sseClient *sse.Client
|
||||||
|
sseEvent chan *sse.Event
|
||||||
|
stop context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventChannel struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Image1 string `json:"image.1"`
|
||||||
|
Link string `json:"link"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventBlockData struct {
|
||||||
|
Text string `json:"text"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventBlock struct {
|
||||||
|
Data ChatEventBlockData `json:"data"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventRant struct {
|
||||||
|
Duration int `json:"duration"`
|
||||||
|
ExpiresOn string `json:"expires_on"`
|
||||||
|
PriceCents int `json:"price_cents"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventMessage struct {
|
||||||
|
Blocks []ChatEventBlock `json:"blocks"`
|
||||||
|
ChannelID *int64 `json:"channel_id"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
Rant *ChatEventRant `json:"rant"`
|
||||||
|
Text string `json:"text"`
|
||||||
|
Time string `json:"time"`
|
||||||
|
UserID string `json:"user_id"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventUser struct {
|
||||||
|
Badges []string `json:"badges"`
|
||||||
|
Color string `json:"color"`
|
||||||
|
ID string `json:"id"`
|
||||||
|
Image1 string `json:"image.1"`
|
||||||
|
IsFollower bool `json:"is_follower"`
|
||||||
|
Link string `json:"link"`
|
||||||
|
Username string `json:"username"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEventData struct {
|
||||||
|
Channels []ChatEventChannel `json:"channels"`
|
||||||
|
Messages []ChatEventMessage `json:"messages"`
|
||||||
|
Users []ChatEventUser `json:"users"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatEvent struct {
|
||||||
|
Data ChatEventData `json:"data"`
|
||||||
|
RequestID string `json:"request_id"`
|
||||||
|
Type string `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) StartChatStream(handle func(cv ChatView), handleError func(err error)) error {
|
||||||
|
c.chatStreamMu.Lock()
|
||||||
|
defer c.chatStreamMu.Unlock()
|
||||||
|
if c.chatStream != nil {
|
||||||
|
return pkgErr("", fmt.Errorf("chat stream already started"))
|
||||||
|
}
|
||||||
|
sseEvent := make(chan *sse.Event)
|
||||||
|
sseCl := sse.NewClient(c.chatInfo.StreamUrl())
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
sseCl.ReconnectStrategy = backoff.WithContext(
|
||||||
|
backoff.NewExponentialBackOff(),
|
||||||
|
ctx,
|
||||||
|
)
|
||||||
|
|
||||||
|
err := sseCl.SubscribeChan("", sseEvent)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return pkgErr("error subscribing to chat stream", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
streamCtx, stop := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
c.chatStream = &ChatStream{sseClient: sseCl, sseEvent: sseEvent, stop: stop}
|
||||||
|
go startChatStream(streamCtx, sseEvent, handle, handleError)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) StopChatStream() {
|
||||||
|
c.chatStreamMu.Lock()
|
||||||
|
defer c.chatStreamMu.Unlock()
|
||||||
|
// TODO: what order should these be in?
|
||||||
|
c.chatStream.sseClient.Unsubscribe(c.chatStream.sseEvent)
|
||||||
|
c.chatStream.stop()
|
||||||
|
c.chatStream = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func startChatStream(ctx context.Context, event chan *sse.Event, handle func(cv ChatView), handleError func(err error)) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case msg := <-event:
|
||||||
|
if msg == nil {
|
||||||
|
handleError(fmt.Errorf("received nil event"))
|
||||||
|
} else {
|
||||||
|
chats, err := parseEvent(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
handleError(err)
|
||||||
|
} else {
|
||||||
|
for _, chat := range chats {
|
||||||
|
handle(chat)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ChatView struct {
|
||||||
|
Badges []string
|
||||||
|
Color string
|
||||||
|
ImageUrl string
|
||||||
|
IsFollower bool
|
||||||
|
Rant int
|
||||||
|
Text string
|
||||||
|
Username string
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseEvent(event []byte) ([]ChatView, error) {
|
||||||
|
var ce ChatEvent
|
||||||
|
err := json.Unmarshal(event, &ce)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error un-marshaling event: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
users := chatUsers(ce.Data.Users)
|
||||||
|
channels := chatChannels(ce.Data.Channels)
|
||||||
|
|
||||||
|
messages, err := parseMessages(ce.Data.Messages, users, channels)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error parsing messages: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages, nil
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func chatUsers(users []ChatEventUser) map[string]ChatEventUser {
|
||||||
|
usersMap := map[string]ChatEventUser{}
|
||||||
|
for _, user := range users {
|
||||||
|
usersMap[user.ID] = user
|
||||||
|
}
|
||||||
|
|
||||||
|
return usersMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func chatChannels(channels []ChatEventChannel) map[string]ChatEventChannel {
|
||||||
|
channelsMap := map[string]ChatEventChannel{}
|
||||||
|
for _, channel := range channels {
|
||||||
|
channelsMap[channel.ID] = channel
|
||||||
|
}
|
||||||
|
|
||||||
|
return channelsMap
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseMessages(messages []ChatEventMessage, users map[string]ChatEventUser, channels map[string]ChatEventChannel) ([]ChatView, error) {
|
||||||
|
views := []ChatView{}
|
||||||
|
for _, message := range messages {
|
||||||
|
var view ChatView
|
||||||
|
user, exists := users[message.UserID]
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("user ID does not exist: %s", message.UserID)
|
||||||
|
}
|
||||||
|
|
||||||
|
view.Badges = user.Badges
|
||||||
|
view.Color = user.Color
|
||||||
|
view.ImageUrl = user.Image1
|
||||||
|
view.IsFollower = user.IsFollower
|
||||||
|
view.Text = message.Text
|
||||||
|
if message.Rant != nil {
|
||||||
|
view.Rant = message.Rant.PriceCents
|
||||||
|
}
|
||||||
|
view.Username = user.Username
|
||||||
|
|
||||||
|
if message.ChannelID != nil {
|
||||||
|
cid := strconv.Itoa(int(*message.ChannelID))
|
||||||
|
channel, exists := channels[cid]
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("channel ID does not exist: %d", *message.ChannelID)
|
||||||
|
}
|
||||||
|
|
||||||
|
view.ImageUrl = channel.Image1
|
||||||
|
view.Username = channel.Username
|
||||||
|
}
|
||||||
|
|
||||||
|
views = append(views, view)
|
||||||
|
}
|
||||||
|
|
||||||
|
return views, nil
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"net/http/cookiejar"
|
"net/http/cookiejar"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/robertkrimen/otto"
|
"github.com/robertkrimen/otto"
|
||||||
)
|
)
|
||||||
|
@ -22,6 +23,9 @@ const (
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
chatInfo *ChatInfo
|
||||||
|
chatStream *ChatStream
|
||||||
|
chatStreamMu sync.Mutex
|
||||||
StreamKey string
|
StreamKey string
|
||||||
StreamUrl string
|
StreamUrl string
|
||||||
}
|
}
|
||||||
|
@ -53,7 +57,7 @@ func NewClient(streamKey string, streamUrl string) (*Client, error) {
|
||||||
return nil, pkgErr("error creating http client", err)
|
return nil, pkgErr("error creating http client", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Client{cl, streamKey, streamUrl}, nil
|
return &Client{httpClient: cl, StreamKey: streamKey, StreamUrl: streamUrl}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newHttpClient() (*http.Client, error) {
|
func newHttpClient() (*http.Client, error) {
|
||||||
|
|
3
go.mod
3
go.mod
|
@ -3,11 +3,14 @@ module github.com/tylertravisty/rumble-livestream-lib-go
|
||||||
go 1.19
|
go 1.19
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/r3labs/sse/v2 v2.10.0
|
||||||
github.com/robertkrimen/otto v0.2.1
|
github.com/robertkrimen/otto v0.2.1
|
||||||
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909
|
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect
|
||||||
golang.org/x/text v0.4.0 // indirect
|
golang.org/x/text v0.4.0 // indirect
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
|
||||||
gopkg.in/sourcemap.v1 v1.0.5 // indirect
|
gopkg.in/sourcemap.v1 v1.0.5 // indirect
|
||||||
)
|
)
|
||||||
|
|
15
go.sum
15
go.sum
|
@ -1,12 +1,27 @@
|
||||||
|
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
|
||||||
|
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
|
||||||
github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0=
|
github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0=
|
||||||
github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY=
|
github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY=
|
||||||
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||||
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909 h1:xrjIFqzGQXlCrCdMPpW6+SodGFSlrQ3ZNUCr3f5tF1g=
|
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909 h1:xrjIFqzGQXlCrCdMPpW6+SodGFSlrQ3ZNUCr3f5tF1g=
|
||||||
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909/go.mod h1:2W31Jhs9YSy7y500wsCOW0bcamGi9foQV1CKrfvfTxk=
|
github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909/go.mod h1:2W31Jhs9YSy7y500wsCOW0bcamGi9foQV1CKrfvfTxk=
|
||||||
|
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||||
|
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
|
||||||
|
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||||
|
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||||
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
|
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
|
||||||
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
|
||||||
|
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
|
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
|
||||||
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
|
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
|
||||||
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
|
Loading…
Reference in a new issue