This commit is contained in:
tyler 2024-06-14 13:13:49 -04:00
parent 2a02e7255b
commit e9ae892666
7 changed files with 511 additions and 9 deletions

2
.gitignore vendored
View file

@ -19,3 +19,5 @@
# Go workspace file # Go workspace file
go.work go.work
.prettierignore

2
NOTES.md Normal file
View file

@ -0,0 +1,2 @@
Should connections be managed in the client?
- Keep API, Chat, etc. conns in client?

281
chat.go Normal file
View file

@ -0,0 +1,281 @@
package rumble
import (
"context"
"encoding/json"
"github.com/r3labs/sse"
)
const (
ChatBadgeRecurringSubscription = "recurring_subscription"
ChatBadgeLocalsSupporter = "locals_supporter"
ChatTypeInit = "init"
ChatTypeMessages = "messages"
ChatTypeMuteUsers = "mute_users"
ChatTypeDeleteMessages = "delete_messages"
ChatTypeSubscriber = "locals_supporter"
ChatTypeRaiding = "raid_confirmed"
ChatTypePinMessage = "pin_message"
ChatTypeUnpinMessage = "unpin_message"
)
type ChatEventChannels struct {
ErrorCh chan error
}
type RegisterChatEventsOptions struct {
}
// TODO: for every true bool, create channel, otherwise nil.
// TODO: always create error channel
func RegisterChatEvents(opts RegisterChatEventsOptions) *ChatEventChannels {
}
type chatClient struct {
cancel context.CancelFunc
sseCancel context.CancelFunc
sseCl *sse.Client
sseEvent chan *sse.Event
}
type RawChatEventChannel struct {
ID string `json:"id"`
Image1 string `json:"image.1"`
Link string `json:"link"`
Username string `json:"username"`
}
type RawChatEventBlockData struct {
Text string `json:"text"`
}
type RawChatEventBlock struct {
Data RawChatEventBlockData `json:"data"`
Type string `json:"type"`
}
type RawChatEventNotification struct {
Badge string `json:"badge"`
Text string `json:"text"`
}
type RawChatEventRaidNotification struct {
StartTs int64 `json:"start_ts"`
}
type RawChatEventRant struct {
Duration int `json:"duration"`
ExpiresOn string `json:"expires_on"`
PriceCents int `json:"price_cents"`
}
type RawChatEventMessage struct {
Blocks []RawChatEventBlock `json:"blocks"`
ChannelID *int64 `json:"channel_id"`
ID string `json:"id"`
Notification *RawChatEventNotification `json:"notification"`
RaidNotification *RawChatEventRaidNotification `json:"raid_notification"`
Rant *RawChatEventRant `json:"rant"`
Text string `json:"text"`
Time string `json:"time"`
UserID string `json:"user_id"`
}
type RawChatEventUser struct {
Badges []string `json:"badges"`
Color string `json:"color"`
ID string `json:"id"`
Image1 string `json:"image.1"`
IsFollower bool `json:"is_follower"`
Link string `json:"link"`
Username string `json:"username"`
}
type RawChatEventData struct {
Channels []RawChatEventChannel `json:"channels"`
Messages []RawChatEventMessage `json:"messages"`
Users []RawChatEventUser `json:"users"`
}
type RawChatEvent struct {
Data RawChatEventData `json:"data"`
RequestID string `json:"request_id"`
Type string `json:"type"`
}
type RawChatEventAny struct {
Data any `json:"data"`
Type string `json:"type"`
}
type RawChatEventDataNoChannels struct {
// Channels [][]string `json:"channels"`
Channels [][]any `json:"channels"`
Messages []RawChatEventMessage `json:"messages"`
Users []RawChatEventUser `json:"users"`
}
type RawChatEventNoChannels struct {
Data RawChatEventDataNoChannels `json:"data"`
RequestID string `json:"request_id"`
Type string `json:"type"`
}
type ChatBase struct {
Data any `json:"data"`
Type string `json:"type"`
}
func (rce RawChatEvent) users() map[string]RawChatEventUser {
uMap := map[string]RawChatEventUser{}
for _, user := range rce.Data.Users {
uMap[user.ID] = user
}
return uMap
}
func (rce RawChatEvent) channels() map[string]RawChatEventChannel {
cMap := map[string]RawChatEventChannel{}
for _, channel := range rce.Data.Channels {
cMap[channel.ID] = channel
}
return cMap
}
// func (rce RawChatEvent) parse() ([]ChatEvent, error) {
// events := []ChatEvent{}
// users := rce.users()
// channels := rce.channels()
// for _, message := range rce.Data.Messages {
// var event ChatEvent
// user, exists := users[message.UserID]
// if !exists {
// return nil, fmt.Errorf("missing user information for user ID: %s", message.UserID)
// }
// event.Badges = user.Badges
// event.Color = user.Color
// event.ImageUrl = user.Image1
// event.IsFollower = user.IsFollower
// if message.RaidNotification != nil {
// event.Raid = true
// }
// if message.Rant != nil {
// event.Rant = message.Rant.PriceCents
// }
// if message.Notification != nil {
// if message.Notification.Badge == ChatBadgeRecurringSubscription {
// event.Sub = true
// }
// }
// event.Text = message.Text
// t, err := time.Parse(time.RFC3339, message.Time)
// if err != nil {
// return nil, fmt.Errorf("error parsing message time: %v", err)
// }
// event.Time = t
// event.Type = rce.Type
// event.Username = user.Username
// if message.ChannelID != nil {
// cid := strconv.Itoa(int(*message.ChannelID))
// channel, exists := channels[cid]
// if !exists {
// return nil, fmt.Errorf("missing channel information for channel ID: %s", cid)
// }
// event.ImageUrl = channel.Image1
// event.ChannelName = channel.Username
// }
// events = append(events, event)
// }
// return events, nil
// }
// type ChatEvent struct {
// Badges []string
// ChannelName string
// Color string
// ImageUrl string
// Init bool
// IsFollower bool
// Raid bool
// Rant int
// Sub bool
// Text string
// Time time.Time
// Type string
// Username string
// }
func (cl *chatClient) start(ctx context.Context, eventCh chan *sse.Event) {
for {
select {
case <-ctx.Done():
cl.sseCancel()
return
case event := <-eventCh:
if event == nil {
// handleError(fmt.Errorf("received nil event"))
} else {
cl.handleEvent(event)
// chats, err := parseEvent(event.Data)
// if err != nil {
// handleError(err)
// } else {
// for _, chat := range chats {
// handle(chat)
// }
// }
}
}
}
}
func (cl *chatClient) handleEvent(e *sse.Event) {
var base ChatBase
err := json.Unmarshal(e, &base)
if err != nil {
// send error
return
}
}
// func parseEvent(event []byte) ([]ChatEvent, error) {
// var rce RawChatEvent
// err := json.Unmarshal(event, &rce)
// if err != nil {
// var rcenc RawChatEventNoChannels
// errnc := json.Unmarshal(event, &rcenc)
// if errnc != nil {
// var rcea RawChatEventAny
// errany := json.Unmarshal(event, &rcea)
// if errany != nil {
// return nil, fmt.Errorf("error un-marshaling event: %v", err)
// }
// // ChatEvent type not supported, return empty events
// return []ChatEvent{}, nil
// }
// rce.Data.Messages = rcenc.Data.Messages
// rce.Data.Users = rcenc.Data.Users
// rce.RequestID = rcenc.RequestID
// rce.Type = rcenc.Type
// }
// events, err := rce.parse()
// if err != nil {
// return nil, fmt.Errorf("error parsing raw chat event: %v", err)
// }
// return events, nil
// }

