Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Oct 7, 2023
1 parent 6347f25 commit 779d878
Show file tree
Hide file tree
Showing 2 changed files with 309 additions and 124 deletions.
255 changes: 131 additions & 124 deletions e2e_tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/receiver/receivertest"
Expand All @@ -34,6 +33,8 @@ import (
"helm.sh/helm/v3/pkg/chart/loader"
"helm.sh/helm/v3/pkg/kube"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -178,6 +179,44 @@ func deployChartsAndApps(t *testing.T) {
require.NoError(t, err)
}
}
jobstream, err := os.ReadFile(filepath.Join("testdata", "test_jobs.yaml"))
require.NoError(t, err)
var namespaces []*corev1.Namespace
var jobs []*batchv1.Job
for _, resourceYAML := range strings.Split(string(jobstream), "---") {
if len(resourceYAML) == 0 {
continue
}

obj, groupVersionKind, err := decode(
[]byte(resourceYAML),
nil,
nil)
require.NoError(t, err)
if groupVersionKind.Group == "" &&
groupVersionKind.Version == "v1" &&
groupVersionKind.Kind == "Namespace" {
nm := obj.(*corev1.Namespace)
namespaces = append(namespaces, nm)
nms := clientset.CoreV1().Namespaces()
_, err := nms.Create(context.Background(), nm, metav1.CreateOptions{})
require.NoError(t, err)
t.Logf("Deployed namespace %s", nm.Name)
}

waitForAllNamespacesToBeCreated(t, clientset)

if groupVersionKind.Group == "batch" &&
groupVersionKind.Version == "v1" &&
groupVersionKind.Kind == "Job" {
job := obj.(*batchv1.Job)
jobs = append(jobs, job)
jobClient := clientset.BatchV1().Jobs(job.Namespace)
_, err := jobClient.Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)
t.Logf("Deployed job %s", job.Name)
}
}

waitForAllDeploymentsToStart(t, clientset)

