399 lines
9.5 KiB
Go
399 lines
9.5 KiB
Go
package rumblelivestreamlib
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"encoding/csv"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/r3labs/sse/v2"
|
|
"github.com/tylertravisty/go-utils/random"
|
|
"gopkg.in/cenkalti/backoff.v1"
|
|
)
|
|
|
|
type ChatInfo struct {
|
|
UrlPrefix string
|
|
ChatID string
|
|
ChannelID int
|
|
}
|
|
|
|
func (ci *ChatInfo) MessageUrl() string {
|
|
return fmt.Sprintf("%s/chat/%s/message", ci.UrlPrefix, ci.ChatID)
|
|
}
|
|
|
|
func (ci *ChatInfo) StreamUrl() string {
|
|
return fmt.Sprintf("%s/chat/%s/stream", ci.UrlPrefix, ci.ChatID)
|
|
}
|
|
|
|
func (c *Client) ChatInfo() error {
|
|
ci, err := c.getChatInfo()
|
|
if err != nil {
|
|
return pkgErr("error getting chat info", err)
|
|
}
|
|
|
|
c.chatInfo = ci
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) getChatInfo() (*ChatInfo, error) {
|
|
if c.StreamUrl == "" {
|
|
return nil, fmt.Errorf("stream url is empty")
|
|
}
|
|
|
|
resp, err := c.getWebpage(c.StreamUrl)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting stream webpage: %v", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
r := bufio.NewReader(resp.Body)
|
|
line, _, err := r.ReadLine()
|
|
var lineS string
|
|
for err == nil {
|
|
lineS = string(line)
|
|
if strings.Contains(lineS, "RumbleChat(") {
|
|
start := strings.Index(lineS, "RumbleChat(") + len("RumbleChat(")
|
|
end := strings.Index(lineS[start:], ");")
|
|
argsS := strings.ReplaceAll(lineS[start:start+end], ", ", ",")
|
|
argsS = strings.Replace(argsS, "[", "\"[", 1)
|
|
n := strings.LastIndex(argsS, "]")
|
|
argsS = argsS[:n] + "]\"" + argsS[n+1:]
|
|
c := csv.NewReader(strings.NewReader(argsS))
|
|
args, err := c.ReadAll()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing csv: %v", err)
|
|
}
|
|
info := args[0]
|
|
channelID, err := strconv.Atoi(info[5])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error converting channel ID argument string to int: %v", err)
|
|
}
|
|
return &ChatInfo{info[0], info[1], channelID}, nil
|
|
}
|
|
line, _, err = r.ReadLine()
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error reading line from stream webpage: %v", err)
|
|
}
|
|
|
|
return nil, fmt.Errorf("did not find RumbleChat function call")
|
|
}
|
|
|
|
type ChatMessage struct {
|
|
Text string `json:"text"`
|
|
}
|
|
|
|
type ChatData struct {
|
|
RequestID string `json:"request_id"`
|
|
Message ChatMessage `json:"message"`
|
|
Rant *string `json:"rant"`
|
|
ChannelID *int `json:"channel_id"`
|
|
}
|
|
|
|
type ChatRequest struct {
|
|
Data ChatData `json:"data"`
|
|
}
|
|
|
|
type Error struct {
|
|
Code string `json:"code"`
|
|
Message string `json:"message"`
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
type ChatResponse struct {
|
|
Errors []Error `json:"errors"`
|
|
}
|
|
|
|
func (c *Client) Chat(asChannel bool, message string) error {
|
|
if c.httpClient == nil {
|
|
return pkgErr("", fmt.Errorf("http client is nil"))
|
|
}
|
|
|
|
// chatInfo, err := c.streamChatInfo()
|
|
// if err != nil {
|
|
// return pkgErr("error getting stream chat info", err)
|
|
// }
|
|
if c.chatInfo == nil {
|
|
err := c.ChatInfo()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
requestID, err := random.String(32)
|
|
if err != nil {
|
|
return pkgErr("error generating request ID", err)
|
|
}
|
|
body := ChatRequest{
|
|
Data: ChatData{
|
|
RequestID: requestID,
|
|
Message: ChatMessage{
|
|
Text: message,
|
|
},
|
|
Rant: nil,
|
|
ChannelID: nil,
|
|
},
|
|
}
|
|
if asChannel {
|
|
body.Data.ChannelID = &c.chatInfo.ChannelID
|
|
}
|
|
|
|
bodyB, err := json.Marshal(body)
|
|
if err != nil {
|
|
return pkgErr("error marshaling request body into json", err)
|
|
}
|
|
|
|
resp, err := c.httpClient.Post(c.chatInfo.MessageUrl(), "application/json", bytes.NewReader(bodyB))
|
|
if err != nil {
|
|
return pkgErr("http Post request returned error", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("http Post response status not %s: %s", http.StatusText(http.StatusOK), resp.Status)
|
|
}
|
|
|
|
var cr ChatResponse
|
|
err = json.NewDecoder(strings.NewReader(string(bodyB))).Decode(&cr)
|
|
if err != nil {
|
|
return fmt.Errorf("error decoding response body from server: %v", err)
|
|
}
|
|
|
|
if len(cr.Errors) != 0 {
|
|
return fmt.Errorf("server returned an error: %s", cr.Errors[0].Message)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type ChatStream struct {
|
|
sseClient *sse.Client
|
|
sseEvent chan *sse.Event
|
|
stop context.CancelFunc
|
|
}
|
|
|
|
type ChatEventChannel struct {
|
|
ID string `json:"id"`
|
|
Image1 string `json:"image.1"`
|
|
Link string `json:"link"`
|
|
Username string `json:"username"`
|
|
}
|
|
|
|
type ChatEventBlockData struct {
|
|
Text string `json:"text"`
|
|
}
|
|
|
|
type ChatEventBlock struct {
|
|
Data ChatEventBlockData `json:"data"`
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
type ChatEventRant struct {
|
|
Duration int `json:"duration"`
|
|
ExpiresOn string `json:"expires_on"`
|
|
PriceCents int `json:"price_cents"`
|
|
}
|
|
|
|
type ChatEventMessage struct {
|
|
Blocks []ChatEventBlock `json:"blocks"`
|
|
ChannelID *int64 `json:"channel_id"`
|
|
ID string `json:"id"`
|
|
Rant *ChatEventRant `json:"rant"`
|
|
Text string `json:"text"`
|
|
Time string `json:"time"`
|
|
UserID string `json:"user_id"`
|
|
}
|
|
|
|
type ChatEventUser struct {
|
|
Badges []string `json:"badges"`
|
|
Color string `json:"color"`
|
|
ID string `json:"id"`
|
|
Image1 string `json:"image.1"`
|
|
IsFollower bool `json:"is_follower"`
|
|
Link string `json:"link"`
|
|
Username string `json:"username"`
|
|
}
|
|
|
|
type ChatEventData struct {
|
|
Channels []ChatEventChannel `json:"channels"`
|
|
Messages []ChatEventMessage `json:"messages"`
|
|
Users []ChatEventUser `json:"users"`
|
|
}
|
|
|
|
type ChatEvent struct {
|
|
Data ChatEventData `json:"data"`
|
|
RequestID string `json:"request_id"`
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
type ChatEventDataNoChannels struct {
|
|
// TODO: change [][]string to [][]any and test
|
|
Channels [][]string `json:"channels"`
|
|
Messages []ChatEventMessage `json:"messages"`
|
|
Users []ChatEventUser `json:"users"`
|
|
}
|
|
|
|
type ChatEventNoChannels struct {
|
|
Data ChatEventDataNoChannels `json:"data"`
|
|
RequestID string `json:"request_id"`
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
func (c *Client) StartChatStream(handle func(cv ChatView), handleError func(err error)) error {
|
|
c.chatStreamMu.Lock()
|
|
defer c.chatStreamMu.Unlock()
|
|
if c.chatStream != nil {
|
|
return pkgErr("", fmt.Errorf("chat stream already started"))
|
|
}
|
|
sseEvent := make(chan *sse.Event)
|
|
sseCl := sse.NewClient(c.chatInfo.StreamUrl())
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
sseCl.ReconnectStrategy = backoff.WithContext(
|
|
backoff.NewExponentialBackOff(),
|
|
ctx,
|
|
)
|
|
|
|
err := sseCl.SubscribeChan("", sseEvent)
|
|
if err != nil {
|
|
cancel()
|
|
return pkgErr(fmt.Sprintf("error subscribing to chat stream %s", c.chatInfo.StreamUrl()), err)
|
|
}
|
|
|
|
streamCtx, stop := context.WithCancel(context.Background())
|
|
|
|
c.chatStream = &ChatStream{sseClient: sseCl, sseEvent: sseEvent, stop: stop}
|
|
go startChatStream(streamCtx, sseEvent, handle, handleError)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Client) StopChatStream() {
|
|
c.chatStreamMu.Lock()
|
|
defer c.chatStreamMu.Unlock()
|
|
// TODO: what order should these be in?
|
|
c.chatStream.sseClient.Unsubscribe(c.chatStream.sseEvent)
|
|
c.chatStream.stop()
|
|
c.chatStream = nil
|
|
}
|
|
|
|
func startChatStream(ctx context.Context, event chan *sse.Event, handle func(cv ChatView), handleError func(err error)) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case msg := <-event:
|
|
if msg == nil {
|
|
handleError(fmt.Errorf("received nil event"))
|
|
} else {
|
|
chats, err := parseEvent(msg.Data)
|
|
if err != nil {
|
|
handleError(err)
|
|
} else {
|
|
for _, chat := range chats {
|
|
handle(chat)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type ChatView struct {
|
|
Badges []string
|
|
Color string
|
|
ImageUrl string
|
|
Init bool
|
|
IsFollower bool
|
|
Rant int
|
|
Text string
|
|
Type string
|
|
Username string
|
|
}
|
|
|
|
func parseEvent(event []byte) ([]ChatView, error) {
|
|
var ce ChatEvent
|
|
err := json.Unmarshal(event, &ce)
|
|
if err != nil {
|
|
var cenc ChatEventNoChannels
|
|
errNC := json.Unmarshal(event, &cenc)
|
|
if errNC != nil {
|
|
return nil, fmt.Errorf("error un-marshaling event: %v", err)
|
|
}
|
|
|
|
ce.Data.Messages = cenc.Data.Messages
|
|
ce.Data.Users = cenc.Data.Users
|
|
ce.Type = cenc.Type
|
|
}
|
|
|
|
users := chatUsers(ce.Data.Users)
|
|
channels := chatChannels(ce.Data.Channels)
|
|
|
|
messages, err := parseMessages(ce.Type, ce.Data.Messages, users, channels)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing messages: %v", err)
|
|
}
|
|
|
|
return messages, nil
|
|
}
|
|
|
|
func chatUsers(users []ChatEventUser) map[string]ChatEventUser {
|
|
usersMap := map[string]ChatEventUser{}
|
|
for _, user := range users {
|
|
usersMap[user.ID] = user
|
|
}
|
|
|
|
return usersMap
|
|
}
|
|
|
|
func chatChannels(channels []ChatEventChannel) map[string]ChatEventChannel {
|
|
channelsMap := map[string]ChatEventChannel{}
|
|
for _, channel := range channels {
|
|
channelsMap[channel.ID] = channel
|
|
}
|
|
|
|
return channelsMap
|
|
}
|
|
|
|
func parseMessages(eventType string, messages []ChatEventMessage, users map[string]ChatEventUser, channels map[string]ChatEventChannel) ([]ChatView, error) {
|
|
views := []ChatView{}
|
|
for _, message := range messages {
|
|
var view ChatView
|
|
user, exists := users[message.UserID]
|
|
if !exists {
|
|
return nil, fmt.Errorf("user ID does not exist: %s", message.UserID)
|
|
}
|
|
|
|
view.Badges = user.Badges
|
|
view.Color = user.Color
|
|
view.ImageUrl = user.Image1
|
|
view.IsFollower = user.IsFollower
|
|
if message.Rant != nil {
|
|
view.Rant = message.Rant.PriceCents
|
|
}
|
|
view.Text = message.Text
|
|
view.Type = eventType
|
|
view.Username = user.Username
|
|
|
|
if message.ChannelID != nil {
|
|
cid := strconv.Itoa(int(*message.ChannelID))
|
|
channel, exists := channels[cid]
|
|
if !exists {
|
|
return nil, fmt.Errorf("channel ID does not exist: %d", *message.ChannelID)
|
|
}
|
|
|
|
view.ImageUrl = channel.Image1
|
|
view.Username = channel.Username
|
|
}
|
|
|
|
views = append(views, view)
|
|
}
|
|
|
|
return views, nil
|
|
}
|