Skip to content

Commit

Permalink
Add the telemetry/stats job (masa-finance/issues#216)
Browse files Browse the repository at this point in the history
  • Loading branch information
mcamou committed Jan 29, 2025
1 parent 8c39d12 commit 61d2af1
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 31 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,13 @@ The tee-worker currently supports 3 job types:

`returned_profiles` - Number of profiles returned to clients.

`tweet_errors` - Number of errors while scraping tweets.
`returned_other` - Number of other records returned to clients (e.g. media, spaces or trending topics).

`tweet_errors` - Number of errors while scraping tweets (excluding authentication and rate-limiting).

`twitter_ratelimit_errors` - Number of Twitter rate-limiting errors.

`twitter_auth_errors` - Number of Twitter authentication errors.

`web_success` - Number of successful web scrapes.

Expand Down
75 changes: 75 additions & 0 deletions internal/jobs/stats/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package stats

import (
"encoding/json"
"time"

"github.com/sirupsen/logrus"
)

type statType string

const (
TweetScrapes statType = "tweet_scrapes"
ReturnedTweets statType = "returned_tweets"
ReturnedProfiles statType = "returned_profiles"
ReturnedOther statType = "returned_other"
TweetErrors statType = "tweet_errors"
TwitterAuthErrors statType = "twitter_auth_errors"
TwitterRateLimitErrors statType = "twitter_ratelimit_errors"
WebErrors statType = "web_errors"
WebSuccess statType = "web_success"
// TODO Should we add stats for calls to each of the Twitter job types?
)

type AddStat struct {
Type statType
Num uint
}

type stats struct {
BootTimeUnix int64 `json:"boot_time"`
LastOperationUnix int64 `json:"last_operation_time"`
CurrentTimeUnix int64 `json:"current_time"`
Stats map[statType]uint `json:"stats"`
}

type StatsCollector struct {
stats *stats
Chan chan AddStat
}

func StartCollector() *StatsCollector {
logrus.Info("Starting stats collector")

s := stats{
BootTimeUnix: time.Now().Unix(),
Stats: make(map[statType]uint),
}

ch := make(chan AddStat, 64)

go func(s *stats, ch chan AddStat) {
for {
stat := <-ch
s.LastOperationUnix = time.Now().Unix()
if _, ok := s.Stats[stat.Type]; ok {
s.Stats[stat.Type] += stat.Num
} else {
s.Stats[stat.Type] = stat.Num
}
logrus.Debugf("Added %d to stat %s. Current stats: %#v", stat.Num, stat.Type, s)
}
}(&s, ch)

return &StatsCollector{stats: &s, Chan: ch}
}

func (s StatsCollector) Json() ([]byte, error) {
s.stats.CurrentTimeUnix = time.Now().Unix()
return json.Marshal(s.stats)
}

func (s StatsCollector) AddStat(typ statType, num uint) {
s.Chan <- AddStat{Type: typ, Num: num}
}
34 changes: 34 additions & 0 deletions internal/jobs/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package jobs

import (
"github.com/masa-finance/tee-worker/api/types"
"github.com/masa-finance/tee-worker/internal/jobs/stats"
"github.com/sirupsen/logrus"
)

const TelemetryJobType = "telemetry"

type TelemetryJob struct {
collector *stats.StatsCollector
}

func NewTelemetryJob(jc types.JobConfiguration, c *stats.StatsCollector) TelemetryJob {
return TelemetryJob{collector: c}
}

func (t TelemetryJob) ExecuteJob(j types.Job) (types.JobResult, error) {
logrus.Debug("Executing telemetry job")

data, err := t.collector.Json()
if err != nil {
return types.JobResult{
Error: err.Error(),
Job: j,
}, err
}

return types.JobResult{
Data: data,
Job: j,
}, nil
}
Loading

0 comments on commit 61d2af1

Please sign in to comment.