Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add summary cleanup manager to delete deprecated summaries periodically #594

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/pkg/apis/v1alpha1/managers/managerfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ func (c *SymphonyManagerFactory) CreateManager(config cm.ManagerConfig) (cm.IMan
manager = &sites.SitesManager{}
case "managers.symphony.staging":
manager = &staging.StagingManager{}
case "managers.symphony.summarycleanup":
manager = &solution.SummaryCleanupManager{}
case "managers.symphony.sync":
manager = &sync.SyncManager{}
case "managers.symphony.models":
Expand Down
194 changes: 20 additions & 174 deletions api/pkg/apis/v1alpha1/managers/solution/solution-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
config "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/config"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/keylock"
secret "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/secret"
states "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"
)

Expand All @@ -56,9 +55,8 @@ const (
)

type SolutionManager struct {
managers.Manager
SummaryManager
TargetProviders map[string]tgt.ITargetProvider
StateProvider states.IStateProvider
ConfigProvider config.IExtConfigProvider
SecretProvider secret.ISecretProvider
KeyLockProvider keylock.IKeyLockProvider
Expand All @@ -73,7 +71,7 @@ type SolutionManagerDeploymentState struct {
}

func (s *SolutionManager) Init(context *contexts.VendorContext, config managers.ManagerConfig, providers map[string]providers.IProvider) error {
err := s.Manager.Init(context, config, providers)
err := s.SummaryManager.Init(context, config, providers)
if err != nil {
return err
}
Expand All @@ -91,13 +89,6 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers.
return err
}

stateprovider, err := managers.GetPersistentStateProvider(config, providers)
if err == nil {
s.StateProvider = stateprovider
} else {
return err
}

configProvider, err := managers.GetExtConfigProvider(config, providers)
if err == nil {
s.ConfigProvider = configProvider
Expand Down Expand Up @@ -150,109 +141,26 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers.
return nil
}

// getPreviousState loads the deployment state last persisted for an instance.
//
// It asks the state provider for the entry whose ID is the instance name,
// scoped by namespace and the DeploymentState resource, then round-trips the
// entry body through JSON to decode it into a SolutionManagerDeploymentState.
//
// It returns nil when the lookup, the encoding, or the decoding fails. The
// failure is logged at info level and is not returned to the caller.
func (s *SolutionManager) getPreviousState(ctx context.Context, instance string, namespace string) *SolutionManagerDeploymentState {
	state, err := s.StateProvider.Get(ctx, states.GetRequest{
		ID: instance,
		Metadata: map[string]interface{}{
			"namespace": namespace,
			"group":     model.SolutionGroup,
			"version":   "v1",
			"resource":  DeploymentState,
		},
	})
	if err == nil {
		var jData []byte
		// Keep the marshal error instead of discarding it, so the log line
		// below reports the real cause rather than a secondary decode error.
		jData, err = json.Marshal(state.Body)
		if err == nil {
			var managerState SolutionManagerDeploymentState
			if err = json.Unmarshal(jData, &managerState); err == nil {
				return &managerState
			}
		}
	}
	log.InfofCtx(ctx, " M (Solution): failed to get previous state for instance %s in namespace %s: %+v", instance, namespace, err)
	return nil
}
func (s *SolutionManager) GetSummary(ctx context.Context, summaryId string, name string, namespace string) (model.SummaryResult, error) {
// lock.Lock()
// defer lock.Unlock()

ctx, span := observability.StartSpan("Solution Manager", ctx, &map[string]string{
"method": "GetSummary",
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, " M (Solution): get summary, name: %s, summaryId: %s, namespace: %s", name, summaryId, namespace)

var state states.StateEntry
state, err = s.StateProvider.Get(ctx, states.GetRequest{
ID: fmt.Sprintf("%s-%s", "summary", summaryId),
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": Summary,
},
})
if err != nil && api_utils.IsNotFound(err) {
// if get summary by guid not found, try to get the summary by name
log.InfofCtx(ctx, " M (Solution): failed to get deployment summary[%s] by summaryId, error: %+v. Try to get summary by name", summaryId, err)
state, err = s.StateProvider.Get(ctx, states.GetRequest{
ID: fmt.Sprintf("%s-%s", "summary", name),
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": Summary,
},
})
}
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to get deployment summary[%s]: %+v", summaryId, err)
return model.SummaryResult{}, err
}

