-
Notifications
You must be signed in to change notification settings - Fork 3
/
objgetters.py
147 lines (124 loc) · 5.03 KB
/
objgetters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#! /usr/bin/env python3
# Requires: python3 (>= 3.8)
#
# Copyright the Cluster Management Toolkit for Kubernetes contributors.
# SPDX-License-Identifier: MIT
"""
This file contains helpers that provide an obj for use in info views,
for cases where the obj provided from the list view is not sufficient
"""
from pathlib import Path, PurePath
import sys
from typing import Callable, Dict, List
try:
from natsort import natsorted
except ModuleNotFoundError: # pragma: no cover
sys.exit("ModuleNotFoundError: Could not import natsort; "
"you may need to (re-)run `cmt-install` or `pip3 install natsort`; aborting.")
from cmttypes import deep_get, DictPath, FilePath, FilePathAuditError
from cmttypes import SecurityChecks, SecurityStatus
from ansible_helper import ansible_run_playbook_on_selection, get_playbook_path
from cmtio import check_path, join_securitystatus_set
from cmtio_yaml import secure_read_yaml
def objgetter_ansible_facts(obj: Dict) -> Dict:
    """
    Get an obj by using ansible facts

        Parameters:
            obj (dict): The obj to use as reference; its "name" field is used as the hostname
        Returns:
            (dict): An ansible facts object; an empty dict if the playbook run
                    returned non-zero or no facts were found for the host
    """
    hostname = deep_get(obj, DictPath("name"), "")
    get_facts_path = get_playbook_path(FilePath("get_facts.yaml"))
    retval, ansible_results = ansible_run_playbook_on_selection(get_facts_path, [hostname])
    if retval != 0:
        return {}

    # Only the first result for the fact gathering task is of interest
    for result in deep_get(ansible_results, DictPath(hostname), []):
        if deep_get(result, DictPath("task"), "") == "Gathering host facts":
            # Pass an explicit default so that a result that lacks "ansible_facts"
            # yields an empty dict, like every other exit of this function,
            # rather than whatever deep_get returns without a default (presumably None)
            return deep_get(result, DictPath("ansible_facts"), {})
    return {}
def objgetter_journalctl_log(obj: List[Dict]) -> Dict:
    """
    Wrap a list of journalctl log messages in a single obj

        Parameters:
            obj ([dict]): The journalctl log messages; the first message
                          provides the name, host, and creation time
        Returns:
            (dict): An obj with the fields name, host, created_at,
                    and obj (the unmodified list of messages)
    """
    # This should only be logs from one host, so the first message
    # is used as the source for the shared fields
    first_message = obj[0]
    summary = {
        field: deep_get(first_message, DictPath(field))
        for field in ("name", "host", "created_at")
    }
    summary["obj"] = obj
    return summary
def objgetter_ansible_log(obj: FilePath) -> Dict:
    """
    Build an obj from the directory that holds an ansible log

        Parameters:
            obj (FilePath): The path to the log directory; metadata.yaml is read from it,
                            and every other entry in it is read as a YAML log
        Returns:
            (dict): The content of metadata.yaml, extended with the fields
                    log_path, name, playbook_types, category, and logs
        Raises:
            FilePathAuditError: The directory that holds the playbook
                                failed the path security checks
    """
    entry: Dict = secure_read_yaml(obj.joinpath("metadata.yaml"))
    entry["log_path"] = str(obj)

    playbook_path = FilePath(entry["playbook_path"])
    playbook_dir = FilePath(PurePath(playbook_path).parent)

    # When installing from a git repo the playbook directory itself may be a symlink;
    # this is expected, but it is only accepted if the rest of the path components are secure
    dir_checks = [
        SecurityChecks.PARENT_RESOLVES_TO_SELF,
        SecurityChecks.PARENT_OWNER_IN_ALLOWLIST,
        SecurityChecks.OWNER_IN_ALLOWLIST,
        SecurityChecks.PARENT_PERMISSIONS,
        SecurityChecks.PERMISSIONS,
        SecurityChecks.EXISTS,
        SecurityChecks.IS_DIR,
    ]
    violations = check_path(playbook_dir, checks=dir_checks)
    if violations != [SecurityStatus.OK]:
        violations_joined = join_securitystatus_set(",", set(violations))
        raise FilePathAuditError(f"Violated rules: {violations_joined}", path=playbook_dir)

    # The parent is deliberately not required to resolve to itself here;
    # in an installation that links directly to the git repo
    # the playbooks directory will be a symlink
    file_checks = [
        SecurityChecks.RESOLVES_TO_SELF,
        SecurityChecks.PARENT_OWNER_IN_ALLOWLIST,
        SecurityChecks.OWNER_IN_ALLOWLIST,
        SecurityChecks.PARENT_PERMISSIONS,
        SecurityChecks.PERMISSIONS,
        SecurityChecks.IS_FILE,
    ]

    try:
        playbook = secure_read_yaml(playbook_path, checks=file_checks)
        entry["name"] = deep_get(playbook, DictPath("vars#metadata#description"))
        entry["playbook_types"] = deep_get(playbook,
                                           DictPath("vars#metadata#playbook_types"), ["<any>"])
        entry["category"] = deep_get(playbook,
                                     DictPath("vars#metadata#category"), "Uncategorized")
    except FileNotFoundError:
        # The playbook that produced the log is no longer available; use placeholders
        entry["name"] = "File not found"
        entry["playbook_types"] = ["Unavailable"]
        entry["category"] = "Unavailable"

    logs = []
    for child in natsorted(Path(str(obj)).iterdir()):
        child_name = child.name
        # The metadata has already been read; everything else is treated as a log
        if child_name != "metadata.yaml":
            logs.append({
                # The part of the filename before the first "-" is used as index
                "index": child_name.partition("-")[0],
                "log": secure_read_yaml(FilePath(str(child))),
            })
    entry["logs"] = logs

    return entry
# Allowlist of the objgetters that are acceptable for direct use in view files;
# each objgetter is registered under its own function name
objgetter_allowlist: Dict[str, Callable] = {
    objgetter.__name__: objgetter
    for objgetter in (
        objgetter_ansible_facts,
        objgetter_ansible_log,
        objgetter_journalctl_log,
    )
}