Source code for glyphx.streaming
"""
GlyphX Streaming / Real-time series.
Accepts a Python generator or callable that yields new values,
and re-renders the chart in place — in Jupyter via IPython display,
or in any environment by returning the latest SVG string.
No server, no Dash, no external process.
from glyphx import Figure
from glyphx.streaming import StreamingSeries
import random, time
fig = Figure(title="Live Sensor", auto_display=False)
stream = StreamingSeries(max_points=60, color="#1f77b4", label="Sensor A")
fig.add(stream)
# Jupyter: re-renders each iteration
with stream.live(fig, fps=10) as s:
for _ in range(200):
s.push(random.gauss(50, 5))
# Manual control
stream.push(42.0)
svg = fig.render_svg()
"""
from __future__ import annotations
import time
from collections import deque
from typing import Iterator
import numpy as np
from .series import BaseSeries
from .utils import svg_escape
[docs]
class StreamingSeries(BaseSeries):
"""
Sliding-window line series for real-time / streaming data.
Args:
max_points: Maximum number of data points kept in the window.
color: Line color.
label: Legend label.
line_width: Stroke width.
show_points: Draw a dot at each data point.
"""
def __init__(
self,
max_points: int = 100,
color: str | None = None,
label: str | None = None,
line_width: float = 2.0,
show_points: bool = False,
) -> None:
self._buffer: deque[float] = deque(maxlen=max_points)
self.max_points = max_points
self.line_width = line_width
self.show_points = show_points
self._tick = 0
super().__init__(x=[], y=[], color=color or "#1f77b4", label=label)
# ── Data push ──────────────────────────────────────────────────────────
[docs]
def push(self, value: float) -> StreamingSeries:
"""
Append a new value to the stream.
Updates ``self.x`` and ``self.y`` so the parent Figure re-renders
correctly on the next ``fig.render_svg()`` call.
Returns ``self`` for chaining::
stream.push(42.0).push(43.5)
Args:
value: New data value.
"""
self._buffer.append(float(value))
self._tick += 1
self.x = list(range(
self._tick - len(self._buffer),
self._tick,
))
self.y = list(self._buffer)
# Clear any cached categorical mapping
for attr in ("_numeric_x", "_x_categories"):
if hasattr(self, attr):
delattr(self, attr)
return self
[docs]
def push_many(self, values: list[float] | np.ndarray) -> StreamingSeries:
"""Push multiple values at once. Returns ``self``."""
for v in values:
self.push(float(v))
return self
[docs]
def reset(self) -> StreamingSeries:
"""Clear the buffer and reset the tick counter. Returns ``self``."""
self._buffer.clear()
self._tick = 0
self.x = []
self.y = []
return self
# ── SVG rendering ─────────────────────────────────────────────────────
[docs]
def to_svg(self, ax: object, use_y2: bool = False) -> str:
if not self.x or not self.y:
return ""
scale_y = ax.scale_y2 if use_y2 else ax.scale_y # type: ignore[union-attr]
elements: list[str] = []
points = " ".join(
f"{ax.scale_x(x):.1f},{scale_y(y):.1f}" # type: ignore[union-attr]
for x, y in zip(self.x, self.y)
)
elements.append(
f'<polyline class="{self.css_class}" fill="none" '
f'stroke="{self.color}" stroke-width="{self.line_width}" '
f'points="{points}"/>'
)
if self.show_points:
for x, y in zip(self.x, self.y):
elements.append(
f'<circle class="glyphx-point {self.css_class}" '
f'cx="{ax.scale_x(x):.1f}" cy="{scale_y(y):.1f}" ' # type: ignore[union-attr]
f'r="3" fill="{self.color}" '
f'data-x="{x}" data-y="{y:.3g}" '
f'data-label="{svg_escape(self.label or "")}"/>'
)
return "\n".join(elements)
# ── Live display context manager ───────────────────────────────────────
[docs]
def live(self, fig: object, fps: float = 10.0) -> _LiveContext:
"""
Context manager for live display in Jupyter.
Usage::
with stream.live(fig, fps=10) as s:
for value in sensor_generator():
s.push(value)
Args:
fig: The :class:`~glyphx.Figure` containing this series.
fps: Target frames per second (throttles re-renders).
Returns:
Context manager that yields ``self``.
"""
return _LiveContext(self, fig, fps)
class _LiveContext:
"""Internal context manager for ``StreamingSeries.live()``."""
def __init__(self, stream: StreamingSeries, fig: object, fps: float) -> None:
self._stream = stream
self._fig = fig
self._interval = 1.0 / fps
self._last_draw = 0.0
def __enter__(self) -> "_LiveContext":
return self
def push(self, value: float) -> None:
"""Push a value and re-render if enough time has elapsed."""
self._stream.push(value)
now = time.monotonic()
if now - self._last_draw >= self._interval:
self._render()
self._last_draw = now
def _render(self) -> None:
try:
from IPython.display import clear_output, display, SVG
clear_output(wait=True)
display(SVG(self._fig.render_svg())) # type: ignore[union-attr]
except Exception:
pass
def __exit__(self, *_: object) -> None:
self._render()