Source code for hand_tracking_sdk.visualization

"""Optional real-time visualization helpers."""

from __future__ import annotations

import importlib
from collections import deque
from dataclasses import dataclass, field
from types import ModuleType
from typing import cast

from hand_tracking_sdk._compat import StrEnum
from hand_tracking_sdk.convert import (
    convert_hand_frame_unity_left_to_right,
    convert_landmarks_unity_left_to_right,
    convert_wrist_pose_unity_left_to_right,
    unity_left_to_right_position,
    unity_right_to_flu_position,
)
from hand_tracking_sdk.frame import HandFrame, HeadFrame
from hand_tracking_sdk.models import (
    HandSide,
    HeadPosePacket,
    LandmarksPacket,
    ParsedPacket,
    WristPacket,
    WristPose,
)

from .exceptions import VisualizationDependencyError


[docs] class VisualizationFrame(StrEnum): """Output frame convention used for visualization points.""" SDK = "sdk" FLU = "flu"
[docs] @dataclass(frozen=True, slots=True) class RerunVisualizerConfig: """Configuration for :class:`RerunVisualizer`. :param application_id: Application identifier displayed in Rerun. :param spawn: If ``True``, spawn a local Rerun viewer on initialization. :param landmarks_are_wrist_relative: If ``True``, landmark points are interpreted as wrist-local coordinates and transformed into world frame before logging. :param wrist_radius: Point radius for wrist markers in meters. :param landmark_radius: Point radius for landmark markers in meters. :param wrist_color: RGB color for wrist markers. :param left_landmark_color: RGB color for left-hand landmark markers. :param right_landmark_color: RGB color for right-hand landmark markers. :param convert_to_right_handed: If ``True``, incoming Unity left-handed data are converted to right-handed coordinates before visualization. :param background_color: Optional RGB background color for the Rerun 3D view. :param visualization_frame: Point frame convention for visualization output. ``flu`` means ``x=forward, y=left, z=up``. :param show_jitter_panel: If ``True``, log jitter/drop scalar metrics under ``metrics/jitter/...``. :param jitter_window_size: Rolling window size for jitter percentile metrics. :param show_coordinate_frames: If ``True``, render local XYZ axes for wrist/head poses. :param coordinate_frame_axis_length: Axis length in meters for rendered coordinate frames. """ application_id: str = "hand-tracking-sdk" spawn: bool = True landmarks_are_wrist_relative: bool = True wrist_radius: float = 0.025 landmark_radius: float = 0.015 wrist_color: tuple[int, int, int] = (255, 220, 0) left_landmark_color: tuple[int, int, int] = (64, 128, 255) right_landmark_color: tuple[int, int, int] = (255, 64, 64) convert_to_right_handed: bool = True background_color: tuple[int, int, int] | None = (18, 22, 30) visualization_frame: VisualizationFrame = VisualizationFrame.FLU show_jitter_panel: bool = False jitter_window_size: int = 200 show_coordinate_frames: bool = False coordinate_frame_axis_length: float = 0.08
@dataclass(slots=True) class _JitterSideState: prev_recv_ts_ns: int | None = None prev_source_ts_ns: int | None = None prev_source_frame_seq: int | None = None jitter_ns_window: deque[int] = field(default_factory=deque)
[docs] class RerunVisualizer: """Visualizer that logs hand telemetry to `rerun`. This component is optional and requires installing the visualization extra. """ def __init__(self, config: RerunVisualizerConfig | None = None) -> None: """Create a Rerun visualizer. :param config: Optional visualizer configuration. :raises VisualizationDependencyError: If `rerun-sdk` is not installed. """ self._config = config or RerunVisualizerConfig() self._rr = self._import_rerun() self._latest_wrist_by_side: dict[HandSide, WristPose] = {} self._jitter_state_by_side: dict[HandSide, _JitterSideState] = { HandSide.LEFT: _JitterSideState(), HandSide.RIGHT: _JitterSideState(), } self._rr.init(self._config.application_id, spawn=self._config.spawn) self._apply_view_background()
[docs] def log_packet(self, packet: ParsedPacket) -> None: """Log one parsed packet to Rerun. :param packet: Parsed packet event from HTS stream. """ side_path = f"hands/{packet.side.value.lower()}" if isinstance(packet, WristPacket): pose = packet.data if self._config.convert_to_right_handed: pose = convert_wrist_pose_unity_left_to_right(pose) self._latest_wrist_by_side[packet.side] = pose self._log_points( f"{side_path}/wrist", [(pose.x, pose.y, pose.z)], radius=self._config.wrist_radius, color=self._config.wrist_color, ) if self._config.show_coordinate_frames: self._log_pose_axes( f"{side_path}/frame_axes", position=(pose.x, pose.y, pose.z), quaternion=(pose.qx, pose.qy, pose.qz, pose.qw), ) return if isinstance(packet, LandmarksPacket): points = packet.data.points if self._config.convert_to_right_handed: points = convert_landmarks_unity_left_to_right(packet.data).points if self._config.landmarks_are_wrist_relative: wrist = self._latest_wrist_by_side.get(packet.side) if wrist is None: return points = self._transform_landmarks_by_wrist(points=points, wrist=wrist) self._log_points( f"{side_path}/landmarks", points, radius=self._config.landmark_radius, color=self._landmark_color(packet.side), ) return if isinstance(packet, HeadPosePacket): hx, hy, hz = packet.data.x, packet.data.y, packet.data.z hqx, hqy, hqz, hqw = packet.data.qx, packet.data.qy, packet.data.qz, packet.data.qw if self._config.convert_to_right_handed: hx, hy, hz = unity_left_to_right_position(x=hx, y=hy, z=hz) as_wrist = WristPose(x=packet.data.x, y=packet.data.y, z=packet.data.z, qx=packet.data.qx, qy=packet.data.qy, qz=packet.data.qz, qw=packet.data.qw) converted = convert_wrist_pose_unity_left_to_right(as_wrist) hqx, hqy, hqz, hqw = converted.qx, converted.qy, converted.qz, converted.qw self._log_points( "head/pose", [(hx, hy, hz)], radius=self._config.wrist_radius, color=self._config.wrist_color, ) if self._config.show_coordinate_frames: self._log_pose_axes( "head/frame_axes", position=(hx, hy, hz), quaternion=(hqx, hqy, hqz, hqw), )
[docs] def log_frame(self, frame: HandFrame) -> None: """Log one assembled frame to Rerun. :param frame: Assembled hand frame. """ visual_frame = ( convert_hand_frame_unity_left_to_right(frame) if self._config.convert_to_right_handed else frame ) base = f"frames/{visual_frame.frame_id}" self._log_points( f"{base}/wrist", [(visual_frame.wrist.x, visual_frame.wrist.y, visual_frame.wrist.z)], radius=self._config.wrist_radius, color=self._config.wrist_color, ) if self._config.show_coordinate_frames: self._log_pose_axes( f"{base}/frame_axes", position=(visual_frame.wrist.x, visual_frame.wrist.y, visual_frame.wrist.z), quaternion=( visual_frame.wrist.qx, visual_frame.wrist.qy, visual_frame.wrist.qz, visual_frame.wrist.qw, ), ) points = visual_frame.landmarks.points if self._config.landmarks_are_wrist_relative: points = self._transform_landmarks_by_wrist(points=points, wrist=visual_frame.wrist) self._log_points( f"{base}/landmarks", points, radius=self._config.landmark_radius, color=self._landmark_color(visual_frame.side), ) if self._config.show_jitter_panel: self._log_jitter_metrics(visual_frame)
[docs] def log_event(self, event: ParsedPacket | HandFrame | HeadFrame) -> None: """Log either packet or frame event. :param event: Stream event from :class:`hand_tracking_sdk.HTSClient`. """ if isinstance(event, HandFrame): self.log_frame(event) return if isinstance(event, HeadFrame): hx, hy, hz = event.head.x, event.head.y, event.head.z hqx, hqy, hqz, hqw = event.head.qx, event.head.qy, event.head.qz, event.head.qw if self._config.convert_to_right_handed: hx, hy, hz = unity_left_to_right_position(x=hx, y=hy, z=hz) as_wrist = WristPose(x=event.head.x, y=event.head.y, z=event.head.z, qx=event.head.qx, qy=event.head.qy, qz=event.head.qz, qw=event.head.qw) converted = convert_wrist_pose_unity_left_to_right(as_wrist) hqx, hqy, hqz, hqw = converted.qx, converted.qy, converted.qz, converted.qw self._log_points( f"frames/{event.frame_id}/head", [(hx, hy, hz)], radius=self._config.wrist_radius, color=self._config.wrist_color, ) if self._config.show_coordinate_frames: self._log_pose_axes( f"frames/{event.frame_id}/frame_axes", position=(hx, hy, hz), quaternion=(hqx, hqy, hqz, hqw), ) return self.log_packet(event)
def _log_pose_axes( self, path: str, *, position: tuple[float, float, float], quaternion: tuple[float, float, float, float], ) -> None: """Log XYZ axes for one pose with SDK-version compatibility. :param path: Entity path for axis geometry. :param position: Pose origin in SDK frame convention. :param quaternion: Pose orientation as ``(qx, qy, qz, qw)``. """ px, py, pz = position qx, qy, qz, qw = quaternion axis_len = self._config.coordinate_frame_axis_length local_axes = self._coordinate_axes_for_visualization(axis_len=axis_len) axis_colors = ([255, 0, 0], [0, 255, 0], [0, 128, 255]) origin = self._map_point_frame(x=px, y=py, z=pz) segments: list[list[list[float]]] = [] for ax, ay, az in local_axes: rx, ry, rz = _rotate_vector_by_quaternion( x=ax, y=ay, z=az, qx=qx, qy=qy, qz=qz, qw=qw, ) endpoint = self._map_point_frame(x=px + rx, y=py + ry, z=pz + rz) segments.append( [ [origin[0], origin[1], origin[2]], [endpoint[0], endpoint[1], endpoint[2]], ] ) if hasattr(self._rr, "LineStrips3D"): try: self._rr.log( path, self._rr.LineStrips3D( segments, colors=axis_colors, ), ) return except Exception: pass fallback_points = [point for segment in segments for point in segment] fallback_colors = [ axis_colors[0], axis_colors[0], axis_colors[1], axis_colors[1], axis_colors[2], axis_colors[2], ] self._rr.log( path, self._rr.Points3D( fallback_points, radii=[self._config.landmark_radius] * len(fallback_points), colors=fallback_colors, ), ) def _coordinate_axes_for_visualization( self, *, axis_len: float ) -> tuple[tuple[float, float, float], tuple[float, float, float], tuple[float, float, float]]: """Return local axis vectors so displayed axes match configured view basis. In FLU mode, colors remain RGB while semantic directions become: red=+X(forward), green=+Y(left), blue=+Z(up). """ if self._config.visualization_frame == VisualizationFrame.FLU: # Source (Unity-right) vectors that map to FLU +X/+Y/+Z after _map_point_frame. return ( (0.0, 0.0, axis_len), # FLU +X (forward) (-axis_len, 0.0, 0.0), # FLU +Y (left) (0.0, -axis_len, 0.0), # FLU +Z (up) ) return ( (axis_len, 0.0, 0.0), (0.0, axis_len, 0.0), (0.0, 0.0, axis_len), ) def _log_points( self, path: str, points: list[tuple[float, float, float]] | tuple[tuple[float, float, float], ...], *, radius: float, color: tuple[int, int, int], ) -> None: """Log a homogeneous point set to Rerun. :param path: Entity path for the emitted points. :param points: Sequence of points in SDK frame convention prior to frame mapping. :param radius: Radius in meters for each point marker. :param color: RGB tuple used for all emitted points. """ mapped_points = [self._map_point_frame(x=x, y=y, z=z) for x, y, z in points] points_as_lists = [[x, y, z] for x, y, z in mapped_points] self._rr.log( path, self._rr.Points3D( points_as_lists, radii=[radius] * len(points_as_lists), colors=[list(color)] * len(points_as_lists), ), ) def _import_rerun(self) -> ModuleType: """Import the optional ``rerun`` dependency. :raises VisualizationDependencyError: If ``rerun-sdk`` is not installed. :returns: Imported ``rerun`` module. """ try: module = importlib.import_module("rerun") except ModuleNotFoundError as exc: raise VisualizationDependencyError( "rerun is not installed. Install with: pip install hand-tracking-sdk[visualization]" ) from exc return module def _apply_view_background(self) -> None: """Apply optional background color to the default 3D view.""" if self._config.background_color is None: return if not hasattr(self._rr, "send_blueprint"): return try: blueprint_module = importlib.import_module("rerun.blueprint") except ModuleNotFoundError: return eye_controls = None if hasattr(blueprint_module, "EyeControls3D"): eye_controls = blueprint_module.EyeControls3D( kind=blueprint_module.Eye3DKind.Orbital, position=[-0.06, 0.02, 1.5], look_target=[0.2, 0.02, 1.5], eye_up=[0.0, 0.0, 1.0], ) spatial_view = blueprint_module.Spatial3DView( origin="/", name="3D Scene", background=list(self._config.background_color), eye_controls=eye_controls, ) if not self._config.show_jitter_panel or not hasattr(blueprint_module, "TimeSeriesView"): self._rr.send_blueprint(blueprint_module.Blueprint(spatial_view)) return try: jitter_view = blueprint_module.TimeSeriesView( origin="/metrics/jitter", name="Jitter", ) if hasattr(blueprint_module, "Horizontal"): layout = blueprint_module.Horizontal(spatial_view, jitter_view) else: layout = [spatial_view, jitter_view] self._rr.send_blueprint(blueprint_module.Blueprint(layout)) except Exception: # Fall back to plain 3D blueprint if viewer API shape differs by rerun version. self._rr.send_blueprint(blueprint_module.Blueprint(spatial_view)) def _landmark_color(self, side: HandSide) -> tuple[int, int, int]: """Return the configured landmark color for a hand side. :param side: Hand side associated with the packet or frame. :returns: RGB color tuple for landmark points. """ if side == HandSide.LEFT: return self._config.left_landmark_color return self._config.right_landmark_color def _map_point_frame(self, *, x: float, y: float, z: float) -> tuple[float, float, float]: """Map a point from SDK coordinates to the visualization frame. :param x: Point X value in SDK frame. :param y: Point Y value in SDK frame. :param z: Point Z value in SDK frame. :returns: Point in the configured visualization frame. """ if self._config.visualization_frame == VisualizationFrame.SDK: return (x, y, z) return unity_right_to_flu_position(x=x, y=y, z=z) def _transform_landmarks_by_wrist( self, *, points: tuple[tuple[float, float, float], ...], wrist: WristPose, ) -> tuple[tuple[float, float, float], ...]: """Transform wrist-relative landmarks into world coordinates. :param points: Landmark points interpreted as coordinates in wrist-local space. :param wrist: Wrist world pose used to rotate and translate each point. :returns: Landmark points in world coordinates. """ transformed: list[tuple[float, float, float]] = [] for px, py, pz in points: rx, ry, rz = _rotate_vector_by_quaternion( x=px, y=py, z=pz, qx=wrist.qx, qy=wrist.qy, qz=wrist.qz, qw=wrist.qw, ) transformed.append((rx + wrist.x, ry + wrist.y, rz + wrist.z)) return tuple(transformed) def _log_jitter_metrics(self, frame: HandFrame) -> None: """Log per-side jitter and drop metrics as scalar timeseries.""" if frame.source_ts_ns is None: return side = frame.side state = self._jitter_state_by_side[side] base = f"metrics/jitter/{side.value.lower()}" if state.prev_recv_ts_ns is not None and state.prev_source_ts_ns is not None: source_dt_ns = frame.source_ts_ns - state.prev_source_ts_ns recv_dt_ns = frame.recv_ts_ns - state.prev_recv_ts_ns jitter_ns = recv_dt_ns - source_dt_ns self._append_jitter_window(state.jitter_ns_window, jitter_ns) self._log_scalar(f"{base}/source_dt_ms", source_dt_ns / 1e6) self._log_scalar(f"{base}/recv_dt_ms", recv_dt_ns / 1e6) self._log_scalar(f"{base}/jitter_ms", jitter_ns / 1e6) self._log_scalar( f"{base}/jitter_p95_ms", self._percentile_ms(state.jitter_ns_window, 0.95), ) if frame.source_frame_seq is not None and state.prev_source_frame_seq is not None: dropped = max(0, frame.source_frame_seq - state.prev_source_frame_seq - 1) self._log_scalar(f"{base}/drop_gap_frames", float(dropped)) state.prev_recv_ts_ns = frame.recv_ts_ns state.prev_source_ts_ns = frame.source_ts_ns state.prev_source_frame_seq = frame.source_frame_seq def _append_jitter_window(self, window: deque[int], value: int) -> None: """Append to a bounded jitter window.""" window.append(value) while len(window) > self._config.jitter_window_size: window.popleft() def _percentile_ms(self, values_ns: deque[int], fraction: float) -> float: """Return percentile value in milliseconds for a small rolling window.""" if not values_ns: return 0.0 sorted_values = sorted(values_ns) index = int(round((len(sorted_values) - 1) * fraction)) return sorted_values[index] / 1e6 def _log_scalar(self, path: str, value: float) -> None: """Log one scalar point, with compatibility across rerun SDK versions.""" if hasattr(self._rr, "Scalar"): self._rr.log(path, self._rr.Scalar(value)) return if hasattr(self._rr, "Scalars"): self._rr.log(path, self._rr.Scalars([value]))
def _rotate_vector_by_quaternion( *, x: float, y: float, z: float, qx: float, qy: float, qz: float, qw: float, ) -> tuple[float, float, float]: """Rotate a 3D vector by a quaternion. :param x: Input vector X component. :param y: Input vector Y component. :param z: Input vector Z component. :param qx: Quaternion X component. :param qy: Quaternion Y component. :param qz: Quaternion Z component. :param qw: Quaternion W component. :returns: Rotated ``(x, y, z)`` tuple. """ norm = (qx * qx + qy * qy + qz * qz + qw * qw) ** 0.5 if norm == 0.0: return x, y, z qx_n = qx / norm qy_n = qy / norm qz_n = qz / norm qw_n = qw / norm # v' = q * v * q^-1 ix = qw_n * x + qy_n * z - qz_n * y iy = qw_n * y + qz_n * x - qx_n * z iz = qw_n * z + qx_n * y - qy_n * x iw = -qx_n * x - qy_n * y - qz_n * z out_x = ix * qw_n + iw * -qx_n + iy * -qz_n - iz * -qy_n out_y = iy * qw_n + iw * -qy_n + iz * -qx_n - ix * -qz_n out_z = iz * qw_n + iw * -qz_n + ix * -qy_n - iy * -qx_n return cast(tuple[float, float, float], (out_x, out_y, out_z))