# Copyright (c) 2023-2024 Thomas Mathieson.
# Distributed under the terms of the MIT license.
import logging
from multiprocessing import Process, Queue, set_start_method
from queue import Empty
from threading import Thread, Lock
from typing import Callable, Optional, Any, Union, Set, Tuple, Dict, List
import sys
if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias
import numpy as np
import numpy.typing as npt
from . import ssv_logging
from .ssv_future import Future
from .ssv_logging import log
from .ssv_render import SSVStreamingMode
from .ssv_render_process_server import SSVRenderProcessServer
from .environment import ENVIRONMENT, Env
OnRenderObserverDelegate: TypeAlias = Callable[[bytes], None]
OnLogObserverDelegate: TypeAlias = Callable[[str], None]
[docs]
class SSVRenderProcessClient:
"""
This class creates, manages, and provides a communication interface for the render process (an
``SSVRenderProcessServer``).
"""
def __init__(self, backend: str, gl_version: Optional[int] = None, timeout: Optional[float] = 1,
use_renderdoc_api: bool = False):
"""
Initialises a new Render Process Client and starts the render process.
:param backend: the rendering backend to use.
:param gl_version: optionally, the minimum version of OpenGL to support.
:param timeout: the render process watchdog timeout, set to None to disable.
:param use_renderdoc_api: whether the renderdoc_api should be initialised.
"""
self._command_queue_tx: Queue[Tuple[Any, ...]] = Queue()
self._command_queue_rx: Queue[Tuple[Any, ...]] = Queue()
self._query_futures: Dict[int, Future] = dict()
self._query_future_id_counter = 0
self._query_futures_lock = Lock()
self._rx_thread = Thread(target=self.__rx_thread_process, daemon=True,
name=f"SSV Render Process Client RX Thread - {id(self):#08x}")
self._rx_thread.start()
self._on_render_observers: List[OnRenderObserverDelegate] = []
self._on_log_observers: List[OnLogObserverDelegate] = []
# Set the multiprocessing start method
if ENVIRONMENT != Env.COLAB:
try:
set_start_method("spawn")
except RuntimeError:
pass
# Construct the render process server in its own process, passing it the backend string and the command queues
# (note that the rx and tx queues are flipped here, the rx queue of the server is the tx queue of the client)
self._render_process = Process(target=SSVRenderProcessServer, daemon=True,
name=f"SSV Render Process - {id(self):#08x}",
args=(backend, gl_version, self._command_queue_rx, self._command_queue_tx,
ssv_logging.get_severity(), timeout, use_renderdoc_api))
self._render_process.start()
self._is_alive = True
def __del__(self):
self._render_process.kill()
self._render_process.close()
def __rx_thread_process(self):
while True:
try:
command, *command_args = self._command_queue_rx.get(block=True)
except Empty:
command = None
command_args = None
if command == "NFrm":
# New frame data is available
for observer in self._on_render_observers:
observer(command_args[0])
elif command == "LogM":
# Log message
for observer in self._on_log_observers:
observer(command_args[1])
log(command_args[1], raw=True, severity=command_args[0])
elif command == "Stop":
# Render server stopping
log("Render server shut down.", severity=logging.INFO)
self._is_alive = False
elif command == "ARes":
# Async result
with self._query_futures_lock:
if command_args[0] in self._query_futures:
res = command_args[1:] if len(command_args) > 2 else command_args[1]
self._query_futures[command_args[0]].set_result(res)
del self._query_futures[command_args[0]]
else:
log(f"Received unknown command from render process '{command}' with args: {command_args}!",
severity=logging.ERROR)
def __create_async_query(self, command: str, *args) -> Future[Any]:
"""
Runs a command which returns an async result and waits for its result to be returned.
:param command: the command to run.
:param args: any additional args to pass to the command.
:return: the result of the async query command.
"""
# While dictionaries are atomic in python, it's still a good idea to use a lock
with self._query_futures_lock:
result: Future[Any] = Future()
query_id = self._query_future_id_counter
self._query_future_id_counter += 1
self._query_futures[query_id] = result
self._command_queue_tx.put((command, query_id, *args))
return result
@property
def is_alive(self):
return self._is_alive and self._render_process.is_alive()
[docs]
def subscribe_on_render(self, observer: OnRenderObserverDelegate):
"""
Subscribes an event handler to the on_render event, triggered after each frame is rendered.
:param observer: a function to handle the event (must have the signature: `callback(data: bytes) -> None`).
"""
self._on_render_observers.append(observer)
[docs]
def unsubscribe_on_render(self, observer: OnRenderObserverDelegate):
"""
Unsubscribes an event handler from the on_render event.
:param observer: a function currently registered to handle the event.
"""
self._on_render_observers.remove(observer)
[docs]
def subscribe_on_log(self, observer: OnLogObserverDelegate):
"""
Subscribes an event handler to the on_log event, triggered when the render process logs a message.
:param observer: a function to handle the event (must have the signature: `callback(data: bytes) -> None`).
"""
self._on_log_observers.append(observer)
[docs]
def unsubscribe_on_log(self, observer: OnLogObserverDelegate):
"""
Unsubscribes an event handler from the on_log event.
:param observer: a function currently registered to handle the event.
"""
self._on_log_observers.remove(observer)
[docs]
def update_frame_buffer(self, frame_buffer_uid: int, order: Optional[int], size: Optional[Tuple[int, int]],
uniform_name: Optional[str], components: Optional[int] = 4,
dtype: Optional[str] = "f1"):
"""
Updates the resolution/format of the given frame buffer. Note that framebuffer 0 is always used for output.
If the given framebuffer id does not exist, it is created.
Setting a parameter to ``None`` preserves the current value for that frame buffer.
:param frame_buffer_uid: the uid of the framebuffer to update/create. Buffer 0 is the output framebuffer.
:param order: the sorting order to render the frame buffers in, smaller values are rendered first.
:param size: the new resolution of the framebuffer.
:param uniform_name: the name of the uniform to bind this frame buffer to.
:param components: how many vector components should each pixel have (RGB=3, RGBA=4).
:param dtype: the data type for each pixel component (see:
https://moderngl.readthedocs.io/en/5.8.2/topics/texture_formats.html).
"""
self._command_queue_tx.put(("UFBO", frame_buffer_uid, order, size, uniform_name, components, dtype))
[docs]
def delete_frame_buffer(self, buffer_uid: int):
"""
Destroys the given framebuffer. *Note* that framebuffer 0 can't be destroyed as it is the output framebuffer.
:param buffer_uid: the id of the framebuffer to destroy.
"""
self._command_queue_tx.put(("DFBO", buffer_uid))
[docs]
def render(self, target_framerate: float, stream_mode: str, encode_quality: Optional[float] = None):
"""
Starts rendering frames at the given framerate.
:param target_framerate: the framerate to render at. Set to -1 to render a single frame.
:param stream_mode: the streaming format to use to send the frames to the widget.
:param encode_quality: the encoding quality to use for the given encoding format. Takes a float between 0-100
(some stream modes support values larger than 100, others clamp it internally), where 100
results in the highest quality. This value is scaled to give a bit rate target or
quality factor for the chosen encoder. Pass in ``None`` to use the encoder's default
quality settings.
"""
self._command_queue_tx.put(("Rndr", target_framerate, stream_mode, encode_quality))
[docs]
def stop(self):
"""
Kills the render process.
"""
self._command_queue_tx.put(("Stop", ))
[docs]
def send_heartbeat(self):
"""
Sends a heartbeat to the render process to keep it alive.
"""
self._command_queue_tx.put(("HrtB",))
[docs]
def set_timeout(self, time: Optional[float] = 1):
"""
Sets the maximum time the render process will wait for a heartbeat before killing itself.
Set to None to disable the watchdog.
:param time: timeout in seconds.
"""
self._command_queue_tx.put(("SWdg", time))
[docs]
def update_vertex_buffer(self, frame_buffer_uid: int, draw_call_uid: int,
vertex_array: Optional[npt.NDArray], index_array: Optional[npt.NDArray],
vertex_attributes: Optional[Tuple[str, ...]]):
"""
Updates the data inside a vertex buffer.
:param frame_buffer_uid: the uid of the framebuffer of the vertex buffer to update.
:param draw_call_uid: the uid of the draw call of the vertex buffer to update.
:param vertex_array: a numpy array containing the new vertex data.
:param index_array: optionally, a numpy array containing the indices of vertices ordered to make triangles.
:param vertex_attributes: a tuple of the names of the vertex attributes to map to in the shader, in the order
that they appear in the vertex array.
"""
self._command_queue_tx.put(("UpdV", frame_buffer_uid, draw_call_uid, vertex_array, index_array,
vertex_attributes))
[docs]
def delete_vertex_buffer(self, frame_buffer_uid: int, draw_call_uid: int):
"""
Deletes an existing vertex buffer.
:param frame_buffer_uid: the uid of the framebuffer of the vertex buffer to delete.
:param draw_call_uid: the uid of the draw call of the vertex buffer to delete.
"""
self._command_queue_tx.put(("DelV", frame_buffer_uid, draw_call_uid))
[docs]
def update_texture(self, texture_uid: int, data: npt.NDArray, uniform_name: Optional[str],
override_dtype: Optional[str],
rect: Optional[Union[Tuple[int, int, int, int], Tuple[int, int, int, int, int, int]]],
treat_as_normalized_integer: bool):
"""
Creates or updates a texture from the NumPy array provided.
:param texture_uid: the uid of the texture to create or update.
:param data: a NumPy array containing the image data to copy to the texture.
:param uniform_name: the name of the shader uniform to associate this texture with.
:param override_dtype: Optionally, a moderngl override
:param rect: optionally, a rectangle (left, top, right, bottom) specifying the area of the target texture to
update.
:param treat_as_normalized_integer: when enabled, integer types (singed/unsigned) are treated as normalized
integers by OpenGL, such that when the texture is sampled values in the
texture are mapped to floats in the range [0, 1] or [-1, 1]. See:
https://www.khronos.org/opengl/wiki/Normalized_Integer for more details.
"""
# TODO: Optimise data transport by using shared buffers
self._command_queue_tx.put(("UpdT", texture_uid, data, uniform_name, override_dtype, rect,
treat_as_normalized_integer))
[docs]
def update_texture_sampler(self, texture_uid: int, repeat_x: Optional[bool] = None, repeat_y: Optional[bool] = None,
linear_filtering: Optional[bool] = None, linear_mipmap_filtering: Optional[bool] = None,
anisotropy: Optional[int] = None,
build_mip_maps: bool = False):
"""
Updates a texture's sampling settings. Parameters set to ``None`` are not updated.
:param texture_uid: the uid of the texture to update.
:param repeat_x: whether the texture should repeat or be clamped in the x-axis.
:param repeat_y: whether the texture should repeat or be clamped in the y-axis.
:param linear_filtering: whether the texture should use nearest neighbour (``False``) or linear (``True``)
interpolation.
:param linear_mipmap_filtering: whether different mipmap levels should blend linearly (``True``) or not
(``False``).
:param anisotropy: the number of anisotropy samples to use. (minimum of 1 = disabled, maximum of 16)
:param build_mip_maps: when set to ``True``, immediately builds mipmaps for the texture.
"""
self._command_queue_tx.put(("UpdS", texture_uid, repeat_x, repeat_y, linear_filtering, linear_mipmap_filtering,
anisotropy, build_mip_maps))
[docs]
def delete_texture(self, texture_uid: int):
"""
Destroys the given texture object.
:param texture_uid: the uid of the texture to destroy.
"""
self._command_queue_tx.put(("DelT", texture_uid))
[docs]
def register_shader(self, frame_buffer_uid: int, draw_call_uid: int,
vertex_shader: str, fragment_shader: Optional[str] = None,
tess_control_shader: Optional[str] = None, tess_evaluation_shader: Optional[str] = None,
geometry_shader: Optional[str] = None, compute_shader: Optional[str] = None,
primitive_type: Optional[str] = None):
"""
Compiles and registers a shader to a given framebuffer.
:param frame_buffer_uid: the uid of the framebuffer to register the shader to.
:param draw_call_uid: the uid of the draw call to register the shader to.
:param vertex_shader: the preprocessed vertex shader GLSL source.
:param fragment_shader: the preprocessed fragment shader GLSL source.
:param tess_control_shader: the preprocessed tessellation control shader GLSL source.
:param tess_evaluation_shader: the preprocessed tessellation evaluation shader GLSL source.
:param geometry_shader: the preprocessed geometry shader GLSL source.
:param compute_shader: *[Not implemented]* the preprocessed compute shader GLSL source.
:param primitive_type: what type of input primitive to treat the vertex data as. One of ("TRIANGLES", "LINES",
"POINTS), defaults to "TRIANGLES" if ``None``.
"""
self._command_queue_tx.put(("RegS", frame_buffer_uid, draw_call_uid, vertex_shader, fragment_shader,
tess_control_shader, tess_evaluation_shader, geometry_shader, compute_shader,
primitive_type))
[docs]
def renderdoc_capture_frame(self, filename: Optional[str]):
"""
Triggers a frame capture with Renderdoc if it's initialised.
:param filename: optionally, the filename and path to save the capture with.
"""
self._command_queue_tx.put(("RdCp", filename))
[docs]
def set_start_time(self, start_time: float) -> None:
"""
Sets the renderer's start time; this is used by the renderer to compute the canvas time which is injected into
shaders.
:param start_time: the start time of the renderer in seconds since the start of the epoch.
"""
self._command_queue_tx.put(("StTm", start_time))
[docs]
def get_context_info(self, timeout: Optional[float] = None) -> Optional[Dict[str, str]]:
"""
Returns the OpenGL context information.
:param timeout: the maximum amount of time in seconds to wait for the result. Set to ``None`` to wait
indefinitely.
"""
return self.__create_async_query("GtCt").wait_result(timeout)
[docs]
def get_frame_times(self, timeout: Optional[float] = None) -> Optional[Tuple[float, float, float, float]]:
"""
Gets the frame time statistics from the renderer. This function is blocking and shouldn't be called too often.
Returns the following statistics:
- avg_frame_time: Average time taken to render a frame (calculated each
frame using: ``last_avg_frame_time * 0.9 + frame_time * 0.1``)
- max_frame_time: The maximum frame time since this
function was last called. (Using ``dbg_log_frame_times`` interferes with this value.)
- avg_encode_time: Average time taken to encode a frame for streaming (calculated each
frame using: ``last_avg_encode_time * 0.9 + encode_time * 0.1``)
- max_encode_time: The maximum time taken to encode a frame for streaming since this
function was last called. (Using ``dbg_log_frame_times`` interferes with this value.)
:param timeout: the maximum amount of time in seconds to wait for the result. Set to ``None`` to wait
indefinitely.
:return: (avg_frame_time, max_frame_time, avg_encode_time, max_encode_time)
"""
return self.__create_async_query("GtFt").wait_result(timeout)
[docs]
def get_supported_extensions(self, timeout: Optional[float] = None) -> Optional[Set[str]]:
"""
Gets the set of supported OpenGL shader compiler extensions.
:param timeout: the maximum amount of time in seconds to wait for the result. Set to ``None`` to wait
indefinitely.
"""
return self.__create_async_query("GtEx").wait_result(timeout)
[docs]
def save_image(self, image_type: SSVStreamingMode, quality: float, size: Optional[Tuple[int, int]],
render_buffer: int, suppress_ui: bool) -> Future[bytes]:
"""
Saves the current frame as an image.
:param image_type: the image compression algorithm to use.
:param quality: the encoding quality to use for the given encoding format. Takes a float between 0-100
(some stream modes support values larger than 100, others clamp it internally), where 100
results in the highest quality. This value is scaled to give a bit rate target or
quality factor for the chosen encoder.
:param size: optionally, the width and height of the saved image. If set to ``None`` uses the current
resolution of the render buffer.
:param render_buffer: the uid of the render buffer to save.
:param suppress_ui: whether any active SSVGUIs should be suppressed.
:return: the bytes representing the compressed image.
"""
return self.__create_async_query("SvIm", image_type, quality, size, render_buffer, suppress_ui)
[docs]
def dbg_log_context_info(self, full=False):
"""
Logs the OpenGL information to the console for debugging.
:param full: whether to log *all* of the OpenGL context information (including extensions).
"""
self._command_queue_tx.put(("LogC", full))
[docs]
def dbg_log_frame_times(self, enabled=True):
"""
Enables or disables frame time logging.
:param enabled: whether to log frame times.
"""
self._command_queue_tx.put(("LogT", enabled))
[docs]
def dbg_render_test(self):
"""
**DEPRECATED**
*[For debugging only]* Sets up the pipeline to render with a demo shader.
"""
self._command_queue_tx.put(("DbRT",))
[docs]
def dbg_render_command(self, command: str, *args):
"""
*[For debugging only]* Sends a custom command to the render process.
:param command: the custom command to send
:param args: the arguments to send with the command
"""
self._command_queue_tx.put((command, *args))