From 79031a19785083cbbfe9984c6f10d009b804a923 Mon Sep 17 00:00:00 2001 From: tyler Date: Wed, 10 Apr 2024 15:19:00 -0400 Subject: [PATCH] Migrated api package into events package --- NOTES.md | 16 +- v1/app.go | 365 +++++++++++++++++---- v1/frontend/src/components/PageDetails.jsx | 12 +- v1/internal/api/api.go | 257 --------------- v1/internal/events/api.go | 192 +++++++++++ v1/internal/{api => events}/error.go | 4 +- v1/internal/events/producers.go | 63 ++++ 7 files changed, 572 insertions(+), 337 deletions(-) delete mode 100644 v1/internal/api/api.go create mode 100644 v1/internal/events/api.go rename v1/internal/{api => events}/error.go (83%) create mode 100644 v1/internal/events/producers.go diff --git a/NOTES.md b/NOTES.md index a707061..6c45dee 100644 --- a/NOTES.md +++ b/NOTES.md @@ -1,16 +1,18 @@ # Doing +Next steps: +- delete page needs to handle new architecture +- activatePage: verify defer page.activeMu.Unlock does not conflict with display function + +On API errors +- include backoff multiple, if exceeded then stop API + Add option to delete API key for accounts? -First sign in screen: use different function than Login used by dashboard page? -- Dashboard login function emits events that first sign in screen may not need - Add better styles/icon to account details menu -Loading screen replaces signin at / -SignIn moves to /signin -Check for accounts, if accounts -> dashboard, else -> signin -Also check for new updates and tell user +Start screen: +- check for new updates and tell user Trigger on event from API vs. trigger on event from chat - Chat bot trigger on follow requires API key diff --git a/v1/app.go b/v1/app.go index 19488d4..9d3d1f6 100644 --- a/v1/app.go +++ b/v1/app.go @@ -10,8 +10,8 @@ import ( "sync" "time" - "github.com/tylertravisty/rum-goggles/v1/internal/api" "github.com/tylertravisty/rum-goggles/v1/internal/config" + "github.com/tylertravisty/rum-goggles/v1/internal/events" "github.com/tylertravisty/rum-goggles/v1/internal/models" rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go" "github.com/wailsapp/wails/v2/pkg/runtime" @@ -24,30 +24,53 @@ const ( ChannelType = "Channel" ) +type ApiState struct { + active bool + activeMu sync.Mutex + resp *rumblelivestreamlib.LivestreamResponse + respMu sync.Mutex +} + +type Page struct { + active bool + activeMu sync.Mutex + apiSt *ApiState + displaying bool + displayingMu sync.Mutex + name string +} + // App struct type App struct { - api *api.Api - clients map[string]*rumblelivestreamlib.Client - clientsMu sync.Mutex - ctx context.Context - services *models.Services - logError *log.Logger - logFile *os.File - logFileMu sync.Mutex - logInfo *log.Logger + //apiS *ApiService + cancelCtrl context.CancelFunc + clients map[string]*rumblelivestreamlib.Client + clientsMu sync.Mutex + displaying string + displayingMu sync.Mutex + logError *log.Logger + logFile *os.File + logFileMu sync.Mutex + logInfo *log.Logger + pages map[string]*Page + pagesMu sync.Mutex + producers *events.Producers + services *models.Services + wails context.Context } // NewApp creates a new App application struct func NewApp() *App { app := &App{ clients: map[string]*rumblelivestreamlib.Client{}, + pages: map[string]*Page{}, } err := app.log() if err != nil { log.Fatal("error initializing log: ", err) } - app.api = api.NewApi(app.logError, app.logInfo) + //app.apiS = NewApiService(app.logError, app.logInfo) return app } @@ -69,19 +92,88 @@ func (a *App) log() error { // startup is called when the app starts. The context is saved // so we can call the runtime methods -func (a *App) startup(ctx context.Context) { - a.ctx = ctx - a.api.Startup(ctx) +func (a *App) startup(wails context.Context) { + a.wails = wails + + //a.apiS.Startup(a.apiS.ch) + + // ctx, cancel := context.WithCancel(context.Background()) + // a.cancelCtrl = cancel + + // go a.handle(ctx) +} + +func (a *App) handle(ctx context.Context) { + for { + select { + case apiE := <-a.producers.ApiP.Ch: + err := a.handleApi(apiE) + if err != nil { + a.logError.Println("error handling API event:", err) + } + case <-ctx.Done(): + return + } + } +} + +func (a *App) handleApi(event events.Api) error { + if event.Name == "" { + return fmt.Errorf("event name is empty") + } + + a.pagesMu.Lock() + defer a.pagesMu.Unlock() + page, exists := a.pages[event.Name] + if !exists { + page = &Page{ + apiSt: &ApiState{}, + name: event.Name, + } + a.pages[event.Name] = page + } + + page.apiSt.activeMu.Lock() + page.apiSt.active = event.Stop + page.apiSt.activeMu.Unlock() + + if event.Stop { + runtime.EventsEmit(a.wails, "ApiActive-"+page.name, false) + return nil + } + + runtime.EventsEmit(a.wails, "ApiActive-"+page.name, true) + page.displayingMu.Lock() + if page.displaying { + runtime.EventsEmit(a.wails, "PageActive", true) + } + page.displayingMu.Unlock() + + page.apiSt.respMu.Lock() + page.apiSt.resp = event.Resp + page.apiSt.respMu.Unlock() + + a.updatePage(page) + + return nil } func (a *App) shutdown(ctx context.Context) { - if a.api != nil { - err := a.api.Shutdown() - if err != nil { - a.logError.Println("error shutting down api:", err) - } + // if a.apiS != nil && a.apiS.Api != nil { + // err := a.apiS.Shutdown() + // if err != nil { + // a.logError.Println("error shutting down api:", err) + // } + + // close(a.apiS.ch) + // } + err := a.producers.Shutdown() + if err != nil { + a.logError.Println("error closing event producers:", err) } + a.cancelCtrl() + if a.services != nil { err := a.services.Close() if err != nil { @@ -101,27 +193,39 @@ func (a *App) shutdown(ctx context.Context) { func (a *App) Start() (bool, error) { - runtime.EventsEmit(a.ctx, "StartupMessage", "Initializing database...") + runtime.EventsEmit(a.wails, "StartupMessage", "Initializing database...") err := a.initServices() if err != nil { a.logError.Println("error initializing services:", err) return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.") } - runtime.EventsEmit(a.ctx, "StartupMessage", "Initializing database complete.") + runtime.EventsEmit(a.wails, "StartupMessage", "Initializing database complete.") - runtime.EventsEmit(a.ctx, "StartupMessage", "Verifying account sessions...") + runtime.EventsEmit(a.wails, "StartupMessage", "Verifying account sessions...") count, err := a.verifyAccounts() if err != nil { a.logError.Println("error verifying accounts:", err) return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.") } - runtime.EventsEmit(a.ctx, "StartupMessage", "Verifying account sessions complete.") + runtime.EventsEmit(a.wails, "StartupMessage", "Verifying account sessions complete.") + + runtime.EventsEmit(a.wails, "StartupMessage", "Initializing event producers...") + err = a.initProducers() + if err != nil { + a.logError.Println("error initializing producers:", err) + return false, fmt.Errorf("Error starting Rum Goggles. Try restarting.") + } + runtime.EventsEmit(a.wails, "StartupMessage", "Initializing event producers complete.") // TODO: check for update - if available, pop up window // runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates...") // update, err = a.checkForUpdate() // runtime.EventsEmit(a.ctx, "StartupMessage", "Checking for updates complete.") + ctx, cancel := context.WithCancel(context.Background()) + a.cancelCtrl = cancel + go a.handle(ctx) + signin := true if count > 0 { signin = false @@ -130,6 +234,25 @@ func (a *App) Start() (bool, error) { return signin, nil } +func (a *App) initProducers() error { + producers, err := events.NewProducers( + events.WithLoggers(a.logError, a.logInfo), + events.WithApiProducer(), + ) + if err != nil { + return fmt.Errorf("error initializing producers: %v", err) + } + + err = producers.Startup() + if err != nil { + return fmt.Errorf("error starting producers: %v", err) + } + + a.producers = producers + + return nil +} + func (a *App) initServices() error { db, err := config.Database() if err != nil { @@ -329,14 +452,14 @@ func (a *App) Login(username string, password string) error { a.logError.Println("account name is nil") return fmt.Errorf("Error logging in. Try again.") } - runtime.EventsEmit(a.ctx, "LoggedIn-"+*name, true) + runtime.EventsEmit(a.wails, "LoggedIn-"+*name, true) list, err := a.accountList() if err != nil { a.logError.Println("error getting account list:", err) return fmt.Errorf("Error logging in. Try again.") } - runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) + runtime.EventsEmit(a.wails, "PageSideBarAccounts", list) err = a.openDetails(acct) if err != nil { @@ -402,14 +525,14 @@ func (a *App) Logout(id int64) error { a.logError.Println("account name is nil") return fmt.Errorf("Error logging out. Try again.") } - runtime.EventsEmit(a.ctx, "LoggedIn-"+*name, false) + runtime.EventsEmit(a.wails, "LoggedIn-"+*name, false) list, err := a.accountList() if err != nil { a.logError.Println("error getting account list:", err) return fmt.Errorf("Error logging out. Try again.") } - runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) + runtime.EventsEmit(a.wails, "PageSideBarAccounts", list) err = a.openDetails(acct) if err != nil { @@ -510,7 +633,7 @@ func (a *App) OpenChannel(id int64) error { return nil } -type Page interface { +type PageInfo interface { Id() *int64 KeyUrl() *string LoggedIn() bool @@ -527,39 +650,39 @@ type PageDetails struct { Type string `json:"type"` } -func (a *App) openDetails(p Page) error { - id := p.Id() +func (a *App) openDetails(pi PageInfo) error { + id := pi.Id() if id == nil { return fmt.Errorf("page id is nil") } hasApi := true - key := p.KeyUrl() + key := pi.KeyUrl() if key == nil || *key == "" { hasApi = false } - name := p.String() + name := pi.String() if name == nil { return fmt.Errorf("page name is nil") } - title := p.Title() + title := pi.Title() if title == nil { return fmt.Errorf("page title is nil") } - runtime.EventsEmit(a.ctx, "PageDetails", PageDetails{ + runtime.EventsEmit(a.wails, "PageDetails", PageDetails{ ID: *id, HasApi: hasApi, - LoggedIn: p.LoggedIn(), + LoggedIn: pi.LoggedIn(), Title: *title, - Type: p.Type(), + Type: pi.Type(), }) - err := a.api.Display(*name) + err := a.display(*name) if err != nil { - return fmt.Errorf("error displaying api for %s: %v", *name, err) + return fmt.Errorf("error displaying page: %v", err) } return nil @@ -606,32 +729,60 @@ func (a *App) ActivateChannel(id int64) error { // If page is inactivate, activate. // If page is active, deactivate. -func (a *App) activatePage(p Page) error { - name := p.String() +func (a *App) activatePage(pi PageInfo) error { + name := pi.String() if name == nil { return fmt.Errorf("page name is nil") } - url := p.KeyUrl() + url := pi.KeyUrl() if url == nil { return fmt.Errorf("page key url is nil") } - if a.api.Active(*name) { - err := a.api.Stop(*name) - if err != nil { - return fmt.Errorf("error stopping api: %v", err) + a.pagesMu.Lock() + page, exists := a.pages[*name] + if !exists { + page = &Page{ + active: false, + apiSt: &ApiState{}, + name: *name, } + a.pages[*name] = page + } + a.pagesMu.Unlock() + + page.activeMu.Lock() + defer page.activeMu.Unlock() + if page.active { + if a.producers.ApiP.Active(*name) { + err := a.producers.ApiP.Stop(*name) + if err != nil { + return fmt.Errorf("error stopping api: %v", err) + } + } + + page.displayingMu.Lock() + if page.displaying { + runtime.EventsEmit(a.wails, "PageActive", false) + } + page.displayingMu.Unlock() + + page.active = false return nil } + page.active = true - err := a.api.Start(*name, *url, 10*time.Second) + err := a.producers.ApiP.Start(*name, *url, 10*time.Second) if err != nil { return fmt.Errorf("error starting api: %v", err) } - err = a.api.Display(*name) + runtime.EventsEmit(a.wails, "ApiActive-"+*name, true) + + err = a.display(*name) if err != nil { - return fmt.Errorf("error displaying api: %v", err) + return fmt.Errorf("error displaying page: %v", err) } + runtime.EventsEmit(a.wails, "PageActive", true) return nil } @@ -661,8 +812,8 @@ func (a *App) DeleteAccount(id int64) error { return fmt.Errorf("Error deleting account. Try again.") } - if a.api.Active(*name) { - err := a.api.Stop(*name) + if a.producers.ApiP.Active(*name) { + err := a.producers.ApiP.Stop(*name) if err != nil { a.logError.Println("error stopping api:", err) return fmt.Errorf("Error deleting account. Try again.") @@ -675,7 +826,7 @@ func (a *App) DeleteAccount(id int64) error { return fmt.Errorf("Error deleting account. Try again.") } - runtime.EventsEmit(a.ctx, "PageDetails", nil) + runtime.EventsEmit(a.wails, "PageDetails", nil) list, err := a.accountList() if err != nil { @@ -683,7 +834,7 @@ func (a *App) DeleteAccount(id int64) error { return fmt.Errorf("Error deleting account. Try again.") } - runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) + runtime.EventsEmit(a.wails, "PageSideBarAccounts", list) return nil } @@ -704,8 +855,8 @@ func (a *App) DeleteChannel(id int64) error { return fmt.Errorf("Error deleting channel. Try again.") } - if a.api.Active(*name) { - err := a.api.Stop(*name) + if a.producers.ApiP.Active(*name) { + err := a.producers.ApiP.Stop(*name) if err != nil { a.logError.Println("error stopping api:", err) return fmt.Errorf("Error deleting channel. Try again.") @@ -718,7 +869,7 @@ func (a *App) DeleteChannel(id int64) error { return fmt.Errorf("Error deleting channel. Try again.") } - runtime.EventsEmit(a.ctx, "PageDetails", nil) + runtime.EventsEmit(a.wails, "PageDetails", nil) list, err := a.accountList() if err != nil { @@ -726,7 +877,79 @@ func (a *App) DeleteChannel(id int64) error { return fmt.Errorf("Error deleting channel. Try again.") } - runtime.EventsEmit(a.ctx, "PageSideBarAccounts", list) + runtime.EventsEmit(a.wails, "PageSideBarAccounts", list) + + return nil +} + +func (a *App) display(name string) error { + a.displayingMu.Lock() + defer a.displayingMu.Unlock() + + if name == a.displaying { + return nil + } + + a.pagesMu.Lock() + defer a.pagesMu.Unlock() + + if a.displaying != "" { + displaying, exists := a.pages[a.displaying] + a.displaying = "" + if !exists { + return fmt.Errorf("displaying page does not exist: %s", a.display) + } + displaying.displayingMu.Lock() + displaying.displaying = false + displaying.displayingMu.Unlock() + } + + page, exists := a.pages[name] + if !exists { + page = &Page{ + active: false, + apiSt: &ApiState{}, + name: name, + } + a.pages[name] = page + } + page.displayingMu.Lock() + page.displaying = true + page.displayingMu.Unlock() + + a.displaying = name + + err := a.updatePage(page) + if err != nil { + return fmt.Errorf("error updating page: %v", err) + } + + return nil +} + +func (a *App) updatePage(p *Page) error { + if p == nil { + return fmt.Errorf("page is nil") + } + + //TODO check p.api == nil + + p.apiSt.respMu.Lock() + defer p.apiSt.respMu.Unlock() + + p.displayingMu.Lock() + if p.displaying { + runtime.EventsEmit(a.wails, "PageActivity", p.apiSt.resp) + p.activeMu.Lock() + runtime.EventsEmit(a.wails, "PageActive", p.active) + p.activeMu.Unlock() + } + p.displayingMu.Unlock() + + if p.apiSt.resp != nil { + isLive := len(p.apiSt.resp.Livestreams) > 0 + runtime.EventsEmit(a.wails, "PageLive-"+p.name, isLive) + } return nil } @@ -735,14 +958,24 @@ func (a *App) PageStatus(name string) { active := false isLive := false - resp := a.api.Response(name) - if resp != nil { - active = true - isLive = len(resp.Livestreams) > 0 + // resp := a.api.Response(name) + a.pagesMu.Lock() + defer a.pagesMu.Unlock() + page, exists := a.pages[name] + if exists && page.apiSt != nil { + page.apiSt.activeMu.Lock() + active = page.apiSt.active + page.apiSt.activeMu.Unlock() + + page.apiSt.respMu.Lock() + if page.apiSt.resp != nil { + isLive = len(page.apiSt.resp.Livestreams) > 0 + } + page.apiSt.respMu.Unlock() } - runtime.EventsEmit(a.ctx, "ApiActive-"+name, active) - runtime.EventsEmit(a.ctx, "PageLive-"+name, isLive) + runtime.EventsEmit(a.wails, "ApiActive-"+name, active) + runtime.EventsEmit(a.wails, "PageLive-"+name, isLive) } func (a *App) UpdateAccountApi(id int64, apiKey string) error { @@ -761,8 +994,8 @@ func (a *App) UpdateAccountApi(id int64, apiKey string) error { return fmt.Errorf("Error updating account. Try again.") } - if a.api.Active(*name) { - err := a.api.Stop(*name) + if a.producers.ApiP.Active(*name) { + err := a.producers.ApiP.Stop(*name) if err != nil { a.logError.Println("error stopping api:", err) return fmt.Errorf("Error updating account. Try again.") @@ -806,8 +1039,8 @@ func (a *App) UpdateChannelApi(id int64, apiKey string) error { return fmt.Errorf("Error updating channel. Try again.") } - if a.api.Active(*name) { - err := a.api.Stop(*name) + if a.producers.ApiP.Active(*name) { + err := a.producers.ApiP.Stop(*name) if err != nil { a.logError.Println("error stopping api:", err) return fmt.Errorf("Error updating channel. Try again.") diff --git a/v1/frontend/src/components/PageDetails.jsx b/v1/frontend/src/components/PageDetails.jsx index d56ffeb..eb43f90 100644 --- a/v1/frontend/src/components/PageDetails.jsx +++ b/v1/frontend/src/components/PageDetails.jsx @@ -96,11 +96,13 @@ function PageDetails(props) { EventsOn('PageActivity', (event) => { setActivity(event); - setActive(true); - if (event.livestreams.length > 0) { - setLive(true); - } else { - setLive(false); + if (event !== null) { + setActive(true); + if (event.livestreams.length > 0) { + setLive(true); + } else { + setLive(false); + } } }); diff --git a/v1/internal/api/api.go b/v1/internal/api/api.go deleted file mode 100644 index 9e70483..0000000 --- a/v1/internal/api/api.go +++ /dev/null @@ -1,257 +0,0 @@ -package api - -import ( - "context" - "fmt" - "log" - "sync" - "time" - - rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go" - "github.com/wailsapp/wails/v2/pkg/runtime" -) - -type Api struct { - callers map[string]*caller - callersMu sync.Mutex - ctx context.Context - display string - displayMu sync.Mutex - logError *log.Logger - logInfo *log.Logger -} - -type caller struct { - cancel context.CancelFunc - cancelMu sync.Mutex - display bool - displayMu sync.Mutex - interval time.Duration - name string - response *rumblelivestreamlib.LivestreamResponse - responseMu sync.Mutex - url string -} - -type event struct { - close bool - err error - name string - resp *rumblelivestreamlib.LivestreamResponse -} - -func NewApi(logError *log.Logger, logInfo *log.Logger) *Api { - return &Api{logError: logError, logInfo: logInfo} -} - -func (a *Api) Response(name string) *rumblelivestreamlib.LivestreamResponse { - a.callersMu.Lock() - defer a.callersMu.Unlock() - caller, exists := a.callers[name] - if !exists { - return nil - } - - caller.responseMu.Lock() - defer caller.responseMu.Unlock() - - copy := *caller.response - return © -} - -func (a *Api) Display(name string) error { - a.displayMu.Lock() - defer a.displayMu.Unlock() - if name == a.display { - return nil - } - - a.callersMu.Lock() - defer a.callersMu.Unlock() - - if a.display != "" { - displaying, exists := a.callers[a.display] - if !exists { - return pkgErr("", fmt.Errorf("displaying caller does not exist: %s", a.display)) - } - displaying.displayMu.Lock() - displaying.display = false - displaying.displayMu.Unlock() - a.display = "" - } - - caller, exists := a.callers[name] - if !exists { - // return pkgErr("", fmt.Errorf("caller does not exist: %s", name)) - runtime.EventsEmit(a.ctx, "PageActive", false) - return nil - } - caller.displayMu.Lock() - caller.display = true - caller.displayMu.Unlock() - - a.display = name - - a.handleResponse(caller) - - return nil -} - -func (a *Api) Startup(ctx context.Context) { - a.ctx = ctx - a.callers = map[string]*caller{} -} - -func (a *Api) Shutdown() error { - for _, caller := range a.callers { - caller.cancelMu.Lock() - if caller.cancel != nil { - caller.cancel() - } - caller.cancelMu.Unlock() - } - - return nil -} - -func (a *Api) Start(name string, url string, interval time.Duration) error { - if name == "" { - return fmt.Errorf("name is empty") - } - if url == "" { - return fmt.Errorf("url is empty") - } - - a.callersMu.Lock() - defer a.callersMu.Unlock() - if _, active := a.callers[name]; active { - return nil - } - - ctx, cancel := context.WithCancel(context.Background()) - caller := &caller{ - cancel: cancel, - interval: interval, - name: name, - url: url, - } - a.callers[name] = caller - go a.run(ctx, caller) - - return nil -} - -func (a *Api) run(ctx context.Context, caller *caller) { - client := &rumblelivestreamlib.Client{StreamKey: caller.url} - for { - runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, true) - caller.displayMu.Lock() - if caller.display { - runtime.EventsEmit(a.ctx, "PageActive", true) - } - caller.displayMu.Unlock() - - resp, err := a.query(client) - if err != nil { - a.logError.Println(pkgErr("error querying api", err)) - // runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, false) - a.stop(caller) - return - } - caller.responseMu.Lock() - caller.response = resp - caller.responseMu.Unlock() - a.handleResponse(caller) - - timer := time.NewTimer(caller.interval) - select { - case <-ctx.Done(): - timer.Stop() - // runtime.EventsEmit(a.ctx, "ApiActive-"+caller.name, false) - a.stop(caller) - return - case <-timer.C: - } - } -} - -func (a *Api) handleResponse(c *caller) { - if c == nil { - return - } - - c.responseMu.Lock() - defer c.responseMu.Unlock() - if c.response == nil { - return - } - - c.displayMu.Lock() - if c.display { - runtime.EventsEmit(a.ctx, "PageActivity", c.response) - } - c.displayMu.Unlock() - - isLive := len(c.response.Livestreams) > 0 - runtime.EventsEmit(a.ctx, "PageLive-"+c.name, isLive) -} - -func (a *Api) stop(c *caller) { - if c == nil { - return - } - - runtime.EventsEmit(a.ctx, "ApiActive-"+c.name, false) - c.displayMu.Lock() - if c.display { - c.display = false - runtime.EventsEmit(a.ctx, "PageActive", false) - } - c.displayMu.Unlock() - - a.displayMu.Lock() - if a.display == c.name { - a.display = "" - } - a.displayMu.Unlock() - - a.callersMu.Lock() - delete(a.callers, c.name) - a.callersMu.Unlock() - - return -} - -func (a *Api) Active(name string) bool { - a.callersMu.Lock() - defer a.callersMu.Unlock() - _, active := a.callers[name] - - return active -} - -func (a *Api) Stop(name string) error { - a.callersMu.Lock() - caller, exists := a.callers[name] - if !exists { - return pkgErr("", fmt.Errorf("caller does not exist: %s", name)) - } - a.callersMu.Unlock() - - caller.cancelMu.Lock() - if caller.cancel != nil { - caller.cancel() - } - caller.cancelMu.Unlock() - - return nil -} - -func (a *Api) query(client *rumblelivestreamlib.Client) (*rumblelivestreamlib.LivestreamResponse, error) { - resp, err := client.Request() - if err != nil { - return nil, fmt.Errorf("error executing client request: %v", err) - } - - return resp, nil -} diff --git a/v1/internal/events/api.go b/v1/internal/events/api.go new file mode 100644 index 0000000..71f839f --- /dev/null +++ b/v1/internal/events/api.go @@ -0,0 +1,192 @@ +package events + +import ( + "context" + "fmt" + "log" + "sync" + "time" + + rumblelivestreamlib "github.com/tylertravisty/rumble-livestream-lib-go" +) + +type Api struct { + Name string + Resp *rumblelivestreamlib.LivestreamResponse + Stop bool +} + +type apiProducer struct { + cancel context.CancelFunc + cancelMu sync.Mutex + interval time.Duration + name string + url string +} + +type ApiProducer struct { + Ch chan Api + close bool + closeMu sync.Mutex + closeCh chan bool + logError *log.Logger + logInfo *log.Logger + producers map[string]*apiProducer + producersMu sync.Mutex +} + +func NewApiProducer(logError *log.Logger, logInfo *log.Logger) *ApiProducer { + return &ApiProducer{ + Ch: make(chan Api, 10), + closeCh: make(chan bool), + logError: logError, + logInfo: logInfo, + producers: map[string]*apiProducer{}, + } +} + +func (ap *ApiProducer) Active(name string) bool { + ap.producersMu.Lock() + defer ap.producersMu.Unlock() + _, active := ap.producers[name] + + return active +} + +func (ap *ApiProducer) Shutdown() error { + wait := false + ap.producersMu.Lock() + if len(ap.producers) > 0 { + ap.closeMu.Lock() + ap.close = true + ap.closeMu.Unlock() + } + for _, producer := range ap.producers { + producer.cancelMu.Lock() + if producer.cancel != nil { + producer.cancel() + } + producer.cancelMu.Unlock() + } + ap.producersMu.Unlock() + + if wait { + timer := time.NewTimer(3 * time.Second) + select { + case <-ap.closeCh: + close(ap.Ch) + case <-timer.C: + return pkgErr("", fmt.Errorf("not all producers were stopped")) + } + } + + return nil +} + +func (ap *ApiProducer) Start(name string, url string, interval time.Duration) error { + if name == "" { + return pkgErr("", fmt.Errorf("name is empty")) + } + if url == "" { + return pkgErr("", fmt.Errorf("url is empty")) + } + + ap.producersMu.Lock() + defer ap.producersMu.Unlock() + if _, active := ap.producers[name]; active { + return nil + } + + ctx, cancel := context.WithCancel(context.Background()) + producer := &apiProducer{ + cancel: cancel, + interval: interval, + name: name, + url: url, + } + ap.producers[name] = producer + go ap.run(ctx, producer) + + return nil +} + +func (ap *ApiProducer) Stop(name string) error { + ap.producersMu.Lock() + producer, exists := ap.producers[name] + if !exists { + return pkgErr("", fmt.Errorf("producer does not exist: %s", name)) + } + ap.producersMu.Unlock() + + producer.cancelMu.Lock() + if producer.cancel != nil { + producer.cancel() + } + producer.cancelMu.Unlock() + + return nil +} + +func (ap *ApiProducer) run(ctx context.Context, producer *apiProducer) { + client := &rumblelivestreamlib.Client{StreamKey: producer.url} + for { + resp, err := apiQuery(client) + if err != nil { + ap.logError.Println(pkgErr("error querying api", err)) + ap.stop(producer) + return + } + ap.handleResponse(producer, resp) + + timer := time.NewTimer(producer.interval) + select { + case <-ctx.Done(): + timer.Stop() + ap.stop(producer) + return + case <-timer.C: + } + } +} + +func (ap *ApiProducer) handleResponse(p *apiProducer, resp *rumblelivestreamlib.LivestreamResponse) { + if p == nil || resp == nil { + return + } + + ap.Ch <- Api{Name: p.name, Resp: resp} +} + +func apiQuery(client *rumblelivestreamlib.Client) (*rumblelivestreamlib.LivestreamResponse, error) { + resp, err := client.Request() + if err != nil { + return nil, fmt.Errorf("error executing client request: %v", err) + } + + return resp, nil +} + +func (ap *ApiProducer) stop(p *apiProducer) { + if p == nil { + return + } + + ap.Ch <- Api{Name: p.name, Stop: true} + + ap.producersMu.Lock() + delete(ap.producers, p.name) + remaining := len(ap.producers) + ap.producersMu.Unlock() + + ap.closeMu.Lock() + if remaining == 0 && ap.close { + select { + case ap.closeCh <- true: + default: + break + } + } + ap.closeMu.Unlock() + + return +} diff --git a/v1/internal/api/error.go b/v1/internal/events/error.go similarity index 83% rename from v1/internal/api/error.go rename to v1/internal/events/error.go index c368953..ca54e68 100644 --- a/v1/internal/api/error.go +++ b/v1/internal/events/error.go @@ -1,8 +1,8 @@ -package api +package events import "fmt" -const pkgName = "api" +const pkgName = "events" func pkgErr(prefix string, err error) error { pkgErr := pkgName diff --git a/v1/internal/events/producers.go b/v1/internal/events/producers.go new file mode 100644 index 0000000..9359496 --- /dev/null +++ b/v1/internal/events/producers.go @@ -0,0 +1,63 @@ +package events + +import ( + "fmt" + "log" +) + +type Producers struct { + logError *log.Logger + logInfo *log.Logger + ApiP *ApiProducer +} + +func (p *Producers) Startup() error { + return nil +} + +func (p *Producers) Shutdown() error { + err := p.ApiP.Shutdown() + if err != nil { + return pkgErr("error shutting down api producer", err) + } + + return nil +} + +type ProducersInit func(*Producers) error + +func NewProducers(inits ...ProducersInit) (*Producers, error) { + var p Producers + for _, init := range inits { + err := init(&p) + if err != nil { + return nil, err + } + } + + return &p, nil +} + +func WithLoggers(logError *log.Logger, logInfo *log.Logger) ProducersInit { + return func(p *Producers) error { + if logError == nil { + return pkgErr("", fmt.Errorf("error logger is nil")) + } + p.logError = logError + + if logInfo == nil { + return pkgErr("", fmt.Errorf("info logger is nil")) + } + p.logInfo = logInfo + + return nil + } +} + +func WithApiProducer() ProducersInit { + return func(p *Producers) error { + p.ApiP = NewApiProducer(p.logError, p.logInfo) + + return nil + } +}