diff --git a/alerter/service.go b/alerter/service.go index 9eaba0487..8a4448f18 100644 --- a/alerter/service.go +++ b/alerter/service.go @@ -14,8 +14,6 @@ import ( "github.com/Azure/adx-mon/metrics" "github.com/Azure/adx-mon/pkg/logger" "github.com/Azure/azure-kusto-go/kusto" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" "sigs.k8s.io/controller-runtime/pkg/client" @@ -73,16 +71,6 @@ type KustoClient interface { Endpoint() string } -var ruleErrorCounter = promauto.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "l2m", - Subsystem: "engine", - Name: "errors", - Help: "Number of errors encountered in the primary execution engine", - }, - []string{"region"}, -) - func NewService(opts *AlerterOpts) (*Alerter, error) { ruleStore := rules.NewStore(rules.StoreOpts{ Region: opts.Region, diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 946c03c4d..daff1e2e8 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -11,7 +11,6 @@ import ( "regexp" "runtime" "sort" - "strings" "syscall" "time" @@ -712,20 +711,6 @@ func getInformer(kubeConfig string, nodeName string, informer *k8s.PodInformer) return k8s.NewPodInformer(client, nodeName), nil } -// parseKeyPairs parses a list of key pairs in the form of key=value,key=value -// and returns them as a map. -func parseKeyPairs(kp []string) (map[string]string, error) { - m := make(map[string]string) - for _, encoded := range kp { - split := strings.Split(encoded, "=") - if len(split) != 2 { - return nil, fmt.Errorf("invalid key-pair %s", encoded) - } - m[split[0]] = split[1] - } - return m, nil -} - func getMetricsExporters(exporterNames []string, exporters *config.Exporters, cache map[string]remote.RemoteWriteClient) ([]remote.RemoteWriteClient, error) { var remoteClients []remote.RemoteWriteClient for _, exporterName := range exporterNames { diff --git a/collector/scraper.go b/collector/scraper.go index 2ef5577fa..a9ea8eab9 100644 --- a/collector/scraper.go +++ b/collector/scraper.go @@ -604,5 +604,3 @@ func getTargetAnnotationMapOrDefault(p *v1.Pod, key string, defaultVal map[strin } return parsedMap } - -var adxmonNamespaceLabel = []byte("adxmon_namespace") diff --git a/ingestor/cluster/batcher_test.go b/ingestor/cluster/batcher_test.go index 2344a7cb7..afa8b1338 100644 --- a/ingestor/cluster/batcher_test.go +++ b/ingestor/cluster/batcher_test.go @@ -302,7 +302,7 @@ func TestBatcher_Stats(t *testing.T) { created, err := flakeutil.ParseFlakeID("2359cdac8d6f0001") require.NoError(t, err) - segments := []wal.SegmentInfo{} + var segments []wal.SegmentInfo idx := wal.NewIndex() f1 := wal.SegmentInfo{ diff --git a/ingestor/cluster/coordinator.go b/ingestor/cluster/coordinator.go index 458dea3c0..bab615225 100644 --- a/ingestor/cluster/coordinator.go +++ b/ingestor/cluster/coordinator.go @@ -346,11 +346,3 @@ func IsInitReady(pod *v1.Pod) bool { } return false } - -func reverse(b []byte) []byte { - x := make([]byte, len(b)) - for i := 0; i < len(b); i++ { - x[i] = b[len(b)-i-1] - } - return x -} diff --git a/metrics/metrics.go b/metrics/metrics.go index 0a2965e79..abec6dd7e 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -86,13 +86,6 @@ var ( Help: "Counter of the number of metrics uploaded to Kusto", }, []string{"database", "table"}) - LogsReceived = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: "ingestor", - Name: "logs_received_total", - Help: "Counter of the number of logs received", - }, []string{"database", "table"}) - LogsUploaded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: Namespace, Subsystem: "ingestor", @@ -107,13 +100,6 @@ var ( Help: "Counter of the number of invalid logs dropped", }, []string{}) - ValidLogsDropped = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: "ingestor", - Name: "valid_logs_dropped", - Help: "Counter of the number of logs dropped due to ingestor errors", - }, []string{}) - SampleLatency = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: "ingestor", @@ -172,20 +158,6 @@ var ( Help: "Counter of the number of logs uploaded", }, []string{"database", "table"}) - LogsProxyFailures = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: "collector", - Name: "logs_failures", - Help: "Counter of the number of failures when proxying logs to the OTLP endpoints", - }, []string{"endpoint"}) - - LogsProxyPartialFailures = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: Namespace, - Subsystem: "collector", - Name: "logs_partial_failures", - Help: "Counter of the number of partial failures when proxying logs to the OTLP endpoints", - }, []string{"endpoint"}) - LogKeys = promauto.NewGaugeVec(prometheus.GaugeOpts{ Namespace: Namespace, Subsystem: "collector", diff --git a/pkg/otlp/logs.go b/pkg/otlp/logs.go index 350c2b951..541419731 100644 --- a/pkg/otlp/logs.go +++ b/pkg/otlp/logs.go @@ -2,14 +2,12 @@ package otlp import ( "log/slog" - "strconv" "time" v1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/collector/logs/v1" commonv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/common/v1" logsv1 "buf.build/gen/go/opentelemetry/opentelemetry/protocolbuffers/go/opentelemetry/proto/logs/v1" "github.com/Azure/adx-mon/metrics" - "github.com/Azure/adx-mon/pkg/tlv" ) // Logs is a collection of logs and their resources. @@ -24,20 +22,8 @@ type Logs struct { const ( dbKey = "kusto.database" tblKey = "kusto.table" - - LogsTotalTag = tlv.Tag(0xAB) ) -func EmitMetricsForTLV(tlvs []tlv.TLV, database, table string) { - for _, t := range tlvs { - if t.Tag == LogsTotalTag { - if v, err := strconv.Atoi(string(t.Value)); err == nil { - metrics.LogsUploaded.WithLabelValues(database, table).Add(float64(v)) - } - } - } -} - // Group logs into a collection of logs and their resources. func Group(req *v1.ExportLogsServiceRequest, add []*commonv1.KeyValue, log *slog.Logger) []*Logs { var grouped []*Logs diff --git a/pkg/prompb/protobuf_test.go b/pkg/prompb/protobuf_test.go index e6707515e..b0d82678b 100644 --- a/pkg/prompb/protobuf_test.go +++ b/pkg/prompb/protobuf_test.go @@ -1,7 +1,6 @@ package prompb import ( - "fmt" "testing" "github.com/stretchr/testify/require" @@ -130,14 +129,3 @@ func BenchmarkWriteRequestUnmarshal(b *testing.B) { require.NoError(b, wr.Unmarshal(buf)) } } - -func formatBytes(b []byte) string { - s := "" - for i, v := range b { - if i != 0 { - s += ", " - } - s += fmt.Sprint(v) - } - return s -} diff --git a/pkg/promremote/promremote_test.go b/pkg/promremote/promremote_test.go deleted file mode 100644 index b01ec65c7..000000000 --- a/pkg/promremote/promremote_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package promremote - -import ( - "context" - "testing" - "time" - - "github.com/Azure/adx-mon/pkg/prompb" - "github.com/Azure/adx-mon/pkg/remote" - "github.com/stretchr/testify/require" -) - -func TestSendBatchWithValidData(t *testing.T) { - client := &MockClient{} - proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 10, false) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := proxy.Open(ctx) - require.NoError(t, err) - defer proxy.Close() - - wr := &prompb.WriteRequest{ - Timeseries: []*prompb.TimeSeries{ - { - Labels: []*prompb.Label{ - { - Name: []byte("test"), Value: []byte("value"), - }, - }, - Samples: []*prompb.Sample{ - { - Timestamp: time.Now().Unix(), Value: 1.0}, - }, - }, - }, - } - - err = proxy.Write(ctx, wr) - require.NoError(t, err) -} - -func TestSendBatchWithEmptyBatch(t *testing.T) { - client := &MockClient{} - proxy := NewRemoteWriteProxy([]remote.RemoteWriteClient{client}, 1, false) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - err := proxy.Open(ctx) - require.NoError(t, err) - defer proxy.Close() - - wr := &prompb.WriteRequest{ - Timeseries: []*prompb.TimeSeries{ - { - Labels: []*prompb.Label{ - {Name: []byte("test"), Value: []byte("value")}, - }, - Samples: []*prompb.Sample{ - {Timestamp: time.Now().Unix(), Value: 1.0}, - }, - }, - { - Samples: []*prompb.Sample{ - {Timestamp: time.Now().Unix(), Value: 1.0}, - }, - }, - }, - } - - err = proxy.Write(ctx, wr) - require.NoError(t, err) - -} - -type MockClient struct{} - -func (m *MockClient) CloseIdleConnections() {} - -func (m *MockClient) Write(ctx context.Context, wr *prompb.WriteRequest) error { - return nil -} diff --git a/pkg/promremote/proxy.go b/pkg/promremote/proxy.go deleted file mode 100644 index f66a2852c..000000000 --- a/pkg/promremote/proxy.go +++ /dev/null @@ -1,167 +0,0 @@ -package promremote - -import ( - "bytes" - "context" - "fmt" - "sort" - "strings" - "time" - - "github.com/Azure/adx-mon/pkg/logger" - "github.com/Azure/adx-mon/pkg/prompb" - "github.com/Azure/adx-mon/pkg/remote" -) - -type RemoteWriteProxy struct { - clients []remote.RemoteWriteClient - maxBatchSize int - disableMetricsForwarding bool - - // queue is the channel where incoming writes are queued. These are arbitrary sized writes. - queue chan *prompb.WriteRequest - // ready is the channel where writes are batched up to maxBatchSize and ready to be sent to the remote write endpoint. - ready chan *prompb.WriteRequest - - cancelFn context.CancelFunc -} - -func NewRemoteWriteProxy(clients []remote.RemoteWriteClient, maxBatchSize int, disableMetricsForwarding bool) *RemoteWriteProxy { - p := &RemoteWriteProxy{ - clients: clients, - maxBatchSize: maxBatchSize, - disableMetricsForwarding: disableMetricsForwarding, - queue: make(chan *prompb.WriteRequest, 100), - ready: make(chan *prompb.WriteRequest, 5), - } - return p -} - -func (r *RemoteWriteProxy) Open(ctx context.Context) error { - ctx, cancelFn := context.WithCancel(context.Background()) - r.cancelFn = cancelFn - go r.flush(ctx) - for i := 0; i < cap(r.ready); i++ { - go r.sendBatch(ctx) - } - return nil -} - -func (r *RemoteWriteProxy) Close() error { - r.cancelFn() - return nil -} - -func (r *RemoteWriteProxy) Write(ctx context.Context, wr *prompb.WriteRequest) error { - if logger.IsDebug() { - var sb strings.Builder - for _, ts := range wr.Timeseries { - sb.Reset() - for i, l := range ts.Labels { - sb.Write(l.Name) - sb.WriteString("=") - sb.Write(l.Value) - if i < len(ts.Labels)-1 { - sb.Write([]byte(",")) - } - } - sb.Write([]byte(" ")) - for _, s := range ts.Samples { - logger.Debugf("%s %d %f", sb.String(), s.Timestamp, s.Value) - } - } - } - - if r.disableMetricsForwarding { - return nil - } - - select { - case <-ctx.Done(): - return ctx.Err() - case r.queue <- wr: - return nil - case <-time.After(15 * time.Second): - // If the channel is full, we will try to flush the batch - return fmt.Errorf("writes are throttled") - } -} - -func (c *RemoteWriteProxy) flush(ctx context.Context) { - pendingBatch := &prompb.WriteRequest{} - for { - - select { - case <-ctx.Done(): - return - case b := <-c.queue: - pendingBatch.Timeseries = append(pendingBatch.Timeseries, b.Timeseries...) - - // Flush as many full queue as we can - for len(pendingBatch.Timeseries) >= c.maxBatchSize { - nextBatch := prompb.WriteRequestPool.Get() - nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries[:c.maxBatchSize]...) - pendingBatch.Timeseries = append(pendingBatch.Timeseries[:0], pendingBatch.Timeseries[c.maxBatchSize:]...) - c.ready <- nextBatch - } - case <-time.After(10 * time.Second): - for len(pendingBatch.Timeseries) >= c.maxBatchSize { - nextBatch := prompb.WriteRequestPool.Get() - nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries[:c.maxBatchSize]...) - pendingBatch.Timeseries = append(pendingBatch.Timeseries[:0], pendingBatch.Timeseries[c.maxBatchSize:]...) - c.ready <- nextBatch - } - if len(pendingBatch.Timeseries) == 0 { - continue - } - nextBatch := prompb.WriteRequestPool.Get() - nextBatch.Timeseries = append(nextBatch.Timeseries, pendingBatch.Timeseries...) - pendingBatch.Timeseries = append(pendingBatch.Timeseries[:0]) - c.ready <- nextBatch - } - } -} - -func (p *RemoteWriteProxy) sendBatch(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case wr := <-p.ready: - // This sorts all the series by metric name which has the effect of reducing contention on segments - // in ingestor as the locks tend to be acquired in order of series name. Since there are usually - // multiple series with the same name, this can reduce contention. - sort.Slice(wr.Timeseries, func(i, j int) bool { - return bytes.Compare(prompb.MetricName(wr.Timeseries[i]), prompb.MetricName(wr.Timeseries[j])) < 0 - }) - - if len(p.clients) == 0 || logger.IsDebug() { - var sb strings.Builder - for _, ts := range wr.Timeseries { - sb.Reset() - for i, l := range ts.Labels { - sb.Write(l.Name) - sb.WriteString("=") - sb.Write(l.Value) - if i < len(ts.Labels)-1 { - sb.Write([]byte(",")) - } - } - sb.Write([]byte(" ")) - for _, s := range ts.Samples { - logger.Debugf("%s %d %f", sb.String(), s.Timestamp, s.Value) - } - - } - } - - start := time.Now() - err := remote.WriteRequest(ctx, p.clients, wr) - logger.Infof("Sending %d timeseries to %d endpoints duration=%s", len(wr.Timeseries), len(p.clients), time.Since(start)) - if err != nil { - logger.Errorf("Error sending batch: %v", err) - } - prompb.WriteRequestPool.Put(wr) - } - } -} diff --git a/pkg/tlv/tlv.go b/pkg/tlv/tlv.go deleted file mode 100644 index c28f4f87a..000000000 --- a/pkg/tlv/tlv.go +++ /dev/null @@ -1,324 +0,0 @@ -package tlv - -import ( - "bytes" - "encoding/binary" - "io" - - gbp "github.com/libp2p/go-buffer-pool" -) - -type TLV struct { - Tag Tag - Length uint32 - Value []byte -} - -type Tag uint16 - -var ( - marker = Tag(0xFEDA) -) - -const ( - sizeOfHeader = binary.MaxVarintLen16 /* T */ + binary.MaxVarintLen32 /* L */ + binary.MaxVarintLen32 /* V */ - - // PayloadTag is used to indicate the length of the payload. - PayloadTag = Tag(0xCB) -) - -func New(tag Tag, value []byte) *TLV { - - return &TLV{ - Tag: tag, - Length: uint32(len(value)), - Value: value, - } -} - -func (t *TLV) Encode() []byte { - b := make([]byte, binary.MaxVarintLen16+binary.MaxVarintLen32+t.Length) - binary.BigEndian.PutUint16(b[0:], uint16(t.Tag)) - binary.BigEndian.PutUint32(b[binary.MaxVarintLen16:], t.Length) - copy(b[binary.MaxVarintLen16+binary.MaxVarintLen32:], t.Value) - return b -} - -// Encode the TLVs by prefixing a TLV as a header that -// contains the number of TLVs contained within. -func Encode(tlvs ...*TLV) []byte { - var b bytes.Buffer - - for _, t := range tlvs { - b.Write(t.Encode()) - } - - // Header is TLV where V is a uint32 instead of a byte slice. - // T is a magic number 0x1 - // L is the number of TLVs - // V is the size in bytes of all the TLVs - v := gbp.Get(sizeOfHeader) - defer gbp.Put(v) - binary.BigEndian.PutUint16(v, uint16(marker)) - binary.BigEndian.PutUint32(v[binary.MaxVarintLen16:], uint32(b.Len())) // L - binary.BigEndian.PutUint32(v[binary.MaxVarintLen16+binary.MaxVarintLen32:], uint32(len(tlvs))) // V - - return append(v, b.Bytes()...) -} - -type ReaderOption func(*Reader) - -func WithPreserve(preserve bool) ReaderOption { - return func(r *Reader) { - r.preserve = preserve - } -} - -func WithBufferSize(size int) ReaderOption { - return func(r *Reader) { - r.bufferSize = size - } -} - -type Reader struct { - source io.Reader - header []TLV - buf []byte - - // underRunIndex is the index of the first byte - // in buf that marks the end of the last TLV bytes. - // - // This value is set when our buffer contains a TLV - // marker but does not have enough bytes to extract - // the full TLV sequence. - // Upon our next Read invocation, we'll reference the - // underRunIndex to continue filling the buffer from - // that point. - underRunIndex int - - // preserve indicates that we should not discard TLV - // read from the source. - preserve bool - - // bufferSize is the size of the buffer used for the - // lifetime of the Reader. - bufferSize int -} - -func NewReader(r io.Reader, opts ...ReaderOption) *Reader { - // Our buffer is necessary because we'll need to reslice - // the provided slice in order to remove TLVs. Since slices - // are references to an underlying array, any reslicing - // we do within the body of Read won't translate to the - // caller's slice. - tlvr := &Reader{source: r, bufferSize: 4096} - for _, opt := range opts { - opt(tlvr) - } - tlvr.buf = gbp.Get(tlvr.bufferSize) - return tlvr -} - -func (r *Reader) Header() []TLV { - return r.header -} - -func (r *Reader) Read(p []byte) (n int, err error) { - // Upon return, if we're at EOF, we'll return our buffer to the pool. - defer func() { - if err == io.EOF { - gbp.Put(r.buf) - } - }() - - // Read from our source into our buffer. If the parameter slice has less - // capacity than our own buffer, we'll use a limit reader so as to be capable - // of returning the entire slice. - if len(p) < r.bufferSize { - n, err = io.LimitReader(r.source, int64(len(p))).Read(r.buf[r.underRunIndex:]) - } else { - n, err = r.source.Read(r.buf[r.underRunIndex:]) - } - - if err == io.EOF { - // If there's anything remaining in our buffer, copy it over - if r.underRunIndex > 0 { - n = copy(p, r.buf[0:r.underRunIndex]) - } - return - } - - // Initialize our index state - var ( - head = 0 - tail = r.underRunIndex + n - - // `stop` is returned by `next` and denotes no additional buffer procssing - stop bool - // `markerHead` and `markerTail` is returned by `next` and denotes the indices - // where TLV has been found. - markerHead, markerTail int - // `dstIndex` is the index of the next byte to copy into the destination slice. - dstIndex int - ) - - // Reset our `underRunIndex` - r.underRunIndex = 0 - - // Also reset `n`, which will be updated as we copy bytes into `p` - n = 0 - - // process our buffer - for { - // Find our next TLV - stop, markerHead, markerTail = r.next(head, tail) - - // If `markerTail` != `tail` yet `stop` is true, we have - // insufficient space in our buffer to fully evaluate - // the presence of TLV. So we're going to move the remaining - // bytes to the beginning of our buffer and set `underRunIndex` - // to the end of our moved bytes such that the next time Read - // is called, we'll fill the remainder of our buffer starting - // at `underRunIndex`. - // - // tail - // │ - // ▼ - // ┌───────────────┬────────┐ - // │ │ │ - // └───────────────┴────────┘ - // ▲ - // │ - // markerTail - // - // ┌──────┬─────────────────┐ - // │ │ │ - // └──────┴─────────────────┘ - // ▲ - // │ - // underRunIndex - if stop && markerTail != tail { - // An edge case is where we have TLV but not enough buffer to finish - // reading the full TLV context, but we've not advanced the head index. - // Fortunately, we already return the starting point for where we've - // found the TLV marker, so we can set the head index to that value. - if head == 0 { - head = markerHead - } - r.underRunIndex = copy(r.buf[0:], r.buf[head:tail]) - break - } - - if !r.preserve { - // If we don't want to preserve TLV - // ┌───────────────────────────┐ - // └───────────────────────────┘ - // ▲ ▲ ▲ - // │ │ │ - // d mh mt - // - // We want to copy into `p` from `dstIndex` to `markerHead` - if markerHead == -1 { - // If we didn't find TLV in the remaining bytes of our buffer, - // we'll copy the remaining bytes into `p` and return. - markerHead = tail - } - dstIndex += copy(p[dstIndex:], r.buf[head:markerHead]) - n = dstIndex - } else { - // If we want to preserve TLV - // ┌───────────────────────────┐ - // └───────────────────────────┘ - // ▲ ▲ ▲ - // │ │ │ - // d mh mt - // - // We want to copy into `p` from `dstIndex` to `markerTail` - dstIndex += copy(p[dstIndex:], r.buf[head:markerTail]) - n = dstIndex - } - - // Advance our index state - // - // head - // │ - // ▼ - // ┌───────────────────────────┐ - // └───────────────────────────┘ - // ▲ ▲ ▲ - // │ │ │ - // d mh mt - // - head = markerTail - - if stop { - break - } - } - - return -} - -func (r *Reader) next(start, end int) (stop bool, markerHead, markerTail int) { - // If our buffer has insufficient remaining bytes to read a TLV header, - // we'll set underRunIndex so we can continue filling the buffer from - // that point on the next call to Read. - if start+sizeOfHeader > end { - stop = true - return - } - - // Find our marker - markerHead = bytes.Index(r.buf[start:end], []byte{0xFE, 0xDA}) - stop = markerHead == -1 - if stop { - // Advance `markerTail` to the end of our buffer, no more TLV - markerTail = end - return - } - - // Since we're indexing into the buffer from `start`, we need to - // advance `markerHead` by `start` to get the actual index of the - // marker in the buffer. - // - // markerHead [5] - // │ - // ▼ - // ┌─────────────────────────────┐ - // └─────────────────────────────┘ - // ▲ - // │ - // start [10] - markerHead += start - - // Read the header - markerTail = markerHead + binary.MaxVarintLen16 - sizeOfElements := binary.BigEndian.Uint32(r.buf[markerTail:]) - markerTail += binary.MaxVarintLen32 - elements := int(binary.BigEndian.Uint32(r.buf[markerTail:])) - markerTail += binary.MaxVarintLen32 - - // At this point we have a TLV header. If there is insufficient - // space in the buffer to extract the full TLV, we'll set underRunIndex - // so we can continue filling the buffer from that point on the next - // call to Read. - stop = markerTail+int(sizeOfElements) > end - if stop { - return - } - - // We have a TLV header and enough bytes in our buffer to extract - // the full TLV. - for i := 0; i < elements; i++ { - t := TLV{} - t.Tag = Tag(binary.BigEndian.Uint16(r.buf[markerTail:])) - markerTail += binary.MaxVarintLen16 - t.Length = binary.BigEndian.Uint32(r.buf[markerTail:]) - markerTail += binary.MaxVarintLen32 - t.Value = append(t.Value, r.buf[markerTail:markerTail+int(t.Length)]...) - markerTail += int(t.Length) - r.header = append(r.header, t) - } - - return -} diff --git a/pkg/tlv/tlv_test.go b/pkg/tlv/tlv_test.go deleted file mode 100644 index 24996b023..000000000 --- a/pkg/tlv/tlv_test.go +++ /dev/null @@ -1,281 +0,0 @@ -package tlv_test - -import ( - "bytes" - "io" - "os" - "path/filepath" - "strconv" - "testing" - - "github.com/Azure/adx-mon/pkg/tlv" - "github.com/stretchr/testify/require" -) - -func TestTLVAtHead(t *testing.T) { - tests := []struct { - Preserve bool - }{ - { - Preserve: true, - }, - { - Preserve: false, - }, - } - for _, tt := range tests { - t.Run(strconv.FormatBool(tt.Preserve), func(t *testing.T) { - // Create a couple TLVs - t1 := tlv.New(tlv.Tag(0x1), []byte("Tag1Value")) - t2 := tlv.New(tlv.Tag(0x2), []byte("Tag2Value")) - ts := []*tlv.TLV{t1, t2} - encoded := tlv.Encode(ts...) - - // Create a payload - payload := []byte("foo bar baz") - - // Our source stream - r := bytes.NewReader(append(encoded, payload...)) - - // Create a TLV reader - tr := tlv.NewReader(r, tlv.WithPreserve(tt.Preserve)) - - // Consume our stream - var w bytes.Buffer - n, err := io.Copy(&w, tr) - require.NoError(t, err) - - if tt.Preserve { - require.Equal(t, len(payload)+len(encoded), int(n)) - } else { - require.Equal(t, len(payload), int(n)) - } - - // Test header - for i, h := range tr.Header() { - require.Equal(t, ts[i].Tag, h.Tag) - require.Equal(t, ts[i].Length, h.Length) - require.Equal(t, ts[i].Value, h.Value) - } - - // Ensure payload is intact - if tt.Preserve { - require.Equal(t, append(encoded, payload...), w.Bytes()) - } else { - require.Equal(t, payload, w.Bytes()) - } - }) - } -} - -func TestTLVInPayload(t *testing.T) { - tests := []struct { - Preserve bool - }{ - { - Preserve: true, - }, - { - Preserve: false, - }, - } - for _, tt := range tests { - t.Run(strconv.FormatBool(tt.Preserve), func(t *testing.T) { - // We want to test the case where TLV is present in - // the stream but not at the head of the stream. - // | Payload | TLV | Payload | - - // Create a TLV - t1 := tlv.New(tlv.Tag(0x1), []byte("Tag1Value")) - encoded := tlv.Encode(t1) - - // Create a payload - payload1 := []byte("foo bar baz") - payload2 := []byte("baz bar foo") - - // Our source stream - streamBytes := append(payload1, encoded...) - streamBytes = append(streamBytes, payload2...) - r := bytes.NewReader(streamBytes) - - // Create a TLV reader - tr := tlv.NewReader(r, tlv.WithPreserve(tt.Preserve)) - - // Consume our stream - var w bytes.Buffer - n, err := io.Copy(&w, tr) - require.NoError(t, err) - - if tt.Preserve { - require.Equal(t, len(streamBytes), int(n)) - } else { - require.Equal(t, len(payload1)+len(payload2), int(n)) - } - - // Test header - h := tr.Header() - require.Equal(t, 1, len(h)) - require.Equal(t, t1.Tag, h[0].Tag) - require.Equal(t, t1.Length, h[0].Length) - require.Equal(t, t1.Value, h[0].Value) - - // Ensure payload is intact - if tt.Preserve { - require.Equal(t, streamBytes, w.Bytes()) - } else { - require.Equal(t, append(payload1, payload2...), w.Bytes()) - } - }) - } -} - -func TestMultiReadCalls(t *testing.T) { - tests := []struct { - Preserve bool - }{ - { - Preserve: true, - }, - { - Preserve: false, - }, - } - for _, tt := range tests { - t.Run(strconv.FormatBool(tt.Preserve), func(t *testing.T) { - // We want to test the case where Read is called multiple times. - // This is going to test our most common case, where we have a - // stream that contains checksums at the head and TLVs - // scattered throughout the stream. - // | Checksum | TLV | Payload | TLV | Payload - - checksum := []byte("checksum") - payload1 := bytes.Repeat([]byte("payload1"), 100) - t1 := tlv.New(tlv.Tag(0x1), []byte("Tag1Value")) - tp1 := tlv.New(tlv.PayloadTag, []byte(strconv.Itoa(len(payload1)))) - - payload2 := bytes.Repeat([]byte("payload2"), 127) - t2 := tlv.New(tlv.Tag(0x1), []byte("Tag2Value")) - tp2 := tlv.New(tlv.PayloadTag, []byte(strconv.Itoa(len(payload2)))) - - // Our source stream - streamBytes := append(checksum, tlv.Encode(t1, tp1)...) - streamBytes = append(streamBytes, payload1...) - - streamBytes = append(streamBytes, tlv.Encode(t2, tp2)...) - streamBytes = append(streamBytes, payload2...) - - r := bytes.NewReader(streamBytes) - - // Create a TLV reader - tr := tlv.NewReader(r, tlv.WithPreserve(tt.Preserve), tlv.WithBufferSize(128)) - - // Consume our stream - var w bytes.Buffer - n, err := io.Copy(&w, tr) - require.NoError(t, err) - - if tt.Preserve { - require.Equal(t, len(streamBytes), int(n)) - } else { - require.Equal(t, len(checksum)+len(payload1)+len(payload2), int(n)) - } - - h := tr.Header() - require.Equal(t, 4, len(h)) - tltvs := []*tlv.TLV{t1, tp1, t2, tp2} - for i, hh := range h { - require.Equal(t, tltvs[i].Tag, hh.Tag) - require.Equal(t, tltvs[i].Length, hh.Length) - require.Equal(t, tltvs[i].Value, hh.Value) - } - - }) - } -} - -func BenchmarkReader(b *testing.B) { - dir := b.TempDir() - fn := filepath.Join(dir, "test") - b.Logf("writing to %s", fn) - - f, err := os.Create(fn) - require.NoError(b, err) - for i := 0; i < 10; i++ { - t := tlv.New(tlv.Tag(0xAB), []byte("some tag payload")) - payload := bytes.Repeat([]byte("payload"), 100) - f.Write(tlv.Encode(t)) - f.Write(payload) - } - require.NoError(b, f.Close()) - - b.ReportAllocs() - b.ResetTimer() - for i := 0; i < b.N; i++ { - f, err := os.Open(fn) - require.NoError(b, err) - - r := tlv.NewReader(f, tlv.WithBufferSize(128)) - - io.Copy(io.Discard, r) - - require.NoError(b, f.Close()) - } -} - -var otlpLog = []byte(`{ - "resourceLogs": [ - { - "resource": { - "attributes": [ - { - "key": "source", - "value": { - "stringValue": "hostname" - } - } - ], - "droppedAttributesCount": 1 - }, - "scopeLogs": [ - { - "scope": { - "name": "name", - "version": "version", - "droppedAttributesCount": 1 - }, - "logRecords": [ - { - "timeUnixNano": "1669112524001", - "observedTimeUnixNano": "1669112524001", - "severityNumber": 17, - "severityText": "Error", - "body": { - "stringValue": "{\"msg\":\"something happened\n\"}" - }, - "attributes": [ - { - "key": "kusto.table", - "value": { - "stringValue": "ATable" - } - }, - { - "key": "kusto.database", - "value": { - "stringValue": "ADatabase" - } - } - ], - "droppedAttributesCount": 1, - "flags": 1, - "traceId": "", - "spanId": "" - } - ], - "schemaUrl": "scope_schema" - } - ], - "schemaUrl": "resource_schema" - } - ] -}`) diff --git a/storage/store.go b/storage/store.go index 161f8222e..93e7f480e 100644 --- a/storage/store.go +++ b/storage/store.go @@ -36,10 +36,6 @@ var ( metricsCSVWriterPool = pool.NewGeneric(1000, func(sz int) interface{} { return transform2.NewMetricsCSVWriter(bytes.NewBuffer(make([]byte, 0, sz)), nil) }) - - bytesBufPool = pool.NewGeneric(1000, func(sz int) interface{} { - return bytes.NewBuffer(make([]byte, 0, sz)) - }) ) type Store interface {