"""Parsing helpers for HTS UTF-8 CSV packets."""
import re
from hand_tracking_sdk.constants import LANDMARK_COUNT, LANDMARK_VALUE_COUNT, WRIST_VALUE_COUNT
from hand_tracking_sdk.exceptions import ParseError
from hand_tracking_sdk.models import (
HandLandmarks,
HandSide,
HeadPose,
HeadPosePacket,
LandmarksPacket,
PacketDebugInfo,
PacketType,
ParsedPacket,
WristPacket,
WristPose,
)
# Public API. (Removed a stray "[docs]" link marker left over from HTML extraction.)
def parse_line(line: str) -> ParsedPacket:
    """Convert a single HTS CSV line into its typed packet representation.

    The text before the first ``:`` is the label (optionally followed by
    ``|``-separated debug metadata); the text after it is a comma-separated
    list of floats. Supported labels are ``Left wrist:``, ``Right wrist:``,
    ``Left landmarks:``, ``Right landmarks:``, and ``Head pose:``.

    :param line:
        Raw UTF-8 decoded line from HTS transport.
    :returns:
        A parsed packet instance for wrist, head pose, or landmark data.
    :rtype:
        ParsedPacket
    :raises ParseError:
        If the line is empty, malformed, has unsupported labels, includes
        non-float values, or does not match expected value counts.
    """
    text = line.strip()
    if text == "":
        raise ParseError("Empty line.")
    if ":" not in text:
        raise ParseError("Missing ':' separator.")

    # Only the first ':' separates label from payload.
    label_segment, _, value_segment = text.partition(":")

    # The order of these three steps decides which ParseError surfaces when a
    # line is malformed in more than one way: metadata, then payload, then label.
    label, debug_info = _parse_label_and_debug(label_segment.strip())
    payload = _parse_floats(value_segment)
    side, kind = _parse_label(label)

    # Anything that is neither wrist nor pose is handled as landmarks.
    if kind == PacketType.WRIST:
        build_packet = _parse_wrist
    elif kind == PacketType.POSE:
        build_packet = _parse_pose
    else:
        build_packet = _parse_landmarks
    return build_packet(side=side, values=payload, debug=debug_info)
def _parse_label_and_debug(label_with_meta: str) -> tuple[str, PacketDebugInfo | None]:
    """Split the label segment into the bare label and optional debug metadata.

    The segment before ``:`` may carry ``|``-separated ``key=value`` pairs
    after the label, for example ``"Left wrist | f=12 | ts=3400"``. Keys are
    matched case-insensitively:

    * ``f``, ``frame``, ``frame_id`` set ``source_frame_seq``;
    * ``t``, ``ts``, ``timestamp`` set ``source_ts_ns``.

    Empty parts and parts that are not ``key=value`` shaped are skipped. Any
    other key is accepted but its value is discarded after integer validation.

    :param label_with_meta:
        Text before the ``:`` separator, already stripped by the caller.
    :returns:
        ``(label, debug)`` where ``debug`` is ``None`` when no ``key=value``
        part was present, otherwise a ``PacketDebugInfo`` (whose fields may
        both be ``None`` if only unrecognized keys were seen).
    :raises ParseError:
        If the label part is empty, or if any ``key=value`` part has a value
        that is not an integer.
    """
    # str.split always yields at least one element, so the unpack is safe.
    label, *meta_parts = (part.strip() for part in label_with_meta.split("|"))
    if not label:
        raise ParseError("Invalid label.")

    # The key class includes "_" so that the documented ``frame_id`` key can
    # actually match; a letters-only class made that key unreachable.
    kv_pattern = re.compile(r"^([A-Za-z_]+)\s*=\s*(.+)$")

    source_frame_seq: int | None = None
    source_ts_ns: int | None = None
    saw_kv = False
    for raw_part in meta_parts:
        if not raw_part:
            continue
        match = kv_pattern.match(raw_part)
        if match is None:
            # Free-form annotation rather than key=value: ignore it.
            continue
        saw_kv = True
        key = match.group(1).lower()
        value_raw = match.group(2).strip()
        try:
            value = int(value_raw)
        except ValueError as exc:
            raise ParseError(f"Invalid debug metadata value: {raw_part!r}") from exc
        if key in {"f", "frame", "frame_id"}:
            source_frame_seq = value
        elif key in {"t", "ts", "timestamp"}:
            source_ts_ns = value

    if not saw_kv:
        return label, None
    return label, PacketDebugInfo(source_frame_seq=source_frame_seq, source_ts_ns=source_ts_ns)
