diff --git a/api/pkg/apis/v1alpha1/managers/managerfactory.go b/api/pkg/apis/v1alpha1/managers/managerfactory.go index 7587559a4..c7d591d54 100644 --- a/api/pkg/apis/v1alpha1/managers/managerfactory.go +++ b/api/pkg/apis/v1alpha1/managers/managerfactory.go @@ -91,6 +91,8 @@ func (c *SymphonyManagerFactory) CreateManager(config cm.ManagerConfig) (cm.IMan manager = &sites.SitesManager{} case "managers.symphony.staging": manager = &staging.StagingManager{} + case "managers.symphony.summarycleanup": + manager = &solution.SummaryCleanupManager{} case "managers.symphony.sync": manager = &sync.SyncManager{} case "managers.symphony.models": diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 1e40649a2..34e6999a0 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -31,7 +31,6 @@ import ( config "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/config" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/keylock" secret "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/secret" - states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" "github.com/eclipse-symphony/symphony/coa/pkg/logger" ) @@ -56,9 +55,8 @@ const ( ) type SolutionManager struct { - managers.Manager + SummaryManager TargetProviders map[string]tgt.ITargetProvider - StateProvider states.IStateProvider ConfigProvider config.IExtConfigProvider SecretProvider secret.ISecretProvider KeyLockProvider keylock.IKeyLockProvider @@ -73,7 +71,7 @@ type SolutionManagerDeploymentState struct { } func (s *SolutionManager) Init(context *contexts.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error { - err := s.Manager.Init(context, config, providers) + err := s.SummaryManager.Init(context, config, providers) if err != nil { 
return err } @@ -91,13 +89,6 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. return err } - stateprovider, err := managers.GetPersistentStateProvider(config, providers) - if err == nil { - s.StateProvider = stateprovider - } else { - return err - } - configProvider, err := managers.GetExtConfigProvider(config, providers) if err == nil { s.ConfigProvider = configProvider @@ -150,109 +141,26 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. return nil } -func (s *SolutionManager) getPreviousState(ctx context.Context, instance string, namespace string) *SolutionManagerDeploymentState { - state, err := s.StateProvider.Get(ctx, states.GetRequest{ - ID: instance, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - if err == nil { - var managerState SolutionManagerDeploymentState - jData, _ := json.Marshal(state.Body) - err = json.Unmarshal(jData, &managerState) - if err == nil { - return &managerState - } - } - log.InfofCtx(ctx, " M (Solution): failed to get previous state for instance %s in namespace %s: %+v", instance, namespace, err) - return nil -} func (s *SolutionManager) GetSummary(ctx context.Context, summaryId string, name string, namespace string) (model.SummaryResult, error) { - // lock.Lock() - // defer lock.Unlock() - - ctx, span := observability.StartSpan("Solution Manager", ctx, &map[string]string{ - "method": "GetSummary", - }) - var err error = nil - defer observ_utils.CloseSpanWithError(span, &err) - defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err) - - log.InfofCtx(ctx, " M (Solution): get summary, name: %s, summaryId: %s, namespace: %s", name, summaryId, namespace) - - var state states.StateEntry - state, err = s.StateProvider.Get(ctx, states.GetRequest{ - ID: fmt.Sprintf("%s-%s", "summary", summaryId), - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": 
model.SolutionGroup, - "version": "v1", - "resource": Summary, - }, - }) - if err != nil && api_utils.IsNotFound(err) { - // if get summary by guid not found, try to get the summary by name - log.InfofCtx(ctx, " M (Solution): failed to get deployment summary[%s] by summaryId, error: %+v. Try to get summary by name", summaryId, err) - state, err = s.StateProvider.Get(ctx, states.GetRequest{ - ID: fmt.Sprintf("%s-%s", "summary", name), - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": Summary, - }, - }) - } - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to get deployment summary[%s]: %+v", summaryId, err) - return model.SummaryResult{}, err - } - - var result model.SummaryResult - jData, _ := json.Marshal(state.Body) - err = json.Unmarshal(jData, &result) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to deserailze deployment summary[%s]: %+v", summaryId, err) - return model.SummaryResult{}, err - } - - return result, nil + return s.SummaryManager.GetSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), name, namespace) } -func (s *SolutionManager) DeleteSummary(ctx context.Context, key string, namespace string) error { - ctx, span := observability.StartSpan("Solution Manager", ctx, &map[string]string{ - "method": "DeleteSummary", - }) - var err error = nil - defer observ_utils.CloseSpanWithError(span, &err) - defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err) - - log.InfofCtx(ctx, " M (Solution): delete summary, key: %s, namespace: %s", key, namespace) - - err = s.StateProvider.Delete(ctx, states.DeleteRequest{ - ID: fmt.Sprintf("%s-%s", "summary", key), - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": Summary, - }, - }) - +func (s *SolutionManager) DeleteSummary(ctx context.Context, summaryId string, namespace string) error { + // Slient side delete summary is soft delete: will 
only add a deleted flag. + result, err := s.SummaryManager.GetSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), "", namespace) if err != nil { - if api_utils.IsNotFound(err) { - log.DebugfCtx(ctx, " M (Solution): DeleteSummary NoutFound, id: %s, namespace: %s", key, namespace) - return nil - } - log.ErrorfCtx(ctx, " M (Solution): failed to get summary[%s]: %+v", key, err) return err } - - return nil + result.Summary.Removed = true + return s.SummaryManager.UpsertSummary(ctx, + fmt.Sprintf("%s-%s", "summary", summaryId), + result.Generation, + result.DeploymentHash, + result.Summary, + result.State, + namespace, + true, // upsert softDelete flag to summary. + ) } func (s *SolutionManager) sendHeartbeat(ctx context.Context, id string, namespace string, remove bool, stopCh chan struct{}) { @@ -397,7 +305,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy } } - previousDesiredState := s.getPreviousState(ctx, deployment.Instance.ObjectMeta.Name, namespace) + previousDesiredState := s.GetDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace) var currentDesiredState, currentState model.DeploymentState currentDesiredState, err = NewDeploymentState(deployment) @@ -593,31 +501,9 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy if !deployment.IsDryRun { if len(mergedState.TargetComponent) == 0 && remove { log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") - s.StateProvider.Delete(ctx, states.DeleteRequest{ - ID: deployment.Instance.ObjectMeta.Name, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) + s.DeleteDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace) } else { - s.StateProvider.Upsert(ctx, states.UpsertRequest{ - Value: states.StateEntry{ - ID: deployment.Instance.ObjectMeta.Name, - Body: SolutionManagerDeploymentState{ - 
Spec: deployment, - State: mergedState, - }, - }, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) + s.UpsertDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace, deployment, mergedState) } } @@ -658,47 +544,7 @@ func (s *SolutionManager) saveSummary(ctx context.Context, objectName string, su // TODO: delete this state when time expires. This should probably be invoked by the vendor (via GetSummary method, for instance) log.DebugfCtx(ctx, " M (Solution): saving summary, objectName: %s, summaryId: %s, state: %v, namespace: %s, jobid: %s, hash %s, targetCount %d, successCount %d", objectName, summaryId, state, namespace, summary.JobID, hash, summary.TargetCount, summary.SuccessCount) - oldSummary, err := s.GetSummary(ctx, summaryId, objectName, namespace) - if err != nil && !v1alpha2.IsNotFound(err) { - log.ErrorfCtx(ctx, " M (Solution): failed to get previous summary: %+v", err) - return err - } else if err == nil { - if summary.JobID != "" && oldSummary.Summary.JobID != "" { - var newId, oldId int64 - newId, err = strconv.ParseInt(summary.JobID, 10, 64) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to parse new job id: %+v", err) - return v1alpha2.NewCOAError(err, "failed to parse new job id", v1alpha2.BadRequest) - } - oldId, err = strconv.ParseInt(oldSummary.Summary.JobID, 10, 64) - if err == nil && oldId > newId { - errMsg := fmt.Sprintf("old job id %d is greater than new job id %d", oldId, newId) - log.ErrorfCtx(ctx, " M (Solution): %s", errMsg) - return v1alpha2.NewCOAError(err, errMsg, v1alpha2.BadRequest) - } - } else { - log.WarnfCtx(ctx, " M (Solution): JobIDs are both empty, skip id check") - } - } - _, err = s.StateProvider.Upsert(ctx, states.UpsertRequest{ - Value: states.StateEntry{ - ID: fmt.Sprintf("%s-%s", "summary", summaryId), - Body: model.SummaryResult{ - Summary: summary, - Generation: generation, - Time: 
time.Now().UTC(), - State: state, - DeploymentHash: hash, - }, - }, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": Summary, - }, - }) - return err + return s.SummaryManager.UpsertSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), generation, hash, summary, state, namespace, false) } func (s *SolutionManager) saveSummaryProgress(ctx context.Context, objectName string, summaryId string, generation string, hash string, summary model.SummarySpec, namespace string) error { diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager_test.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager_test.go index b97dc3b10..8e535d98f 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager_test.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager_test.go @@ -370,7 +370,9 @@ func TestMockGet(t *testing.T) { TargetProviders: map[string]target.ITargetProvider{ "mock": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } state, components, err := manager.Get(context.Background(), deployment, "") @@ -404,8 +406,9 @@ func TestMockGet(t *testing.T) { // Test summary deletion err = manager.DeleteSummary(context.Background(), summaryKey, "default") assert.Nil(t, err) - _, err = manager.GetSummary(context.Background(), summaryKey, name, "default") - assert.NotNil(t, err) + result, err := manager.GetSummary(context.Background(), summaryKey, name, "default") + assert.Nil(t, err) + assert.True(t, result.Summary.Removed, "Summary should have set the removed flag") } func TestMockGetTwoTargets(t *testing.T) { id := uuid.New().String() @@ -482,7 +485,9 @@ func TestMockGetTwoTargets(t *testing.T) { TargetProviders: map[string]target.ITargetProvider{ "mock": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: 
stateProvider, + }, KeyLockProvider: keyLockProvider, } state, components, err := manager.Get(context.Background(), deployment, "") @@ -580,7 +585,9 @@ func TestMockGetTwoTargetsTwoProviders(t *testing.T) { "mock1": targetProvider, "mock2": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } state, components, err := manager.Get(context.Background(), deployment, "") @@ -656,7 +663,9 @@ func TestMockApply(t *testing.T) { TargetProviders: map[string]target.ITargetProvider{ "mock": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "") @@ -722,7 +731,9 @@ func TestMockApplyMultiRoles(t *testing.T) { "mock": targetProvider, "mock2": targetProvider2, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "") @@ -781,7 +792,9 @@ func TestMockApplyWithUpdateAndRemove(t *testing.T) { TargetProviders: map[string]target.ITargetProvider{ "mock": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "") @@ -835,7 +848,9 @@ func TestMockApplyWithError(t *testing.T) { TargetProviders: map[string]target.ITargetProvider{ "mock": targetProvider, }, - StateProvider: stateProvider, + SummaryManager: SummaryManager{ + StateProvider: stateProvider, + }, KeyLockProvider: keyLockProvider, } summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "") diff --git 
a/api/pkg/apis/v1alpha1/managers/solution/summary-cleanup-manager.go b/api/pkg/apis/v1alpha1/managers/solution/summary-cleanup-manager.go new file mode 100644 index 000000000..4352b3dac --- /dev/null +++ b/api/pkg/apis/v1alpha1/managers/solution/summary-cleanup-manager.go @@ -0,0 +1,88 @@ +/* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT license. + * SPDX-License-Identifier: MIT + */ + +package solution + +import ( + "context" + "time" + + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" + vendorCtx "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/contexts" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" + observability "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability" + observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" +) + +const ( + // DefaultSummaryRetentionDuration is the retention period applied when the + // SummaryRetentionDuration property is not configured: 180 days. + DefaultSummaryRetentionDuration = 180 * 24 * time.Hour +) + +type SummaryCleanupManager struct { + SummaryManager + SummaryRetentionDuration time.Duration +} + +func (s *SummaryCleanupManager) Init(ctx *vendorCtx.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error { + err := s.SummaryManager.Init(ctx, config, providers) + if err != nil { + return err + } + + // Set how long a summary is retained before cleanup. If not set, use default 180 days.
+ if val, ok := config.Properties["SummaryRetentionDuration"]; ok { + s.SummaryRetentionDuration, err = time.ParseDuration(val) + if err != nil { + return v1alpha2.NewCOAError(err, "SummaryRetentionDuration cannot be parsed, please enter a valid duration", v1alpha2.BadConfig) + } else if s.SummaryRetentionDuration < 0 { + return v1alpha2.NewCOAError(nil, "SummaryRetentionDuration cannot be negative", v1alpha2.BadConfig) + } + } else { + s.SummaryRetentionDuration = DefaultSummaryRetentionDuration + } + + log.Info("M (Summary Cleanup): Initialize SummaryRetentionDuration as " + s.SummaryRetentionDuration.String()) + return nil +} + +func (s *SummaryCleanupManager) Enabled() bool { + return true +} + +func (s *SummaryCleanupManager) Poll() []error { + // TODO: initialize the context with id correctly + ctx, span := observability.StartSpan("Summary Cleanup Manager", context.Background(), &map[string]string{ + "method": "Poll", + }) + var err error = nil + defer observ_utils.CloseSpanWithError(span, &err) + + log.InfoCtx(ctx, "M (Summary Cleanup): Polling summaries") + summaries, err := s.ListSummary(ctx, "") + if err != nil { + return []error{err} + } + ret := []error{} + for _, summary := range summaries { + // Check the upsert time of summary. + duration := time.Since(summary.Time) + if duration > s.SummaryRetentionDuration { + log.InfofCtx(ctx, "M (Summary Cleanup): Deleting summary %s since it has deprecated for %s", summary.SummaryId, duration.String()) + err = s.DeleteSummary(ctx, summary.SummaryId, "") + if err != nil { + ret = append(ret, err) + } + } + } + return ret +} + +func (s *SummaryCleanupManager) Reconcil() []error { + return nil +} diff --git a/api/pkg/apis/v1alpha1/managers/solution/summary-manager.go b/api/pkg/apis/v1alpha1/managers/solution/summary-manager.go new file mode 100644 index 000000000..cc1da97b0 --- /dev/null +++ b/api/pkg/apis/v1alpha1/managers/solution/summary-manager.go @@ -0,0 +1,257 @@ +/* + * Copyright (c) Microsoft Corporation.
+ * Licensed under the MIT license. + * SPDX-License-Identifier: MIT + */ + +package solution + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" + api_utils "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" + vendorCtx "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/contexts" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability" + observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils" + "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" + states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states" +) + +type SummaryManager struct { + managers.Manager + StateProvider states.IStateProvider +} + +func (s *SummaryManager) Init(ctx *vendorCtx.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error { + err := s.Manager.Init(ctx, config, providers) + if err != nil { + return err + } + stateprovider, err := managers.GetPersistentStateProvider(config, providers) + if err == nil { + s.StateProvider = stateprovider + } else { + return err + } + + return nil +} + +func (s *SummaryManager) GetDeploymentState(ctx context.Context, instance string, namespace string) *SolutionManagerDeploymentState { + state, err := s.StateProvider.Get(ctx, states.GetRequest{ + ID: instance, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + if err == nil { + var managerState SolutionManagerDeploymentState + jData, _ := json.Marshal(state.Body) + err = json.Unmarshal(jData, &managerState) + if err == nil { + return &managerState + } + } + log.InfofCtx(ctx, " M (Summary): failed to get previous state for 
instance %s in namespace %s: %+v", instance, namespace, err) + return nil +} + +func (s *SummaryManager) UpsertDeploymentState(ctx context.Context, instance string, namespace string, deployment model.DeploymentSpec, mergedState model.DeploymentState) error { + _, err := s.StateProvider.Upsert(ctx, states.UpsertRequest{ + Value: states.StateEntry{ + ID: instance, + Body: SolutionManagerDeploymentState{ + Spec: deployment, + State: mergedState, + }, + }, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + return err +} + +func (s *SummaryManager) DeleteDeploymentState(ctx context.Context, instance string, namespace string) error { + err := s.StateProvider.Delete(ctx, states.DeleteRequest{ + ID: instance, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + return err +} + +func (s *SummaryManager) GetSummary(ctx context.Context, summaryId string, name string, namespace string) (model.SummaryResult, error) { + ctx, span := observability.StartSpan("Summary Manager", ctx, &map[string]string{ + "method": "GetSummary", + }) + var err error = nil + defer observ_utils.CloseSpanWithError(span, &err) + defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err) + + log.InfofCtx(ctx, " M (Summary): get summary, name: %s, summaryId: %s, namespace: %s", name, summaryId, namespace) + + var state states.StateEntry + state, err = s.StateProvider.Get(ctx, states.GetRequest{ + ID: summaryId, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": Summary, + }, + }) + if err != nil && api_utils.IsNotFound(err) && name != "" { + // if get summary by guid not found, try to get the summary by name + log.InfofCtx(ctx, " M (Summary): failed to get deployment summary[%s] by summaryId, error: %+v. 
Try to get summary by name", summaryId, err) + state, err = s.StateProvider.Get(ctx, states.GetRequest{ + ID: fmt.Sprintf("%s-%s", "summary", name), + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": Summary, + }, + }) + } + if err != nil { + log.ErrorfCtx(ctx, " M (Summary): failed to get deployment summary[%s]: %+v", summaryId, err) + return model.SummaryResult{}, err + } + + var result model.SummaryResult + jData, _ := json.Marshal(state.Body) + err = json.Unmarshal(jData, &result) + if err != nil { + log.ErrorfCtx(ctx, " M (Summary): failed to deserailze deployment summary[%s]: %+v", summaryId, err) + return model.SummaryResult{}, err + } + + log.InfofCtx(ctx, " M (Summary): get summary, key: %s, namespace: %s, summary: %+v", summaryId, namespace, result) + return result, nil +} + +func (s *SummaryManager) ListSummary(ctx context.Context, namespace string) ([]model.SummaryResult, error) { + listRequest := states.ListRequest{ + Metadata: map[string]interface{}{ + "namespace": namespace, + "resource": "Summary", + "group": model.SolutionGroup, + }, + } + var entries []states.StateEntry + entries, _, err := s.StateProvider.List(ctx, listRequest) + if err != nil { + return []model.SummaryResult{}, err + } + + var summaries []model.SummaryResult + for _, entry := range entries { + var result model.SummaryResult + jData, _ := json.Marshal(entry.Body) + err = json.Unmarshal(jData, &result) + if err == nil { + result.SummaryId = entry.ID + summaries = append(summaries, result) + } + } + return summaries, nil +} + +func (s *SummaryManager) UpsertSummary(ctx context.Context, summaryId string, generation string, hash string, summary model.SummarySpec, state model.SummaryState, namespace string, softDelete bool) error { + oldSummary, err := s.GetSummary(ctx, summaryId, "", namespace) + if err != nil && !v1alpha2.IsNotFound(err) { + log.ErrorfCtx(ctx, " M (Summary): failed to get previous summary:
%+v", err) + return err + } else if err == nil { + if summary.JobID != "" && oldSummary.Summary.JobID != "" { + var newId, oldId int64 + newId, err = strconv.ParseInt(summary.JobID, 10, 64) + if err != nil { + log.ErrorfCtx(ctx, " M (Summary): failed to parse new job id: %+v", err) + return v1alpha2.NewCOAError(err, "failed to parse new job id", v1alpha2.BadRequest) + } + oldId, err = strconv.ParseInt(oldSummary.Summary.JobID, 10, 64) + if err == nil && oldId > newId { + errMsg := fmt.Sprintf("old job id %d is greater than new job id %d", oldId, newId) + log.ErrorfCtx(ctx, " M (Summary): %s", errMsg) + return v1alpha2.NewCOAError(err, errMsg, v1alpha2.BadRequest) + } + if !softDelete && summary.Removed { + errMsg := fmt.Sprintf("Cannot upsert a deleted summary: %s", summaryId) + log.ErrorfCtx(ctx, " M (Summary): %s", errMsg) + return v1alpha2.NewCOAError(err, errMsg, v1alpha2.BadRequest) + } + } else { + log.WarnfCtx(ctx, " M (Summary): JobIDs are both empty, skip id check") + } + } + _, err = s.StateProvider.Upsert(ctx, states.UpsertRequest{ + Value: states.StateEntry{ + ID: summaryId, + Body: model.SummaryResult{ + Summary: summary, + Generation: generation, + Time: time.Now().UTC(), + State: state, + DeploymentHash: hash, + }, + }, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": Summary, + }, + }) + return err +} + +func (s *SummaryManager) DeleteSummary(ctx context.Context, summaryId string, namespace string) error { + ctx, span := observability.StartSpan("Summary Manager", ctx, &map[string]string{ + "method": "DeleteSummary", + }) + var err error = nil + defer observ_utils.CloseSpanWithError(span, &err) + defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err) + + log.InfofCtx(ctx, " M (Summary): delete summary, key: %s, namespace: %s", summaryId, namespace) + + err = s.StateProvider.Delete(ctx, states.DeleteRequest{ + ID: summaryId, + Metadata: map[string]interface{}{ + "namespace": 
namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": Summary, + }, + }) + + if err != nil { + if api_utils.IsNotFound(err) { + log.DebugfCtx(ctx, " M (Summary): DeleteSummary NoutFound, id: %s, namespace: %s", summaryId, namespace) + return nil + } + log.ErrorfCtx(ctx, " M (Summary): failed to get summary[%s]: %+v", summaryId, err) + return err + } + + return nil +} diff --git a/api/pkg/apis/v1alpha1/model/summary.go b/api/pkg/apis/v1alpha1/model/summary.go index 27b1a8c1c..4d73a1a83 100644 --- a/api/pkg/apis/v1alpha1/model/summary.go +++ b/api/pkg/apis/v1alpha1/model/summary.go @@ -34,9 +34,11 @@ type SummarySpec struct { Skipped bool `json:"skipped"` IsRemoval bool `json:"isRemoval"` AllAssignedDeployed bool `json:"allAssignedDeployed"` + Removed bool `json:"removed"` } type SummaryResult struct { Summary SummarySpec `json:"summary"` + SummaryId string `json:"summaryid,omitempty"` Generation string `json:"generation"` Time time.Time `json:"time"` State SummaryState `json:"state"` diff --git a/api/pkg/apis/v1alpha1/providers/stage/script/staging/646f0a76-5a99-45b8-8369-402d0ac732c0-output.json b/api/pkg/apis/v1alpha1/providers/stage/script/staging/646f0a76-5a99-45b8-8369-402d0ac732c0-output.json new file mode 100644 index 000000000..e69de29bb diff --git a/api/pkg/apis/v1alpha1/providers/stage/script/staging/cce1df71-cde6-4f01-9766-30419fa7d946-output.json b/api/pkg/apis/v1alpha1/providers/stage/script/staging/cce1df71-cde6-4f01-9766-30419fa7d946-output.json new file mode 100644 index 000000000..e69de29bb diff --git a/api/pkg/apis/v1alpha1/vendors/backgroundjob-vendor.go b/api/pkg/apis/v1alpha1/vendors/backgroundjob-vendor.go index c7ceb0018..e780cea8b 100644 --- a/api/pkg/apis/v1alpha1/vendors/backgroundjob-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/backgroundjob-vendor.go @@ -8,6 +8,7 @@ package vendors import ( "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/activations" + 
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/managers" "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers" @@ -19,6 +20,7 @@ type BackgroundJobVendor struct { vendors.Vendor // Add a new manager if you want to add another background job ActivationsCleanerManager *activations.ActivationsCleanupManager + SummaryCleanupManager *solution.SummaryCleanupManager } func (s *BackgroundJobVendor) GetInfo() vendors.VendorInfo { @@ -41,6 +43,8 @@ func (s *BackgroundJobVendor) Init(config vendors.VendorConfig, factories []mana for _, m := range s.Managers { if c, ok := m.(*activations.ActivationsCleanupManager); ok { s.ActivationsCleanerManager = c + } else if c, ok := m.(*solution.SummaryCleanupManager); ok { + s.SummaryCleanupManager = c } // Load a new manager if you want to add another background job } @@ -49,5 +53,10 @@ func (s *BackgroundJobVendor) Init(config vendors.VendorConfig, factories []mana } else { log.Info("ActivationsCleanupManager is disabled") } + if s.SummaryCleanupManager != nil { + log.Info("SummaryCleanupManager is enabled") + } else { + log.Info("SummaryCleanupManager is disabled") + } return nil } diff --git a/k8s/reconcilers/deployment.go b/k8s/reconcilers/deployment.go index 007f5e3b8..6e8378460 100644 --- a/k8s/reconcilers/deployment.go +++ b/k8s/reconcilers/deployment.go @@ -203,7 +203,7 @@ func (r *DeploymentReconciler) AttemptUpdate(ctx context.Context, object Reconci // gofail: var beforeQueueJob string if err := r.queueDeploymentJob(ctx, object, isRemoval, operationStartTimeKey); err != nil { diagnostic.ErrorWithCtx(log, ctx, err, "failed to queue deployment job") - return r.handleDeploymentError(ctx, object, nil, isRemoval, reconciliationInterval, err, log) + return r.handleDeploymentError(ctx, object, nil, reconciliationInterval, err, log) } // DO NOT REMOVE THIS COMMENT 
// gofail: var afterQueueJob string @@ -349,7 +349,7 @@ func (r *DeploymentReconciler) PollingResult(ctx context.Context, object Reconci } } -func (r *DeploymentReconciler) handleDeploymentError(ctx context.Context, object Reconcilable, summary *model.SummaryResult, isRemoval bool, reconcileInterval time.Duration, err error, log logr.Logger) (metrics.OperationStatus, ctrl.Result, error) { +func (r *DeploymentReconciler) handleDeploymentError(ctx context.Context, object Reconcilable, summary *model.SummaryResult, reconcileInterval time.Duration, err error, log logr.Logger) (metrics.OperationStatus, ctrl.Result, error) { patchOptions := patchStatusOptions{} if isTermnalError(err, termialErrors) { patchOptions.terminalErr = err diff --git a/packages/helm/symphony/files/symphony-api.json b/packages/helm/symphony/files/symphony-api.json index 3febc2371..303ae7990 100644 --- a/packages/helm/symphony/files/symphony-api.json +++ b/packages/helm/symphony/files/symphony-api.json @@ -197,8 +197,31 @@ } } } - } + }, {{- end }} + { + "name": "summary-cleanup-manager", + "type": "managers.symphony.summarycleanup", + "properties": { + "providers.persistentstate": "redis-state", + "SummaryRetentionDuration": "{{ .Values.SummaryCleanup.retentionDuration }}" + }, + "providers": { + "redis-state": { + {{- if .Values.redis.enabled }} + "type": "providers.state.redis", + "config": { + "host": "{{ include "symphony.redisHost" .
}}", + "requireTLS": false, + "password": "" + } + {{- else }} + "type": "providers.state.memory", + "config": {} + {{- end }} + } + } + } ] }, { diff --git a/packages/helm/symphony/values.yaml b/packages/helm/symphony/values.yaml index e3eddce31..d574f168f 100644 --- a/packages/helm/symphony/values.yaml +++ b/packages/helm/symphony/values.yaml @@ -83,6 +83,11 @@ ActivationCleanup: # Rentention duration for activations, default is 180days # units are "ns", "us" (or "µs"), "ms", "s", "m", "h" retentionDuration: "4320h" +SummaryCleanup: + enabled: true + # Retention duration for summaries, default is 180 days + # units are "ns", "us" (or "µs"), "ms", "s", "m", "h" + retentionDuration: "4320h" Azure: proxySettings: isProxyEnabled: false