"""High-level streaming client API for HTS telemetry."""
from __future__ import annotations
from collections.abc import Callable, Iterator
from dataclasses import dataclass
from typing import Protocol
from hand_tracking_sdk._compat import StrEnum
from hand_tracking_sdk.exceptions import (
ClientCallbackError,
ClientConfigurationError,
ParseError,
)
from hand_tracking_sdk.frame import HandFrame, HandFrameAssembler, HeadFrame
from hand_tracking_sdk.models import HandSide, ParsedPacket
from hand_tracking_sdk.parser import parse_line
from hand_tracking_sdk.transport import (
TCPClientConfig,
TCPClientLineReceiver,
TCPServerConfig,
TCPServerLineReceiver,
UDPLineReceiver,
UDPReceiverConfig,
)
[docs]
class TransportMode(StrEnum):
"""Transport mode used by :class:`HTSClient`."""
UDP = "udp"
TCP_SERVER = "tcp_server"
TCP_CLIENT = "tcp_client"
[docs]
class StreamOutput(StrEnum):
"""Output type emitted by :meth:`HTSClient.iter_events`."""
PACKETS = "packets"
FRAMES = "frames"
BOTH = "both"
[docs]
class HandFilter(StrEnum):
"""Hand-side filter applied before packet/frame emission."""
BOTH = "both"
LEFT = "left"
RIGHT = "right"
[docs]
class ErrorPolicy(StrEnum):
"""Error policy applied to parse failures."""
STRICT = "strict"
TOLERANT = "tolerant"
[docs]
class LogEventKind(StrEnum):
"""Structured log event kinds emitted by :class:`HTSClient`."""
RECEIVED_LINE = "received_line"
PARSE_ERROR = "parse_error"
FILTERED_PACKET = "filtered_packet"
EMITTED_PACKET = "emitted_packet"
EMITTED_FRAME = "emitted_frame"
CALLBACK_ERROR = "callback_error"
[docs]
@dataclass(frozen=True, slots=True)
class StreamLogEvent:
"""Structured client log event for observability hooks.
:param kind:
Event kind discriminator.
:param message:
Human-readable event message.
:param side:
Optional hand side associated with the event.
:param line:
Optional raw input line associated with the event.
:param exception:
Optional exception associated with the event.
"""
kind: LogEventKind
message: str
side: HandSide | None = None
line: str | None = None
exception: Exception | None = None
[docs]
@dataclass(frozen=True, slots=True)
class ClientStats:
"""Observable counters for :class:`HTSClient` runtime behavior."""
lines_received: int = 0
parse_errors: int = 0
dropped_lines: int = 0
packets_filtered: int = 0
packets_emitted: int = 0
frames_emitted: int = 0
callbacks_invoked: int = 0
callback_errors: int = 0
[docs]
@dataclass(frozen=True, slots=True)
class HTSClientConfig:
"""Configuration for high-level HTS streaming client.
:param transport_mode:
Network transport mode.
:param host:
Host address used for bind/connect according to transport mode.
:param port:
Port used for bind/connect according to transport mode.
:param timeout_s:
I/O timeout in seconds for receive operations. In ``tcp_server`` mode,
initial connection wait uses ``max(timeout_s, 5.0)`` to avoid premature
startup timeouts while waiting for a device to connect.
:param reconnect_delay_s:
Delay used by TCP client reconnect loop.
:param output:
Event output mode (`packets`, `frames`, or `both`).
:param hand_filter:
Hand-side filter for emitted events.
:param error_policy:
Parse error handling strategy.
:param include_wall_time:
Whether assembled frames include `recv_time_unix_ns` by default.
:param log_hook:
Optional structured log callback invoked for client lifecycle events.
"""
transport_mode: TransportMode = TransportMode.UDP
host: str = "0.0.0.0"
port: int = 9000
timeout_s: float = 1.0
reconnect_delay_s: float = 0.25
output: StreamOutput = StreamOutput.FRAMES
hand_filter: HandFilter = HandFilter.BOTH
error_policy: ErrorPolicy = ErrorPolicy.STRICT
include_wall_time: bool = True
log_hook: Callable[[StreamLogEvent], None] | None = None
def __post_init__(self) -> None:
"""Validate configuration constraints.
:raises ClientConfigurationError:
If one or more fields are invalid for runtime operation.
"""
if not self.host:
raise ClientConfigurationError("host must not be empty.")
if self.port < 0 or self.port > 65535:
raise ClientConfigurationError("port must be in range [0, 65535].")
if self.timeout_s <= 0:
raise ClientConfigurationError("timeout_s must be greater than 0.")
if self.reconnect_delay_s < 0:
raise ClientConfigurationError("reconnect_delay_s must be non-negative.")
class _LineReceiver(Protocol):
"""Protocol for line-oriented transport receivers used by :class:`HTSClient`."""
def __enter__(self) -> _LineReceiver: ...
def __exit__(self, *_: object) -> None: ...
def iter_lines(self) -> Iterator[str]: ...
StreamEvent = ParsedPacket | HandFrame | HeadFrame
"""Public event type emitted by :class:`HTSClient` streaming methods."""
[docs]
class HTSClient:
"""High-level client for streaming parsed packets and assembled frames."""
def __init__(
self,
config: HTSClientConfig,
*,
receiver_factory: Callable[[HTSClientConfig], _LineReceiver] | None = None,
) -> None:
"""Create a streaming client.
:param config:
Client configuration.
:param receiver_factory:
Optional factory for line receiver dependency injection.
"""
self._config = config
self._receiver_factory = receiver_factory
self._frame_assembler = HandFrameAssembler(
include_wall_time=config.include_wall_time,
include_head_frames=config.output in (StreamOutput.FRAMES, StreamOutput.BOTH),
)
self._stats = ClientStats()
[docs]
def iter_events(self) -> Iterator[StreamEvent]:
"""Iterate streaming events from configured transport.
:returns:
Iterator yielding packet and/or frame events per configured output mode.
:raises ParseError:
When ``error_policy=strict`` and an incoming line cannot be parsed.
"""
receiver = self._make_receiver()
with receiver:
for line in receiver.iter_lines():
self._stats = self._stats_with(lines_received=self._stats.lines_received + 1)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.RECEIVED_LINE,
message="Received input line.",
line=line,
)
)
packet = self._parse_with_policy(line)
if packet is None:
continue
if not self._matches_hand_filter(packet.side):
self._stats = self._stats_with(
packets_filtered=self._stats.packets_filtered + 1,
dropped_lines=self._stats.dropped_lines + 1,
)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.FILTERED_PACKET,
message="Packet dropped due to hand filter.",
side=packet.side,
)
)
continue
if self._config.output in (StreamOutput.PACKETS, StreamOutput.BOTH):
self._stats = self._stats_with(packets_emitted=self._stats.packets_emitted + 1)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.EMITTED_PACKET,
message="Emitted packet event.",
side=packet.side,
)
)
yield packet
if self._config.output in (StreamOutput.FRAMES, StreamOutput.BOTH):
frame = self._frame_assembler.push_packet(packet)
if frame is not None:
self._stats = self._stats_with(
frames_emitted=self._stats.frames_emitted + 1
)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.EMITTED_FRAME,
message="Emitted frame event.",
side=frame.side,
)
)
yield frame
[docs]
def run(
self,
callback: Callable[[StreamEvent], None],
*,
max_events: int | None = None,
wrap_callback_exceptions: bool = False,
) -> int:
"""Run stream loop and invoke callback for each emitted event.
:param callback:
Function called for each emitted stream event.
:param max_events:
Optional cap on processed events; useful for controlled execution.
:param wrap_callback_exceptions:
If ``True``, callback exceptions are re-raised as
:class:`ClientCallbackError`.
:returns:
Number of callback invocations performed.
:raises ParseError:
When ``error_policy=strict`` and parsing fails.
:raises ClientCallbackError:
When callback raises and wrapping is enabled.
"""
processed = 0
for event in self.iter_events():
try:
callback(event)
except Exception as exc:
self._stats = self._stats_with(callback_errors=self._stats.callback_errors + 1)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.CALLBACK_ERROR,
message="Callback raised an exception.",
exception=exc,
)
)
if wrap_callback_exceptions:
raise ClientCallbackError("Callback failed during stream processing.") from exc
raise
processed += 1
self._stats = self._stats_with(callbacks_invoked=self._stats.callbacks_invoked + 1)
if max_events is not None and processed >= max_events:
return processed
return processed
[docs]
def get_stats(self) -> ClientStats:
"""Return a snapshot of current client counters."""
return self._stats
[docs]
def reset_stats(self) -> None:
"""Reset client counters to zero values."""
self._stats = ClientStats()
def _make_receiver(self) -> _LineReceiver:
"""Create a line receiver according to configured transport mode."""
if self._receiver_factory is not None:
return self._receiver_factory(self._config)
if self._config.transport_mode == TransportMode.UDP:
return UDPLineReceiver(
UDPReceiverConfig(
host=self._config.host,
port=self._config.port,
timeout_s=self._config.timeout_s,
)
)
if self._config.transport_mode == TransportMode.TCP_SERVER:
return TCPServerLineReceiver(
TCPServerConfig(
host=self._config.host,
port=self._config.port,
accept_timeout_s=max(self._config.timeout_s, 5.0),
read_timeout_s=self._config.timeout_s,
)
)
return TCPClientLineReceiver(
TCPClientConfig(
host=self._config.host,
port=self._config.port,
connect_timeout_s=self._config.timeout_s,
read_timeout_s=self._config.timeout_s,
reconnect_delay_s=self._config.reconnect_delay_s,
)
)
def _parse_with_policy(self, line: str) -> ParsedPacket | None:
"""Parse one line according to configured strict/tolerant policy."""
try:
return parse_line(line)
except ParseError as exc:
self._stats = self._stats_with(
parse_errors=self._stats.parse_errors + 1,
dropped_lines=self._stats.dropped_lines + 1,
)
self._emit_log(
StreamLogEvent(
kind=LogEventKind.PARSE_ERROR,
message="Failed to parse input line.",
line=line,
exception=exc,
)
)
if self._config.error_policy == ErrorPolicy.STRICT:
raise
return None
def _matches_hand_filter(self, side: HandSide) -> bool:
"""Return whether a packet/frame side passes the configured hand filter."""
if self._config.hand_filter == HandFilter.BOTH:
return True
if self._config.hand_filter == HandFilter.LEFT:
return side == HandSide.LEFT
return side == HandSide.RIGHT
def _stats_with(self, **changes: int) -> ClientStats:
"""Return updated stats snapshot with selected counter changes."""
return ClientStats(
lines_received=changes.get("lines_received", self._stats.lines_received),
parse_errors=changes.get("parse_errors", self._stats.parse_errors),
dropped_lines=changes.get("dropped_lines", self._stats.dropped_lines),
packets_filtered=changes.get("packets_filtered", self._stats.packets_filtered),
packets_emitted=changes.get("packets_emitted", self._stats.packets_emitted),
frames_emitted=changes.get("frames_emitted", self._stats.frames_emitted),
callbacks_invoked=changes.get("callbacks_invoked", self._stats.callbacks_invoked),
callback_errors=changes.get("callback_errors", self._stats.callback_errors),
)
def _emit_log(self, event: StreamLogEvent) -> None:
"""Emit one structured log event if a hook is configured."""
if self._config.log_hook is not None:
self._config.log_hook(event)