diff --git a/kubernetes/base/leaderelection/electionconfig.py b/kubernetes/base/leaderelection/electionconfig.py index 7b0db639b4..fa4a2f7e4b 100644 --- a/kubernetes/base/leaderelection/electionconfig.py +++ b/kubernetes/base/leaderelection/electionconfig.py @@ -16,10 +16,10 @@ import logging logging.basicConfig(level=logging.INFO) - class Config: + # Validate config, exit if an error is detected - def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading): + def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted_leading, onstopped_leading, context): self.jitter_factor = 1.2 if lock is None: @@ -53,6 +53,7 @@ def __init__(self, lock, lease_duration, renew_deadline, retry_period, onstarted self.onstopped_leading = self.on_stoppedleading_callback else: self.onstopped_leading = onstopped_leading + self.context = context # Default callback for when the current candidate if a leader, stops leading def on_stoppedleading_callback(self): diff --git a/kubernetes/base/leaderelection/example.py b/kubernetes/base/leaderelection/example.py index 3b3336c8e3..a9c2bacda7 100644 --- a/kubernetes/base/leaderelection/example.py +++ b/kubernetes/base/leaderelection/example.py @@ -14,10 +14,9 @@ import uuid from kubernetes import client, config -from kubernetes.leaderelection import leaderelection -from kubernetes.leaderelection.resourcelock.configmaplock import ConfigMapLock -from kubernetes.leaderelection import electionconfig - +import leaderelection +from resourcelock.configmaplock import ConfigMapLock +import electionconfig # Authenticate using config file config.load_kube_config(config_file=r"") @@ -33,6 +32,8 @@ # Kubernetes namespace lock_namespace = "default" +context = leaderelection.Context() + # The function that a user wants to run once a candidate is elected as a leader def example_func(): @@ -45,7 +46,7 @@ def example_func(): # Create config config = electionconfig.Config(ConfigMapLock(lock_name, 
lock_namespace, candidate_id), lease_duration=17, renew_deadline=15, retry_period=5, onstarted_leading=example_func, - onstopped_leading=None) + onstopped_leading=None, context=context) # Enter leader election leaderelection.LeaderElection(config).run() diff --git a/kubernetes/base/leaderelection/leaderelection.py b/kubernetes/base/leaderelection/leaderelection.py index a707fbaccd..9c81ac4af9 100644 --- a/kubernetes/base/leaderelection/leaderelection.py +++ b/kubernetes/base/leaderelection/leaderelection.py @@ -17,8 +17,9 @@ import time import json import threading -from .leaderelectionrecord import LeaderElectionRecord +from leaderelectionrecord import LeaderElectionRecord import logging +import signal # if condition to be removed when support for python2 will be removed if sys.version_info > (3, 0): from http import HTTPStatus @@ -36,8 +37,21 @@ lease. """ +class Context: + def __init__(self): + self.cancelled = False + + def cancel(self): + self.cancelled = True + +# This currently only handles Ctrl+C on a leader, which is not the only way a leader may exit +def handle_sigint(signal_received, frame): + print("\nSIGINT received! Cancelling election...") + if LeaderElection.global_context: + LeaderElection.global_context.cancel() class LeaderElection: + global_context = None def __init__(self, election_config): if election_config is None: sys.exit("argument config not passed") @@ -51,13 +65,18 @@ def __init__(self, election_config): # Latest update time of the lock self.observed_time_milliseconds = 0 + LeaderElection.global_context = self.election_config.context + + # Attach signal handler to Ctrl+C (SIGINT) + signal.signal(signal.SIGINT, handle_sigint) + # Point of entry to Leader election def run(self): # Try to create/ acquire a lock if self.acquire(): logging.info("{} successfully acquired lease".format(self.election_config.lock.identity)) - # Start leading and call OnStartedLeading() + # Start the leader callback in a new daemon thread. 
threading.daemon = True threading.Thread(target=self.election_config.onstarted_leading).start() @@ -72,6 +91,7 @@ def acquire(self): retry_period = self.election_config.retry_period while True: + succeeded = self.try_acquire_or_renew() if succeeded: @@ -79,6 +99,7 @@ def acquire(self): time.sleep(retry_period) + def renew_loop(self): # Leader logging.info("Leader has entered renew loop and will try to update lease continuously") @@ -87,10 +108,20 @@ def renew_loop(self): renew_deadline = self.election_config.renew_deadline * 1000 while True: + # Check for context cancellation + if self.election_config.context.cancelled: + self.force_expire_lease() + return + timeout = int(time.time() * 1000) + renew_deadline succeeded = False while int(time.time() * 1000) < timeout: + if self.election_config.context.cancelled: + logging.info(f"Context cancelled during renew loop. Reason: {getattr(self.election_config.context, 'cancel_reason', 'not specified')}") + self.force_expire_lease() + return + succeeded = self.try_acquire_or_renew() if succeeded: @@ -104,6 +135,41 @@ def renew_loop(self): # failed to renew, return return + def force_expire_lease(self, max_retries=3): + """ + Force the lease to be considered expired by updating the leader election record's renewTime + to a value in the past. Retries the update if a conflict (HTTP 409) is encountered. + """ + expired_time = time.time() - self.election_config.lease_duration - 1 # Expired timestamp + retries = 0 + while retries < max_retries: + # Re-read the current state of the lock to get the latest version. + lock_status, current_record = self.election_config.lock.get( + self.election_config.lock.name, + self.election_config.lock.namespace + ) + # Build a replacement record whose renewTime is already in the past; the third argument (presumably acquireTime) is passed as None and current_record is not consulted. 
+ new_record = LeaderElectionRecord( + self.election_config.lock.identity, + str(self.election_config.lease_duration), + None, + str(expired_time) + ) + update_status = self.election_config.lock.update( + self.election_config.lock.name, + self.election_config.lock.namespace, + new_record + ) + if update_status: + logging.info("Lease forcibly expired.") + return True + else: + logging.info(f"Conflict encountered, retrying update... (attempt {retries+1})") + retries += 1 + time.sleep(0.5) # wait a bit before retrying, this is very hacky + logging.info("Failed to force lease expiration after retries.") + return False + def try_acquire_or_renew(self): now_timestamp = time.time() now = datetime.datetime.fromtimestamp(now_timestamp) diff --git a/kubernetes/base/leaderelection/resourcelock/configmaplock.py b/kubernetes/base/leaderelection/resourcelock/configmaplock.py index a4ccf49d27..a5b277547b 100644 --- a/kubernetes/base/leaderelection/resourcelock/configmaplock.py +++ b/kubernetes/base/leaderelection/resourcelock/configmaplock.py @@ -11,11 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import sys +sys.path.append("..") from kubernetes.client.rest import ApiException from kubernetes import client, config from kubernetes.client.api_client import ApiClient -from ..leaderelectionrecord import LeaderElectionRecord +from leaderelectionrecord import LeaderElectionRecord import json import logging logging.basicConfig(level=logging.INFO)