Skip to content

Commit

Permalink
refactor(plugins): read configuration uniformly across Go plugins
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
  • Loading branch information
jasondellaluce authored and poiana committed Jan 13, 2022
1 parent c32a564 commit ac88ed6
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 52 deletions.
59 changes: 24 additions & 35 deletions plugins/cloudtrail/cloudtrail.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ const (
PluginEventSource = "aws_cloudtrail"
)

const defaultS3DownloadConcurrency = 1
const verbose bool = true

func min(a, b int) int {
Expand Down Expand Up @@ -91,22 +90,20 @@ type snsMessage struct {
Keys []string `json:"s3ObjectKey"`
}

// This is the global plugin state, identifying an instance of this plugin
type pluginContext struct {
plugins.BasePlugin
jparser fastjson.Parser
jdata *fastjson.Value
jdataEvtnum uint64 // The event number jdata refers to. Used to know when we can skip the unmarshaling.
sqsDelete bool // If true, will delete SQS Messages immediately after receiving them
s3DownloadConcurrency int
useAsync bool
}

// Struct for plugin init config
type pluginInitConfig struct {
S3DownloadConcurrency int `json:"s3DownloadConcurrency"`
SQSDelete bool `json:"sqsDelete"`
UseAsync bool `json:"useAsync"`
SQSDelete bool `json:"sqsDelete"` // If true, will delete SQS Messages immediately after receiving them
UseAsync bool `json:"useAsync"` // If false, async extraction optimization is disabled
}

// This is the global plugin state, identifying an instance of this plugin
type pluginContext struct {
plugins.BasePlugin
jparser fastjson.Parser
jdata *fastjson.Value
jdataEvtnum uint64 // The event number jdata refers to. Used to know when we can skip the unmarshaling.
config pluginInitConfig
}

type OpenMode int
Expand Down Expand Up @@ -140,6 +137,12 @@ func init() {
extractor.Register(p)
}

func (p *pluginInitConfig) setDefault() {
p.SQSDelete = true
p.S3DownloadConcurrency = 1
p.UseAsync = true
}

func (p *pluginContext) Info() *plugins.Info {
log.Printf("[%s] Info\n", PluginName)
return &plugins.Info{
Expand All @@ -161,40 +164,26 @@ func (p *pluginContext) Init(cfg string) error {

log.Printf("[%s] Init, config=%s\n", PluginName, cfg)

// initialize state
p.jdataEvtnum = math.MaxUint64
p.sqsDelete = true
p.s3DownloadConcurrency = defaultS3DownloadConcurrency
p.useAsync = false

// set config default values and read the passed one, if available
p.config.setDefault()
if cfg != "" {
var initConfig pluginInitConfig
initConfig.SQSDelete = true
initConfig.S3DownloadConcurrency = defaultS3DownloadConcurrency
initConfig.UseAsync = false

err := json.Unmarshal([]byte(cfg), &initConfig)
err := json.Unmarshal([]byte(cfg), &p.config)
if err != nil {
return err
}

p.sqsDelete = initConfig.SQSDelete
p.s3DownloadConcurrency = initConfig.S3DownloadConcurrency
p.useAsync = initConfig.UseAsync
}

if p.useAsync {
extract.StartAsync(p)
}
// enable/disable async extraction optimazion (enabled by default)
extract.SetAsync(p.config.UseAsync)

return nil
}

func (p *pluginContext) Destroy() {
log.Printf("[%s] Destroy\n", PluginName)

if p.useAsync {
extract.StopAsync(p)
}
}

func (p *pluginContext) Open(params string) (source.Instance, error) {
Expand All @@ -219,7 +208,7 @@ func (p *pluginContext) Open(params string) (source.Instance, error) {

// Create an array of download buffers that will be used to concurrently
// download files from s3
oCtx.s3.DownloadBufs = make([][]byte, p.s3DownloadConcurrency)
oCtx.s3.DownloadBufs = make([][]byte, p.config.S3DownloadConcurrency)

return oCtx, nil
}
Expand Down
6 changes: 3 additions & 3 deletions plugins/cloudtrail/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func getMoreSQSFiles(pCtx *pluginContext, oCtx *openContext) error {
return nil
}

if pCtx.sqsDelete {
if pCtx.config.SQSDelete {
// Delete the message from the queue so it won't be read again
delInput := &sqs.DeleteMessageInput{
QueueUrl: &oCtx.queueURL,
Expand Down Expand Up @@ -256,9 +256,9 @@ func readNextFileS3(pCtx *pluginContext, oCtx *openContext) ([]byte, error) {
return oCtx.s3.DownloadBufs[curBuf], nil
}

dlErrChan = make(chan error, pCtx.s3DownloadConcurrency)
dlErrChan = make(chan error, pCtx.config.S3DownloadConcurrency)
k := oCtx.s3.lastDownloadedFileNum
oCtx.s3.nFilledBufs = min(pCtx.s3DownloadConcurrency, len(oCtx.files)-k)
oCtx.s3.nFilledBufs = min(pCtx.config.S3DownloadConcurrency, len(oCtx.files)-k)
for j, f := range oCtx.files[k : k+oCtx.s3.nFilledBufs] {
oCtx.s3.DownloadWg.Add(1)
go s3Download(oCtx, oCtx.s3.downloader, f.name, j)
Expand Down
31 changes: 17 additions & 14 deletions plugins/dummy/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ const (

///////////////////////////////////////////////////////////////////////////////

type MyPlugin struct {
plugins.BasePlugin
type MyPluginConfig struct {
// This reflects potential internal state for the plugin. In
// this case, the plugin is configured with a jitter (e.g. a
// random amount to add to the sample with each call to Next()
jitter uint64
Jitter uint64 `json:"jitter"`
}

type MyPlugin struct {
plugins.BasePlugin
// Will be used to randomize samples
rand *rand.Rand
// Contains the init configuration values
config MyPluginConfig
}

type MyInstance struct {
Expand All @@ -79,6 +83,10 @@ func init() {
extractor.Register(p)
}

func (p *MyPluginConfig) setDefault() {
p.Jitter = 10
}

func (m *MyPlugin) Info() *plugins.Info {
log.Printf("[%s] Info\n", PluginName)
return &plugins.Info{
Expand All @@ -95,27 +103,22 @@ func (m *MyPlugin) Info() *plugins.Info {
func (m *MyPlugin) Init(cfg string) error {
log.Printf("[%s] Init, config=%s\n", PluginName, cfg)

var jitter uint64 = 10
// initialize state
m.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

// The format of cfg is a json object with a single param
// "jitter", e.g. {"jitter": 10}
//
// Empty configs are allowed, in which case the default is
// used.
if cfg != "" && cfg != "{}" {
var obj map[string]uint64
err := json.Unmarshal([]byte(cfg), &obj)
m.config.setDefault()
if cfg != "" {
err := json.Unmarshal([]byte(cfg), &m.config)
if err != nil {
return err
}
if _, ok := obj["jitter"]; ok {
jitter = obj["jitter"]
}
}

m.jitter = jitter
m.rand = rand.New(rand.NewSource(time.Now().UnixNano()))

return nil
}

Expand Down Expand Up @@ -172,7 +175,7 @@ func (m *MyInstance) NextBatch(pState sdk.PluginState, evts sdk.EventWriters) (i
m.counter++

// Increment sample by 1, also add a jitter of [0:jitter]
m.sample += 1 + uint64(myPlugin.rand.Int63n(int64(myPlugin.jitter+1)))
m.sample += 1 + uint64(myPlugin.rand.Int63n(int64(myPlugin.config.Jitter+1)))

// The representation of a dummy event is the sample as a string.
str := strconv.Itoa(int(m.sample))
Expand Down

0 comments on commit ac88ed6

Please sign in to comment.