rumble-lib-go/livestream.go
2024-06-14 13:13:49 -04:00

186 lines
4.5 KiB
Go

package rumble
import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
"github.com/r3labs/sse"
"golang.org/x/net/html"
"gopkg.in/cenkalti/backoff.v1"
)
// LiveStream holds the state of a single live stream: the URL it is opened
// from, the values scraped from its webpage by Open, and the clients used to
// fetch the page and follow its chat.
type LiveStream struct {
// ChannelID is parsed as an int from argument 6 of the page's RumbleChat call.
ChannelID int
// chatCl is the chat client created and started by StartChat.
chatCl *chatClient
// ChatApiPrefix is argument 0 of the page's RumbleChat call; it prefixes the
// chat message and chat stream URLs.
ChatApiPrefix string
// httpCl performs the webpage GET request and is used as the connection of
// the chat SSE client.
httpCl *http.Client
// Page is the href attribute read from the page line that contains both
// "media-by--a" and "author".
Page string
// ServiceApiPrefix is argument 1 of the page's RumbleChat call.
ServiceApiPrefix string
// Url is the address of the live stream webpage fetched by Open.
Url string
// VideoID is argument 2 of the page's RumbleChat call; it identifies the
// chat in the chat message and chat stream URLs.
VideoID string
}
// ChatMessageUrl returns the chat message URL for this live stream, built as
// "<ChatApiPrefix>/chat/<VideoID>/message".
func (ls *LiveStream) ChatMessageUrl() string {
	prefix, videoID := ls.ChatApiPrefix, ls.VideoID
	return fmt.Sprintf("%s/chat/%s/message", prefix, videoID)
}
// ChatStreamUrl returns the chat stream URL for this live stream, built as
// "<ChatApiPrefix>/chat/<VideoID>/stream". StartChat subscribes to this URL.
func (ls *LiveStream) ChatStreamUrl() string {
	prefix, videoID := ls.ChatApiPrefix, ls.VideoID
	return fmt.Sprintf("%s/chat/%s/stream", prefix, videoID)
}
// Open fetches the live stream webpage at ls.Url and fills in the stream's
// fields from it. It returns an error built by pkgErr when the URL is empty or
// when fetching or parsing the page fails, and nil otherwise.
func (ls *LiveStream) Open() error {
	if len(ls.Url) == 0 {
		return pkgErr("", fmt.Errorf("live stream url is empty"))
	}

	if openErr := ls.open(); openErr != nil {
		return pkgErr("error opening live stream", openErr)
	}

	return nil
}
// open performs an HTTP GET of ls.Url with the stream's HTTP client and hands
// the response body to parseWebpage, which fills in the stream's fields.
//
// It returns an error when the URL is empty, the HTTP client is nil, the
// request fails, the response status is not 200 OK, or the page cannot be
// parsed. Underlying errors are wrapped with %w so callers can inspect them
// with errors.Is and errors.As.
func (ls *LiveStream) open() error {
	if ls.Url == "" {
		return fmt.Errorf("live stream url is empty")
	}
	// LiveStream has exported fields and can be constructed by hand; calling
	// Get on a nil *http.Client would panic instead of returning an error.
	if ls.httpCl == nil {
		return fmt.Errorf("live stream http client is nil")
	}

	resp, err := ls.httpCl.Get(ls.Url)
	if err != nil {
		return fmt.Errorf("http get request returned error: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("http get response status not %s: %s", http.StatusText(http.StatusOK), resp.Status)
	}

	if err := ls.parseWebpage(resp.Body); err != nil {
		return fmt.Errorf("error parsing live stream webpage: %w", err)
	}

	return nil
}
// parseWebpage scans the live stream webpage line by line and fills in the
// fields of ls from what it finds:
//
//   - A line containing a "RumbleChat(" call supplies ChatApiPrefix
//     (argument 0), ServiceApiPrefix (argument 1), VideoID (argument 2) and
//     ChannelID (argument 6, converted to an int).
//   - Otherwise, a line containing both "media-by--a" and "author" is parsed
//     as HTML and the href attribute of its first node is stored in Page.
//
// It returns an error when a line cannot be read or parsed, when the
// RumbleChat call has too few arguments or a non-numeric channel ID, or when
// no RumbleChat call is found at all. body is only read from; closing it is
// left to the caller.
func (ls *LiveStream) parseWebpage(body io.ReadCloser) error {
	const (
		chatCall = "RumbleChat("
		// channelIDArg is the highest argument index read from the call.
		channelIDArg = 6
	)

	rumbleChatFound := false
	reader := bufio.NewReader(body)
	for {
		line, readErr := reader.ReadString('\n')
		if readErr != nil && readErr != io.EOF {
			return fmt.Errorf("error reading line from webpage: %w", readErr)
		}

		// When the input does not end in a newline, ReadString returns the
		// final line together with io.EOF, so the line is processed before
		// the EOF check below.
		if idx := strings.Index(line, chatCall); idx != -1 {
			start := idx + len(chatCall)
			end := strings.Index(line[start:], ");")
			if end == -1 {
				return fmt.Errorf("error finding end of chat function in webpage")
			}

			args := parseRumbleChatArgs(line[start : start+end])
			// Guard the fixed argument indexes below against a page whose
			// call has fewer arguments than expected.
			if len(args) <= channelIDArg {
				return fmt.Errorf("expected at least %d arguments to chat function in webpage, found %d", channelIDArg+1, len(args))
			}

			channelID, err := strconv.Atoi(args[channelIDArg])
			if err != nil {
				return fmt.Errorf("error converting channel ID argument string to int: %w", err)
			}

			rumbleChatFound = true
			ls.ChannelID = channelID
			ls.ChatApiPrefix = args[0]
			ls.ServiceApiPrefix = args[1]
			ls.VideoID = args[2]
		} else if strings.Contains(line, "media-by--a") && strings.Contains(line, "author") {
			doc, err := html.Parse(strings.NewReader(line))
			if err != nil {
				return fmt.Errorf("error parsing html tag with page name: %w", err)
			}

			// html.Parse returns a whole document tree, so the first node of
			// the line is reached through document -> last child of the root
			// element -> first child.
			if root := doc.FirstChild; root != nil && root.LastChild != nil && root.LastChild.FirstChild != nil {
				for _, attr := range root.LastChild.FirstChild.Attr {
					if attr.Key == "href" {
						ls.Page = attr.Val
					}
				}
			}
		}

		if readErr == io.EOF {
			break
		}
	}

	if !rumbleChatFound {
		return fmt.Errorf("did not find RumbleChat function call")
	}

	return nil
}
// parseRumbleChatArgs splits the argument list of a RumbleChat(...) call into
// its individual arguments and returns them with surrounding whitespace and
// double quotes removed (see trimRumbleChatArg).
//
// Arguments are separated by commas at the top level only: a comma inside
// square brackets or inside a double-quoted string does not start a new
// argument, and brackets inside a double-quoted string do not affect nesting.
// A backslash inside a double-quoted string escapes the next character, so an
// escaped quote does not end the string. An empty trailing argument is
// dropped; an empty input yields an empty, non-nil slice.
func parseRumbleChatArgs(argsS string) []string {
	args := []string{}

	var arg strings.Builder
	depth := 0        // current square-bracket nesting level
	inQuotes := false // inside a double-quoted string
	escaped := false  // previous character was a backslash inside a string

	for _, c := range argsS {
		switch {
		case escaped:
			// The character after a backslash is taken literally.
			escaped = false
		case inQuotes && c == '\\':
			escaped = true
		case c == '"':
			inQuotes = !inQuotes
		case inQuotes:
			// Separators and brackets inside a string are plain text.
		case c == '[':
			depth++
		case c == ']':
			depth--
		case c == ',' && depth == 0:
			args = append(args, trimRumbleChatArg(arg.String()))
			arg.Reset()
			continue
		}
		arg.WriteRune(c)
	}

	if arg.Len() > 0 {
		args = append(args, trimRumbleChatArg(arg.String()))
	}

	return args
}
// trimRumbleChatArg strips leading and trailing whitespace from arg and then
// any leading and trailing double-quote characters from the result.
// Whitespace that sits inside the quotes is preserved.
func trimRumbleChatArg(arg string) string {
	unspaced := strings.TrimSpace(arg)
	return strings.Trim(unspaced, `"`)
}
// StartChat subscribes to the live stream's chat stream URL over SSE using the
// stream's HTTP client and, on success, stores a new chat client on the live
// stream and starts it in its own goroutine. handle and handleError are passed
// through to that goroutine.
//
// It returns an error built by pkgErr if the subscription fails, and nil
// otherwise.
func (ls *LiveStream) StartChat(handle func(ce ChatEvent), handleError func(err error)) error {
	// TODO: validate all of the necessary live stream fields before subscribing.
	streamUrl := ls.ChatStreamUrl()
	events := make(chan *sse.Event)

	client := sse.NewClient(streamUrl)
	client.Connection = ls.httpCl

	// The reconnect strategy is an exponential backoff bound to a context
	// that expires five seconds after this point.
	retryCtx, retryCancel := context.WithTimeout(context.Background(), 5*time.Second)
	client.ReconnectStrategy = backoff.WithContext(backoff.NewExponentialBackOff(), retryCtx)

	if err := client.SubscribeChan("", events); err != nil {
		// The chat client that would own the cancel func is never created,
		// so release the context here.
		retryCancel()
		return pkgErr(fmt.Sprintf("error subscribing to chat stream %s", streamUrl), err)
	}

	chatCtx, chatCancel := context.WithCancel(context.Background())
	ls.chatCl = &chatClient{
		cancel:    chatCancel,
		sseCl:     client,
		sseCancel: retryCancel,
		sseEvent:  events,
	}
	go ls.chatCl.start(chatCtx, events, handle, handleError)

	return nil
}
// Close is currently a no-op: it releases nothing, does not stop a chat
// started by StartChat, and always returns nil.
func (ls *LiveStream) Close() error {
// TODO: figure out how to remove livestream instance from rumble.Client livestreams array
return nil
}