Skip to content

Allow AWS::NoValue to omit Role property in if - FeatureRequest#3728 #3736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions samtranslator/intrinsics/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,34 @@ def resolve_resource_id_refs(

return {self.intrinsic_name: resolved_value}

def resolve_ref_value(self, input_dict: Optional[Any]) -> Optional[Any]:
"""
Handles the AWS::NoValue special case for Ref intrinsic function.
All other Ref values are returned as-is for resolution by CloudFormation at runtime.

:param input_dict: Dictionary containing a Ref intrinsic function.
Must be in the format: {"Ref": value} where value is a string.
Examples:
{"Ref": "AWS::NoValue"} -> Returns None
{"Ref": "MyParameter"} -> Returns {"Ref": "MyParameter"} unchanged
{"Ref": {"Some": "Dict"}} -> Returns input unchanged (invalid)
None -> Returns None

:return: Optional[Any]
"""
if input_dict is None or not self.can_handle(input_dict):
return input_dict

dict_value = input_dict[self.intrinsic_name]

if not isinstance(dict_value, str):
return input_dict

if dict_value == "AWS::NoValue":
return None

return input_dict


class SubAction(Action):
intrinsic_name = "Fn::Sub"
Expand Down
123 changes: 123 additions & 0 deletions samtranslator/intrinsics/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,126 @@ def _is_intrinsic_dict(self, _input: Dict[str, Any]) -> bool:
"""
# All intrinsic functions are dictionaries with just one key
return isinstance(_input, dict) and len(_input) == 1 and next(iter(_input.keys())) in self.supported_intrinsics

def evaluate_condition(self, condition: Any, conditions: Dict[str, bool]) -> bool:
"""
Evaluates a condition using the appropriate Action class.

:param condition: The condition to evaluate
:param conditions: Dictionary of condition names and their values
:return: The evaluated boolean result
"""
# Handle direct boolean values
if isinstance(condition, bool):
return condition

# Handle string values
if isinstance(condition, str):
if condition.lower() == "true":
return True
if condition.lower() == "false":
return False
if condition in conditions:
condition_value = conditions[condition]
return self.evaluate_condition(condition_value, conditions)
raise InvalidDocumentException(
[InvalidTemplateException(f"Condition '{condition}' not found in conditions")]
)

# Handle dictionary-based conditions (intrinsic functions)
if isinstance(condition, dict):
if len(condition) != 1:
raise InvalidDocumentException([InvalidTemplateException(f"Invalid condition structure {condition}")])

function_type = next(iter(condition.keys()))
value = condition[function_type]

return self._evaluate_function_type(function_type, value, conditions)

raise InvalidDocumentException([InvalidTemplateException(f"Invalid condition value {condition}")])

def _evaluate_function_type(self, function_type: str, value: Any, conditions: Dict[str, bool]) -> bool:
"""Helper method to evaluate different function types."""
if function_type == "Fn::Equals":
return self._evaluate_equals(value)
if function_type == "Fn::And":
if not isinstance(value, list):
raise InvalidDocumentException(
[InvalidTemplateException(f"Invalid Fn::And value {value}. Fn::And requires a list of conditions")]
)
return all(self.evaluate_condition(cond, conditions) for cond in value)
if function_type == "Fn::Or":
if not isinstance(value, list):
raise InvalidDocumentException(
[InvalidTemplateException(f"Invalid Fn::Or value {value}. Fn::Or requires a list of conditions")]
)
return any(self.evaluate_condition(cond, conditions) for cond in value)
if function_type == "Fn::Not":
if not isinstance(value, list) or len(value) != 1:
raise InvalidDocumentException(
[
InvalidTemplateException(
f"Invalid Fn::Not value {value}. Fn::Not requires a list with exactly 1 value"
)
]
)
return not self.evaluate_condition(value[0], conditions)

raise InvalidDocumentException([InvalidTemplateException(f"Invalid condition function {function_type}")])

def _evaluate_equals(self, equals_values: List[Any]) -> bool:
"""
Evaluates Fn::Equals condition by resolving and comparing its operands.

:param equals_values: List containing the two values to compare
:return: Boolean result of the comparison
"""
_NUM_ARGUMENTS = 2
if not isinstance(equals_values, list) or len(equals_values) != _NUM_ARGUMENTS:
raise InvalidDocumentException(
[
InvalidTemplateException(
f"Invalid Fn::Equals value {equals_values}. Fn::Equals requires a list with exactly 2 values"
)
]
)

left_value, right_value = equals_values

# Resolve left and right values based on theirs types
resolved_left = self._resolve_equals_operand(left_value)
resolved_right = self._resolve_equals_operand(right_value)

return bool(resolved_left == resolved_right)

def _resolve_equals_operand(self, value: Any) -> Any:
"""
Resolves an operand in an Fn::Equals condition based on its type.
Specifically handles Ref.