Expand All @@ -190,6 +229,18 @@ func deployChartsAndApps(t *testing.T) {
_ = deployments.Delete(context.Background(), "nodejs-test", metav1.DeleteOptions{
GracePeriodSeconds: &waitTime,
})
for _, job := range jobs {
jobClient := clientset.BatchV1().Jobs(job.Namespace)
_ = jobClient.Delete(context.Background(), job.Name, metav1.DeleteOptions{
GracePeriodSeconds: &waitTime,
})
}
for _, nm := range namespaces {
nmClient := clientset.CoreV1().Namespaces()
_ = nmClient.Delete(context.Background(), nm.Name, metav1.DeleteOptions{
GracePeriodSeconds: &waitTime,
})
}
uninstall := action.NewUninstall(actionConfig)
uninstall.IgnoreNotFound = true
uninstall.Wait = true
Expand All @@ -198,29 +249,17 @@ func deployChartsAndApps(t *testing.T) {
}

func Test_E2E(t *testing.T) {
sinks := setupOnce(t)
t.Run("node.js traces captured", func(t *testing.T) {
testNodeJSTraces(t, sinks)
})
t.Run("kubernetes cluster metrics", func(t *testing.T) {
testK8sClusterReceiverMetrics(t, sinks)
})
t.Run("agent logs", func(t *testing.T) {
testAgentLogs(t, sinks)
})
t.Run("test HEC metrics", func(t *testing.T) {
testHECMetrics(t, sinks)
})
t.Run("test k8s objects", func(t *testing.T) {
testK8sObjects(t, sinks)
})
t.Run("test agent metrics", func(t *testing.T) {
testAgentMetrics(t, sinks)
})
_ = setupOnce(t)
t.Run("node.js traces captured", testNodeJSTraces)
t.Run("kubernetes cluster metrics", testK8sClusterReceiverMetrics)
t.Run("agent logs", testAgentLogs)
t.Run("test HEC metrics", testHECMetrics)
t.Run("test k8s objects", testK8sObjects)
t.Run("test agent metrics", testAgentMetrics)
}

func testNodeJSTraces(t *testing.T, s *sinks) {
tracesConsumer := s.tracesConsumer
func testNodeJSTraces(t *testing.T) {
tracesConsumer := setupOnce(t).tracesConsumer

var expectedTraces ptrace.Traces
expectedTracesFile := filepath.Join("testdata", "expected_traces.yaml")
Expand All @@ -246,8 +285,8 @@ func testNodeJSTraces(t *testing.T, s *sinks) {

}

func testK8sClusterReceiverMetrics(t *testing.T, s *sinks) {
metricsConsumer := s.k8sclusterReceiverMetricsConsumer
func testK8sClusterReceiverMetrics(t *testing.T) {
metricsConsumer := setupOnce(t).k8sclusterReceiverMetricsConsumer

metricNames := []string{
"k8s.node.condition_ready",
Expand All @@ -272,14 +311,17 @@ func testK8sClusterReceiverMetrics(t *testing.T, s *sinks) {
checkMetricsAreEmitted(t, metricsConsumer, metricNames)
}

func testAgentLogs(t *testing.T, s *sinks) {
logsConsumer := s.logsConsumer
func testAgentLogs(t *testing.T, ) {
logsConsumer := setupOnce(t).logsConsumer
waitForLogs(t, 5, logsConsumer)

var resource pcommon.Resource
var helloWorldResource pcommon.Resource
var helloWorldLogRecord *plog.LogRecord
var podAnnoResource pcommon.Resource
var podAnnoLogRecord *plog.LogRecord
var nsAnnoResource pcommon.Resource
var nsAnnoLogRecord *plog.LogRecord

OUTER:
for i := 0; i < len(logsConsumer.AllLogs()); i++ {
l := logsConsumer.AllLogs()[i]
for j := 0; j < l.ResourceLogs().Len(); j++ {
Expand All @@ -290,30 +332,59 @@ OUTER:
logRecord := sl.LogRecords().At(m)
if logRecord.Body().AsString() == "Hello World" {
helloWorldLogRecord = &logRecord
resource = rl.Resource()
break OUTER
helloWorldResource = rl.Resource()
}
if value, ok := rl.Resource().Attributes().Get("com.splunk.index"); ok {
if "pod-anno" == value.AsString() {
podAnnoLogRecord = &logRecord
podAnnoResource = rl.Resource()
}
if "ns-anno" == value.AsString() {
nsAnnoLogRecord = &logRecord
nsAnnoResource = rl.Resource()
}
}
}
}
}
}
assert.NotNil(t, helloWorldLogRecord)
sourceType, ok := resource.Attributes().Get("com.splunk.sourcetype")
assert.True(t, ok)
assert.Equal(t, "kube:container:nodejs-test", sourceType.AsString())
source, ok := resource.Attributes().Get("com.splunk.source")
assert.True(t, ok)
assert.Regexp(t, regexp.MustCompile("/var/log/pods/default_nodejs-test-.*/nodejs-test/0.log"), source.AsString())
index, ok := resource.Attributes().Get("com.splunk.index")
assert.True(t, ok)
assert.Equal(t, "main", index.AsString())
podName, ok := helloWorldLogRecord.Attributes().Get("k8s.pod.name")
assert.True(t, ok)
assert.Regexp(t, regexp.MustCompile("nodejs-test-.*"), podName.AsString())
{
assert.NotNil(t, helloWorldLogRecord)
sourceType, ok := helloWorldResource.Attributes().Get("com.splunk.sourcetype")
assert.True(t, ok)
assert.Equal(t, "kube:container:nodejs-test", sourceType.AsString())
source, ok := helloWorldResource.Attributes().Get("com.splunk.source")
assert.True(t, ok)
assert.Regexp(t, regexp.MustCompile("/var/log/pods/default_nodejs-test-.*/nodejs-test/0.log"), source.AsString())
index, ok := helloWorldResource.Attributes().Get("com.splunk.index")
assert.True(t, ok)
assert.Equal(t, "main", index.AsString())
podName, ok := helloWorldLogRecord.Attributes().Get("k8s.pod.name")
assert.True(t, ok)
assert.Regexp(t, regexp.MustCompile("nodejs-test-.*"), podName.AsString())
}
{
assert.NotNil(t, podAnnoLogRecord)
sourceType, ok := podAnnoResource.Attributes().Get("com.splunk.sourcetype")
assert.True(t, ok)
assert.Equal(t, "kube:container:pod-w-index-wo-ns-index", sourceType.AsString())
tag, ok := podAnnoLogRecord.Attributes().Get("k8s.pod.labels.app")
assert.True(t, ok)
assert.Equal(t, "pod-w-index-wo-ns-index", tag.AsString())
}
{
assert.NotNil(t, nsAnnoLogRecord)
sourceType, ok := nsAnnoResource.Attributes().Get("com.splunk.sourcetype")
assert.True(t, ok)
assert.Equal(t, "kube:container:pod-wo-index-w-ns-index", sourceType.AsString())
tag, ok := nsAnnoLogRecord.Attributes().Get("k8s.pod.labels.app")
assert.True(t, ok)
assert.Equal(t, "pod-w-index-wo-ns-index", tag.AsString())
}
}

func testK8sObjects(t *testing.T, s *sinks) {
logsObjectsConsumer := s.logsObjectsConsumer
func testK8sObjects(t *testing.T) {
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
waitForLogs(t, 5, logsObjectsConsumer)

var kinds []string
Expand Down Expand Up @@ -349,8 +420,8 @@ func testK8sObjects(t *testing.T, s *sinks) {
assert.Contains(t, sourceTypes, "kube:object:nodes")
}

func testAgentMetrics(t *testing.T, s *sinks) {
agentMetricsConsumer := s.agentMetricsConsumer
func testAgentMetrics(t *testing.T) {
agentMetricsConsumer := setupOnce(t).agentMetricsConsumer

metricNames := []string{
"container.filesystem.available",
Expand Down Expand Up @@ -400,7 +471,7 @@ func testAgentMetrics(t *testing.T, s *sinks) {
checkMetricsAreEmitted(t, agentMetricsConsumer, metricNames)
}

func testHECMetrics(t *testing.T, s *sinks) {
func testHECMetrics(t *testing.T) {
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer

metricNames := []string{
Expand Down Expand Up @@ -486,6 +557,18 @@ func testHECMetrics(t *testing.T, s *sinks) {
checkMetricsAreEmitted(t, hecMetricsConsumer, metricNames)
}

func waitForAllNamespacesToBeCreated(t *testing.T, clientset *kubernetes.Clientset) {
require.Eventually(t, func() bool {
nms, err := clientset.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{})
require.NoError(t, err)
for _, d := range nms.Items {
if d.Status.Phase != corev1.NamespaceActive {
return false
}
}
return true
}, 5*time.Minute, 10*time.Second)
}
func waitForAllDeploymentsToStart(t *testing.T, clientset *kubernetes.Clientset) {
require.Eventually(t, func() bool {
di, err := clientset.AppsV1().Deployments("default").List(context.Background(), metav1.ListOptions{})
Expand Down Expand Up @@ -641,26 +724,6 @@ func hostEndpoint(t *testing.T) string {
return ""
}

// readMetrics reads a pmetric.Metrics from the specified YAML or JSON file.
func readMetrics(filePath string) (pmetric.Metrics, error) {
b, err := os.ReadFile(filePath)
if err != nil {
return pmetric.Metrics{}, err
}
if strings.HasSuffix(filePath, ".yaml") || strings.HasSuffix(filePath, ".yml") {
var m map[string]interface{}
if err = yaml.Unmarshal(b, &m); err != nil {
return pmetric.Metrics{}, err
}
b, err = json.Marshal(m)
if err != nil {
return pmetric.Metrics{}, err
}
}
unmarshaller := &pmetric.JSONUnmarshaler{}
return unmarshaller.UnmarshalMetrics(b)
}

// readTraces reads a ptrace.Traces from the specified YAML or JSON file.
func readTraces(filePath string) (ptrace.Traces, error) {
b, err := os.ReadFile(filePath)
Expand All @@ -681,53 +744,6 @@ func readTraces(filePath string) (ptrace.Traces, error) {
return unmarshaler.UnmarshalTraces(b)
}

func containerImageShorten(value string) string {
return value[(strings.LastIndex(value, "/") + 1):]
}

func shortenNames(value string) string {
if strings.HasPrefix(value, "kube-proxy") {
return "kube-proxy"
}
if strings.HasPrefix(value, "local-path-provisioner") {
return "local-path-provisioner"
}
if strings.HasPrefix(value, "kindnet") {
return "kindnet"
}
if strings.HasPrefix(value, "coredns") {
return "coredns"
}
if strings.HasPrefix(value, "otelcol") {
return "otelcol"
}
if strings.HasPrefix(value, "sock-splunk-otel-collector-agent") {
return "sock-splunk-otel-collector-agent"
}
if strings.HasPrefix(value, "sock-splunk-otel-collector-k8s-cluster-receiver") {
return "sock-splunk-otel-collector-k8s-cluster-receiver"
}
if strings.HasPrefix(value, "cert-manager-cainjector") {
return "cert-manager-cainjector"
}
if strings.HasPrefix(value, "sock-operator") {
return "sock-operator"
}
if strings.HasPrefix(value, "nodejs-test") {
return "nodejs-test"
}
if strings.HasPrefix(value, "cert-manager-webhook") {
return "cert-manager-webhook"
}
if strings.HasPrefix(value, "cert-manager") {
return "cert-manager"
}

return value
}

func replaceWithStar(string) string { return "*" }

func waitForTraces(t *testing.T, entriesNum int, tc *consumertest.TracesSink) {
timeoutMinutes := 3
require.Eventuallyf(t, func() bool {
Expand All @@ -737,15 +753,6 @@ func waitForTraces(t *testing.T, entriesNum int, tc *consumertest.TracesSink) {
len(tc.AllTraces()), timeoutMinutes)
}

func waitForMetrics(t *testing.T, entriesNum int, mc *consumertest.MetricsSink) {
timeoutMinutes := 3
require.Eventuallyf(t, func() bool {
return len(mc.AllMetrics()) > entriesNum
}, time.Duration(timeoutMinutes)*time.Minute, 1*time.Second,
"failed to receive %d entries, received %d metrics in %d minutes", entriesNum,
len(mc.AllMetrics()), timeoutMinutes)
}

func waitForLogs(t *testing.T, entriesNum int, lc *consumertest.LogsSink) {
timeoutMinutes := 3
require.Eventuallyf(t, func() bool {
Expand Down
Loading

0 comments on commit 779d878

Please sign in to comment.