Skip to content

Commit 32fe5f4

Browse files
committed
fix VideoFromFile stream source to _ReentrantBytesIO for parallel async use (with pickle support)
1 parent 0ce2698 commit 32fe5f4

File tree

1 file changed

+21
-29
lines changed

1 file changed

+21
-29
lines changed

comfy_api/latest/_input_impl/video_types.py

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from av.container import InputContainer
33
from av.subtitles.stream import SubtitleStream
44
from fractions import Fraction
5-
from typing import Optional, IO, Iterator
5+
from typing import Optional, Iterator
66
from contextlib import contextmanager, suppress
77
from .._input import AudioInput, VideoInput
88
import av
@@ -22,8 +22,8 @@ def __init__(self, data: bytes):
2222
if data is None:
2323
raise TypeError("data must be bytes, not None")
2424
self._data = data
25-
self._view = memoryview(data)
2625
self._pos = 0
26+
self._len = len(data)
2727

2828
def getvalue(self) -> bytes:
2929
if self.closed:
@@ -33,7 +33,7 @@ def getvalue(self) -> bytes:
3333
def getbuffer(self) -> memoryview:
3434
if self.closed:
3535
raise ValueError("I/O operation on closed file.")
36-
return memoryview(self._data) # return a NEW view; external .release() won't break our internal _view.
36+
return memoryview(self._data)
3737

3838
def readable(self) -> bool:
3939
return True
@@ -55,7 +55,7 @@ def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
5555
elif whence == io.SEEK_CUR:
5656
new_pos = self._pos + offset
5757
elif whence == io.SEEK_END:
58-
new_pos = len(self._view) + offset
58+
new_pos = self._len + offset
5959
else:
6060
raise ValueError(f"Invalid whence: {whence}")
6161
if new_pos < 0:
@@ -70,10 +70,10 @@ def readinto(self, b) -> int:
7070
if mv.readonly:
7171
raise TypeError("readinto() argument must be writable")
7272
mv = mv.cast("B")
73-
if self._pos >= len(self._view):
73+
if self._pos >= self._len:
7474
return 0
75-
n = min(len(mv), len(self._view) - self._pos)
76-
mv[:n] = self._view[self._pos:self._pos + n]
75+
n = min(len(mv), self._len - self._pos)
76+
mv[:n] = self._data[self._pos:self._pos + n]
7777
self._pos += n
7878
return n
7979

@@ -84,10 +84,10 @@ def read(self, size: int = -1) -> bytes:
8484
if self.closed:
8585
raise ValueError("I/O operation on closed file.")
8686
if size is None or size < 0:
87-
size = len(self._view) - self._pos
88-
if self._pos >= len(self._view):
87+
size = self._len - self._pos
88+
if self._pos >= self._len:
8989
return b""
90-
end = min(self._pos + size, len(self._view))
90+
end = min(self._pos + size, self._len)
9191
out = self._data[self._pos:end]
9292
self._pos = end
9393
return out
@@ -98,9 +98,9 @@ def read1(self, size: int = -1) -> bytes:
9898
def readline(self, size: int = -1) -> bytes:
9999
if self.closed:
100100
raise ValueError("I/O operation on closed file.")
101-
if self._pos >= len(self._view):
101+
if self._pos >= self._len:
102102
return b""
103-
end_limit = len(self._view) if size is None or size < 0 else min(len(self._view), self._pos + size)
103+
end_limit = self._len if size is None or size < 0 else min(self._len, self._pos + size)
104104
nl = self._data.find(b"\n", self._pos, end_limit)
105105
end = (nl + 1) if nl != -1 else end_limit
106106
out = self._data[self._pos:end]
@@ -131,11 +131,6 @@ def writelines(self, lines) -> None:
131131
def truncate(self, size: int | None = None) -> int:
132132
raise io.UnsupportedOperation("not writable")
133133

134-
def close(self) -> None:
135-
with suppress(Exception):
136-
self._view.release()
137-
super().close()
138-
139134

140135
def container_to_output_format(container_format: str | None) -> str | None:
141136
"""
@@ -181,15 +176,15 @@ class VideoFromFile(VideoInput):
181176
Class representing video input from a file.
182177
"""
183178

179+
__data: str | bytes
180+
184181
def __init__(self, file: str | io.BytesIO | bytes | bytearray | memoryview):
185182
"""
186183
Initialize the VideoFromFile object based off of either a path on disk or a BytesIO object
187184
containing the file contents.
188185
"""
189-
self.__path: Optional[str] = None
190-
self.__data: Optional[bytes] = None
191186
if isinstance(file, str):
192-
self.__path = file
187+
self.__data = file
193188
elif isinstance(file, io.BytesIO):
194189
# Snapshot to immutable bytes once to ensure re-entrant, parallel-safe readers.
195190
self.__data = file.getbuffer().tobytes()
@@ -203,12 +198,9 @@ def get_stream_source(self) -> str | io.BytesIO:
203198
Return the underlying file source for efficient streaming.
204199
This avoids unnecessary memory copies when the source is already a file path.
205200
"""
206-
if self.__path is not None:
207-
return self.__path
208-
data = self.__data
209-
if data is None:
210-
raise RuntimeError("VideoFromFile: missing in-memory bytes (__data is None)")
211-
return _ReentrantBytesIO(data)
201+
if isinstance(self.__data, str):
202+
return self.__data
203+
return _ReentrantBytesIO(self.__data)
212204

213205
def get_dimensions(self) -> tuple[int, int]:
214206
"""
@@ -430,12 +422,12 @@ def _get_first_video_stream(self, container: InputContainer):
430422
return video_stream
431423

432424
def _source_label(self) -> str:
433-
if self.__path is not None:
434-
return self.__path
425+
if isinstance(self.__data, str):
426+
return self.__data
435427
return f"<in-memory video: {len(self.__data)} bytes>"
436428

437429
@contextmanager
438-
def _open_source(self) -> Iterator[str | IO[bytes]]:
430+
def _open_source(self) -> Iterator[str | io.BytesIO]:
439431
"""Internal helper to ensure file-like sources are closed after use."""
440432
src = self.get_stream_source()
441433
try:

0 commit comments

Comments
 (0)