Skip to content

Commit 319c5a4

Browse files
committed
[FSTORE-1630] Model Dependent Transformation Functions creates feature names that are longer than 64 character causing logging feature group ingestion to fail (#428)
* adding alias function to modify output column names * adding unit tests * fixing generation of output column names when udf is shared between transformation functions * updating documentation * converting error caused due to long column names to a warning and trying to solve the problem by manually slicing the column names * adding tests for creation of stripped column names * correcting documentation mistake and making test self contained
1 parent 47e25d8 commit 319c5a4

8 files changed

+696
-60
lines changed

docs/templates/api/hopsworks_udf.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
{{hopsworks_udf_properties}}
88

9+
## Methods
10+
11+
{{hopsworks_udf_methods}}
12+
913
## TransformationFeature
1014

11-
{{transformation_feature}}
15+
{{transformation_feature}}

python/auto_doc.py

+9
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,15 @@
451451
"hopsworks_udf_properties": keras_autodoc.get_properties(
452452
"hsfs.hopsworks_udf.HopsworksUdf"
453453
),
454+
"hopsworks_udf_methods": keras_autodoc.get_methods(
455+
"hsfs.hopsworks_udf.HopsworksUdf",
456+
exclude=[
457+
"update_return_type_one_hot",
458+
"python_udf_wrapper",
459+
"pandas_udf_wrapper",
460+
"get_udf",
461+
],
462+
),
454463
"transformation_feature": ["hsfs.hopsworks_udf.TransformationFeature"],
455464
},
456465
"api/transformation_statistics.md": {

python/hopsworks_common/constants.py

+8
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ class OPENSEARCH_CONFIG:
7070
CA_CERTS = "ca_certs"
7171

7272

73+
class FEATURES:
74+
"""
75+
Class that stores constants about a feature.
76+
"""
77+
78+
MAX_LENGTH_NAME = 63
79+
80+
7381
class KAFKA_SSL_CONFIG:
7482
"""
7583
Kafka SSL constant strings for configuration

python/hsfs/core/feature_monitoring_config.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import humps
2424
from hopsworks_common.client.exceptions import FeatureStoreException
25+
from hopsworks_common.constants import FEATURES
2526
from hsfs import util
2627
from hsfs.core import (
2728
feature_monitoring_config_engine,
@@ -34,7 +35,6 @@
3435
from hsfs.core.job_schedule import JobSchedule
3536

3637

37-
MAX_LENGTH_NAME = 63
3838
MAX_LENGTH_DESCRIPTION = 2000
3939

4040

@@ -686,8 +686,10 @@ def name(self, name: str):
686686
raise AttributeError("The name of a registered config is read-only.")
687687
elif not isinstance(name, str):
688688
raise TypeError("name must be of type str")
689-
if len(name) > MAX_LENGTH_NAME:
690-
raise ValueError("name must be less than {MAX_LENGTH_NAME} characters.")
689+
if len(name) > FEATURES.MAX_LENGTH_NAME:
690+
raise ValueError(
691+
"name must be less than {FEATURES.MAX_LENGTH_NAME} characters."
692+
)
691693
self._name = name
692694

693695
@property

python/hsfs/hopsworks_udf.py

+72-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import humps
2929
from hopsworks_common.client.exceptions import FeatureStoreException
30+
from hopsworks_common.constants import FEATURES
3031
from hsfs import engine, util
3132
from hsfs.core.feature_descriptive_statistics import FeatureDescriptiveStatistics
3233
from hsfs.decorators import typechecked
@@ -146,7 +147,9 @@ class HopsworksUdf:
146147
transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function.
147148
dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the finial DataFrame after the transformation functions are applied.
148149
dropped_feature_names : `Optional[List[str]]`. The feature name corresponding to the arguments names that are dropped
149-
feature_name_prefix: `Optional[str]` = None. Prefixes if any used in the feature view.
150+
feature_name_prefix: `Optional[str]`. Prefixes if any used in the feature view.
151+
output_column_names: `Optional[List[str]]`. The names of the output columns returned from the transformation function.
152+
generate_output_col_names: `bool`. Generate default output column names for the transformation function. Default's to True.
150153
"""
151154

152155
# Mapping for converting python types to spark types - required for creating pandas UDF's.
@@ -173,6 +176,8 @@ def __init__(
173176
dropped_argument_names: Optional[List[str]] = None,
174177
dropped_feature_names: Optional[List[str]] = None,
175178
feature_name_prefix: Optional[str] = None,
179+
output_column_names: Optional[str] = None,
180+
generate_output_col_names: bool = True,
176181
):
177182
self._return_types: List[str] = HopsworksUdf._validate_and_convert_output_types(
178183
return_types
@@ -191,6 +196,12 @@ def __init__(
191196
if isinstance(func, Callable)
192197
else func
193198
)
199+
200+
# The parameter `output_column_names` is initialized lazily.
201+
# It is only initialized if the output column names are retrieved from the backend or explicitly specified using the `alias` function or is initialized with default column names if the UDF is accessed from a transformation function.
202+
# Output column names are only stored in the backend when a model dependent or on demand transformation function is created using the defined UDF.
203+
self._output_column_names: List[str] = []
204+
194205
if not transformation_features:
195206
# New transformation function being declared so extract source code from function
196207
self._transformation_features: List[TransformationFeature] = (
@@ -211,6 +222,7 @@ def __init__(
211222
)
212223
)
213224
self._dropped_features = self._dropped_argument_names
225+
214226
else:
215227
self._transformation_features = transformation_features
216228
self._transformation_function_argument_names = (
@@ -222,14 +234,19 @@ def __init__(
222234
if dropped_feature_names
223235
else dropped_argument_names
224236
)
237+
self._output_column_names = (
238+
output_column_names if output_column_names else []
239+
)
225240

226241
self._formatted_function_source, self._module_imports = (
227242
HopsworksUdf._format_source_code(self._function_source)
228243
)
229244

230245
self._statistics: Optional[TransformationStatistics] = None
231246

232-
self._output_column_names: List[str] = []
247+
# Denote if the output feature names have to be generated.
248+
# Set to `False` if the output column names are saved in the backend and the udf is constructed from it using `from_response_json` function or if user has specified the output feature names using the `alias`` function.
249+
self._generate_output_col_name: bool = generate_output_col_names
233250

234251
@staticmethod
235252
def _validate_and_convert_drop_features(
@@ -691,6 +708,47 @@ def __call__(self, *features: List[str]) -> "HopsworksUdf":
691708
udf.dropped_features = updated_dropped_features
692709
return udf
693710

711+
def alias(self, *args: str):
712+
"""
713+
Set the names of the transformed features output by the UDF.
714+
"""
715+
if len(args) == 1 and isinstance(args[0], list):
716+
# If a single list is passed, use it directly
717+
output_col_names = args[0]
718+
else:
719+
# Otherwise, use the individual arguments as a list
720+
output_col_names = list(args)
721+
if any(
722+
not isinstance(output_col_name, str) for output_col_name in output_col_names
723+
):
724+
raise FeatureStoreException(
725+
f"Invalid output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments are strings."
726+
)
727+
728+
self._generate_output_col_name = False
729+
self.output_column_names = output_col_names
730+
731+
return self
732+
733+
def _validate_output_col_name(self, output_col_names):
734+
if any(
735+
len(output_col_name) > FEATURES.MAX_LENGTH_NAME
736+
for output_col_name in output_col_names
737+
):
738+
raise FeatureStoreException(
739+
f"Invalid output feature names specified for the transformation function '{repr(self)}'. Please provide names shorter than {FEATURES.MAX_LENGTH_NAME} characters."
740+
)
741+
742+
if len(output_col_names) != len(set(output_col_names)):
743+
raise FeatureStoreException(
744+
f"Duplicate output feature names provided for the transformation function '{repr(self)}'. Please ensure all arguments names are unique."
745+
)
746+
747+
if output_col_names and len(output_col_names) != len(self.return_types):
748+
raise FeatureStoreException(
749+
f"The number of output feature names provided does not match the number of features returned by the transformation function '{repr(self)}'. Pease provide exactly {len(self.return_types)} feature name(s) to match the output."
750+
)
751+
694752
def update_return_type_one_hot(self):
695753
self._return_types = [
696754
self._return_types[0]
@@ -765,6 +823,7 @@ def to_dict(self) -> Dict[str, Any]:
765823
"name": self.function_name,
766824
"featureNamePrefix": self._feature_name_prefix,
767825
"executionMode": self.execution_mode.value.upper(),
826+
"outputColumnNames": self.output_column_names,
768827
}
769828

770829
def json(self) -> str:
@@ -826,6 +885,12 @@ def from_response_json(
826885
else None
827886
)
828887

888+
output_column_names = (
889+
[feature.strip() for feature in json_decamelized["output_column_names"]]
890+
if json_decamelized.get("output_column_names", None)
891+
else None
892+
)
893+
829894
# Reconstructing statistics arguments.
830895
arg_list, _, _, _ = HopsworksUdf._parse_function_signature(function_source_code)
831896

@@ -858,7 +923,7 @@ def from_response_json(
858923
for arg_index in range(len(arg_list))
859924
]
860925

861-
hopsworks_udf = cls(
926+
hopsworks_udf: HopsworksUdf = cls(
862927
func=function_source_code,
863928
return_types=output_types,
864929
name=function_name,
@@ -870,6 +935,8 @@ def from_response_json(
870935
execution_mode=UDFExecutionMode.from_string(
871936
json_decamelized["execution_mode"]
872937
),
938+
output_column_names=output_column_names,
939+
generate_output_col_names=not output_column_names, # Do not generate output column names if they are retrieved from the back
873940
)
874941

875942
# Set transformation features if already set.
@@ -998,12 +1065,8 @@ def transformation_statistics(
9981065
def output_column_names(self, output_col_names: Union[str, List[str]]) -> None:
9991066
if not isinstance(output_col_names, List):
10001067
output_col_names = [output_col_names]
1001-
if not output_col_names and len(output_col_names) != len(self.return_types):
1002-
raise FeatureStoreException(
1003-
f"Provided names for output columns does not match the number of columns returned from the UDF. Please provide {len(self.return_types)} names."
1004-
)
1005-
else:
1006-
self._output_column_names = output_col_names
1068+
self._validate_output_col_name(output_col_names)
1069+
self._output_column_names = output_col_names
10071070

10081071
def __repr__(self):
10091072
return f'{self.function_name}({", ".join(self.transformation_features)})'

0 commit comments

Comments
 (0)