var result model.SummaryResult
jData, _ := json.Marshal(state.Body)
err = json.Unmarshal(jData, &result)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to deserailze deployment summary[%s]: %+v", summaryId, err)
return model.SummaryResult{}, err
}

return result, nil
return s.SummaryManager.GetSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), name, namespace)
}

func (s *SolutionManager) DeleteSummary(ctx context.Context, key string, namespace string) error {
ctx, span := observability.StartSpan("Solution Manager", ctx, &map[string]string{
"method": "DeleteSummary",
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
defer observ_utils.EmitUserDiagnosticsLogs(ctx, &err)

log.InfofCtx(ctx, " M (Solution): delete summary, key: %s, namespace: %s", key, namespace)

err = s.StateProvider.Delete(ctx, states.DeleteRequest{
ID: fmt.Sprintf("%s-%s", "summary", key),
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": Summary,
},
})

func (s *SolutionManager) DeleteSummary(ctx context.Context, summaryId string, namespace string) error {
// Client-side summary deletion is a soft delete: it only sets the Removed flag on the stored summary.
result, err := s.SummaryManager.GetSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), "", namespace)
if err != nil {
if api_utils.IsNotFound(err) {
log.DebugfCtx(ctx, " M (Solution): DeleteSummary NoutFound, id: %s, namespace: %s", key, namespace)
return nil
}
log.ErrorfCtx(ctx, " M (Solution): failed to get summary[%s]: %+v", key, err)
return err
}

return nil
result.Summary.Removed = true
return s.SummaryManager.UpsertSummary(ctx,
fmt.Sprintf("%s-%s", "summary", summaryId),
result.Generation,
result.DeploymentHash,
result.Summary,
result.State,
namespace,
true, // upsert softDelete flag to summary.
)
}

func (s *SolutionManager) sendHeartbeat(ctx context.Context, id string, namespace string, remove bool, stopCh chan struct{}) {
Expand Down Expand Up @@ -397,7 +305,7 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
}
}

previousDesiredState := s.getPreviousState(ctx, deployment.Instance.ObjectMeta.Name, namespace)
previousDesiredState := s.GetDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace)

var currentDesiredState, currentState model.DeploymentState
currentDesiredState, err = NewDeploymentState(deployment)
Expand Down Expand Up @@ -593,31 +501,9 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy
if !deployment.IsDryRun {
if len(mergedState.TargetComponent) == 0 && remove {
log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state")
s.StateProvider.Delete(ctx, states.DeleteRequest{
ID: deployment.Instance.ObjectMeta.Name,
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": DeploymentState,
},
})
s.DeleteDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace)
} else {
s.StateProvider.Upsert(ctx, states.UpsertRequest{
Value: states.StateEntry{
ID: deployment.Instance.ObjectMeta.Name,
Body: SolutionManagerDeploymentState{
Spec: deployment,
State: mergedState,
},
},
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": DeploymentState,
},
})
s.UpsertDeploymentState(ctx, deployment.Instance.ObjectMeta.Name, namespace, deployment, mergedState)
}
}

