diff --git a/.gitignore b/.gitignore index 3b735ec..291e97b 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ # Go workspace file go.work + +.prettierignore \ No newline at end of file diff --git a/NOTES.md b/NOTES.md new file mode 100644 index 0000000..5fd253d --- /dev/null +++ b/NOTES.md @@ -0,0 +1,2 @@ +Should connections be managed in the client? +- Keep API, Chat, etc. conns in client? diff --git a/chat.go b/chat.go new file mode 100644 index 0000000..81fc651 --- /dev/null +++ b/chat.go @@ -0,0 +1,281 @@ +package rumble + +import ( + "context" + "encoding/json" + + "github.com/r3labs/sse" +) + +const ( + ChatBadgeRecurringSubscription = "recurring_subscription" + ChatBadgeLocalsSupporter = "locals_supporter" + + ChatTypeInit = "init" + ChatTypeMessages = "messages" + ChatTypeMuteUsers = "mute_users" + ChatTypeDeleteMessages = "delete_messages" + ChatTypeSubscriber = "locals_supporter" + ChatTypeRaiding = "raid_confirmed" + ChatTypePinMessage = "pin_message" + ChatTypeUnpinMessage = "unpin_message" +) + +type ChatEventChannels struct { + ErrorCh chan error +} + +type RegisterChatEventsOptions struct { +} + +// TODO: for every true bool, create channel, otherwise nil. +// TODO: always create error channel +func RegisterChatEvents(opts RegisterChatEventsOptions) *ChatEventChannels { + +} + +type chatClient struct { + cancel context.CancelFunc + sseCancel context.CancelFunc + sseCl *sse.Client + sseEvent chan *sse.Event +} + +type RawChatEventChannel struct { + ID string `json:"id"` + Image1 string `json:"image.1"` + Link string `json:"link"` + Username string `json:"username"` +} + +type RawChatEventBlockData struct { + Text string `json:"text"` +} + +type RawChatEventBlock struct { + Data RawChatEventBlockData `json:"data"` + Type string `json:"type"` +} + +type RawChatEventNotification struct { + Badge string `json:"badge"` + Text string `json:"text"` +} + +type RawChatEventRaidNotification struct { + StartTs int64 `json:"start_ts"` +} + +type RawChatEventRant struct { + Duration int `json:"duration"` + ExpiresOn string `json:"expires_on"` + PriceCents int `json:"price_cents"` +} + +type RawChatEventMessage struct { + Blocks []RawChatEventBlock `json:"blocks"` + ChannelID *int64 `json:"channel_id"` + ID string `json:"id"` + Notification *RawChatEventNotification `json:"notification"` + RaidNotification *RawChatEventRaidNotification `json:"raid_notification"` + Rant *RawChatEventRant `json:"rant"` + Text string `json:"text"` + Time string `json:"time"` + UserID string `json:"user_id"` +} + +type RawChatEventUser struct { + Badges []string `json:"badges"` + Color string `json:"color"` + ID string `json:"id"` + Image1 string `json:"image.1"` + IsFollower bool `json:"is_follower"` + Link string `json:"link"` + Username string `json:"username"` +} + +type RawChatEventData struct { + Channels []RawChatEventChannel `json:"channels"` + Messages []RawChatEventMessage `json:"messages"` + Users []RawChatEventUser `json:"users"` +} + +type RawChatEvent struct { + Data RawChatEventData `json:"data"` + RequestID string `json:"request_id"` + Type string `json:"type"` +} + +type RawChatEventAny struct { + Data any `json:"data"` + Type string `json:"type"` +} + +type RawChatEventDataNoChannels struct { + // Channels [][]string `json:"channels"` + Channels [][]any `json:"channels"` + Messages []RawChatEventMessage `json:"messages"` + Users []RawChatEventUser `json:"users"` +} + +type RawChatEventNoChannels struct { + Data RawChatEventDataNoChannels `json:"data"` + RequestID string `json:"request_id"` + Type string `json:"type"` +} + +type ChatBase struct { + Data any `json:"data"` + Type string `json:"type"` +} + +func (rce RawChatEvent) users() map[string]RawChatEventUser { + uMap := map[string]RawChatEventUser{} + for _, user := range rce.Data.Users { + uMap[user.ID] = user + } + + return uMap +} + +func (rce RawChatEvent) channels() map[string]RawChatEventChannel { + cMap := map[string]RawChatEventChannel{} + for _, channel := range rce.Data.Channels { + cMap[channel.ID] = channel + } + + return cMap +} + +// func (rce RawChatEvent) parse() ([]ChatEvent, error) { +// events := []ChatEvent{} + +// users := rce.users() +// channels := rce.channels() + +// for _, message := range rce.Data.Messages { +// var event ChatEvent +// user, exists := users[message.UserID] +// if !exists { +// return nil, fmt.Errorf("missing user information for user ID: %s", message.UserID) +// } + +// event.Badges = user.Badges +// event.Color = user.Color +// event.ImageUrl = user.Image1 +// event.IsFollower = user.IsFollower +// if message.RaidNotification != nil { +// event.Raid = true +// } +// if message.Rant != nil { +// event.Rant = message.Rant.PriceCents +// } +// if message.Notification != nil { +// if message.Notification.Badge == ChatBadgeRecurringSubscription { +// event.Sub = true +// } +// } +// event.Text = message.Text +// t, err := time.Parse(time.RFC3339, message.Time) +// if err != nil { +// return nil, fmt.Errorf("error parsing message time: %v", err) +// } +// event.Time = t +// event.Type = rce.Type +// event.Username = user.Username + +// if message.ChannelID != nil { +// cid := strconv.Itoa(int(*message.ChannelID)) +// channel, exists := channels[cid] +// if !exists { +// return nil, fmt.Errorf("missing channel information for channel ID: %s", cid) +// } + +// event.ImageUrl = channel.Image1 +// event.ChannelName = channel.Username +// } + +// events = append(events, event) +// } + +// return events, nil +// } + +// type ChatEvent struct { +// Badges []string +// ChannelName string +// Color string +// ImageUrl string +// Init bool +// IsFollower bool +// Raid bool +// Rant int +// Sub bool +// Text string +// Time time.Time +// Type string +// Username string +// } + +func (cl *chatClient) start(ctx context.Context, eventCh chan *sse.Event) { + for { + select { + case <-ctx.Done(): + cl.sseCancel() + return + case event := <-eventCh: + if event == nil { + // handleError(fmt.Errorf("received nil event")) + } else { + cl.handleEvent(event) + // chats, err := parseEvent(event.Data) + // if err != nil { + // handleError(err) + // } else { + // for _, chat := range chats { + // handle(chat) + // } + // } + } + } + } +} + +func (cl *chatClient) handleEvent(e *sse.Event) { + var base ChatBase + err := json.Unmarshal(e, &base) + if err != nil { + // send error + return + } +} + +// func parseEvent(event []byte) ([]ChatEvent, error) { +// var rce RawChatEvent +// err := json.Unmarshal(event, &rce) +// if err != nil { +// var rcenc RawChatEventNoChannels +// errnc := json.Unmarshal(event, &rcenc) +// if errnc != nil { +// var rcea RawChatEventAny +// errany := json.Unmarshal(event, &rcea) +// if errany != nil { +// return nil, fmt.Errorf("error un-marshaling event: %v", err) +// } +// // ChatEvent type not supported, return empty events +// return []ChatEvent{}, nil +// } + +// rce.Data.Messages = rcenc.Data.Messages +// rce.Data.Users = rcenc.Data.Users +// rce.RequestID = rcenc.RequestID +// rce.Type = rcenc.Type +// } + +// events, err := rce.parse() +// if err != nil { +// return nil, fmt.Errorf("error parsing raw chat event: %v", err) +// } + +// return events, nil +// } diff --git a/client.go b/client.go index 70cf9fb..7eaca4c 100644 --- a/client.go +++ b/client.go @@ -23,7 +23,7 @@ const ( ) type Client struct { - httpClient *http.Client + httpCl *http.Client } type NewClientOptions struct { @@ -36,7 +36,7 @@ func NewClient(opts NewClientOptions) (*Client, error) { return nil, pkgErr("error creating new http client: %v", err) } - return &Client{httpClient: cl}, nil + return &Client{httpCl: cl}, nil } func newHttpClient(cookies []*http.Cookie) (*http.Client, error) { @@ -55,7 +55,7 @@ func newHttpClient(cookies []*http.Cookie) (*http.Client, error) { } func (c *Client) Login(username string, password string) ([]*http.Cookie, error) { - if c.httpClient == nil { + if c.httpCl == nil { return nil, pkgErr("", fmt.Errorf("http client is nil")) } @@ -86,7 +86,7 @@ func (c *Client) getSalts(username string) ([]string, error) { q.Add("username", username) body := q.Encode() - resp, err := c.httpClient.Post(urlServiceUserGetSalts, "application/x-www-form-urlencoded", strings.NewReader(body)) + resp, err := c.httpCl.Post(urlServiceUserGetSalts, "application/x-www-form-urlencoded", strings.NewReader(body)) if err != nil { return nil, fmt.Errorf("http post request returned error: %v", err) } @@ -120,7 +120,7 @@ func (c *Client) login(username string, password string, salts []string) ([]*htt q.Add("username", username) q.Add("password_hashes", hashes) body := q.Encode() - resp, err := c.httpClient.Post(urlServiceUserLogin, "application/x-www-form-urlencoded", strings.NewReader(body)) + resp, err := c.httpCl.Post(urlServiceUserLogin, "application/x-www-form-urlencoded", strings.NewReader(body)) if err != nil { return nil, fmt.Errorf("http post request returned error: %v", err) } @@ -207,7 +207,7 @@ func loginSession(body []byte) (string, error) { } func (c *Client) Logout() error { - if c.httpClient == nil { + if c.httpCl == nil { return pkgErr("", fmt.Errorf("http client is nil")) } @@ -220,7 +220,7 @@ func (c *Client) Logout() error { } func (c *Client) logout() error { - resp, err := c.httpClient.Get(urlServiceUserLogout) + resp, err := c.httpCl.Get(urlServiceUserLogout) if err != nil { return fmt.Errorf("http get request returned error: %v", err) } @@ -241,7 +241,7 @@ type LoggedInResponse struct { } func (c *Client) LoggedIn() (bool, error) { - resp, err := c.httpClient.Get(urlServiceUserLogin) + resp, err := c.httpCl.Get(urlServiceUserLogin) if err != nil { return false, pkgErr("http get request returned error", err) } @@ -263,3 +263,7 @@ func (c *Client) LoggedIn() (bool, error) { return lir.User.LoggedIn, nil } + +func (c *Client) NewLiveStream(url string) *LiveStream { + return &LiveStream{httpCl: c.httpCl, Url: url} +} diff --git a/go.mod b/go.mod index 1afae8a..ea38f17 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,13 @@ module github.com/tylertravisty/rumble-lib-go go 1.22.2 require ( - github.com/robertkrimen/otto v0.4.0 // indirect + github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc + github.com/robertkrimen/otto v0.4.0 +) + +require ( + golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect golang.org/x/text v0.4.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect ) diff --git a/go.sum b/go.sum index ae0405c..a6393ae 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o= +github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8= github.com/robertkrimen/otto v0.4.0 h1:/c0GRrK1XDPcgIasAsnlpBT5DelIeB9U/Z/JCQsgr7E= github.com/robertkrimen/otto v0.4.0/go.mod h1:uW9yN1CYflmUQYvAMS0m+ZiNo3dMzRUDQJX0jWbzgxw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/sourcemap.v1 v1.0.5 h1:inv58fC9f9J3TK2Y2R1NPntXEn3/wjWHkonhIUODNTI= gopkg.in/sourcemap.v1 v1.0.5/go.mod h1:2RlvNNSMglmRrcvhfuzp4hQHwOtjxlbjX7UPY/GXb78= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/livestream.go b/livestream.go new file mode 100644 index 0000000..5b46253 --- /dev/null +++ b/livestream.go @@ -0,0 +1,185 @@ +package rumble + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "time" + + "github.com/r3labs/sse" + "golang.org/x/net/html" + "gopkg.in/cenkalti/backoff.v1" +) + +type LiveStream struct { + ChannelID int + chatCl *chatClient + ChatApiPrefix string + httpCl *http.Client + Page string + ServiceApiPrefix string + Url string + VideoID string +} + +func (ls *LiveStream) ChatMessageUrl() string { + return fmt.Sprintf("%s/chat/%s/message", ls.ChatApiPrefix, ls.VideoID) +} + +func (ls *LiveStream) ChatStreamUrl() string { + return fmt.Sprintf("%s/chat/%s/stream", ls.ChatApiPrefix, ls.VideoID) +} + +func (ls *LiveStream) Open() error { + if ls.Url == "" { + return pkgErr("", fmt.Errorf("live stream url is empty")) + } + + err := ls.open() + if err != nil { + return pkgErr("error opening live stream", err) + } + + return nil +} + +func (ls *LiveStream) open() error { + if ls.Url == "" { + return fmt.Errorf("live stream url is empty") + } + + resp, err := ls.httpCl.Get(ls.Url) + if err != nil { + return fmt.Errorf("http get request returned error: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http get response status not %s: %s", http.StatusText(http.StatusOK), resp.Status) + } + + err = ls.parseWebpage(resp.Body) + if err != nil { + return fmt.Errorf("error parsing live stream webpage: %v", err) + } + + return nil +} + +func (ls *LiveStream) parseWebpage(body io.ReadCloser) error { + rumbleChatFound := false + + r := bufio.NewReader(body) + lineS, err := r.ReadString('\n') + for err == nil { + if strings.Contains(lineS, "RumbleChat(") { + start := strings.Index(lineS, "RumbleChat(") + len("RumbleChat(") + if start == -1 { + return fmt.Errorf("error finding chat function in webpage") + } + end := strings.Index(lineS[start:], ");") + if end == -1 { + return fmt.Errorf("error finding end of chat function in webpage") + } + args := parseRumbleChatArgs(lineS[start : start+end]) + channelID, err := strconv.Atoi(args[6]) + if err != nil { + return fmt.Errorf("error converting channel ID argument string to int: %v", err) + } + rumbleChatFound = true + ls.ChannelID = channelID + ls.ChatApiPrefix = args[0] + ls.ServiceApiPrefix = args[1] + ls.VideoID = args[2] + } else if strings.Contains(lineS, "media-by--a") && strings.Contains(lineS, "author") { + r := strings.NewReader(lineS) + node, err := html.Parse(r) + if err != nil { + return fmt.Errorf("error parsing html tag with page name: %v", err) + } + if node.FirstChild != nil && node.FirstChild.LastChild != nil && node.FirstChild.LastChild.FirstChild != nil { + for _, attr := range node.FirstChild.LastChild.FirstChild.Attr { + if attr.Key == "href" { + ls.Page = attr.Val + } + } + } + } + + lineS, err = r.ReadString('\n') + } + if err != io.EOF { + return fmt.Errorf("error reading line from webpage: %v", err) + } + if !rumbleChatFound { + return fmt.Errorf("did not find RumbleChat function call") + } + + return nil +} + +func parseRumbleChatArgs(argsS string) []string { + open := 0 + + args := []string{} + arg := []rune{} + for _, c := range argsS { + if c == ',' && open == 0 { + args = append(args, trimRumbleChatArg(string(arg))) + arg = []rune{} + } else { + if c == '[' { + open = open + 1 + } + if c == ']' { + open = open - 1 + } + + arg = append(arg, c) + } + } + if len(arg) > 0 { + args = append(args, trimRumbleChatArg(string(arg))) + } + + return args +} + +func trimRumbleChatArg(arg string) string { + return strings.Trim(strings.TrimSpace(arg), "\"") +} + +func (ls *LiveStream) StartChat(handle func(ce ChatEvent), handleError func(err error)) error { + // Validate all of the necessary live stream fields + + sseEvent := make(chan *sse.Event) + + sseCl := sse.NewClient(ls.ChatStreamUrl()) + sseCl.Connection = ls.httpCl + sseCtx, sseCancel := context.WithTimeout(context.Background(), 5*time.Second) + sseCl.ReconnectStrategy = backoff.WithContext( + backoff.NewExponentialBackOff(), + sseCtx, + ) + + err := sseCl.SubscribeChan("", sseEvent) + if err != nil { + sseCancel() + return pkgErr(fmt.Sprintf("error subscribing to chat stream %s", ls.ChatStreamUrl()), err) + } + + chatCtx, chatCancel := context.WithCancel(context.Background()) + ls.chatCl = &chatClient{cancel: chatCancel, sseCl: sseCl, sseCancel: sseCancel, sseEvent: sseEvent} + go ls.chatCl.start(chatCtx, sseEvent, handle, handleError) + + return nil +} + +func (ls *LiveStream) Close() error { + // TODO: figure out how to remove livestream instance from rumble.Client livestreams array + + return nil +}