Skip to content

Commit

Permalink
Allow by period uniqueness to be based off scheduled time
Browse files Browse the repository at this point in the history
This one attempts to resolve #715. Currently, by period uniqueness
always bases the period off the current time, but there's a good
argument that if the job has been scheduled for a particular time in the
future, it should be based off that time instead.

This is one that could nominally be considered a small breaking change
in a Hyrum's Law sort of way, even though it's really patching what
could be considered a bug. Even though it was sort of broken before,
some apps may have come to depend on the broken behavior of the unique
code ignoring `ScheduledAt`. I'm not sure that it's a big enough problem
to be worth calling out though, so I didn't.

Fixes #715.
  • Loading branch information
brandur committed Jan 25, 2025
1 parent 4cea55a commit a672ad3
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snoozing a job now causes its `attempt` to be _decremented_, whereas previously the `max_attempts` would be incremented. In either case, this avoids allowing a snooze to exhaust a job's retries; however the new behavior also avoids potential issues with wrapping the `max_attempts` value, and makes it simpler to implement a `RetryPolicy` based on either `attempt` or `max_attempts`. The number of snoozes is also tracked in the job's metadata as `snoozes` for debugging purposes.

The implementation of the builtin `RetryPolicy` implementations is not changed, so this change should not cause any user-facing breakage unless you're relying on `attempt - len(errors)` for some reason. [PR #730](https://github.com/riverqueue/river/pull/730).
- `ByPeriod` uniqueness is now based off a job's `ScheduledAt` instead of the current time if it has a value. [PR #734](https://github.com/riverqueue/river/pull/734).

## [0.15.0] - 2024-12-26

Expand Down
3 changes: 2 additions & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tidwall/sjson"

"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -123,7 +124,7 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := timeGen.NowUTC().Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod)
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
}

Expand Down
36 changes: 29 additions & 7 deletions internal/dbunique/db_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)

Expand All @@ -30,10 +31,11 @@ func TestUniqueKey(t *testing.T) {
stubSvc.StubNowUTC(now)

tests := []struct {
name string
argsFunc func() rivertype.JobArgs
uniqueOpts UniqueOpts
expectedJSON string
name string
argsFunc func() rivertype.JobArgs
modifyInsertParamsFunc func(insertParams *rivertype.JobInsertParams)
uniqueOpts UniqueOpts
expectedJSON string
}{
{
name: "ByArgsWithMultipleUniqueStructTagsAndDefaultStates",
Expand Down Expand Up @@ -165,6 +167,22 @@ func TestUniqueKey(t *testing.T) {
uniqueOpts: UniqueOpts{ByPeriod: time.Hour, ByState: []rivertype.JobState{rivertype.JobStateCompleted}},
expectedJSON: "&kind=worker_4&period=" + now.Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "PeriodFromScheduledAt",
argsFunc: func() rivertype.JobArgs {
type TaskJobArgs struct {
JobArgsStaticKind
}
return TaskJobArgs{
JobArgsStaticKind: JobArgsStaticKind{kind: "worker_4"},
}
},
modifyInsertParamsFunc: func(insertParams *rivertype.JobInsertParams) {
insertParams.ScheduledAt = ptrutil.Ptr(now.Add(time.Hour))
},
uniqueOpts: UniqueOpts{ByPeriod: time.Hour},
expectedJSON: "&kind=worker_4&period=" + now.Add(time.Hour).Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "ExcludeKindByArgs",
argsFunc: func() rivertype.JobArgs {
Expand Down Expand Up @@ -228,7 +246,7 @@ func TestUniqueKey(t *testing.T) {
states = tt.uniqueOpts.ByState
}

jobParams := &rivertype.JobInsertParams{
insertParams := &rivertype.JobInsertParams{
Args: args,
CreatedAt: &now,
EncodedArgs: encodedArgs,
Expand All @@ -241,12 +259,16 @@ func TestUniqueKey(t *testing.T) {
UniqueStates: UniqueStatesToBitmask(states),
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, jobParams)
if tt.modifyInsertParamsFunc != nil {
tt.modifyInsertParamsFunc(insertParams)
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.Equal(t, tt.expectedJSON, uniqueKeyPreHash)
expectedHash := sha256.Sum256([]byte(tt.expectedJSON))

uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, jobParams)
uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.NotNil(t, uniqueKey)

Expand Down

0 comments on commit a672ad3

Please sign in to comment.