import os import tempfile import threading class StreamingBuffer: def __init__(self, temp_dir: str | None = None, total_size: int | None = None): fd, self.path = tempfile.mkstemp(dir=temp_dir, suffix=".tmp") os.close(fd) self._writer = open(self.path, "wb") self._reader = open(self.path, "rb") self._written = 0 self._total_size = total_size self._eof = False self._cond = threading.Condition() def append(self, data: bytes) -> None: with self._cond: self._writer.write(data) self._writer.flush() self._written += len(data) self._cond.notify_all() def close_writer(self) -> None: with self._cond: self._writer.close() self._eof = True self._cond.notify_all() def read(self, size: int = -1) -> bytes: with self._cond: pos = self._reader.tell() if size < 0: while not self._eof: self._cond.wait() to_read = self._written - pos else: while self._written <= pos and not self._eof: self._cond.wait() available = self._written - pos if available <= 0: return b"" to_read = min(size, available) return self._reader.read(to_read) def seek(self, offset: int, whence: int = 0) -> int: with self._cond: if whence == 2: if self._total_size is not None: target = self._total_size + offset self._reader.seek(target, 0) return target while not self._eof: self._cond.wait() return self._reader.seek(offset, 2) if whence == 1: target = self._reader.tell() + offset else: target = offset if target <= self._written: return self._reader.seek(offset, whence) if self._total_size is not None: self._reader.seek(target, 0) return target while target > self._written and not self._eof: self._cond.wait() return self._reader.seek(offset, whence) def tell(self) -> int: return self._reader.tell() def readable(self) -> bool: return True def seekable(self) -> bool: return True def writable(self) -> bool: return False def close(self) -> None: try: self._reader.close() except Exception: pass try: self._writer.close() except Exception: pass @property def written(self) -> int: return self._written