Expand Down Expand Up @@ -658,47 +544,7 @@ func (s *SolutionManager) saveSummary(ctx context.Context, objectName string, su
// TODO: delete this state when time expires. This should probably be invoked by the vendor (via GetSummary method, for instance)
log.DebugfCtx(ctx, " M (Solution): saving summary, objectName: %s, summaryId: %s, state: %v, namespace: %s, jobid: %s, hash %s, targetCount %d, successCount %d",
objectName, summaryId, state, namespace, summary.JobID, hash, summary.TargetCount, summary.SuccessCount)
oldSummary, err := s.GetSummary(ctx, summaryId, objectName, namespace)
if err != nil && !v1alpha2.IsNotFound(err) {
log.ErrorfCtx(ctx, " M (Solution): failed to get previous summary: %+v", err)
return err
} else if err == nil {
if summary.JobID != "" && oldSummary.Summary.JobID != "" {
var newId, oldId int64
newId, err = strconv.ParseInt(summary.JobID, 10, 64)
if err != nil {
log.ErrorfCtx(ctx, " M (Solution): failed to parse new job id: %+v", err)
return v1alpha2.NewCOAError(err, "failed to parse new job id", v1alpha2.BadRequest)
}
oldId, err = strconv.ParseInt(oldSummary.Summary.JobID, 10, 64)
if err == nil && oldId > newId {
errMsg := fmt.Sprintf("old job id %d is greater than new job id %d", oldId, newId)
log.ErrorfCtx(ctx, " M (Solution): %s", errMsg)
return v1alpha2.NewCOAError(err, errMsg, v1alpha2.BadRequest)
}
} else {
log.WarnfCtx(ctx, " M (Solution): JobIDs are both empty, skip id check")
}
}
_, err = s.StateProvider.Upsert(ctx, states.UpsertRequest{
Value: states.StateEntry{
ID: fmt.Sprintf("%s-%s", "summary", summaryId),
Body: model.SummaryResult{
Summary: summary,
Generation: generation,
Time: time.Now().UTC(),
State: state,
DeploymentHash: hash,
},
},
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.SolutionGroup,
"version": "v1",
"resource": Summary,
},
})
return err
return s.SummaryManager.UpsertSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), generation, hash, summary, state, namespace, false)
}

func (s *SolutionManager) saveSummaryProgress(ctx context.Context, objectName string, summaryId string, generation string, hash string, summary model.SummarySpec, namespace string) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ func TestMockGet(t *testing.T) {
TargetProviders: map[string]target.ITargetProvider{
"mock": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
state, components, err := manager.Get(context.Background(), deployment, "")
Expand Down Expand Up @@ -404,8 +406,9 @@ func TestMockGet(t *testing.T) {
// Test summary deletion
err = manager.DeleteSummary(context.Background(), summaryKey, "default")
assert.Nil(t, err)
_, err = manager.GetSummary(context.Background(), summaryKey, name, "default")
assert.NotNil(t, err)
result, err := manager.GetSummary(context.Background(), summaryKey, name, "default")
assert.Nil(t, err)
assert.True(t, result.Summary.Removed, "Summary should have set the removed flag")
}
func TestMockGetTwoTargets(t *testing.T) {
id := uuid.New().String()
Expand Down Expand Up @@ -482,7 +485,9 @@ func TestMockGetTwoTargets(t *testing.T) {
TargetProviders: map[string]target.ITargetProvider{
"mock": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
state, components, err := manager.Get(context.Background(), deployment, "")
Expand Down Expand Up @@ -580,7 +585,9 @@ func TestMockGetTwoTargetsTwoProviders(t *testing.T) {
"mock1": targetProvider,
"mock2": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
state, components, err := manager.Get(context.Background(), deployment, "")
Expand Down Expand Up @@ -656,7 +663,9 @@ func TestMockApply(t *testing.T) {
TargetProviders: map[string]target.ITargetProvider{
"mock": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "")
Expand Down Expand Up @@ -722,7 +731,9 @@ func TestMockApplyMultiRoles(t *testing.T) {
"mock": targetProvider,
"mock2": targetProvider2,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "")
Expand Down Expand Up @@ -781,7 +792,9 @@ func TestMockApplyWithUpdateAndRemove(t *testing.T) {
TargetProviders: map[string]target.ITargetProvider{
"mock": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "")
Expand Down Expand Up @@ -835,7 +848,9 @@ func TestMockApplyWithError(t *testing.T) {
TargetProviders: map[string]target.ITargetProvider{
"mock": targetProvider,
},
StateProvider: stateProvider,
SummaryManager: SummaryManager{
StateProvider: stateProvider,
},
KeyLockProvider: keyLockProvider,
}
summary, err := manager.Reconcile(context.Background(), deployment, false, "default", "")
Expand Down
Loading
Loading