Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 90 additions & 11 deletions api/pkg/apis/v1alpha1/managers/campaigns/campaigns-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2"
Expand All @@ -19,8 +21,11 @@ import (
observ_utils "github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/observability/utils"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers"
"github.com/eclipse-symphony/symphony/coa/pkg/apis/v1alpha2/providers/states"
"github.com/eclipse-symphony/symphony/coa/pkg/logger"
)

var log = logger.NewLogger("coa.runtime")

type CampaignsManager struct {
managers.Manager
StateProvider states.IStateProvider
Expand All @@ -47,6 +52,7 @@ func (m *CampaignsManager) GetState(ctx context.Context, name string, namespace
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Campaigns): GetState, name: %s, namespace: %s, traceId: %s", name, namespace, span.SpanContext().TraceID().String())

getRequest := states.GetRequest{
ID: name,
Expand Down Expand Up @@ -90,12 +96,38 @@ func (m *CampaignsManager) UpsertState(ctx context.Context, name string, state m
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Campaigns): UpsertState, name %s, traceId: %s", name, span.SpanContext().TraceID().String())

if state.ObjectMeta.Name != "" && state.ObjectMeta.Name != name {
return v1alpha2.NewCOAError(nil, fmt.Sprintf("Name in metadata (%s) does not match name in request (%s)", state.ObjectMeta.Name, name), v1alpha2.BadRequest)
}
state.ObjectMeta.FixNames(name)

var rootResource string
var version string
var refreshLabels bool
if state.Spec.Version != "" {
version = state.Spec.Version
}
if state.Spec.RootResource == "" && version != "" {
suffix := "-" + version
rootResource = strings.TrimSuffix(name, suffix)
} else {
rootResource = state.Spec.RootResource
}

if state.ObjectMeta.Labels == nil {
state.ObjectMeta.Labels = make(map[string]string)
}
_, versionLabelExists := state.ObjectMeta.Labels["version"]
_, rootLabelExists := state.ObjectMeta.Labels["rootResource"]
if (!versionLabelExists || !rootLabelExists) && version != "" && rootResource != "" {
state.ObjectMeta.Labels["rootResource"] = rootResource
state.ObjectMeta.Labels["version"] = version
refreshLabels = true
}
log.Infof(" M (Campaigns): UpsertState, version %v, rootResource: %v, versionLabelExists: %v, rootLabelExists: %v", version, rootResource, versionLabelExists, rootLabelExists)

upsertRequest := states.UpsertRequest{
Value: states.StateEntry{
ID: name,
Expand All @@ -107,11 +139,13 @@ func (m *CampaignsManager) UpsertState(ctx context.Context, name string, state m
},
},
Metadata: map[string]interface{}{
"namespace": state.ObjectMeta.Namespace,
"group": model.WorkflowGroup,
"version": "v1",
"resource": "campaigns",
"kind": "Campaign",
"namespace": state.ObjectMeta.Namespace,
"group": model.WorkflowGroup,
"version": "v1",
"resource": "campaigns",
"kind": "Campaign",
"rootResource": rootResource,
"refreshLabels": strconv.FormatBool(refreshLabels),
},
}

Expand All @@ -126,14 +160,28 @@ func (m *CampaignsManager) DeleteState(ctx context.Context, name string, namespa
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)

var rootResource string
var version string
var id string
parts := strings.Split(name, ":")
if len(parts) == 2 {
rootResource = parts[0]
version = parts[1]
id = rootResource + "-" + version
} else {
id = name
}
log.Infof(" M (Campaigns): DeleteState, id: %v, namespace: %v, rootResource: %v, version: %v, traceId: %s", id, namespace, version, span.SpanContext().TraceID().String())

err = m.StateProvider.Delete(ctx, states.DeleteRequest{
ID: name,
ID: id,
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.WorkflowGroup,
"version": "v1",
"resource": "campaigns",
"kind": "Campaign",
"namespace": namespace,
"group": model.WorkflowGroup,
"version": "v1",
"resource": "campaigns",
"kind": "Campaign",
"rootResource": rootResource,
},
})
return err
Expand All @@ -145,6 +193,7 @@ func (t *CampaignsManager) ListState(ctx context.Context, namespace string) ([]m
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Campaigns): ListState, namespace: %s, traceId: %s", namespace, span.SpanContext().TraceID().String())

listRequest := states.ListRequest{
Metadata: map[string]interface{}{
Expand All @@ -171,3 +220,33 @@ func (t *CampaignsManager) ListState(ctx context.Context, namespace string) ([]m
}
return ret, nil
}

func (t *CampaignsManager) GetLatestState(ctx context.Context, id string, namespace string) (model.CampaignState, error) {
ctx, span := observability.StartSpan("Campaigns Manager", ctx, &map[string]string{
"method": "GetLatest",
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Campaigns): GetLatestState, id: %s, namespace: %s, traceId: %s", id, namespace, span.SpanContext().TraceID().String())

getRequest := states.GetRequest{
ID: id,
Metadata: map[string]interface{}{
"version": "v1",
"group": model.WorkflowGroup,
"resource": "campaigns",
"namespace": namespace,
"kind": "Campaign",
},
}
entry, err := t.StateProvider.GetLatest(ctx, getRequest)
if err != nil {
return model.CampaignState{}, err
}

ret, err := getCampaignState(entry.Body)
if err != nil {
return model.CampaignState{}, err
}
return ret, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestCreateGetDeleteCampaignSpec(t *testing.T) {
manager := CampaignsManager{
StateProvider: stateProvider,
}
err := manager.UpsertState(context.Background(), "test", model.CampaignState{})
err := manager.UpsertState(context.Background(), "test", model.CampaignState{Spec: &model.CampaignSpec{}})
assert.Nil(t, err)
spec, err := manager.GetState(context.Background(), "test", "default")
assert.Nil(t, err)
Expand Down
104 changes: 91 additions & 13 deletions api/pkg/apis/v1alpha1/managers/catalogs/catalogs-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"

"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model"
"github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers/graph"
Expand Down Expand Up @@ -58,6 +60,7 @@ func (s *CatalogsManager) GetState(ctx context.Context, name string, namespace s
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): GetState, name: %s, namespace: %s, traceId: %s", name, namespace, span.SpanContext().TraceID().String())

getRequest := states.GetRequest{
ID: name,
Expand All @@ -82,6 +85,36 @@ func (s *CatalogsManager) GetState(ctx context.Context, name string, namespace s
return ret, nil
}

func (t *CatalogsManager) GetLatestState(ctx context.Context, id string, namespace string) (model.CatalogState, error) {
ctx, span := observability.StartSpan("Catalogs Manager", ctx, &map[string]string{
"method": "GetLatest",
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): GetLatestState, id: %v, namespace: %v, traceId: %s", id, namespace, span.SpanContext().TraceID().String())

getRequest := states.GetRequest{
ID: id,
Metadata: map[string]interface{}{
"version": "v1",
"group": model.FederationGroup,
"resource": "catalogs",
"namespace": namespace,
"kind": "Catalog",
},
}
entry, err := t.StateProvider.GetLatest(ctx, getRequest)
if err != nil {
return model.CatalogState{}, err
}

ret, err := getCatalogState(entry.Body, entry.ETag)
if err != nil {
return model.CatalogState{}, err
}
return ret, nil
}

func getCatalogState(body interface{}, etag string) (model.CatalogState, error) {
var catalogState model.CatalogState
bytes, _ := json.Marshal(body)
Expand Down Expand Up @@ -136,6 +169,7 @@ func (m *CatalogsManager) UpsertState(ctx context.Context, name string, state mo
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): UpsertState, name %s, traceId: %s", name, span.SpanContext().TraceID().String())

if state.ObjectMeta.Name != "" && state.ObjectMeta.Name != name {
return v1alpha2.NewCOAError(nil, fmt.Sprintf("Name in metadata (%s) does not match name in request (%s)", state.ObjectMeta.Name, name), v1alpha2.BadRequest)
Expand All @@ -152,6 +186,32 @@ func (m *CatalogsManager) UpsertState(ctx context.Context, name string, state mo
return err
}

var rootResource string
var version string
var refreshLabels bool
if state.Spec.Version != "" {
version = state.Spec.Version
}
if state.Spec.RootResource == "" && version != "" {
suffix := "-" + version
rootResource = strings.TrimSuffix(name, suffix)
} else {
rootResource = state.Spec.RootResource
}

if state.ObjectMeta.Labels == nil {
state.ObjectMeta.Labels = make(map[string]string)
}

_, versionLabelExists := state.ObjectMeta.Labels["version"]
_, rootLabelExists := state.ObjectMeta.Labels["rootResource"]
if (!versionLabelExists || !rootLabelExists) && version != "" && rootResource != "" {
state.ObjectMeta.Labels["rootResource"] = rootResource
state.ObjectMeta.Labels["version"] = version
refreshLabels = true
}
log.Infof(" M (Catalogs): UpsertState, version %v, rootResource: %v, versionLabelExists: %v, rootLabelExists: %v", version, rootResource, versionLabelExists, rootLabelExists)

upsertRequest := states.UpsertRequest{
Value: states.StateEntry{
ID: name,
Expand All @@ -163,13 +223,16 @@ func (m *CatalogsManager) UpsertState(ctx context.Context, name string, state mo
},
},
Metadata: map[string]interface{}{
"namespace": state.ObjectMeta.Namespace,
"group": model.FederationGroup,
"version": "v1",
"resource": "catalogs",
"kind": "Catalog",
"namespace": state.ObjectMeta.Namespace,
"group": model.FederationGroup,
"version": "v1",
"resource": "catalogs",
"kind": "Catalog",
"rootResource": rootResource,
"refreshLabels": strconv.FormatBool(refreshLabels),
},
}

_, err = m.StateProvider.Upsert(ctx, upsertRequest)
if err != nil {
return err
Expand All @@ -194,15 +257,29 @@ func (m *CatalogsManager) DeleteState(ctx context.Context, name string, namespac
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)

var rootResource string
var version string
var id string
parts := strings.Split(name, ":")
if len(parts) == 2 {
rootResource = parts[0]
version = parts[1]
id = rootResource + "-" + version
} else {
id = name
}
log.Infof(" M (Catalogs): DeleteState, id: %v, namespace: %v, rootResource: %v, version: %v, traceId: %s", id, namespace, rootResource, version, span.SpanContext().TraceID().String())

//TODO: publish DELETE event
err = m.StateProvider.Delete(ctx, states.DeleteRequest{
ID: name,
ID: id,
Metadata: map[string]interface{}{
"namespace": namespace,
"group": model.FederationGroup,
"version": "v1",
"resource": "catalogs",
"kind": "Catalog",
"namespace": namespace,
"group": model.FederationGroup,
"version": "v1",
"resource": "catalogs",
"kind": "Catalog",
"rootResource": rootResource,
},
})
return err
Expand All @@ -214,6 +291,7 @@ func (t *CatalogsManager) ListState(ctx context.Context, namespace string, filte
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): ListState, namespace: %v, traceId: %s", namespace, span.SpanContext().TraceID().String())

listRequest := states.ListRequest{
Metadata: map[string]interface{}{
Expand Down Expand Up @@ -265,8 +343,8 @@ func (g *CatalogsManager) GetChains(ctx context.Context, filter string, namespac
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): GetChains, filter: %v, namespace: %v, traceId: %s", filter, namespace, span.SpanContext().TraceID().String())

log.Debug(" M (Graph): GetChains")
err = g.setProviderDataIfNecessary(ctx, namespace)
if err != nil {
return nil, err
Expand All @@ -288,8 +366,8 @@ func (g *CatalogsManager) GetTrees(ctx context.Context, filter string, namespace
})
var err error = nil
defer observ_utils.CloseSpanWithError(span, &err)
log.Infof(" M (Catalogs): GetTrees, filter: %v, namespace: %v, traceId: %s", filter, namespace, span.SpanContext().TraceID().String())

log.Debug(" M (Graph): GetTrees")
err = g.setProviderDataIfNecessary(ctx, namespace)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
var manager CatalogsManager
var catalogState = model.CatalogState{
ObjectMeta: model.ObjectMeta{
Name: "name1",
Name: "name1-v1",
},
Spec: &model.CatalogSpec{
Type: "catalog",
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestUpsertAndGet(t *testing.T) {
err := json.Unmarshal(jData, &job)
assert.Nil(t, err)
assert.Equal(t, "catalog", event.Metadata["objectType"])
assert.Equal(t, "name1", job.Id)
assert.Equal(t, "name1-v1", job.Id)
assert.Equal(t, true, job.Action == v1alpha2.JobUpdate || job.Action == v1alpha2.JobDelete)
return nil
})
Expand Down
Loading