Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions util-scripts/acs-correlation-example/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ FROM registry.access.redhat.com/ubi9:9.1.0-1782
#Change User
USER 0


# Install the required software
RUN dnf install -y wget yum-utils make gcc openssl-devel bzip2-devel libffi-devel zlib-devel && \
wget https://www.python.org/ftp/python/3.10.8/Python-3.10.8.tgz && \
tar xzf Python-3.10.8.tgz && \
cd Python-3.10.8 && \
wget https://www.python.org/ftp/python/3.13.2/Python-3.13.2.tgz && \
tar xzf Python-3.13.2.tgz && \
cd Python-3.13.2 && \
./configure --with-system-ffi --with-computed-gotos --enable-loadable-sqlite-extensions && \
make altinstall && \
cd .. && \
rm Python-3.10.8.tgz
rm Python-3.13.2.tgz

# # Install pip
# RUN curl -O https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && python3 get-pip.py
Expand All @@ -30,16 +31,13 @@ COPY ./ ./app
WORKDIR ./app

#Install App Dependecies
RUN pip3.10 install -r requirements.txt && pip3.10 install --upgrade pip
RUN pip3.13 install -r requirements.txt && pip3.13 install --upgrade pip

#Expose Ports
EXPOSE 8080/tcp

#Change Permissions to allow not root-user work
RUN chmod -R g+rw ./

#Change User
USER 1001

#ENTRY
ENTRYPOINT python3.10 app.py
ENTRYPOINT python3.13 app.py
27 changes: 25 additions & 2 deletions util-scripts/acs-correlation-example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,30 @@
podman build -t quick_acs_app .
```

- Copy and update endpoint list file and token
```bash
export CENTRAL_API_URL="https://console-openshift-console.apps.cluster1.sandbox568.opentlc.com"
export MAIN_ACS_TOKEN=""
export ENDPOINT_DIR=$(mktemp -d -t ACS_Endpoint_List_XXXX )
export OUTPUT_FILE_DIR=$(mktemp -d -t ACS_Output_DIR_XXXX )
cat ./endpoint_list.json | envsubst > ${ENDPOINT_DIR}/endpoint_list.json
```

- Run Container
```bash
podman run --env $MAIN_ACS_TOKEN --env OUTPUT_FOLDER=/output -v /tmp/output:/output:Z localhost/quick_acs_app
```
podman run --name acs_correlator \
--replace \
--userns=keep-id \
--env MAIN_ACS_TOKEN=${MAIN_ACS_TOKEN} \
--env ENDPOINT_LIST_JSON_PATH=/endpoint/endpoint_list.json \
--env OUTPUT_FOLDER=/output \
-v ${OUTPUT_FILE_DIR}:/output:z \
-v ${ENDPOINT_DIR}:/endpoint:z \
localhost/quick_acs_app
```
- If All goes well sample output should get written out to ${OUTPUT_FILE_DIR}

