26
26
from functools import partial
27
27
from os import environ
28
28
from time import time
29
- from typing import Any , Callable , Final , Literal , TextIO , cast
29
+ from typing import Any , Callable , Final , Literal
30
30
from uuid import uuid4
31
31
32
32
import fsspec
@@ -78,22 +78,17 @@ class CompletionRefs:
78
78
UploadData = dict [str , Callable [[], JsonEncodeable ]]
79
79
80
80
81
- def fsspec_open (urlpath : str , mode : Literal ["w" ]) -> TextIO :
82
- """typed wrapper around `fsspec.open`"""
83
- return cast (TextIO , fsspec .open (urlpath , mode )) # pyright: ignore[reportUnknownMemberType]
84
-
85
-
86
- class FsspecUploadCompletionHook (CompletionHook ):
81
+ class UploadCompletionHook (CompletionHook ):
87
82
"""An completion hook using ``fsspec`` to upload to external storage
88
83
89
84
This function can be used as the
90
85
:func:`~opentelemetry.util.genai.completion_hook.load_completion_hook` implementation by
91
- setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``fsspec_upload ``.
86
+ setting :envvar:`OTEL_INSTRUMENTATION_GENAI_COMPLETION_HOOK` to ``upload ``.
92
87
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
93
88
base path for uploads.
94
89
95
90
Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
96
- implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec ]``
91
+ implementation will be used instead. You can use ``opentelemetry-util-genai[upload ]``
97
92
as a requirement to achieve this.
98
93
"""
99
94
@@ -104,8 +99,9 @@ def __init__(
104
99
max_size : int = 20 ,
105
100
upload_format : Format | None = None ,
106
101
) -> None :
107
- self ._base_path = base_path
108
102
self ._max_size = max_size
103
+ self ._fs , base_path = fsspec .url_to_fs (base_path )
104
+ self ._base_path = self ._fs .unstrip_protocol (base_path )
109
105
110
106
if upload_format not in _FORMATS + (None ,):
111
107
raise ValueError (
@@ -133,15 +129,15 @@ def done(future: Future[None]) -> None:
133
129
try :
134
130
future .result ()
135
131
except Exception : # pylint: disable=broad-except
136
- _logger .exception ("fsspec uploader failed" )
132
+ _logger .exception ("uploader failed" )
137
133
finally :
138
134
self ._semaphore .release ()
139
135
140
136
for path , json_encodeable in upload_data .items ():
141
137
# could not acquire, drop data
142
138
if not self ._semaphore .acquire (blocking = False ): # pylint: disable=consider-using-with
143
139
_logger .warning (
144
- "fsspec upload queue is full, dropping upload %s" ,
140
+ "upload queue is full, dropping upload %s" ,
145
141
path ,
146
142
)
147
143
continue
@@ -153,7 +149,7 @@ def done(future: Future[None]) -> None:
153
149
fut .add_done_callback (done )
154
150
except RuntimeError :
155
151
_logger .info (
156
- "attempting to upload file after FsspecUploadCompletionHook .shutdown() was already called"
152
+ "attempting to upload file after UploadCompletionHook .shutdown() was already called"
157
153
)
158
154
self ._semaphore .release ()
159
155
@@ -188,7 +184,13 @@ def _do_upload(
188
184
for message_idx , line in enumerate (message_lines ):
189
185
line [_MESSAGE_INDEX_KEY ] = message_idx
190
186
191
- with fsspec_open (path , "w" ) as file :
187
+ content_type = (
188
+ "application/json"
189
+ if self ._format == "json"
190
+ else "application/jsonl"
191
+ )
192
+
193
+ with self ._fs .open (path , "w" , content_type = content_type ) as file :
192
194
for message in message_lines :
193
195
json .dump (
194
196
message ,
0 commit comments