From e467e977c6bc23bf4941017685c807f60e23be60 Mon Sep 17 00:00:00 2001 From: Serghei Anicheev Date: Wed, 27 Aug 2025 09:11:24 +1000 Subject: [PATCH] feat: Adding new role type - mixed This PR includes following changes: - Adding new role type: mixed. In case pipline has a mixed source (valid for both agent and aggregator). - New skipPrefix flag on vectorpipelines and clustervectorpipelines. If specified will skip adding a prefix to a pipeline name(namespace + object name). Required when using wildcard in input references from different pipeline. - Applying linter fixes which were missed before --- api/v1alpha1/clustervectorpipeline.go | 5 ++++ api/v1alpha1/vectorpipeline.go | 6 ++++ api/v1alpha1/vectorpipeline_types.go | 4 +++ ...ity.kaasops.io_clustervectorpipelines.yaml | 4 +++ ...ervability.kaasops.io_vectorpipelines.yaml | 4 +++ internal/config/agent.go | 11 ++++---- internal/config/aggregator.go | 17 +++++------ internal/config/config.go | 16 +++++++---- internal/config/utils.go | 5 +++- internal/config/vector_source_types.go | 11 ++++++++ internal/controller/pipeline_controller.go | 2 +- internal/pipeline/pipeline.go | 6 ++-- internal/utils/k8s/label.go | 28 +++++++++---------- .../vectoragent/vectoragent_controller.go | 1 - 14 files changed, 83 insertions(+), 37 deletions(-) diff --git a/api/v1alpha1/clustervectorpipeline.go b/api/v1alpha1/clustervectorpipeline.go index da07cbe..eca1390 100644 --- a/api/v1alpha1/clustervectorpipeline.go +++ b/api/v1alpha1/clustervectorpipeline.go @@ -2,6 +2,7 @@ package v1alpha1 import ( "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kaasops/vector-operator/internal/utils/k8s" @@ -61,3 +62,7 @@ func (vp *ClusterVectorPipeline) SetRole(role *VectorPipelineRole) { func (vp *ClusterVectorPipeline) GetTypeMeta() metav1.TypeMeta { return vp.TypeMeta } + +func (vp *ClusterVectorPipeline) SkipPrefix() bool { + return vp.GetSpec().SkipPrefix +} diff --git a/api/v1alpha1/vectorpipeline.go b/api/v1alpha1/vectorpipeline.go index 5fa963c..f911180 100644 --- a/api/v1alpha1/vectorpipeline.go +++ b/api/v1alpha1/vectorpipeline.go @@ -2,6 +2,7 @@ package v1alpha1 import ( "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kaasops/vector-operator/internal/utils/k8s" @@ -14,6 +15,7 @@ const ( VectorPipelineRoleUnknown VectorPipelineRole = "unknown" VectorPipelineRoleAgent VectorPipelineRole = "agent" VectorPipelineRoleAggregator VectorPipelineRole = "aggregator" + VectorPipelineRoleMixed VectorPipelineRole = "mixed" ) func (vp *VectorPipeline) GetSpec() VectorPipelineSpec { @@ -69,3 +71,7 @@ func (vp *VectorPipeline) SetRole(role *VectorPipelineRole) { func (vp *VectorPipeline) GetTypeMeta() metav1.TypeMeta { return vp.TypeMeta } + +func (vp *VectorPipeline) SkipPrefix() bool { + return vp.GetSpec().SkipPrefix +} diff --git a/api/v1alpha1/vectorpipeline_types.go b/api/v1alpha1/vectorpipeline_types.go index f036653..a529afc 100644 --- a/api/v1alpha1/vectorpipeline_types.go +++ b/api/v1alpha1/vectorpipeline_types.go @@ -26,6 +26,10 @@ import ( // VectorPipelineSpec defines the desired state of VectorPipeline type VectorPipelineSpec struct { + // Controls whether prefix logic should be applied + // +optional + // +kubebuilder:default:=false + SkipPrefix bool `json:"skipPrefix,omitempty"` // +kubebuilder:pruning:PreserveUnknownFields Sources *runtime.RawExtension `json:"sources,omitempty"` // +kubebuilder:pruning:PreserveUnknownFields diff --git a/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml b/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml index 8515663..c1f0522 100644 --- a/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml +++ b/config/crd/bases/observability.kaasops.io_clustervectorpipelines.yaml @@ -55,6 +55,10 @@ spec: sinks: type: object x-kubernetes-preserve-unknown-fields: true + skipPrefix: + default: false + description: Controls whether prefix logic should be applied + type: boolean sources: type: object x-kubernetes-preserve-unknown-fields: true diff --git a/config/crd/bases/observability.kaasops.io_vectorpipelines.yaml b/config/crd/bases/observability.kaasops.io_vectorpipelines.yaml index 9814ffc..6a313ea 100644 --- a/config/crd/bases/observability.kaasops.io_vectorpipelines.yaml +++ b/config/crd/bases/observability.kaasops.io_vectorpipelines.yaml @@ -56,6 +56,10 @@ spec: sinks: type: object x-kubernetes-preserve-unknown-fields: true + skipPrefix: + default: false + description: Controls whether prefix logic should be applied + type: boolean sources: type: object x-kubernetes-preserve-unknown-fields: true diff --git a/internal/config/agent.go b/internal/config/agent.go index e98afb4..443409b 100644 --- a/internal/config/agent.go +++ b/internal/config/agent.go @@ -2,6 +2,7 @@ package config import ( "fmt" + vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1" "github.com/kaasops/vector-operator/internal/pipeline" "github.com/kaasops/vector-operator/internal/utils/k8s" @@ -62,20 +63,20 @@ func buildAgentConfig(params VectorConfigParams, pipelines ...pipeline.Pipeline) if v.Type == KubernetesLogsType && params.UseApiServerCache { v.UseApiServerCache = true } - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) cfg.Sources[v.Name] = v } for k, v := range p.Transforms { - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) for i, inputName := range v.Inputs { - v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName) + v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix()) } cfg.Transforms[v.Name] = v } for k, v := range p.Sinks { - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) for i, inputName := range v.Inputs { - v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName) + v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix()) } cfg.Sinks[v.Name] = v } diff --git a/internal/config/aggregator.go b/internal/config/aggregator.go index ec46916..35f4245 100644 --- a/internal/config/aggregator.go +++ b/internal/config/aggregator.go @@ -3,13 +3,14 @@ package config import ( "errors" "fmt" + "net" + "strconv" + "strings" + "github.com/kaasops/vector-operator/internal/common" "github.com/kaasops/vector-operator/internal/pipeline" "github.com/stoewer/go-strcase" corev1 "k8s.io/api/core/v1" - "net" - "strconv" - "strings" ) func BuildAggregatorConfig(params VectorConfigParams, pipelines ...pipeline.Pipeline) (*VectorConfig, error) { @@ -87,20 +88,20 @@ func BuildAggregatorConfig(params VectorConfigParams, pipelines ...pipeline.Pipe } } - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) cfg.Sources[v.Name] = settings } for k, v := range p.Transforms { - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) for i, inputName := range v.Inputs { - v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName) + v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix()) } cfg.Transforms[v.Name] = v } for k, v := range p.Sinks { - v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k) + v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix()) for i, inputName := range v.Inputs { - v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName) + v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix()) } cfg.Sinks[v.Name] = v } diff --git a/internal/config/config.go b/internal/config/config.go index e51f52f..31ad713 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -20,13 +20,14 @@ import ( "encoding/json" "errors" "fmt" + "net" + "strconv" + vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1" "github.com/kaasops/vector-operator/internal/evcollector" "github.com/mitchellh/mapstructure" "gopkg.in/yaml.v2" - "net" goyaml "sigs.k8s.io/yaml" - "strconv" ) var ( @@ -118,9 +119,11 @@ func (c *PipelineConfig) VectorRole() (*vectorv1alpha1.VectorPipelineRole, error aggregatorCount := 0 for _, s := range c.Sources { switch { + case isAgentAndAggregator(s.Type): + agentCount++ + aggregatorCount++ case isAgent(s.Type): agentCount++ - fallthrough // some types can be both an agent and an aggregator at the same time case isAggregator(s.Type): aggregatorCount++ default: @@ -128,10 +131,13 @@ func (c *PipelineConfig) VectorRole() (*vectorv1alpha1.VectorPipelineRole, error } } switch { - case len(c.Sources) == agentCount: + case agentCount > 0 && aggregatorCount > 0: + role := vectorv1alpha1.VectorPipelineRoleMixed + return &role, nil + case agentCount > 0: role := vectorv1alpha1.VectorPipelineRoleAgent return &role, nil - case len(c.Sources) == aggregatorCount: + case aggregatorCount > 0: role := vectorv1alpha1.VectorPipelineRoleAggregator return &role, nil } diff --git a/internal/config/utils.go b/internal/config/utils.go index d39db2d..6206c98 100644 --- a/internal/config/utils.go +++ b/internal/config/utils.go @@ -1,6 +1,9 @@ package config -func addPrefix(Namespace, Name, componentName string) string { +func addPrefix(Namespace, Name, componentName string, skipPrefix bool) string { + if skipPrefix { + return componentName + } return generateName(Namespace, Name) + "-" + componentName } diff --git a/internal/config/vector_source_types.go b/internal/config/vector_source_types.go index aa0c21d..1eb7c93 100644 --- a/internal/config/vector_source_types.go +++ b/internal/config/vector_source_types.go @@ -101,3 +101,14 @@ func isAgent(name string) bool { _, ok := agentTypes[name] return ok } + +func isAgentAndAggregator(name string) bool { + _, isAgent := agentTypes[name] + _, isAggregator := aggregatorTypes[name] + + if isAgent && isAggregator { + return true + } + + return false +} diff --git a/internal/controller/pipeline_controller.go b/internal/controller/pipeline_controller.go index 52b5d46..11fbd46 100644 --- a/internal/controller/pipeline_controller.go +++ b/internal/controller/pipeline_controller.go @@ -135,7 +135,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c eg := errgroup.Group{} - if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent { + if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent || *pipelineVectorRole == v1alpha1.VectorPipelineRoleMixed { for _, vector := range vectorAgents { eg.Go(func() error { diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 559bef6..6b73caf 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -19,6 +19,7 @@ package pipeline import ( "context" "fmt" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/kaasops/vector-operator/api/v1alpha1" @@ -30,6 +31,7 @@ type Pipeline interface { GetSpec() v1alpha1.VectorPipelineSpec SetConfigCheck(bool) SetReason(*string) + SkipPrefix() bool GetLastAppliedPipeline() *uint32 SetLastAppliedPipeline(*uint32) GetConfigCheckResult() *bool @@ -78,7 +80,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP for _, vp := range vps { if !vp.IsDeleted() && vp.IsValid() && - vp.GetRole() == filter.Role && + (vp.GetRole() == filter.Role || vp.GetRole() == v1alpha1.VectorPipelineRoleMixed) && (filter.Scope == AllPipelines || vp.Namespace == filter.Namespace) && MatchLabels(matchLabels, vp.Labels) { validPipelines = append(validPipelines, vp.DeepCopy()) @@ -96,7 +98,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP for _, cvp := range cvps { if !cvp.IsDeleted() && cvp.IsValid() && - cvp.GetRole() == filter.Role && + (cvp.GetRole() == filter.Role || cvp.GetRole() == v1alpha1.VectorPipelineRoleMixed) && MatchLabels(matchLabels, cvp.Labels) { validPipelines = append(validPipelines, cvp.DeepCopy()) } diff --git a/internal/utils/k8s/label.go b/internal/utils/k8s/label.go index 72c63eb..9f75c8e 100644 --- a/internal/utils/k8s/label.go +++ b/internal/utils/k8s/label.go @@ -42,18 +42,18 @@ const ( // being merged into the destination (dst) labels. If a key exists in both maps, // the destination value is preserved. func MergeLabels(dst, src map[string]string) map[string]string { - if dst == nil { + if dst == nil { dst = make(map[string]string) - } - - if src == nil { - return dst - } - - for k, v := range src { - if _, ok := dst[k]; !ok { - dst[k] = v - } - } - return dst -} \ No newline at end of file + } + + if src == nil { + return dst + } + + for k, v := range src { + if _, ok := dst[k]; !ok { + dst[k] = v + } + } + return dst +} diff --git a/internal/vector/vectoragent/vectoragent_controller.go b/internal/vector/vectoragent/vectoragent_controller.go index 3a27e41..ef1f6cb 100644 --- a/internal/vector/vectoragent/vectoragent_controller.go +++ b/internal/vector/vectoragent/vectoragent_controller.go @@ -162,7 +162,6 @@ func (ctrl *Controller) matchLabelsForVectorAgent() map[string]string { func (ctrl *Controller) labelsForVectorAgent() map[string]string { basicLabels := ctrl.matchLabelsForVectorAgent() - labels := k8s.MergeLabels(basicLabels, ctrl.Vector.Spec.Agent.Labels) return labels