:param value: The value to resolve
:return: The resolved value after processing Ref function
"""
if isinstance(value, dict) and "Ref" in value:
return self.supported_intrinsics["Ref"].resolve_parameter_refs(value, self.parameters)

# For string literals (like "" or AWS::NoValue) or other direct values, return as is
return value

def resolve_ref_value(self, _input: Any) -> Any:
"""
Resolve Ref intrinsic function in the given input object.

:param _input: Input object to resolve. Expected to be a dictionary with "Ref" key
if it contains a reference, e.g., {"Ref": "MyResource"}

:return: The resolved value if input is a Ref intrinsic function.
Returns original input if Input is an intrinsic function other than "Ref"
"""
if not self._is_intrinsic_dict(_input):
return _input

function_type = next(iter(_input.keys()))
if function_type != "Ref":
return _input
return self.supported_intrinsics[function_type].resolve_ref_value(_input)
106 changes: 97 additions & 9 deletions samtranslator/model/sam_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,16 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P
managed_policy_map = kwargs.get("managed_policy_map", {})
get_managed_policy_map = kwargs.get("get_managed_policy_map")

execution_role = None
if lambda_function.Role is None:
execution_role = self._construct_role(
managed_policy_map,
event_invoke_policies,
intrinsics_resolver,
get_managed_policy_map,
)
lambda_function.Role = execution_role.get_runtime_attr("arn")
policy_configs = {
"managed_policy_map": managed_policy_map,
"event_invoke_policies": event_invoke_policies,
"get_managed_policy_map": get_managed_policy_map,
}

execution_role = self._handle_role_configuration(
lambda_function, conditions, intrinsics_resolver, policy_configs
)
if execution_role:
resources.append(execution_role)

try:
Expand All @@ -371,6 +372,93 @@ def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: P

return resources

def _handle_role_configuration(
self,
lambda_function: LambdaFunction,
conditions: Dict[str, Any],
intrinsics_resolver: IntrinsicsResolver,
policy_configs: Dict[str, Any],
) -> Optional[IAMRole]:
"""Handle the configuration of the Lambda function's execution role.

:param lambda_function: The lambda function resource
:param conditions: Conditions from the template
:param intrinsics_resolver: Resolver for intrinsic functions
:param policy_configs: Dictionary containing policy-related configurations including:
- managed_policy_map
- event_invoke_policies
- get_managed_policy_map
:returns: IAMRole if a new role needs to be created, None otherwise
:rtype: Optional[IAMRole]
"""
should_create_role = lambda_function.Role is None or not self._resolve_property_value(
lambda_function.Role, conditions, intrinsics_resolver
)

if should_create_role:
execution_role = self._construct_role(
policy_configs["managed_policy_map"],
policy_configs["event_invoke_policies"],
intrinsics_resolver,
policy_configs["get_managed_policy_map"],
)
lambda_function.Role = execution_role.get_runtime_attr("arn")
return execution_role

# Role exists and is resolved
lambda_function.Role = self._resolve_property_value(lambda_function.Role, conditions, intrinsics_resolver)
return None

def _resolve_property_value(
self, property_value: Any, conditions: Dict[str, Any], intrinsics_resolver: IntrinsicsResolver
) -> Any:
"""Recursively resolve property value that could contain nested supported intrinsic functions.

:param property_value: The property value to resolve, can be None, a primitive value, or a dict
containing intrinsic functions
:param conditions: Dictionary of condition names and their evaluated boolean values
:param intrinsics_resolver: Resolver instance that handles intrinsic function resolution
:returns: Resolved property value after evaluating all nested intrinsic functions
:rtype: Any
"""
if property_value is None:
return None

if not isinstance(property_value, dict):
return property_value

if is_intrinsic_if(property_value):
return self._resolve_if_condition(property_value, conditions, intrinsics_resolver)

if is_intrinsic(property_value) and "Ref" in property_value:
return self._resolve_ref_value(property_value, intrinsics_resolver)

return property_value

def _resolve_if_condition(
self, if_value: Dict[str, Any], conditions: Dict[str, Any], intrinsics_resolver: IntrinsicsResolver
) -> Any:
"""Handle Fn::If resolution separately to reduce branch count."""
condition_name = if_value["Fn::If"][0]
true_value = if_value["Fn::If"][1]
false_value = if_value["Fn::If"][2]

if intrinsics_resolver.evaluate_condition(condition_name, conditions):
return self._resolve_property_value(true_value, conditions, intrinsics_resolver)
return self._resolve_property_value(false_value, conditions, intrinsics_resolver)

def _resolve_ref_value(self, property_value: Dict[str, Any], intrinsics_resolver: IntrinsicsResolver) -> Any:
"""Handle Ref resolution.

