Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/v1alpha1/clustervectorpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kaasops/vector-operator/internal/utils/k8s"
Expand Down Expand Up @@ -61,3 +62,7 @@ func (vp *ClusterVectorPipeline) SetRole(role *VectorPipelineRole) {
func (vp *ClusterVectorPipeline) GetTypeMeta() metav1.TypeMeta {
return vp.TypeMeta
}

func (vp *ClusterVectorPipeline) SkipPrefix() bool {
return vp.GetSpec().SkipPrefix
}
6 changes: 6 additions & 0 deletions api/v1alpha1/vectorpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"context"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kaasops/vector-operator/internal/utils/k8s"
Expand All @@ -14,6 +15,7 @@ const (
VectorPipelineRoleUnknown VectorPipelineRole = "unknown"
VectorPipelineRoleAgent VectorPipelineRole = "agent"
VectorPipelineRoleAggregator VectorPipelineRole = "aggregator"
VectorPipelineRoleMixed VectorPipelineRole = "mixed"
)

func (vp *VectorPipeline) GetSpec() VectorPipelineSpec {
Expand Down Expand Up @@ -69,3 +71,7 @@ func (vp *VectorPipeline) SetRole(role *VectorPipelineRole) {
func (vp *VectorPipeline) GetTypeMeta() metav1.TypeMeta {
return vp.TypeMeta
}

func (vp *VectorPipeline) SkipPrefix() bool {
return vp.GetSpec().SkipPrefix
}
4 changes: 4 additions & 0 deletions api/v1alpha1/vectorpipeline_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (

// VectorPipelineSpec defines the desired state of VectorPipeline
type VectorPipelineSpec struct {
// Controls whether prefix logic should be applied
// +optional
// +kubebuilder:default:=false
SkipPrefix bool `json:"skipPrefix,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Sources *runtime.RawExtension `json:"sources,omitempty"`
// +kubebuilder:pruning:PreserveUnknownFields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ spec:
sinks:
type: object
x-kubernetes-preserve-unknown-fields: true
skipPrefix:
default: false
description: Controls whether prefix logic should be applied
type: boolean
sources:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ spec:
sinks:
type: object
x-kubernetes-preserve-unknown-fields: true
skipPrefix:
default: false
description: Controls whether prefix logic should be applied
type: boolean
sources:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down
11 changes: 6 additions & 5 deletions internal/config/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"fmt"

vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1"
"github.com/kaasops/vector-operator/internal/pipeline"
"github.com/kaasops/vector-operator/internal/utils/k8s"
Expand Down Expand Up @@ -62,20 +63,20 @@ func buildAgentConfig(params VectorConfigParams, pipelines ...pipeline.Pipeline)
if v.Type == KubernetesLogsType && params.UseApiServerCache {
v.UseApiServerCache = true
}
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
cfg.Sources[v.Name] = v
}
for k, v := range p.Transforms {
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
for i, inputName := range v.Inputs {
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName)
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix())
}
cfg.Transforms[v.Name] = v
}
for k, v := range p.Sinks {
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
for i, inputName := range v.Inputs {
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName)
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix())
}
cfg.Sinks[v.Name] = v
}
Expand Down
17 changes: 9 additions & 8 deletions internal/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package config
import (
"errors"
"fmt"
"net"
"strconv"
"strings"

"github.com/kaasops/vector-operator/internal/common"
"github.com/kaasops/vector-operator/internal/pipeline"
"github.com/stoewer/go-strcase"
corev1 "k8s.io/api/core/v1"
"net"
"strconv"
"strings"
)

func BuildAggregatorConfig(params VectorConfigParams, pipelines ...pipeline.Pipeline) (*VectorConfig, error) {
Expand Down Expand Up @@ -87,20 +88,20 @@ func BuildAggregatorConfig(params VectorConfigParams, pipelines ...pipeline.Pipe
}
}

v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
cfg.Sources[v.Name] = settings
}
for k, v := range p.Transforms {
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
for i, inputName := range v.Inputs {
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName)
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix())
}
cfg.Transforms[v.Name] = v
}
for k, v := range p.Sinks {
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k)
v.Name = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), k, pipeline.SkipPrefix())
for i, inputName := range v.Inputs {
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName)
v.Inputs[i] = addPrefix(pipeline.GetNamespace(), pipeline.GetName(), inputName, pipeline.SkipPrefix())
}
cfg.Sinks[v.Name] = v
}
Expand Down
16 changes: 11 additions & 5 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"strconv"

vectorv1alpha1 "github.com/kaasops/vector-operator/api/v1alpha1"
"github.com/kaasops/vector-operator/internal/evcollector"
"github.com/mitchellh/mapstructure"
"gopkg.in/yaml.v2"
"net"
goyaml "sigs.k8s.io/yaml"
"strconv"
)

var (
Expand Down Expand Up @@ -118,20 +119,25 @@ func (c *PipelineConfig) VectorRole() (*vectorv1alpha1.VectorPipelineRole, error
aggregatorCount := 0
for _, s := range c.Sources {
switch {
case isAgentAndAggregator(s.Type):
agentCount++
aggregatorCount++
case isAgent(s.Type):
agentCount++
fallthrough // some types can be both an agent and an aggregator at the same time
case isAggregator(s.Type):
aggregatorCount++
default:
return nil, fmt.Errorf("unsupported source type: %s", s.Type)
}
}
switch {
case len(c.Sources) == agentCount:
case agentCount > 0 && aggregatorCount > 0:
role := vectorv1alpha1.VectorPipelineRoleMixed
return &role, nil
case agentCount > 0:
role := vectorv1alpha1.VectorPipelineRoleAgent
return &role, nil
case len(c.Sources) == aggregatorCount:
case aggregatorCount > 0:
role := vectorv1alpha1.VectorPipelineRoleAggregator
return &role, nil
}
Expand Down
5 changes: 4 additions & 1 deletion internal/config/utils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package config

func addPrefix(Namespace, Name, componentName string) string {
func addPrefix(Namespace, Name, componentName string, skipPrefix bool) string {
if skipPrefix {
return componentName
}
return generateName(Namespace, Name) + "-" + componentName
}

Expand Down
11 changes: 11 additions & 0 deletions internal/config/vector_source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,14 @@ func isAgent(name string) bool {
_, ok := agentTypes[name]
return ok
}

func isAgentAndAggregator(name string) bool {
_, isAgent := agentTypes[name]
_, isAggregator := aggregatorTypes[name]

if isAgent && isAggregator {
return true
}

return false
}
2 changes: 1 addition & 1 deletion internal/controller/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

eg := errgroup.Group{}

if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent {
if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent || *pipelineVectorRole == v1alpha1.VectorPipelineRoleMixed {

for _, vector := range vectorAgents {
eg.Go(func() error {
Expand Down
6 changes: 4 additions & 2 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pipeline
import (
"context"
"fmt"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kaasops/vector-operator/api/v1alpha1"
Expand All @@ -30,6 +31,7 @@ type Pipeline interface {
GetSpec() v1alpha1.VectorPipelineSpec
SetConfigCheck(bool)
SetReason(*string)
SkipPrefix() bool
GetLastAppliedPipeline() *uint32
SetLastAppliedPipeline(*uint32)
GetConfigCheckResult() *bool
Expand Down Expand Up @@ -78,7 +80,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP
for _, vp := range vps {
if !vp.IsDeleted() &&
vp.IsValid() &&
vp.GetRole() == filter.Role &&
(vp.GetRole() == filter.Role || vp.GetRole() == v1alpha1.VectorPipelineRoleMixed) &&
(filter.Scope == AllPipelines || vp.Namespace == filter.Namespace) &&
MatchLabels(matchLabels, vp.Labels) {
validPipelines = append(validPipelines, vp.DeepCopy())
Expand All @@ -96,7 +98,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP
for _, cvp := range cvps {
if !cvp.IsDeleted() &&
cvp.IsValid() &&
cvp.GetRole() == filter.Role &&
(cvp.GetRole() == filter.Role || cvp.GetRole() == v1alpha1.VectorPipelineRoleMixed) &&
MatchLabels(matchLabels, cvp.Labels) {
validPipelines = append(validPipelines, cvp.DeepCopy())
}
Expand Down
28 changes: 14 additions & 14 deletions internal/utils/k8s/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,18 @@ const (
// being merged into the destination (dst) labels. If a key exists in both maps,
// the destination value is preserved.
func MergeLabels(dst, src map[string]string) map[string]string {
if dst == nil {
if dst == nil {
dst = make(map[string]string)
}

if src == nil {
return dst
}

for k, v := range src {
if _, ok := dst[k]; !ok {
dst[k] = v
}
}
return dst
}
}

if src == nil {
return dst
}

for k, v := range src {
if _, ok := dst[k]; !ok {
dst[k] = v
}
}
return dst
}
1 change: 0 additions & 1 deletion internal/vector/vectoragent/vectoragent_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func (ctrl *Controller) matchLabelsForVectorAgent() map[string]string {
func (ctrl *Controller) labelsForVectorAgent() map[string]string {
basicLabels := ctrl.matchLabelsForVectorAgent()


labels := k8s.MergeLabels(basicLabels, ctrl.Vector.Spec.Agent.Labels)

return labels
Expand Down