- TODO:
- Example uses the [Pydantic Library to create models to export out objects relationships](https://docs.pydantic.dev/1.10/usage/exporting_models/#advanced-include-and-exclude).
- The sample relationships used for output can be seen in the [app.py](util-scripts/acs-correlation-example/app.py) on line 866
- Will eventually extend this example to get custom relationships and export out a file.
89 changes: 60 additions & 29 deletions util-scripts/acs-correlation-example/acs_request.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from httpx import AsyncClient,HTTPError,NetworkError,RequestError,TimeoutException,ConnectTimeout,InvalidURL,ProtocolError,ConnectError
import os
from httpx._config import SSLConfig
#from httpx._config import SSLConfig
from logging import getLogger, config
import typing as t
import asyncio
Expand Down Expand Up @@ -36,24 +36,35 @@ def __next__(self):
return self.start
else:
raise StopIteration


async def make_request(full_url_path,insecure:bool=False,headers:dict=None,params:dict=None,offset=None) -> dict:

async def make_request(
client,
full_url_path,
insecure: bool = False,
headers: dict = None,
params: dict = None,
offset = None
) -> dict:
"""Make a request to the API"""
error=None
response=None
local_client=False

#TODO: Clean offset and params
if offset is not None:
params["pagination.offset"] = offset

if client is None:
logger.debug(f"make_request - client is None,will create a new client")
client = AsyncClient(verify=insecure)
local_client=True

try:
async with AsyncClient(verify=insecure) as client:
response = await client.get(
f"{full_url_path}",headers=headers,params=params
)
logger.debug(f"request_processing - attempted request")
response.raise_for_status()
response = await client.get(
f"{full_url_path}",headers=headers,params=params
)
logger.debug(f"request_processing - attempted request")
response.raise_for_status()
except ConnectTimeout as timeout_err:
logger.error(f" Connect Timeout error occurred: {timeout_err}")
error=f"Connect Timeout error occurred: {timeout_err}"
Expand All @@ -75,14 +86,18 @@ async def make_request(full_url_path,insecure:bool=False,headers:dict=None,param
except IOError as e:
logger.error("I/O error({0}): {1}".format(e.errno, e.strerror))
except BaseException as e:
print("Something serious has occured")
error=f"Something Seriously unexpected has occured"
print("Something serious has occurred")
error=f"Something Seriously unexpected has occurred"
finally:
if local_client:
await client.aclose()

return {"response_object":response,"error_object":error}

async def request_processing_pagination(full_url_path,insecure:bool=False,headers:dict=None,params:dict=None):
async def request_processing_pagination(client,full_url_path,insecure:bool=False,headers:dict=None,params:dict=None):
"""
Args:
client (AsyncClient): httpx AsyncClient object for making requests
full_url_path (_type_): ACS URL with path for the request
insecure (bool, optional): Make an insecure Request, Should be set from verify_endpoint_ssl on endpoint object
headers (dict, optional): Headers for Request to ACS. Defaults to None.
Expand Down Expand Up @@ -116,57 +131,59 @@ async def request_processing_pagination(full_url_path,insecure:bool=False,header
response_dict={"response_object":[],"error_object":None}
for offset in PaginationCounter(total_expected_count,params["pagination.limit"]):
params.update({"pagination.offset":offset})
temp_dict = await make_request(full_url_path,insecure,headers,params)
temp_dict = await make_request(client,full_url_path,insecure,headers,params)

if temp_dict["error_object"] is not None:
return temp_dict["error_object"]
else:
response_dict["response_object"].append(temp_dict["response_object"])
return response_dict

async def request_processing(full_url_path,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
async def request_processing(client,full_url_path,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Send the Request and process the response"""
logger.debug(f"request_processing -start: url:{full_url_path} verify_ssl:{insecure}")
error=None
response_dict={"response_object":[],"error_object":None}

if params is None:
response_dict = await make_request(full_url_path,insecure,headers,params)
response_dict = await make_request(client,full_url_path,insecure,headers,params)
else:
if "pagination.limit" in params and "total_expected_count" in params:
response_dict = await request_processing_pagination(full_url_path,insecure,headers,params)
response_dict = await request_processing_pagination(client,full_url_path,insecure,headers,params)
else:
response_dict = await make_request(full_url_path,insecure,headers,params)
response_dict = await make_request(client,full_url_path,insecure,headers,params)

return response_dict

async def get_acs_alert(url,alert_id: str,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
async def get_acs_alert(client,url,alert_id: str,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Get ACS alert from the API"""
if alert_id is not None:
logger.debug(f"get_acs_alert -start: url:{url} id:{alert_id} verify_ssl:{insecure}")
rhacs_alert_url_path=f"{url}/v1/alerts/{alert_id}"
else:
logger.debug(f"get_acs_alert -start: url:{url} verify_ssl:{insecure}")
rhacs_alert_url_path=f"{url}/v1/alerts"
response_dict = await request_processing(rhacs_alert_url_path,insecure,headers,params)
response_dict = await request_processing(client,rhacs_alert_url_path,insecure,headers,params)
logger.debug(f"get_acs_alert - complete")
return response_dict

async def get_policy(url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
async def get_policy(client,url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Get Policy from the API"""
logger.debug(f"get_policy -start: url:{url} verify_ssl:{insecure}")
rhacs_policy_url_path=f"{url}/v1/policies"
response_dict = await request_processing(rhacs_policy_url_path,insecure,headers,params)
response_dict = await request_processing(client,rhacs_policy_url_path,insecure,headers,params)
logger.debug(f"get_policy - complete")
return response_dict

async def get_alert_count(url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
async def get_alert_count(client,url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Get Alert Count"""
logger.debug(f"get_policy -start: url:{url} verify_ssl:{insecure}")
rhacs_policy_url_path=f"{url}/v1/alertscount"
response_dict = await request_processing(rhacs_policy_url_path,insecure,headers,params)
response_dict = await request_processing(client,rhacs_policy_url_path,insecure,headers,params)
logger.debug(f"get_policy - complete")
return response_dict

async def get_acs_deployment(url,deployment_id:str, insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
async def get_acs_deployment(client,url,deployment_id:str, insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Get Deployment from the API"""
if deployment_id is not None:
logger.debug(f"get_acs_alert -start: url:{url} id:{deployment_id} verify_ssl:{insecure}")
Expand All @@ -176,14 +193,28 @@ async def get_acs_deployment(url,deployment_id:str, insecure:bool=False,headers:
rhacs_deployment_url_path=f"{url}/v1/deployments"

logger.debug(f"get_deployment -start: url:{url}")
response_dict = await request_processing(rhacs_deployment_url_path,insecure,headers,params)
response_dict = await request_processing(client,rhacs_deployment_url_path,insecure,headers,params)
logger.debug(f"get_deployment - complete")
return response_dict

async def get_rhacs_health(url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""Get health from the API"""
async def get_rhacs_health(client,url,insecure:bool=False,headers:dict=None,params:dict=None) -> dict:
"""
Asynchronously retrieves the health status of RHACS (Red Hat Advanced Cluster Security).
Args:
client: The HTTP client to use for making the request.
url (str): The base URL for the RHACS instance.
insecure (bool, optional): Whether to ignore SSL certificate verification. Defaults to False.
headers (dict, optional): Additional headers to include in the request. Defaults to None.
params (dict, optional): Additional parameters to include in the request. Defaults to None.
Returns:
dict: The response dictionary containing the health status of RHACS.
Logs:
Debug logs indicating the start and completion of the health check request.
"""


logger.debug(f"get_rhacs_health -start: url:{url}")
rhacs_health_url_path=f"{url}/v1/ping"
response_dict = await request_processing(rhacs_health_url_path,insecure,headers,params)
response_dict = await request_processing(client,rhacs_health_url_path,insecure,headers,params)
logger.debug(f"get_rhacs_health - complete")
return response_dict
Loading