-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Hello once again, I've fixed all the problems in #92 Also, I've refactored check function a bit, to perform mock tests. --------- Signed-off-by: Konstantin Konov <80425051+knvk@users.noreply.github.com>
- Loading branch information
Showing
8 changed files
with
391 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package ntpq | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/beevik/ntp" | ||
"github.com/pkg/errors" | ||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/runityru/anycastd/checkers" | ||
) | ||
|
||
var ( | ||
_ checkers.Checker = (*ntpq)(nil) | ||
|
||
ntpOffset = prometheus.NewGaugeVec( | ||
prometheus.GaugeOpts{ | ||
Namespace: "anycastd", | ||
Name: "check_last_ntp_offset_ms", | ||
Help: "The estimated offset of the local system clock relative to the server's clock", | ||
}, | ||
[]string{"check", "host"}, | ||
) | ||
|
||
ntpRtt = prometheus.NewGaugeVec( | ||
prometheus.GaugeOpts{ | ||
Namespace: "anycastd", | ||
Name: "check_last_ntp_rtt_ms", | ||
Help: "An estimate of the round-trip-time delay between the client and the server", | ||
}, | ||
[]string{"check", "host"}, | ||
) | ||
|
||
ntpPacketsSent = prometheus.NewCounterVec( | ||
prometheus.CounterOpts{ | ||
Namespace: "anycastd", | ||
Name: "check_ntp_packets_sent_total", | ||
Help: "Total amount of ntp packets sent", | ||
}, | ||
[]string{"check", "host"}, | ||
) | ||
|
||
ErrOffset = errors.New("Offset is too big") | ||
) | ||
|
||
type ntpq struct { | ||
server string | ||
srcAddr string | ||
tries uint8 | ||
offsetThreshold time.Duration | ||
interval time.Duration | ||
timeout time.Duration | ||
|
||
queryFn func(string, ntp.QueryOptions) (*ntp.Response, error) | ||
} | ||
|
||
const checkName = "ntpq" | ||
|
||
func init() { | ||
checkers.MustRegister(checkName, NewFromSpec) | ||
|
||
prometheus.MustRegister(ntpOffset) | ||
prometheus.MustRegister(ntpPacketsSent) | ||
prometheus.MustRegister(ntpRtt) | ||
} | ||
|
||
func New(s spec) (checkers.Checker, error) { | ||
if err := s.Validate(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return &ntpq{ | ||
server: s.Server, | ||
srcAddr: s.SrcAddr, | ||
tries: s.Tries, | ||
offsetThreshold: s.OffsetThreshold.TimeDuration(), | ||
interval: s.Interval.TimeDuration(), | ||
timeout: s.Timeout.TimeDuration(), | ||
queryFn: ntp.QueryWithOptions, | ||
}, nil | ||
} | ||
|
||
func NewFromSpec(in json.RawMessage) (checkers.Checker, error) { | ||
s := spec{} | ||
if err := json.Unmarshal(in, &s); err != nil { | ||
return nil, err | ||
} | ||
|
||
return New(s) | ||
} | ||
|
||
func (h *ntpq) Kind() string { | ||
return checkName | ||
} | ||
|
||
func (d *ntpq) Check(ctx context.Context) error { | ||
var lastErr error | ||
for i := 0; i < int(d.tries); i++ { | ||
log.WithFields(log.Fields{ | ||
"check": checkName, | ||
"attempt": i + 1, | ||
}).Tracef("running check") | ||
|
||
if err := d.check(ctx); err != nil { | ||
lastErr = err | ||
log.WithFields(log.Fields{ | ||
"check": checkName, | ||
"attempt": i + 1, | ||
}).Infof("error received: %s", err) | ||
} else { | ||
return nil | ||
} | ||
|
||
time.Sleep(d.interval) | ||
} | ||
|
||
if lastErr != nil { | ||
return errors.Errorf( | ||
"check failed: %d tries with %s interval; last error: `%s`", | ||
d.tries, d.interval, lastErr.Error(), | ||
) | ||
} | ||
return nil | ||
} | ||
|
||
func (d *ntpq) check(_ context.Context) error { | ||
// defaut timeout is 5s | ||
options := ntp.QueryOptions{LocalAddress: d.srcAddr, Timeout: d.timeout} | ||
response, err := d.queryFn(d.server, options) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
log.WithFields(log.Fields{ | ||
"check": checkName, | ||
}).Tracef("Offset: %d, RTT: %d, RefID: %d", response.ClockOffset.Milliseconds(), response.RTT.Milliseconds(), response.ReferenceID) | ||
// since beevik/ntp doesn't do retries by itself we increment just by 1 | ||
ntpPacketsSent.WithLabelValues(checkName, d.server).Add(float64(1)) | ||
ntpOffset.WithLabelValues(checkName, d.server).Set(float64(response.ClockOffset.Milliseconds())) | ||
ntpRtt.WithLabelValues(checkName, d.server).Set(float64(response.RTT.Milliseconds())) | ||
|
||
if response.ClockOffset.Abs() > d.offsetThreshold { | ||
return ErrOffset | ||
} | ||
|
||
return nil | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
package ntpq | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/beevik/ntp" | ||
"github.com/stretchr/testify/mock" | ||
"github.com/stretchr/testify/suite" | ||
th "github.com/teran/go-time" | ||
) | ||
|
||
func (s *checkTestSuite) TestOffsetTooBig() { | ||
l, err := New(spec{ | ||
Server: "pool.ntp.org", | ||
SrcAddr: "192.168.0.1", | ||
Tries: 3, | ||
OffsetThreshold: th.Duration(125 * time.Millisecond), | ||
Interval: th.Duration(2 * time.Second), | ||
Timeout: th.Duration(5 * time.Second), | ||
}) | ||
s.Require().NoError(err) | ||
|
||
c := l.(*ntpq) | ||
c.queryFn = s.ntpM.queryMock | ||
|
||
s.ntpM.On("queryMock", "pool.ntp.org", ntp.QueryOptions{ | ||
LocalAddress: "192.168.0.1", | ||
Timeout: (5 * time.Second), | ||
}).Return(&ntp.Response{ | ||
RTT: time.Duration(15 * time.Millisecond), | ||
ClockOffset: time.Duration(150 * time.Millisecond), | ||
ReferenceID: 1, | ||
}, nil).Times(int(c.tries)) | ||
|
||
err = c.Check(context.Background()) | ||
s.Require().Error(err) | ||
} | ||
|
||
func (s *checkTestSuite) TestOffset() { | ||
l, err := New(spec{ | ||
Server: "pool.ntp.org", | ||
SrcAddr: "192.168.0.1", | ||
Tries: 3, | ||
OffsetThreshold: th.Duration(125 * time.Millisecond), | ||
Interval: th.Duration(2 * time.Second), | ||
Timeout: th.Duration(5 * time.Second), | ||
}) | ||
s.Require().NoError(err) | ||
|
||
c := l.(*ntpq) | ||
c.queryFn = s.ntpM.queryMock | ||
|
||
s.ntpM.On("queryMock", "pool.ntp.org", ntp.QueryOptions{ | ||
LocalAddress: "192.168.0.1", | ||
Timeout: (5 * time.Second), | ||
}).Return(&ntp.Response{ | ||
RTT: time.Duration(15 * time.Millisecond), | ||
ClockOffset: time.Duration(10 * time.Millisecond), | ||
ReferenceID: 1, | ||
}, nil).Times(int(c.tries)) | ||
|
||
err = c.Check(context.Background()) | ||
s.Require().NoError(err) | ||
} | ||
|
||
func (s *checkTestSuite) TestNegativeOffset() { | ||
l, err := New(spec{ | ||
Server: "pool.ntp.org", | ||
SrcAddr: "192.168.0.1", | ||
Tries: 3, | ||
OffsetThreshold: th.Duration(125 * time.Millisecond), | ||
Interval: th.Duration(2 * time.Second), | ||
Timeout: th.Duration(5 * time.Second), | ||
}) | ||
s.Require().NoError(err) | ||
|
||
c := l.(*ntpq) | ||
c.queryFn = s.ntpM.queryMock | ||
|
||
s.ntpM.On("queryMock", "pool.ntp.org", ntp.QueryOptions{ | ||
LocalAddress: "192.168.0.1", | ||
Timeout: (5 * time.Second), | ||
}).Return(&ntp.Response{ | ||
RTT: time.Duration(15 * time.Millisecond), | ||
ClockOffset: time.Duration(-10 * time.Millisecond), | ||
ReferenceID: 1, | ||
}, nil).Times(int(c.tries)) | ||
|
||
err = c.Check(context.Background()) | ||
s.Require().NoError(err) | ||
} | ||
|
||
type checkTestSuite struct { | ||
suite.Suite | ||
|
||
ntpM *mockNtp | ||
} | ||
|
||
func (s *checkTestSuite) SetupTest() { | ||
s.ntpM = &mockNtp{} | ||
} | ||
|
||
func TestCheckTestSuite(t *testing.T) { | ||
suite.Run(t, &checkTestSuite{}) | ||
} | ||
|
||
type mockNtp struct { | ||
mock.Mock | ||
} | ||
|
||
func (m *mockNtp) queryMock(server string, opts ntp.QueryOptions) (*ntp.Response, error) { | ||
args := m.Called(server, opts) | ||
return args.Get(0).(*ntp.Response), args.Error(1) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package ntpq | ||
|
||
import ( | ||
validation "github.com/go-ozzo/ozzo-validation/v4" | ||
"github.com/go-ozzo/ozzo-validation/v4/is" | ||
th "github.com/teran/go-time" | ||
) | ||
|
||
type spec struct { | ||
Server string `json:"server"` | ||
SrcAddr string `json:"src_addr"` | ||
Tries uint8 `json:"tries"` | ||
OffsetThreshold th.Duration `json:"offset_threshold"` | ||
Interval th.Duration `json:"interval"` | ||
Timeout th.Duration `json:"timeout"` | ||
} | ||
|
||
func (s spec) Validate() error { | ||
return validation.ValidateStruct(&s, | ||
validation.Field(&s.Server, validation.Required, is.Host), | ||
validation.Field(&s.SrcAddr, validation.Required, is.IPv4), | ||
validation.Field(&s.Tries, validation.Required), | ||
validation.Field(&s.OffsetThreshold, validation.Required), | ||
validation.Field(&s.Interval, validation.Required), | ||
validation.Field(&s.Timeout, validation.Required), | ||
) | ||
} |
Oops, something went wrong.