View file

@ -23,7 +23,7 @@ const (
) )
type Client struct { type Client struct {
httpClient *http.Client httpCl *http.Client
} }
type NewClientOptions struct { type NewClientOptions struct {
@ -36,7 +36,7 @@ func NewClient(opts NewClientOptions) (*Client, error) {
return nil, pkgErr("error creating new http client: %v", err) return nil, pkgErr("error creating new http client: %v", err)
} }
return &Client{httpClient: cl}, nil return &Client{httpCl: cl}, nil
} }
func newHttpClient(cookies []*http.Cookie) (*http.Client, error) { func newHttpClient(cookies []*http.Cookie) (*http.Client, error) {
@ -55,7 +55,7 @@ func newHttpClient(cookies []*http.Cookie) (*http.Client, error) {
} }
func (c *Client) Login(username string, password string) ([]*http.Cookie, error) { func (c *Client) Login(username string, password string) ([]*http.Cookie, error) {
if c.httpClient == nil { if c.httpCl == nil {
return nil, pkgErr("", fmt.Errorf("http client is nil")) return nil, pkgErr("", fmt.Errorf("http client is nil"))
} }
@ -86,7 +86,7 @@ func (c *Client) getSalts(username string) ([]string, error) {
q.Add("username", username) q.Add("username", username)
body := q.Encode() body := q.Encode()
resp, err := c.httpClient.Post(urlServiceUserGetSalts, "application/x-www-form-urlencoded", strings.NewReader(body)) resp, err := c.httpCl.Post(urlServiceUserGetSalts, "application/x-www-form-urlencoded", strings.NewReader(body))
if err != nil { if err != nil {
return nil, fmt.Errorf("http post request returned error: %v", err) return nil, fmt.Errorf("http post request returned error: %v", err)
} }
@ -120,7 +120,7 @@ func (c *Client) login(username string, password string, salts []string) ([]*htt
q.Add("username", username) q.Add("username", username)
q.Add("password_hashes", hashes) q.Add("password_hashes", hashes)
body := q.Encode() body := q.Encode()
resp, err := c.httpClient.Post(urlServiceUserLogin, "application/x-www-form-urlencoded", strings.NewReader(body)) resp, err := c.httpCl.Post(urlServiceUserLogin, "application/x-www-form-urlencoded", strings.NewReader(body))
if err != nil { if err != nil {
return nil, fmt.Errorf("http post request returned error: %v", err) return nil, fmt.Errorf("http post request returned error: %v", err)
} }
@ -207,7 +207,7 @@ func loginSession(body []byte) (string, error) {
} }
func (c *Client) Logout() error { func (c *Client) Logout() error {
if c.httpClient == nil { if c.httpCl == nil {
return pkgErr("", fmt.Errorf("http client is nil")) return pkgErr("", fmt.Errorf("http client is nil"))
} }
@ -220,7 +220,7 @@ func (c *Client) Logout() error {
} }
func (c *Client) logout() error { func (c *Client) logout() error {
resp, err := c.httpClient.Get(urlServiceUserLogout) resp, err := c.httpCl.Get(urlServiceUserLogout)
if err != nil { if err != nil {
return fmt.Errorf("http get request returned error: %v", err) return fmt.Errorf("http get request returned error: %v", err)
} }
@ -241,7 +241,7 @@ type LoggedInResponse struct {
} }
func (c *Client) LoggedIn() (bool, error) { func (c *Client) LoggedIn() (bool, error) {
resp, err := c.httpClient.Get(urlServiceUserLogin) resp, err := c.httpCl.Get(urlServiceUserLogin)
if err != nil { if err != nil {
return false, pkgErr("http get request returned error", err) return false, pkgErr("http get request returned error", err)
} }
@ -263,3 +263,7 @@ func (c *Client) LoggedIn() (bool, error) {
return lir.User.LoggedIn, nil return lir.User.LoggedIn, nil
} }
func (c *Client) NewLiveStream(url string) *LiveStream {
return &LiveStream{httpCl: c.httpCl, Url: url}
}

8
go.mod
View file

@ -3,7 +3,13 @@ module github.com/tylertravisty/rumble-lib-go
go 1.22.2 go 1.22.2
require ( require (
github.com/robertkrimen/otto v0.4.0 // indirect github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/robertkrimen/otto v0.4.0
)
require (
golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect
golang.org/x/text v0.4.0 // indirect golang.org/x/text v0.4.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect
) )

22
go.sum
View file

@ -1,6 +1,28 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/robertkrimen/otto v0.4.0 h1:/c0GRrK1XDPcgIasAsnlpBT5DelIeB9U/Z/JCQsgr7E= github.com/robertkrimen/otto v0.4.0 h1:/c0GRrK1XDPcgIasAsnlpBT5DelIeB9U/Z/JCQsgr7E=
github.com/robertkrimen/otto v0.4.0/go.mod h1:uW9yN1CYflmUQYvAMS0m+ZiNo3dMzRUDQJX0jWbzgxw= github.com/robertkrimen/otto v0.4.0/go.mod h1:uW9yN1CYflmUQYvAMS0m+ZiNo3dMzRUDQJX0jWbzgxw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI=
gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

185
livestream.go Normal file
View file

@ -0,0 +1,185 @@
package rumble
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/r3labs/sse"
"golang.org/x/net/html"
"gopkg.in/cenkalti/backoff.v1"
)
type LiveStream struct {
ChannelID int
chatCl *chatClient
ChatApiPrefix string
httpCl *http.Client
Page string
ServiceApiPrefix string
Url string
VideoID string
}
func (ls *LiveStream) ChatMessageUrl() string {
return fmt.Sprintf("%s/chat/%s/message", ls.ChatApiPrefix, ls.VideoID)
}
func (ls *LiveStream) ChatStreamUrl() string {
return fmt.Sprintf("%s/chat/%s/stream", ls.ChatApiPrefix, ls.VideoID)
}
func (ls *LiveStream) Open() error {
if ls.Url == "" {
return pkgErr("", fmt.Errorf("live stream url is empty"))
}
err := ls.open()
if err != nil {
return pkgErr("error opening live stream", err)
}
return nil
}
func (ls *LiveStream) open() error {
if ls.Url == "" {
return fmt.Errorf("live stream url is empty")
}
resp, err := ls.httpCl.Get(ls.Url)
if err != nil {
return fmt.Errorf("http get request returned error: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("http get response status not %s: %s", http.StatusText(http.StatusOK), resp.Status)
}
err = ls.parseWebpage(resp.Body)
if err != nil {
return fmt.Errorf("error parsing live stream webpage: %v", err)
}
return nil
}
func (ls *LiveStream) parseWebpage(body io.ReadCloser) error {
rumbleChatFound := false
r := bufio.NewReader(body)
lineS, err := r.ReadString('\n')
for err == nil {
if strings.Contains(lineS, "RumbleChat(") {
start := strings.Index(lineS, "RumbleChat(") + len("RumbleChat(")
if start == -1 {
return fmt.Errorf("error finding chat function in webpage")
}
end := strings.Index(lineS[start:], ");")
if end == -1 {
return fmt.Errorf("error finding end of chat function in webpage")
}
args := parseRumbleChatArgs(lineS[start : start+end])
channelID, err := strconv.Atoi(args[6])
if err != nil {
return fmt.Errorf("error converting channel ID argument string to int: %v", err)
}
rumbleChatFound = true
ls.ChannelID = channelID
ls.ChatApiPrefix = args[0]
ls.ServiceApiPrefix = args[1]
ls.VideoID = args[2]
} else if strings.Contains(lineS, "media-by--a") && strings.Contains(lineS, "author") {
r := strings.NewReader(lineS)
node, err := html.Parse(r)
if err != nil {
return fmt.Errorf("error parsing html tag with page name: %v", err)
}
if node.FirstChild != nil && node.FirstChild.LastChild != nil && node.FirstChild.LastChild.FirstChild != nil {
for _, attr := range node.FirstChild.LastChild.FirstChild.Attr {
if attr.Key == "href" {
ls.Page = attr.Val
}
}
}
}
lineS, err = r.ReadString('\n')
}
if err != io.EOF {
return fmt.Errorf("error reading line from webpage: %v", err)
}
if !rumbleChatFound {
return fmt.Errorf("did not find RumbleChat function call")
}
return nil
}
func parseRumbleChatArgs(argsS string) []string {
open := 0
args := []string{}
arg := []rune{}
for _, c := range argsS {
if c == ',' && open == 0 {
args = append(args, trimRumbleChatArg(string(arg)))
arg = []rune{}
} else {
if c == '[' {
open = open + 1
}
if c == ']' {
open = open - 1
}
arg = append(arg, c)
}
}
if len(arg) > 0 {
args = append(args, trimRumbleChatArg(string(arg)))
}
return args
}
func trimRumbleChatArg(arg string) string {
return strings.Trim(strings.TrimSpace(arg), "\"")
}
func (ls *LiveStream) StartChat(handle func(ce ChatEvent), handleError func(err error)) error {
// Validate all of the necessary live stream fields
sseEvent := make(chan *sse.Event)
sseCl := sse.NewClient(ls.ChatStreamUrl())
sseCl.Connection = ls.httpCl
sseCtx, sseCancel := context.WithTimeout(context.Background(), 5*time.Second)
sseCl.ReconnectStrategy = backoff.WithContext(
backoff.NewExponentialBackOff(),
sseCtx,
)
err := sseCl.SubscribeChan("", sseEvent)
if err != nil {
sseCancel()
return pkgErr(fmt.Sprintf("error subscribing to chat stream %s", ls.ChatStreamUrl()), err)
}
chatCtx, chatCancel := context.WithCancel(context.Background())
ls.chatCl = &chatClient{cancel: chatCancel, sseCl: sseCl, sseCancel: sseCancel, sseEvent: sseEvent}
go ls.chatCl.start(chatCtx, sseEvent, handle, handleError)
return nil
}
func (ls *LiveStream) Close() error {
// TODO: figure out how to remove livestream instance from rumble.Client livestreams array
return nil
}