65 lines
1.1 KiB
Go
65 lines
1.1 KiB
Go
package eventbus
|
|
|
|
import (
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// EventBus is the contract for a topic-based publish/subscribe hub that
// hands values of any type to subscribers over channels.
type EventBus interface {
	// Subscribe registers interest in topic and returns the channel on which
	// values published to that topic are delivered.
	Subscribe(topic string) chan any

	// Unsubscribe ends the subscription that ch holds on topic.
	Unsubscribe(topic string, ch chan any)

	// Publish offers data to the subscribers of topic.
	Publish(topic string, data any)
}
|
|
|
|
// EBus is an in-memory registry that maps topic names to the channels
// subscribed to them. It contains a mutex, so it must not be copied after
// first use; pass it around as *EBus.
type EBus struct {
	// mu guards topics.
	mu sync.RWMutex

	// topics holds, for each topic name, the subscriber channels in the
	// order they were registered.
	topics map[string][]chan any
}
|
|
|
|
func New() *EBus {
|
|
return &EBus{
|
|
mu: sync.RWMutex{},
|
|
topics: map[string][]chan any{},
|
|
}
|
|
}
|
|
|
|
func (eb *EBus) Subscribe(topic string) chan any {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
ch := make(chan any, 20)
|
|
eb.topics[topic] = append(eb.topics[topic], ch)
|
|
return ch
|
|
}
|
|
|
|
func (eb *EBus) Publish(topic string, data any) {
|
|
eb.mu.RLock()
|
|
defer eb.mu.RUnlock()
|
|
for _, ch := range eb.topics[topic] {
|
|
select {
|
|
case ch <- data:
|
|
default:
|
|
log.Printf("[Event Bus]: Could not pass Message %v to %v channel full", data, topic)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (eb *EBus) Unsubscribe(topic string, c chan any) {
|
|
eb.mu.Lock()
|
|
defer eb.mu.Unlock()
|
|
|
|
channels, ok := eb.topics[topic]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
for i, ch := range channels {
|
|
if ch != c {
|
|
eb.topics[topic] = append(channels[:i], channels[i+1:]...) // example: 5 channels max i=3 channels[:3] (0,1,2) + channels[3+1:] (4,5)
|
|
|
|
close(ch)
|
|
return
|
|
}
|
|
}
|
|
}
|