-
-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
628 additions
and
458 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,262 @@ | ||
"""WDL Expressions (literal values, arithmetic, comparison, conditional, string interpolation, array, map, and functions).""" | ||
|
||
from typing import Union, Any, cast | ||
|
||
import WDL | ||
from wdl2cwl.errors import WDLSourceLine, ConversionException | ||
from wdl2cwl.util import get_input, ConversionContext | ||
|
||
|
||
def get_literal_name( | ||
expr: Union[WDL.Expr.Boolean, WDL.Expr.Int, WDL.Expr.Float, WDL.Expr.Array] | ||
) -> str: | ||
"""Translate WDL Boolean, Int, Float, or Array Expression.""" | ||
# if the literal expr is used inside WDL.Expr.Apply | ||
# the literal value is what's needed | ||
parent = expr.parent # type: ignore[union-attr] | ||
if isinstance(parent, (WDL.Expr.Apply, WDL.Expr.IfThenElse)): | ||
return expr.literal.value # type: ignore | ||
raise WDLSourceLine(expr, ConversionException).makeError( | ||
f"The parent expression for {expr} is not WDL.Expr.Apply, but {parent}." | ||
) | ||
|
||
|
||
def get_expr_ifthenelse( | ||
wdl_ifthenelse: WDL.Expr.IfThenElse, ctx: ConversionContext | ||
) -> str: | ||
"""Translate WDL IfThenElse Expressions.""" | ||
condition = get_expr(wdl_ifthenelse.condition, ctx) | ||
if_true = get_expr(wdl_ifthenelse.consequent, ctx) | ||
if_false = get_expr(wdl_ifthenelse.alternative, ctx) | ||
return f"{condition} ? {if_true} : {if_false}" | ||
|
||
|
||
def translate_wdl_placeholder( | ||
wdl_placeholder: WDL.Expr.Placeholder, ctx: ConversionContext | ||
) -> str: | ||
"""Translate WDL Expr Placeholder to a valid CWL command string.""" | ||
cwl_command_str = "" | ||
expr = wdl_placeholder.expr | ||
if expr is None: | ||
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( | ||
f"Placeholder '{wdl_placeholder}' has no expr." | ||
) | ||
placeholder_expr = get_expr(expr, ctx) | ||
options = wdl_placeholder.options | ||
if options: | ||
if "true" in options: | ||
true_value = options["true"] | ||
false_value = options["false"] | ||
true_str = f'"{true_value}"' if '"' not in true_value else f"'{true_value}'" | ||
false_str = ( | ||
f'"{false_value}"' if '"' not in false_value else f"'{false_value}'" | ||
) | ||
is_optional = False | ||
if isinstance(expr, WDL.Expr.Get): | ||
is_optional = expr.type.optional | ||
elif isinstance(expr, WDL.Expr.Apply): | ||
is_optional = expr.arguments[0].type.optional | ||
if not is_optional: | ||
cwl_command_str = f"$({placeholder_expr} ? {true_str} : {false_str})" | ||
else: | ||
cwl_command_str = ( | ||
f"$({placeholder_expr} === null ? {false_str} : {true_str})" | ||
) | ||
elif "sep" in options: | ||
seperator = options["sep"] | ||
if isinstance(expr.type, WDL.Type.Array): | ||
item_type: WDL.Expr.Base = expr.type.item_type # type: ignore | ||
if isinstance(item_type, WDL.Type.String): | ||
cwl_command_str = f'$({placeholder_expr}.join("{seperator}"))' | ||
elif isinstance(item_type, WDL.Type.File): | ||
cwl_command_str = ( | ||
f"$({placeholder_expr}.map(" | ||
+ 'function(el) {return el.path}).join("' | ||
+ seperator | ||
+ '"))' | ||
) | ||
else: | ||
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( | ||
f"{wdl_placeholder} with separator and item type {item_type} is not yet handled" | ||
) | ||
else: | ||
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( | ||
f"{wdl_placeholder} with expr of type {expr.type} is not yet handled" | ||
) | ||
else: | ||
raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( | ||
f"Placeholders with options {options} are not yet handled." | ||
) | ||
else: | ||
# for the one case where the $(input.some_input_name) is used within the placeholder_expr | ||
# we return the placeholder_expr without enclosing in another $() | ||
cwl_command_str = ( | ||
f"$({placeholder_expr})" | ||
if placeholder_expr[-1] != ")" | ||
else placeholder_expr | ||
) | ||
# sometimes placeholders are used inside WDL.Expr.String. | ||
# with the parent and grand_parent we can confirm that we are in | ||
# the command string (WDL.Expr.String) and task (WDL.Tree.Task) respectively | ||
parent = wdl_placeholder.parent # type: ignore | ||
grand_parent = parent.parent | ||
return ( | ||
cwl_command_str | ||
if isinstance(parent, WDL.Expr.String) | ||
and isinstance(grand_parent, WDL.Tree.Task) | ||
else cwl_command_str[2:-1] | ||
) | ||
|
||
|
||
def get_expr_string(wdl_expr_string: WDL.Expr.String, ctx: ConversionContext) -> str: | ||
"""Translate WDL String Expressions.""" | ||
if wdl_expr_string.literal is not None: | ||
return f'"{wdl_expr_string.literal.value}"' | ||
string = "" | ||
parts = wdl_expr_string.parts | ||
for index, part in enumerate(parts[1:-1], start=1): | ||
if isinstance( | ||
part, | ||
(WDL.Expr.Placeholder, WDL.Expr.Apply, WDL.Expr.Get, WDL.Expr.Ident), | ||
): | ||
placeholder = get_expr(part, ctx) | ||
part = ( | ||
"" if parts[index - 1] == '"' or parts[index - 1] == "'" else "' + " # type: ignore | ||
) | ||
part += placeholder | ||
part += ( | ||
"" if parts[index + 1] == '"' or parts[index + 1] == "'" else " + '" # type: ignore | ||
) | ||
string += part | ||
# condition to determine if the opening and closing quotes should be added to string | ||
# for cases where a placeholder begins or ends a WDL.Expr.String | ||
if type(parts[1]) == str: | ||
string = "'" + string | ||
if type(parts[-2]) == str: | ||
string = string + "'" | ||
return string | ||
|
||
|
||
def get_expr_name(wdl_expr: WDL.Expr.Ident) -> str: | ||
"""Extract name from WDL expr.""" | ||
if not hasattr(wdl_expr, "name"): | ||
raise WDLSourceLine(wdl_expr, ConversionException).makeError( | ||
f"{type(wdl_expr)} has not attribute 'name'" | ||
) | ||
return get_input(wdl_expr.name) | ||
|
||
|
||
def get_expr(wdl_expr: Any, ctx: ConversionContext) -> str: | ||
"""Translate WDL Expressions.""" | ||
if isinstance(wdl_expr, WDL.Expr.Apply): | ||
return get_expr_apply(wdl_expr, ctx) | ||
elif isinstance(wdl_expr, WDL.Expr.Get): | ||
return get_expr_get(wdl_expr, ctx) | ||
elif isinstance(wdl_expr, WDL.Expr.IfThenElse): | ||
return get_expr_ifthenelse(wdl_expr, ctx) | ||
elif isinstance(wdl_expr, WDL.Expr.Placeholder): | ||
return translate_wdl_placeholder(wdl_expr, ctx) | ||
elif isinstance(wdl_expr, WDL.Expr.String): | ||
return get_expr_string(wdl_expr, ctx) | ||
elif isinstance(wdl_expr, WDL.Tree.Decl): | ||
return get_expr(wdl_expr.expr, ctx) | ||
elif isinstance( | ||
wdl_expr, | ||
( | ||
WDL.Expr.Boolean, | ||
WDL.Expr.Int, | ||
WDL.Expr.Float, | ||
WDL.Expr.Array, | ||
), | ||
): | ||
return get_literal_name(wdl_expr) | ||
else: | ||
raise WDLSourceLine(wdl_expr, ConversionException).makeError( | ||
f"The expression '{wdl_expr}' is not handled yet." | ||
) | ||
|
||
|
||
def get_expr_apply(wdl_apply_expr: WDL.Expr.Apply, ctx: ConversionContext) -> str: | ||
"""Translate WDL Apply Expressions.""" | ||
# N.B: This import here avoids circular dependency error when loading the modules. | ||
from wdl2cwl import functions | ||
|
||
function_name = wdl_apply_expr.function_name | ||
arguments = wdl_apply_expr.arguments | ||
if not arguments: | ||
raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( | ||
f"The '{wdl_apply_expr}' expression has no arguments." | ||
) | ||
treat_as_optional = wdl_apply_expr.type.optional | ||
|
||
# Call the function if we have it in our wdl2cwl.functions module | ||
if hasattr(functions, function_name): | ||
return cast( | ||
str, | ||
getattr(functions, function_name)( | ||
arguments, | ||
ctx, | ||
{ | ||
"treat_as_optional": treat_as_optional, | ||
"wdl_apply_expr": wdl_apply_expr, | ||
}, | ||
), | ||
) | ||
|
||
raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( | ||
f"Function name '{function_name}' not yet handled." | ||
) | ||
|
||
|
||
def get_expr_get(wdl_get_expr: WDL.Expr.Get, ctx: ConversionContext) -> str: | ||
"""Translate WDL Get Expressions.""" | ||
member = wdl_get_expr.member | ||
if not member: | ||
return get_expr_ident(wdl_get_expr.expr, ctx) # type: ignore | ||
struct_name = get_expr(wdl_get_expr.expr, ctx) | ||
member_str = f"{struct_name}.{member}" | ||
return ( | ||
member_str | ||
if not isinstance(wdl_get_expr.type, WDL.Type.File) | ||
else f"{member_str}.path" | ||
) | ||
|
||
|
||
def get_expr_ident(wdl_ident_expr: WDL.Expr.Ident, ctx: ConversionContext) -> str: | ||
"""Translate WDL Ident Expressions.""" | ||
id_name = wdl_ident_expr.name | ||
ident_name = get_input(id_name) | ||
referee: Any = wdl_ident_expr.referee | ||
optional = wdl_ident_expr.type.optional | ||
if referee: | ||
with WDLSourceLine(referee, ConversionException): | ||
if isinstance(referee, WDL.Tree.Call): | ||
return id_name | ||
if referee.expr and ( | ||
wdl_ident_expr.name in ctx.optional_cwl_null | ||
or wdl_ident_expr.name not in ctx.non_static_values | ||
): | ||
return get_expr(referee.expr, ctx) | ||
if optional and isinstance(wdl_ident_expr.type, WDL.Type.File): | ||
# To prevent null showing on the terminal for inputs of type File | ||
name_with_file_check = get_expr_name_with_is_file_check(wdl_ident_expr) | ||
return f'{ident_name} === null ? "" : {name_with_file_check}' | ||
return ( | ||
ident_name | ||
if not isinstance(wdl_ident_expr.type, WDL.Type.File) | ||
else f"{ident_name}.path" | ||
) | ||
|
||
|
||
def get_expr_name_with_is_file_check(wdl_expr: WDL.Expr.Ident) -> str: | ||
"""Extract name from WDL expr and check if it's a file path.""" | ||
if wdl_expr is None or not hasattr(wdl_expr, "name"): | ||
raise WDLSourceLine(wdl_expr, ConversionException).makeError( | ||
f"{type(wdl_expr)} has not attribute 'name'" | ||
) | ||
expr_name = get_input(wdl_expr.name) | ||
is_file = isinstance(wdl_expr.type, WDL.Type.File) | ||
return expr_name if not is_file else f"{expr_name}.path" | ||
|
||
|
||
__all__ = ["get_expr", "get_expr_string", "translate_wdl_placeholder"] |
Oops, something went wrong.