Skip to content

Commit

Permalink
Refactor connection closing logic in limitListenerConn and add tests …
Browse files Browse the repository at this point in the history
…for connection limits (#553)

If Close is invoked for a LimitListener more than once, the metric tracking the total number of active connections can fall below zero and since this metric is used by Ingestor's health check, Ingestor erroneously thinks it's unhealthy and our readiness probe begins to fail, which in turn is causing our integration tests to fail.
  • Loading branch information
jessejlt authored Jan 30, 2025
1 parent 864ea6a commit 57ae38e
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pkg/limiter/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ type limitListenerConn struct {

func (l *limitListenerConn) Close() error {
err := l.Conn.Close()
l.releaseOnce.Do(l.release)
metrics.IngestorActiveConnections.Dec()
l.releaseOnce.Do(func() {
l.release()
metrics.IngestorActiveConnections.Dec()
})
return err
}
65 changes: 65 additions & 0 deletions pkg/limiter/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package limiter

import (
"net"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

func TestLimitListener_ExcessConnectionIsDropped(t *testing.T) {
originalCount := getActiveConnections(t)

// Create a simple listener that always returns the same connection
n := &net.TCPConn{}
t.Cleanup(func() { n.Close() })
fakeL := &fakeConnListener{conn: n}

// Limit to 1 connection
ll := LimitListener(fakeL, 1)

conn, err := ll.Accept()
require.NoError(t, err)

// Duplicate close should not cause our counter to drop below zero
conn.Close()
conn.Close()

// Validate counter should return to original count
require.Equal(t, getActiveConnections(t), originalCount)
}

func getActiveConnections(t *testing.T) float64 {
t.Helper()

mets, err := prometheus.DefaultGatherer.Gather()
require.NoError(t, err)

var activeConns float64
for _, v := range mets {
switch *v.Type {
case io_prometheus_client.MetricType_GAUGE:
for _, vv := range v.Metric {
if strings.Contains(v.GetName(), "ingestor_active_connections") {
activeConns += vv.Gauge.GetValue()
}
}
}
}

return activeConns
}

type fakeConnListener struct {
conn net.Conn
}

func (f *fakeConnListener) Accept() (net.Conn, error) {
return f.conn, nil
}

func (f *fakeConnListener) Close() error { return nil }
func (f *fakeConnListener) Addr() net.Addr { return nil }

0 comments on commit 57ae38e

Please sign in to comment.