# Copyright (c) 2023-2024 Thomas Mathieson.
# Distributed under the terms of the MIT license.
import base64
import io
import logging
import time
from io import BytesIO
from multiprocessing import Queue, current_process
from queue import Empty
from threading import current_thread
from typing import Optional, Dict, Set, Tuple
import av # type: ignore
import numpy as np
import numpy.typing as npt
from PIL import Image
from . import ssv_logging
from .ssv_logging import log, SSVLogStream
from .ssv_render import SSVRender, SSVStreamingMode
from .ssv_render_opengl import SSVRenderOpenGL
[docs]
class SSVRenderProcessLogger(SSVLogStream):
"""
A StringIO pipe for sending log messages to, this class pipes incoming messages to "LogM" commands.
"""
def __init__(self, tx_queue: Queue):
super().__init__()
self.tx_queue = tx_queue
[docs]
def write(self, text: str, severity: int = logging.INFO+1) -> int:
self.tx_queue.put(("LogM", severity, text))
return len(text) # super().write(text)
[docs]
class SSVRenderProcessServer:
"""
This class listens for render commands and dispatches them to the renderer. This class is intended to be constructed
in a dedicated process by SSVRenderProcessClient.
"""
def __init__(self, backend: str, gl_version: Optional[int], command_queue_tx: Queue,
command_queue_rx: Queue, log_severity: int, timeout: Optional[float], use_renderdoc_api: bool = False):
self._renderer: Optional[SSVRender] = None
self._command_queue_tx: Queue = command_queue_tx
self._command_queue_rx: Queue = command_queue_rx
self.__init_logger(log_severity)
self._use_renderdoc_api = use_renderdoc_api
# Makes it easier to find the render thread in the profiler
current_thread().name = current_process().name
self.running = False
self.target_framerate = 60
self.output_size = (640, 480)
self.stream_mode: SSVStreamingMode = SSVStreamingMode.PNG
self.watchdog_time = timeout
self.encode_quality: Optional[float] = None
self.log_frame_timing = False
self._last_heartbeat_time: float = 0
self._frame_buffer_bytes = bytearray()
# self._dbg_command_stats = {}
self._video_stream: Optional[av.video.VideoStream] = None
self._video_container: Optional[av.container.OutputContainer] = None
# Frame time stats for debugging
self.avg_delta_time = 1/self.target_framerate
self.max_delta_time = 1/self.target_framerate
self.avg_delta_time_encode = 1 / self.target_framerate
self.max_delta_time_encode = 1 / self.target_framerate
self.__init_video_encoder()
self.__init_render_process(backend, gl_version)
_supported_video_formats: Set[SSVStreamingMode] = {
SSVStreamingMode.H264,
SSVStreamingMode.HEVC,
SSVStreamingMode.VP8,
SSVStreamingMode.VP9,
SSVStreamingMode.MPEG4,
SSVStreamingMode.MJPEG
}
# This dict stores the bit rate/quality factor that a stream quality of '100' should equal
_streaming_format_quality_scaling: Dict[SSVStreamingMode, int] = {
SSVStreamingMode.JPG: 100,
SSVStreamingMode.PNG: 7,
SSVStreamingMode.VP8: 3500000,
SSVStreamingMode.VP9: 3500000,
SSVStreamingMode.H264: 40000000,
SSVStreamingMode.HEVC: 40000000,
SSVStreamingMode.MPEG4: 40000000,
SSVStreamingMode.MJPEG: 68,
}
def __init_video_encoder(self):
"""
Initialises or reinitialises the video encoder using the given stream_mode.
"""
if self.stream_mode not in self._supported_video_formats:
return
class FakeIO(io.RawIOBase): # type: ignore
name: str = "stream.mkv" # type: ignore
def writable(self) -> bool:
return True
def readable(self) -> bool:
return False
def write(self, __b: bytes) -> Optional[int]: # type: ignore
# print(f"Writing: {len(__b)} bytes...")
return len(__b)
if self._video_container is not None:
self._video_container.close()
# Setting format to "null" here effectively disables muxing
self._video_container = av.open(FakeIO(), mode="w", format="null")
# self._video_container.flags |= self._video_container.flags.FLUSH_PACKETS
stream = self._video_container.add_stream(self.stream_mode.value, rate=self.target_framerate)
stream.width = self.output_size[0]
stream.height = self.output_size[1]
stream.pix_fmt = "yuv420p"
stream.options = {
# Set some options to reduce latency as much as possible, depending on the codec these can can have a
# large impact on the output size.
"g": "30",
"lag-in-frames": "2",
"speed": "10",
"quality": "realtime",
}
if self.stream_mode == SSVStreamingMode.H264:
stream.options["g"] = "1"
stream.options["zerolatency"] = "1"
stream.options["tune"] = "zerolatency"
stream.options["preset"] = "fast"
elif self.stream_mode == SSVStreamingMode.MJPEG:
stream.options["strict_std_compliance"] = "unofficial"
stream.options["color_range"] = "2"
stream.options["qscale"] = "0"
stream.options["huffman"] = "optimal"
# Creates a warning message, but seems to be the only way to get full range jpeg encoding...
stream.pix_fmt = "yuvj420p"
if self.encode_quality is not None:
if self.stream_mode == SSVStreamingMode.MJPEG:
# MJPEG doesn't seem to respect CBR
q = min(max(round(
(1-(self.encode_quality/100)) * self._streaming_format_quality_scaling[self.stream_mode])+1, 1), 69)
stream.options["qmin"] = str(q)
stream.options["qmax"] = str(q)
if self.encode_quality >= 90:
stream.pix_fmt = "yuvj444p"
else:
q = max(round(self.encode_quality/100 * self._streaming_format_quality_scaling[self.stream_mode]), 10)
stream.options["b"] = str(q)
if self.encode_quality >= 90 and self.stream_mode in {SSVStreamingMode.HEVC, SSVStreamingMode.VP9}:
stream.pix_fmt = "yuv444p"
self._video_stream = stream
def __init_logger(self, log_severity: int):
ssv_logging.set_severity(log_severity)
log_stream = SSVRenderProcessLogger(self._command_queue_tx)
ssv_logging.set_output_stream(log_stream, level=log_severity, prefix="pySSV_Render")
def __init_render_process(self, backend: str, gl_version: Optional[int]):
"""
Creates a new renderer for the given backend and starts the render process loop.
:param backend: the render backend to use.
"""
if backend == "opengl":
self._renderer = SSVRenderOpenGL(gl_version, self._use_renderdoc_api)
else:
self._renderer = None
log(f"Backend '{backend}' does not exist!", logging.ERROR)
return
self.__render_process_loop()
def __render_process_loop(self):
"""
Runs the main render process loop. This function continuously checks for new render commands and
dispatches render frame commands as needed.
"""
last_frame_time = time.perf_counter()
timeout = 0
self._last_heartbeat_time = time.monotonic()
frame = 0
while True:
# Check heartbeat
if self.watchdog_time is not None and (time.monotonic() - self._last_heartbeat_time) > self.watchdog_time:
self.__shutdown("watchdog")
return
# Render the next frame if it's time to
delta_time = time.perf_counter() - last_frame_time
if self.running and (self.target_framerate <= 0 or delta_time >= 1 / self.target_framerate):
last_frame_time = time.perf_counter()
self.__render_frame()
# Frame time stats
if self.log_frame_timing:
frame += 1
if frame % self.target_framerate == 0:
log(f"Render time: Avg={self.avg_delta_time*1000:.2f} ms "
f"Max={self.max_delta_time*1000:.2f} ms \t// "
f"Encode time Avg={self.avg_delta_time_encode*1000:.2f} ms "
f"Max={self.max_delta_time_encode*1000:.2f} ms \t// "
f"asleep={timeout*1000:.2f} ms \t// "
f"FPS: Avg={1/(self.avg_delta_time+self.avg_delta_time_encode):.1f} "
f"Avg+Sync={1/(self.avg_delta_time+self.avg_delta_time_encode+timeout):.1f} "
f"Min={1/(self.max_delta_time+self.max_delta_time_encode):.1f}", severity=logging.INFO)
self.max_delta_time = 0
self.max_delta_time_encode = 0
# Execute any render commands that are waiting for us
if self._command_queue_rx.qsize() > 0:
size = self._command_queue_rx.qsize()
# if size > 32:
# log(f"Render process is struggling to keep up! Command queue size>32 (={size})",
# severity=logging.WARN)
# If the command queue is getting backed up (due to poor framerate for instance) prioritize that so that
# user control is not delayed.
for i in range(size):
if not self.__parse_render_command(0):
self.__shutdown("requested by client")
return
else:
# Work out how long the command processor can block for
delta_time = time.perf_counter() - last_frame_time
if self.running and self.target_framerate > 0:
timeout = max(1 / self.target_framerate - delta_time, 0) * 0.5
else:
# If this timeout is infinite then the watchdog can't kill paused render processes which also need
# to be killed otherwise all the RenderDoc sockets get used up...
if self.watchdog_time is None:
timeout = 5
else:
timeout = min(self.watchdog_time*0.5, 1)
# Wait for <timeout and (potentially) execute one render command
# if not self.__parse_render_command(timeout):
# self.__shutdown("requested by client")
# return
# Because Queue.get() uses time.monotonic() internally, it doesn't have the required timeout precision
# for our use case.
time.sleep(timeout)
def __send_async_result(self, query_id: int, *args):
"""
Sends the result of a query command back to the client.
:param query_id: the query id from the client associated with this query.
:param args: the result to send back to the client.
"""
# Send an async result back to the client with the client's request id
self._command_queue_tx.put(("ARes", query_id, *args))
def __parse_render_command(self, timeout: Optional[float]) -> bool:
"""
Parses and executes the next render command. Blocks for up to ``timeout`` seconds to wait for the command
queue to fill up.
:param timeout: the maximum amount of time to wait for a new message in seconds before giving up. Pass ``None``
to wait indefinitely.
:return: ``False`` if the render process should exit.
"""
try:
command, *command_args = self._command_queue_rx.get(block=(timeout is None or timeout != 0),
timeout=timeout)
# log(f"Render Process: Received command '{command}': {command_args}", severity=logging.INFO)
except Empty:
return True
except (KeyboardInterrupt, ValueError):
log(f"Render process shutting down because client died unexpectedly.", severity=logging.INFO)
return False
# DBG
# _command = command
# if _command is None:
# _command = "NnBlk" if timeout != 0 else "Nn000"
# if _command in self._dbg_command_stats:
# self._dbg_command_stats[_command] += 1
# else:
# self._dbg_command_stats[_command] = 1
if self._renderer is None:
return False
if command is None:
# Command is None if we time out before receiving a new command, this isn't an error in this case.
pass
elif command == "Stop":
return False
elif command == "HrtB":
# Heartbeat
self._last_heartbeat_time = time.monotonic()
elif command == "SWdg":
# Set Watchdog time
self.watchdog_time = command_args[0]
elif command == "UFBO":
# New/Update Frame Buffer
self._renderer.update_frame_buffer(*command_args)
if command_args[0] == 0:
self.output_size = command_args[2]
elif command == "DFBO":
# Delete Frame Buffer
self._renderer.delete_frame_buffer(command_args[0])
elif command == "Rndr":
# A render command needs to count as the first heartbeat so that the watchdog doesn't kill us immediately
self._last_heartbeat_time = time.monotonic()
# Start rendering at a given framerate
self.target_framerate = command_args[0]
self.stream_mode = SSVStreamingMode(command_args[1])
self.encode_quality = command_args[2]
self.__init_video_encoder()
self.running = self.target_framerate != 0
elif command == "UpdU":
# Update Uniform
self._renderer.update_uniform(*command_args)
elif command == "UpdV":
# Update Vertex buffer
self._renderer.update_vertex_buffer(*command_args)
elif command == "DelV":
# Delete Vertex buffer
self._renderer.delete_vertex_buffer(*command_args)
elif command == "UpdT":
# Update Texture
self._renderer.update_texture(*command_args)
elif command == "UpdS":
# Update texture Sampler
self._renderer.update_texture_sampler(*command_args)
elif command == "DelT":
# Delete Texture
self._renderer.delete_texture(*command_args)
elif command == "RegS":
# Register Shader
self._renderer.register_shader(*command_args)
elif command == "RdCp":
# Renderdoc Capture frame
self._renderer.renderdoc_capture_frame(*command_args)
elif command == "StTm":
# Set start Time
self._renderer.set_start_time(*command_args)
elif command == "LogC":
# Log Context Info
self._renderer.log_context_info(command_args[0])
elif command == "LogT":
# Log frame Times
self.log_frame_timing = command_args[0]
elif command == "GtCt":
# Get Context info
ctx = self._renderer.get_context_info()
self.__send_async_result(command_args[0], ctx)
elif command == "GtFt":
# Get average Frame-times
self.__send_async_result(command_args[0],
self.avg_delta_time, self.max_delta_time,
self.avg_delta_time_encode, self.max_delta_time_encode)
self.max_delta_time = 0
self.max_delta_time_encode = 0
elif command == "GtEx":
# Get supported Extensions
ext = self._renderer.get_supported_extensions()
self.__send_async_result(command_args[0], ext)
elif command == "SvIm":
# Save Image
self.__send_async_result(command_args[0], self.__save_image(*command_args[1:]))
elif command == "DbRT":
# Debug Render Test
pass
else:
log(f"Render process received unknown command from client '{command}' with args: {command_args}",
severity=logging.ERROR)
return False
return True
def __shutdown(self, reason: str):
"""
Informs the client that this render process is shutting down.
:param reason: a string describing why this process is shutting down.
"""
log(f"Render process shutting down... ({reason})", severity=logging.WARN)
if self._video_container is not None:
self._video_container.close()
self._command_queue_tx.put(("Stop",))
def __render_frame(self):
"""
Asks the renderer to render the next frame and sends the new frame back to the client.
"""
start_time = time.perf_counter()
render_time = start_time
if not self._renderer.render():
self.running = False
if self.stream_mode == SSVStreamingMode.PNG:
if len(self._frame_buffer_bytes) == self.output_size[0]*self.output_size[1] * 4:
self._renderer.read_frame_into(self._frame_buffer_bytes)
else:
self._frame_buffer_bytes = bytearray(self._renderer.read_frame())
render_time = time.perf_counter()
stream_data = self.__to_png(self._frame_buffer_bytes, self.output_size, self.encode_quality)
elif self.stream_mode == SSVStreamingMode.JPG:
if len(self._frame_buffer_bytes) == self.output_size[0] * self.output_size[1] * 3:
self._renderer.read_frame_into(self._frame_buffer_bytes, 3)
else:
self._frame_buffer_bytes = bytearray(self._renderer.read_frame(3))
render_time = time.perf_counter()
stream_data = self.__to_jpg(self._frame_buffer_bytes, self.output_size, self.encode_quality)
elif self.stream_mode in self._supported_video_formats:
if len(self._frame_buffer_bytes) == self.output_size[0] * self.output_size[1] * 3:
self._renderer.read_frame_into(self._frame_buffer_bytes, 3)
else:
self._frame_buffer_bytes = bytearray(self._renderer.read_frame(3))
render_time = time.perf_counter()
stream_data = self.__encode_video_frame(self._frame_buffer_bytes)
else:
stream_data = None
# if self.log_frame_timing:
encode_time = time.perf_counter()
self.max_delta_time = max(self.max_delta_time, render_time - start_time)
self.max_delta_time_encode = max(self.max_delta_time_encode, encode_time - render_time)
self.avg_delta_time = self.avg_delta_time * 0.9 + (render_time - start_time) * 0.1
self.avg_delta_time_encode = self.avg_delta_time_encode * 0.9 + (encode_time - render_time) * 0.1
self._command_queue_tx.put(("NFrm", stream_data))
def __save_image(self, image_type: SSVStreamingMode, quality: float, size: Optional[Tuple[int, int]],
render_buffer: int, suppress_ui: bool) -> bytes:
"""
Saves the current frame as an image.
:param image_type: the image compression algorithm to use.
:param quality: the encoding quality to use for the given encoding format. Takes a float between 0-100
(some stream modes support values larger than 100, others clamp it internally), where 100
results in the highest quality. This value is scaled to give a bit rate target or
quality factor for the chosen encoder.
:param size: optionally, the width and height of the saved image. If set to ``None`` uses the current
resolution of the render buffer.
:param render_buffer: the uid of the render buffer to save.
:param suppress_ui: whether any active SSVGUIs should be suppressed.
:return: the bytes representing the compressed image.
"""
if self._renderer is None:
return b''
render_size = size if size else self.output_size
if render_size != self.output_size:
self._renderer.update_frame_buffer(render_buffer, None, render_size, None, None, None)
if suppress_ui:
self._renderer.update_uniform(None, None, "_suppress_ui", True)
if render_size != self.output_size or suppress_ui:
if not self._renderer.render():
return b''
if image_type == SSVStreamingMode.PNG:
if len(self._frame_buffer_bytes) == render_size[0] * render_size[1] * 4:
self._renderer.read_frame_into(self._frame_buffer_bytes, 4, render_buffer)
frame = self._frame_buffer_bytes
else:
frame = bytearray(self._renderer.read_frame(4, render_buffer))
stream_data = self.__to_png(frame, render_size, quality, True)
elif image_type == SSVStreamingMode.JPG:
if len(self._frame_buffer_bytes) == render_size[0] * render_size[1] * 3:
self._renderer.read_frame_into(self._frame_buffer_bytes, 3, render_buffer)
frame = self._frame_buffer_bytes
else:
frame = bytearray(self._renderer.read_frame(3, render_buffer))
stream_data = self.__to_jpg(frame, render_size, quality, True)
else:
log(f"Can't save image in format '{image_type}'!", severity=logging.ERROR)
stream_data = b''
if render_size != self.output_size:
self._renderer.update_frame_buffer(render_buffer, None, self.output_size, None, None, None)
if suppress_ui:
self._renderer.update_uniform(None, None, "_suppress_ui", False)
return stream_data
def __to_png(self, frame: bytearray, output_size: Tuple[int, int], encode_quality: float,
flip_y: bool = False) -> bytes:
"""
Converts a framebuffer into a base64 encoded png data url.
:param frame: the frame as an RGBA8888 buffer of bytes.
:param output_size: the resolution of the frame.
:param encode_quality: the encoding quality (0-100).
:param flip_y: whether the frame should be flipped vertically.
:return: a data url string containing the frame.
"""
image = Image.frombytes('RGBA', output_size, frame)
if flip_y:
image = image.transpose(Image.FLIP_TOP_BOTTOM)
image_bytes = BytesIO()
quality = 5
if encode_quality is not None:
quality = min(max(round(
encode_quality / 100 * self._streaming_format_quality_scaling[SSVStreamingMode.PNG]), 0), 7)
image.save(image_bytes, format='png', optimize=False, compress_level=quality)
return b"data:image/png;base64," + base64.b64encode(image_bytes.getvalue())
def __to_jpg(self, frame: bytearray, output_size: Tuple[int, int], encode_quality: float,
flip_y: bool = False) -> bytes:
"""
Converts a framebuffer into a base64 encoded jpeg data url.
:param frame: the frame as an RGB888 buffer of bytes.
:param output_size: the resolution of the frame.
:param encode_quality: the encoding quality (0-100).
:param flip_y: whether the frame should be flipped vertically.
:return: a data url string containing the frame.
"""
image = Image.frombytes('RGB', output_size, frame)
if flip_y:
image = image.transpose(Image.FLIP_TOP_BOTTOM)
image_bytes = BytesIO()
quality = 75
if encode_quality is not None:
quality = min(max(round(
encode_quality / 100 * self._streaming_format_quality_scaling[SSVStreamingMode.JPG]), 0), 100)
image.save(image_bytes, format='jpeg', quality=quality)
return b"data:image/jpg;base64," + base64.b64encode(image_bytes.getvalue())
def __encode_video_frame(self, frame: bytearray) -> bytes:
"""
Encodes a frame using the initialized video encoder and returns the produced video packet.
Note that this method provides raw, un-muxed video stream data. If the codec used buffers frames internally
then this method may return an empty bytes, and the bytes returned may not necessarily be for the current frame.
:param frame: the frame as an RGB888 buffer of bytes.
:return: the encoded frame as bytes.
"""
if self._video_stream is None:
raise Exception("Video encoder has not been initialised yet!")
# img = Image.frombytes("RGB", self.output_size, frame)
# av_frame = av.VideoFrame.from_image(img)
frame_np: npt.NDArray[np.uint8] = np.array(frame, copy=False, dtype=np.uint8)
frame_np = frame_np.reshape((self.output_size[1], self.output_size[0], 3))
av_frame = av.VideoFrame.from_ndarray(frame_np, format="rgb24")
packets = self._video_stream.encode(av_frame)
if len(packets) == 1:
return bytes(packets[0])
elif len(packets) > 1:
return b"".join([bytes(p) for p in packets])
else:
# raise ValueError(f"Video encoder produced didn't produce any packets for the given frame. Frame might "
# f"have been buffered.")
return b""