From ac88ed63eb78c335ea39ea48a21844edbe64e598 Mon Sep 17 00:00:00 2001 From: Jason Dellaluce Date: Wed, 12 Jan 2022 10:55:22 +0000 Subject: [PATCH] refactor(plugins): read configuration uniformly across Go plugins Signed-off-by: Jason Dellaluce --- plugins/cloudtrail/cloudtrail.go | 59 +++++++++++++------------------- plugins/cloudtrail/source.go | 6 ++-- plugins/dummy/dummy.go | 31 +++++++++-------- 3 files changed, 44 insertions(+), 52 deletions(-) diff --git a/plugins/cloudtrail/cloudtrail.go b/plugins/cloudtrail/cloudtrail.go index 07724ff8..ef3d35da 100644 --- a/plugins/cloudtrail/cloudtrail.go +++ b/plugins/cloudtrail/cloudtrail.go @@ -58,7 +58,6 @@ const ( PluginEventSource = "aws_cloudtrail" ) -const defaultS3DownloadConcurrency = 1 const verbose bool = true func min(a, b int) int { @@ -91,22 +90,20 @@ type snsMessage struct { Keys []string `json:"s3ObjectKey"` } -// This is the global plugin state, identifying an instance of this plugin -type pluginContext struct { - plugins.BasePlugin - jparser fastjson.Parser - jdata *fastjson.Value - jdataEvtnum uint64 // The event number jdata refers to. Used to know when we can skip the unmarshaling. - sqsDelete bool // If true, will delete SQS Messages immediately after receiving them - s3DownloadConcurrency int - useAsync bool -} - // Struct for plugin init config type pluginInitConfig struct { S3DownloadConcurrency int `json:"s3DownloadConcurrency"` - SQSDelete bool `json:"sqsDelete"` - UseAsync bool `json:"useAsync"` + SQSDelete bool `json:"sqsDelete"` // If true, will delete SQS Messages immediately after receiving them + UseAsync bool `json:"useAsync"` // If false, async extraction optimization is disabled +} + +// This is the global plugin state, identifying an instance of this plugin +type pluginContext struct { + plugins.BasePlugin + jparser fastjson.Parser + jdata *fastjson.Value + jdataEvtnum uint64 // The event number jdata refers to. Used to know when we can skip the unmarshaling. + config pluginInitConfig } type OpenMode int @@ -140,6 +137,12 @@ func init() { extractor.Register(p) } +func (p *pluginInitConfig) setDefault() { + p.SQSDelete = true + p.S3DownloadConcurrency = 1 + p.UseAsync = true +} + func (p *pluginContext) Info() *plugins.Info { log.Printf("[%s] Info\n", PluginName) return &plugins.Info{ @@ -161,40 +164,26 @@ func (p *pluginContext) Init(cfg string) error { log.Printf("[%s] Init, config=%s\n", PluginName, cfg) + // initialize state p.jdataEvtnum = math.MaxUint64 - p.sqsDelete = true - p.s3DownloadConcurrency = defaultS3DownloadConcurrency - p.useAsync = false + // set config default values and read the passed one, if available + p.config.setDefault() if cfg != "" { - var initConfig pluginInitConfig - initConfig.SQSDelete = true - initConfig.S3DownloadConcurrency = defaultS3DownloadConcurrency - initConfig.UseAsync = false - - err := json.Unmarshal([]byte(cfg), &initConfig) + err := json.Unmarshal([]byte(cfg), &p.config) if err != nil { return err } - - p.sqsDelete = initConfig.SQSDelete - p.s3DownloadConcurrency = initConfig.S3DownloadConcurrency - p.useAsync = initConfig.UseAsync } - if p.useAsync { - extract.StartAsync(p) - } + // enable/disable async extraction optimazion (enabled by default) + extract.SetAsync(p.config.UseAsync) return nil } func (p *pluginContext) Destroy() { log.Printf("[%s] Destroy\n", PluginName) - - if p.useAsync { - extract.StopAsync(p) - } } func (p *pluginContext) Open(params string) (source.Instance, error) { @@ -219,7 +208,7 @@ func (p *pluginContext) Open(params string) (source.Instance, error) { // Create an array of download buffers that will be used to concurrently // download files from s3 - oCtx.s3.DownloadBufs = make([][]byte, p.s3DownloadConcurrency) + oCtx.s3.DownloadBufs = make([][]byte, p.config.S3DownloadConcurrency) return oCtx, nil } diff --git a/plugins/cloudtrail/source.go b/plugins/cloudtrail/source.go index b114353e..25924e55 100644 --- a/plugins/cloudtrail/source.go +++ b/plugins/cloudtrail/source.go @@ -146,7 +146,7 @@ func getMoreSQSFiles(pCtx *pluginContext, oCtx *openContext) error { return nil } - if pCtx.sqsDelete { + if pCtx.config.SQSDelete { // Delete the message from the queue so it won't be read again delInput := &sqs.DeleteMessageInput{ QueueUrl: &oCtx.queueURL, @@ -256,9 +256,9 @@ func readNextFileS3(pCtx *pluginContext, oCtx *openContext) ([]byte, error) { return oCtx.s3.DownloadBufs[curBuf], nil } - dlErrChan = make(chan error, pCtx.s3DownloadConcurrency) + dlErrChan = make(chan error, pCtx.config.S3DownloadConcurrency) k := oCtx.s3.lastDownloadedFileNum - oCtx.s3.nFilledBufs = min(pCtx.s3DownloadConcurrency, len(oCtx.files)-k) + oCtx.s3.nFilledBufs = min(pCtx.config.S3DownloadConcurrency, len(oCtx.files)-k) for j, f := range oCtx.files[k : k+oCtx.s3.nFilledBufs] { oCtx.s3.DownloadWg.Add(1) go s3Download(oCtx, oCtx.s3.downloader, f.name, j) diff --git a/plugins/dummy/dummy.go b/plugins/dummy/dummy.go index e4e85132..1cf94902 100644 --- a/plugins/dummy/dummy.go +++ b/plugins/dummy/dummy.go @@ -45,15 +45,19 @@ const ( /////////////////////////////////////////////////////////////////////////////// -type MyPlugin struct { - plugins.BasePlugin +type MyPluginConfig struct { // This reflects potential internal state for the plugin. In // this case, the plugin is configured with a jitter (e.g. a // random amount to add to the sample with each call to Next() - jitter uint64 + Jitter uint64 `json:"jitter"` +} +type MyPlugin struct { + plugins.BasePlugin // Will be used to randomize samples rand *rand.Rand + // Contains the init configuration values + config MyPluginConfig } type MyInstance struct { @@ -79,6 +83,10 @@ func init() { extractor.Register(p) } +func (p *MyPluginConfig) setDefault() { + p.Jitter = 10 +} + func (m *MyPlugin) Info() *plugins.Info { log.Printf("[%s] Info\n", PluginName) return &plugins.Info{ @@ -95,27 +103,22 @@ func (m *MyPlugin) Info() *plugins.Info { func (m *MyPlugin) Init(cfg string) error { log.Printf("[%s] Init, config=%s\n", PluginName, cfg) - var jitter uint64 = 10 + // initialize state + m.rand = rand.New(rand.NewSource(time.Now().UnixNano())) // The format of cfg is a json object with a single param // "jitter", e.g. {"jitter": 10} // // Empty configs are allowed, in which case the default is // used. - if cfg != "" && cfg != "{}" { - var obj map[string]uint64 - err := json.Unmarshal([]byte(cfg), &obj) + m.config.setDefault() + if cfg != "" { + err := json.Unmarshal([]byte(cfg), &m.config) if err != nil { return err } - if _, ok := obj["jitter"]; ok { - jitter = obj["jitter"] - } } - m.jitter = jitter - m.rand = rand.New(rand.NewSource(time.Now().UnixNano())) - return nil } @@ -172,7 +175,7 @@ func (m *MyInstance) NextBatch(pState sdk.PluginState, evts sdk.EventWriters) (i m.counter++ // Increment sample by 1, also add a jitter of [0:jitter] - m.sample += 1 + uint64(myPlugin.rand.Int63n(int64(myPlugin.jitter+1))) + m.sample += 1 + uint64(myPlugin.rand.Int63n(int64(myPlugin.config.Jitter+1))) // The representation of a dummy event is the sample as a string. str := strconv.Itoa(int(m.sample))