def _parse_label(label: str) -> tuple[HandSide, PacketType]:
    """Resolve a two-word packet label into its side and packet type.

    The first word is passed unchanged to ``HandSide``; the second word is
    lowercased and compared against the values of ``PacketType.WRIST``,
    ``PacketType.LANDMARKS`` and ``PacketType.POSE``, in that order.

    :param label:
        Label segment before ``:`` (for example ``"Right wrist"``).
    :returns:
        Parsed hand side and packet type tuple.
    :raises ParseError:
        If the label is not exactly two whitespace-separated words, or the
        side or packet type is unsupported.
    """
    tokens = label.split()
    if len(tokens) != 2:
        raise ParseError(f"Invalid label: {label!r}")
    side_token, kind_token = tokens

    try:
        side = HandSide(side_token)
    except ValueError as exc:
        raise ParseError(f"Unsupported hand side: {side_token!r}") from exc

    wanted = kind_token.lower()
    for packet_type in (PacketType.WRIST, PacketType.LANDMARKS, PacketType.POSE):
        if wanted == packet_type.value:
            return side, packet_type

    # Report the token as received, not the lowercased form.
    raise ParseError(f"Unsupported packet type: {kind_token!r}")
def _parse_floats(payload: str) -> list[float]:
"""Parse comma-separated numeric payload into floats.
:param payload:
CSV payload segment after ``:``.
:returns:
Parsed float list with empty chunks removed.
:raises ParseError:
If any value cannot be parsed as ``float``.
"""
chunks = [chunk.strip() for chunk in payload.split(",") if chunk.strip()]
try:
return [float(value) for value in chunks]
except ValueError as exc:
raise ParseError("Payload contains non-float values.") from exc
def _parse_wrist(
    side: HandSide,
    values: list[float],
    debug: PacketDebugInfo | None,
) -> WristPacket:
    """Build a typed wrist packet after checking the value count.

    :param side:
        Hand side of the packet.
    :param values:
        Parsed float values; must contain exactly ``WRIST_VALUE_COUNT``
        elements, which are passed positionally to ``WristPose``.
    :param debug:
        Optional debug metadata attached to the packet unchanged.
    :returns:
        Typed wrist packet.
    :raises ParseError:
        If value count does not match the wrist contract.
    """
    received = len(values)
    if received != WRIST_VALUE_COUNT:
        raise ParseError(f"Wrist packet must contain {WRIST_VALUE_COUNT} values, got {received}")
    return WristPacket(
        side=side,
        kind=PacketType.WRIST,
        data=WristPose(*values),
        debug=debug,
    )
def _parse_pose(
    side: HandSide,
    values: list[float],
    debug: PacketDebugInfo | None,
) -> HeadPosePacket:
    """Build a typed head pose packet after checking the value count.

    :param side:
        Side value parsed from the label; stored on the packet unchanged.
    :param values:
        Parsed float values; must contain exactly ``WRIST_VALUE_COUNT``
        elements (the same count used for wrist packets), which are passed
        positionally to ``HeadPose``.
    :param debug:
        Optional debug metadata attached to the packet unchanged.
    :returns:
        Typed head pose packet.
    :raises ParseError:
        If value count does not match the expected count.
    """
    received = len(values)
    if received != WRIST_VALUE_COUNT:
        raise ParseError(f"Pose packet must contain {WRIST_VALUE_COUNT} values, got {received}")
    return HeadPosePacket(
        side=side,
        kind=PacketType.POSE,
        data=HeadPose(*values),
        debug=debug,
    )
def _parse_landmarks(
    side: HandSide,
    values: list[float],
    debug: PacketDebugInfo | None,
) -> LandmarksPacket:
    """Build a typed landmarks packet after checking the value count.

    :param side:
        Hand side of the packet.
    :param values:
        Parsed float values; must contain exactly ``LANDMARK_VALUE_COUNT``
        elements, laid out as consecutive ``x, y, z`` triples.
    :param debug:
        Optional debug metadata attached to the packet unchanged.
    :returns:
        Typed landmark packet with ``LANDMARK_COUNT`` ``(x, y, z)`` points.
    :raises ParseError:
        If value count does not match the landmarks contract.
    """
    received = len(values)
    if received != LANDMARK_VALUE_COUNT:
        raise ParseError(
            f"Landmarks packet must contain {LANDMARK_VALUE_COUNT} values, got {received}"
        )

    # Walk the flat list one landmark at a time; each landmark owns three
    # consecutive slots starting at index * 3.
    triples = []
    for index in range(LANDMARK_COUNT):
        base = index * 3
        triples.append((values[base], values[base + 1], values[base + 2]))

    landmarks = HandLandmarks(points=tuple(triples))
    return LandmarksPacket(
        side=side,
        kind=PacketType.LANDMARKS,
        data=landmarks,
        debug=debug,
    )