From 37e8870485860f2443ae5bb9e6460991172ac46e Mon Sep 17 00:00:00 2001 From: monkumar Date: Thu, 19 Jun 2025 14:36:45 +0530 Subject: [PATCH 01/12] pgp-encryption --- CyberSource/api/__init__.py | 1 + CyberSource/api/batch_upload_with_mtls_api.py | 208 ++++++++++++ .../utilities/pgpBatchUpload/__init__.py | 0 .../pgpBatchUpload/mutual_auth_upload.py | 261 +++++++++++++++ .../pgpBatchUpload/pgp_encryption.py | 303 ++++++++++++++++++ .../__init__api.mustache | 1 + requirements.txt | 1 + 7 files changed, 775 insertions(+) create mode 100644 CyberSource/api/batch_upload_with_mtls_api.py create mode 100644 CyberSource/utilities/pgpBatchUpload/__init__.py create mode 100644 CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py create mode 100644 CyberSource/utilities/pgpBatchUpload/pgp_encryption.py diff --git a/CyberSource/api/__init__.py b/CyberSource/api/__init__.py index 13466179..10c85077 100644 --- a/CyberSource/api/__init__.py +++ b/CyberSource/api/__init__.py @@ -63,3 +63,4 @@ from .reversal_api import ReversalApi from .taxes_api import TaxesApi from .void_api import VoidApi +from .batch_upload_with_mtls_api import BatchUploadWithMTLSApi diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py new file mode 100644 index 00000000..db405a95 --- /dev/null +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -0,0 +1,208 @@ +""" +Batch Upload with mTLS API for CyberSource. + +This module provides functionality for batch uploads to CyberSource using mTLS authentication, +supporting both file-based and in-memory operations with various configuration options. +""" + +from pathlib import Path +from typing import Optional + +import CyberSource.logging.log_factory as LogFactory +from CyberSource.utilities.pgpBatchUpload.mutual_auth_upload import MutualAuthUpload +from CyberSource.utilities.pgpBatchUpload.pgp_encryption import PgpEncryption + + +def _validate_paths(paths: list) -> None: + """ + Validate that all specified paths exist. + + Args: + paths: List of tuples containing (path, description) + + Raises: + FileNotFoundError: If any of the paths don't exist + """ + for path, description in paths: + if path is None: + continue # Skip validation for None paths + if not path: + raise ValueError(f"{description} path is required") + if not Path(path).exists(): + raise FileNotFoundError(f"{description} not found: {path}") + + +class BatchUploadWithMTLSApi: + """ + A class for handling batch uploads to CyberSource using mTLS authentication. + + This class provides methods for encrypting and uploading batch files to CyberSource + using PGP encryption and mutual TLS authentication. It supports various authentication + methods including separate key/cert files. + + Attributes: + logger: Logger instance for logging operations and errors + pgp_encryption: PgpEncryption instance for handling PGP encryption + mutual_auth_upload: MutualAuthUpload instance for handling mTLS uploads + """ + + _end_point = "/pts/v1/transaction-batch-upload" + + def __init__(self, log_config=None): + """ + Initialize the BatchUploadWithMTLSApi. + + Args: + log_config: Optional configuration for the logger + """ + self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config) + self.pgp_encryption = PgpEncryption(log_config) + self.mutual_auth_upload = MutualAuthUpload(log_config) + + def upload_batch_api_with_separate_key_and_cert_file( + self, + input_file: str, + environment_hostname: str, + pgp_encryption_public_key_path: str, + client_cert_path: str, + client_key_path: str, + server_trust_cert_path: str = None, + client_cert_password: Optional[str] = None, + verify_ssl: bool = True, + ) -> None: + """ + Upload a batch file using separate key and certificate files. + + This method encrypts the input file using PGP and uploads it to CyberSource + using mutual TLS authentication with separate key and certificate files. + + Args: + input_file: Path to the file to upload + environment_hostname: CyberSource environment hostname + pgp_encryption_public_key_path: Path to the PGP public key file + client_cert_path: Path to the client certificate file + client_key_path: Path to the client private key file + server_trust_cert_path: Path to the server trust certificate file (optional) + client_cert_password: Optional password for the client certificate file + verify_ssl: Whether to verify SSL certificates (default: True). Set to False only for development purposes. + + Raises: + ValueError: If required parameters are missing or invalid + FileNotFoundError: If any of the required files don't exist + requests.exceptions.RequestException: If there's an error during the upload process + """ + try: + # Step 1: Create endpoint URL + endpoint_url = self.get_base_url(environment_hostname) + self._end_point + + # Step 2: Validations + _validate_paths( + [ + (input_file, "Input file"), + (pgp_encryption_public_key_path, "PGP public key"), + (client_cert_path, "Client certificate"), + (client_key_path, "Client private key"), + (server_trust_cert_path, "Server trust certificate"), + ] + ) + + # Validate file size (maximum 75 MB) + file_size = Path(input_file).stat().st_size + max_size_bytes = 75 * 1024 * 1024 # 75 MB in bytes + if file_size > max_size_bytes: + error_msg = f"Input file size ({file_size} bytes) exceeds the maximum allowed size of 75 MB ({max_size_bytes} bytes)" + if self.logger is not None: + self.logger.error(error_msg) + raise ValueError(error_msg) + + # Validate file extension (.csv) + file_extension = Path(input_file).suffix.lower() + if file_extension != ".csv": + error_msg = ( + f"Input file must have a .csv extension, but got '{file_extension}'" + ) + if self.logger is not None: + self.logger.error(error_msg) + raise ValueError(error_msg) + + # Step 3: PGP encryption + encrypted_pgp_bytes = self.pgp_encryption.handle_encrypt_operation( + input_file, pgp_encryption_public_key_path + ) + + # Step 4: Upload the encrypted PGP file using mTLS + # Replace the file extension with .pgp + file_name = Path(input_file).stem + ".pgp" + + # Log a warning if SSL verification is disabled + if not verify_ssl and self.logger is not None: + self.logger.warning( + "SSL certificate verification is disabled. This should only be used in development environments." + ) + + self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( + encrypted_pgp_bytes=encrypted_pgp_bytes, + endpoint_url=endpoint_url, + file_name=file_name, + client_private_key_path=client_key_path, + client_cert_path=client_cert_path, + server_trust_cert_path=server_trust_cert_path, + client_cert_password=client_cert_password, + verify_ssl=verify_ssl, + ) + if self.logger is not None: + self.logger.info("Batch file uploaded successfully") + except ValueError as e: + if self.logger is not None: + self.logger.error(f"Validation error: {str(e)}") + raise + except FileNotFoundError as e: + if self.logger is not None: + self.logger.error(f"File not found: {str(e)}") + raise + except Exception as e: + if self.logger is not None: + self.logger.error( + f"Error in upload_batch_api_with_separate_key_and_cert_file: {str(e)}" + ) + raise + + @staticmethod + def get_base_url(environment_hostname: str) -> str: + """ + Get the base URL from the environment hostname. + + Args: + environment_hostname: CyberSource environment hostname + + Returns: + str: The base URL with https:// prefix if not already present + """ + if not environment_hostname: + raise ValueError("Environment hostname is required") + + base_url = environment_hostname.strip() + if not base_url.startswith("https://"): + base_url = "https://" + base_url + return base_url + + def __enter__(self): + """ + Enter the context manager. + + Returns: + BatchUploadWithMTLSApi: The instance itself + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Exit the context manager. + + Args: + exc_type: Exception type if an exception was raised + exc_val: Exception value if an exception was raised + exc_tb: Exception traceback if an exception was raised + """ + # Currently no resources to clean up + pass diff --git a/CyberSource/utilities/pgpBatchUpload/__init__.py b/CyberSource/utilities/pgpBatchUpload/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py new file mode 100644 index 00000000..7defe965 --- /dev/null +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -0,0 +1,261 @@ +import ssl +import uuid + +import certifi +import urllib3 +from urllib3 import BaseHTTPResponse +from urllib3.exceptions import HTTPError + +import CyberSource.logging.log_factory as LogFactory +from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters + + +def _prepare_files(encrypted_pgp_bytes: bytes, file_name: str) -> tuple: + """ + Prepare the file data for multipart/form-data upload with urllib3. + + :param encrypted_pgp_bytes: Encrypted PGP file content + :param file_name: Name of the file to be uploaded + :return: Tuple containing the file field name, filename, file content, and content type + """ + return "file", file_name, encrypted_pgp_bytes, "application/octet-stream" + + +class MutualAuthUpload: + def __init__(self, log_config=None): + self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config) + + def handle_upload_operation_with_certificate( + self, + encrypted_pgp_bytes: bytes, + endpoint_url: str, + file_name: str, + cert_file_path: str, + cert_file_password: str, + server_trust_cert_path: str = None, + verify_ssl: bool = True, + ) -> None: + """ + Handle upload operation using certificate file. + + :param encrypted_pgp_bytes: Encrypted PGP file content + :param endpoint_url: URL to upload the file + :param file_name: Name of the file to be uploaded + :param cert_file_path: Path to the certificate file + :param cert_file_password: Password for the certificate file + :param server_trust_cert_path: Path to the server trust certificate (optional) + :param verify_ssl: Whether to verify SSL certificates (default: True) + :raises Exception: If there's an error during the upload process + """ + try: + self.logger.info("Handling upload operation with certificate") + + if not verify_ssl and self.logger is not None: + self.logger.warning( + "SSL verification is disabled. This should only be used in development environments." + ) + + self.upload_file( + encrypted_pgp_bytes, + endpoint_url, + file_name, + client_cert_path=cert_file_path, + client_key_path=cert_file_path, + ca_cert_path=server_trust_cert_path, + cert_password=cert_file_password, + verify_ssl=verify_ssl, + ) + except HTTPError as e: + if self.logger is not None: + self.logger.error( + f"Error in handle_upload_operation_with_certificate: {str(e)}" + ) + raise + + def handle_upload_operation_using_private_key_and_certs( + self, + encrypted_pgp_bytes: bytes, + endpoint_url: str, + file_name: str, + client_private_key_path: str, + client_cert_path: str, + server_trust_cert_path: str = None, + client_cert_password: str = None, + verify_ssl: bool = True, + ) -> None: + """ + Handle upload operation using private key and certificates. + + :param encrypted_pgp_bytes: Encrypted PGP file content + :param endpoint_url: URL to upload the file + :param file_name: Name of the file to be uploaded + :param client_private_key_path: Path to the client private key file + :param client_cert_path: Path to the client certificate file + :param server_trust_cert_path: Path to the server trust certificate file + :param client_cert_password: Password for the client certificate file + :param verify_ssl: Whether to verify SSL certificates (default: True) + :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + """ + try: + if self.logger is not None: + self.logger.info( + "Handling upload operation with private key and certificates" + ) + if not verify_ssl and self.logger is not None: + self.logger.warning( + "SSL verification is disabled. This should only be used in development environments." + ) + self.upload_file( + encrypted_pgp_bytes, + endpoint_url, + file_name, + client_cert_path=client_cert_path, + client_key_path=client_private_key_path, + ca_cert_path=server_trust_cert_path, + cert_password=client_cert_password, + verify_ssl=verify_ssl, + ) + except HTTPError as e: + if self.logger is not None: + self.logger.error( + f"Error in handle_upload_operation_using_private_key_and_certs: {str(e)}" + ) + raise + + def upload_file( + self, + encrypted_pgp_bytes: bytes, + endpoint_url: str, + file_name: str, + client_cert_path: str = None, + client_key_path: str = None, + ca_cert_path: str = None, + cert_password: str = None, + verify_ssl: bool = True, + ) -> None: + """ + Upload encrypted PGP file to the specified endpoint. + + :param encrypted_pgp_bytes: Encrypted PGP file content + :param endpoint_url: URL to upload the file + :param file_name: Name of the file to be uploaded + :param client_cert_path: Path to the client certificate file + :param client_key_path: Path to the client private key file + :param ca_cert_path: Path to the CA certificate file + :param cert_password: Password for the certificate file + :param verify_ssl: Whether to verify SSL certificates (default: True) + :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + """ + try: + if self.logger is not None: + self.logger.info("Starting file upload process") + headers = self._create_headers() + file_tuple = _prepare_files(encrypted_pgp_bytes, file_name) + + response = self._send_request( + endpoint_url, + headers, + file_tuple, + client_cert_path, + client_key_path, + ca_cert_path, + cert_password, + verify_ssl, + ) + self._handle_response(response) + except HTTPError as e: + if self.logger is not None: + self.logger.error(f"HTTPError in upload_file: {str(e)}") + raise + + def _create_headers(self) -> dict: + correlation_id = str(uuid.uuid4()) + if self.logger is not None: + self.logger.info(f"Created correlation ID: {correlation_id}") + return {GlobalLabelParameters.V_C_CORRELATION_ID: correlation_id} + + def _send_request( + self, + endpoint_url: str, + headers: dict, + file_tuple: tuple, + client_cert_path: str, + client_key_path: str, + ca_cert_path: str, + cert_password: str = None, + verify_ssl: bool = True, + ) -> BaseHTTPResponse: + """ + Send HTTP request with the file and certificates. + + :param endpoint_url: URL to upload the file + :param headers: HTTP headers + :param file_tuple: Tuple containing file information + :param client_cert_path: Path to the client certificate file + :param client_key_path: Path to the client private key file + :param ca_cert_path: Path to the CA certificate file + :param cert_password: Password for the certificate file + :param verify_ssl: Whether to verify SSL certificates (default: True) + :return: HTTP response + """ + if self.logger is not None: + self.logger.info(f"Sending HTTP POST request to URL: {endpoint_url}") + + if verify_ssl: + # Create SSL context with default CA certificates + ssl_context = ssl.create_default_context(cafile=certifi.where()) + + # If server trust cert path is provided, add it to the context + if ca_cert_path: + ssl_context.load_verify_locations(cafile=ca_cert_path) + + # Create a pool manager with the SSL context + http = urllib3.PoolManager( + cert_file=client_cert_path, + key_file=client_key_path, + key_password=cert_password, + ssl_context=ssl_context, + ) + else: + # Create a pool manager with SSL verification disabled + if self.logger is not None: + self.logger.warning("SSL certificate verification is disabled") + + # Use direct parameters to disable SSL verification + http = urllib3.PoolManager( + cert_file=client_cert_path, + key_file=client_key_path, + key_password=cert_password, + cert_reqs=ssl.CERT_NONE, + assert_hostname=False, + ) + + # Prepare multipart form data + field_name, filename, file_data, content_type = file_tuple + + # Send the request + return http.request( + "POST", + endpoint_url, + headers=headers, + fields={field_name: (filename, file_data, content_type)}, + ) + + def _handle_response(self, response: urllib3.HTTPResponse) -> None: + """ + Handle the HTTP response. + + :param response: HTTP response + :raises urllib3.exceptions.HTTPError: If the response status is not 201 + """ + status_code = response.status + self.logger.info(f"Upload completed. Response status code: {status_code}") + + if status_code == 201: + if self.logger is not None: + self.logger.info("File uploaded successfully") + else: + error_message = f"File upload failed. Status code: {status_code}, body: {response.data.decode('utf-8')}" + if self.logger is not None: + self.logger.error(error_message) + raise HTTPError(error_message) diff --git a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py new file mode 100644 index 00000000..7227b3b7 --- /dev/null +++ b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py @@ -0,0 +1,303 @@ +""" +PGP Encryption Utility for CyberSource Batch Upload. + +This module provides functionality for PGP encryption operations, +supporting both file-based and in-memory operations with various configuration options. +""" + +import os +from datetime import datetime +from pathlib import Path + +import pgpy + +import CyberSource.logging.log_factory as LogFactory + + +class PgpEncryption: + """ + A class for handling PGP encryption operations. + + This class provides methods for encrypting data using PGP, + with support for both file-based and in-memory operations. + + Attributes: + logger: Logger instance for logging operations and errors + """ + + def __init__(self, log_config=None): + """ + Initialize the PGP encryption utility. + + Args: + log_config: Optional configuration for the logger + """ + self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config) + + def handle_encrypt_operation( + self, + input_file: str, + pgp_public_key: str, + ) -> bytes: + """ + Handle the encryption operation for a file using a PGP public key. + + Args: + input_file: Path to the file to encrypt + pgp_public_key: Path to the PGP public key file + + Returns: + bytes: The encrypted data as bytes + + Raises: + ValueError: If required parameters are missing + FileNotFoundError: If input file or public key file doesn't exist + pgpy.errors.PGPError: If there's an error in PGP operations + IOError: If there's an error reading or writing files + """ + try: + if input_file is None or pgp_public_key is None: + raise ValueError("Missing required options for encrypt operation") + + self.validate_path(input_file, "Input file") + self.validate_path(pgp_public_key, "Public key") + input_file_name = os.path.basename(input_file) + + public_key = self.load_public_key(pgp_public_key) + self._validate_key_for_encryption(public_key) + if self.logger is not None: + self.logger.info(f"Starting file encryption: {input_file_name}") + encrypted_pgp_bytes = self.encrypt_file(input_file, public_key) + if self.logger is not None: + self.logger.info(f"File encrypted successfully: {input_file_name}") + + return encrypted_pgp_bytes + except ValueError as e: + if self.logger is not None: + self.logger.error(f"Value error in handle_encrypt_operation: {str(e)}") + raise + except FileNotFoundError as e: + if self.logger is not None: + self.logger.error( + f"File not found in handle_encrypt_operation: {str(e)}" + ) + raise + except pgpy.errors.PGPError as e: + if self.logger is not None: + self.logger.error(f"PGP error in handle_encrypt_operation: {str(e)}") + raise + except IOError as e: + if self.logger is not None: + self.logger.error(f"I/O error in handle_encrypt_operation: {str(e)}") + raise + except Exception as e: + if self.logger is not None: + self.logger.error( + f"Unexpected error in handle_encrypt_operation: {str(e)}" + ) + raise + + def encrypt_file( + self, + input_file: str, + public_key: pgpy.PGPKey, + ) -> bytes: + """ + Encrypt a file using the provided PGP public key. + + Args: + input_file: Path to the file to encrypt + public_key: PGP public key to use for encryption + + Returns: + bytes: The encrypted data as bytes + + Raises: + IOError: If there's an error reading the file + pgpy.errors.PGPError: If there's an error in PGP operations + """ + try: + with open(input_file, "rb") as f: + file_data = f.read() + return self.encrypt_data(file_data, public_key) + except IOError as e: + if self.logger is not None: + self.logger.error(f"I/O error in encrypt_file: {str(e)}") + raise + except pgpy.errors.PGPError as e: + if self.logger is not None: + self.logger.error(f"PGP error in encrypt_file: {str(e)}") + raise + except Exception as e: + if self.logger is not None: + self.logger.error(f"Unexpected error in encrypt_file: {str(e)}") + raise + + def encrypt_data( + self, + data: bytes, + public_key: pgpy.PGPKey, + ) -> bytes: + """ + Encrypt data using the provided PGP public key. + + Args: + data: The data to encrypt + public_key: PGP public key to use for encryption + + Returns: + bytes: The encrypted data as bytes + + Raises: + pgpy.errors.PGPError: If there's an error in PGP operations + """ + try: + message = pgpy.PGPMessage.new(data) + + # Configure encryption options + encryption_options = {} + + encrypted_message = public_key.encrypt(message, **encryption_options) + return bytes(encrypted_message) + except pgpy.errors.PGPError as e: + if self.logger is not None: + self.logger.error(f"PGP error in encrypt_data: {str(e)}") + raise + except Exception as e: + if self.logger is not None: + self.logger.error(f"Unexpected error in encrypt_data: {str(e)}") + raise + + def load_public_key(self, key_path: str) -> pgpy.PGPKey: + """ + Load a PGP public key from a file. + + Args: + key_path: Path to the public key file + + Returns: + pgpy.PGPKey: The loaded PGP public key + + Raises: + IOError: If there's an error reading the key file + pgpy.errors.PGPError: If there's an error parsing the key + """ + try: + with open(key_path, "r") as key_file: + key_data = key_file.read() + key, _ = pgpy.PGPKey.from_blob(key_data) + if self.logger is not None: + self.logger.info(f"Successfully loaded public key from: {key_path}") + return key + except IOError as e: + if self.logger is not None: + self.logger.error(f"I/O error loading public key: {str(e)}") + raise + except pgpy.errors.PGPError as e: + if self.logger is not None: + self.logger.error(f"PGP error loading public key: {str(e)}") + raise + except Exception as e: + if self.logger is not None: + self.logger.error(f"Unexpected error loading public key: {str(e)}") + raise + + def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: + """ + Validate that a key is suitable for encryption. + + Args: + key: The PGP key to validate + + Raises: + ValueError: If the key is not suitable for encryption + """ + # Enforce that the key must be a public key + if not key.is_public: + if self.logger is not None: + self.logger.error("The provided key is not a public key") + raise ValueError("Only public keys can be used for encryption") + + # Check if key has expired + + now = datetime.utcnow() + if key.expires_at is not None: + expires_at = key.expires_at + # Compare naive datetime objects for expiration check + if expires_at.tzinfo is not None: + # If expires_at is timezone-aware, convert to naive UTC for comparison + expires_at = expires_at.replace(tzinfo=None) + if now > expires_at: + raise ValueError(f"The public key has expired on {expires_at}") + + # Check if key can be used for encryption + # PGP keys typically have usage flags that indicate their capabilities + # We'll check if the key or any of its subkeys can be used for encryption + can_encrypt = False + + # Check if the primary key can encrypt + if hasattr(key, "key_algorithm"): + # RSA, ElGamal, and some other algorithms support encryption + can_encrypt = True + + # If primary key can't encrypt, check subkeys + if not can_encrypt and hasattr(key, "subkeys"): + for subkey in key.subkeys.values(): + # Check if this subkey can be used for encryption + if hasattr(subkey, "key_algorithm"): + can_encrypt = True + break + + if not can_encrypt: + raise ValueError("The public key cannot be used for encryption") + + def validate_path(self, path: str, path_type: str) -> None: + """ + Validate that a path exists, creating directories if necessary. + + Args: + path: The path to validate + path_type: Description of the path type for error messages + + Raises: + FileNotFoundError: If the path doesn't exist and isn't an output directory + """ + file_path = Path(path) + if path_type == "Output directory": + if not file_path.exists(): + file_path.mkdir(parents=True, exist_ok=True) + if self.logger is not None: + self.logger.info(f"Created output directory: {path}") + elif not file_path.exists(): + raise FileNotFoundError(f"{path_type} does not exist: {path}") + + def __enter__(self): + """ + Enter the context manager. + + The context manager pattern allows the class to be used with Python's 'with' statement. + This provides a cleaner way to use the class and ensures proper resource management. + + Example: + with PgpEncryption() as pgp: + pgp.handle_encrypt_operation(...) + + Returns: + PgpEncryption: The instance itself + """ + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Exit the context manager. + + This method is called when exiting the 'with' block. It can be used to clean up + resources, handle exceptions, or perform other finalization tasks. + + Args: + exc_type: Exception type if an exception was raised + exc_val: Exception value if an exception was raised + exc_tb: Exception traceback if an exception was raised + """ + # Currently no resources to clean up, but this method is required for the context manager pattern + pass diff --git a/generator/cybersource-python-template/__init__api.mustache b/generator/cybersource-python-template/__init__api.mustache index 777b65a6..e8b11906 100644 --- a/generator/cybersource-python-template/__init__api.mustache +++ b/generator/cybersource-python-template/__init__api.mustache @@ -2,6 +2,7 @@ from __future__ import absolute_import # import apis into api package from .o_auth_api import OAuthApi +from .batch_upload_with_mtls_api import BatchUploadWithMTLSApi {{#apiInfo}} {{#apis}} from .{{classVarName}} import {{classname}} diff --git a/requirements.txt b/requirements.txt index a2eb9b77..e00174c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ six urllib3 jwcrypto cryptography +pgpy From 7de48ece46e4fa102966c0047e89ea74be5ed237 Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 10:52:05 +0530 Subject: [PATCH 02/12] returning the response as a string which is coming from server --- CyberSource/api/batch_upload_with_mtls_api.py | 8 ++++++-- .../pgpBatchUpload/mutual_auth_upload.py | 20 ++++++++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index db405a95..fbb19eeb 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -69,7 +69,7 @@ def upload_batch_api_with_separate_key_and_cert_file( server_trust_cert_path: str = None, client_cert_password: Optional[str] = None, verify_ssl: bool = True, - ) -> None: + ) -> str: """ Upload a batch file using separate key and certificate files. @@ -86,6 +86,9 @@ def upload_batch_api_with_separate_key_and_cert_file( client_cert_password: Optional password for the client certificate file verify_ssl: Whether to verify SSL certificates (default: True). Set to False only for development purposes. + Returns: + str: The response data from the server as a string + Raises: ValueError: If required parameters are missing or invalid FileNotFoundError: If any of the required files don't exist @@ -140,7 +143,7 @@ def upload_batch_api_with_separate_key_and_cert_file( "SSL certificate verification is disabled. This should only be used in development environments." ) - self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( + response_data = self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( encrypted_pgp_bytes=encrypted_pgp_bytes, endpoint_url=endpoint_url, file_name=file_name, @@ -152,6 +155,7 @@ def upload_batch_api_with_separate_key_and_cert_file( ) if self.logger is not None: self.logger.info("Batch file uploaded successfully") + return response_data except ValueError as e: if self.logger is not None: self.logger.error(f"Validation error: {str(e)}") diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 7defe965..4380826b 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -34,7 +34,7 @@ def handle_upload_operation_with_certificate( cert_file_password: str, server_trust_cert_path: str = None, verify_ssl: bool = True, - ) -> None: + ) -> str: """ Handle upload operation using certificate file. @@ -46,6 +46,7 @@ def handle_upload_operation_with_certificate( :param server_trust_cert_path: Path to the server trust certificate (optional) :param verify_ssl: Whether to verify SSL certificates (default: True) :raises Exception: If there's an error during the upload process + :return: The response data from the server as a string """ try: self.logger.info("Handling upload operation with certificate") @@ -55,7 +56,7 @@ def handle_upload_operation_with_certificate( "SSL verification is disabled. This should only be used in development environments." ) - self.upload_file( + return self.upload_file( encrypted_pgp_bytes, endpoint_url, file_name, @@ -82,7 +83,7 @@ def handle_upload_operation_using_private_key_and_certs( server_trust_cert_path: str = None, client_cert_password: str = None, verify_ssl: bool = True, - ) -> None: + ) -> str: """ Handle upload operation using private key and certificates. @@ -95,6 +96,7 @@ def handle_upload_operation_using_private_key_and_certs( :param client_cert_password: Password for the client certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + :return: The response data from the server as a string """ try: if self.logger is not None: @@ -105,7 +107,7 @@ def handle_upload_operation_using_private_key_and_certs( self.logger.warning( "SSL verification is disabled. This should only be used in development environments." ) - self.upload_file( + return self.upload_file( encrypted_pgp_bytes, endpoint_url, file_name, @@ -132,7 +134,7 @@ def upload_file( ca_cert_path: str = None, cert_password: str = None, verify_ssl: bool = True, - ) -> None: + ) -> str: """ Upload encrypted PGP file to the specified endpoint. @@ -145,6 +147,7 @@ def upload_file( :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + :return: The response data from the server as a string """ try: if self.logger is not None: @@ -162,7 +165,7 @@ def upload_file( cert_password, verify_ssl, ) - self._handle_response(response) + return self._handle_response(response) except HTTPError as e: if self.logger is not None: self.logger.error(f"HTTPError in upload_file: {str(e)}") @@ -241,12 +244,13 @@ def _send_request( fields={field_name: (filename, file_data, content_type)}, ) - def _handle_response(self, response: urllib3.HTTPResponse) -> None: + def _handle_response(self, response: urllib3.HTTPResponse) -> str: """ Handle the HTTP response. :param response: HTTP response :raises urllib3.exceptions.HTTPError: If the response status is not 201 + :return: The response data as a string """ status_code = response.status self.logger.info(f"Upload completed. Response status code: {status_code}") @@ -254,6 +258,8 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> None: if status_code == 201: if self.logger is not None: self.logger.info("File uploaded successfully") + # Decode the response data to a string + return response.data.decode('utf-8') else: error_message = f"File upload failed. Status code: {status_code}, body: {response.data.decode('utf-8')}" if self.logger is not None: From a54db178562268129515b28b6034bf5c3041b5ec Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 13:59:34 +0530 Subject: [PATCH 03/12] resolving PR comments --- CyberSource/api/batch_upload_with_mtls_api.py | 201 ++++++++++++------ .../pgpBatchUpload/mutual_auth_upload.py | 73 ++----- 2 files changed, 152 insertions(+), 122 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index fbb19eeb..421bd3c7 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -1,8 +1,24 @@ """ Batch Upload with mTLS API for CyberSource. -This module provides functionality for batch uploads to CyberSource using mTLS authentication, -supporting both file-based and in-memory operations with various configuration options. +This module provides functionality for batch uploads to CyberSource using mutual TLS (mTLS) authentication. +It enables secure transmission of batch transaction files to CyberSource's batch processing system +by combining PGP encryption with mTLS authentication for enhanced security. + +Key features: +- PGP encryption of batch files before transmission +- Mutual TLS authentication for secure server communication +- Support for various certificate and key configurations +- File validation including size and format checks +- Comprehensive error handling and logging + +Typical use cases include: +- Batch processing of payment transactions +- Uploading large volumes of transaction data securely +- Automated scheduled batch uploads in production environments + +This module is designed to work with CyberSource's batch upload API endpoint and +requires proper configuration of certificates and keys for successful operation. """ from pathlib import Path @@ -11,6 +27,7 @@ import CyberSource.logging.log_factory as LogFactory from CyberSource.utilities.pgpBatchUpload.mutual_auth_upload import MutualAuthUpload from CyberSource.utilities.pgpBatchUpload.pgp_encryption import PgpEncryption +from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters def _validate_paths(paths: list) -> None: @@ -21,11 +38,19 @@ def _validate_paths(paths: list) -> None: paths: List of tuples containing (path, description) Raises: + ValueError: If a required path is None or empty FileNotFoundError: If any of the paths don't exist """ for path, description in paths: + # Check if this is an optional parameter (currently only "Server trust certificate") + is_optional = "Server trust certificate" in description + if path is None: - continue # Skip validation for None paths + if is_optional: + continue # Skip validation for None paths that are optional + else: + raise ValueError(f"{description} is required but was None") + if not path: raise ValueError(f"{description} path is required") if not Path(path).exists(): @@ -37,8 +62,18 @@ class BatchUploadWithMTLSApi: A class for handling batch uploads to CyberSource using mTLS authentication. This class provides methods for encrypting and uploading batch files to CyberSource - using PGP encryption and mutual TLS authentication. It supports various authentication - methods including separate key/cert files. + using PGP encryption and mutual TLS authentication. It orchestrates the entire process + from file validation, PGP encryption, to secure transmission via mTLS. + + The class follows a secure workflow: + 1. Validates input files and certificates + 2. Encrypts the input file using PGP encryption + 3. Establishes a secure mTLS connection with CyberSource + 4. Uploads the encrypted file to CyberSource's batch processing endpoint + 5. Handles responses and provides comprehensive error reporting + + This class supports various authentication methods including separate key/cert files + and can be used either directly or as a context manager with the 'with' statement. Attributes: logger: Logger instance for logging operations and errors @@ -47,52 +82,106 @@ class BatchUploadWithMTLSApi: """ _end_point = "/pts/v1/transaction-batch-upload" + _max_size_bytes = 75 * 1024 * 1024 # 75 MB in bytes def __init__(self, log_config=None): """ Initialize the BatchUploadWithMTLSApi. Args: - log_config: Optional configuration for the logger + log_config: Optional configuration for the logger. If provided, it should be + a LogConfiguration instance with appropriate settings. + + Example: + ```python + from CyberSource.logging.log_configuration import LogConfiguration + + # Create and configure the logger + log_config = LogConfiguration() + log_config.set_enable_log(True) + log_config.set_log_directory("logs") + log_config.set_log_file_name("cybersource_batch.log") + log_config.set_log_maximum_size(10 * 1024 * 1024) # 10 MB + log_config.set_log_level("INFO") + log_config.set_enable_masking(True) + log_config.set_log_format("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + log_config.set_log_date_format("%Y-%m-%d %H:%M:%S") + + # Create the batch upload API instance with logging + batch_api = BatchUploadWithMTLSApi(log_config) + ``` """ self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config) self.pgp_encryption = PgpEncryption(log_config) self.mutual_auth_upload = MutualAuthUpload(log_config) - def upload_batch_api_with_separate_key_and_cert_file( + def upload_batch_api_with_key_and_certs_file( self, - input_file: str, + input_file_path: str, environment_hostname: str, pgp_encryption_public_key_path: str, client_cert_path: str, client_key_path: str, server_trust_cert_path: str = None, - client_cert_password: Optional[str] = None, + client_key_password: Optional[str] = None, verify_ssl: bool = True, - ) -> str: + ) -> tuple: """ Upload a batch file using separate key and certificate files. - This method encrypts the input file using PGP and uploads it to CyberSource - using mutual TLS authentication with separate key and certificate files. + This method handles the complete process of batch file upload to CyberSource: + 1. Validates all input parameters and files + 2. Checks file size (must be under 75MB) and format (must be .csv) + 3. Encrypts the input file using PGP with the provided public key + 4. Uploads the encrypted file to CyberSource using mutual TLS authentication + 5. Returns the server response or raises appropriate exceptions + + The method supports secure communication through mutual TLS authentication, + where both the client and server authenticate each other using certificates. Args: - input_file: Path to the file to upload - environment_hostname: CyberSource environment hostname - pgp_encryption_public_key_path: Path to the PGP public key file - client_cert_path: Path to the client certificate file - client_key_path: Path to the client private key file + input_file_path: Path to the CSV file to upload (must exist and be under 75MB) + environment_hostname: CyberSource environment hostname (e.g., "apitest.cybersource.com") + pgp_encryption_public_key_path: Path to the PGP public key file for encryption + client_cert_path: Path to the client certificate file (.pem or .crt) + client_key_path: Path to the client private key file (.key) server_trust_cert_path: Path to the server trust certificate file (optional) - client_cert_password: Optional password for the client certificate file - verify_ssl: Whether to verify SSL certificates (default: True). Set to False only for development purposes. + If not provided, system CA certificates will be used + client_key_password: Optional password for the client private key if it's encrypted + verify_ssl: Whether to verify SSL certificates (default: True) + Set to False only for development/testing purposes. + WARNING: Disabling SSL verification poses a significant security risk + and should never be used in production environments as it makes the + connection vulnerable to man-in-the-middle attacks. Returns: - str: The response data from the server as a string + tuple: A tuple containing (response_data, status_code, headers) where: + - response_data: The response data from the server as a string, typically containing a batch ID and status information + - status_code: The HTTP status code of the response + - headers: The HTTP headers of the response Raises: - ValueError: If required parameters are missing or invalid + ValueError: If required parameters are missing or invalid (e.g., file too large) FileNotFoundError: If any of the required files don't exist - requests.exceptions.RequestException: If there's an error during the upload process + urllib3.exceptions.HTTPError: If there's an error during the upload process + Exception: For any other unexpected errors + + Example: + ```python + batch_api = BatchUploadWithMTLSApi() + try: + response = batch_api.upload_batch_api_with_key_and_certs_file( + input_file_path="path/to/transactions.csv", + environment_hostname="apitest.cybersource.com", + pgp_encryption_public_key_path="path/to/cybersource_public.asc", + client_cert_path="path/to/client.crt", + client_key_path="path/to/client.key", + server_trust_cert_path="path/to/server_ca.crt" + ) + print(f"Upload successful. Response: {response}") + except Exception as e: + print(f"Upload failed: {str(e)}") + ``` """ try: # Step 1: Create endpoint URL @@ -101,7 +190,7 @@ def upload_batch_api_with_separate_key_and_cert_file( # Step 2: Validations _validate_paths( [ - (input_file, "Input file"), + (input_file_path, "Input file"), (pgp_encryption_public_key_path, "PGP public key"), (client_cert_path, "Client certificate"), (client_key_path, "Client private key"), @@ -110,16 +199,15 @@ def upload_batch_api_with_separate_key_and_cert_file( ) # Validate file size (maximum 75 MB) - file_size = Path(input_file).stat().st_size - max_size_bytes = 75 * 1024 * 1024 # 75 MB in bytes - if file_size > max_size_bytes: - error_msg = f"Input file size ({file_size} bytes) exceeds the maximum allowed size of 75 MB ({max_size_bytes} bytes)" + file_size = Path(input_file_path).stat().st_size + if file_size > self._max_size_bytes: + error_msg = f"Input file size ({file_size} bytes) exceeds the maximum allowed size of 75 MB ({self._max_size_bytes} bytes)" if self.logger is not None: self.logger.error(error_msg) raise ValueError(error_msg) # Validate file extension (.csv) - file_extension = Path(input_file).suffix.lower() + file_extension = Path(input_file_path).suffix.lower() if file_extension != ".csv": error_msg = ( f"Input file must have a .csv extension, but got '{file_extension}'" @@ -130,32 +218,32 @@ def upload_batch_api_with_separate_key_and_cert_file( # Step 3: PGP encryption encrypted_pgp_bytes = self.pgp_encryption.handle_encrypt_operation( - input_file, pgp_encryption_public_key_path + input_file_path, pgp_encryption_public_key_path ) # Step 4: Upload the encrypted PGP file using mTLS # Replace the file extension with .pgp - file_name = Path(input_file).stem + ".pgp" - + file_name = Path(input_file_path).stem + ".pgp" + # Log a warning if SSL verification is disabled if not verify_ssl and self.logger is not None: self.logger.warning( - "SSL certificate verification is disabled. This should only be used in development environments." + "SSL certificate verification is disabled. This should not be used in production environments." ) - - response_data = self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( + + response_tuple = self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( encrypted_pgp_bytes=encrypted_pgp_bytes, endpoint_url=endpoint_url, file_name=file_name, client_private_key_path=client_key_path, client_cert_path=client_cert_path, server_trust_cert_path=server_trust_cert_path, - client_cert_password=client_cert_password, + client_key_password=client_key_password, verify_ssl=verify_ssl, ) if self.logger is not None: self.logger.info("Batch file uploaded successfully") - return response_data + return response_tuple except ValueError as e: if self.logger is not None: self.logger.error(f"Validation error: {str(e)}") @@ -176,37 +264,26 @@ def get_base_url(environment_hostname: str) -> str: """ Get the base URL from the environment hostname. + This method ensures that the environment hostname is properly formatted as a URL + by adding the 'https://' prefix if not already present. It also validates that + the hostname is not empty. + Args: - environment_hostname: CyberSource environment hostname + environment_hostname: CyberSource environment hostname (e.g., "apitest.cybersource.com") Returns: - str: The base URL with https:// prefix if not already present + str: The base URL with https:// prefix (e.g., "https://apitest.cybersource.com") + + Raises: + ValueError: If the environment hostname is None, empty, or consists only of whitespace + """ if not environment_hostname: - raise ValueError("Environment hostname is required") + raise ValueError( + "Environment Host Name for Batch Upload API cannot be null or empty." + ) base_url = environment_hostname.strip() - if not base_url.startswith("https://"): - base_url = "https://" + base_url + if not base_url.startswith(GlobalLabelParameters.HTTP_URL_PREFIX): + base_url = GlobalLabelParameters.HTTP_URL_PREFIX + base_url return base_url - - def __enter__(self): - """ - Enter the context manager. - - Returns: - BatchUploadWithMTLSApi: The instance itself - """ - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Exit the context manager. - - Args: - exc_type: Exception type if an exception was raised - exc_val: Exception value if an exception was raised - exc_tb: Exception traceback if an exception was raised - """ - # Currently no resources to clean up - pass diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 4380826b..f744cdfe 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -25,54 +25,6 @@ class MutualAuthUpload: def __init__(self, log_config=None): self.logger = LogFactory.setup_logger(self.__class__.__name__, log_config) - def handle_upload_operation_with_certificate( - self, - encrypted_pgp_bytes: bytes, - endpoint_url: str, - file_name: str, - cert_file_path: str, - cert_file_password: str, - server_trust_cert_path: str = None, - verify_ssl: bool = True, - ) -> str: - """ - Handle upload operation using certificate file. - - :param encrypted_pgp_bytes: Encrypted PGP file content - :param endpoint_url: URL to upload the file - :param file_name: Name of the file to be uploaded - :param cert_file_path: Path to the certificate file - :param cert_file_password: Password for the certificate file - :param server_trust_cert_path: Path to the server trust certificate (optional) - :param verify_ssl: Whether to verify SSL certificates (default: True) - :raises Exception: If there's an error during the upload process - :return: The response data from the server as a string - """ - try: - self.logger.info("Handling upload operation with certificate") - - if not verify_ssl and self.logger is not None: - self.logger.warning( - "SSL verification is disabled. This should only be used in development environments." - ) - - return self.upload_file( - encrypted_pgp_bytes, - endpoint_url, - file_name, - client_cert_path=cert_file_path, - client_key_path=cert_file_path, - ca_cert_path=server_trust_cert_path, - cert_password=cert_file_password, - verify_ssl=verify_ssl, - ) - except HTTPError as e: - if self.logger is not None: - self.logger.error( - f"Error in handle_upload_operation_with_certificate: {str(e)}" - ) - raise - def handle_upload_operation_using_private_key_and_certs( self, encrypted_pgp_bytes: bytes, @@ -81,9 +33,9 @@ def handle_upload_operation_using_private_key_and_certs( client_private_key_path: str, client_cert_path: str, server_trust_cert_path: str = None, - client_cert_password: str = None, + client_key_password: str = None, verify_ssl: bool = True, - ) -> str: + ) -> tuple: """ Handle upload operation using private key and certificates. @@ -93,10 +45,10 @@ def handle_upload_operation_using_private_key_and_certs( :param client_private_key_path: Path to the client private key file :param client_cert_path: Path to the client certificate file :param server_trust_cert_path: Path to the server trust certificate file - :param client_cert_password: Password for the client certificate file + :param client_key_password: Password for the client key file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process - :return: The response data from the server as a string + :return: A tuple containing (response_data, status_code, headers) """ try: if self.logger is not None: @@ -114,7 +66,7 @@ def handle_upload_operation_using_private_key_and_certs( client_cert_path=client_cert_path, client_key_path=client_private_key_path, ca_cert_path=server_trust_cert_path, - cert_password=client_cert_password, + cert_password=client_key_password, verify_ssl=verify_ssl, ) except HTTPError as e: @@ -134,7 +86,7 @@ def upload_file( ca_cert_path: str = None, cert_password: str = None, verify_ssl: bool = True, - ) -> str: + ) -> tuple: """ Upload encrypted PGP file to the specified endpoint. @@ -147,7 +99,7 @@ def upload_file( :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process - :return: The response data from the server as a string + :return: A tuple containing (response_data, status_code, headers) """ try: if self.logger is not None: @@ -244,22 +196,23 @@ def _send_request( fields={field_name: (filename, file_data, content_type)}, ) - def _handle_response(self, response: urllib3.HTTPResponse) -> str: + def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: """ Handle the HTTP response. :param response: HTTP response - :raises urllib3.exceptions.HTTPError: If the response status is not 201 - :return: The response data as a string + :raises urllib3.exceptions.HTTPError: If the response status is not successful (2xx) + :return: A tuple containing (response_data, status_code, headers) """ status_code = response.status self.logger.info(f"Upload completed. Response status code: {status_code}") - if status_code == 201: + if 200 <= status_code < 300: if self.logger is not None: self.logger.info("File uploaded successfully") # Decode the response data to a string - return response.data.decode('utf-8') + response_data = response.data.decode('utf-8') + return (response_data, status_code, response.headers) else: error_message = f"File upload failed. Status code: {status_code}, body: {response.data.decode('utf-8')}" if self.logger is not None: From efe0373e5622df643cd998a122a59bdf121c2d9a Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 14:02:55 +0530 Subject: [PATCH 04/12] added batch upload mtls in generation script --- generator/cybersource_python_sdk_gen.bat | 1 + 1 file changed, 1 insertion(+) diff --git a/generator/cybersource_python_sdk_gen.bat b/generator/cybersource_python_sdk_gen.bat index fb87e0a9..63e91254 100644 --- a/generator/cybersource_python_sdk_gen.bat +++ b/generator/cybersource_python_sdk_gen.bat @@ -42,6 +42,7 @@ git checkout ..\README.md git checkout ..\setup.py git checkout ..\CyberSource\api\o_auth_api.py +git checkout ..\CyberSource\api\batch_upload_with_mtls_api.py git checkout ..\CyberSource\models\access_token_response.py git checkout ..\CyberSource\models\bad_request_error.py git checkout ..\CyberSource\models\create_access_token_request.py From d62793c4f8f204c8057fda857453b9f192ef7655 Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 16:30:33 +0530 Subject: [PATCH 05/12] resolvinf PR comments --- CyberSource/api/batch_upload_with_mtls_api.py | 46 +++----- .../pgpBatchUpload/mutual_auth_upload.py | 104 +++++++++++------- .../pgpBatchUpload/pgp_encryption.py | 26 +---- 3 files changed, 83 insertions(+), 93 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index 421bd3c7..9784883a 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -27,34 +27,10 @@ import CyberSource.logging.log_factory as LogFactory from CyberSource.utilities.pgpBatchUpload.mutual_auth_upload import MutualAuthUpload from CyberSource.utilities.pgpBatchUpload.pgp_encryption import PgpEncryption +from CyberSource.utilities.file_utils import validate_paths from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters - - -def _validate_paths(paths: list) -> None: - """ - Validate that all specified paths exist. - - Args: - paths: List of tuples containing (path, description) - - Raises: - ValueError: If a required path is None or empty - FileNotFoundError: If any of the paths don't exist - """ - for path, description in paths: - # Check if this is an optional parameter (currently only "Server trust certificate") - is_optional = "Server trust certificate" in description - - if path is None: - if is_optional: - continue # Skip validation for None paths that are optional - else: - raise ValueError(f"{description} is required but was None") - - if not path: - raise ValueError(f"{description} path is required") - if not Path(path).exists(): - raise FileNotFoundError(f"{description} not found: {path}") +from CyberSource.rest import ApiException +from urllib3.exceptions import HTTPError class BatchUploadWithMTLSApi: @@ -164,6 +140,7 @@ def upload_batch_api_with_key_and_certs_file( ValueError: If required parameters are missing or invalid (e.g., file too large) FileNotFoundError: If any of the required files don't exist urllib3.exceptions.HTTPError: If there's an error during the upload process + CyberSource.rest.ApiException: If the response status is not successful (2xx) Exception: For any other unexpected errors Example: @@ -188,14 +165,15 @@ def upload_batch_api_with_key_and_certs_file( endpoint_url = self.get_base_url(environment_hostname) + self._end_point # Step 2: Validations - _validate_paths( + validate_paths( [ (input_file_path, "Input file"), (pgp_encryption_public_key_path, "PGP public key"), (client_cert_path, "Client certificate"), (client_key_path, "Client private key"), (server_trust_cert_path, "Server trust certificate"), - ] + ], + self.logger, ) # Validate file size (maximum 75 MB) @@ -252,10 +230,18 @@ def upload_batch_api_with_key_and_certs_file( if self.logger is not None: self.logger.error(f"File not found: {str(e)}") raise + except HTTPError as e: + if self.logger is not None: + self.logger.error(f"HTTP error: {str(e)}") + raise + except ApiException as e: + if self.logger is not None: + self.logger.error(f"API error: {str(e)}") + raise except Exception as e: if self.logger is not None: self.logger.error( - f"Error in upload_batch_api_with_separate_key_and_cert_file: {str(e)}" + f"Error in upload_batch_api_with_key_and_certs_file: {str(e)}" ) raise diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index f744cdfe..161401f2 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -8,6 +8,7 @@ import CyberSource.logging.log_factory as LogFactory from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters +from CyberSource.rest import ApiException def _prepare_files(encrypted_pgp_bytes: bytes, file_name: str) -> tuple: @@ -48,6 +49,7 @@ def handle_upload_operation_using_private_key_and_certs( :param client_key_password: Password for the client key file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) :return: A tuple containing (response_data, status_code, headers) """ try: @@ -69,7 +71,7 @@ def handle_upload_operation_using_private_key_and_certs( cert_password=client_key_password, verify_ssl=verify_ssl, ) - except HTTPError as e: + except (HTTPError, ApiException) as e: if self.logger is not None: self.logger.error( f"Error in handle_upload_operation_using_private_key_and_certs: {str(e)}" @@ -99,6 +101,7 @@ def upload_file( :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :raises urllib3.exceptions.HTTPError: If there's an error during the upload process + :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) :return: A tuple containing (response_data, status_code, headers) """ try: @@ -122,6 +125,10 @@ def upload_file( if self.logger is not None: self.logger.error(f"HTTPError in upload_file: {str(e)}") raise + except ApiException as e: + if self.logger is not None: + self.logger.error(f"ApiException in upload_file: {str(e)}") + raise def _create_headers(self) -> dict: correlation_id = str(uuid.uuid4()) @@ -152,56 +159,72 @@ def _send_request( :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :return: HTTP response + :raises urllib3.exceptions.HTTPError: If there's an error during the HTTP request + :raises ssl.SSLError: If there's an SSL-related error + :raises Exception: For any other unexpected errors """ if self.logger is not None: self.logger.info(f"Sending HTTP POST request to URL: {endpoint_url}") - if verify_ssl: - # Create SSL context with default CA certificates - ssl_context = ssl.create_default_context(cafile=certifi.where()) + try: + if verify_ssl: + # Create SSL context with default CA certificates + ssl_context = ssl.create_default_context(cafile=certifi.where()) + + # If server trust cert path is provided, add it to the context + if ca_cert_path: + ssl_context.load_verify_locations(cafile=ca_cert_path) + + # Create a pool manager with the SSL context + http = urllib3.PoolManager( + cert_file=client_cert_path, + key_file=client_key_path, + key_password=cert_password, + ssl_context=ssl_context, + ) + else: + # Create a pool manager with SSL verification disabled + if self.logger is not None: + self.logger.warning("SSL certificate verification is disabled") + + # Use direct parameters to disable SSL verification + http = urllib3.PoolManager( + cert_file=client_cert_path, + key_file=client_key_path, + key_password=cert_password, + cert_reqs=ssl.CERT_NONE, + assert_hostname=False, + ) - # If server trust cert path is provided, add it to the context - if ca_cert_path: - ssl_context.load_verify_locations(cafile=ca_cert_path) + # Prepare multipart form data + field_name, filename, file_data, content_type = file_tuple - # Create a pool manager with the SSL context - http = urllib3.PoolManager( - cert_file=client_cert_path, - key_file=client_key_path, - key_password=cert_password, - ssl_context=ssl_context, + # Send the request + return http.request( + "POST", + endpoint_url, + headers=headers, + fields={field_name: (filename, file_data, content_type)}, ) - else: - # Create a pool manager with SSL verification disabled + except HTTPError as e: if self.logger is not None: - self.logger.warning("SSL certificate verification is disabled") - - # Use direct parameters to disable SSL verification - http = urllib3.PoolManager( - cert_file=client_cert_path, - key_file=client_key_path, - key_password=cert_password, - cert_reqs=ssl.CERT_NONE, - assert_hostname=False, - ) - - # Prepare multipart form data - field_name, filename, file_data, content_type = file_tuple - - # Send the request - return http.request( - "POST", - endpoint_url, - headers=headers, - fields={field_name: (filename, file_data, content_type)}, - ) + self.logger.error(f"HTTP error in _send_request: {str(e)}") + raise + except ssl.SSLError as e: + if self.logger is not None: + self.logger.error(f"SSL error in _send_request: {str(e)}") + raise HTTPError(f"SSL error: {str(e)}") + except Exception as e: + if self.logger is not None: + self.logger.error(f"Unexpected error in _send_request: {str(e)}") + raise HTTPError(f"Unexpected error during HTTP request: {str(e)}") def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: """ Handle the HTTP response. :param response: HTTP response - :raises urllib3.exceptions.HTTPError: If the response status is not successful (2xx) + :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) :return: A tuple containing (response_data, status_code, headers) """ status_code = response.status @@ -211,10 +234,11 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: if self.logger is not None: self.logger.info("File uploaded successfully") # Decode the response data to a string - response_data = response.data.decode('utf-8') - return (response_data, status_code, response.headers) + response_data = response.data.decode("utf-8") + return response_data, status_code, response.headers else: error_message = f"File upload failed. Status code: {status_code}, body: {response.data.decode('utf-8')}" if self.logger is not None: self.logger.error(error_message) - raise HTTPError(error_message) + # Throw ApiException instead of HTTPError for consistency with rest.py + raise ApiException(http_resp=response) diff --git a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py index 7227b3b7..fc22536f 100644 --- a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py +++ b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py @@ -7,11 +7,11 @@ import os from datetime import datetime -from pathlib import Path import pgpy import CyberSource.logging.log_factory as LogFactory +from CyberSource.utilities.file_utils import validate_path class PgpEncryption: @@ -59,8 +59,8 @@ def handle_encrypt_operation( if input_file is None or pgp_public_key is None: raise ValueError("Missing required options for encrypt operation") - self.validate_path(input_file, "Input file") - self.validate_path(pgp_public_key, "Public key") + validate_path(input_file, "Input file", self.logger) + validate_path(pgp_public_key, "Public key", self.logger) input_file_name = os.path.basename(input_file) public_key = self.load_public_key(pgp_public_key) @@ -251,26 +251,6 @@ def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: if not can_encrypt: raise ValueError("The public key cannot be used for encryption") - def validate_path(self, path: str, path_type: str) -> None: - """ - Validate that a path exists, creating directories if necessary. - - Args: - path: The path to validate - path_type: Description of the path type for error messages - - Raises: - FileNotFoundError: If the path doesn't exist and isn't an output directory - """ - file_path = Path(path) - if path_type == "Output directory": - if not file_path.exists(): - file_path.mkdir(parents=True, exist_ok=True) - if self.logger is not None: - self.logger.info(f"Created output directory: {path}") - elif not file_path.exists(): - raise FileNotFoundError(f"{path_type} does not exist: {path}") - def __enter__(self): """ Enter the context manager. From f4a58d54cbc3e25a90e72b18da759e51afdf2d78 Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 17:09:03 +0530 Subject: [PATCH 06/12] added file utils --- CyberSource/utilities/file_utils.py | 59 +++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 CyberSource/utilities/file_utils.py diff --git a/CyberSource/utilities/file_utils.py b/CyberSource/utilities/file_utils.py new file mode 100644 index 00000000..a0c364e1 --- /dev/null +++ b/CyberSource/utilities/file_utils.py @@ -0,0 +1,59 @@ +""" +File Utility Functions for CyberSource. + +This module provides utility functions for file operations, +including path validation and directory creation. +""" + +from pathlib import Path +from typing import List, Tuple + + +def validate_path(path: str, path_type: str, logger=None) -> None: + """ + Validate that a path exists, creating directories if necessary. + + Args: + path: The path to validate + path_type: Description of the path type for error messages + logger: Optional logger instance for logging operations + + Raises: + FileNotFoundError: If the path doesn't exist and isn't an output directory + """ + file_path = Path(path) + if path_type == "Output directory": + if not file_path.exists(): + file_path.mkdir(parents=True, exist_ok=True) + if logger is not None: + logger.info(f"Created output directory: {path}") + elif not file_path.exists(): + raise FileNotFoundError(f"{path_type} does not exist: {path}") + + +def validate_paths(paths: List[Tuple[str, str]], logger=None) -> None: + """ + Validate that all specified paths exist. + + Args: + paths: List of tuples containing (path, description) + logger: Optional logger instance for logging operations + + Raises: + ValueError: If a required path is None or empty + FileNotFoundError: If any of the paths don't exist + """ + for path, description in paths: + # Check if this is an optional parameter (currently only "Server trust certificate") + is_optional = "Server trust certificate" in description + + if path is None: + if is_optional: + continue # Skip validation for None paths that are optional + else: + raise ValueError(f"{description} is required but was None") + + if not path: + raise ValueError(f"{description} path is required") + if not Path(path).exists(): + raise FileNotFoundError(f"{description} not found: {path}") From 087996ac3ab11e32f89ad1968f4510c9b9cbe879 Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 17:42:28 +0530 Subject: [PATCH 07/12] minor fix --- CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 161401f2..2a11fa64 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -228,7 +228,8 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: :return: A tuple containing (response_data, status_code, headers) """ status_code = response.status - self.logger.info(f"Upload completed. Response status code: {status_code}") + if self.logger is not None: + self.logger.info(f"Upload completed. Response status code: {status_code}") if 200 <= status_code < 300: if self.logger is not None: From d4c25ea0f3b489ec494f650bdc53ee7b32b47e06 Mon Sep 17 00:00:00 2001 From: monkumar Date: Fri, 20 Jun 2025 18:47:18 +0530 Subject: [PATCH 08/12] some fixs --- CyberSource/api/batch_upload_with_mtls_api.py | 5 +++ .../pgpBatchUpload/mutual_auth_upload.py | 8 ++-- .../pgpBatchUpload/pgp_encryption.py | 40 +++++-------------- 3 files changed, 19 insertions(+), 34 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index 9784883a..eab5e8f0 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -142,6 +142,11 @@ def upload_batch_api_with_key_and_certs_file( urllib3.exceptions.HTTPError: If there's an error during the upload process CyberSource.rest.ApiException: If the response status is not successful (2xx) Exception: For any other unexpected errors + + Notes: + This method will log warnings but not raise exceptions if: + - The provided PGP key is not a public key + - The PGP public key has expired Example: ```python diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 2a11fa64..1d783db7 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -11,9 +11,9 @@ from CyberSource.rest import ApiException -def _prepare_files(encrypted_pgp_bytes: bytes, file_name: str) -> tuple: +def _create_file_form_field_tuple(encrypted_pgp_bytes: bytes, file_name: str) -> tuple: """ - Prepare the file data for multipart/form-data upload with urllib3. + Create a tuple that represents a file field for a multipart/form-data upload with urllib3. :param encrypted_pgp_bytes: Encrypted PGP file content :param file_name: Name of the file to be uploaded @@ -108,7 +108,7 @@ def upload_file( if self.logger is not None: self.logger.info("Starting file upload process") headers = self._create_headers() - file_tuple = _prepare_files(encrypted_pgp_bytes, file_name) + file_tuple = _create_file_form_field_tuple(encrypted_pgp_bytes, file_name) response = self._send_request( endpoint_url, @@ -242,4 +242,4 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: if self.logger is not None: self.logger.error(error_message) # Throw ApiException instead of HTTPError for consistency with rest.py - raise ApiException(http_resp=response) + raise ApiException(http_resp=response, status=status_code, reason=error_message) diff --git a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py index fc22536f..9e48b8a4 100644 --- a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py +++ b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py @@ -54,6 +54,11 @@ def handle_encrypt_operation( FileNotFoundError: If input file or public key file doesn't exist pgpy.errors.PGPError: If there's an error in PGP operations IOError: If there's an error reading or writing files + + Notes: + This method will log warnings but not raise exceptions if: + - The provided key is not a public key + - The public key has expired """ try: if input_file is None or pgp_public_key is None: @@ -208,18 +213,13 @@ def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: Args: key: The PGP key to validate - - Raises: - ValueError: If the key is not suitable for encryption """ - # Enforce that the key must be a public key + # Check if the key is a public key (log warning instead of raising exception) if not key.is_public: if self.logger is not None: - self.logger.error("The provided key is not a public key") - raise ValueError("Only public keys can be used for encryption") - - # Check if key has expired + self.logger.warning("The provided key is not a public key. This may cause issues with encryption.") + # Check if key has expired (log warning instead of raising exception) now = datetime.utcnow() if key.expires_at is not None: expires_at = key.expires_at @@ -228,28 +228,8 @@ def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: # If expires_at is timezone-aware, convert to naive UTC for comparison expires_at = expires_at.replace(tzinfo=None) if now > expires_at: - raise ValueError(f"The public key has expired on {expires_at}") - - # Check if key can be used for encryption - # PGP keys typically have usage flags that indicate their capabilities - # We'll check if the key or any of its subkeys can be used for encryption - can_encrypt = False - - # Check if the primary key can encrypt - if hasattr(key, "key_algorithm"): - # RSA, ElGamal, and some other algorithms support encryption - can_encrypt = True - - # If primary key can't encrypt, check subkeys - if not can_encrypt and hasattr(key, "subkeys"): - for subkey in key.subkeys.values(): - # Check if this subkey can be used for encryption - if hasattr(subkey, "key_algorithm"): - can_encrypt = True - break - - if not can_encrypt: - raise ValueError("The public key cannot be used for encryption") + if self.logger is not None: + self.logger.warning(f"The public key has expired on {expires_at}. This may cause issues with encryption.") def __enter__(self): """ From 268d88265ee654d70afa075cfdf08fcdcbf39204 Mon Sep 17 00:00:00 2001 From: monkumar Date: Sun, 22 Jun 2025 21:32:01 +0530 Subject: [PATCH 09/12] fixed exception handelling --- CyberSource/api/batch_upload_with_mtls_api.py | 42 +------ CyberSource/utilities/file_utils.py | 46 +++---- .../pgpBatchUpload/mutual_auth_upload.py | 112 +++++++++++++----- .../pgpBatchUpload/pgp_encryption.py | 18 ++- 4 files changed, 111 insertions(+), 107 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index eab5e8f0..b7a7fb2f 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -1,36 +1,12 @@ -""" -Batch Upload with mTLS API for CyberSource. - -This module provides functionality for batch uploads to CyberSource using mutual TLS (mTLS) authentication. -It enables secure transmission of batch transaction files to CyberSource's batch processing system -by combining PGP encryption with mTLS authentication for enhanced security. - -Key features: -- PGP encryption of batch files before transmission -- Mutual TLS authentication for secure server communication -- Support for various certificate and key configurations -- File validation including size and format checks -- Comprehensive error handling and logging - -Typical use cases include: -- Batch processing of payment transactions -- Uploading large volumes of transaction data securely -- Automated scheduled batch uploads in production environments - -This module is designed to work with CyberSource's batch upload API endpoint and -requires proper configuration of certificates and keys for successful operation. -""" - from pathlib import Path from typing import Optional import CyberSource.logging.log_factory as LogFactory from CyberSource.utilities.pgpBatchUpload.mutual_auth_upload import MutualAuthUpload from CyberSource.utilities.pgpBatchUpload.pgp_encryption import PgpEncryption -from CyberSource.utilities.file_utils import validate_paths +from CyberSource.utilities.file_utils import validate_path from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters from CyberSource.rest import ApiException -from urllib3.exceptions import HTTPError class BatchUploadWithMTLSApi: @@ -139,10 +115,9 @@ def upload_batch_api_with_key_and_certs_file( Raises: ValueError: If required parameters are missing or invalid (e.g., file too large) FileNotFoundError: If any of the required files don't exist - urllib3.exceptions.HTTPError: If there's an error during the upload process - CyberSource.rest.ApiException: If the response status is not successful (2xx) + CyberSource.rest.ApiException: If there's an error during the upload process or if the response status is not successful (2xx) Exception: For any other unexpected errors - + Notes: This method will log warnings but not raise exceptions if: - The provided PGP key is not a public key @@ -170,15 +145,14 @@ def upload_batch_api_with_key_and_certs_file( endpoint_url = self.get_base_url(environment_hostname) + self._end_point # Step 2: Validations - validate_paths( + validate_path( [ (input_file_path, "Input file"), (pgp_encryption_public_key_path, "PGP public key"), (client_cert_path, "Client certificate"), (client_key_path, "Client private key"), (server_trust_cert_path, "Server trust certificate"), - ], - self.logger, + ] ) # Validate file size (maximum 75 MB) @@ -211,7 +185,7 @@ def upload_batch_api_with_key_and_certs_file( # Log a warning if SSL verification is disabled if not verify_ssl and self.logger is not None: self.logger.warning( - "SSL certificate verification is disabled. This should not be used in production environments." + "SSL verification is disabled. This should not be used in production environments." ) response_tuple = self.mutual_auth_upload.handle_upload_operation_using_private_key_and_certs( @@ -235,10 +209,6 @@ def upload_batch_api_with_key_and_certs_file( if self.logger is not None: self.logger.error(f"File not found: {str(e)}") raise - except HTTPError as e: - if self.logger is not None: - self.logger.error(f"HTTP error: {str(e)}") - raise except ApiException as e: if self.logger is not None: self.logger.error(f"API error: {str(e)}") diff --git a/CyberSource/utilities/file_utils.py b/CyberSource/utilities/file_utils.py index a0c364e1..18b4e8b4 100644 --- a/CyberSource/utilities/file_utils.py +++ b/CyberSource/utilities/file_utils.py @@ -1,49 +1,30 @@ -""" -File Utility Functions for CyberSource. - -This module provides utility functions for file operations, -including path validation and directory creation. -""" - from pathlib import Path from typing import List, Tuple -def validate_path(path: str, path_type: str, logger=None) -> None: - """ - Validate that a path exists, creating directories if necessary. - - Args: - path: The path to validate - path_type: Description of the path type for error messages - logger: Optional logger instance for logging operations - - Raises: - FileNotFoundError: If the path doesn't exist and isn't an output directory - """ - file_path = Path(path) - if path_type == "Output directory": - if not file_path.exists(): - file_path.mkdir(parents=True, exist_ok=True) - if logger is not None: - logger.info(f"Created output directory: {path}") - elif not file_path.exists(): - raise FileNotFoundError(f"{path_type} does not exist: {path}") - - -def validate_paths(paths: List[Tuple[str, str]], logger=None) -> None: +def validate_path(paths: List[Tuple[str, str]]) -> None: """ Validate that all specified paths exist. Args: paths: List of tuples containing (path, description) - logger: Optional logger instance for logging operations Raises: ValueError: If a required path is None or empty FileNotFoundError: If any of the paths don't exist + TypeError: If the paths parameter is not a list of tuples """ - for path, description in paths: + if not isinstance(paths, list): + raise TypeError("paths must be a list of (path, description) tuples") + + for path_tuple in paths: + if not isinstance(path_tuple, tuple) or len(path_tuple) != 2: + raise TypeError( + "Each item in the paths list must be a (path, description) tuple" + ) + + path, description = path_tuple + # Check if this is an optional parameter (currently only "Server trust certificate") is_optional = "Server trust certificate" in description @@ -55,5 +36,6 @@ def validate_paths(paths: List[Tuple[str, str]], logger=None) -> None: if not path: raise ValueError(f"{description} path is required") + if not Path(path).exists(): raise FileNotFoundError(f"{description} not found: {path}") diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 1d783db7..27ff6a31 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -4,7 +4,6 @@ import certifi import urllib3 from urllib3 import BaseHTTPResponse -from urllib3.exceptions import HTTPError import CyberSource.logging.log_factory as LogFactory from authenticationsdk.util.GlobalLabelParameters import GlobalLabelParameters @@ -48,8 +47,7 @@ def handle_upload_operation_using_private_key_and_certs( :param server_trust_cert_path: Path to the server trust certificate file :param client_key_password: Password for the client key file :param verify_ssl: Whether to verify SSL certificates (default: True) - :raises urllib3.exceptions.HTTPError: If there's an error during the upload process - :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) + :raises CyberSource.rest.ApiException: If there's an error during the upload process or if the response status is not successful (2xx) :return: A tuple containing (response_data, status_code, headers) """ try: @@ -57,10 +55,6 @@ def handle_upload_operation_using_private_key_and_certs( self.logger.info( "Handling upload operation with private key and certificates" ) - if not verify_ssl and self.logger is not None: - self.logger.warning( - "SSL verification is disabled. This should only be used in development environments." - ) return self.upload_file( encrypted_pgp_bytes, endpoint_url, @@ -71,7 +65,7 @@ def handle_upload_operation_using_private_key_and_certs( cert_password=client_key_password, verify_ssl=verify_ssl, ) - except (HTTPError, ApiException) as e: + except ApiException as e: if self.logger is not None: self.logger.error( f"Error in handle_upload_operation_using_private_key_and_certs: {str(e)}" @@ -100,8 +94,7 @@ def upload_file( :param ca_cert_path: Path to the CA certificate file :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) - :raises urllib3.exceptions.HTTPError: If there's an error during the upload process - :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) + :raises CyberSource.rest.ApiException: If there's an error during the upload process or if the response status is not successful (2xx) :return: A tuple containing (response_data, status_code, headers) """ try: @@ -121,10 +114,6 @@ def upload_file( verify_ssl, ) return self._handle_response(response) - except HTTPError as e: - if self.logger is not None: - self.logger.error(f"HTTPError in upload_file: {str(e)}") - raise except ApiException as e: if self.logger is not None: self.logger.error(f"ApiException in upload_file: {str(e)}") @@ -159,9 +148,7 @@ def _send_request( :param cert_password: Password for the certificate file :param verify_ssl: Whether to verify SSL certificates (default: True) :return: HTTP response - :raises urllib3.exceptions.HTTPError: If there's an error during the HTTP request - :raises ssl.SSLError: If there's an SSL-related error - :raises Exception: For any other unexpected errors + :raises CyberSource.rest.ApiException: If there's an error during the HTTP request, including SSL errors or any other unexpected errors """ if self.logger is not None: self.logger.info(f"Sending HTTP POST request to URL: {endpoint_url}") @@ -206,26 +193,45 @@ def _send_request( headers=headers, fields={field_name: (filename, file_data, content_type)}, ) - except HTTPError as e: + except urllib3.exceptions.HTTPError as e: if self.logger is not None: self.logger.error(f"HTTP error in _send_request: {str(e)}") - raise + raise ApiException( + status=None, + reason=f"HTTP error during request: {str(e)}", + ) except ssl.SSLError as e: if self.logger is not None: self.logger.error(f"SSL error in _send_request: {str(e)}") - raise HTTPError(f"SSL error: {str(e)}") + raise ApiException( + status=None, + reason=f"SSL error: {str(e)}", + ) except Exception as e: if self.logger is not None: self.logger.error(f"Unexpected error in _send_request: {str(e)}") - raise HTTPError(f"Unexpected error during HTTP request: {str(e)}") + raise ApiException( + status=None, + reason=f"Unexpected error during HTTP request: {str(e)}", + ) def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: """ - Handle the HTTP response. + Handle the HTTP response from the file upload operation. - :param response: HTTP response - :raises CyberSource.rest.ApiException: If the response status is not successful (2xx) + This method processes the HTTP response, checks the status code, and decodes the response body. + For successful responses (status code 2xx), it returns the decoded response data along with + status code and headers. For unsuccessful responses, it raises an ApiException. + + :param response: HTTP response from the upload request + :raises CyberSource.rest.ApiException: If any of the following conditions occur: + - The response status code is not in the 2xx range (indicating failure) + - The response body cannot be decoded using the specified encoding + - The response body cannot be decoded using UTF-8 as a fallback :return: A tuple containing (response_data, status_code, headers) + - response_data: Decoded response body as a string + - status_code: HTTP status code of the response + - headers: HTTP headers from the response """ status_code = response.status if self.logger is not None: @@ -234,12 +240,62 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: if 200 <= status_code < 300: if self.logger is not None: self.logger.info("File uploaded successfully") - # Decode the response data to a string - response_data = response.data.decode("utf-8") + + # Determine encoding from Content-Type header or default to UTF-8 + content_type = response.headers.get("Content-Type", "").lower() + encoding = "utf-8" # Default to UTF-8 + if "charset=" in content_type: + encoding = content_type.split("charset=")[-1] + + # Attempt to decode the response data + try: + response_data = response.data.decode(encoding) + except UnicodeDecodeError as e: + if self.logger is not None: + self.logger.error( + f"Decoding error with {encoding} encoding: {str(e)}" + ) + + # Fall back to UTF-8 if the specified encoding fails and it's not already UTF-8 + if encoding.lower() != "utf-8": + try: + response_data = response.data.decode("utf-8") + if self.logger is not None: + self.logger.info( + "Successfully decoded response with UTF-8 fallback" + ) + except Exception as e: + if self.logger is not None: + self.logger.error( + f"UTF-8 fallback decoding error: {str(e)}" + ) + raise ApiException( + http_resp=response, + status=status_code, + reason=f"Failed to decode response data: {str(e)}", + ) + else: + raise ApiException( + http_resp=response, + status=status_code, + reason=f"Failed to decode response data: {str(e)}", + ) + return response_data, status_code, response.headers else: - error_message = f"File upload failed. Status code: {status_code}, body: {response.data.decode('utf-8')}" + try: + error_body = response.data.decode("utf-8") + except Exception as e: + if self.logger is not None: + self.logger.error(f"Error decoding error response: {str(e)}") + error_body = "" + + error_message = ( + f"File upload failed. Status code: {status_code}, body: {error_body}" + ) if self.logger is not None: self.logger.error(error_message) # Throw ApiException instead of HTTPError for consistency with rest.py - raise ApiException(http_resp=response, status=status_code, reason=error_message) + raise ApiException( + http_resp=response, status=status_code, reason=error_message + ) diff --git a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py index 9e48b8a4..06278acd 100644 --- a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py +++ b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py @@ -1,10 +1,3 @@ -""" -PGP Encryption Utility for CyberSource Batch Upload. - -This module provides functionality for PGP encryption operations, -supporting both file-based and in-memory operations with various configuration options. -""" - import os from datetime import datetime @@ -64,8 +57,7 @@ def handle_encrypt_operation( if input_file is None or pgp_public_key is None: raise ValueError("Missing required options for encrypt operation") - validate_path(input_file, "Input file", self.logger) - validate_path(pgp_public_key, "Public key", self.logger) + validate_path([(input_file, "Input file"), (pgp_public_key, "Public key")]) input_file_name = os.path.basename(input_file) public_key = self.load_public_key(pgp_public_key) @@ -217,7 +209,9 @@ def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: # Check if the key is a public key (log warning instead of raising exception) if not key.is_public: if self.logger is not None: - self.logger.warning("The provided key is not a public key. This may cause issues with encryption.") + self.logger.warning( + "The provided key is not a public key. This may cause issues with encryption." + ) # Check if key has expired (log warning instead of raising exception) now = datetime.utcnow() @@ -229,7 +223,9 @@ def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: expires_at = expires_at.replace(tzinfo=None) if now > expires_at: if self.logger is not None: - self.logger.warning(f"The public key has expired on {expires_at}. This may cause issues with encryption.") + self.logger.warning( + f"The public key has expired on {expires_at}. This may cause issues with encryption." + ) def __enter__(self): """ From 5ed0d02c2b2b205e4f431c14b3344f805f448de1 Mon Sep 17 00:00:00 2001 From: monkumar Date: Mon, 23 Jun 2025 10:55:31 +0530 Subject: [PATCH 10/12] added batch upload in mustache and init --- CyberSource/__init__.py | 1 + generator/cybersource-python-template/__init__package.mustache | 1 + 2 files changed, 2 insertions(+) diff --git a/CyberSource/__init__.py b/CyberSource/__init__.py index 29562b7f..2172822c 100644 --- a/CyberSource/__init__.py +++ b/CyberSource/__init__.py @@ -1469,6 +1469,7 @@ # import api into sdk package from .api.o_auth_api import OAuthApi +from .api.batch_upload_with_mtls_api import BatchUploadWithMTLSApi from .api.batches_api import BatchesApi from .api.bin_lookup_api import BinLookupApi from .api.chargeback_details_api import ChargebackDetailsApi diff --git a/generator/cybersource-python-template/__init__package.mustache b/generator/cybersource-python-template/__init__package.mustache index da1df3de..d8ae72d0 100644 --- a/generator/cybersource-python-template/__init__package.mustache +++ b/generator/cybersource-python-template/__init__package.mustache @@ -14,6 +14,7 @@ from .models.unauthorized_client_error import UnauthorizedClientError {{/model}}{{/models}} # import api into sdk package from .api.o_auth_api import OAuthApi +from .api.batch_upload_with_mtls_api import BatchUploadWithMTLSApi {{#apiInfo}}{{#apis}}from .api.{{classVarName}} import {{classname}} {{/apis}}{{/apiInfo}} From 89cb8c8f2184d6d908145673547cb83f797a802d Mon Sep 17 00:00:00 2001 From: monkumar Date: Tue, 24 Jun 2025 11:43:11 +0530 Subject: [PATCH 11/12] changed exception handelling --- CyberSource/api/batch_upload_with_mtls_api.py | 69 +++++++---- .../pgpBatchUpload/mutual_auth_upload.py | 115 +++++++++--------- .../pgpBatchUpload/pgp_encryption.py | 48 ++++++-- 3 files changed, 137 insertions(+), 95 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index b7a7fb2f..d66c8040 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -113,10 +113,11 @@ def upload_batch_api_with_key_and_certs_file( - headers: The HTTP headers of the response Raises: - ValueError: If required parameters are missing or invalid (e.g., file too large) - FileNotFoundError: If any of the required files don't exist - CyberSource.rest.ApiException: If there's an error during the upload process or if the response status is not successful (2xx) - Exception: For any other unexpected errors + CyberSource.rest.ApiException: If there's an error during the upload process, including: + - Validation errors (status 400): If required parameters are missing or invalid (e.g., file too large) + - File not found errors (status 400): If any of the required files don't exist + - API errors: If the response status is not successful (2xx) + - Unexpected errors (status 500): For any other unexpected errors Notes: This method will log warnings but not raise exceptions if: @@ -136,8 +137,10 @@ def upload_batch_api_with_key_and_certs_file( server_trust_cert_path="path/to/server_ca.crt" ) print(f"Upload successful. Response: {response}") - except Exception as e: + except ApiException as e: print(f"Upload failed: {str(e)}") + print(f"Status code: {e.status}") + print(f"Response body: {e.body}") ``` """ try: @@ -158,20 +161,24 @@ def upload_batch_api_with_key_and_certs_file( # Validate file size (maximum 75 MB) file_size = Path(input_file_path).stat().st_size if file_size > self._max_size_bytes: - error_msg = f"Input file size ({file_size} bytes) exceeds the maximum allowed size of 75 MB ({self._max_size_bytes} bytes)" + error_msg = "Input file size exceeds the maximum allowed size of 75 MB" if self.logger is not None: self.logger.error(error_msg) - raise ValueError(error_msg) + raise ApiException( + status=400, + reason="Validation error: Input file size exceeds the maximum allowed size of 75 MB", + ) # Validate file extension (.csv) file_extension = Path(input_file_path).suffix.lower() if file_extension != ".csv": - error_msg = ( - f"Input file must have a .csv extension, but got '{file_extension}'" - ) + error_msg = "Input file must have a .csv extension" if self.logger is not None: self.logger.error(error_msg) - raise ValueError(error_msg) + raise ApiException( + status=400, + reason="Validation error: Input file must have a .csv extension", + ) # Step 3: PGP encryption encrypted_pgp_bytes = self.pgp_encryption.handle_encrypt_operation( @@ -201,24 +208,35 @@ def upload_batch_api_with_key_and_certs_file( if self.logger is not None: self.logger.info("Batch file uploaded successfully") return response_tuple - except ValueError as e: + except ValueError: if self.logger is not None: - self.logger.error(f"Validation error: {str(e)}") - raise - except FileNotFoundError as e: + self.logger.error( + "Validation error: Input parameters failed validation requirements" + ) + raise ApiException( + status=400, + reason="Validation error: Input parameters failed validation requirements", + ) + except FileNotFoundError: if self.logger is not None: - self.logger.error(f"File not found: {str(e)}") - raise - except ApiException as e: + self.logger.error("File not found: Required file could not be located") + raise ApiException( + status=400, reason="File not found: Required file could not be located" + ) + except ApiException: if self.logger is not None: - self.logger.error(f"API error: {str(e)}") + self.logger.error( + "API error: Batch upload request to CyberSource Batch Upload API failed" + ) raise - except Exception as e: + except Exception: if self.logger is not None: self.logger.error( - f"Error in upload_batch_api_with_key_and_certs_file: {str(e)}" + "Error in upload_batch_api_with_key_and_certs_file: Batch upload operation failed" ) - raise + raise ApiException( + status=500, reason="Unexpected error during batch upload operation" + ) @staticmethod def get_base_url(environment_hostname: str) -> str: @@ -236,12 +254,13 @@ def get_base_url(environment_hostname: str) -> str: str: The base URL with https:// prefix (e.g., "https://apitest.cybersource.com") Raises: - ValueError: If the environment hostname is None, empty, or consists only of whitespace + CyberSource.rest.ApiException: If the environment hostname is None, empty, or consists only of whitespace """ if not environment_hostname: - raise ValueError( - "Environment Host Name for Batch Upload API cannot be null or empty." + raise ApiException( + status=400, + reason="Environment Host Name for Batch Upload API cannot be null or empty.", ) base_url = environment_hostname.strip() diff --git a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py index 27ff6a31..ca0ae9ed 100644 --- a/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py +++ b/CyberSource/utilities/pgpBatchUpload/mutual_auth_upload.py @@ -65,10 +65,10 @@ def handle_upload_operation_using_private_key_and_certs( cert_password=client_key_password, verify_ssl=verify_ssl, ) - except ApiException as e: + except ApiException: if self.logger is not None: self.logger.error( - f"Error in handle_upload_operation_using_private_key_and_certs: {str(e)}" + "Error in handle_upload_operation_using_private_key_and_certs: Failed to upload file using mTLS authentication" ) raise @@ -114,9 +114,11 @@ def upload_file( verify_ssl, ) return self._handle_response(response) - except ApiException as e: + except ApiException: if self.logger is not None: - self.logger.error(f"ApiException in upload_file: {str(e)}") + self.logger.error( + "ApiException in upload_file: File upload operation failed" + ) raise def _create_headers(self) -> dict: @@ -193,26 +195,32 @@ def _send_request( headers=headers, fields={field_name: (filename, file_data, content_type)}, ) - except urllib3.exceptions.HTTPError as e: + except urllib3.exceptions.HTTPError: if self.logger is not None: - self.logger.error(f"HTTP error in _send_request: {str(e)}") + self.logger.error( + "HTTP error in _send_request: Network communication failure during file upload" + ) raise ApiException( status=None, - reason=f"HTTP error during request: {str(e)}", + reason="HTTP communication error occurred during file upload", ) - except ssl.SSLError as e: + except ssl.SSLError: if self.logger is not None: - self.logger.error(f"SSL error in _send_request: {str(e)}") + self.logger.error( + "SSL error in _send_request: Certificate validation or secure connection failure" + ) raise ApiException( status=None, - reason=f"SSL error: {str(e)}", + reason="SSL certificate or connection error occurred", ) - except Exception as e: + except Exception: if self.logger is not None: - self.logger.error(f"Unexpected error in _send_request: {str(e)}") + self.logger.error( + "Unexpected error in _send_request: Failure during HTTP request processing" + ) raise ApiException( status=None, - reason=f"Unexpected error during HTTP request: {str(e)}", + reason="An unexpected error occurred during file upload", ) def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: @@ -241,61 +249,54 @@ def _handle_response(self, response: urllib3.HTTPResponse) -> tuple: if self.logger is not None: self.logger.info("File uploaded successfully") - # Determine encoding from Content-Type header or default to UTF-8 - content_type = response.headers.get("Content-Type", "").lower() - encoding = "utf-8" # Default to UTF-8 - if "charset=" in content_type: - encoding = content_type.split("charset=")[-1] - - # Attempt to decode the response data try: - response_data = response.data.decode(encoding) - except UnicodeDecodeError as e: + # Check the type of response.data + if isinstance(response.data, bytes): + response_data = response.data.decode("utf-8") + else: + # Already a string (Python 2 case) + response_data = response.data + if self.logger is not None: + self.logger.info("Successfully processed response data") + except Exception: if self.logger is not None: self.logger.error( - f"Decoding error with {encoding} encoding: {str(e)}" - ) - - # Fall back to UTF-8 if the specified encoding fails and it's not already UTF-8 - if encoding.lower() != "utf-8": - try: - response_data = response.data.decode("utf-8") - if self.logger is not None: - self.logger.info( - "Successfully decoded response with UTF-8 fallback" - ) - except Exception as e: - if self.logger is not None: - self.logger.error( - f"UTF-8 fallback decoding error: {str(e)}" - ) - raise ApiException( - http_resp=response, - status=status_code, - reason=f"Failed to decode response data: {str(e)}", - ) - else: - raise ApiException( - http_resp=response, - status=status_code, - reason=f"Failed to decode response data: {str(e)}", + "Response data processing error: Unable to decode or process server response" ) + raise ApiException( + http_resp=response, + status=status_code, + reason="Failed to process response data", + ) return response_data, status_code, response.headers else: try: - error_body = response.data.decode("utf-8") - except Exception as e: + if isinstance(response.data, bytes): + error_body = response.data.decode("utf-8") + else: + error_body = response.data + except Exception: if self.logger is not None: - self.logger.error(f"Error decoding error response: {str(e)}") - error_body = "" + self.logger.error( + "Error processing error response: Unable to decode error response from server" + ) + raise ApiException( + http_resp=response, + status=status_code, + reason="Unable to process error response from server", + ) - error_message = ( - f"File upload failed. Status code: {status_code}, body: {error_body}" - ) if self.logger is not None: - self.logger.error(error_message) - # Throw ApiException instead of HTTPError for consistency with rest.py + self.logger.error(f"File upload failed. Status code: {status_code}") + + if 400 <= status_code < 500: + error_message = f"File upload failed due to client error (Status code: {status_code}). Details: {error_body}" + elif 500 <= status_code < 600: + error_message = f"File upload failed due to server error (Status code: {status_code}). Details: {error_body}" + else: + error_message = f"File upload failed (Status code: {status_code}). Details: {error_body}" + raise ApiException( http_resp=response, status=status_code, reason=error_message ) diff --git a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py index 06278acd..0b972508 100644 --- a/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py +++ b/CyberSource/utilities/pgpBatchUpload/pgp_encryption.py @@ -71,26 +71,32 @@ def handle_encrypt_operation( return encrypted_pgp_bytes except ValueError as e: if self.logger is not None: - self.logger.error(f"Value error in handle_encrypt_operation: {str(e)}") + self.logger.error( + "Value error in handle_encrypt_operation: Missing or invalid parameters for encryption" + ) raise except FileNotFoundError as e: if self.logger is not None: self.logger.error( - f"File not found in handle_encrypt_operation: {str(e)}" + "File not found in handle_encrypt_operation: Input file or public key file could not be found" ) raise except pgpy.errors.PGPError as e: if self.logger is not None: - self.logger.error(f"PGP error in handle_encrypt_operation: {str(e)}") + self.logger.error( + "PGP error in handle_encrypt_operation: Failed to perform PGP encryption operation" + ) raise except IOError as e: if self.logger is not None: - self.logger.error(f"I/O error in handle_encrypt_operation: {str(e)}") + self.logger.error( + "I/O error in handle_encrypt_operation: Failed to read input file or write output" + ) raise except Exception as e: if self.logger is not None: self.logger.error( - f"Unexpected error in handle_encrypt_operation: {str(e)}" + "Unexpected error in handle_encrypt_operation: Failure during encryption process" ) raise @@ -119,15 +125,21 @@ def encrypt_file( return self.encrypt_data(file_data, public_key) except IOError as e: if self.logger is not None: - self.logger.error(f"I/O error in encrypt_file: {str(e)}") + self.logger.error( + "I/O error in encrypt_file: Failed to read input file for encryption" + ) raise except pgpy.errors.PGPError as e: if self.logger is not None: - self.logger.error(f"PGP error in encrypt_file: {str(e)}") + self.logger.error( + "PGP error in encrypt_file: Failed to encrypt file with PGP" + ) raise except Exception as e: if self.logger is not None: - self.logger.error(f"Unexpected error in encrypt_file: {str(e)}") + self.logger.error( + "Unexpected error in encrypt_file: Failure during file encryption" + ) raise def encrypt_data( @@ -158,11 +170,15 @@ def encrypt_data( return bytes(encrypted_message) except pgpy.errors.PGPError as e: if self.logger is not None: - self.logger.error(f"PGP error in encrypt_data: {str(e)}") + self.logger.error( + "PGP error in encrypt_data: Failed to encrypt data with PGP" + ) raise except Exception as e: if self.logger is not None: - self.logger.error(f"Unexpected error in encrypt_data: {str(e)}") + self.logger.error( + "Unexpected error in encrypt_data: Failure during data encryption" + ) raise def load_public_key(self, key_path: str) -> pgpy.PGPKey: @@ -188,15 +204,21 @@ def load_public_key(self, key_path: str) -> pgpy.PGPKey: return key except IOError as e: if self.logger is not None: - self.logger.error(f"I/O error loading public key: {str(e)}") + self.logger.error( + "I/O error loading public key: Failed to read public key file" + ) raise except pgpy.errors.PGPError as e: if self.logger is not None: - self.logger.error(f"PGP error loading public key: {str(e)}") + self.logger.error( + "PGP error loading public key: Invalid or corrupted PGP key format" + ) raise except Exception as e: if self.logger is not None: - self.logger.error(f"Unexpected error loading public key: {str(e)}") + self.logger.error( + "Unexpected error loading public key: Failure during key loading" + ) raise def _validate_key_for_encryption(self, key: pgpy.PGPKey) -> None: From f82cc8568155f6ba5367c0cd5d19c6d278f971c6 Mon Sep 17 00:00:00 2001 From: monkumar Date: Tue, 24 Jun 2025 13:06:09 +0530 Subject: [PATCH 12/12] minor fix --- CyberSource/api/batch_upload_with_mtls_api.py | 2 +- setup.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/CyberSource/api/batch_upload_with_mtls_api.py b/CyberSource/api/batch_upload_with_mtls_api.py index d66c8040..6c96f265 100644 --- a/CyberSource/api/batch_upload_with_mtls_api.py +++ b/CyberSource/api/batch_upload_with_mtls_api.py @@ -93,7 +93,7 @@ def upload_batch_api_with_key_and_certs_file( Args: input_file_path: Path to the CSV file to upload (must exist and be under 75MB) - environment_hostname: CyberSource environment hostname (e.g., "apitest.cybersource.com") + environment_hostname: CyberSource environment hostname (e.g., "secure-batch-test.cybersource.com") pgp_encryption_public_key_path: Path to the PGP public key file for encryption client_cert_path: Path to the client certificate file (.pem or .crt) client_key_path: Path to the client private key file (.key) diff --git a/setup.py b/setup.py index b5ff7dee..0e563e4c 100644 --- a/setup.py +++ b/setup.py @@ -28,7 +28,8 @@ "six", "urllib3", "jwcrypto", - "cryptography" + "cryptography", + "pgpy" ], packages=find_packages(), include_package_data=True,