:param ref_value: Dictionary containing Ref
:param intrinsics_resolver: Resolver instance that handles intrinsic function resolution
:returns: Resolved value after evaluating the Ref
:rtype: Any
"""
if property_value["Ref"] in intrinsics_resolver.parameters:
return intrinsics_resolver.resolve_parameter_refs(property_value)
return intrinsics_resolver.resolve_ref_value(property_value)

def _construct_event_invoke_config( # noqa: PLR0913
self,
function_name: str,
Expand Down
5 changes: 5 additions & 0 deletions tests/intrinsics/test_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ def test_unknown_ref(self):
ref = RefAction()
self.assertEqual(expected, ref.resolve_parameter_refs(input, parameters))

def test_resolve_ref_value(self):
self.ref = RefAction()
input_aws_no_value = {"Ref": "AWS::NoValue"}
self.assertEqual(None, self.ref.resolve_ref_value(input_aws_no_value))

def test_must_ignore_invalid_value(self):
parameters = {"key": "value"}
input = {"Ref": ["invalid value"]}
Expand Down
62 changes: 62 additions & 0 deletions tests/intrinsics/test_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,55 @@ def test_short_circuit_on_empty_parameters(self):
self.assertEqual(resolver.resolve_parameter_refs(input), expected)
resolver._try_resolve_parameter_refs.assert_not_called()

def test_evaluate_condition_scenarios(self):
resolver = IntrinsicsResolver({})
conditions = {"Condition1": True, "Condition2": False, "NestedCondition": "Condition1"}

# Test direct boolean values
self.assertTrue(resolver.evaluate_condition(True, conditions))
self.assertFalse(resolver.evaluate_condition(False, conditions))

# Test string literals
self.assertTrue(resolver.evaluate_condition("true", conditions))
self.assertFalse(resolver.evaluate_condition("false", conditions))

# Test condition lookups
self.assertTrue(resolver.evaluate_condition("Condition1", conditions))
self.assertFalse(resolver.evaluate_condition("Condition2", conditions))
self.assertTrue(resolver.evaluate_condition("NestedCondition", conditions))

def test_evaluate_condition_intrinsic_functions(self):
resolver = IntrinsicsResolver({})
conditions = {"Condition1": True, "Condition2": False}

# Test Fn::Equals
self.assertTrue(resolver.evaluate_condition({"Fn::Equals": ["value1", "value1"]}, conditions))

# Test Fn::And
self.assertFalse(resolver.evaluate_condition({"Fn::And": ["Condition1", "Condition2"]}, conditions))

# Test Fn::Or
self.assertTrue(resolver.evaluate_condition({"Fn::Or": ["Condition1", "Condition2"]}, conditions))

# Test Fn::Not
self.assertFalse(resolver.evaluate_condition({"Fn::Not": ["Condition1"]}, conditions))

def test_evaluate_condition_error_cases(self):
resolver = IntrinsicsResolver({})
conditions = {}

# Test invalid condition name
with self.assertRaises(InvalidDocumentException):
resolver.evaluate_condition("NonExistentCondition", conditions)

# Test invalid function
with self.assertRaises(InvalidDocumentException):
resolver.evaluate_condition({"InvalidFunction": []}, conditions)

# Test invalid condition structure
with self.assertRaises(InvalidDocumentException):
resolver.evaluate_condition({"key1": "value1", "key2": "value2"}, conditions)


class TestResourceReferenceResolution(TestCase):
def setUp(self):
Expand Down Expand Up @@ -162,6 +211,19 @@ def test_short_circuit_on_empty_parameters(self):
self.assertEqual(resolver.resolve_sam_resource_refs(input, {}), expected)
resolver._try_resolve_sam_resource_refs.assert_not_called()

def test_resolve_ref_value(self):
# Test with non-intrinsic input
non_intrinsic = {"key": "value"}
self.assertEqual(non_intrinsic, self.resolver.resolve_ref_value(non_intrinsic))

# Test with non-Ref intrinsic
non_ref = {"Fn::Sub": "value"}
self.assertEqual(non_ref, self.resolver.resolve_ref_value(non_ref))

# Test with AWS::NoValue
ref_input = {"Ref": "AWS::NoValue"}
self.assertEqual(None, self.resolver.resolve_ref_value(ref_input))


class TestSupportedIntrinsics(TestCase):
def test_by_default_all_intrinsics_must_be_supported(self):
Expand Down
Loading