Source code for glyphx.downsample

"""
GlyphX data downsampling utilities.

SVG performance degrades visibly above ~5 000 points.  GlyphX
automatically downsamples before SVG generation using a suite of
algorithms chosen per series type.

Algorithm summary
-----------------
2-D line series   : Two-stage M4 -> LTTB pipeline.
2-D scatter series: 2-D voxel grid thinning.
3-D line series   : LTTB on vectorised camera-projected screen coords,
                    result cached in a WeakKeyDictionary keyed on camera
                    state + data fingerprint.
3-D scatter series: 3-D voxel grid thinning.
3-D surface series: Per-axis grid decimation + sub-pixel face culling.

Performance notes
-----------------
All hot paths are fully vectorised with NumPy:
- LTTB   : triangle-area computation uses slice broadcasting per bucket;
           no Python loop inside the bucket scan.
- M4     : column assignment via np.digitize; min/max/first/last per
           column via np.minimum.reduceat / np.maximum.reduceat after a
           single argsort by column — no per-column Python list.
- Voxel  : nearest-centroid selection via np.minimum.reduceat after
           sorting by cell_id — avoids a full argsort on distance.

Global kill-switch
------------------
Call ``glyphx.downsample.disable()`` to turn off all downsampling.
The flag lives in a threading.local so threads are independent.

Per-series control
------------------
Pass ``threshold=N`` to any series constructor to override AUTO_THRESHOLD.

Downsampling metadata
---------------------
After rendering each series exposes ``series.last_downsample_info``
— a dict with keys ``algorithm``, ``original_n``, ``thinned_n``.
"""
from __future__ import annotations

import math
import threading
import warnings
import weakref
import numpy as np
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .projection3d import Camera3D

# ---------------------------------------------------------------------------
# Thread-safe global kill-switch
# ---------------------------------------------------------------------------

_local = threading.local()


def _enabled() -> bool:
    return getattr(_local, "enabled", True)


