Feature/add k8s event interpretation #382

Open · wants to merge 5 commits into base: main (showing changes from 4 commits)
9 changes: 7 additions & 2 deletions acto/runner/runner.py
@@ -15,6 +15,7 @@
 from acto.kubectl_client import KubectlClient
 from acto.snapshot import Snapshot
 from acto.utils import acto_timer, get_thread_logger
+from acto.utils.k8s_event_watcher import K8sEventWatcher
 
 RunnerHookType = Callable[[kubernetes.client.ApiClient], None]
 CustomSystemStateHookType = Callable[
@@ -456,7 +457,7 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:
         while True:
             try:
                 event = combined_event_queue.get(timeout=self.wait_time)
-                if event == "timeout":
+                if event in ["timeout", K8sEventWatcher.ABORT]:
                     converge = False
                     break
             except queue.Empty:
@@ -590,9 +591,13 @@ def wait_for_system_converge(self, hard_timeout=480) -> bool:
 
     def watch_system_events(self, event_stream, q: multiprocessing.Queue):
         """A process that watches namespaced events"""
-        for _ in event_stream:
+
+        watcher = K8sEventWatcher(q)
+
+        for payload in event_stream:
             try:
                 q.put("event")
+                watcher.observe(payload)
             except (ValueError, AssertionError):
                 pass
 
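To make the data flow concrete: `K8sEventWatcher` writes into the same queue that `wait_for_system_converge` polls, so once enough alarming events accumulate, the `ABORT` sentinel ends the wait early. A minimal sketch of that flow; the synthetic payload below is an assumption for illustration, real payloads come from the K8s watch stream:

```python
import multiprocessing

from acto.utils.k8s_event_watcher import K8sEventWatcher

q: multiprocessing.Queue = multiprocessing.Queue()
watcher = K8sEventWatcher(q)

# Synthetic watch-stream payload (illustrative, not captured output).
payload = (
    b'{"object": {"reason": "ErrImageNeverPull", '
    b'"message": "Container image pull policy is Never", '
    b'"involvedObject": {"uid": "pod-123"}}}'
)

# With the default threshold of 3, the third observation requests an abort.
for _ in range(3):
    watcher.observe(payload)

assert q.get(timeout=1) == K8sEventWatcher.ABORT
```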
107 changes: 107 additions & 0 deletions acto/utils/k8s_event_watcher.py
@@ -0,0 +1,107 @@
"""determines whether cluster is stuck in an unhealthy state through interpreting K8s events"""
import json
import multiprocessing
from acto.utils.thread_logger import get_thread_logger
from typing import Callable, Optional
import copy


class Predicate:
    def __init__(
        self,
        reason: str,
        message_filter: Callable[[Optional[str]], bool] = lambda x: True,
        threshold: Optional[int] = None,
    ) -> None:
        self.reason = reason
        self.message_filter = message_filter
        # event count threshold for deciding an abort
        self.threshold = threshold

    def match(self, reason: str, message: str = "") -> bool:
        return self.reason == reason and self.message_filter(message)

    def __str__(self) -> str:
        return "(reason: {}, count_threshold: {})".format(self.reason, self.threshold)


# TODO: unify this with other Acto configs
k8s_event_watcher_config = {
    "default_threshold": 3,
    "abort_predicates": [
        # a full list of kubelet-emitted reasons can be found at
        # https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/events/event.go
Member: Thanks for the documentation

        # event reasons emitted by K8s controllers, however, unfortunately
        # require a scan of the controller source code

        # kubelet reasons; TODO: add fine-calibrated message filters
        Predicate("Failed"),
        Predicate("BackOff", threshold=5),
        Predicate("FailedCreatePodContainer"),
        Predicate("ErrImageNeverPull"),
        Predicate("FailedAttachVolume"),
        Predicate("FailedMount"),
        Predicate("VolumeResizeFailed"),
        Predicate("FileSystemResizeFailed"),
        Predicate("FailedMapVolume"),

        # scheduling can fail transiently (e.g. due to node.kubernetes.io/not-ready),
        # so we filter for the truly alarming messages
        Predicate(
            "FailedScheduling",
            lambda msg: any(
                keyword in msg
                for keyword in ["affinity", "Insufficient memory", "Insufficient cpu"]
            ),
        ),
    ],
}
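To make the FailedScheduling filter concrete, here is a sketch of how it behaves on two scheduler-style messages; both message strings are illustrative, not captured output:

```python
def matches(msg: str) -> bool:
    # Same filter as the FailedScheduling predicate above.
    return any(
        keyword in msg
        for keyword in ["affinity", "Insufficient memory", "Insufficient cpu"]
    )

# An unsatisfiable anti-affinity rule matches on the "affinity" keyword...
assert matches("0/3 nodes are available: 3 node(s) didn't match pod anti-affinity rules")
# ...while a transient not-ready condition does not trigger an abort.
assert not matches("0/3 nodes are available: 3 node(s) had taint node.kubernetes.io/not-ready")
```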



class K8sEventWatcher:
    """Watch for K8s events that might signal an unresolvable state and, if one
    recurs often enough, request that Acto abort the convergence wait."""

    ABORT = "k8s_event_watcher_abort_request"

    def __init__(self, output_channel: multiprocessing.Queue) -> None:
        self.logger = get_thread_logger(with_prefix=True)
        self.output_channel = output_channel
        self.counter: dict = {}
        self.abort_requested = False
        self.abort_predicates = copy.deepcopy(
            k8s_event_watcher_config.get("abort_predicates", [])
        )
        # fall back to the default threshold for predicates without a valid one
        for predicate in self.abort_predicates:
            t = predicate.threshold
            predicate.threshold = (
                t
                if t is not None and t > 0
                else k8s_event_watcher_config.get("default_threshold", 3)
            )

    def observe(self, payload: bytes) -> None:
        # do nothing if we have already asked Acto to abort the convergence wait
        if self.abort_requested:
            return

        try:
            event: dict = json.loads(payload.decode("utf-8"))
            reason = event.get("object", {}).get("reason")
            message = event.get("object", {}).get("message")
        except Exception:
            self.logger.warning("Failed to deserialize K8s event from payload %s", payload)
            return

        for predicate in self.abort_predicates:
            if predicate.match(reason, message):
                involved_object = event["object"].get("involvedObject", {})
                self.logger.info(
                    "Observed K8s event matching abort predicate %s for object %s",
                    predicate,
                    involved_object,
                )
                object_id = involved_object.get("uid", "")

                if self._inc_and_check(object_id, predicate):
                    self.logger.warning(
                        "Aborting convergence wait due to failed predicate %s", predicate
                    )
                    self.output_channel.put(self.ABORT)
                    self.abort_requested = True
                break

    def _inc_and_check(self, object_id: str, predicate: Predicate) -> bool:
        if predicate not in self.counter:
            self.counter[predicate] = {}

        if object_id not in self.counter[predicate]:
            self.counter[predicate][object_id] = 0

        self.counter[predicate][object_id] += 1
        return self.counter[predicate][object_id] >= predicate.threshold
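Worth noting: counting is keyed by (predicate, involvedObject uid), so failures must recur on the same object to trip the threshold. A sketch of that behavior; the uids and payloads are hypothetical:

```python
import multiprocessing

from acto.utils.k8s_event_watcher import K8sEventWatcher

q: multiprocessing.Queue = multiprocessing.Queue()
watcher = K8sEventWatcher(q)

def event(uid: str) -> bytes:
    # Build a synthetic FailedMount payload for the given object uid.
    return (
        '{"object": {"reason": "FailedMount", "message": "mount failed", '
        '"involvedObject": {"uid": "%s"}}}' % uid
    ).encode("utf-8")

# Two failures each on two different pods: below the per-object threshold of 3.
for uid in ["pod-a", "pod-b", "pod-a", "pod-b"]:
    watcher.observe(event(uid))
assert not watcher.abort_requested

# A third failure on the same pod crosses the threshold and requests an abort.
watcher.observe(event("pod-a"))
assert watcher.abort_requested
```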
120 changes: 120 additions & 0 deletions test/e2e_tests/test_convergence_wait_abort.py
@@ -0,0 +1,120 @@
"""This module tests quick abort from waiting for system convergence upon observing unresolvable
errors from K8s events when deploying a testcase"""


import logging
import os
import pathlib
import tempfile
import unittest
from typing import Callable

from acto import utils
from acto.kubernetes_engine.kind import Kind
from acto.runner import Runner
from acto.utils.k8s_event_watcher import k8s_event_watcher_config

test_dir = pathlib.Path(__file__).parent.resolve()
test_data_dir = os.path.join(test_dir, "test_data")


class TestConvergenceWaitAbort(unittest.TestCase):

    def __init__(self, methodName: str = "runTest") -> None:
        super().__init__(methodName)
        # lower the threshold for the sake of a faster test
        k8s_event_watcher_config["default_threshold"] = 2

    def test_unsatisfiable_affinity_rule(self):
        def log_file_test(log_file_path) -> bool:
            keyword = "Aborting convergence wait due to failed predicate (reason: FailedScheduling,"
            with open(log_file_path, "r") as log_file:
                for log_line in log_file:
                    if keyword in log_line:
                        return True
            return False

        resource_manifest_path = os.path.join(
            test_data_dir, "k8s-event-watcher", "unsatisfiable-affinity.yaml"
        )
        self._test_convergence_wait_abort(
            "unsatisfiable-affinity", resource_manifest_path, log_file_test
        )


    def test_invalid_image(self):
        def log_file_test(log_file_path) -> bool:
            keyword = "Aborting convergence wait due to failed predicate (reason: Failed,"
            with open(log_file_path, "r") as log_file:
                for log_line in log_file:
                    if keyword in log_line:
                        return True
            return False

        resource_manifest_path = os.path.join(
            test_data_dir, "k8s-event-watcher", "invalid-image.yaml"
        )
        self._test_convergence_wait_abort(
            "invalid-image", resource_manifest_path, log_file_test
        )

    # should never abort a convergence wait for a satisfiable deployment
    def test_satisfiable_deployment(self):
        def log_file_test(log_file_path) -> bool:
            keyword = "Aborting convergence"
            with open(log_file_path, "r") as log_file:
                for log_line in log_file:
                    if keyword in log_line:
                        return False
            return True

        resource_manifest_path = os.path.join(
            test_data_dir, "k8s-event-watcher", "satisfiable-deployment.yaml"
        )
        self._test_convergence_wait_abort(
            "satisfiable", resource_manifest_path, log_file_test
        )

    # apply a resource manifest and examine the log file
    def _test_convergence_wait_abort(
        self,
        cluster_name: str,
        resource_file_path: str,
        log_test_predicate: Callable[[str], bool],
    ) -> None:
        tmp_dir = tempfile.TemporaryDirectory()
        log_file_path = os.path.join(tmp_dir.name, "test.log")

        logging.basicConfig(
            filename=log_file_path,
            level=logging.WARN,
            format="%(message)s",
            force=True,
        )

        kube_config_path = os.path.join(
            os.path.expanduser("~"), ".kube/test-" + cluster_name
        )

        cluster = Kind(acto_namespace=0, num_nodes=3, version="v1.27.3")
        cluster.create_cluster(cluster_name, kube_config_path)

        runner = Runner(
            context={
                "namespace": "test",
                "crd": None,
                "preload_images": set(),
            },
            trial_dir=tmp_dir.name,
            kubeconfig=kube_config_path,
            context_name="kind-" + cluster_name,
        )

        utils.create_namespace(runner.apiclient, "test")

        runner.run_without_collect(resource_file_path)
        cluster.delete_cluster(cluster_name, kube_config_path)

        assert log_test_predicate(log_file_path)
Author: Test to see if an abort is triggered by examining the log file, since I haven't been able to find a cleaner way to test that.
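One lighter-weight option (a sketch, not part of this PR) could be to unit-test the watcher directly against its output queue, keeping the Kind-based e2e tests for the runner integration:

```python
import multiprocessing
import unittest

from acto.utils.k8s_event_watcher import K8sEventWatcher


class TestK8sEventWatcherUnit(unittest.TestCase):
    def test_abort_sentinel_enqueued(self):
        q: multiprocessing.Queue = multiprocessing.Queue()
        watcher = K8sEventWatcher(q)
        # Synthetic payload; the uid is hypothetical.
        payload = (
            b'{"object": {"reason": "ErrImageNeverPull", "message": "", '
            b'"involvedObject": {"uid": "pod-xyz"}}}'
        )
        for _ in range(3):  # shipped default threshold
            watcher.observe(payload)
        self.assertEqual(q.get(timeout=1), K8sEventWatcher.ABORT)
```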

        tmp_dir.cleanup()
30 changes: 30 additions & 0 deletions test/e2e_tests/test_data/k8s-event-watcher/invalid-image.yaml
@@ -0,0 +1,30 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox-deployment
spec:
  selector:
    matchLabels:
      app: busybox
  replicas: 12
  template:
    metadata:
      labels:
        app: busybox
    spec:
      containers:
        - name: busybox
          image: strange-invalid-image
          resources:
            requests:
              cpu: 1
            limits:
              cpu: 1
          command:
            - "sh"
            - "-c"
            - "while true; do sleep 3600; done"

23 changes: 23 additions & 0 deletions test/e2e_tests/test_data/k8s-event-watcher/satisfiable-deployment.yaml
@@ -0,0 +1,23 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox-deployment
spec:
  selector:
    matchLabels:
      app: busybox
  replicas: 3
  template:
    metadata:
      labels:
        app: busybox
    spec:
      containers:
        - name: busybox
          image: busybox
          command:
            - "sh"
            - "-c"
            - "while true; do sleep 3600; done"

33 changes: 33 additions & 0 deletions test/e2e_tests/test_data/k8s-event-watcher/unsatisfiable-affinity.yaml
@@ -0,0 +1,33 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: busybox-deployment
spec:
  selector:
    matchLabels:
      app: busybox
  replicas: 5 # unsatisfiable affinity rule when we have only 3 worker nodes
  template:
    metadata:
      labels:
        app: busybox
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - busybox
              topologyKey: kubernetes.io/hostname
      containers:
        - name: busybox
          image: busybox
          command:
            - "sh"
            - "-c"
            - "while true; do sleep 3600; done"