From b1980188481c572862ca62f005068397395ff4e9 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Tue, 17 Sep 2024 17:25:26 +0200 Subject: [PATCH 01/10] =?UTF-8?q?=F0=9F=91=94=20Add=20FortiManager=20Metho?= =?UTF-8?q?ds=20to=20handle=20addresses=20and=20services?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortimanager.py | 582 ++++++++++++++++++++++++++++++- 1 file changed, 575 insertions(+), 7 deletions(-) diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index 38c2714..e281263 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -15,11 +15,16 @@ log = logging.getLogger("fotoobo") -class FortiManager(Fortinet): +class FortiManager(Fortinet): # pylint: disable=too-many-public-methods """ Represents one FortiManager (digital twin) """ + def __del__(self) -> None: + """The destructor""" + if self.session_key and not self.session_path: + self.logout() + def __init__(self, hostname: str, username: str, password: str, **kwargs: Any) -> None: """ Set some initial parameters. @@ -63,11 +68,6 @@ def __init__(self, hostname: str, username: str, password: str, **kwargs: Any) - "rootp", ] - def __del__(self) -> None: - """The destructor""" - if self.session_key and not self.session_path: - self.logout() - def api( # pylint: disable=too-many-arguments self, method: str, @@ -155,7 +155,371 @@ def assign_all_objects(self, adoms: str, policy: str) -> int: return task_id - def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> List[Any]: + def delete_adom_address(self, adom: str, address: str) -> bool: + """ + Delete an address from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address from + address: The address to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + url = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] in [-3, 0]: + # FortiManager response status codes: + # -3: Object does not exist + # 0: OK + result = True + + return result + + def delete_adom_address_group(self, adom: str, group: str) -> bool: + """ + Delete an address group from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address group from + group: The address group to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + url = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] in [-3, 0]: + # FortiManager response status codes: + # -3: Object does not exist + # 0: OK + result = True + + return result + + def delete_adom_service(self, adom: str, service: str) -> bool: + """ + Delete a service from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address from + service: The service to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + url = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] in [-3, 0]: + # FortiManager response status codes: + # -3: Object does not exist + # 0: OK + result = True + + return result + + def delete_adom_service_group(self, adom: str, group: str) -> bool: + """ + Delete a service group from an ADOM in FortiManager + + Args: + adom: The ADOM to delete the address group from + group: The address group to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + url = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] in [-3, 0]: + # FortiManager response status codes: + # -3: Object does not exist + # 0: OK + result = True + + return result + + def delete_global_address(self, address: str) -> bool: + """ + Delete a global address from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global address object we have to check if it's in use in any ADOM. Only + after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links: + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + address: The global address to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + + # Get the address object with 'scope member' information + if address_object := self.get_global_address(address, scope_member=True): + + # Generate a list of ADOMs where the object is used. Therefore we get the address object + # from FortiManager with the 'scope member' option. If the object is used in any ADOM it + # is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in address_object: + used_adoms = [_["name"] for _ in address_object["scope member"]] + log.debug("'%s' is used in ADOM '%s'", address, ",".join(used_adoms)) + + else: + used_adoms = [] + + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_address(adom, address): + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) + + # Try to delete the global address object + if not blocked_adoms: + url = f"/pm/config/global/obj/firewall/address/{address}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = True + + else: + log.warning("Address '%s' not found in FortiManager", address) + + return result + + def delete_global_address_group(self, group: str) -> bool: + """ + Delete a global address group from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global address group object we have to check if it's in use in any ADOM. + Only after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links: + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + group: The global address group to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + + # Get the address group object with 'scope member' information + if address_group_object := self.get_global_address_group(group, scope_member=True): + + # Generate a list of ADOMs where the object is used. Therefore we get the address group + # object from FortiManager with the 'scope member' option. If the object is used in any + # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in address_group_object: + used_adoms = [_["name"] for _ in address_group_object["scope member"]] + log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) + + else: + used_adoms = [] + + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_address_group(adom, group): + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + + # Try to delete the global address group object + if not blocked_adoms: + url = f"/pm/config/global/obj/firewall/addrgrp/{group}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = True + + else: + log.warning("Address group '%s' not found in FortiManager", group) + + return result + + def delete_global_service(self, service: str) -> bool: + """ + Delete a global service from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global service object we have to check if it's in use in any ADOM. Only + after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links: + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + service: The global service to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + + # Get the service object with 'scope member' information + if service_object := self.get_global_service(service, scope_member=True): + + # Generate a list of ADOMs where the object is used. Therefore we get the service object + # from FortiManager with the 'scope member' option. If the object is used in any ADOM it + # is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in service_object: + used_adoms = [_["name"] for _ in service_object["scope member"]] + log.debug("'%s' is used in ADOM '%s'", service, ",".join(used_adoms)) + + else: + used_adoms = [] + + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service(adom, service): + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) + + # Try to delete the global service object + if not blocked_adoms: + url = f"/pm/config/global/obj/firewall/service/custom/{service}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = True + + else: + log.warning("Service '%s' not found in FortiManager", service) + + return result + + def delete_global_service_group(self, group: str) -> bool: + """ + Delete a global service group from FortiManager + + To be sure to not delete used objects we configure the FortiManager as follows: + + config system admin setting + set objects-force-deletion disable + + Before deleting a global service group object we have to check if it's in use in any ADOM. + Only after we found that no ADOM uses the object we can safely delete it. + + Fortinet documentation links: + https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 + https://docs.fortinet.com/document/fortimanager/7.6.0/administration-guide/322714 + + Args: + group: The global service group to delete + + Returns: + True if deletion was successful or object does not exist + """ + result = False + + # Get the service group object with 'scope member' information + if service_group_object := self.get_global_service_group(group, scope_member=True): + + # Generate a list of ADOMs where the object is used. Therefore we get the service group + # object from FortiManager with the 'scope member' option. If the object is used in any + # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the + # 'scope member' key is not present. + if "scope member" in service_group_object: + used_adoms = [_["name"] for _ in service_group_object["scope member"]] + log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) + + else: + used_adoms = [] + + # Try to delete the service group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service_group(adom, group): + blocked_adoms.append(adom) + + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + + # Try to delete the global service group object + if not blocked_adoms: + url = f"/pm/config/global/obj/firewall/service/group/{group}" + payload = { + "method": "delete", + "params": [{"url": url}], + } + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = True + + else: + log.warning("Service group '%s' not found in FortiManager", group) + + return result + + def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> list[Any]: """ Get FortiManager ADOM list @@ -177,6 +541,210 @@ def get_adoms(self, ignored_adoms: Optional[List[str]] = None) -> List[Any]: return fmg_adoms + def get_global_address(self, address: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get an address object from global ADOM + + Args: + address: The address to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + The global object from FortiManager + """ + result = {} + url = f"/pm/config/global/obj/firewall/address/{address}" + payload = { + "method": "get", + "params": [{"url": url}], + } + if scope_member: + payload["params"][0]["option"] = ["scope member"] # type: ignore + + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = response["data"] + + return result + + def get_global_addresses(self) -> list[dict[str, Any]]: + """ + Get the global address database + + Returns: + List of global address objects + """ + addresses: list[dict[str, Any]] = [] + payload = { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/address", + # "fields": ["name", "uuid"], + # "range": [0, 10], + } + ], + } + response = self.api("post", payload=payload, timeout=10) + addresses = response.json()["result"][0]["data"] + + return addresses + + def get_global_address_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get an address group object from the global ADOM + + Args: + group: The address group to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + The global object from FortiManager + """ + result = {} + url = f"/pm/config/global/obj/firewall/addrgrp/{group}" + payload = { + "method": "get", + "params": [ + { + "url": url, + } + ], + } + if scope_member: + payload["params"][0]["option"] = ["scope member"] # type: ignore + + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = response["data"] + + return result + + def get_global_address_groups(self) -> list[dict[str, Any]]: + """ + Get the global address group database + + Returns: + List of global address group objects + """ + address_groups: list[dict[str, Any]] = [] + payload = { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/addrgrp", + # "fields": ["name", "uuid"], + # "range": [0, 10], + } + ], + } + response = self.api("post", payload=payload, timeout=10) + address_groups = response.json()["result"][0]["data"] + + return address_groups + + def get_global_service(self, service: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get a service object from global ADOM + + Args: + service: The service to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + The global object from FortiManager + """ + result = {} + url = f"/pm/config/global/obj/firewall/service/custom/{service}" + payload = { + "method": "get", + "params": [{"url": url}], + } + if scope_member: + payload["params"][0]["option"] = ["scope member"] # type: ignore + + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = response["data"] + + return result + + def get_global_services(self) -> list[dict[str, Any]]: + """ + Get the global services database + + Returns: + List of global service objects + """ + addresses: list[dict[str, Any]] = [] + payload = { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/service/custom", + # "fields": ["name", "uuid"], + # "range": [0, 10], + } + ], + } + response = self.api("post", payload=payload, timeout=10) + addresses = response.json()["result"][0]["data"] + + return addresses + + def get_global_service_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: + """ + Get a service group object from the global ADOM + + Args: + group: The address group to get + scope_member: Whether the scope member attribute should be included in the response + + Returns: + The global object from FortiManager or empty if not present + """ + result = {} + url = f"/pm/config/global/obj/firewall/service/group/{group}" + payload = { + "method": "get", + "params": [ + { + "url": url, + } + ], + } + if scope_member: + payload["params"][0]["option"] = ["scope member"] # type: ignore + + response = self.api("post", payload=payload).json()["result"][0] + if response["status"]["code"] == 0: + result = response["data"] + + return result + + def get_global_service_groups(self) -> list[dict[str, Any]]: + """ + Get the global network service group database + + Returns: + List of global network service group objects + """ + service_groups: list[dict[str, Any]] = [] + payload = { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/service/group", + # "fields": ["name", "uuid"], + # "range": [0, 10], + } + ], + } + response = self.api("post", payload=payload, timeout=10) + service_groups = response.json()["result"][0]["data"] + + return service_groups + def get_version(self) -> str: """ Get FortiManager version. From c749652c19920bd681dd20d0dbd4775b361ad70a Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Wed, 18 Sep 2024 07:28:32 +0200 Subject: [PATCH 02/10] =?UTF-8?q?=F0=9F=93=9D=20Rename=20'utils'=20to=20't?= =?UTF-8?q?ools'=20and=20squeeze=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../diagrams/package_structure.drawio.svg | 224 +++++++++--------- 1 file changed, 112 insertions(+), 112 deletions(-) diff --git a/docs/source/developer/architecture/diagrams/package_structure.drawio.svg b/docs/source/developer/architecture/diagrams/package_structure.drawio.svg index d98e8d7..f23a534 100644 --- a/docs/source/developer/architecture/diagrams/package_structure.drawio.svg +++ b/docs/source/developer/architecture/diagrams/package_structure.drawio.svg @@ -1,4 +1,4 @@ - + @@ -18,12 +18,12 @@ - - + + -
+
cli @@ -31,16 +31,16 @@
- + cli - + -
+
Fortinet @@ -48,16 +48,16 @@
- + Fortinet - + -
+
helpers @@ -65,7 +65,7 @@
- + helpers @@ -87,11 +87,11 @@ - + -
+
faz @@ -99,16 +99,16 @@
- + faz - + -
+
fgt @@ -116,16 +116,16 @@
- + fgt - + -
+
fmg @@ -133,16 +133,16 @@
- + fmg - + -
+
Fortinet @@ -150,16 +150,16 @@
- + Fortinet - + -
+
FortiAnalyzer @@ -167,16 +167,16 @@
- + FortiAnalyzer - + -
+
FortiGate @@ -184,16 +184,16 @@
- + FortiGate - + -
+
FortiManager @@ -201,16 +201,16 @@
- + FortiManager - + -
+
config @@ -218,16 +218,16 @@
- + config - + -
+
output @@ -235,16 +235,16 @@
- + output - + -
+
files @@ -252,16 +252,16 @@
- + files - + -
+
log @@ -269,16 +269,16 @@
- + log - + -
+
FortiClientEMS @@ -286,32 +286,31 @@
- + FortiClientEMS - - - - - - - - - - - - - - - + + + + + + + + + + + + + + - + -
+
faz @@ -319,16 +318,16 @@
- + faz - + -
+
fgt @@ -336,16 +335,16 @@
- + fgt - + -
+
fmg @@ -353,38 +352,38 @@
- + fmg - + -
+
- utils + tools
- - utils + + tools - - - - - - + + + + + + -
+
ems @@ -392,16 +391,16 @@
- + ems - + -
+
ems @@ -409,16 +408,16 @@
- + ems - + -
+
inventory @@ -426,17 +425,17 @@
- + inventory - - + + -
+
inventory @@ -444,16 +443,16 @@
- + inventory - + -
+
generic @@ -461,18 +460,18 @@
- + generic - - - + + + -
+
exceptions @@ -480,16 +479,16 @@
- + exceptions - + -
+
exceptions @@ -497,18 +496,18 @@
- + exceptions - - - + + + -
+
result @@ -516,12 +515,13 @@
- + result - + + @@ -531,4 +531,4 @@ - + \ No newline at end of file From 82ea26896afb6a42ae302772dcdca0dbb4add39c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 17 Jun 2024 23:00:03 +0000 Subject: [PATCH 03/10] Bump urllib3 from 2.2.1 to 2.2.2 Bumps [urllib3](https://github.com/urllib3/urllib3) from 2.2.1 to 2.2.2. - [Release notes](https://github.com/urllib3/urllib3/releases) - [Changelog](https://github.com/urllib3/urllib3/blob/main/CHANGES.rst) - [Commits](https://github.com/urllib3/urllib3/compare/2.2.1...2.2.2) --- updated-dependencies: - dependency-name: urllib3 dependency-type: indirect ... Signed-off-by: dependabot[bot] --- poetry.lock | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index 8496b6e..3921a85 100644 --- a/poetry.lock +++ b/poetry.lock @@ -848,6 +848,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -1208,13 +1209,13 @@ files = [ [[package]] name = "urllib3" -version = "2.2.1" +version = "2.2.2" description = "HTTP library with thread-safe connection pooling, file post, and more." optional = false python-versions = ">=3.8" files = [ - {file = "urllib3-2.2.1-py3-none-any.whl", hash = "sha256:450b20ec296a467077128bff42b73080516e71b56ff59a60a02bef2232c4fa9d"}, - {file = "urllib3-2.2.1.tar.gz", hash = "sha256:d0570876c61ab9e520d776c38acbbb5b05a776d3f9ff98a5c8fd5162a444cf19"}, + {file = "urllib3-2.2.2-py3-none-any.whl", hash = "sha256:a448b2f64d686155468037e1ace9f2d2199776e17f0a46610480d311f73e3472"}, + {file = "urllib3-2.2.2.tar.gz", hash = "sha256:dd505485549a7a552833da5e6063639d0d177c04f23bc3864e41e5dc5f612168"}, ] [package.extras] From dfb93ec02b1ab3b908fbde09fe1a795235abff4b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 6 Jul 2024 01:40:16 +0000 Subject: [PATCH 04/10] Bump certifi from 2024.2.2 to 2024.7.4 Bumps [certifi](https://github.com/certifi/python-certifi) from 2024.2.2 to 2024.7.4. - [Commits](https://github.com/certifi/python-certifi/compare/2024.02.02...2024.07.04) --- updated-dependencies: - dependency-name: certifi dependency-type: indirect ... Signed-off-by: dependabot[bot] --- poetry.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index 3921a85..028d6f6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "alabaster" @@ -120,13 +120,13 @@ files = [ [[package]] name = "certifi" -version = "2024.2.2" +version = "2024.7.4" description = "Python package for providing Mozilla's CA Bundle." optional = false python-versions = ">=3.6" files = [ - {file = "certifi-2024.2.2-py3-none-any.whl", hash = "sha256:dc383c07b76109f368f6106eee2b593b04a011ea4d55f652c6ca24a754d1cdd1"}, - {file = "certifi-2024.2.2.tar.gz", hash = "sha256:0569859f95fc761b18b45ef421b1290a0f65f147e92a1e5eb3e635f9a5e4e66f"}, + {file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"}, + {file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"}, ] [[package]] From f551ae79781023e775933d44aca711c133de049f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 9 Jul 2024 19:27:27 +0000 Subject: [PATCH 05/10] Bump zipp from 3.18.2 to 3.19.1 Bumps [zipp](https://github.com/jaraco/zipp) from 3.18.2 to 3.19.1. - [Release notes](https://github.com/jaraco/zipp/releases) - [Changelog](https://github.com/jaraco/zipp/blob/main/NEWS.rst) - [Commits](https://github.com/jaraco/zipp/compare/v3.18.2...v3.19.1) --- updated-dependencies: - dependency-name: zipp dependency-type: indirect ... Signed-off-by: dependabot[bot] --- poetry.lock | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/poetry.lock b/poetry.lock index 028d6f6..db85e59 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1246,18 +1246,18 @@ test = ["covdefaults (>=2.3)", "coverage (>=7.2.7)", "coverage-enable-subprocess [[package]] name = "zipp" -version = "3.18.2" +version = "3.19.1" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.8" files = [ - {file = "zipp-3.18.2-py3-none-any.whl", hash = "sha256:dce197b859eb796242b0622af1b8beb0a722d52aa2f57133ead08edd5bf5374e"}, - {file = "zipp-3.18.2.tar.gz", hash = "sha256:6278d9ddbcfb1f1089a88fde84481528b07b0e10474e09dcfe53dad4069fa059"}, + {file = "zipp-3.19.1-py3-none-any.whl", hash = "sha256:2828e64edb5386ea6a52e7ba7cdb17bb30a73a858f5eb6eb93d8d36f5ea26091"}, + {file = "zipp-3.19.1.tar.gz", hash = "sha256:35427f6d5594f4acf82d25541438348c26736fa9b3afa2754bcd63cdb99d8e8f"}, ] [package.extras] -docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -testing = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] [metadata] lock-version = "2.0" From 27f826645526bca3f998c1b7de0c3e1f6bebb22d Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Wed, 18 Sep 2024 12:20:16 +0200 Subject: [PATCH 06/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Refactor=20new=20met?= =?UTF-8?q?hods=20to=20get=20consistent=20return=20values?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- WHATSNEW.md | 2 + fotoobo/fortinet/fortimanager.py | 327 ++++++++++++------------------- 2 files changed, 123 insertions(+), 206 deletions(-) diff --git a/WHATSNEW.md b/WHATSNEW.md index dc0bacb..601a9e5 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -1,5 +1,7 @@ ### Added +- FortiManager Methods for handling addresses and services (get and delete) + ### Changed - Upgrade requests to >=2.32 due to dependabot warning diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index e281263..0ba9c18 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -155,7 +155,7 @@ def assign_all_objects(self, adoms: str, policy: str) -> int: return task_id - def delete_adom_address(self, adom: str, address: str) -> bool: + def delete_adom_address(self, adom: str, address: str) -> dict[str, Any]: """ Delete an address from an ADOM in FortiManager @@ -164,24 +164,18 @@ def delete_adom_address(self, adom: str, address: str) -> bool: address: The address to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False - url = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" + result: dict[str, Any] = {} payload = { "method": "delete", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/address/{address}"}], } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] in [-3, 0]: - # FortiManager response status codes: - # -3: Object does not exist - # 0: OK - result = True + result = self.api("post", payload=payload).json()["result"][0] return result - def delete_adom_address_group(self, adom: str, group: str) -> bool: + def delete_adom_address_group(self, adom: str, group: str) -> dict[str, Any]: """ Delete an address group from an ADOM in FortiManager @@ -190,24 +184,18 @@ def delete_adom_address_group(self, adom: str, group: str) -> bool: group: The address group to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False - url = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" + result: dict[str, Any] = {} payload = { "method": "delete", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}"}], } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] in [-3, 0]: - # FortiManager response status codes: - # -3: Object does not exist - # 0: OK - result = True + result = self.api("post", payload=payload).json()["result"][0] return result - def delete_adom_service(self, adom: str, service: str) -> bool: + def delete_adom_service(self, adom: str, service: str) -> dict[str, Any]: """ Delete a service from an ADOM in FortiManager @@ -216,24 +204,18 @@ def delete_adom_service(self, adom: str, service: str) -> bool: service: The service to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False - url = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" + result: dict[str, Any] = {} payload = { "method": "delete", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}"}], } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] in [-3, 0]: - # FortiManager response status codes: - # -3: Object does not exist - # 0: OK - result = True + result = self.api("post", payload=payload).json()["result"][0] return result - def delete_adom_service_group(self, adom: str, group: str) -> bool: + def delete_adom_service_group(self, adom: str, group: str) -> dict[str, Any]: """ Delete a service group from an ADOM in FortiManager @@ -242,24 +224,18 @@ def delete_adom_service_group(self, adom: str, group: str) -> bool: group: The address group to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False - url = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" + result: dict[str, Any] = {} payload = { "method": "delete", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}"}], } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] in [-3, 0]: - # FortiManager response status codes: - # -3: Object does not exist - # 0: OK - result = True + result = self.api("post", payload=payload).json()["result"][0] return result - def delete_global_address(self, address: str) -> bool: + def delete_global_address(self, address: str) -> dict[str, Any]: """ Delete a global address from FortiManager @@ -271,7 +247,7 @@ def delete_global_address(self, address: str) -> bool: Before deleting a global address object we have to check if it's in use in any ADOM. Only after we found that no ADOM uses the object we can safely delete it. - Fortinet documentation links: + Fortinet documentation links:\n https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 @@ -281,50 +257,46 @@ def delete_global_address(self, address: str) -> bool: address: The global address to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False + result: dict[str, Any] = {} # Get the address object with 'scope member' information - if address_object := self.get_global_address(address, scope_member=True): - + address_object = self.get_global_address(address, scope_member=True) + if address_object["status"]["code"] == 0: # Generate a list of ADOMs where the object is used. Therefore we get the address object # from FortiManager with the 'scope member' option. If the object is used in any ADOM it # is listed in the key 'scope member'. If the object is not used in any ADOM the # 'scope member' key is not present. - if "scope member" in address_object: - used_adoms = [_["name"] for _ in address_object["scope member"]] + if "scope member" in address_object["data"]: + used_adoms = [_["name"] for _ in address_object["data"]["scope member"]] log.debug("'%s' is used in ADOM '%s'", address, ",".join(used_adoms)) else: used_adoms = [] - # Try to delete the address group object in every ADOM + # Try to delete the address group object in every ADOM blocked_adoms = [] for adom in used_adoms: - if not self.delete_adom_address(adom, address): + if self.delete_adom_address(adom, address)["status"]["code"] not in [-3, 0]: blocked_adoms.append(adom) if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) # Try to delete the global address object - if not blocked_adoms: - url = f"/pm/config/global/obj/firewall/address/{address}" - payload = { - "method": "delete", - "params": [{"url": url}], - } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = True + payload = { + "method": "delete", + "params": [{"url": f"/pm/config/global/obj/firewall/address/{address}"}], + } + result = self.api("post", payload=payload).json()["result"][0] else: - log.warning("Address '%s' not found in FortiManager", address) + result = address_object return result - def delete_global_address_group(self, group: str) -> bool: + def delete_global_address_group(self, group: str) -> dict[str, Any]: """ Delete a global address group from FortiManager @@ -336,7 +308,7 @@ def delete_global_address_group(self, group: str) -> bool: Before deleting a global address group object we have to check if it's in use in any ADOM. Only after we found that no ADOM uses the object we can safely delete it. - Fortinet documentation links: + Fortinet documentation links:\n https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 @@ -346,19 +318,20 @@ def delete_global_address_group(self, group: str) -> bool: group: The global address group to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False + result: dict[str, Any] = {} # Get the address group object with 'scope member' information - if address_group_object := self.get_global_address_group(group, scope_member=True): + address_group_object = self.get_global_address_group(group, scope_member=True) + if address_group_object["status"]["code"] == 0: # Generate a list of ADOMs where the object is used. Therefore we get the address group # object from FortiManager with the 'scope member' option. If the object is used in any # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the # 'scope member' key is not present. - if "scope member" in address_group_object: - used_adoms = [_["name"] for _ in address_group_object["scope member"]] + if "scope member" in address_group_object["data"]: + used_adoms = [_["name"] for _ in address_group_object["data"]["scope member"]] log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) else: @@ -367,29 +340,25 @@ def delete_global_address_group(self, group: str) -> bool: # Try to delete the address group object in every ADOM blocked_adoms = [] for adom in used_adoms: - if not self.delete_adom_address_group(adom, group): + if not self.delete_adom_address_group(adom, group)["status"]["code"] in [-3, 0]: blocked_adoms.append(adom) if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) # Try to delete the global address group object - if not blocked_adoms: - url = f"/pm/config/global/obj/firewall/addrgrp/{group}" - payload = { - "method": "delete", - "params": [{"url": url}], - } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = True + payload = { + "method": "delete", + "params": [{"url": f"/pm/config/global/obj/firewall/addrgrp/{group}"}], + } + result = self.api("post", payload=payload).json()["result"][0] else: - log.warning("Address group '%s' not found in FortiManager", group) + result = address_group_object return result - def delete_global_service(self, service: str) -> bool: + def delete_global_service(self, service: str) -> dict[str, Any]: """ Delete a global service from FortiManager @@ -401,7 +370,7 @@ def delete_global_service(self, service: str) -> bool: Before deleting a global service object we have to check if it's in use in any ADOM. Only after we found that no ADOM uses the object we can safely delete it. - Fortinet documentation links: + Fortinet documentation links:\n https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 @@ -411,19 +380,20 @@ def delete_global_service(self, service: str) -> bool: service: The global service to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False + result: dict[str, Any] = {} # Get the service object with 'scope member' information - if service_object := self.get_global_service(service, scope_member=True): + service_object = self.get_global_service(service, scope_member=True) + if service_object["status"]["code"] == 0: # Generate a list of ADOMs where the object is used. Therefore we get the service object # from FortiManager with the 'scope member' option. If the object is used in any ADOM it # is listed in the key 'scope member'. If the object is not used in any ADOM the # 'scope member' key is not present. - if "scope member" in service_object: - used_adoms = [_["name"] for _ in service_object["scope member"]] + if "scope member" in service_object["data"]: + used_adoms = [_["name"] for _ in service_object["data"]["scope member"]] log.debug("'%s' is used in ADOM '%s'", service, ",".join(used_adoms)) else: @@ -432,29 +402,25 @@ def delete_global_service(self, service: str) -> bool: # Try to delete the address group object in every ADOM blocked_adoms = [] for adom in used_adoms: - if not self.delete_adom_service(adom, service): + if not self.delete_adom_service(adom, service)["status"]["code"] in [-3, 0]: blocked_adoms.append(adom) if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) # Try to delete the global service object - if not blocked_adoms: - url = f"/pm/config/global/obj/firewall/service/custom/{service}" - payload = { - "method": "delete", - "params": [{"url": url}], - } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = True + payload = { + "method": "delete", + "params": [{"url": f"/pm/config/global/obj/firewall/service/custom/{service}"}], + } + result = self.api("post", payload=payload).json()["result"][0] else: - log.warning("Service '%s' not found in FortiManager", service) + result = service_object return result - def delete_global_service_group(self, group: str) -> bool: + def delete_global_service_group(self, group: str) -> dict[str, Any]: """ Delete a global service group from FortiManager @@ -466,7 +432,7 @@ def delete_global_service_group(self, group: str) -> bool: Before deleting a global service group object we have to check if it's in use in any ADOM. Only after we found that no ADOM uses the object we can safely delete it. - Fortinet documentation links: + Fortinet documentation links:\n https://docs.fortinet.com/document/fortimanager/7.0.12/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.2.7/administration-guide/322714 https://docs.fortinet.com/document/fortimanager/7.4.3/administration-guide/322714 @@ -476,19 +442,20 @@ def delete_global_service_group(self, group: str) -> bool: group: The global service group to delete Returns: - True if deletion was successful or object does not exist + FortiManager result item """ - result = False + result: dict[str, Any] = {} # Get the service group object with 'scope member' information - if service_group_object := self.get_global_service_group(group, scope_member=True): + service_group_object = self.get_global_service_group(group, scope_member=True) + if service_group_object["status"]["code"] == 0: # Generate a list of ADOMs where the object is used. Therefore we get the service group # object from FortiManager with the 'scope member' option. If the object is used in any # ADOM it is listed in the key 'scope member'. If the object is not used in any ADOM the # 'scope member' key is not present. - if "scope member" in service_group_object: - used_adoms = [_["name"] for _ in service_group_object["scope member"]] + if "scope member" in service_group_object["data"]: + used_adoms = [_["name"] for _ in service_group_object["data"]["scope member"]] log.debug("'%s' is used in ADOM '%s'", group, ",".join(used_adoms)) else: @@ -497,25 +464,21 @@ def delete_global_service_group(self, group: str) -> bool: # Try to delete the service group object in every ADOM blocked_adoms = [] for adom in used_adoms: - if not self.delete_adom_service_group(adom, group): + if not self.delete_adom_service_group(adom, group)["status"]["code"] in [-3, 0]: blocked_adoms.append(adom) if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) # Try to delete the global service group object - if not blocked_adoms: - url = f"/pm/config/global/obj/firewall/service/group/{group}" - payload = { - "method": "delete", - "params": [{"url": url}], - } - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = True + payload = { + "method": "delete", + "params": [{"url": f"/pm/config/global/obj/firewall/service/group/{group}"}], + } + result = self.api("post", payload=payload).json()["result"][0] else: - log.warning("Service group '%s' not found in FortiManager", group) + result = service_group_object return result @@ -550,45 +513,35 @@ def get_global_address(self, address: str, scope_member: bool = False) -> dict[s scope_member: Whether the scope member attribute should be included in the response Returns: - The global object from FortiManager + FortiManager result item """ - result = {} - url = f"/pm/config/global/obj/firewall/address/{address}" + result: dict[str, Any] = {} payload = { "method": "get", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/global/obj/firewall/address/{address}"}], } if scope_member: payload["params"][0]["option"] = ["scope member"] # type: ignore - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = response["data"] + result = self.api("post", payload=payload).json()["result"][0] return result - def get_global_addresses(self) -> list[dict[str, Any]]: + def get_global_addresses(self) -> dict[str, Any]: """ Get the global address database Returns: - List of global address objects + FortiManager result item """ - addresses: list[dict[str, Any]] = [] + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": "/pm/config/global/obj/firewall/address", - # "fields": ["name", "uuid"], - # "range": [0, 10], - } - ], + "params": [{"url": "/pm/config/global/obj/firewall/address"}], } - response = self.api("post", payload=payload, timeout=10) - addresses = response.json()["result"][0]["data"] + result = self.api("post", payload=payload, timeout=10).json()["result"][0] - return addresses + return result def get_global_address_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: """ @@ -599,49 +552,35 @@ def get_global_address_group(self, group: str, scope_member: bool = False) -> di scope_member: Whether the scope member attribute should be included in the response Returns: - The global object from FortiManager + FortiManager result item """ - result = {} - url = f"/pm/config/global/obj/firewall/addrgrp/{group}" + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": url, - } - ], + "params": [{"url": f"/pm/config/global/obj/firewall/addrgrp/{group}"}], } if scope_member: payload["params"][0]["option"] = ["scope member"] # type: ignore - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = response["data"] + result = self.api("post", payload=payload).json()["result"][0] return result - def get_global_address_groups(self) -> list[dict[str, Any]]: + def get_global_address_groups(self) -> dict[str, Any]: """ Get the global address group database Returns: - List of global address group objects + FortiManager result item """ - address_groups: list[dict[str, Any]] = [] + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": "/pm/config/global/obj/firewall/addrgrp", - # "fields": ["name", "uuid"], - # "range": [0, 10], - } - ], + "params": [{"url": "/pm/config/global/obj/firewall/addrgrp"}], } - response = self.api("post", payload=payload, timeout=10) - address_groups = response.json()["result"][0]["data"] + result = self.api("post", payload=payload, timeout=10).json()["result"][0] - return address_groups + return result def get_global_service(self, service: str, scope_member: bool = False) -> dict[str, Any]: """ @@ -652,45 +591,35 @@ def get_global_service(self, service: str, scope_member: bool = False) -> dict[s scope_member: Whether the scope member attribute should be included in the response Returns: - The global object from FortiManager + FortiManager result item """ - result = {} - url = f"/pm/config/global/obj/firewall/service/custom/{service}" + result: dict[str, Any] = {} payload = { "method": "get", - "params": [{"url": url}], + "params": [{"url": f"/pm/config/global/obj/firewall/service/custom/{service}"}], } if scope_member: payload["params"][0]["option"] = ["scope member"] # type: ignore - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = response["data"] + result = self.api("post", payload=payload).json()["result"][0] return result - def get_global_services(self) -> list[dict[str, Any]]: + def get_global_services(self) -> dict[str, Any]: """ Get the global services database Returns: - List of global service objects + FortiManager result item """ - addresses: list[dict[str, Any]] = [] + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": "/pm/config/global/obj/firewall/service/custom", - # "fields": ["name", "uuid"], - # "range": [0, 10], - } - ], + "params": [{"url": "/pm/config/global/obj/firewall/service/custom"}], } - response = self.api("post", payload=payload, timeout=10) - addresses = response.json()["result"][0]["data"] + result = self.api("post", payload=payload, timeout=10).json()["result"][0] - return addresses + return result def get_global_service_group(self, group: str, scope_member: bool = False) -> dict[str, Any]: """ @@ -701,49 +630,35 @@ def get_global_service_group(self, group: str, scope_member: bool = False) -> di scope_member: Whether the scope member attribute should be included in the response Returns: - The global object from FortiManager or empty if not present + FortiManager result item """ - result = {} - url = f"/pm/config/global/obj/firewall/service/group/{group}" + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": url, - } - ], + "params": [{"url": f"/pm/config/global/obj/firewall/service/group/{group}"}], } if scope_member: payload["params"][0]["option"] = ["scope member"] # type: ignore - response = self.api("post", payload=payload).json()["result"][0] - if response["status"]["code"] == 0: - result = response["data"] + result = self.api("post", payload=payload).json()["result"][0] return result - def get_global_service_groups(self) -> list[dict[str, Any]]: + def get_global_service_groups(self) -> dict[str, Any]: """ Get the global network service group database Returns: - List of global network service group objects + FortiManager result item """ - service_groups: list[dict[str, Any]] = [] + result: dict[str, Any] = {} payload = { "method": "get", - "params": [ - { - "url": "/pm/config/global/obj/firewall/service/group", - # "fields": ["name", "uuid"], - # "range": [0, 10], - } - ], + "params": [{"url": "/pm/config/global/obj/firewall/service/group"}], } - response = self.api("post", payload=payload, timeout=10) - service_groups = response.json()["result"][0]["data"] + result = self.api("post", payload=payload, timeout=10).json()["result"][0] - return service_groups + return result def get_version(self) -> str: """ From b1b14c055c4cf100c9b3a8ecab9db25456a65b0e Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Wed, 25 Sep 2024 11:09:43 +0200 Subject: [PATCH 07/10] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Optimize=20and=20fin?= =?UTF-8?q?alize=20address=20and=20service=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortimanager.py | 194 ++++++----- tests/fortinet/test_fortimanager.py | 505 +++++++++++++++++++++++++++- 2 files changed, 590 insertions(+), 109 deletions(-) diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index 0ba9c18..933ab6f 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -68,6 +68,44 @@ def __init__(self, hostname: str, username: str, password: str, **kwargs: Any) - "rootp", ] + def api_delete(self, url: str) -> requests.models.Response: + """DELETE method for API requests + + Args: + url: API endpoint to access + + Result: + FortiManager result item + """ + payload = { + "method": "delete", + "params": [{"url": f"{url}"}], + } + return self.api("post", payload=payload) + + def api_get( + self, url: str, params: Optional[dict[str, Any]] = None, timeout: Optional[float] = None + ) -> requests.models.Response: + """GET method for API requests + + Args: + url: API endpoint to access + params: Additional query parameters if needed + timeout: The requests read timeout in seconds + + Result: + FortiManager result item + """ + _params = {"url": f"{url}"} + if params: + _params = {**_params, **params} + + payload = { + "method": "get", + "params": [_params], + } + return self.api("post", payload=payload, timeout=timeout) + def api( # pylint: disable=too-many-arguments self, method: str, @@ -166,12 +204,8 @@ def delete_adom_address(self, adom: str, address: str) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/address/{address}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" + result: dict[str, Any] = self.api_delete(url).json()["result"][0] return result @@ -186,12 +220,8 @@ def delete_adom_address_group(self, adom: str, group: str) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" + result: dict[str, Any] = self.api_delete(url).json()["result"][0] return result @@ -206,12 +236,8 @@ def delete_adom_service(self, adom: str, service: str) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" + result: dict[str, Any] = self.api_delete(url).json()["result"][0] return result @@ -226,12 +252,8 @@ def delete_adom_service_group(self, adom: str, group: str) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" + result: dict[str, Any] = self.api_delete(url).json()["result"][0] return result @@ -285,11 +307,8 @@ def delete_global_address(self, address: str) -> dict[str, Any]: log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) # Try to delete the global address object - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/global/obj/firewall/address/{address}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/global/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] else: result = address_object @@ -347,11 +366,8 @@ def delete_global_address_group(self, group: str) -> dict[str, Any]: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) # Try to delete the global address group object - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/global/obj/firewall/addrgrp/{group}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] else: result = address_group_object @@ -409,11 +425,8 @@ def delete_global_service(self, service: str) -> dict[str, Any]: log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) # Try to delete the global service object - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/global/obj/firewall/service/custom/{service}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] else: result = service_object @@ -471,11 +484,8 @@ def delete_global_service_group(self, group: str) -> dict[str, Any]: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) # Try to delete the global service group object - payload = { - "method": "delete", - "params": [{"url": f"/pm/config/global/obj/firewall/service/group/{group}"}], - } - result = self.api("post", payload=payload).json()["result"][0] + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] else: result = service_group_object @@ -516,14 +526,16 @@ def get_global_address(self, address: str, scope_member: bool = False) -> dict[s FortiManager result item """ result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": f"/pm/config/global/obj/firewall/address/{address}"}], - } + url: str = f"/pm/config/global/obj/firewall/address/{address}" + if scope_member: - payload["params"][0]["option"] = ["scope member"] # type: ignore + params = {"option": ["scope member"]} + response = self.api_get(url, params) - result = self.api("post", payload=payload).json()["result"][0] + else: + response = self.api_get(url) + + result = response.json()["result"][0] return result @@ -534,12 +546,10 @@ def get_global_addresses(self) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": "/pm/config/global/obj/firewall/address"}], - } - result = self.api("post", payload=payload, timeout=10).json()["result"][0] + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/address", + timeout=10, + ).json()["result"][0] return result @@ -555,14 +565,16 @@ def get_global_address_group(self, group: str, scope_member: bool = False) -> di FortiManager result item """ result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": f"/pm/config/global/obj/firewall/addrgrp/{group}"}], - } + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + if scope_member: - payload["params"][0]["option"] = ["scope member"] # type: ignore + params = {"option": ["scope member"]} + response = self.api_get(url, params) - result = self.api("post", payload=payload).json()["result"][0] + else: + response = self.api_get(url) + + result = response.json()["result"][0] return result @@ -573,12 +585,10 @@ def get_global_address_groups(self) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": "/pm/config/global/obj/firewall/addrgrp"}], - } - result = self.api("post", payload=payload, timeout=10).json()["result"][0] + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/addrgrp", + timeout=10, + ).json()["result"][0] return result @@ -594,14 +604,16 @@ def get_global_service(self, service: str, scope_member: bool = False) -> dict[s FortiManager result item """ result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": f"/pm/config/global/obj/firewall/service/custom/{service}"}], - } + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + if scope_member: - payload["params"][0]["option"] = ["scope member"] # type: ignore + params = {"option": ["scope member"]} + response = self.api_get(url, params) - result = self.api("post", payload=payload).json()["result"][0] + else: + response = self.api_get(url) + + result = response.json()["result"][0] return result @@ -612,12 +624,10 @@ def get_global_services(self) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": "/pm/config/global/obj/firewall/service/custom"}], - } - result = self.api("post", payload=payload, timeout=10).json()["result"][0] + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/service/custom", + timeout=10, + ).json()["result"][0] return result @@ -633,14 +643,16 @@ def get_global_service_group(self, group: str, scope_member: bool = False) -> di FortiManager result item """ result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": f"/pm/config/global/obj/firewall/service/group/{group}"}], - } + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + if scope_member: - payload["params"][0]["option"] = ["scope member"] # type: ignore + params = {"option": ["scope member"]} + response = self.api_get(url, params) + + else: + response = self.api_get(url) - result = self.api("post", payload=payload).json()["result"][0] + result = response.json()["result"][0] return result @@ -651,12 +663,10 @@ def get_global_service_groups(self) -> dict[str, Any]: Returns: FortiManager result item """ - result: dict[str, Any] = {} - payload = { - "method": "get", - "params": [{"url": "/pm/config/global/obj/firewall/service/group"}], - } - result = self.api("post", payload=payload, timeout=10).json()["result"][0] + result: dict[str, Any] = self.api_get( + "/pm/config/global/obj/firewall/service/group", + timeout=10, + ).json()["result"][0] return result diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index 99d4aa4..552a1c7 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -3,6 +3,8 @@ """ # pylint: disable=no-member +# mypy: disable-error-code=attr-defined +from typing import Any from unittest.mock import MagicMock import pytest @@ -14,9 +16,138 @@ from tests.helper import ResponseMock -class TestFortiManager: +class TestFortiManager: # pylint: disable=too-many-public-methods """Test the FortiManager class""" + @staticmethod + @pytest.fixture + def response_mock_api_ok() -> MagicMock: + """Fixture to return a mocked response for API ok""" + return MagicMock( + return_value=ResponseMock( + json={"result": [{"status": {"code": 0, "message": "OK"}}]}, + status_code=200, + ) + ) + + @staticmethod + @pytest.fixture + def api_delete_ok(response_mock_api_ok: MagicMock, monkeypatch: MonkeyPatch) -> None: + """Fixture to patch the FortiManager.api_delete() method""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.api_delete", response_mock_api_ok + ) + + @staticmethod + @pytest.fixture + def api_get_ok(response_mock_api_ok: MagicMock, monkeypatch: MonkeyPatch) -> None: + """Fixture to patch the FortiManager.api_get() method""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.api_get", response_mock_api_ok + ) + + @staticmethod + def test_api_delete(monkeypatch: MonkeyPatch) -> None: + """Test api_delete""" + url: str = "/pm/config/adom/dummy/obj/firewall/address/dummy" + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + json={ + "result": [ + { + "status": {"code": 0, "message": "OK"}, + "url": url, + } + ] + }, + status_code=200, + ) + ), + ) + + fmg = FortiManager("host", "", "") + assert fmg.api_delete(url).json()["result"][0]["status"]["code"] == 0 + requests.Session.post.assert_called_with( + "https://host:443/jsonrpc", + headers=None, + json={"method": "delete", "params": [{"url": url}], "session": ""}, + params=None, + timeout=3, + verify=True, + ) + + @staticmethod + @pytest.mark.parametrize( + "with_params", + ( + pytest.param(False, id="without params"), + pytest.param(True, id="with params"), + ), + ) + def test_api_get(with_params: bool, monkeypatch: MonkeyPatch) -> None: + """test api_get""" + url: str = "/pm/config/global/obj/firewall/address/dummy" + params = {"option": ["scope member"]} + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + json={ + "result": [ + { + "data": { + "name": "dummy", + "uuid": "88888888-4444-4444-4444-121212121212", + }, + "status": {"code": 0, "message": "OK"}, + "url": url, + } + ] + }, + status_code=200, + ) + ), + ) + fmg = FortiManager("host", "", "") + + expected_call = ( + ["https://host:443/jsonrpc"], + { + "headers": None, + "json": { + "method": "get", + "params": [ + { + "url": "/pm/config/global/obj/firewall/address/dummy", + } + ], + "session": "", + }, + "params": None, + "timeout": 3, + "verify": True, + }, + ) + if with_params: + assert fmg.api_get(url, params).json()["result"][0]["status"]["code"] == 0 + expected_call[1]["json"]["params"][0] = { # type: ignore + **expected_call[1]["json"]["params"][0], # type: ignore + **{"option": ["scope member"]}, + } + requests.Session.post.assert_called_with( + *expected_call[0], + **expected_call[1], + ) + + else: + assert fmg.api_get(url).json()["result"][0]["status"]["code"] == 0 + requests.Session.post.assert_called_with( + *expected_call[0], + **expected_call[1], + ) + @staticmethod def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: """Test assign_all_objects""" @@ -38,7 +169,7 @@ def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").assign_all_objects("dummy_adom", "dummy_policy") == 111 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -70,7 +201,7 @@ def test_assign_all_objects_http_404(monkeypatch: MonkeyPatch) -> None: with pytest.raises(APIError) as err: FortiManager("host", "", "").assign_all_objects("dummy_adom", "dummy_policy") assert "HTTP/404 Resource Not Found" in str(err.value) - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -113,7 +244,7 @@ def test_assign_all_objects_status_not_ok(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").assign_all_objects("adom1,adom2", "policy1") == 0 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -138,6 +269,184 @@ def test_assign_all_objects_status_not_ok(monkeypatch: MonkeyPatch) -> None: verify=True, ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address() -> None: + """Test fmg delete_adom_address""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/address/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_group() -> None: + """Test fmg delete_adom_address_group""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address_group("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/addrgrp/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service() -> None: + """Test fmg delete_adom_service""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/service/custom/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_group() -> None: + """Test fmg delete_adom_service_group""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service_group("dummy", "dummy")["status"]["code"] == 0 + FortiManager.api_delete.assert_called_with( + "/pm/config/adom/dummy/obj/firewall/service/group/dummy" + ) + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_address_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_address_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_address_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_address( + get_global_address_data: dict[str, Any], + get_global_address_status: dict[str, Any], + delete_adom_address_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_address""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_address", + MagicMock( + return_value={ + "data": get_global_address_data, + "status": get_global_address_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_address", + MagicMock(return_value={"status": delete_adom_address_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address("dummy")["status"]["code"] in [0, 7] + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_address_group_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_address_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_address_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_address_group( + get_global_address_group_data: dict[str, Any], + get_global_address_group_status: dict[str, Any], + delete_adom_address_group_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_address_group""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_address_group", + MagicMock( + return_value={ + "data": get_global_address_group_data, + "status": get_global_address_group_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_address_group", + MagicMock(return_value={"status": delete_adom_address_group_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address_group("dummy")["status"]["code"] in [0, 7] + + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_service_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_service_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_service_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_service( + get_global_service_data: dict[str, Any], + get_global_service_status: dict[str, Any], + delete_adom_service_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_service""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_service", + MagicMock( + return_value={ + "data": get_global_service_data, + "status": get_global_service_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_service", + MagicMock(return_value={"status": delete_adom_service_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service("dummy")["status"]["code"] in [0, 7] + @staticmethod def test_get_adoms(monkeypatch: MonkeyPatch) -> None: """Test fmg get adoms""" @@ -150,7 +459,7 @@ def test_get_adoms(monkeypatch: MonkeyPatch) -> None: ), ) assert FortiManager("host", "", "").get_adoms() == [{"name": "dummy"}] - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/dvmdb/adom"}], "session": ""}, @@ -159,6 +468,52 @@ def test_get_adoms(monkeypatch: MonkeyPatch) -> None: verify=True, ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + @pytest.mark.parametrize( + "get_global_service_group_data", + ( + pytest.param({"scope member": [{"name": "ADOM"}]}, id="with scope member"), + pytest.param({}, id="without scope member"), + ), + ) + @pytest.mark.parametrize( + "get_global_service_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + @pytest.mark.parametrize( + "delete_adom_service_group_status", + ( + pytest.param({"code": 0, "message": "OK"}, id="status OK"), + pytest.param({"code": 7, "message": "dummy"}, id="status dummy"), + ), + ) + def test_delete_global_service_group( + get_global_service_group_data: dict[str, Any], + get_global_service_group_status: dict[str, Any], + delete_adom_service_group_status: dict[str, Any], + monkeypatch: MonkeyPatch, + ) -> None: + """Test fmg delete_global_service_group""" + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.get_global_service_group", + MagicMock( + return_value={ + "data": get_global_service_group_data, + "status": get_global_service_group_status, + } + ), + ) + monkeypatch.setattr( + "fotoobo.fortinet.fortimanager.FortiManager.delete_adom_service_group", + MagicMock(return_value={"status": delete_adom_service_group_status}), + ) + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service_group("dummy")["status"]["code"] in [0, 7] + @staticmethod def test_get_adoms_http_error(monkeypatch: MonkeyPatch) -> None: """Test fmg get adoms with a status != 200""" @@ -170,6 +525,122 @@ def test_get_adoms_http_error(monkeypatch: MonkeyPatch) -> None: FortiManager("", "", "").get_adoms() assert "HTTP/400 Bad Request" in str(err.value) + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_address(scope_member: bool) -> None: + """Test fmg get_global_address""" + fmg = FortiManager("host", "", "") + assert fmg.get_global_address("dummy", scope_member=scope_member)["status"]["code"] == 0 + expected_call: list[Any] = ["/pm/config/global/obj/firewall/address/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_addresses() -> None: + """Test fmg get_global_addresses""" + assert FortiManager("host", "", "").get_global_addresses()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/address", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_address_group(scope_member: bool) -> None: + """Test fmg get_global_address_group""" + fmg = FortiManager("host", "", "") + assert ( + fmg.get_global_address_group("dummy", scope_member=scope_member)["status"]["code"] == 0 + ) + expected_call: list[Any] = ["/pm/config/global/obj/firewall/addrgrp/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + # assert fmg.get_global_address_group("dummy")["status"]["code"] == 0 + # FortiManager.api_get.assert_called_with("/pm/config/global/obj/firewall/addrgrp/dummy") + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_address_groups() -> None: + """Test fmg get_global_address_groups""" + assert FortiManager("host", "", "").get_global_address_groups()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/addrgrp", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_service(scope_member: bool) -> None: + """Test fmg get_global_service""" + fmg = FortiManager("host", "", "") + assert fmg.get_global_service("dummy", scope_member=scope_member)["status"]["code"] == 0 + expected_call: list[Any] = ["/pm/config/global/obj/firewall/service/custom/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_services() -> None: + """Test fmg get_global_services""" + assert FortiManager("host", "", "").get_global_services()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/service/custom", timeout=10 + ) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + @pytest.mark.parametrize( + "scope_member", + ( + pytest.param(True, id="with scope member"), + pytest.param(False, id="without scope member"), + ), + ) + def test_get_global_service_group(scope_member: bool) -> None: + """Test fmg get_global_service_group""" + fmg = FortiManager("host", "", "") + assert ( + fmg.get_global_service_group("dummy", scope_member=scope_member)["status"]["code"] == 0 + ) + expected_call: list[Any] = ["/pm/config/global/obj/firewall/service/group/dummy"] + if scope_member: + expected_call.append({"option": ["scope member"]}) + FortiManager.api_get.assert_called_with(*expected_call) + + @staticmethod + @pytest.mark.usefixtures("api_get_ok") + def test_get_global_service_groups() -> None: + """Test fmg get_global_service_groups""" + assert FortiManager("host", "", "").get_global_service_groups()["status"]["code"] == 0 + FortiManager.api_get.assert_called_with( + "/pm/config/global/obj/firewall/service/group", timeout=10 + ) + @staticmethod @pytest.mark.parametrize( "response, expected", @@ -189,7 +660,7 @@ def test_get_version(response: MagicMock, expected: str, monkeypatch: MonkeyPatc MagicMock(return_value=ResponseMock(json=response, status_code=200)), ) assert FortiManager("host", "", "").get_version() == expected - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/sys/status"}], "session": ""}, @@ -218,7 +689,7 @@ def test_login(monkeypatch: MonkeyPatch) -> None: ) fmg = FortiManager("host", "user", "pass") assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -255,7 +726,7 @@ def test_login_with_session_path(monkeypatch: MonkeyPatch) -> None: fmg.hostname = "test_fmg" fmg.session_path = "tests/data" assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -295,7 +766,7 @@ def test_login_with_session_path_invalid_key(monkeypatch: MonkeyPatch) -> None: fmg.hostname = "test_fmg" fmg.session_path = "tests/data" assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -334,7 +805,7 @@ def test_login_with_session_path_not_found(temp_dir: str, monkeypatch: MonkeyPat fmg.hostname = "test_fmg_dummy" fmg.session_path = temp_dir assert fmg.login() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -364,7 +835,7 @@ def test_logout(monkeypatch: MonkeyPatch) -> None: ) fortimanager = FortiManager("host", "user", "pass") assert fortimanager.logout() == 200 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={ @@ -389,7 +860,7 @@ def test_post_single(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("ADOM", {"params": [{"url": "{adom}"}]}) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -410,7 +881,7 @@ def test_post_multiple(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -431,7 +902,7 @@ def test_post_single_global(monkeypatch: MonkeyPatch) -> None: ), ) assert not FortiManager("host", "", "").post("global", {"params": [{"url": "{adom}"}]}) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "global"}], "session": ""}, @@ -457,7 +928,7 @@ def test_post_response_error(monkeypatch: MonkeyPatch) -> None: assert FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) == [ "dummy: dummy (code: 444)" ] - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -476,7 +947,7 @@ def test_post_http_error(monkeypatch: MonkeyPatch) -> None: with pytest.raises(APIError) as err: FortiManager("host", "", "").post("ADOM", [{"params": [{"url": "{adom}"}]}]) assert "HTTP/444 general API Error" in str(err.value) - requests.Session.post.assert_called_with( # type:ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"params": [{"url": "adom/ADOM"}], "session": ""}, @@ -519,7 +990,7 @@ def test_wait_for_task(monkeypatch: MonkeyPatch) -> None: messages = FortiManager("host", "", "").wait_for_task(222, 0) assert isinstance(messages, list) assert messages[0]["task_id"] == 222 - requests.Session.post.assert_called_with( # type: ignore + requests.Session.post.assert_called_with( "https://host:443/jsonrpc", headers=None, json={"method": "get", "params": [{"url": "/task/task/222/line"}], "session": ""}, From e330c2e6240d510cc53c98516dd32c3a2543b281 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 26 Sep 2024 08:54:59 +0200 Subject: [PATCH 08/10] =?UTF-8?q?=F0=9F=91=94=20Add=20dry-run=20for=20dele?= =?UTF-8?q?te=20methods?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortimanager.py | 159 ++++++++++++++++++---------- tests/fortinet/test_fortimanager.py | 114 +++++++++++++++----- 2 files changed, 190 insertions(+), 83 deletions(-) diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index 933ab6f..82d8679 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -193,71 +193,95 @@ def assign_all_objects(self, adoms: str, policy: str) -> int: return task_id - def delete_adom_address(self, adom: str, address: str) -> dict[str, Any]: + def delete_adom_address(self, adom: str, address: str, dry: bool = False) -> dict[str, Any]: """ Delete an address from an ADOM in FortiManager Args: adom: The ADOM to delete the address from address: The address to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item """ - url: str = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" - result: dict[str, Any] = self.api_delete(url).json()["result"][0] + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove address '%s' in ADOM '%s'", address, adom) return result - def delete_adom_address_group(self, adom: str, group: str) -> dict[str, Any]: + def delete_adom_address_group(self, adom: str, group: str, dry: bool = False) -> dict[str, Any]: """ Delete an address group from an ADOM in FortiManager Args: adom: The ADOM to delete the address group from group: The address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item """ - url: str = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" - result: dict[str, Any] = self.api_delete(url).json()["result"][0] + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove address group '%s' in ADOM '%s'", group, adom) return result - def delete_adom_service(self, adom: str, service: str) -> dict[str, Any]: + def delete_adom_service(self, adom: str, service: str, dry: bool = False) -> dict[str, Any]: """ Delete a service from an ADOM in FortiManager Args: adom: The ADOM to delete the address from service: The service to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item """ - url: str = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" - result: dict[str, Any] = self.api_delete(url).json()["result"][0] + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove service '%s' in ADOM '%s'", service, adom) return result - def delete_adom_service_group(self, adom: str, group: str) -> dict[str, Any]: + def delete_adom_service_group(self, adom: str, group: str, dry: bool = False) -> dict[str, Any]: """ Delete a service group from an ADOM in FortiManager Args: adom: The ADOM to delete the address group from group: The address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item """ - url: str = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" - result: dict[str, Any] = self.api_delete(url).json()["result"][0] + result: dict[str, Any] = {} + if not dry: + url: str = f"/pm/config/adom/{adom}/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove service group '%s' in ADOM '%s'", group, adom) return result - def delete_global_address(self, address: str) -> dict[str, Any]: + def delete_global_address(self, address: str, dry: bool = False) -> dict[str, Any]: """ Delete a global address from FortiManager @@ -277,6 +301,7 @@ def delete_global_address(self, address: str) -> dict[str, Any]: Args: address: The global address to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item @@ -297,25 +322,32 @@ def delete_global_address(self, address: str) -> dict[str, Any]: else: used_adoms = [] - # Try to delete the address group object in every ADOM - blocked_adoms = [] - for adom in used_adoms: - if self.delete_adom_address(adom, address)["status"]["code"] not in [-3, 0]: - blocked_adoms.append(adom) + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if self.delete_adom_address(adom, address)["status"]["code"] not in [ + -3, + 0, + ]: + blocked_adoms.append(adom) - if blocked_adoms: - log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) - # Try to delete the global address object - url: str = f"/pm/config/global/obj/firewall/address/{address}" - result = self.api_delete(url).json()["result"][0] + # Try to delete the global address object + url: str = f"/pm/config/global/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global address '%s'", address) else: result = address_object return result - def delete_global_address_group(self, group: str) -> dict[str, Any]: + def delete_global_address_group(self, group: str, dry: bool = False) -> dict[str, Any]: """ Delete a global address group from FortiManager @@ -335,6 +367,7 @@ def delete_global_address_group(self, group: str) -> dict[str, Any]: Args: group: The global address group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item @@ -356,25 +389,29 @@ def delete_global_address_group(self, group: str) -> dict[str, Any]: else: used_adoms = [] - # Try to delete the address group object in every ADOM - blocked_adoms = [] - for adom in used_adoms: - if not self.delete_adom_address_group(adom, group)["status"]["code"] in [-3, 0]: - blocked_adoms.append(adom) + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_address_group(adom, group)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) - if blocked_adoms: - log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) - # Try to delete the global address group object - url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" - result = self.api_delete(url).json()["result"][0] + # Try to delete the global address group object + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global address group '%s'", group) else: result = address_group_object return result - def delete_global_service(self, service: str) -> dict[str, Any]: + def delete_global_service(self, service: str, dry: bool = False) -> dict[str, Any]: """ Delete a global service from FortiManager @@ -394,6 +431,7 @@ def delete_global_service(self, service: str) -> dict[str, Any]: Args: service: The global service to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item @@ -415,25 +453,29 @@ def delete_global_service(self, service: str) -> dict[str, Any]: else: used_adoms = [] - # Try to delete the address group object in every ADOM - blocked_adoms = [] - for adom in used_adoms: - if not self.delete_adom_service(adom, service)["status"]["code"] in [-3, 0]: - blocked_adoms.append(adom) + if not dry: + # Try to delete the address group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service(adom, service)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) - if blocked_adoms: - log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) - # Try to delete the global service object - url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" - result = self.api_delete(url).json()["result"][0] + # Try to delete the global service object + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global service '%s'", service) else: result = service_object return result - def delete_global_service_group(self, group: str) -> dict[str, Any]: + def delete_global_service_group(self, group: str, dry: bool = False) -> dict[str, Any]: """ Delete a global service group from FortiManager @@ -453,6 +495,7 @@ def delete_global_service_group(self, group: str) -> dict[str, Any]: Args: group: The global service group to delete + dry: Set to True to enable dry-run (no changes on FortiManager) Returns: FortiManager result item @@ -474,18 +517,22 @@ def delete_global_service_group(self, group: str) -> dict[str, Any]: else: used_adoms = [] - # Try to delete the service group object in every ADOM - blocked_adoms = [] - for adom in used_adoms: - if not self.delete_adom_service_group(adom, group)["status"]["code"] in [-3, 0]: - blocked_adoms.append(adom) + if not dry: + # Try to delete the service group object in every ADOM + blocked_adoms = [] + for adom in used_adoms: + if not self.delete_adom_service_group(adom, group)["status"]["code"] in [-3, 0]: + blocked_adoms.append(adom) - if blocked_adoms: - log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + if blocked_adoms: + log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) - # Try to delete the global service group object - url: str = f"/pm/config/global/obj/firewall/service/group/{group}" - result = self.api_delete(url).json()["result"][0] + # Try to delete the global service group object + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] + + else: + log.info("DRY-RUN: Would remove global service group '%s'", group) else: result = service_group_object diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index 552a1c7..f535a24 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -2,7 +2,7 @@ Test the FortiManager class """ -# pylint: disable=no-member +# pylint: disable=no-member, too-many-lines # mypy: disable-error-code=attr-defined from typing import Any from unittest.mock import MagicMock @@ -25,7 +25,7 @@ def response_mock_api_ok() -> MagicMock: """Fixture to return a mocked response for API ok""" return MagicMock( return_value=ResponseMock( - json={"result": [{"status": {"code": 0, "message": "OK"}}]}, + json={"result": [{"data": {}, "status": {"code": 0, "message": "OK"}}]}, status_code=200, ) ) @@ -66,7 +66,6 @@ def test_api_delete(monkeypatch: MonkeyPatch) -> None: ) ), ) - fmg = FortiManager("host", "", "") assert fmg.api_delete(url).json()["result"][0]["status"]["code"] == 0 requests.Session.post.assert_called_with( @@ -111,7 +110,6 @@ def test_api_get(with_params: bool, monkeypatch: MonkeyPatch) -> None: ), ) fmg = FortiManager("host", "", "") - expected_call = ( ["https://host:443/jsonrpc"], { @@ -143,10 +141,7 @@ def test_api_get(with_params: bool, monkeypatch: MonkeyPatch) -> None: else: assert fmg.api_get(url).json()["result"][0]["status"]["code"] == 0 - requests.Session.post.assert_called_with( - *expected_call[0], - **expected_call[1], - ) + requests.Session.post.assert_called_with(*expected_call[0], **expected_call[1]) @staticmethod def test_assign_all_objects(monkeypatch: MonkeyPatch) -> None: @@ -200,6 +195,7 @@ def test_assign_all_objects_http_404(monkeypatch: MonkeyPatch) -> None: ) with pytest.raises(APIError) as err: FortiManager("host", "", "").assign_all_objects("dummy_adom", "dummy_policy") + assert "HTTP/404 Resource Not Found" in str(err.value) requests.Session.post.assert_called_with( "https://host:443/jsonrpc", @@ -279,6 +275,14 @@ def test_delete_adom_address() -> None: "/pm/config/adom/dummy/obj/firewall/address/dummy" ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_dry() -> None: + """Test fmg delete_adom_address with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") def test_delete_adom_address_group() -> None: @@ -289,6 +293,14 @@ def test_delete_adom_address_group() -> None: "/pm/config/adom/dummy/obj/firewall/addrgrp/dummy" ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_address_group_dry() -> None: + """Test fmg delete_adom_address_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_address_group("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") def test_delete_adom_service() -> None: @@ -299,6 +311,14 @@ def test_delete_adom_service() -> None: "/pm/config/adom/dummy/obj/firewall/service/custom/dummy" ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_dry() -> None: + """Test fmg delete_adom_service with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") def test_delete_adom_service_group() -> None: @@ -309,6 +329,14 @@ def test_delete_adom_service_group() -> None: "/pm/config/adom/dummy/obj/firewall/service/group/dummy" ) + @staticmethod + @pytest.mark.usefixtures("api_delete_ok") + def test_delete_adom_service_group_dry() -> None: + """Test fmg delete_adom_service_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_adom_service_group("dummy", "dummy", dry=True) == {} + FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") @pytest.mark.parametrize( @@ -355,6 +383,14 @@ def test_delete_global_address( fmg = FortiManager("host", "", "") assert fmg.delete_global_address("dummy")["status"]["code"] in [0, 7] + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_address_dry() -> None: + """Test fmg delete_global_address with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address("dummy", dry=True) == {} + # FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") @pytest.mark.parametrize( @@ -401,6 +437,14 @@ def test_delete_global_address_group( fmg = FortiManager("host", "", "") assert fmg.delete_global_address_group("dummy")["status"]["code"] in [0, 7] + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_address_group_dry() -> None: + """Test fmg delete_global_address_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_address_group("dummy", dry=True) == {} + # FortiManager.api_delete.assert_not_called() + @staticmethod @pytest.mark.usefixtures("api_delete_ok") @pytest.mark.parametrize( @@ -448,25 +492,12 @@ def test_delete_global_service( assert fmg.delete_global_service("dummy")["status"]["code"] in [0, 7] @staticmethod - def test_get_adoms(monkeypatch: MonkeyPatch) -> None: - """Test fmg get adoms""" - monkeypatch.setattr( - "fotoobo.fortinet.fortinet.requests.Session.post", - MagicMock( - return_value=ResponseMock( - json={"result": [{"data": [{"name": "dummy"}]}]}, status_code=200 - ) - ), - ) - assert FortiManager("host", "", "").get_adoms() == [{"name": "dummy"}] - requests.Session.post.assert_called_with( - "https://host:443/jsonrpc", - headers=None, - json={"method": "get", "params": [{"url": "/dvmdb/adom"}], "session": ""}, - params=None, - timeout=3, - verify=True, - ) + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_service_dry() -> None: + """Test fmg delete_global_service with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service("dummy", dry=True) == {} + # FortiManager.api_delete.assert_not_called() @staticmethod @pytest.mark.usefixtures("api_delete_ok") @@ -514,6 +545,35 @@ def test_delete_global_service_group( fmg = FortiManager("host", "", "") assert fmg.delete_global_service_group("dummy")["status"]["code"] in [0, 7] + @staticmethod + @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") + def test_delete_global_service_group_dry() -> None: + """Test fmg delete_global_service_group with dry-run""" + fmg = FortiManager("host", "", "") + assert fmg.delete_global_service_group("dummy", dry=True) == {} + # FortiManager.api_delete.assert_not_called() + + @staticmethod + def test_get_adoms(monkeypatch: MonkeyPatch) -> None: + """Test fmg get adoms""" + monkeypatch.setattr( + "fotoobo.fortinet.fortinet.requests.Session.post", + MagicMock( + return_value=ResponseMock( + json={"result": [{"data": [{"name": "dummy"}]}]}, status_code=200 + ) + ), + ) + assert FortiManager("host", "", "").get_adoms() == [{"name": "dummy"}] + requests.Session.post.assert_called_with( + "https://host:443/jsonrpc", + headers=None, + json={"method": "get", "params": [{"url": "/dvmdb/adom"}], "session": ""}, + params=None, + timeout=3, + verify=True, + ) + @staticmethod def test_get_adoms_http_error(monkeypatch: MonkeyPatch) -> None: """Test fmg get adoms with a status != 200""" From 85b7fbfac211e8e1d7a97930a5c1b00efb90f2e5 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 26 Sep 2024 09:45:18 +0200 Subject: [PATCH 09/10] =?UTF-8?q?=E2=9C=85=20Fix=20tests=20for=20dry-run?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/fortinet/test_fortimanager.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index f535a24..557545f 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -20,8 +20,7 @@ class TestFortiManager: # pylint: disable=too-many-public-methods """Test the FortiManager class""" @staticmethod - @pytest.fixture - def response_mock_api_ok() -> MagicMock: + def _response_mock_api_ok() -> MagicMock: """Fixture to return a mocked response for API ok""" return MagicMock( return_value=ResponseMock( @@ -32,18 +31,20 @@ def response_mock_api_ok() -> MagicMock: @staticmethod @pytest.fixture - def api_delete_ok(response_mock_api_ok: MagicMock, monkeypatch: MonkeyPatch) -> None: + def api_delete_ok(monkeypatch: MonkeyPatch) -> None: """Fixture to patch the FortiManager.api_delete() method""" monkeypatch.setattr( - "fotoobo.fortinet.fortimanager.FortiManager.api_delete", response_mock_api_ok + "fotoobo.fortinet.fortimanager.FortiManager.api_delete", + TestFortiManager._response_mock_api_ok(), ) @staticmethod @pytest.fixture - def api_get_ok(response_mock_api_ok: MagicMock, monkeypatch: MonkeyPatch) -> None: + def api_get_ok(monkeypatch: MonkeyPatch) -> None: """Fixture to patch the FortiManager.api_get() method""" monkeypatch.setattr( - "fotoobo.fortinet.fortimanager.FortiManager.api_get", response_mock_api_ok + "fotoobo.fortinet.fortimanager.FortiManager.api_get", + TestFortiManager._response_mock_api_ok(), ) @staticmethod @@ -389,7 +390,7 @@ def test_delete_global_address_dry() -> None: """Test fmg delete_global_address with dry-run""" fmg = FortiManager("host", "", "") assert fmg.delete_global_address("dummy", dry=True) == {} - # FortiManager.api_delete.assert_not_called() + FortiManager.api_delete.assert_not_called() @staticmethod @pytest.mark.usefixtures("api_delete_ok") @@ -443,7 +444,7 @@ def test_delete_global_address_group_dry() -> None: """Test fmg delete_global_address_group with dry-run""" fmg = FortiManager("host", "", "") assert fmg.delete_global_address_group("dummy", dry=True) == {} - # FortiManager.api_delete.assert_not_called() + FortiManager.api_delete.assert_not_called() @staticmethod @pytest.mark.usefixtures("api_delete_ok") @@ -497,7 +498,7 @@ def test_delete_global_service_dry() -> None: """Test fmg delete_global_service with dry-run""" fmg = FortiManager("host", "", "") assert fmg.delete_global_service("dummy", dry=True) == {} - # FortiManager.api_delete.assert_not_called() + FortiManager.api_delete.assert_not_called() @staticmethod @pytest.mark.usefixtures("api_delete_ok") @@ -551,7 +552,7 @@ def test_delete_global_service_group_dry() -> None: """Test fmg delete_global_service_group with dry-run""" fmg = FortiManager("host", "", "") assert fmg.delete_global_service_group("dummy", dry=True) == {} - # FortiManager.api_delete.assert_not_called() + FortiManager.api_delete.assert_not_called() @staticmethod def test_get_adoms(monkeypatch: MonkeyPatch) -> None: From 5c8c7e827ede22f1849be2cf449d67c3b1896d19 Mon Sep 17 00:00:00 2001 From: Patrik Spiess Date: Thu, 26 Sep 2024 11:18:58 +0200 Subject: [PATCH 10/10] =?UTF-8?q?=F0=9F=91=94=20Optimize=20delete=5Fglobal?= =?UTF-8?q?=5Fxyz()=20when=20ADOM=20blocks=20deletion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- fotoobo/fortinet/fortimanager.py | 48 +++++++++++++++++++++-------- tests/fortinet/test_fortimanager.py | 8 ++--- 2 files changed, 40 insertions(+), 16 deletions(-) diff --git a/fotoobo/fortinet/fortimanager.py b/fotoobo/fortinet/fortimanager.py index 82d8679..1b06990 100644 --- a/fotoobo/fortinet/fortimanager.py +++ b/fotoobo/fortinet/fortimanager.py @@ -334,10 +334,16 @@ def delete_global_address(self, address: str, dry: bool = False) -> dict[str, An if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", address, ",".join(blocked_adoms)) + result = address_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } - # Try to delete the global address object - url: str = f"/pm/config/global/obj/firewall/address/{address}" - result = self.api_delete(url).json()["result"][0] + else: + # Try to delete the global address object + url: str = f"/pm/config/global/obj/firewall/address/{address}" + result = self.api_delete(url).json()["result"][0] else: log.info("DRY-RUN: Would remove global address '%s'", address) @@ -398,10 +404,16 @@ def delete_global_address_group(self, group: str, dry: bool = False) -> dict[str if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + result = address_group_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } - # Try to delete the global address group object - url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" - result = self.api_delete(url).json()["result"][0] + else: + # Try to delete the global address group object + url: str = f"/pm/config/global/obj/firewall/addrgrp/{group}" + result = self.api_delete(url).json()["result"][0] else: log.info("DRY-RUN: Would remove global address group '%s'", group) @@ -462,10 +474,16 @@ def delete_global_service(self, service: str, dry: bool = False) -> dict[str, An if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", service, ",".join(blocked_adoms)) + result = service_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } - # Try to delete the global service object - url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" - result = self.api_delete(url).json()["result"][0] + else: + # Try to delete the global service object + url: str = f"/pm/config/global/obj/firewall/service/custom/{service}" + result = self.api_delete(url).json()["result"][0] else: log.info("DRY-RUN: Would remove global service '%s'", service) @@ -526,10 +544,16 @@ def delete_global_service_group(self, group: str, dry: bool = False) -> dict[str if blocked_adoms: log.warning("'%s' blocked by ADOM '%s'", group, ",".join(blocked_adoms)) + result = service_group_object + result["status"] = { + "code": 601, + "message": f"Used in ADOM {','.join(blocked_adoms)}", + } - # Try to delete the global service group object - url: str = f"/pm/config/global/obj/firewall/service/group/{group}" - result = self.api_delete(url).json()["result"][0] + else: + # Try to delete the global service group object + url: str = f"/pm/config/global/obj/firewall/service/group/{group}" + result = self.api_delete(url).json()["result"][0] else: log.info("DRY-RUN: Would remove global service group '%s'", group) diff --git a/tests/fortinet/test_fortimanager.py b/tests/fortinet/test_fortimanager.py index 557545f..812c471 100644 --- a/tests/fortinet/test_fortimanager.py +++ b/tests/fortinet/test_fortimanager.py @@ -382,7 +382,7 @@ def test_delete_global_address( MagicMock(return_value={"status": delete_adom_address_status}), ) fmg = FortiManager("host", "", "") - assert fmg.delete_global_address("dummy")["status"]["code"] in [0, 7] + assert fmg.delete_global_address("dummy")["status"]["code"] in [0, 7, 601] @staticmethod @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") @@ -436,7 +436,7 @@ def test_delete_global_address_group( MagicMock(return_value={"status": delete_adom_address_group_status}), ) fmg = FortiManager("host", "", "") - assert fmg.delete_global_address_group("dummy")["status"]["code"] in [0, 7] + assert fmg.delete_global_address_group("dummy")["status"]["code"] in [0, 7, 601] @staticmethod @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") @@ -490,7 +490,7 @@ def test_delete_global_service( MagicMock(return_value={"status": delete_adom_service_status}), ) fmg = FortiManager("host", "", "") - assert fmg.delete_global_service("dummy")["status"]["code"] in [0, 7] + assert fmg.delete_global_service("dummy")["status"]["code"] in [0, 7, 601] @staticmethod @pytest.mark.usefixtures("api_get_ok", "api_delete_ok") @@ -544,7 +544,7 @@ def test_delete_global_service_group( MagicMock(return_value={"status": delete_adom_service_group_status}), ) fmg = FortiManager("host", "", "") - assert fmg.delete_global_service_group("dummy")["status"]["code"] in [0, 7] + assert fmg.delete_global_service_group("dummy")["status"]["code"] in [0, 7, 601] @staticmethod @pytest.mark.usefixtures("api_get_ok", "api_delete_ok")