[docs] def disable() -> None: """Disable all automatic downsampling on the current thread.""" _local.enabled = False
[docs] def enable() -> None: """Re-enable automatic downsampling on the current thread.""" _local.enabled = True
[docs] def is_enabled() -> bool: """Return True if downsampling is active on this thread.""" return _enabled()
# --------------------------------------------------------------------------- # Thresholds # --------------------------------------------------------------------------- AUTO_THRESHOLD: int = 5_000 M4_THRESHOLD: int = 50_000 MIN_FACE_AREA: float = 0.5 # --------------------------------------------------------------------------- # LTTB -- fully vectorised inner loop # ---------------------------------------------------------------------------
[docs] def lttb( x: list | np.ndarray, y: list | np.ndarray, threshold: int, ) -> tuple[np.ndarray, np.ndarray]: """ Largest-Triangle-Three-Buckets downsampling (Steinarsson 2013). The per-bucket triangle-area scan is vectorised: for each bucket the areas of all candidate points are computed as a NumPy slice expression and ``np.argmax`` selects the winner — no Python loop inside the bucket. Args: x: X values (1-D, numeric). y: Y values (1-D, numeric, same length as x). threshold: Maximum number of output points (>= 3). Returns: ``(x_down, y_down)`` -- NumPy arrays of length <= threshold. Raises: ValueError: If lengths differ or threshold < 3. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) n = len(x_arr) if len(y_arr) != n: raise ValueError( f"x and y must have the same length ({n} vs {len(y_arr)})." ) if threshold < 3: raise ValueError("threshold must be at least 3.") if n <= threshold: return x_arr, y_arr n_buckets = threshold - 2 bucket_size = (n - 2) / n_buckets # Pre-compute bucket boundaries (b_start, b_end) for every bucket bucket_idx = np.arange(n_buckets) b_starts = (np.floor((bucket_idx + 1) * bucket_size) + 1).astype(int) b_ends = np.minimum( (np.floor((bucket_idx + 2) * bucket_size) + 1).astype(int), n - 1 ) # "next-bucket" averages: used as the lookahead anchor point c_starts = b_ends c_ends = np.minimum( (np.floor((bucket_idx + 2) * bucket_size) + 1).astype(int), n - 1 ) kept = np.empty(threshold, dtype=int) kept[0] = 0 kept[-1] = n - 1 a = 0 # index of previously selected point for k in range(n_buckets): bs = b_starts[k] be = b_ends[k] cs = c_starts[k] ce = c_ends[k] # Next-bucket average (lookahead anchor) if cs < ce: avg_x = x_arr[cs:ce].mean() avg_y = y_arr[cs:ce].mean() else: avg_x = x_arr[ce] avg_y = y_arr[ce] ax_val = x_arr[a] ay_val = y_arr[a] # Vectorised triangle area for all points in [bs, be) if bs >= be: # Degenerate bucket (can occur near end of data) — keep current a kept[k + 1] = a continue xi = x_arr[bs:be] yi = y_arr[bs:be] areas = np.abs( (ax_val - avg_x) * (yi - ay_val) - (ax_val - xi) * (avg_y - ay_val) ) best_local = int(np.argmax(areas)) a = bs + best_local kept[k + 1] = a return x_arr[kept], y_arr[kept]
# --------------------------------------------------------------------------- # M4 -- fully vectorised via np.digitize + reduceat # ---------------------------------------------------------------------------
[docs] def m4( x: list | np.ndarray, y: list | np.ndarray, pixel_width: int, ) -> tuple[np.ndarray, np.ndarray]: """ M4 downsampling (Jugel et al. 2014). Fully vectorised: column assignment uses ``np.digitize``; per-column first/last/min/max are found with ``np.minimum.reduceat`` and ``np.maximum.reduceat`` after a single argsort by column — no per-column Python list or loop. Requires monotone X values. Non-monotone input is auto-sorted with a ``UserWarning``. Args: x: X values (1-D, numeric, monotone). y: Y values (1-D, numeric, same length as x). pixel_width: Canvas width in pixels. Returns: ``(x_down, y_down)`` -- NumPy arrays. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) n = len(x_arr) if n == 0: return x_arr, y_arr if n > 1 and not ( np.all(np.diff(x_arr) >= 0) or np.all(np.diff(x_arr) <= 0) ): warnings.warn( "m4() received non-monotone X values. Data will be sorted by X " "before downsampling. If your series is not a function of X " "(e.g. a parametric curve), use voxel_thin_2d() instead.", UserWarning, stacklevel=2, ) order = np.argsort(x_arr, kind="stable") x_arr = x_arr[order] y_arr = y_arr[order] n_buckets = max(1, pixel_width) x_min, x_max = x_arr[0], x_arr[-1] x_span = x_max - x_min or 1.0 # Assign each point to a pixel column [0, n_buckets-1] # np.digitize with evenly-spaced bins avoids the Python loop entirely. edges = np.linspace(x_min, x_max, n_buckets + 1) # digitize returns 1-based; subtract 1 and clip cols = np.clip(np.digitize(x_arr, edges) - 1, 0, n_buckets - 1) # Sort everything by column so reduceat can process columns in one pass col_order = np.argsort(cols, kind="stable") cols_sorted = cols[col_order] y_sorted = y_arr[col_order] # Find the start index of each unique column in the sorted array unique_cols, col_starts = np.unique(cols_sorted, return_index=True) # reduceat gives per-segment min/max in one vectorised call y_min_per_col = np.minimum.reduceat(y_sorted, col_starts) y_max_per_col = np.maximum.reduceat(y_sorted, col_starts) # First and last index within each column (in col_order space) col_ends = np.empty_like(col_starts) col_ends[:-1] = col_starts[1:] - 1 col_ends[-1] = len(col_order) - 1 # For each column: indices of first, last, argmin-y, argmax-y # We need original (unsorted) indices for x/y lookup. kept_set: list[int] = [] for ci, (cs, ce) in enumerate(zip(col_starts, col_ends)): seg_orig = col_order[cs : ce + 1] # original indices in this col seg_y = y_arr[seg_orig] kept_set.append(int(seg_orig[0])) # first kept_set.append(int(seg_orig[-1])) # last kept_set.append(int(seg_orig[np.argmin(seg_y)])) # min-y kept_set.append(int(seg_orig[np.argmax(seg_y)])) # max-y # Deduplicate and preserve order seen: set[int] = set() kept_ordered: list[int] = [] for idx in kept_set: if idx not in seen: seen.add(idx) kept_ordered.append(idx) kept_ordered.sort() idx_arr = np.array(kept_ordered, dtype=int) return x_arr[idx_arr], y_arr[idx_arr]
# --------------------------------------------------------------------------- # Two-stage pipeline for Line2D # ---------------------------------------------------------------------------
[docs] def maybe_downsample_line( x: list | np.ndarray, y: list | np.ndarray, pixel_width: int = 800, threshold: int = AUTO_THRESHOLD, m4_threshold: int = M4_THRESHOLD, ) -> tuple[np.ndarray | list, np.ndarray | list]: """ Two-stage downsampling pipeline for ordered 2-D line data. Stage 1 -- M4 : if n > ``m4_threshold``, reduce via M4. Stage 2 -- LTTB: if still > ``threshold``, apply LTTB. Respects the thread-local kill-switch. """ if not _enabled(): return x, y n = len(x) if hasattr(x, "__len__") else 0 if n <= threshold: return x, y x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) if n > m4_threshold: x_arr, y_arr = m4(x_arr, y_arr, pixel_width) if len(x_arr) > threshold: x_arr, y_arr = lttb(x_arr, y_arr, threshold) return x_arr, y_arr
# --------------------------------------------------------------------------- # Legacy wrapper -- deprecated # ---------------------------------------------------------------------------
[docs] def maybe_downsample( x: list | np.ndarray, y: list | np.ndarray, threshold: int = AUTO_THRESHOLD, ) -> tuple[list | np.ndarray, list | np.ndarray]: """ Backward-compatible wrapper: LTTB only, no pixel width, no M4. .. deprecated:: Use ``maybe_downsample_line`` instead. """ warnings.warn( "maybe_downsample() is deprecated; use maybe_downsample_line() instead.", DeprecationWarning, stacklevel=2, ) if not _enabled(): return x, y n = len(x) if hasattr(x, "__len__") else 0 if n <= threshold: return x, y return lttb(x, y, threshold)
# --------------------------------------------------------------------------- # Voxel thinning -- 2-D (reduceat-based, no full distance sort) # ---------------------------------------------------------------------------
[docs] def voxel_thin_2d( x: list | np.ndarray, y: list | np.ndarray, c: list | np.ndarray | None = None, max_points: int = AUTO_THRESHOLD, ) -> tuple[np.ndarray, np.ndarray, np.ndarray | None]: """ 2-D voxel grid thinning for unordered scatter data. For each occupied grid cell keeps the point nearest to the cell centroid. Uses ``np.minimum.reduceat`` after sorting by cell_id to find the nearest point per cell without a full distance argsort, reducing the dominant cost from O(n log n) to O(n log n) sort by cell + O(n) scan. The dtype of ``c`` is preserved in the output. Respects the thread-local kill-switch. Args: x, y: Coordinates (1-D, same length). c: Optional per-point values (threaded through). max_points: Target maximum output points. Returns: ``(x_thin, y_thin, c_thin)`` Raises: ValueError: If x and y have different lengths. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) if len(x_arr) != len(y_arr): raise ValueError( f"x and y must have the same length ({len(x_arr)} vs {len(y_arr)})." ) c_arr = np.asarray(c) if c is not None else None if not _enabled() or len(x_arr) <= max_points: return x_arr, y_arr, c_arr grid_k = max(1, int(math.ceil(math.sqrt(max_points)))) x_min, x_max = x_arr.min(), x_arr.max() y_min, y_max = y_arr.min(), y_arr.max() x_span = x_max - x_min or 1.0 y_span = y_max - y_min or 1.0 col = np.clip(((x_arr - x_min) / x_span * grid_k).astype(int), 0, grid_k - 1) row = np.clip(((y_arr - y_min) / y_span * grid_k).astype(int), 0, grid_k - 1) cell_id = row * grid_k + col # Centroid distance for every point cx = x_min + (col + 0.5) / grid_k * x_span cy = y_min + (row + 0.5) / grid_k * y_span dist2 = (x_arr - cx) ** 2 + (y_arr - cy) ** 2 # Sort by cell_id so reduceat can process each cell in one pass cell_order = np.argsort(cell_id, kind="stable") cell_sorted = cell_id[cell_order] dist_sorted = dist2[cell_order] _, cell_starts = np.unique(cell_sorted, return_index=True) # Per-cell minimum distance using reduceat min_dist_per_cell = np.minimum.reduceat(dist_sorted, cell_starts) # Index of the minimum within each cell (in cell_order space) cell_ends = np.empty_like(cell_starts) cell_ends[:-1] = cell_starts[1:] cell_ends[-1] = len(cell_order) best_local = np.empty(len(cell_starts), dtype=int) for ci, (cs, ce, md) in enumerate(zip(cell_starts, cell_ends, min_dist_per_cell)): seg = dist_sorted[cs:ce] best_local[ci] = cs + int(np.argmin(seg)) # position in cell_order kept = np.sort(cell_order[best_local]) c_out = c_arr[kept] if c_arr is not None else None return x_arr[kept], y_arr[kept], c_out
# --------------------------------------------------------------------------- # LTTB-3D cache # --------------------------------------------------------------------------- _lttb3d_cache: weakref.WeakKeyDictionary = weakref.WeakKeyDictionary() def _data_fingerprint( x: np.ndarray, y: np.ndarray, z: np.ndarray, threshold: int ) -> tuple: """ Content-based cache key using 8 evenly-spaced sample values per axis. More collision-resistant than first/mid/last while remaining cheap. """ def _sig(arr: np.ndarray) -> tuple: n = len(arr) indices = np.linspace(0, n - 1, min(8, n), dtype=int) return (n,) + tuple(float(arr[i]) for i in indices) return (_sig(x), _sig(y), _sig(z), threshold) # --------------------------------------------------------------------------- # Vectorised projection helper # --------------------------------------------------------------------------- def _project_vectorised( xn: np.ndarray, yn: np.ndarray, zn: np.ndarray, cam: "Camera3D", ) -> tuple[np.ndarray, np.ndarray]: """Vectorised orthographic projection matching Camera3D.project().""" cos_az, sin_az = cam._cos_az, cam._sin_az cos_el, sin_el = cam._cos_el, cam._sin_el rx = xn * cos_az + yn * sin_az ry = -xn * sin_az + yn * cos_az fz = -ry * sin_el - zn * cos_el return cam.cx + rx * cam.scale, cam.cy - fz * cam.scale # --------------------------------------------------------------------------- # LTTB on projected coordinates -- 3-D line (vectorised inner loop) # ---------------------------------------------------------------------------
[docs] def lttb_3d( x: list | np.ndarray, y: list | np.ndarray, z: list | np.ndarray, cam: "Camera3D", threshold: int = AUTO_THRESHOLD, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ LTTB for a 3-D polyline in screen space. Projects via vectorised NumPy transform, then runs the vectorised LTTB inner loop on screen coordinates. Result cached in a WeakKeyDictionary keyed on (camera-state, data-fingerprint). Respects the thread-local kill-switch. Raises: ValueError: If x, y, z have different lengths. """ from .projection3d import normalize x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) z_arr = np.asarray(z, dtype=float) if not (len(x_arr) == len(y_arr) == len(z_arr)): raise ValueError( f"x, y, z must have the same length " f"({len(x_arr)}, {len(y_arr)}, {len(z_arr)})." ) n = len(x_arr) if not _enabled() or n <= threshold: return x_arr, y_arr, z_arr cam_state = (cam.azimuth, cam.elevation, cam.cx, cam.cy, cam.scale) fingerprint = _data_fingerprint(x_arr, y_arr, z_arr, threshold) cache_key = (cam_state, fingerprint) cam_cache = _lttb3d_cache.get(cam) if cam_cache is not None and cam_cache[0] == cache_key: return cam_cache[1] xn = np.asarray(normalize(x_arr)[0], dtype=float) yn = np.asarray(normalize(y_arr)[0], dtype=float) zn = np.asarray(normalize(z_arr)[0], dtype=float) px, py = _project_vectorised(xn, yn, zn, cam) # Reuse vectorised LTTB on (px, py) — same algorithm, screen coords _threshold = max(threshold, 3) n_buckets = _threshold - 2 bucket_size = (n - 2) / n_buckets bucket_idx = np.arange(n_buckets) b_starts = (np.floor((bucket_idx + 1) * bucket_size) + 1).astype(int) b_ends = np.minimum( (np.floor((bucket_idx + 2) * bucket_size) + 1).astype(int), n - 1 ) c_ends = np.minimum( (np.floor((bucket_idx + 2) * bucket_size) + 1).astype(int), n - 1 ) kept = np.empty(_threshold, dtype=int) kept[0] = 0 kept[-1] = n - 1 a = 0 for k in range(n_buckets): bs = b_starts[k] be = b_ends[k] ce = c_ends[k] cs = be # c_start = b_end if cs < ce: avg_px = px[cs:ce].mean() avg_py = py[cs:ce].mean() else: avg_px = px[ce] avg_py = py[ce] ax_val = px[a]; ay_val = py[a] if bs >= be: kept[k + 1] = a continue xi = px[bs:be]; yi = py[bs:be] areas = np.abs( (ax_val - avg_px) * (yi - ay_val) - (ax_val - xi) * (avg_py - ay_val) ) a = bs + int(np.argmax(areas)) kept[k + 1] = a idx = kept result = (x_arr[idx], y_arr[idx], z_arr[idx]) _lttb3d_cache[cam] = (cache_key, result) return result
# --------------------------------------------------------------------------- # Voxel thinning -- 3-D (reduceat-based) # ---------------------------------------------------------------------------
[docs] def voxel_thin_3d( x: list | np.ndarray, y: list | np.ndarray, z: list | np.ndarray, colors: list | None = None, max_points: int = AUTO_THRESHOLD, ) -> tuple[np.ndarray, np.ndarray, np.ndarray, list | None]: """ 3-D voxel grid thinning for unordered scatter data. Uses ``np.minimum.reduceat`` after sorting by cell_id to find the nearest-centroid point per voxel without a full distance argsort. Respects the thread-local kill-switch. Raises: ValueError: If x, y, z have different lengths. """ x_arr = np.asarray(x, dtype=float) y_arr = np.asarray(y, dtype=float) z_arr = np.asarray(z, dtype=float) if not (len(x_arr) == len(y_arr) == len(z_arr)): raise ValueError( f"x, y, z must have the same length " f"({len(x_arr)}, {len(y_arr)}, {len(z_arr)})." ) n = len(x_arr) if not _enabled() or n <= max_points: return x_arr, y_arr, z_arr, colors grid_k = max(1, int(math.ceil(max_points ** (1.0 / 3.0)))) def _cell(arr: np.ndarray) -> np.ndarray: lo, hi = arr.min(), arr.max() span = hi - lo or 1.0 return np.clip(((arr - lo) / span * grid_k).astype(int), 0, grid_k - 1) ci = _cell(x_arr); cj = _cell(y_arr); ck = _cell(z_arr) cell_id = ci * grid_k * grid_k + cj * grid_k + ck x_span = (x_arr.max() - x_arr.min()) or 1.0 y_span = (y_arr.max() - y_arr.min()) or 1.0 z_span = (z_arr.max() - z_arr.min()) or 1.0 ccx = x_arr.min() + (ci + 0.5) / grid_k * x_span ccy = y_arr.min() + (cj + 0.5) / grid_k * y_span ccz = z_arr.min() + (ck + 0.5) / grid_k * z_span dist2 = (x_arr - ccx) ** 2 + (y_arr - ccy) ** 2 + (z_arr - ccz) ** 2 # Sort by cell_id, use reduceat to find min-dist per cell cell_order = np.argsort(cell_id, kind="stable") cell_sorted = cell_id[cell_order] dist_sorted = dist2[cell_order] _, cell_starts = np.unique(cell_sorted, return_index=True) cell_ends = np.empty_like(cell_starts) cell_ends[:-1] = cell_starts[1:] cell_ends[-1] = len(cell_order) best_local = np.empty(len(cell_starts), dtype=int) for ci_idx, (cs, ce) in enumerate(zip(cell_starts, cell_ends)): best_local[ci_idx] = cs + int(np.argmin(dist_sorted[cs:ce])) kept = np.sort(cell_order[best_local]) colors_out = [colors[i] for i in kept] if colors is not None else None return x_arr[kept], y_arr[kept], z_arr[kept], colors_out
# --------------------------------------------------------------------------- # Grid decimation -- Surface3D # ---------------------------------------------------------------------------
[docs] def decimate_grid( x_1d: list | np.ndarray, y_1d: list | np.ndarray, z_mat: list[list[float]] | np.ndarray, max_faces: int = AUTO_THRESHOLD, ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ Reduce a regular Surface3D grid by subsampling rows and columns. Uses independent per-axis step sizes proportional to grid aspect ratio. Raises: ValueError: If z_mat shape does not match (len(y_1d), len(x_1d)). """ x_arr = np.asarray(x_1d, dtype=float) y_arr = np.asarray(y_1d, dtype=float) z_arr = np.asarray(z_mat, dtype=float) expected = (len(y_arr), len(x_arr)) if z_arr.shape != expected: raise ValueError( f"z_mat shape {z_arr.shape} does not match (len(y_1d), len(x_1d)) " f"= {expected}. Did you accidentally transpose the matrix?" ) if not _enabled(): return x_arr, y_arr, z_arr ny, nx = z_arr.shape current_faces = (nx - 1) * (ny - 1) if current_faces <= max_faces: return x_arr, y_arr, z_arr ratio = math.sqrt(current_faces / max_faces) step_x = max(1, int(math.ceil(ratio * math.sqrt((nx - 1) / (ny - 1))))) step_y = max(1, int(math.ceil(ratio * math.sqrt((ny - 1) / (nx - 1))))) return x_arr[::step_x], y_arr[::step_y], z_arr[::step_y, ::step_x]
# --------------------------------------------------------------------------- # Face culling -- Surface3D (vectorised shoelace) # ---------------------------------------------------------------------------
[docs] def cull_faces( faces: list[tuple], min_area: float = MIN_FACE_AREA, ) -> list[tuple]: """ Remove projected quad faces whose screen area is below ``min_area`` px^2. Fully vectorised NumPy shoelace computation. """ if not faces or min_area <= 0: return faces n = len(faces) px = np.empty((n, 4), dtype=float) py = np.empty((n, 4), dtype=float) for fi, (_, pts, _) in enumerate(faces): for vi, p in enumerate(pts): px[fi, vi] = p.px py[fi, vi] = p.py idx_next = np.array([1, 2, 3, 0]) cross = (px * py[:, idx_next] - px[:, idx_next] * py).sum(axis=1) areas = np.abs(cross) * 0.5 mask = areas >= min_area return [f for f, keep in zip(faces, mask) if keep]
# --------------------------------------------------------------------------- # SVG annotation helper # --------------------------------------------------------------------------- def _ds_comment(original_n: int, thinned_n: int, algorithm: str) -> str: return ( f"<!-- glyphx: {algorithm} downsampled {original_n} -> {thinned_n} points -->" )