[riverpro] task of the same sequence run parallel #738

Closed
caraboides opened this issue Jan 30, 2025 · 3 comments

@caraboides

Hi,

We use River Pro to ensure that different tasks for the same entity do not run at the same time.

We have task A and task B, each with an id, and both should run in the same sequence because ExcludeKind is set to true:

type A struct {
	Id string `json:"cluster_id" river:"sequence"`
}
func (A) Kind() string { return "A" }
func (A) SequenceOpts() riverpro.SequenceOpts {
	return riverpro.SequenceOpts{
		ByArgs:      true,
		ExcludeKind: true,
	}
}
type B struct {
	Id string `json:"cluster_id" river:"sequence"`
}
func (B) Kind() string { return "B" }
func (B) SequenceOpts() riverpro.SequenceOpts {
	return riverpro.SequenceOpts{
		ByArgs:      true,
		ExcludeKind: true,
	}
}

But I noticed that if I insert B while A is executing, B starts immediately. I expected B to start only after A has finished.

The test below reproduces the behavior. It fails with:

        	            	expected: []string{"start-A", "end-A", "start-B", "end-B"}
        	            	actual  : []string{"start-A", "start-B", "end-B", "end-A"}
package tests

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivermigrate"
	"github.com/stretchr/testify/assert"
	testcontainers "github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/postgres"
	"github.com/testcontainers/testcontainers-go/wait"
	"riverqueue.com/riverpro"
	"riverqueue.com/riverpro/driver/riverpropgxv5"
)

func TestUnitRiver(t *testing.T) {
	assert := assert.New(t)
	databaseUrl := setupPostgres(t)
	runLog := &[]string{}
	workers := river.NewWorkers()
	river.AddWorker(workers, &AWorker{log: runLog})
	river.AddWorker(workers, &BWorker{log: runLog})

	client := createClient(context.Background(), workers, databaseUrl)
	assert.NotNil(client)

	t.Run("test sub task in sequence", func(t *testing.T) {
		subscribeChan, subscribeCancel := client.Subscribe(river.EventKindJobCompleted)
		defer subscribeCancel()
		client.Insert(context.Background(), AArgs{
			ClusterId: "1",
		}, nil)
		waitFor(subscribeChan, 2)
		assert.Equal([]string{"start-A", "end-A", "start-B", "end-B"}, *runLog)
	})

}

type AArgs struct {
	ClusterId string `json:"cluster_id" river:"sequence"`
}

func (AArgs) Kind() string { return "A" }

func (AArgs) SequenceOpts() riverpro.SequenceOpts {
	return riverpro.SequenceOpts{
		ByArgs:      true,
		ExcludeKind: true,
	}
}

type AWorker struct {
	river.WorkerDefaults[AArgs]
	log *[]string
}

func (w *AWorker) Work(ctx context.Context, job *river.Job[AArgs]) error {
	*w.log = append(*w.log, "start-A")
	time.Sleep(1 * time.Second)
	riverpro.ClientFromContext[pgx.Tx](ctx).Insert(context.Background(), BArgs{
		ClusterId: job.Args.ClusterId,
	}, nil)
	time.Sleep(3 * time.Second)
	*w.log = append(*w.log, "end-A")
	return nil
}

type BArgs struct {
	ClusterId string `json:"cluster_id" river:"sequence"`
}

func (BArgs) Kind() string { return "B" }

func (BArgs) SequenceOpts() riverpro.SequenceOpts {
	return riverpro.SequenceOpts{
		ByArgs:      true,
		ExcludeKind: true,
	}
}

type BWorker struct {
	river.WorkerDefaults[BArgs]
	log *[]string
}

func (w *BWorker) Work(ctx context.Context, job *river.Job[BArgs]) error {
	*w.log = append(*w.log, "start-B")
	time.Sleep(1 * time.Second)
	*w.log = append(*w.log, "end-B")
	return nil
}

func waitFor(subscribeChan <-chan *river.Event, numJobs int) {
	var (
		timeout  = 20 * time.Second
		deadline = time.Now().Add(timeout)
		events   = make([]*river.Event, 0, numJobs)
	)
	for {
		select {
		case event := <-subscribeChan:
			events = append(events, event)
			if len(events) >= numJobs {
				return
			}
		case <-time.After(time.Until(deadline)):
			panic(fmt.Sprintf("WaitOrTimeout timed out after waiting %s (received %d job(s), wanted %d)",
				timeout, len(events), numJobs))
		}
	}
}

func setupPostgres(t *testing.T) string {
	ctx := context.Background()
	postgresContainer, _ := postgres.Run(ctx,
		"docker.io/postgres:16-alpine",
		postgres.WithDatabase("postgresDB"),
		postgres.WithUsername("postgresUSER"),
		postgres.WithPassword("postgresW"),
		testcontainers.WithWaitStrategy(
			wait.ForLog("database system is ready to accept connections").WithOccurrence(2).WithStartupTimeout(5*time.Second),
			wait.ForListeningPort("5432/tcp"),
		),
	)
	t.Cleanup(func() {
		if err := postgresContainer.Terminate(ctx); err != nil {
			t.Fatalf("failed to terminate container: %s", err)
		}
	})
	databaseUrl, _ := postgresContainer.ConnectionString(ctx)
	return databaseUrl
}

func createClient(ctx context.Context, workers *river.Workers, databaseUrl string) *riverpro.Client[pgx.Tx] {
	config, _ := pgxpool.ParseConfig(databaseUrl)
	config.ConnConfig.DefaultQueryExecMode = pgx.QueryExecModeCacheDescribe
	conn, _ := pgxpool.NewWithConfig(context.Background(), config)
	migrator, _ := rivermigrate.New(riverpgxv5.New(conn), nil)
	migrator.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{})
	migrator2, _ := rivermigrate.New(riverpropgxv5.New(conn), &rivermigrate.Config{
		Line: "sequence",
	})
	migrator2.Migrate(ctx, rivermigrate.DirectionUp, &rivermigrate.MigrateOpts{})
	riverClient, _ := riverpro.NewClient(riverpropgxv5.New(conn), &riverpro.Config{
		Config: river.Config{
			Queues: map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 10},
			},
			Workers: workers,
		},
	})
	riverClient.Start(ctx)
	return riverClient
}

If I change MaxWorkers to 1, everything works as expected, but we need more workers and also run several replicas of this service.

@bgentry bgentry self-assigned this Jan 30, 2025
@bgentry
Contributor

bgentry commented Jan 30, 2025

@caraboides thank you for the detailed report and sorry for the trouble. I’m looking into this now.

For context, we have a fairly extensive test suite on these pro features that does not ship with the packaged module. This scenario should not be possible and should be covered by existing tests, but clearly something is misbehaving.

@bgentry
Contributor

bgentry commented Jan 31, 2025

@caraboides I'm sorry for the delay on this. I did confirm the bug shortly after replying earlier, but the fixes turned out to not be trivial. I also wanted to be sure that the gaps in test coverage revealed by this were fully taken care of.

I have the PR ready for this now and we're reviewing it. A new release of River Pro will be out as soon as we're sure it's ready to merge.

@bgentry bgentry closed this as completed Jan 31, 2025
@bgentry
Contributor

bgentry commented Jan 31, 2025

@caraboides riverpro v0.8.1 has been released with fixes for this issue. Thank you again for reporting it and please let us know if you see additional problems with this feature.
