186 lines
4.5 KiB
Go
186 lines
4.5 KiB
Go
|
package rumble
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net/http"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"time"
|
||
|
|
||
|
"github.com/r3labs/sse"
|
||
|
"golang.org/x/net/html"
|
||
|
"gopkg.in/cenkalti/backoff.v1"
|
||
|
)
|
||
|
|
||
|
type LiveStream struct {
|
||
|
ChannelID int
|
||
|
chatCl *chatClient
|
||
|
ChatApiPrefix string
|
||
|
httpCl *http.Client
|
||
|
Page string
|
||
|
ServiceApiPrefix string
|
||
|
Url string
|
||
|
VideoID string
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) ChatMessageUrl() string {
|
||
|
return fmt.Sprintf("%s/chat/%s/message", ls.ChatApiPrefix, ls.VideoID)
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) ChatStreamUrl() string {
|
||
|
return fmt.Sprintf("%s/chat/%s/stream", ls.ChatApiPrefix, ls.VideoID)
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) Open() error {
|
||
|
if ls.Url == "" {
|
||
|
return pkgErr("", fmt.Errorf("live stream url is empty"))
|
||
|
}
|
||
|
|
||
|
err := ls.open()
|
||
|
if err != nil {
|
||
|
return pkgErr("error opening live stream", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) open() error {
|
||
|
if ls.Url == "" {
|
||
|
return fmt.Errorf("live stream url is empty")
|
||
|
}
|
||
|
|
||
|
resp, err := ls.httpCl.Get(ls.Url)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("http get request returned error: %v", err)
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
if resp.StatusCode != http.StatusOK {
|
||
|
return fmt.Errorf("http get response status not %s: %s", http.StatusText(http.StatusOK), resp.Status)
|
||
|
}
|
||
|
|
||
|
err = ls.parseWebpage(resp.Body)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error parsing live stream webpage: %v", err)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) parseWebpage(body io.ReadCloser) error {
|
||
|
rumbleChatFound := false
|
||
|
|
||
|
r := bufio.NewReader(body)
|
||
|
lineS, err := r.ReadString('\n')
|
||
|
for err == nil {
|
||
|
if strings.Contains(lineS, "RumbleChat(") {
|
||
|
start := strings.Index(lineS, "RumbleChat(") + len("RumbleChat(")
|
||
|
if start == -1 {
|
||
|
return fmt.Errorf("error finding chat function in webpage")
|
||
|
}
|
||
|
end := strings.Index(lineS[start:], ");")
|
||
|
if end == -1 {
|
||
|
return fmt.Errorf("error finding end of chat function in webpage")
|
||
|
}
|
||
|
args := parseRumbleChatArgs(lineS[start : start+end])
|
||
|
channelID, err := strconv.Atoi(args[6])
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error converting channel ID argument string to int: %v", err)
|
||
|
}
|
||
|
rumbleChatFound = true
|
||
|
ls.ChannelID = channelID
|
||
|
ls.ChatApiPrefix = args[0]
|
||
|
ls.ServiceApiPrefix = args[1]
|
||
|
ls.VideoID = args[2]
|
||
|
} else if strings.Contains(lineS, "media-by--a") && strings.Contains(lineS, "author") {
|
||
|
r := strings.NewReader(lineS)
|
||
|
node, err := html.Parse(r)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("error parsing html tag with page name: %v", err)
|
||
|
}
|
||
|
if node.FirstChild != nil && node.FirstChild.LastChild != nil && node.FirstChild.LastChild.FirstChild != nil {
|
||
|
for _, attr := range node.FirstChild.LastChild.FirstChild.Attr {
|
||
|
if attr.Key == "href" {
|
||
|
ls.Page = attr.Val
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
lineS, err = r.ReadString('\n')
|
||
|
}
|
||
|
if err != io.EOF {
|
||
|
return fmt.Errorf("error reading line from webpage: %v", err)
|
||
|
}
|
||
|
if !rumbleChatFound {
|
||
|
return fmt.Errorf("did not find RumbleChat function call")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// parseRumbleChatArgs splits the text between the parentheses of a
// RumbleChat(...) call into its top-level arguments.
//
// A comma separates arguments only when it is outside every quoted string and
// outside every [], {} or () group, so values such as "a, b" or [1, 2] stay in
// one piece. Each argument is passed through trimRumbleChatArg. An empty
// trailing argument is not reported, and an empty input yields an empty slice.
func parseRumbleChatArgs(argsS string) []string {
	args := []string{}

	var (
		depth   int  // current bracket nesting level
		quote   rune // active string delimiter, 0 when outside a string
		escaped bool // previous rune inside a string was a backslash
		start   int  // byte offset where the current argument begins
	)
	for i, c := range argsS {
		if quote != 0 {
			// Inside a string only an unescaped matching delimiter matters.
			switch {
			case escaped:
				escaped = false
			case c == '\\':
				escaped = true
			case c == quote:
				quote = 0
			}
			continue
		}

		switch c {
		case '"', '\'':
			quote = c
		case '[', '{', '(':
			depth++
		case ']', '}', ')':
			depth--
		case ',':
			if depth == 0 {
				args = append(args, trimRumbleChatArg(argsS[start:i]))
				start = i + 1
			}
		}
	}
	if start < len(argsS) {
		args = append(args, trimRumbleChatArg(argsS[start:]))
	}

	return args
}

// trimRumbleChatArg strips surrounding whitespace and then any leading and
// trailing double quotes from a single argument.
func trimRumbleChatArg(arg string) string {
	return strings.Trim(strings.TrimSpace(arg), "\"")
}
|
||
|
|
||
|
func (ls *LiveStream) StartChat(handle func(ce ChatEvent), handleError func(err error)) error {
|
||
|
// Validate all of the necessary live stream fields
|
||
|
|
||
|
sseEvent := make(chan *sse.Event)
|
||
|
|
||
|
sseCl := sse.NewClient(ls.ChatStreamUrl())
|
||
|
sseCl.Connection = ls.httpCl
|
||
|
sseCtx, sseCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||
|
sseCl.ReconnectStrategy = backoff.WithContext(
|
||
|
backoff.NewExponentialBackOff(),
|
||
|
sseCtx,
|
||
|
)
|
||
|
|
||
|
err := sseCl.SubscribeChan("", sseEvent)
|
||
|
if err != nil {
|
||
|
sseCancel()
|
||
|
return pkgErr(fmt.Sprintf("error subscribing to chat stream %s", ls.ChatStreamUrl()), err)
|
||
|
}
|
||
|
|
||
|
chatCtx, chatCancel := context.WithCancel(context.Background())
|
||
|
ls.chatCl = &chatClient{cancel: chatCancel, sseCl: sseCl, sseCancel: sseCancel, sseEvent: sseEvent}
|
||
|
go ls.chatCl.start(chatCtx, sseEvent, handle, handleError)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (ls *LiveStream) Close() error {
|
||
|
// TODO: figure out how to remove livestream instance from rumble.Client livestreams array
|
||
|
|
||
|
return nil
|
||
|
}
|