Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add redis queue & async message strategy #615

Merged

Conversation

yanjiaxin534
Copy link
Contributor

@yanjiaxin534 yanjiaxin534 commented Jan 21, 2025

Roles of Each Topic:

DeploymentPlanTopic:

Splits the plan into sub-steps and sends them to DeploymentStepTopic.

DeploymentStepTopic:

Responsible for executing steps.
If the step's target is not a remote target, it executes the step and sends the result to CollectStepResultTopic.
If the step's target is a remote target, it sends the step execution information to the Redis queue to be executed by the remote agent.

CollectStepResultTopic:

Receives step execution results, which may come from either the remote agent or local execution.

Execution Details:

Each time when DeploymentPlanTopic receives a plan:

First -> splits the plan. Then sends the first step (step 0) to DeploymentStepTopic.

When DeploymentStepTopic receives a message:

If the target is local, it executes the step and sends the result to CollectStepResultTopic.
If the target is remote, it sends the step to the queue and waits for it to be picked up.

When CollectStepResultTopic receives a result, it checks if it is the last step:

If it is the last step, it executes the function to handle the end of the plan.
If it is not the last step, it sends the next step to DeploymentStepTopic.

Handling the End of the Plan:

  1. If it is a "get" plan, it performs a three-way merge and generates an apply plan, then sends it to DeploymentPlanTopic.
  2. If it is an "apply" plan, it concludes with a summary.

Remote Agent Message Interfaces:

solution/tasks -> onGetRequest:
Retrieves tasks by checking the target name, namespace, and whether to get all.
solution/task/getResult -> onGetResponse:
Receives the result from the remote agent, parses it, and sends it to CollectStepResultTopic.

Data Structure

Operation Body:

Responsible for storing parameters related to the operationId. When a result is received from the remote agent:
It first queries the related parameters based on the operationId, then returns the result, and finally deletes the information corresponding to that operationId.
The benefit of this approach is that if the remote agent sends two identical requests, it will not send the result twice to CollectStepResultTopic. Additionally, random requests from the remote agent will not be processed.

@yanjiaxin534 yanjiaxin534 reopened this Jan 21, 2025
Copy link
Contributor

@msftcoderdjw msftcoderdjw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the comments in next improvement PR.

@@ -65,6 +65,11 @@ func toMemoryQueueProviderConfig(config providers.IProviderConfig) (MemoryQueueP
return ret, err
}

// fake
func (s *MemoryQueueProvider) QueryByPaging(queueName string, start string, size int) ([][]byte, string, error) {
return [][]byte{}, "", nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better throw error for not implemented.

assert.Equal(t, v1alpha2.OK, resp.State)
json.Unmarshal(resp.Body, &summary)
assert.False(t, summary.Skipped)
// // deploy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we comment out these cases?

@@ -76,14 +81,18 @@ func (s *MemoryQueueProvider) Init(config providers.IProviderConfig) error {
return nil
}

func (s *MemoryQueueProvider) Enqueue(queue string, data interface{}) error {
// fake
func (s *MemoryQueueProvider) RemoveFromQueue(queue string, messageID string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

@@ -7,8 +7,10 @@
package queue

type IQueueProvider interface {
Enqueue(queue string, element interface{}) error
Enqueue(queue string, element interface{}) (string, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does string return value mean?
For IQueueProvider, pass context in.

@msftcoderdjw msftcoderdjw merged commit db8532e into eclipse-symphony:remote-agent Jan 27, 2025
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants