Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/gobot update eventer #1124

Draft
wants to merge 7 commits into
base: dev
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 73 additions & 22 deletions eventer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package gobot

import "sync"
import (
"sync"
)

const (
defaultEventChanBufferSize = 10
maxEventChanBufferSize = 10240
maxChanWorkerCount = 128
EventCrash = "crash"
)

type eventChannel chan *Event

Expand All @@ -14,12 +23,16 @@ type eventer struct {
// map of out channels used by subscribers
outs map[eventChannel]eventChannel

// the in/out channel length
bufferSize int

// controls the maximum number of concurrent executions when eventer is Published to
workerCount int

// mutex to protect the eventChannel map
eventsMutex sync.Mutex
}

const eventChanBufferSize = 10

// Eventer is the interface which describes how a Driver or Adaptor
// handles events.
type Eventer interface {
Expand Down Expand Up @@ -52,25 +65,57 @@ type Eventer interface {
Once(name string, f func(s interface{})) (err error)
}

// EventerOptionFn allows the configurable parameters of an eventer to be modified
type EventerOptionFn func(*eventer)

// WithBufferSize allows an eventer's buffer size to be configured
func WithBufferSize(bufferSize int) EventerOptionFn {
return func(e *eventer) {
if bufferSize >= maxEventChanBufferSize {
e.bufferSize = maxEventChanBufferSize
} else if bufferSize <= 0 {
e.bufferSize = defaultEventChanBufferSize
} else {
e.bufferSize = bufferSize
}
}
}

// WithWorkerCount allows an eventer's workerCount to be configured
func WithWorkerCount(workerCount int) EventerOptionFn {
return func(e *eventer) {
if workerCount >= maxChanWorkerCount {
e.workerCount = maxChanWorkerCount
} else if workerCount <= 0 {
e.workerCount = 1
} else {
e.workerCount = workerCount
}
}
}

// NewEventer returns a new Eventer.
func NewEventer() Eventer {
func NewEventer(fns ...EventerOptionFn) Eventer {
evtr := &eventer{
eventnames: make(map[string]string),
in: make(eventChannel, eventChanBufferSize),
outs: make(map[eventChannel]eventChannel),
eventnames: make(map[string]string),
outs: make(map[eventChannel]eventChannel),
bufferSize: defaultEventChanBufferSize,
workerCount: 1,
}

for _, fn := range fns {
fn(evtr)
}

// goroutine to cascade "in" events to all "out" event channels
evtr.in = make(eventChannel, evtr.bufferSize)
go func() {
for {
select {
case evt := <-evtr.in:
evtr.eventsMutex.Lock()
for _, out := range evtr.outs {
out <- evt
}
evtr.eventsMutex.Unlock()
for evt := range evtr.in {
evtr.eventsMutex.Lock()
for _, out := range evtr.outs {
out <- evt
}
evtr.eventsMutex.Unlock()
}
}()

Expand Down Expand Up @@ -108,7 +153,7 @@ func (e *eventer) Publish(name string, data interface{}) {
func (e *eventer) Subscribe() eventChannel {
e.eventsMutex.Lock()
defer e.eventsMutex.Unlock()
out := make(eventChannel, eventChanBufferSize)
out := make(eventChannel, e.bufferSize)
e.outs[out] = out
return out
}
Expand All @@ -123,16 +168,22 @@ func (e *eventer) Unsubscribe(events eventChannel) {
// On executes the event handler f when e is Published to.
func (e *eventer) On(n string, f func(s interface{})) (err error) {
out := e.Subscribe()
go func() {
for {
select {
case evt := <-out:
for i := 0; i < e.workerCount; i++ {
go func() {
// Add panic handling for goroutines to prevent panics caused by the callback function `f`
defer func() {
if r := recover(); r != nil {
e.Publish(EventCrash, r)
}
}()

for evt := range out {
if evt.Name == n {
f(evt.Data)
}
}
}
}()
}()
}

return
}
Expand Down