Skip to content

Commit

Permalink
API to revert nacos job (#3973)
Browse files Browse the repository at this point in the history
* add debug logs for api

Signed-off-by: Min Min <jamsman94@gmail.com>

* add list revert event api

Signed-off-by: Min Min <jamsman94@gmail.com>

* change debug logs

Signed-off-by: Min Min <jamsman94@gmail.com>

* added list history api for revert

Signed-off-by: Min Min <jamsman94@gmail.com>

* added revert spec for frontend to show

Signed-off-by: Min Min <jamsman94@gmail.com>

* Added missing field in preview

Signed-off-by: Min Min <jamsman94@gmail.com>

* debug

Signed-off-by: Min Min <jamsman94@gmail.com>

* debug

Signed-off-by: Min Min <jamsman94@gmail.com>

* debug

Signed-off-by: Min Min <jamsman94@gmail.com>

* debug

Signed-off-by: Min Min <jamsman94@gmail.com>

* debug

Signed-off-by: Min Min <jamsman94@gmail.com>

* remove debug logs

Signed-off-by: Min Min <jamsman94@gmail.com>

---------

Signed-off-by: Min Min <jamsman94@gmail.com>
  • Loading branch information
jamsman94 authored Jan 26, 2025
1 parent d5cb222 commit fd470c2
Show file tree
Hide file tree
Showing 10 changed files with 457 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/cli/initconfig/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func createOrUpdateMongodbIndex(ctx context.Context) {
commonrepo.NewSAEEnvColl(),
commonrepo.NewEnvInfoColl(),
commonrepo.NewApprovalTicketColl(),
commonrepo.NewWorkflowTaskRevertColl(),

// msg queue
commonrepo.NewMsgQueueCommonColl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type WorkflowTask struct {
GlobalContext map[string]string `bson:"global_context" json:"global_context"`
ClusterIDMap map[string]bool `bson:"cluster_id_map" json:"cluster_id_map"`
Status config.Status `bson:"status" json:"status,omitempty"`
Reverted bool `bson:"reverted" json:"reverted"`
Remark string `bson:"remark,omitempty" json:"remark"`
TaskCreator string `bson:"task_creator" json:"task_creator,omitempty"`
TaskCreatorID string `bson:"task_creator_id" json:"task_creator_id,omitempty"`
Expand Down Expand Up @@ -117,7 +118,8 @@ type JobTask struct {
ErrorHandlerUserID string `bson:"error_handler_user_id" yaml:"error_handler_user_id" json:"error_handler_user_id"`
ErrorHandlerUserName string `bson:"error_handler_username" yaml:"error_handler_username" json:"error_handler_username"`

RetryCount int `bson:"retry_count" json:"retry_count" yaml:"retry_count"`
RetryCount int `bson:"retry_count" json:"retry_count" yaml:"retry_count"`
Reverted bool `bson:"reverted" json:"reverted" yaml:"reverted"`
}

type TaskJobInfo struct {
Expand All @@ -137,6 +139,7 @@ type WorkflowTaskPreview struct {
WorkflowDisplayName string `bson:"workflow_display_name" json:"workflow_display_name"`
Remark string `bson:"remark" json:"remark"`
Status config.Status `bson:"status" json:"status"`
Reverted bool `bson:"reverted" json:"reverted"`
CreateTime int64 `bson:"create_time" json:"create_time,omitempty"`
StartTime int64 `bson:"start_time" json:"start_time,omitempty"`
EndTime int64 `bson:"end_time" json:"end_time,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2025 The KodeRover Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package models

import (
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
"go.mongodb.org/mongo-driver/bson/primitive"
)

type WorkflowTaskRevert struct {
ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
TaskID int64 `bson:"task_id" json:"task_id"`
WorkflowName string `bson:"workflow_name" json:"workflow_name"`
JobName string `bson:"job_name" json:"job_name"`
RevertSpec interface{} `bson:"revert_spec" json:"revert_spec"`
CreateTime int64 `bson:"create_time" json:"create_time,omitempty"`
TaskCreator string `bson:"creator" json:"creator,omitempty"`
TaskCreatorID string `bson:"creator_id" json:"creator_id,omitempty"`
Status config.Status `bson:"status" json:"status,omitempty"`
}

func (WorkflowTaskRevert) TableName() string {
return "workflow_task_revert"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
Copyright 2025 The KodeRover Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mongodb

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"

"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
"github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models"
mongotool "github.com/koderover/zadig/v2/pkg/tool/mongo"
)

type WorkflowTasKRevertColl struct {
*mongo.Collection

coll string
}

func NewWorkflowTaskRevertColl() *WorkflowTasKRevertColl {
name := models.WorkflowTaskRevert{}.TableName()
return &WorkflowTasKRevertColl{Collection: mongotool.Database(config.MongoDatabase()).Collection(name), coll: name}
}

func (c *WorkflowTasKRevertColl) GetCollectionName() string {
return c.coll
}

func (c *WorkflowTasKRevertColl) EnsureIndex(ctx context.Context) error {
mod := []mongo.IndexModel{
{
Keys: bson.D{
bson.E{Key: "workflow_name", Value: 1},
bson.E{Key: "task_id", Value: 1},
bson.E{Key: "job_name", Value: 1},
},
Options: options.Index().SetUnique(false),
},
{
Keys: bson.M{"create_time": 1},
Options: options.Index().SetUnique(false),
},
}

_, err := c.Indexes().CreateMany(ctx, mod)

return err
}

func (c *WorkflowTasKRevertColl) Create(obj *models.WorkflowTaskRevert) (string, error) {
if obj == nil {
return "", fmt.Errorf("nil object")
}

res, err := c.InsertOne(context.TODO(), obj)
if err != nil {
return "", err
}
ID, ok := res.InsertedID.(primitive.ObjectID)
if !ok {
return "", fmt.Errorf("failed to get object id from create")
}
return ID.Hex(), err
}

type ListWorkflowRevertOption struct {
TaskID int64
WorkflowName string
JobName string
}

func (c *WorkflowTasKRevertColl) List(opt *ListWorkflowRevertOption) ([]*models.WorkflowTaskRevert, error) {
resp := make([]*models.WorkflowTaskRevert, 0)

query := bson.M{}
if opt.WorkflowName != "" {
query["workflow_name"] = opt.WorkflowName
}
if opt.TaskID != 0 {
query["task_id"] = opt.TaskID
}
if opt.JobName != "" {
query["job_name"] = opt.JobName
}

findOption := options.Find()
cursor, err := c.Collection.Find(context.TODO(), query, findOption)
if err != nil {
return nil, err
}
err = cursor.All(context.TODO(), &resp)
if err != nil {
return nil, err
}
return resp, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func (c *NacosJobCtl) Run(ctx context.Context) {
return
}
for _, data := range c.jobTaskSpec.NacosDatas {
configHistory, err := client.GetConfigHistory(data.DataID, data.Group, c.jobTaskSpec.NamespaceID)
if err != nil {
logError(c.job, err.Error(), c.logger)
return
}
if len(configHistory) == 0 {
logError(c.job, "config history is empty", c.logger)
return
}
data.OriginalContentUpdateTime = configHistory[0].CreatedTime

if err := client.UpdateConfig(data.DataID, data.Group, c.jobTaskSpec.NamespaceID, data.Content, data.Format); err != nil {
data.Error = err.Error()
logError(c.job, err.Error(), c.logger)
Expand Down
2 changes: 2 additions & 0 deletions pkg/microservice/aslan/core/workflow/handler/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ func (*Router) Inject(router *gin.RouterGroup) {
taskV4.POST("/breakpoint/:workflowName/:jobName/task/:taskID/:position", SetWorkflowTaskV4Breakpoint)
taskV4.POST("/debug/:workflowName/task/:taskID", EnableDebugWorkflowTaskV4)
taskV4.DELETE("/debug/:workflowName/:jobName/task/:taskID/:position", StopDebugWorkflowTaskJobV4)
taskV4.POST("/revert/:workflowName/:jobName/task/:taskID", RevertWorkflowTaskV4Job)
taskV4.GET("/revert/:workflowName/:jobName/task/:taskID", GetWorkflowTaskV4JobRevert)
taskV4.POST("/approve", ApproveStage)
taskV4.POST("/handle/error", HandleJobError)
taskV4.GET("/workflow/:workflowName/taskId/:taskId/job/:jobName", GetWorkflowV4ArtifactFileContent)
Expand Down
107 changes: 106 additions & 1 deletion pkg/microservice/aslan/core/workflow/handler/workflow_task_v4.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ func StopDebugWorkflowTaskJobV4(c *gin.Context) {
defer func() { internalhandler.JSONResponse(c, ctx) }()

if err != nil {

ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err)
ctx.UnAuthorized = true
return
Expand Down Expand Up @@ -641,6 +640,112 @@ func StopDebugWorkflowTaskJobV4(c *gin.Context) {
ctx.RespErr = workflow.StopDebugWorkflowTaskJobV4(workflowName, c.Param("jobName"), taskID, c.Param("position"), ctx.Logger)
}

type revertWorkflowTaskV4JobReq struct {
Input interface{} `json:"input"`
}

func RevertWorkflowTaskV4Job(c *gin.Context) {
ctx, err := internalhandler.NewContextWithAuthorization(c)
defer func() { internalhandler.JSONResponse(c, ctx) }()

if err != nil {
ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err)
ctx.UnAuthorized = true
return
}

args := new(revertWorkflowTaskV4JobReq)
data := getBody(c)
if err := json.Unmarshal([]byte(data), args); err != nil {
log.Errorf("CreateWorkflowTaskv4 json.Unmarshal err : %s", err)
ctx.RespErr = e.ErrInvalidParam.AddDesc(err.Error())
return
}

workflowName := c.Param("workflowName")

w, err := workflow.FindWorkflowV4Raw(workflowName, ctx.Logger)
if err != nil {
ctx.Logger.Errorf("EnableDebugWorkflowTaskV4 error: %v", err)
ctx.RespErr = e.ErrInvalidParam.AddErr(err)
return
}

internalhandler.InsertOperationLog(c, ctx.UserName, w.Project, "回滚", "自定义工作流任务", w.Name, data, ctx.Logger)

// authorization check
if !ctx.Resources.IsSystemAdmin {
if _, ok := ctx.Resources.ProjectAuthInfo[w.Project]; !ok {
ctx.UnAuthorized = true
return
}

if !ctx.Resources.ProjectAuthInfo[w.Project].IsProjectAdmin &&
!ctx.Resources.ProjectAuthInfo[w.Project].Workflow.Execute {
// check if the permission is given by collaboration mode
permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, w.Project, types.ResourceTypeWorkflow, w.Name, types.WorkflowActionRun)
if err != nil || !permitted {
ctx.UnAuthorized = true
return
}
}
}

taskID, err := strconv.ParseInt(c.Param("taskID"), 10, 64)
if err != nil {
ctx.RespErr = e.ErrInvalidParam.AddDesc("invalid task id")
return
}

ctx.RespErr = workflow.RevertWorkflowTaskV4Job(workflowName, c.Param("jobName"), taskID, args.Input, ctx.UserName, ctx.UserID, ctx.Logger)
}

func GetWorkflowTaskV4JobRevert(c *gin.Context) {
ctx, err := internalhandler.NewContextWithAuthorization(c)
defer func() { internalhandler.JSONResponse(c, ctx) }()

if err != nil {
ctx.RespErr = fmt.Errorf("authorization Info Generation failed: err %s", err)
ctx.UnAuthorized = true
return
}

workflowName := c.Param("workflowName")

w, err := workflow.FindWorkflowV4Raw(workflowName, ctx.Logger)
if err != nil {
ctx.Logger.Errorf("EnableDebugWorkflowTaskV4 error: %v", err)
ctx.RespErr = e.ErrInvalidParam.AddErr(err)
return
}

// authorization check
if !ctx.Resources.IsSystemAdmin {
if _, ok := ctx.Resources.ProjectAuthInfo[w.Project]; !ok {
ctx.UnAuthorized = true
return
}

if !ctx.Resources.ProjectAuthInfo[w.Project].IsProjectAdmin &&
!ctx.Resources.ProjectAuthInfo[w.Project].Workflow.View {
// check if the permission is given by collaboration mode
permitted, err := internalhandler.GetCollaborationModePermission(ctx.UserID, w.Project, types.ResourceTypeWorkflow, w.Name, types.WorkflowActionView)
if err != nil || !permitted {
ctx.UnAuthorized = true
return
}
}
}

taskID, err := strconv.ParseInt(c.Param("taskID"), 10, 64)
if err != nil {
ctx.RespErr = e.ErrInvalidParam.AddDesc("invalid task id")
return
}

ctx.Resp, ctx.RespErr = workflow.GetWorkflowTaskV4JobRevert(workflowName, c.Param("jobName"), taskID, ctx.Logger)
}

func ApproveStage(c *gin.Context) {
ctx := internalhandler.NewContext(c)
defer func() { internalhandler.JSONResponse(c, ctx) }()
Expand Down
Loading

0 comments on commit fd470c2

Please sign in to comment.