From 651f63d3061a2d86b1bae3a3fb1b0fa02d9901d8 Mon Sep 17 00:00:00 2001 From: tyler Date: Fri, 19 Jan 2024 12:54:34 -0500 Subject: [PATCH] Added function to receive chats from live stream --- chat.go | 240 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- client.go | 12 ++- go.mod | 3 + go.sum | 15 ++++ 4 files changed, 259 insertions(+), 11 deletions(-) diff --git a/chat.go b/chat.go index b049a40..075751b 100644 --- a/chat.go +++ b/chat.go @@ -3,14 +3,18 @@ package rumblelivestreamlib import ( "bufio" "bytes" + "context" "encoding/csv" "encoding/json" "fmt" "net/http" "strconv" "strings" + "time" + "github.com/r3labs/sse/v2" "github.com/tylertravisty/go-utils/random" + "gopkg.in/cenkalti/backoff.v1" ) type ChatInfo struct { @@ -19,11 +23,25 @@ type ChatInfo struct { ChannelID int } -func (ci *ChatInfo) Url() string { +func (ci *ChatInfo) MessageUrl() string { return fmt.Sprintf("%s/chat/%s/message", ci.UrlPrefix, ci.ChatID) } -func (c *Client) streamChatInfo() (*ChatInfo, error) { +func (ci *ChatInfo) StreamUrl() string { + return fmt.Sprintf("%s/chat/api/chat/%s/stream", ci.UrlPrefix, ci.ChatID) +} + +func (c *Client) ChatInfo() error { + ci, err := c.getChatInfo() + if err != nil { + return pkgErr("error getting chat info", err) + } + + c.chatInfo = ci + return nil +} + +func (c *Client) getChatInfo() (*ChatInfo, error) { if c.StreamUrl == "" { return nil, fmt.Errorf("stream url is empty") } @@ -97,9 +115,15 @@ func (c *Client) Chat(asChannel bool, message string) error { return pkgErr("", fmt.Errorf("http client is nil")) } - chatInfo, err := c.streamChatInfo() - if err != nil { - return pkgErr("error getting stream chat info", err) + // chatInfo, err := c.streamChatInfo() + // if err != nil { + // return pkgErr("error getting stream chat info", err) + // } + if c.chatInfo == nil { + err := c.ChatInfo() + if err != nil { + return err + } } requestID, err := random.String(32) @@ -117,7 +141,7 @@ func (c *Client) Chat(asChannel bool, message string) error { }, } if asChannel { - body.Data.ChannelID = &chatInfo.ChannelID + body.Data.ChannelID = &c.chatInfo.ChannelID } bodyB, err := json.Marshal(body) @@ -125,7 +149,7 @@ func (c *Client) Chat(asChannel bool, message string) error { return pkgErr("error marshaling request body into json", err) } - resp, err := c.httpClient.Post(chatInfo.Url(), "application/json", bytes.NewReader(bodyB)) + resp, err := c.httpClient.Post(c.chatInfo.MessageUrl(), "application/json", bytes.NewReader(bodyB)) if err != nil { return pkgErr("http Post request returned error", err) } @@ -147,3 +171,205 @@ func (c *Client) Chat(asChannel bool, message string) error { return nil } + +type ChatStream struct { + sseClient *sse.Client + sseEvent chan *sse.Event + stop context.CancelFunc +} + +type ChatEventChannel struct { + ID string `json:"id"` + Image1 string `json:"image.1"` + Link string `json:"link"` + Username string `json:"username"` +} + +type ChatEventBlockData struct { + Text string `json:"text"` +} + +type ChatEventBlock struct { + Data ChatEventBlockData `json:"data"` + Type string `json:"type"` +} + +type ChatEventRant struct { + Duration int `json:"duration"` + ExpiresOn string `json:"expires_on"` + PriceCents int `json:"price_cents"` +} + +type ChatEventMessage struct { + Blocks []ChatEventBlock `json:"blocks"` + ChannelID *int64 `json:"channel_id"` + ID string `json:"id"` + Rant *ChatEventRant `json:"rant"` + Text string `json:"text"` + Time string `json:"time"` + UserID string `json:"user_id"` +} + +type ChatEventUser struct { + Badges []string `json:"badges"` + Color string `json:"color"` + ID string `json:"id"` + Image1 string `json:"image.1"` + IsFollower bool `json:"is_follower"` + Link string `json:"link"` + Username string `json:"username"` +} + +type ChatEventData struct { + Channels []ChatEventChannel `json:"channels"` + Messages []ChatEventMessage `json:"messages"` + Users []ChatEventUser `json:"users"` +} + +type ChatEvent struct { + Data ChatEventData `json:"data"` + RequestID string `json:"request_id"` + Type string `json:"type"` +} + +func (c *Client) StartChatStream(handle func(cv ChatView), handleError func(err error)) error { + c.chatStreamMu.Lock() + defer c.chatStreamMu.Unlock() + if c.chatStream != nil { + return pkgErr("", fmt.Errorf("chat stream already started")) + } + sseEvent := make(chan *sse.Event) + sseCl := sse.NewClient(c.chatInfo.StreamUrl()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + sseCl.ReconnectStrategy = backoff.WithContext( + backoff.NewExponentialBackOff(), + ctx, + ) + + err := sseCl.SubscribeChan("", sseEvent) + if err != nil { + cancel() + return pkgErr("error subscribing to chat stream", err) + } + + streamCtx, stop := context.WithCancel(context.Background()) + + c.chatStream = &ChatStream{sseClient: sseCl, sseEvent: sseEvent, stop: stop} + go startChatStream(streamCtx, sseEvent, handle, handleError) + + return nil +} + +func (c *Client) StopChatStream() { + c.chatStreamMu.Lock() + defer c.chatStreamMu.Unlock() + // TODO: what order should these be in? + c.chatStream.sseClient.Unsubscribe(c.chatStream.sseEvent) + c.chatStream.stop() + c.chatStream = nil +} + +func startChatStream(ctx context.Context, event chan *sse.Event, handle func(cv ChatView), handleError func(err error)) { + for { + select { + case <-ctx.Done(): + return + case msg := <-event: + if msg == nil { + handleError(fmt.Errorf("received nil event")) + } else { + chats, err := parseEvent(msg.Data) + if err != nil { + handleError(err) + } else { + for _, chat := range chats { + handle(chat) + } + } + } + } + } +} + +type ChatView struct { + Badges []string + Color string + ImageUrl string + IsFollower bool + Rant int + Text string + Username string +} + +func parseEvent(event []byte) ([]ChatView, error) { + var ce ChatEvent + err := json.Unmarshal(event, &ce) + if err != nil { + return nil, fmt.Errorf("error un-marshaling event: %v", err) + } + + users := chatUsers(ce.Data.Users) + channels := chatChannels(ce.Data.Channels) + + messages, err := parseMessages(ce.Data.Messages, users, channels) + if err != nil { + return nil, fmt.Errorf("error parsing messages: %v", err) + } + + return messages, nil + +} + +func chatUsers(users []ChatEventUser) map[string]ChatEventUser { + usersMap := map[string]ChatEventUser{} + for _, user := range users { + usersMap[user.ID] = user + } + + return usersMap +} + +func chatChannels(channels []ChatEventChannel) map[string]ChatEventChannel { + channelsMap := map[string]ChatEventChannel{} + for _, channel := range channels { + channelsMap[channel.ID] = channel + } + + return channelsMap +} + +func parseMessages(messages []ChatEventMessage, users map[string]ChatEventUser, channels map[string]ChatEventChannel) ([]ChatView, error) { + views := []ChatView{} + for _, message := range messages { + var view ChatView + user, exists := users[message.UserID] + if !exists { + return nil, fmt.Errorf("user ID does not exist: %s", message.UserID) + } + + view.Badges = user.Badges + view.Color = user.Color + view.ImageUrl = user.Image1 + view.IsFollower = user.IsFollower + view.Text = message.Text + if message.Rant != nil { + view.Rant = message.Rant.PriceCents + } + view.Username = user.Username + + if message.ChannelID != nil { + cid := strconv.Itoa(int(*message.ChannelID)) + channel, exists := channels[cid] + if !exists { + return nil, fmt.Errorf("channel ID does not exist: %d", *message.ChannelID) + } + + view.ImageUrl = channel.Image1 + view.Username = channel.Username + } + + views = append(views, view) + } + + return views, nil +} diff --git a/client.go b/client.go index 4d1c8ff..7514ed9 100644 --- a/client.go +++ b/client.go @@ -8,6 +8,7 @@ import ( "net/http/cookiejar" "net/url" "strings" + "sync" "github.com/robertkrimen/otto" ) @@ -21,9 +22,12 @@ const ( ) type Client struct { - httpClient *http.Client - StreamKey string - StreamUrl string + httpClient *http.Client + chatInfo *ChatInfo + chatStream *ChatStream + chatStreamMu sync.Mutex + StreamKey string + StreamUrl string } func (c *Client) cookies() ([]*http.Cookie, error) { @@ -53,7 +57,7 @@ func NewClient(streamKey string, streamUrl string) (*Client, error) { return nil, pkgErr("error creating http client", err) } - return &Client{cl, streamKey, streamUrl}, nil + return &Client{httpClient: cl, StreamKey: streamKey, StreamUrl: streamUrl}, nil } func newHttpClient() (*http.Client, error) { diff --git a/go.mod b/go.mod index e1d4d51..1c86821 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,14 @@ module github.com/tylertravisty/rumble-livestream-lib-go go 1.19 require ( + github.com/r3labs/sse/v2 v2.10.0 github.com/robertkrimen/otto v0.2.1 github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909 ) require ( + golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect golang.org/x/text v0.4.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect ) diff --git a/go.sum b/go.sum index 5743d45..84ea1d7 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,27 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0= github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909 h1:xrjIFqzGQXlCrCdMPpW6+SodGFSlrQ3ZNUCr3f5tF1g= github.com/tylertravisty/go-utils v0.0.0-20230524204414-6893ae548909/go.mod h1:2W31Jhs9YSy7y500wsCOW0bcamGi9foQV1CKrfvfTxk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=