Migrated api package into events package

This commit is contained in:
tyler 2024-04-10 15:19:00 -04:00
parent 419e67ff21
commit 79031a1978
7 changed files with 572 additions and 337 deletions

View file

@ -1,16 +1,18 @@
# Doing # Doing
Next steps:
- delete page needs to handle new architecture
- activatePage: verify defer page.activeMu.Unlock does not conflict with display function
On API errors
- include backoff multiple, if exceeded then stop API
Add option to delete API key for accounts? Add option to delete API key for accounts?
First sign in screen: use different function than Login used by dashboard page?
- Dashboard login function emits events that first sign in screen may not need
Add better styles/icon to account details menu Add better styles/icon to account details menu
Loading screen replaces signin at / Start screen:
SignIn moves to /signin - check for new updates and tell user
Check for accounts, if accounts -> dashboard, else -> signin
Also check for new updates and tell user
Trigger on event from API vs. trigger on event from chat Trigger on event from API vs. trigger on event from chat
- Chat bot trigger on follow requires API key - Chat bot trigger on follow requires API key

365
v1/app.go
View file

@ -10,8 +10,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/tylertravisty/rum-goggles/v1/internal/api"
"github.com/tylertravisty/rum-goggles/v1/internal/config" "github.com/tylertravisty/rum-goggles/v1/internal/config"
"github.com/tylertravisty/rum-goggles/v1/internal/events"
"github.com/tylertravisty/rum-goggles/v1/internal/models" "github.com/tylertravisty/rum-goggles/v1/internal/models"
rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go" rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go"
"github.com/wailsapp/wails/v2/pkg/runtime" "github.com/wailsapp/wails/v2/pkg/runtime"
@ -24,30 +24,53 @@ const (
ChannelType = "Channel" ChannelType = "Channel"
) )
type ApiState struct {
active bool
activeMu sync.Mutex
resp *rumblelivestreamlib.LivestreamResponse
respMu sync.Mutex
}
type Page struct {
active bool
activeMu sync.Mutex
apiSt *ApiState
displaying bool
displayingMu sync.Mutex
name string
}
// App struct // App struct
type App struct { type App struct {
api *api.Api //apiS *ApiService
clients map[string]*rumblelivestreamlib.Client cancelCtrl context.CancelFunc
clientsMu sync.Mutex clients map[string]*rumblelivestreamlib.Client
ctx context.Context clientsMu sync.Mutex
services *models.Services displaying string
logError *log.Logger displayingMu sync.Mutex
logFile *os.File logError *log.Logger
logFileMu sync.Mutex logFile *os.File
logInfo *log.Logger logFileMu sync.Mutex
logInfo *log.Logger
pages map[string]*Page
pagesMu sync.Mutex
producers *events.Producers
services *models.Services
wails context.Context
} }
// NewApp creates a new App application struct // NewApp creates a new App application struct
func NewApp() *App { func NewApp() *App {
app := &App{ app := &App{
clients: map[string]*rumblelivestreamlib.Client{}, clients: map[string]*rumblelivestreamlib.Client{},
pages: map[string]*Page{},
} }
err := app.log() err := app.log()
if err != nil { if err != nil {
log.Fatal("error initializing log: ", err) log.Fatal("error initializing log: ", err)
} }
app.api = api.NewApi(app.logError, app.logInfo) //app.apiS = NewApiService(app.logError, app.logInfo)
return app return app
} }
@ -69,19 +92,88 @@ func (a *App) log() error {
// startup is called when the app starts. The context is saved // startup is called when the app starts. The context is saved
// so we can call the runtime methods // so we can call the runtime methods
func (a *App) startup(ctx context.Context) { func (a *App) startup(wails context.Context) {
a.ctx = ctx a.wails = wails
a.api.Startup(ctx)
//a.apiS.Startup(a.apiS.ch)
// ctx, cancel := context.WithCancel(context.Background())
// a.cancelCtrl = cancel
// go a.handle(ctx)
}
func (a *App) handle(ctx context.Context) {
for {
select {
case apiE := <-a.producers.ApiP.Ch:
err := a.handleApi(apiE)
if err != nil {
a.logError.Println("error handling API event:", err)
}
case <-ctx.Done():
return
}
}
}
func (a *App) handleApi(event events.Api) error {
if event.Name == "" {
return fmt.Errorf("event name is empty")
}
a.pagesMu.Lock()
defer a.pagesMu.Unlock()
page, exists := a.pages[event.Name]
if !exists {
page = &Page{
apiSt: &ApiState{},
name: event.Name,
}
a.pages[event.Name] = page
}
page.apiSt.activeMu.Lock()
page.apiSt.active = event.Stop
page.apiSt.activeMu.Unlock()
if event.Stop {
runtime.EventsEmit(a.wails, "ApiActive-"+page.name, false)
return nil
}
runtime.EventsEmit(a.wails, "ApiActive-"+page.name, true)
page.displayingMu.Lock()
if page.displaying {
runtime.EventsEmit(a.wails, "PageActive", true)
}
page.displayingMu.Unlock()
page.apiSt.respMu.Lock()
page.apiSt.resp = event.Resp
page.apiSt.respMu.Unlock()
a.updatePage(page)
return nil
} }
func (a *App) shutdown(ctx context.Context) { func (a *App) shutdown(ctx context.Context) {
if a.api != nil { // if a.apiS != nil && a.apiS.Api != nil {
err := a.api.Shutdown() // err := a.apiS.Shutdown()
if err != nil { // if err != nil {
a.logError.Println("error shutting down api:", err) // a.logError.Println("error shutting down api:", err)
} // }
// close(a.apiS.ch)
// }
err := a.producers.Shutdown()
if err != nil {
a.logError.Println("error closing event producers:", err)
} }
a.cancelCtrl()
if a.services != nil { if a.services != nil {
err := a.services.Close() err := a.services.Close()
if err != nil { if err != nil {
@ -101,27 +193,39 @@ func (a *App) shutdown(ctx context.Context) {
func (a *App) Start() (bool, error) { func (a *App) Start() (bool, error) {
runtime.EventsEmit(a.ctx, "StartupMessage", "Initializing database...") runtime.EventsEmit(a.wails, "StartupMessage", "Initializing database...")
err := a.initServices() err := a.initServices()
if err != nil { if err != nil {
a.logError.Println("error initializing services:", err) a.logError.Println("error initializing services:", err)
return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.") return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.")
} }
runtime.EventsEmit(a.ctx, "StartupMessage", "Initializing database complete.") runtime.EventsEmit(a.wails, "StartupMessage", "Initializing database complete.")
runtime.EventsEmit(a.ctx, "StartupMessage", "Verifying account sessions...") runtime.EventsEmit(a.wails, "StartupMessage", "Verifying account sessions...")
count, err := a.verifyAccounts() count, err := a.verifyAccounts()
if err != nil { if err != nil {
a.logError.Println("error verifying accounts:", err) a.logError.Println("error verifying accounts:", err)
return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.") return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.")
} }
runtime.EventsEmit(a.ctx, "StartupMessage", "Verifying account sessions complete.") runtime.EventsEmit(a.wails, "StartupMessage", "Verifying account sessions complete.")
runtime.EventsEmit(a.wails, "StartupMessage", "Initializing event producers...")
err = a.initProducers()
if err != nil {
a.logError.Println("error initializing producers:", err)
return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.")
}
runtime.EventsEmit(a.wails, "StartupMessage", "Initializing event producers complete.")
// TODO: check for update - if available, pop up window // TODO: check for update - if available, pop up window
// runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates...") // runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates...")
// update, err = a.checkForUpdate() // update, err = a.checkForUpdate()
// runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates complete.") // runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates complete.")
ctx, cancel := context.WithCancel(context.Background())
a.cancelCtrl = cancel
go a.handle(ctx)
signin := true signin := true
if count > 0 { if count > 0 {
signin = false signin = false
@ -130,6 +234,25 @@ func (a *App) Start() (bool, error) {
return signin, nil return signin, nil
} }
func (a *App) initProducers() error {
producers, err := events.NewProducers(
events.WithLoggers(a.logError, a.logInfo),
events.WithApiProducer(),
)
if err != nil {
return fmt.Errorf("error initializing producers: %v", err)
}
err = producers.Startup()
if err != nil {
return fmt.Errorf("error starting producers: %v", err)
}
a.producers = producers
return nil
}
func (a *App) initServices() error { func (a *App) initServices() error {
db, err := config.Database() db, err := config.Database()
if err != nil { if err != nil {
@ -329,14 +452,14 @@ func (a *App) Login(username string, password string) error {
a.logError.Println("account name is nil") a.logError.Println("account name is nil")
return fmt.Errorf("Error logging in. Try again.") return fmt.Errorf("Error logging in. Try again.")
} }
runtime.EventsEmit(a.ctx, "LoggedIn-"+*name, true) runtime.EventsEmit(a.wails, "LoggedIn-"+*name, true)
list, err := a.accountList() list, err := a.accountList()
if err != nil { if err != nil {
a.logError.Println("error getting account list:", err) a.logError.Println("error getting account list:", err)
return fmt.Errorf("Error logging in. Try again.") return fmt.Errorf("Error logging in. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) runtime.EventsEmit(a.wails, "PageSideBarAccounts", list)
err = a.openDetails(acct) err = a.openDetails(acct)
if err != nil { if err != nil {
@ -402,14 +525,14 @@ func (a *App) Logout(id int64) error {
a.logError.Println("account name is nil") a.logError.Println("account name is nil")
return fmt.Errorf("Error logging out. Try again.") return fmt.Errorf("Error logging out. Try again.")
} }
runtime.EventsEmit(a.ctx, "LoggedIn-"+*name, false) runtime.EventsEmit(a.wails, "LoggedIn-"+*name, false)
list, err := a.accountList() list, err := a.accountList()
if err != nil { if err != nil {
a.logError.Println("error getting account list:", err) a.logError.Println("error getting account list:", err)
return fmt.Errorf("Error logging out. Try again.") return fmt.Errorf("Error logging out. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) runtime.EventsEmit(a.wails, "PageSideBarAccounts", list)
err = a.openDetails(acct) err = a.openDetails(acct)
if err != nil { if err != nil {
@ -510,7 +633,7 @@ func (a *App) OpenChannel(id int64) error {
return nil return nil
} }
type Page interface { type PageInfo interface {
Id() *int64 Id() *int64
KeyUrl() *string KeyUrl() *string
LoggedIn() bool LoggedIn() bool
@ -527,39 +650,39 @@ type PageDetails struct {
Type string `json:"type"` Type string `json:"type"`
} }
func (a *App) openDetails(p Page) error { func (a *App) openDetails(pi PageInfo) error {
id := p.Id() id := pi.Id()
if id == nil { if id == nil {
return fmt.Errorf("page id is nil") return fmt.Errorf("page id is nil")
} }
hasApi := true hasApi := true
key := p.KeyUrl() key := pi.KeyUrl()
if key == nil || *key == "" { if key == nil || *key == "" {
hasApi = false hasApi = false
} }
name := p.String() name := pi.String()
if name == nil { if name == nil {
return fmt.Errorf("page name is nil") return fmt.Errorf("page name is nil")
} }
title := p.Title() title := pi.Title()
if title == nil { if title == nil {
return fmt.Errorf("page title is nil") return fmt.Errorf("page title is nil")
} }
runtime.EventsEmit(a.ctx, "PageDetails", PageDetails{ runtime.EventsEmit(a.wails, "PageDetails", PageDetails{
ID: *id, ID: *id,
HasApi: hasApi, HasApi: hasApi,
LoggedIn: p.LoggedIn(), LoggedIn: pi.LoggedIn(),
Title: *title, Title: *title,
Type: p.Type(), Type: pi.Type(),
}) })
err := a.api.Display(*name) err := a.display(*name)
if err != nil { if err != nil {
return fmt.Errorf("error displaying api for %s: %v", *name, err) return fmt.Errorf("error displaying page: %v", err)
} }
return nil return nil
@ -606,32 +729,60 @@ func (a *App) ActivateChannel(id int64) error {
// If page is inactivate, activate. // If page is inactivate, activate.
// If page is active, deactivate. // If page is active, deactivate.
func (a *App) activatePage(p Page) error { func (a *App) activatePage(pi PageInfo) error {
name := p.String() name := pi.String()
if name == nil { if name == nil {
return fmt.Errorf("page name is nil") return fmt.Errorf("page name is nil")
} }
url := p.KeyUrl() url := pi.KeyUrl()
if url == nil { if url == nil {
return fmt.Errorf("page key url is nil") return fmt.Errorf("page key url is nil")
} }
if a.api.Active(*name) { a.pagesMu.Lock()
err := a.api.Stop(*name) page, exists := a.pages[*name]
if err != nil { if !exists {
return fmt.Errorf("error stopping api: %v", err) page = &Page{
active: false,
apiSt: &ApiState{},
name: *name,
} }
a.pages[*name] = page
}
a.pagesMu.Unlock()
page.activeMu.Lock()
defer page.activeMu.Unlock()
if page.active {
if a.producers.ApiP.Active(*name) {
err := a.producers.ApiP.Stop(*name)
if err != nil {
return fmt.Errorf("error stopping api: %v", err)
}
}
page.displayingMu.Lock()
if page.displaying {
runtime.EventsEmit(a.wails, "PageActive", false)
}
page.displayingMu.Unlock()
page.active = false
return nil return nil
} }
page.active = true
err := a.api.Start(*name, *url, 10*time.Second) err := a.producers.ApiP.Start(*name, *url, 10*time.Second)
if err != nil { if err != nil {
return fmt.Errorf("error starting api: %v", err) return fmt.Errorf("error starting api: %v", err)
} }
err = a.api.Display(*name) runtime.EventsEmit(a.wails, "ApiActive-"+*name, true)
err = a.display(*name)
if err != nil { if err != nil {
return fmt.Errorf("error displaying api: %v", err) return fmt.Errorf("error displaying page: %v", err)
} }
runtime.EventsEmit(a.wails, "PageActive", true)
return nil return nil
} }
@ -661,8 +812,8 @@ func (a *App) DeleteAccount(id int64) error {
return fmt.Errorf("Error deleting account. Try again.") return fmt.Errorf("Error deleting account. Try again.")
} }
if a.api.Active(*name) { if a.producers.ApiP.Active(*name) {
err := a.api.Stop(*name) err := a.producers.ApiP.Stop(*name)
if err != nil { if err != nil {
a.logError.Println("error stopping api:", err) a.logError.Println("error stopping api:", err)
return fmt.Errorf("Error deleting account. Try again.") return fmt.Errorf("Error deleting account. Try again.")
@ -675,7 +826,7 @@ func (a *App) DeleteAccount(id int64) error {
return fmt.Errorf("Error deleting account. Try again.") return fmt.Errorf("Error deleting account. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageDetails", nil) runtime.EventsEmit(a.wails, "PageDetails", nil)
list, err := a.accountList() list, err := a.accountList()
if err != nil { if err != nil {
@ -683,7 +834,7 @@ func (a *App) DeleteAccount(id int64) error {
return fmt.Errorf("Error deleting account. Try again.") return fmt.Errorf("Error deleting account. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) runtime.EventsEmit(a.wails, "PageSideBarAccounts", list)
return nil return nil
} }
@ -704,8 +855,8 @@ func (a *App) DeleteChannel(id int64) error {
return fmt.Errorf("Error deleting channel. Try again.") return fmt.Errorf("Error deleting channel. Try again.")
} }
if a.api.Active(*name) { if a.producers.ApiP.Active(*name) {
err := a.api.Stop(*name) err := a.producers.ApiP.Stop(*name)
if err != nil { if err != nil {
a.logError.Println("error stopping api:", err) a.logError.Println("error stopping api:", err)
return fmt.Errorf("Error deleting channel. Try again.") return fmt.Errorf("Error deleting channel. Try again.")
@ -718,7 +869,7 @@ func (a *App) DeleteChannel(id int64) error {
return fmt.Errorf("Error deleting channel. Try again.") return fmt.Errorf("Error deleting channel. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageDetails", nil) runtime.EventsEmit(a.wails, "PageDetails", nil)
list, err := a.accountList() list, err := a.accountList()
if err != nil { if err != nil {
@ -726,7 +877,79 @@ func (a *App) DeleteChannel(id int64) error {
return fmt.Errorf("Error deleting channel. Try again.") return fmt.Errorf("Error deleting channel. Try again.")
} }
runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) runtime.EventsEmit(a.wails, "PageSideBarAccounts", list)
return nil
}
func (a *App) display(name string) error {
a.displayingMu.Lock()
defer a.displayingMu.Unlock()
if name == a.displaying {
return nil
}
a.pagesMu.Lock()
defer a.pagesMu.Unlock()
if a.displaying != "" {
displaying, exists := a.pages[a.displaying]
a.displaying = ""
if !exists {
return fmt.Errorf("displaying page does not exist: %s", a.display)
}
displaying.displayingMu.Lock()
displaying.displaying = false
displaying.displayingMu.Unlock()
}
page, exists := a.pages[name]
if !exists {
page = &Page{
active: false,
apiSt: &ApiState{},
name: name,
}
a.pages[name] = page
}
page.displayingMu.Lock()
page.displaying = true
page.displayingMu.Unlock()
a.displaying = name
err := a.updatePage(page)
if err != nil {
return fmt.Errorf("error updating page: %v", err)
}
return nil
}
func (a *App) updatePage(p *Page) error {
if p == nil {
return fmt.Errorf("page is nil")
}
//TODO check p.api == nil
p.apiSt.respMu.Lock()
defer p.apiSt.respMu.Unlock()
p.displayingMu.Lock()
if p.displaying {
runtime.EventsEmit(a.wails, "PageActivity", p.apiSt.resp)
p.activeMu.Lock()
runtime.EventsEmit(a.wails, "PageActive", p.active)
p.activeMu.Unlock()
}
p.displayingMu.Unlock()
if p.apiSt.resp != nil {
isLive := len(p.apiSt.resp.Livestreams) > 0
runtime.EventsEmit(a.wails, "PageLive-"+p.name, isLive)
}
return nil return nil
} }
@ -735,14 +958,24 @@ func (a *App) PageStatus(name string) {
active := false active := false
isLive := false isLive := false
resp := a.api.Response(name) // resp := a.api.Response(name)
if resp != nil { a.pagesMu.Lock()
active = true defer a.pagesMu.Unlock()
isLive = len(resp.Livestreams) > 0 page, exists := a.pages[name]
if exists && page.apiSt != nil {
page.apiSt.activeMu.Lock()
active = page.apiSt.active
page.apiSt.activeMu.Unlock()
page.apiSt.respMu.Lock()
if page.apiSt.resp != nil {
isLive = len(page.apiSt.resp.Livestreams) > 0
}
page.apiSt.respMu.Unlock()
} }
runtime.EventsEmit(a.ctx, "ApiActive-"+name, active) runtime.EventsEmit(a.wails, "ApiActive-"+name, active)
runtime.EventsEmit(a.ctx, "PageLive-"+name, isLive) runtime.EventsEmit(a.wails, "PageLive-"+name, isLive)
} }
func (a *App) UpdateAccountApi(id int64, apiKey string) error { func (a *App) UpdateAccountApi(id int64, apiKey string) error {
@ -761,8 +994,8 @@ func (a *App) UpdateAccountApi(id int64, apiKey string) error {
return fmt.Errorf("Error updating account. Try again.") return fmt.Errorf("Error updating account. Try again.")
} }
if a.api.Active(*name) { if a.producers.ApiP.Active(*name) {
err := a.api.Stop(*name) err := a.producers.ApiP.Stop(*name)
if err != nil { if err != nil {
a.logError.Println("error stopping api:", err) a.logError.Println("error stopping api:", err)
return fmt.Errorf("Error updating account. Try again.") return fmt.Errorf("Error updating account. Try again.")
@ -806,8 +1039,8 @@ func (a *App) UpdateChannelApi(id int64, apiKey string) error {
return fmt.Errorf("Error updating channel. Try again.") return fmt.Errorf("Error updating channel. Try again.")
} }
if a.api.Active(*name) { if a.producers.ApiP.Active(*name) {
err := a.api.Stop(*name) err := a.producers.ApiP.Stop(*name)
if err != nil { if err != nil {
a.logError.Println("error stopping api:", err) a.logError.Println("error stopping api:", err)
return fmt.Errorf("Error updating channel. Try again.") return fmt.Errorf("Error updating channel. Try again.")

View file

@ -96,11 +96,13 @@ function PageDetails(props) {
EventsOn('PageActivity', (event) => { EventsOn('PageActivity', (event) => {
setActivity(event); setActivity(event);
setActive(true); if (event !== null) {
if (event.livestreams.length > 0) { setActive(true);
setLive(true); if (event.livestreams.length > 0) {
} else { setLive(true);
setLive(false); } else {
setLive(false);
}
} }
}); });

View file

@ -1,257 +0,0 @@
package api
import (
"context"
"fmt"
"log"
"sync"
"time"
rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go"
"github.com/wailsapp/wails/v2/pkg/runtime"
)
type Api struct {
callers map[string]*caller
callersMu sync.Mutex
ctx context.Context
display string
displayMu sync.Mutex
logError *log.Logger
logInfo *log.Logger
}
type caller struct {
cancel context.CancelFunc
cancelMu sync.Mutex
display bool
displayMu sync.Mutex
interval time.Duration
name string
response *rumblelivestreamlib.LivestreamResponse
responseMu sync.Mutex
url string
}
type event struct {
close bool
err error
name string
resp *rumblelivestreamlib.LivestreamResponse
}
func NewApi(logError *log.Logger, logInfo *log.Logger) *Api {
return &Api{logError: logError, logInfo: logInfo}
}
func (a *Api) Response(name string) *rumblelivestreamlib.LivestreamResponse {
a.callersMu.Lock()
defer a.callersMu.Unlock()
caller, exists := a.callers[name]
if !exists {
return nil
}
caller.responseMu.Lock()
defer caller.responseMu.Unlock()
copy := *caller.response
return &copy
}
func (a *Api) Display(name string) error {
a.displayMu.Lock()
defer a.displayMu.Unlock()
if name == a.display {
return nil
}
a.callersMu.Lock()
defer a.callersMu.Unlock()
if a.display != "" {
displaying, exists := a.callers[a.display]
if !exists {
return pkgErr("", fmt.Errorf("displaying caller does not exist: %s", a.display))
}
displaying.displayMu.Lock()
displaying.display = false
displaying.displayMu.Unlock()
a.display = ""
}
caller, exists := a.callers[name]
if !exists {
// return pkgErr("", fmt.Errorf("caller does not exist: %s", name))
runtime.EventsEmit(a.ctx, "PageActive", false)
return nil
}
caller.displayMu.Lock()
caller.display = true
caller.displayMu.Unlock()
a.display = name
a.handleResponse(caller)
return nil
}
func (a *Api) Startup(ctx context.Context) {
a.ctx = ctx
a.callers = map[string]*caller{}
}
func (a *Api) Shutdown() error {
for _, caller := range a.callers {
caller.cancelMu.Lock()
if caller.cancel != nil {
caller.cancel()
}
caller.cancelMu.Unlock()
}
return nil
}
func (a *Api) Start(name string, url string, interval time.Duration) error {
if name == "" {
return fmt.Errorf("name is empty")
}
if url == "" {
return fmt.Errorf("url is empty")
}
a.callersMu.Lock()
defer a.callersMu.Unlock()
if _, active := a.callers[name]; active {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
caller := &caller{
cancel: cancel,
interval: interval,
name: name,
url: url,
}
a.callers[name] = caller
go a.run(ctx, caller)
return nil
}
func (a *Api) run(ctx context.Context, caller *caller) {
client := &rumblelivestreamlib.Client{StreamKey: caller.url}
for {
runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, true)
caller.displayMu.Lock()
if caller.display {
runtime.EventsEmit(a.ctx, "PageActive", true)
}
caller.displayMu.Unlock()
resp, err := a.query(client)
if err != nil {
a.logError.Println(pkgErr("error querying api", err))
// runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, false)
a.stop(caller)
return
}
caller.responseMu.Lock()
caller.response = resp
caller.responseMu.Unlock()
a.handleResponse(caller)
timer := time.NewTimer(caller.interval)
select {
case <-ctx.Done():
timer.Stop()
// runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, false)
a.stop(caller)
return
case <-timer.C:
}
}
}
func (a *Api) handleResponse(c *caller) {
if c == nil {
return
}
c.responseMu.Lock()
defer c.responseMu.Unlock()
if c.response == nil {
return
}
c.displayMu.Lock()
if c.display {
runtime.EventsEmit(a.ctx, "PageActivity", c.response)
}
c.displayMu.Unlock()
isLive := len(c.response.Livestreams) > 0
runtime.EventsEmit(a.ctx, "PageLive-"+c.name, isLive)
}
func (a *Api) stop(c *caller) {
if c == nil {
return
}
runtime.EventsEmit(a.ctx, "ApiActive-"+c.name, false)
c.displayMu.Lock()
if c.display {
c.display = false
runtime.EventsEmit(a.ctx, "PageActive", false)
}
c.displayMu.Unlock()
a.displayMu.Lock()
if a.display == c.name {
a.display = ""
}
a.displayMu.Unlock()
a.callersMu.Lock()
delete(a.callers, c.name)
a.callersMu.Unlock()
return
}
func (a *Api) Active(name string) bool {
a.callersMu.Lock()
defer a.callersMu.Unlock()
_, active := a.callers[name]
return active
}
func (a *Api) Stop(name string) error {
a.callersMu.Lock()
caller, exists := a.callers[name]
if !exists {
return pkgErr("", fmt.Errorf("caller does not exist: %s", name))
}
a.callersMu.Unlock()
caller.cancelMu.Lock()
if caller.cancel != nil {
caller.cancel()
}
caller.cancelMu.Unlock()
return nil
}
func (a *Api) query(client *rumblelivestreamlib.Client) (*rumblelivestreamlib.LivestreamResponse, error) {
resp, err := client.Request()
if err != nil {
return nil, fmt.Errorf("error executing client request: %v", err)
}
return resp, nil
}

192
v1/internal/events/api.go Normal file
View file

@ -0,0 +1,192 @@
package events
import (
"context"
"fmt"
"log"
"sync"
"time"
rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go"
)
type Api struct {
Name string
Resp *rumblelivestreamlib.LivestreamResponse
Stop bool
}
type apiProducer struct {
cancel context.CancelFunc
cancelMu sync.Mutex
interval time.Duration
name string
url string
}
type ApiProducer struct {
Ch chan Api
close bool
closeMu sync.Mutex
closeCh chan bool
logError *log.Logger
logInfo *log.Logger
producers map[string]*apiProducer
producersMu sync.Mutex
}
func NewApiProducer(logError *log.Logger, logInfo *log.Logger) *ApiProducer {
return &ApiProducer{
Ch: make(chan Api, 10),
closeCh: make(chan bool),
logError: logError,
logInfo: logInfo,
producers: map[string]*apiProducer{},
}
}
func (ap *ApiProducer) Active(name string) bool {
ap.producersMu.Lock()
defer ap.producersMu.Unlock()
_, active := ap.producers[name]
return active
}
func (ap *ApiProducer) Shutdown() error {
wait := false
ap.producersMu.Lock()
if len(ap.producers) > 0 {
ap.closeMu.Lock()
ap.close = true
ap.closeMu.Unlock()
}
for _, producer := range ap.producers {
producer.cancelMu.Lock()
if producer.cancel != nil {
producer.cancel()
}
producer.cancelMu.Unlock()
}
ap.producersMu.Unlock()
if wait {
timer := time.NewTimer(3 * time.Second)
select {
case <-ap.closeCh:
close(ap.Ch)
case <-timer.C:
return pkgErr("", fmt.Errorf("not all producers were stopped"))
}
}
return nil
}
func (ap *ApiProducer) Start(name string, url string, interval time.Duration) error {
if name == "" {
return pkgErr("", fmt.Errorf("name is empty"))
}
if url == "" {
return pkgErr("", fmt.Errorf("url is empty"))
}
ap.producersMu.Lock()
defer ap.producersMu.Unlock()
if _, active := ap.producers[name]; active {
return nil
}
ctx, cancel := context.WithCancel(context.Background())
producer := &apiProducer{
cancel: cancel,
interval: interval,
name: name,
url: url,
}
ap.producers[name] = producer
go ap.run(ctx, producer)
return nil
}
func (ap *ApiProducer) Stop(name string) error {
ap.producersMu.Lock()
producer, exists := ap.producers[name]
if !exists {
return pkgErr("", fmt.Errorf("producer does not exist: %s", name))
}
ap.producersMu.Unlock()
producer.cancelMu.Lock()
if producer.cancel != nil {
producer.cancel()
}
producer.cancelMu.Unlock()
return nil
}
func (ap *ApiProducer) run(ctx context.Context, producer *apiProducer) {
client := &rumblelivestreamlib.Client{StreamKey: producer.url}
for {
resp, err := apiQuery(client)
if err != nil {
ap.logError.Println(pkgErr("error querying api", err))
ap.stop(producer)
return
}
ap.handleResponse(producer, resp)
timer := time.NewTimer(producer.interval)
select {
case <-ctx.Done():
timer.Stop()
ap.stop(producer)
return
case <-timer.C:
}
}
}
func (ap *ApiProducer) handleResponse(p *apiProducer, resp *rumblelivestreamlib.LivestreamResponse) {
if p == nil || resp == nil {
return
}
ap.Ch <- Api{Name: p.name, Resp: resp}
}
func apiQuery(client *rumblelivestreamlib.Client) (*rumblelivestreamlib.LivestreamResponse, error) {
resp, err := client.Request()
if err != nil {
return nil, fmt.Errorf("error executing client request: %v", err)
}
return resp, nil
}
func (ap *ApiProducer) stop(p *apiProducer) {
if p == nil {
return
}
ap.Ch <- Api{Name: p.name, Stop: true}
ap.producersMu.Lock()
delete(ap.producers, p.name)
remaining := len(ap.producers)
ap.producersMu.Unlock()
ap.closeMu.Lock()
if remaining == 0 && ap.close {
select {
case ap.closeCh <- true:
default:
break
}
}
ap.closeMu.Unlock()
return
}

View file

@ -1,8 +1,8 @@
package api package events
import "fmt" import "fmt"
const pkgName = "api" const pkgName = "events"
func pkgErr(prefix string, err error) error { func pkgErr(prefix string, err error) error {
pkgErr := pkgName pkgErr := pkgName

View file

@ -0,0 +1,63 @@
package events
import (
"fmt"
"log"
)
type Producers struct {
logError *log.Logger
logInfo *log.Logger
ApiP *ApiProducer
}
func (p *Producers) Startup() error {
return nil
}
func (p *Producers) Shutdown() error {
err := p.ApiP.Shutdown()
if err != nil {
return pkgErr("error shutting down api producer", err)
}
return nil
}
type ProducersInit func(*Producers) error
func NewProducers(inits ...ProducersInit) (*Producers, error) {
var p Producers
for _, init := range inits {
err := init(&p)
if err != nil {
return nil, err
}
}
return &p, nil
}
func WithLoggers(logError *log.Logger, logInfo *log.Logger) ProducersInit {
return func(p *Producers) error {
if logError == nil {
return pkgErr("", fmt.Errorf("error logger is nil"))
}
p.logError = logError
if logInfo == nil {
return pkgErr("", fmt.Errorf("info logger is nil"))
}
p.logInfo = logInfo
return nil
}
}
func WithApiProducer() ProducersInit {
return func(p *Producers) error {
p.ApiP = NewApiProducer(p.logError, p.logInfo)
return nil
}
}