diff --git a/src/firebase_functions/private/util.py b/src/firebase_functions/private/util.py index 9df09035..843ba51a 100644 --- a/src/firebase_functions/private/util.py +++ b/src/firebase_functions/private/util.py @@ -402,17 +402,44 @@ def get_precision_timestamp(time: str) -> PrecisionTimestamp: return PrecisionTimestamp.MICROSECONDS -def timestamp_conversion(time: str) -> _dt.datetime: - """Converts a timestamp and returns a datetime object of the current time in UTC""" - precision_timestamp = get_precision_timestamp(time) +def timestamp_conversion(timestamp: str | dict | _typing.Any) -> _dt.datetime: + """ + Converts a timestamp-like value to a timezone-aware UTC datetime. + Accepts RFC 3339/ISO 8601 strings or Firebase Timestamp objects + (with 'seconds', 'nanoseconds' attributes). + """ + # Handle Firebase Timestamp object case + # Accept dict-like objects, or python objects with 'seconds' and 'nanoseconds' attributes + if hasattr(timestamp, "seconds") and hasattr(timestamp, "nanoseconds"): + # Normalize nanoseconds into seconds (handles values >= 1_000_000_000 or < 0) + carry, ns = divmod(int(timestamp.nanoseconds), 1_000_000_000) + secs = int(timestamp.seconds) + carry + # Truncate (deterministic, no floating precision issues, matches string path behavior) + microseconds = ns // 1_000 + # Build without using fromtimestamp + epoch = _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc) + return epoch + _dt.timedelta(seconds=secs, microseconds=microseconds) + elif isinstance(timestamp, dict) and "seconds" in timestamp and "nanoseconds" in timestamp: + # Normalize nanoseconds into seconds (handles values >= 1_000_000_000 or < 0) + carry, ns = divmod(int(timestamp["nanoseconds"]), 1_000_000_000) + secs = int(timestamp["seconds"]) + carry + # Truncate (deterministic, no floating precision issues, matches string path behavior) + microseconds = ns // 1_000 + # Build without using fromtimestamp + epoch = _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc) + return epoch + _dt.timedelta(seconds=secs, microseconds=microseconds) + + # Assume string input + if not isinstance(timestamp, str): + raise ValueError("timestamp_conversion expects a string or a Timestamp-like object") + precision_timestamp = get_precision_timestamp(timestamp) if precision_timestamp == PrecisionTimestamp.NANOSECONDS: - return nanoseconds_timestamp_conversion(time) + return nanoseconds_timestamp_conversion(timestamp) elif precision_timestamp == PrecisionTimestamp.MICROSECONDS: - return microsecond_timestamp_conversion(time) + return microsecond_timestamp_conversion(timestamp) elif precision_timestamp == PrecisionTimestamp.SECONDS: - return second_timestamp_conversion(time) - + return second_timestamp_conversion(timestamp) raise ValueError("Invalid timestamp") diff --git a/tests/test_util.py b/tests/test_util.py index 34d975d2..d194aa5d 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -18,6 +18,8 @@ import datetime as _dt from os import environ, path +import pytest + from firebase_functions.private.util import ( PrecisionTimestamp, _unsafe_decode_id_token, @@ -28,6 +30,7 @@ nanoseconds_timestamp_conversion, normalize_path, second_timestamp_conversion, + timestamp_conversion, ) test_bucket = "python-functions-testing.appspot.com" @@ -187,3 +190,147 @@ def test_unsafe_decode_token(): result = _unsafe_decode_id_token(test_token) assert result["sub"] == "firebase" assert result["name"] == "John Doe" + + +# Helper class for timestamp conversion tests +class _Timestamp: + """Helper class to simulate Firebase Timestamp objects.""" + + def __init__(self, seconds: int, nanoseconds: int): + self.seconds = seconds + self.nanoseconds = nanoseconds + + +def _assert_utc_datetime(dt: _dt.datetime) -> None: + """Helper to assert datetime is UTC timezone-aware.""" + assert dt.tzinfo == _dt.timezone.utc + + +@pytest.mark.parametrize( + "seconds,nanoseconds,expected_str", + [ + (0, 0, "1970-01-01T00:00:00.000000+00:00"), # The epoch + (1, 0, "1970-01-01T00:00:01.000000+00:00"), # 1 second after epoch + (0, 1, "1970-01-01T00:00:00.000000+00:00"), # 1 nanosecond (truncated) + (0, 999_999, "1970-01-01T00:00:00.000999+00:00"), # < 1 microsecond + (0, 1_000, "1970-01-01T00:00:00.000001+00:00"), # 1 microsecond + (0, 999_999_999, "1970-01-01T00:00:00.999999+00:00"), # almost 1 second + (0, 1_000_000_000, "1970-01-01T00:00:01.000000+00:00"), # exactly 1 second (carries) + (123456, 1_500_000_000, "1970-01-02T10:17:37.500000+00:00"), # overflow with remainder + (1672578896, 123456789, "2023-01-01T13:14:56.123456+00:00"), # real-world example + (-1, 0, "1969-12-31T23:59:59.000000+00:00"), # 1 second before epoch + (-1, 500_000_000, "1969-12-31T23:59:59.500000+00:00"), # negative seconds, positive nsec + ], +) +def test_timestamp_conversion_object_known_cases(seconds: int, nanoseconds: int, expected_str: str): + """Test timestamp_conversion with objects using known correct expected values.""" + timestamp_obj = _Timestamp(seconds=seconds, nanoseconds=nanoseconds) + result = timestamp_conversion(timestamp_obj) + expected = _dt.datetime.fromisoformat(expected_str) + assert result == expected + _assert_utc_datetime(result) + + +@pytest.mark.parametrize( + "seconds,nanoseconds", + [ + (123456, -500_000_000), # negative nanoseconds + (123456, 2_999_999_999), # large nanoseconds, multiple second carry + (2_147_483_647, 0), # max 32-bit int + (-2, 2_000_000_000), # negative seconds, nanoseconds w/ carry + (0, -1), # negative nanoseconds underflow + (0, -1_000_000_000), # underflow full second + (0, -1_500_000_000), # underflow more than one second + (1687256122, 396358000), # nominal case + (1687256122, 0), + (0, 0), + (0, 1), + (0, 999_999_999), + (1687256122, 2_000_000_000), + (1687256122, -500_000_000), + (-1, 999_999_999), + (-1, 500_000_000), + (-2, 2_000_000_000), + (2_147_483_647, 999_999_999), + (-2_147_483_648, 0), + (0, -2_000_000_000), + (0, 2_000_000_000), + ], +) +def test_timestamp_conversion_object_dict_consistency(seconds: int, nanoseconds: int): + """Test that object and dict branches produce identical results.""" + timestamp_obj = _Timestamp(seconds=seconds, nanoseconds=nanoseconds) + timestamp_dict = {"seconds": seconds, "nanoseconds": nanoseconds} + + result_obj = timestamp_conversion(timestamp_obj) + result_dict = timestamp_conversion(timestamp_dict) + + assert result_obj == result_dict + _assert_utc_datetime(result_obj) + _assert_utc_datetime(result_dict) + + +@pytest.mark.parametrize( + "seconds,nanoseconds", + [ + (1672576496, 123456000), # nanoseconds already in microsecond precision + (1672576496, 0), + (1672576496, 999999000), + ], +) +def test_timestamp_conversion_string_cross_validation(seconds: int, nanoseconds: int): + """Test cross-validation with string path for microsecond-precision nanoseconds.""" + dt_from_obj = timestamp_conversion(_Timestamp(seconds=seconds, nanoseconds=nanoseconds)) + iso_str = dt_from_obj.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + dt_from_string = timestamp_conversion(iso_str) + + assert dt_from_obj == dt_from_string + + +@pytest.mark.parametrize( + "timestamp_str,conversion_func", + [ + ("2023-01-01T12:34:56.123456789Z", nanoseconds_timestamp_conversion), + ("2023-06-20T10:15:22.396358Z", microsecond_timestamp_conversion), + ("2023-01-01T12:34:56Z", second_timestamp_conversion), + ], +) +def test_timestamp_conversion_with_string(timestamp_str: str, conversion_func): + """Test timestamp_conversion works with string inputs.""" + result = timestamp_conversion(timestamp_str) + expected = conversion_func(timestamp_str) + assert result == expected + _assert_utc_datetime(result) + + +@pytest.mark.parametrize( + "invalid_input,expected_error_msg", + [ + (12345, "timestamp_conversion expects a string or a Timestamp-like object"), + ("invalid_timestamp", None), # Error message varies, just check ValueError + (None, None), + ], +) +def test_timestamp_conversion_errors(invalid_input, expected_error_msg): + """Test timestamp_conversion raises appropriate errors for invalid inputs.""" + with pytest.raises(ValueError) as exc_info: + timestamp_conversion(invalid_input) + if expected_error_msg: + assert expected_error_msg in str(exc_info.value) + + +def test_timestamp_conversion_error_missing_seconds(): + """Test timestamp_conversion raises error when seconds attribute is missing.""" + + class IncompleteTimestamp: + def __init__(self, nanoseconds: int): + self.nanoseconds = nanoseconds + + with pytest.raises(ValueError): + timestamp_conversion(IncompleteTimestamp(nanoseconds=123456789)) + + +def test_timestamp_conversion_error_missing_nanoseconds(): + """Test timestamp_conversion raises error when nanoseconds key is missing in dict.""" + with pytest.raises(ValueError): + timestamp_conversion({"nanoseconds": 123456789})