"""Frame assembly utilities for combining wrist and landmark packets."""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from time import monotonic_ns, time_ns
from typing import Any
from hand_tracking_sdk.models import (
FingerName,
HandLandmarks,
HandSide,
HeadPose,
HeadPosePacket,
JointName,
LandmarksPacket,
ParsedPacket,
WristPacket,
WristPose,
)
from hand_tracking_sdk.parser import parse_line
[docs]
@dataclass(frozen=True, slots=True)
class HandFrame:
"""Coherent per-hand frame assembled from wrist and landmark packets.
:param side:
Hand side for this frame.
:param frame_id:
Frame identifier for downstream middleware mapping (for example ROS2).
:param wrist:
Wrist pose payload.
:param landmarks:
Ordered set of 21 hand landmarks.
:param sequence_id:
Monotonic sequence number per hand side, incremented on each emitted frame.
:param recv_ts_ns:
Monotonic receive timestamp for the assembled frame.
:param recv_time_unix_ns:
Optional wall-clock timestamp in Unix nanoseconds.
:param source_ts_ns:
Optional source timestamp supplied by upstream sender.
:param source_frame_seq:
Optional upstream source frame sequence identifier.
:param wrist_recv_ts_ns:
Receive timestamp of the wrist payload included in this frame.
:param landmarks_recv_ts_ns:
Receive timestamp of the landmark payload included in this frame.
"""
side: HandSide
frame_id: str
wrist: WristPose
landmarks: HandLandmarks
sequence_id: int
recv_ts_ns: int
recv_time_unix_ns: int | None
source_ts_ns: int | None
wrist_recv_ts_ns: int
landmarks_recv_ts_ns: int
source_frame_seq: int | None = None
[docs]
def get_joint(self, joint: JointName | str) -> tuple[float, float, float]:
"""Return one landmark point by joint name.
:param joint:
Joint to query, either as :class:`JointName` or canonical joint string.
:returns:
Joint ``(x, y, z)`` tuple.
:raises ValueError:
If the joint name is unknown.
"""
return self.landmarks.get_joint(joint)
[docs]
def get_finger(self, finger: FingerName | str) -> dict[JointName, tuple[float, float, float]]:
"""Return all landmark points for one finger group.
:param finger:
Finger group to query.
:returns:
Dictionary mapping :class:`JointName` to ``(x, y, z)`` points for
the selected finger group.
:raises ValueError:
If the finger group is unknown.
"""
return self.landmarks.get_finger(finger)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize frame into a deterministic mapping-friendly dictionary.
:returns:
Dictionary representation suitable for adapter-layer mapping.
"""
return {
"side": self.side.value,
"frame_id": self.frame_id,
"wrist": self.wrist.to_dict(),
"landmarks": self.landmarks.to_dict(),
"sequence_id": self.sequence_id,
"recv_ts_ns": self.recv_ts_ns,
"recv_time_unix_ns": self.recv_time_unix_ns,
"source_ts_ns": self.source_ts_ns,
"source_frame_seq": self.source_frame_seq,
"wrist_recv_ts_ns": self.wrist_recv_ts_ns,
"landmarks_recv_ts_ns": self.landmarks_recv_ts_ns,
}
[docs]
@classmethod
def from_dict(cls, values: Mapping[str, Any]) -> HandFrame:
"""Build :class:`HandFrame` from serialized mapping data.
:param values:
Mapping containing side, frame metadata, and geometry payloads.
:returns:
Parsed frame object.
"""
return cls(
side=HandSide(str(values["side"])),
frame_id=str(values["frame_id"]),
wrist=WristPose.from_dict(values["wrist"]),
landmarks=HandLandmarks.from_dict(values["landmarks"]),
sequence_id=int(values["sequence_id"]),
recv_ts_ns=int(values["recv_ts_ns"]),
recv_time_unix_ns=(
None if values["recv_time_unix_ns"] is None else int(values["recv_time_unix_ns"])
),
source_ts_ns=(None if values["source_ts_ns"] is None else int(values["source_ts_ns"])),
source_frame_seq=(
None
if values.get("source_frame_seq") is None
else int(values["source_frame_seq"])
),
wrist_recv_ts_ns=int(values["wrist_recv_ts_ns"]),
landmarks_recv_ts_ns=int(values["landmarks_recv_ts_ns"]),
)
[docs]
@dataclass(frozen=True, slots=True)
class HeadFrame:
"""Frame-like head pose event with normalized metadata fields.
:param side:
Always ``HandSide.HEAD``.
:param frame_id:
Frame identifier for downstream middleware mapping.
:param head:
Head pose payload.
:param sequence_id:
Monotonic sequence number for head frames.
:param recv_ts_ns:
Monotonic receive timestamp for the emitted frame.
:param recv_time_unix_ns:
Optional wall-clock timestamp in Unix nanoseconds.
:param source_ts_ns:
Optional source timestamp supplied by upstream sender.
:param source_frame_seq:
Optional upstream source frame sequence identifier.
"""
side: HandSide
frame_id: str
head: HeadPose
sequence_id: int
recv_ts_ns: int
recv_time_unix_ns: int | None
source_ts_ns: int | None
source_frame_seq: int | None = None
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize frame into a deterministic mapping-friendly dictionary."""
return {
"side": self.side.value,
"frame_id": self.frame_id,
"head": self.head.to_dict(),
"sequence_id": self.sequence_id,
"recv_ts_ns": self.recv_ts_ns,
"recv_time_unix_ns": self.recv_time_unix_ns,
"source_ts_ns": self.source_ts_ns,
"source_frame_seq": self.source_frame_seq,
}
[docs]
@classmethod
def from_dict(cls, values: Mapping[str, Any]) -> HeadFrame:
"""Build :class:`HeadFrame` from serialized mapping data."""
return cls(
side=HandSide(str(values["side"])),
frame_id=str(values["frame_id"]),
head=HeadPose.from_dict(values["head"]),
sequence_id=int(values["sequence_id"]),
recv_ts_ns=int(values["recv_ts_ns"]),
recv_time_unix_ns=(
None if values["recv_time_unix_ns"] is None else int(values["recv_time_unix_ns"])
),
source_ts_ns=(None if values["source_ts_ns"] is None else int(values["source_ts_ns"])),
source_frame_seq=(
None
if values.get("source_frame_seq") is None
else int(values["source_frame_seq"])
),
)
@dataclass(slots=True)
class _SideAssemblyState:
"""Mutable per-side assembly state for incomplete and emitted components."""
wrist: WristPose | None = None
wrist_recv_ts_ns: int | None = None
landmarks: HandLandmarks | None = None
landmarks_recv_ts_ns: int | None = None
wrist_source_ts_ns: int | None = None
landmarks_source_ts_ns: int | None = None
wrist_source_frame_seq: int | None = None
landmarks_source_frame_seq: int | None = None
last_emitted_wrist_recv_ts_ns: int | None = None
last_emitted_landmarks_recv_ts_ns: int | None = None
next_sequence_id: int = 0
@dataclass(slots=True)
class _HeadAssemblyState:
"""Mutable head assembly state for optional `HeadFrame` emission."""
head: HeadPose | None = None
head_recv_ts_ns: int | None = None
head_source_ts_ns: int | None = None
head_source_frame_seq: int | None = None
last_emitted_head_recv_ts_ns: int | None = None
next_sequence_id: int = 0
AssembledFrame = HandFrame | HeadFrame
"""Frame event emitted by :class:`HandFrameAssembler`."""
[docs]
class HandFrameAssembler:
"""Assemble coherent frames from incoming parsed HTS packets.
Emission policy:
- A frame is emitted only after both wrist and landmarks are available for a hand side.
- A new frame is emitted when at least one component timestamp advances.
- Stale component updates (older timestamps than currently stored) are ignored.
"""
def __init__(
self,
*,
include_wall_time: bool = True,
include_head_frames: bool = False,
frame_id_by_side: Mapping[HandSide, str] | None = None,
) -> None:
"""Create a frame assembler.
:param include_wall_time:
If ``True``, :class:`HandFrame` includes ``recv_time_unix_ns`` using
``time.time_ns()`` when caller does not provide one.
:param include_head_frames:
If ``True``, emit :class:`HeadFrame` events for ``Head pose`` packets.
:param frame_id_by_side:
Optional per-side frame identifiers used in emitted frames.
"""
self._include_wall_time = include_wall_time
self._include_head_frames = include_head_frames
self._frame_id_by_side = {
HandSide.LEFT: "hts_left_hand",
HandSide.RIGHT: "hts_right_hand",
HandSide.HEAD: "hts_head",
}
if frame_id_by_side is not None:
self._frame_id_by_side.update(frame_id_by_side)
self._state: dict[HandSide, _SideAssemblyState] = {
HandSide.LEFT: _SideAssemblyState(),
HandSide.RIGHT: _SideAssemblyState(),
}
self._head_state = _HeadAssemblyState()
[docs]
def reset(self, side: HandSide | None = None) -> None:
"""Reset assembler state.
:param side:
Optional side to reset. If omitted, both sides are reset.
"""
if side is None:
self._state[HandSide.LEFT] = _SideAssemblyState()
self._state[HandSide.RIGHT] = _SideAssemblyState()
self._head_state = _HeadAssemblyState()
return
if side == HandSide.HEAD:
self._head_state = _HeadAssemblyState()
return
self._state[side] = _SideAssemblyState()
[docs]
def push_packet(
self,
packet: ParsedPacket,
*,
recv_ts_ns: int | None = None,
recv_time_unix_ns: int | None = None,
source_ts_ns: int | None = None,
) -> AssembledFrame | None:
"""Push one parsed packet and optionally emit a coherent frame.
:param packet:
Parsed wrist or landmarks packet.
:param recv_ts_ns:
Monotonic receive timestamp in nanoseconds. If omitted, generated
with ``time.monotonic_ns()``.
:param recv_time_unix_ns:
Optional Unix wall-clock timestamp in nanoseconds. If omitted and
``include_wall_time=True``, generated with ``time.time_ns()``.
:param source_ts_ns:
Optional source timestamp supplied by upstream sender.
:returns:
A newly assembled frame or ``None`` if frame is incomplete/unchanged.
"""
recv_ts_ns_value, recv_time_unix_ns_value = self._resolve_timestamps(
recv_ts_ns=recv_ts_ns,
recv_time_unix_ns=recv_time_unix_ns,
)
if isinstance(packet, HeadPosePacket):
if not self._include_head_frames:
return None
return self._emit_head_frame(
packet=packet,
recv_ts_ns=recv_ts_ns_value,
recv_time_unix_ns=recv_time_unix_ns_value,
source_ts_ns=source_ts_ns,
)
side_state = self._state[packet.side]
if isinstance(packet, WristPacket):
if (
side_state.wrist_recv_ts_ns is not None
and recv_ts_ns_value < side_state.wrist_recv_ts_ns
):
return None
side_state.wrist = packet.data
side_state.wrist_recv_ts_ns = recv_ts_ns_value
side_state.wrist_source_ts_ns = (
packet.debug.source_ts_ns if packet.debug is not None else None
)
side_state.wrist_source_frame_seq = (
packet.debug.source_frame_seq if packet.debug is not None else None
)
elif isinstance(packet, LandmarksPacket):
if (
side_state.landmarks_recv_ts_ns is not None
and recv_ts_ns_value < side_state.landmarks_recv_ts_ns
):
return None
side_state.landmarks = packet.data
side_state.landmarks_recv_ts_ns = recv_ts_ns_value
side_state.landmarks_source_ts_ns = (
packet.debug.source_ts_ns if packet.debug is not None else None
)
side_state.landmarks_source_frame_seq = (
packet.debug.source_frame_seq if packet.debug is not None else None
)
return self._maybe_emit_frame(
side=packet.side,
recv_time_unix_ns=recv_time_unix_ns_value,
source_ts_ns=source_ts_ns,
)
[docs]
def push_line(
self,
line: str,
*,
recv_ts_ns: int | None = None,
recv_time_unix_ns: int | None = None,
source_ts_ns: int | None = None,
) -> AssembledFrame | None:
"""Parse and push one raw HTS line into assembler state.
:param line:
Raw UTF-8 decoded HTS CSV line.
:param recv_ts_ns:
Monotonic receive timestamp in nanoseconds.
:param recv_time_unix_ns:
Optional Unix wall-clock timestamp in nanoseconds.
:param source_ts_ns:
Optional source timestamp supplied by upstream sender.
:returns:
A newly assembled frame or ``None`` if frame is incomplete/unchanged.
"""
packet = parse_line(line)
return self.push_packet(
packet,
recv_ts_ns=recv_ts_ns,
recv_time_unix_ns=recv_time_unix_ns,
source_ts_ns=source_ts_ns,
)
def _emit_head_frame(
self,
*,
packet: HeadPosePacket,
recv_ts_ns: int,
recv_time_unix_ns: int | None,
source_ts_ns: int | None,
) -> HeadFrame | None:
"""Emit a `HeadFrame` if head state has advanced."""
if (
self._head_state.head_recv_ts_ns is not None
and recv_ts_ns < self._head_state.head_recv_ts_ns
):
return None
self._head_state.head = packet.data
self._head_state.head_recv_ts_ns = recv_ts_ns
self._head_state.head_source_ts_ns = (
packet.debug.source_ts_ns if packet.debug is not None else None
)
self._head_state.head_source_frame_seq = (
packet.debug.source_frame_seq if packet.debug is not None else None
)
if self._head_state.last_emitted_head_recv_ts_ns == self._head_state.head_recv_ts_ns:
return None
sequence_id = self._head_state.next_sequence_id
self._head_state.next_sequence_id += 1
self._head_state.last_emitted_head_recv_ts_ns = self._head_state.head_recv_ts_ns
resolved_source_ts_ns = source_ts_ns
if resolved_source_ts_ns is None:
resolved_source_ts_ns = self._head_state.head_source_ts_ns
if self._head_state.head is None or self._head_state.head_recv_ts_ns is None:
return None
return HeadFrame(
side=HandSide.HEAD,
frame_id=self._frame_id_by_side[HandSide.HEAD],
head=self._head_state.head,
sequence_id=sequence_id,
recv_ts_ns=self._head_state.head_recv_ts_ns,
recv_time_unix_ns=recv_time_unix_ns,
source_ts_ns=resolved_source_ts_ns,
source_frame_seq=self._head_state.head_source_frame_seq,
)
def _resolve_timestamps(
self,
*,
recv_ts_ns: int | None,
recv_time_unix_ns: int | None,
) -> tuple[int, int | None]:
"""Resolve receive timestamps for one pushed packet.
:param recv_ts_ns:
Optional monotonic timestamp from caller.
:param recv_time_unix_ns:
Optional Unix wall-clock timestamp from caller.
:returns:
``(recv_ts_ns, recv_time_unix_ns)`` with defaults generated as configured.
"""
recv_ts_ns_value = monotonic_ns() if recv_ts_ns is None else recv_ts_ns
if recv_time_unix_ns is not None:
return recv_ts_ns_value, recv_time_unix_ns
if self._include_wall_time:
return recv_ts_ns_value, time_ns()
return recv_ts_ns_value, None
def _maybe_emit_frame(
self,
*,
side: HandSide,
recv_time_unix_ns: int | None,
source_ts_ns: int | None,
) -> HandFrame | None:
"""Emit a frame for one hand side if component state has advanced.
:param side:
Target hand side for emission.
:param recv_time_unix_ns:
Wall-clock receive timestamp assigned to this push call.
:param source_ts_ns:
Optional source timestamp associated with this push call.
:returns:
Newly assembled frame when complete and updated, otherwise ``None``.
"""
side_state = self._state[side]
if (
side_state.wrist is None
or side_state.wrist_recv_ts_ns is None
or side_state.landmarks is None
or side_state.landmarks_recv_ts_ns is None
):
return None
has_new_wrist = side_state.last_emitted_wrist_recv_ts_ns != side_state.wrist_recv_ts_ns
has_new_landmarks = (
side_state.last_emitted_landmarks_recv_ts_ns != side_state.landmarks_recv_ts_ns
)
if not has_new_wrist and not has_new_landmarks:
return None
sequence_id = side_state.next_sequence_id
side_state.next_sequence_id += 1
side_state.last_emitted_wrist_recv_ts_ns = side_state.wrist_recv_ts_ns
side_state.last_emitted_landmarks_recv_ts_ns = side_state.landmarks_recv_ts_ns
resolved_source_ts_ns = source_ts_ns
if resolved_source_ts_ns is None:
source_ts_candidates = [
ts
for ts in (side_state.wrist_source_ts_ns, side_state.landmarks_source_ts_ns)
if ts is not None
]
resolved_source_ts_ns = max(source_ts_candidates) if source_ts_candidates else None
source_frame_seq_candidates = [
seq
for seq in (
side_state.wrist_source_frame_seq,
side_state.landmarks_source_frame_seq,
)
if seq is not None
]
resolved_source_frame_seq = (
max(source_frame_seq_candidates) if source_frame_seq_candidates else None
)
return HandFrame(
side=side,
frame_id=self._frame_id_by_side[side],
wrist=side_state.wrist,
landmarks=side_state.landmarks,
sequence_id=sequence_id,
recv_ts_ns=max(side_state.wrist_recv_ts_ns, side_state.landmarks_recv_ts_ns),
recv_time_unix_ns=recv_time_unix_ns,
source_ts_ns=resolved_source_ts_ns,
source_frame_seq=resolved_source_frame_seq,
wrist_recv_ts_ns=side_state.wrist_recv_ts_ns,
landmarks_recv_ts_ns=side_state.landmarks_recv_ts_ns,
)