-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathresource_manager.py
225 lines (184 loc) · 8.3 KB
/
resource_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""Manages resource allocation and scaling for LXC containers."""
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
import paramiko
import lxc_utils
import scaling_manager
from config import (
config,
DEFAULTS,
LXC_TIER_ASSOCIATIONS,
IGNORE_LXC
)
from notification import send_notification
# NOTE(review): this rebinds IGNORE_LXC, overriding the name imported from
# `config` above — presumably intentional, to normalize the configured list
# into a set for O(1) membership tests; confirm the import is still needed.
IGNORE_LXC = set(config.get("DEFAULTS", {}).get("ignore_lxc", []))  # Container IDs excluded from scaling
def collect_data_for_container(ctid: str) -> Optional[Dict[str, Any]]:
    """Gather current usage and configured limits for a single LXC container.

    Args:
        ctid: The container ID.

    Returns:
        A dictionary with resource data for the container, or None if the
        container is ignored, not running, or its data cannot be read.
    """
    # Skip containers the operator explicitly excluded from scaling.
    if ctid in IGNORE_LXC:
        logging.debug(f"Skipping ignored container {ctid}")
        return None

    # Only running containers report meaningful usage figures.
    if not lxc_utils.is_container_running(ctid):
        logging.debug(f"Container {ctid} is not running")
        return None

    try:
        raw_config = lxc_utils.run_command(f"pct config {ctid}")
        if not raw_config:
            raise ValueError(f"No configuration found for container {ctid}")

        # Parse the "key: value" lines of `pct config` into a lookup table;
        # a later duplicate key wins, matching a line-by-line scan.
        settings: Dict[str, str] = {}
        for entry in raw_config.splitlines():
            name, sep, rest = entry.partition(':')
            if sep:
                settings[name.strip()] = rest.strip()

        raw_cores = settings.get('cores')
        raw_memory = settings.get('memory')
        if raw_cores is None or raw_memory is None:
            raise ValueError(f"Missing cores or memory configuration for container {ctid}")
        cores = int(raw_cores)
        memory = int(raw_memory)

        # Live usage readings; None signals a collection failure downstream.
        cpu_usage = lxc_utils.get_cpu_usage(ctid)
        mem_usage = lxc_utils.get_memory_usage(ctid)
        if cpu_usage is None or mem_usage is None:
            raise ValueError(f"Failed to get resource usage for container {ctid}")

        return {
            ctid: {
                "cpu": cpu_usage,
                "mem": mem_usage,
                "initial_cores": cores,
                "initial_memory": memory,
            }
        }
    except Exception as e:
        # Best-effort collection: log and let the caller skip this container.
        logging.error(f"Error collecting data for container {ctid}: {str(e)}")
        return None
def collect_container_data() -> Dict[str, Dict[str, Any]]:
    """Collect resource usage data for all LXC containers in parallel.

    Returns:
        Mapping of container ID to its resource data, with the container's
        tier settings (or None) attached under the "tier" key.
    """
    containers: Dict[str, Dict[str, Any]] = {}

    with ThreadPoolExecutor(max_workers=8) as pool:
        # Fan out one collection job per non-ignored container.
        pending = {
            pool.submit(collect_data_for_container, ctid): ctid
            for ctid in lxc_utils.get_containers()
            if ctid not in IGNORE_LXC
        }
        for job in as_completed(pending):
            ctid = pending[job]
            try:
                data = job.result()
                if not data:
                    continue
                containers.update(data)
                # Attach the configured tier (None when the container has none).
                containers[ctid]["tier"] = config.get("tiers", {}).get(ctid)
            except Exception as e:
                logging.error(f"Error collecting data for container {ctid}: {e}")

    return containers
def validate_tier_config(ctid: str, tier_config: Dict[str, Any]) -> bool:
    """Validate tier configuration settings.

    Args:
        ctid: The container ID.
        tier_config: The tier configuration to validate.

    Returns:
        bool: True if configuration is valid, False otherwise.
    """
    # Robustness fix: a malformed YAML entry can make the tier a string or
    # number; `field not in tier_config` below would then raise TypeError
    # outside the try block and crash the caller. Fail validation instead.
    if not isinstance(tier_config, dict):
        logging.error(
            f"Tier configuration for container {ctid} must be a mapping, "
            f"got {type(tier_config).__name__}"
        )
        return False

    required_fields = [
        'cpu_upper_threshold',
        'cpu_lower_threshold',
        'memory_upper_threshold',
        'memory_lower_threshold',
        'min_cores',
        'max_cores',
        'min_memory'
    ]

    # Check for missing fields
    missing = [field for field in required_fields if field not in tier_config]
    if missing:
        logging.error(f"Missing required tier configuration fields for container {ctid}: {', '.join(missing)}")
        return False

    # Validate threshold relationships
    try:
        thresholds = [
            ('CPU', 'cpu_lower_threshold', 'cpu_upper_threshold'),
            ('Memory', 'memory_lower_threshold', 'memory_upper_threshold')
        ]
        # Each lower/upper pair must be an ordered percentage range.
        for resource, lower, upper in thresholds:
            if not (0 <= tier_config[lower] < tier_config[upper] <= 100):
                logging.error(
                    f"Invalid {resource} thresholds for container {ctid}: "
                    f"lower={tier_config[lower]}, upper={tier_config[upper]}"
                )
                return False

        # Core limits must be positive and consistently ordered.
        if not (0 < tier_config['min_cores'] <= tier_config['max_cores']):
            logging.error(
                f"Invalid core limits for container {ctid}: "
                f"min={tier_config['min_cores']}, max={tier_config['max_cores']}"
            )
            return False

        # Minimum memory must be a positive amount.
        if tier_config['min_memory'] <= 0:
            logging.error(f"Invalid minimum memory for container {ctid}: {tier_config['min_memory']}")
            return False

        logging.info(f"Tier configuration validated successfully for container {ctid}")
        return True
    except (TypeError, ValueError) as e:
        # Non-numeric values make the comparisons above raise; treat as invalid.
        logging.error(f"Invalid tier configuration values for container {ctid}: {e}")
        return False
def main_loop(poll_interval: int, energy_mode: bool) -> None:
    """Main loop that handles the resource allocation and scaling process.

    Runs forever: each cycle collects container data, validates tier
    settings, applies vertical and horizontal scaling, then sleeps out the
    remainder of the poll interval. Never returns.

    Args:
        poll_interval: The interval in seconds between each resource allocation process.
        energy_mode: A flag to indicate if energy efficiency mode should be enabled during off-peak hours.
    """
    while True:
        loop_start_time = time.time()
        logging.info("Starting resource allocation process...")

        try:
            # Log time before collecting data
            collect_start_time = time.time()
            logging.debug("Collecting container data...")
            containers = collect_container_data()
            collect_duration = time.time() - collect_start_time
            logging.debug(f"Container data collection took {collect_duration:.2f} seconds.")

            # Validate tier settings from configuration.
            # Iterate over a snapshot of the keys because containers with
            # invalid tiers are deleted from the dict inside the loop.
            for ctid in list(containers.keys()):
                tier = config.get("tiers", {}).get(ctid)
                # NOTE(review): collect_container_data already sets "tier";
                # this re-applies it — presumably for freshness, confirm.
                containers[ctid]["tier"] = tier
                if tier:
                    if not validate_tier_config(ctid, tier):
                        # Invalid tiers exclude the container from this cycle.
                        logging.error("Tier configuration for container %s is invalid. Removing container from scaling.", ctid)
                        del containers[ctid]
                        continue
                    logging.info("Applying tier settings for container %s: %s", ctid, tier)
                else:
                    logging.info("No tier settings found for container %s in /etc/lxc_autoscale/lxc_autoscale.yml", ctid)

            # Log time before adjusting resources (vertical scaling).
            adjust_start_time = time.time()
            logging.debug("Adjusting resources...")
            scaling_manager.adjust_resources(containers, energy_mode)
            adjust_duration = time.time() - adjust_start_time
            logging.debug(f"Resource adjustment took {adjust_duration:.2f} seconds.")

            # Log time before scaling horizontally (clone/remove instances).
            scale_start_time = time.time()
            logging.debug("Managing horizontal scaling...")
            scaling_manager.manage_horizontal_scaling(containers)
            scale_duration = time.time() - scale_start_time
            logging.debug(f"Horizontal scaling took {scale_duration:.2f} seconds.")

            loop_duration = time.time() - loop_start_time
            logging.info(f"Resource allocation process completed. Total loop duration: {loop_duration:.2f} seconds.")

            # Sleep only for the remainder of the poll interval so cycles
            # start on a roughly fixed cadence.
            if loop_duration < poll_interval:
                sleep_duration = poll_interval - loop_duration
                logging.debug(f"Sleeping for {sleep_duration:.2f} seconds until the next run.")
                time.sleep(sleep_duration)
            else:
                logging.warning("The loop took longer than the poll interval! No sleep will occur.")

        except Exception as e:
            # Keep the daemon alive on any failure; back off one full
            # interval before retrying.
            logging.error(f"Error in main loop: {e}")
            logging.exception("Exception traceback:")
            time.sleep(poll_interval)