Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/NifiConnection] Implementation of NifiConnection controller #168

Closed
wants to merge 62 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
71b9ff7
Add NifiConnection resource skeleton
juldrixx Jun 29, 2022
fa45165
Comment line
juldrixx Jun 29, 2022
c358332
Manage GetDataflowComponentInformation
juldrixx Jun 30, 2022
3aed6d6
Add cluster verification
juldrixx Jun 30, 2022
9ed7408
Add cluster connection
juldrixx Jun 30, 2022
a5aaef6
Add connection configuration
juldrixx Jul 1, 2022
2575348
Add finalizer, status management and start label management
juldrixx Jul 25, 2022
954c387
Add stop-component label management
juldrixx Jul 27, 2022
b3b56d8
Update port stopping feature
juldrixx Aug 1, 2022
003b8b3
Add function to check if connection exists
juldrixx Aug 1, 2022
9858215
Add annotations to store last nificluster applied
juldrixx Aug 20, 2022
45cca04
-
juldrixx Aug 25, 2022
65dee6b
Rebase on master
juldrixx Aug 25, 2022
06df7fc
Fix issues following rebase
juldrixx Aug 25, 2022
f7451e0
Add capability to identify if an input connection is active
juldrixx Sep 16, 2022
ee079b0
Add capability to update the connection configuration
juldrixx Sep 16, 2022
99b6750
Add update connection destination
juldrixx Sep 16, 2022
4669b88
[WIP] Add update connection destination && update connection source
juldrixx Sep 16, 2022
5fd01cb
WIP update source and destination
juldrixx Sep 17, 2022
2e71ad2
Merge branch 'master' into feature/nificonnection_resource
juldrixx Sep 17, 2022
1b36eff
Merge branch 'master' into feature/nificonnection_resource
juldrixx Sep 17, 2022
1f90a39
Upgrade nigoapi & Add labelIndex & Manage update source/destination i…
juldrixx Sep 17, 2022
820bd44
Remove old code and fix state
juldrixx Sep 17, 2022
a6616e9
Add drop queue flowfiles if drop strategy on connection and dataflow
juldrixx Sep 18, 2022
703587b
Unify kubebuilder validation
juldrixx Sep 18, 2022
4c2f384
Fix DropRequest management
juldrixx Sep 18, 2022
b87082f
Add finilizer deleting
juldrixx Sep 18, 2022
8eb7834
Save clusterRef after creation
juldrixx Sep 19, 2022
d04ae35
Unify finalizer label management
juldrixx Sep 19, 2022
e2eff09
Manage clusterRef changment connection & Modify all update to patch i…
juldrixx Sep 19, 2022
4dec565
Add abilty to force start/stop of dataflow & Update connection deleti…
juldrixx Sep 20, 2022
c3ca1b9
Add comments
juldrixx Sep 21, 2022
b4012a5
Remove debug modifications
juldrixx Sep 21, 2022
f59cd28
Add test getConnection
juldrixx Sep 21, 2022
a658221
Add unit tests for connection
juldrixx Sep 21, 2022
c9575c7
Patch connection creation & add units tests for input-port/output-port
juldrixx Sep 21, 2022
292a100
Add NifiConnection documentation
juldrixx Sep 21, 2022
0259d4d
Update plugin and documentation on it
juldrixx Sep 21, 2022
33062aa
Regenerate helm charts and crds
juldrixx Sep 21, 2022
1ba3caf
Update changelog
juldrixx Sep 21, 2022
768e7b9
Fix changelog
juldrixx Sep 21, 2022
8245911
Fix plugin doc
juldrixx Sep 22, 2022
0dc9b44
Add unit tests and remove license on plugin
juldrixx Sep 22, 2022
2d64957
Rebase on master and fix doc
juldrixx Sep 26, 2022
d4389cf
Remove .config folder from vscode
juldrixx Sep 26, 2022
3379148
Refactoring plugin in golang
juldrixx Sep 26, 2022
34d31eb
Add NifiConnection sample
juldrixx Oct 2, 2022
f51fdb8
Update changelog
juldrixx Oct 2, 2022
6cff180
Add NifiConnection RBAC in Helm Chart
juldrixx Oct 2, 2022
0b9af08
Add example of NifiConnection
juldrixx Oct 3, 2022
5609fca
Change default value with kubebuilder
juldrixx Oct 16, 2022
b752508
Merge branch 'master' into feature/nificonnection_resource
juldrixx Oct 27, 2022
54635db
Rebase
juldrixx Oct 27, 2022
e3ca110
Rebase on master
juldrixx Nov 21, 2022
0dd29e7
fix/tests
juldrixx Nov 22, 2022
9630691
Merge branch 'master' into feature/nificonnection_resource
juldrixx Feb 21, 2023
6e477cd
Rebase on master
juldrixx Apr 25, 2023
3ac605f
Rebase on master
juldrixx Aug 28, 2023
5327219
Fix go.sum
juldrixx Aug 28, 2023
a6f2026
Update CHANGELOG
juldrixx Aug 28, 2023
2475d3d
Fixes for go 1.21
juldrixx Aug 28, 2023
fbd2357
Fix tests
juldrixx Aug 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ Session.vim
tags
### VisualStudioCode ###
.vscode/*
.history
.config/*
.history
__debug_bin*
# End of https://www.gitignore.io/api/go,vim,emacs,visualstudiocode

Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

### Added

- [PR #168](https://github.com/konpyutaika/nifikop/pull/168) - **[Plugin]** Implementation on NiFiKop's plugin.
- [PR #168](https://github.com/konpyutaika/nifikop/pull/168) - **[Operator/NifiConnection]** Implementation on NifiConnection controller.
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
-
### Changed

- [PR #290](https://github.com/konpyutaika/nifikop/pull/290) - **[Operator/NifiCluster]** Change default sensitive algorithm
- [PR #290](https://github.com/konpyutaika/nifikop/pull/290) - **[Operator/NifiCluster]** Change default sensitive algorithm.

### Fixed Bugs

Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,8 @@ catalog-build: opm ## Build a catalog image.
# Push the catalog image.
.PHONY: catalog-push
catalog-push: ## Push a catalog image.
$(MAKE) docker-push IMG=$(CATALOG_IMG)
$(MAKE) docker-push IMG=$(CATALOG_IMG)

.PHONY: kubectl-nifikop
kubectl-nifikop:
go build -o bin/kubectl-nifikop ./cmd/kubectl-nifikop/main.go
9 changes: 9 additions & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,13 @@ resources:
kind: NifiNodeGroupAutoscaler
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
- api:
crdVersion: v1
namespaced: true
controller: true
domain: konpyutaika.com
group: nifi
kind: NifiConnection
path: github.com/konpyutaika/nifikop/api/v1alpha1
version: v1alpha1
version: "3"
13 changes: 9 additions & 4 deletions api/v1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type DataflowState string
// DataflowUpdateRequestType defines the type of versioned flow update request
type DataflowUpdateRequestType string

// DataflowUpdateStrategy defines the type of strategy to update a flow
type DataflowUpdateStrategy string
// ComponentUpdateStrategy defines the type of strategy to update a component
// +kubebuilder:validation:Enum={"drop","drain"}
type ComponentUpdateStrategy string

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
Expand All @@ -46,12 +47,15 @@ type ConfigurationState string
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
// +kubebuilder:validation:Enum={"cert-manager","vault"}
type PKIBackend string

// ClientConfigType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"tls","basic"}
type ClientConfigType string

// ClusterType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// AccessPolicyType represents the type of access policy
Expand Down Expand Up @@ -285,9 +289,9 @@ const (

// DrainStrategy leads to shutting down only input components (Input processors, remote input process group)
// and dropping all flowfiles from the flow.
DrainStrategy DataflowUpdateStrategy = "drain"
DrainStrategy ComponentUpdateStrategy = "drain"
// DropStrategy leads to shutting down all components and dropping all flowfiles from the flow.
DropStrategy DataflowUpdateStrategy = "drop"
DropStrategy ComponentUpdateStrategy = "drop"

// UserStateCreated describes the status of a NifiUser as created
UserStateCreated UserState = "created"
Expand Down Expand Up @@ -437,6 +441,7 @@ func SecretRefsEquals(secretRefs []SecretReference) bool {
return true
}

// +kubebuilder:validation:Enum={"never","always","once"}
type DataflowSyncMode string

const (
Expand Down
3 changes: 0 additions & 3 deletions api/v1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
// NifiClusterSpec defines the desired state of NifiCluster
type NifiClusterSpec struct {
// clientType defines if the operator will use basic or tls authentication to query the NiFi cluster.
// +kubebuilder:validation:Enum={"tls","basic"}
ClientType ClientConfigType `json:"clientType,omitempty"`
// type defines if the cluster is internal (i.e manager by the operator) or external.
// +kubebuilder:validation:Enum={"external","internal"}
Type ClusterType `json:"type,omitempty"`
// nodeURITemplate used to dynamically compute node uri (used if external type)
NodeURITemplate string `json:"nodeURITemplate,omitempty"`
Expand Down Expand Up @@ -415,7 +413,6 @@ type SSLSecrets struct {
// https://cert-manager.io/docs/concepts/issuer/
IssuerRef *cmmeta.ObjectReference `json:"issuerRef,omitempty"`
// TODO : add vault
// +kubebuilder:validation:Enum={"cert-manager","vault"}
PKIBackend PKIBackend `json:"pkiBackend,omitempty"`
//,"vault"
}
Expand Down
4 changes: 1 addition & 3 deletions api/v1/nifidataflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type NifiDataflowSpec struct {
// contains the reference to the ParameterContext with the one the dataflow is linked.
ParameterContextRef *ParameterContextReference `json:"parameterContextRef,omitempty"`
// if the flow will be synchronized once, continuously or never
// +kubebuilder:validation:Enum={"never","always","once"}
SyncMode *DataflowSyncMode `json:"syncMode,omitempty"`
// whether the flow is considered as ran if some controller services are still invalid or not.
SkipInvalidControllerService bool `json:"skipInvalidControllerService,omitempty"`
Expand All @@ -33,8 +32,7 @@ type NifiDataflowSpec struct {
// contains the reference to the NifiRegistry with the one the dataflow is linked.
RegistryClientRef *RegistryClientReference `json:"registryClientRef,omitempty"`
// describes the way the operator will deal with data when a dataflow will be updated : drop or drain
// +kubebuilder:validation:Enum={"drop","drain"}
UpdateStrategy DataflowUpdateStrategy `json:"updateStrategy"`
UpdateStrategy ComponentUpdateStrategy `json:"updateStrategy"`
}

type FlowPosition struct {
Expand Down
99 changes: 95 additions & 4 deletions api/v1alpha1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1alpha1
import (
"fmt"

v1 "github.com/konpyutaika/nifikop/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -21,11 +22,15 @@ type ClusterScalingStrategy string
// DataflowState defines the state of a NifiDataflow
type DataflowState string

// ConnectionState defines the state of a NifiConnection
type ConnectionState string

// DataflowUpdateRequestType defines the type of versioned flow update request
type DataflowUpdateRequestType string

// DataflowUpdateStrategy defines the type of strategy to update a flow
type DataflowUpdateStrategy string
// ComponentUpdateStrategy defines the type of strategy to update a component
// +kubebuilder:validation:Enum={"drop","drain"}
type ComponentUpdateStrategy string

// RackAwarenessState stores info about rack awareness status
type RackAwarenessState string
Expand All @@ -46,12 +51,15 @@ type ConfigurationState string
type InitClusterNode bool

// PKIBackend represents an interface implementing the PKIManager
// +kubebuilder:validation:Enum={"cert-manager","vault"}
type PKIBackend string

// ClientConfigType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"tls","basic"}
type ClientConfigType string

// ClusterType represents an interface implementing the ClientConfigManager
// +kubebuilder:validation:Enum={"external","internal"}
type ClusterType string

// AccessPolicyType represents the type of access policy
Expand Down Expand Up @@ -278,16 +286,23 @@ const (
// DataflowStateInSync describes the status of a NifiDataflow as in sync
DataflowStateInSync DataflowState = "InSync"

// ConnectionStateOutOfSync describes the status of a NifiConnection as out of sync
ConnectionStateOutOfSync ConnectionState = "OutOfSync"
// ConnectionStateInSync describes the status of a NifiConnection as in sync
ConnectionStateInSync ConnectionState = "InSync"
// ConnectionStateCreated describes the status of a NifiConnection as created
ConnectionStateCreated ConnectionState = "Created"

// RevertRequestType defines a revert changes request.
RevertRequestType DataflowUpdateRequestType = "Revert"
// UpdateRequestType defines an update version request.
UpdateRequestType DataflowUpdateRequestType = "Update"

// DrainStrategy leads to shutting down only input components (Input processors, remote input process group)
// and dropping all flowfiles from the flow.
DrainStrategy DataflowUpdateStrategy = "drain"
DrainStrategy ComponentUpdateStrategy = "drain"
// DropStrategy leads to shutting down all components and dropping all flowfiles from the flow.
DropStrategy DataflowUpdateStrategy = "drop"
DropStrategy ComponentUpdateStrategy = "drop"

// UserStateCreated describes the status of a NifiUser as created
UserStateCreated UserState = "created"
Expand Down Expand Up @@ -437,6 +452,21 @@ func SecretRefsEquals(secretRefs []SecretReference) bool {
return true
}

func ComponentRefsEquals(componentRefs []ComponentReference) bool {
c1 := componentRefs[0]
name := c1.Name
ns := c1.Namespace

for _, component := range componentRefs {
if name != component.Name || ns != component.Namespace || ns != string(component.Type) || ns != component.SubName {
return false
}
}

return true
}

// +kubebuilder:validation:Enum={"never","always","once"}
type DataflowSyncMode string

const (
Expand All @@ -462,3 +492,64 @@ const (
// downscale strategy targeting nodes which are least busy in terms of # flowfiles in queues
LeastBusyClusterDownscaleStrategy ClusterScalingStrategy = "leastbusy"
)

// Change the list to {"dataflow","input-port","output-port","processor","process-group"} when all the type are available
// +kubebuilder:validation:Enum={"dataflow"}
type ComponentType string

const (
ComponentDataflow ComponentType = "dataflow"
ComponentInputPort ComponentType = "input-port"
ComponentOutputPort ComponentType = "output-port"
ComponentProcessor ComponentType = "processor"
ComponentFunnel ComponentType = "funnel"
ComponentProcessGroup ComponentType = "process-group"
)

type ComponentInformation struct {
Id string `json:"id"`
GroupId string `json:"groupId"`
Type string `json:"type"`
ParentGroupId string `json:"parentGroupId"`
ClusterRef v1.ClusterReference `json:"clusterRef"`
}

// +kubebuilder:validation:Enum={"DO_NOT_LOAD_BALANCE","PARTITION_BY_ATTRIBUTE","ROUND_ROBIN","SINGLE"}
type ConnectionLoadBalanceStrategy string

const (
// Do not load balance FlowFiles between nodes in the cluster.
StrategyDoNotLoadBalance ConnectionLoadBalanceStrategy = "DO_NOT_LOAD_BALANCE"
// Determine which node to send a given FlowFile to based on the value of a user-specified FlowFile Attribute. All FlowFiles that have the same value for said Attribute will be sent to the same node in the cluster.
StrategyPartitionByAttribute ConnectionLoadBalanceStrategy = "PARTITION_BY_ATTRIBUTE"
// FlowFiles will be distributed to nodes in the cluster in a Round-Robin fashion. However, if a node in the cluster is not able to receive data as fast as other nodes, that node may be skipped in one or more iterations in order to maximize throughput of data distribution across the cluster.
StrategyRoundRobin ConnectionLoadBalanceStrategy = "ROUND_ROBIN"
// All FlowFiles will be sent to the same node. Which node they are sent to is not defined.
StrategySingle ConnectionLoadBalanceStrategy = "SINGLE"
)

// +kubebuilder:validation:Enum={"DO_NOT_COMPRESS","COMPRESS_ATTRIBUTES_ONLY","COMPRESS_ATTRIBUTES_AND_CONTENT"}
type ConnectionLoadBalanceCompression string

const (
// FlowFiles will not be compressed.
CompressionDoNotCompress ConnectionLoadBalanceCompression = "DO_NOT_COMPRESS"
// FlowFiles' attributes will be compressed, but the FlowFiles' contents will not be
CompressionCompressAttributesOnly ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_ONLY"
// FlowFiles' attributes and content will be compressed
CompressionCompressAttributesAndContent ConnectionLoadBalanceCompression = "COMPRESS_ATTRIBUTES_AND_CONTENT"
)

// +kubebuilder:validation:Enum={"FirstInFirstOutPrioritizer","NewestFlowFileFirstPrioritizer","OldestFlowFileFirstPrioritizer","PriorityAttributePrioritizer"}
type ConnectionPrioritizer string

const (
// Given two FlowFiles, the one that reached the connection first will be processed first.
PrioritizerFirstInFirstOutPrioritizer ConnectionPrioritizer = "FirstInFirstOutPrioritizer"
// Given two FlowFiles, the one that is newest in the dataflow will be processed first.
PrioritizerNewestFlowFileFirstPrioritizer ConnectionPrioritizer = "NewestFlowFileFirstPrioritizer"
// Given two FlowFiles, the one that is oldest in the dataflow will be processed first. 'This is the default scheme that is used if no prioritizers are selected'.
PrioritizerOldestFlowFileFirstPrioritizer ConnectionPrioritizer = "OldestFlowFileFirstPrioritizer"
// Given two FlowFiles, an attribute called “priority” will be extracted. The one that has the lowest priority value will be processed first.
PrioritizerPriorityAttributePrioritizer ConnectionPrioritizer = "PriorityAttributePrioritizer"
)
3 changes: 0 additions & 3 deletions api/v1alpha1/nificluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
// NifiClusterSpec defines the desired state of NifiCluster
type NifiClusterSpec struct {
// clientType defines if the operator will use basic or tls authentication to query the NiFi cluster.
// +kubebuilder:validation:Enum={"tls","basic"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
ClientType ClientConfigType `json:"clientType,omitempty"`
// type defines if the cluster is internal (i.e manager by the operator) or external.
// +kubebuilder:validation:Enum={"external","internal"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
Type ClusterType `json:"type,omitempty"`
// nodeURITemplate used to dynamically compute node uri (used if external type)
NodeURITemplate string `json:"nodeURITemplate,omitempty"`
Expand Down Expand Up @@ -373,7 +371,6 @@ type SSLSecrets struct {
// https://cert-manager.io/docs/concepts/issuer/
IssuerRef *cmmeta.ObjectReference `json:"issuerRef,omitempty"`
// TODO : add vault
// +kubebuilder:validation:Enum={"cert-manager","vault"}
juldrixx marked this conversation as resolved.
Show resolved Hide resolved
PKIBackend PKIBackend `json:"pkiBackend,omitempty"`
//,"vault"
}
Expand Down
Loading