""" Shared helper functions for tono nodes. """ from __future__ import annotations import json import re from functools import lru_cache from pathlib import Path from typing import Callable import numpy as np from backend.runtime_paths import demo_dir, input_dir, output_dir # --------------------------------------------------------------------------- # Scalar payload helpers (from display.py) # --------------------------------------------------------------------------- _SI_PREFIXES: dict[str, float] = { 'Y': 1e24, 'Z': 1e21, 'E': 1e18, 'P': 1e15, 'T': 1e12, 'G': 1e9, 'M': 1e6, 'k': 1e3, 'm': 1e-3, 'u': 1e-6, 'µ': 1e-6, 'n': 1e-9, 'p': 1e-12, 'f': 1e-15, 'a': 1e-18, 'z': 1e-21, 'y': 1e-24, } _PREFIXABLE_UNITS: frozenset[str] = frozenset({ 'm', 's', 'A', 'V', 'W', 'Hz', 'F', 'C', 'J', 'N', 'Pa', 'T', 'H', 'S', 'g', 'K', 'Ohm', 'ohm', 'Ω', }) _NUMBER_RE = re.compile( r'^([+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)\s*(.*)?$' ) def parse_number_with_unit(text: str) -> tuple[float, str]: """Parse a string like '1.5 nm' into (1.5e-9, 'm'). The numeric part may use scientific notation. The unit is stripped of any recognised SI prefix and the raw value is scaled accordingly, so the returned float is always in the base SI unit. Units that are not prefixable are returned unchanged alongside the unscaled value. Examples:: parse_number_with_unit("1 um") → (1e-6, "m") parse_number_with_unit("500 nm") → (5e-7, "m") parse_number_with_unit("3.14") → (3.14, "") parse_number_with_unit("2 kHz") → (2000.0, "Hz") """ text = text.strip() if not text: return 0.0, "" m = _NUMBER_RE.match(text) if not m: raise ValueError(f"Cannot parse number: {text!r}") numeric = float(m.group(1)) unit_str = (m.group(2) or "").strip() if not unit_str: return numeric, "" # Try prefix + base-unit split (handle multi-byte µ as a prefix) if len(unit_str) >= 2: prefix_char = unit_str[0] rest = unit_str[1:] if prefix_char in _SI_PREFIXES and rest in _PREFIXABLE_UNITS: return numeric * _SI_PREFIXES[prefix_char], rest return numeric, unit_str def _scalar_payload(value: float, unit: str = "") -> dict: payload = {"value": float(value)} if isinstance(unit, str) and unit.strip(): payload["unit"] = unit.strip() return payload # --------------------------------------------------------------------------- # Measurement helpers (from display.py — used by ValueIO) # --------------------------------------------------------------------------- def _measurement_names(table: list) -> list[str]: names = [] for row in table: if not isinstance(row, dict): continue quantity = row.get("quantity") if isinstance(quantity, str) and quantity and quantity not in names: names.append(quantity) return names def _measurement_entry(table: list, selection: str) -> dict: names = _measurement_names(table) if not names: raise ValueError("Measurement table has no selectable rows.") target = selection if selection in names else names[0] for row in table: if isinstance(row, dict) and row.get("quantity") == target: return row raise ValueError(f"Measurement '{target}' was not found.") def _measurement_value(table: list, selection: str) -> float: row = _measurement_entry(table, selection) value = row.get("value") if isinstance(value, bool): raise ValueError(f"Measurement '{row.get('quantity', selection)}' does not have a numeric value.") try: numeric = float(value) except (TypeError, ValueError) as exc: raise ValueError(f"Measurement '{row.get('quantity', selection)}' does not have a numeric value.") from exc if np.isfinite(numeric): return numeric raise ValueError(f"Measurement '{row.get('quantity', selection)}' does not have a numeric value.") # --------------------------------------------------------------------------- # SI formatting helpers (from display.py — used by Annotations) # --------------------------------------------------------------------------- _SI_PREFIXES = [ (1e24, "Y"), (1e21, "Z"), (1e18, "E"), (1e15, "P"), (1e12, "T"), (1e9, "G"), (1e6, "M"), (1e3, "k"), (1.0, ""), (1e-3, "m"), (1e-6, "u"), (1e-9, "n"), (1e-12, "p"), (1e-15, "f"), (1e-18, "a"), (1e-21, "z"), (1e-24, "y"), ] _PREFIXABLE_UNITS = {"m", "s", "A", "V", "W", "Hz", "F", "C", "J", "N", "Pa", "T", "H", "S", "g", "K", "Ohm", "ohm", "\u03a9"} def _format_numeric(value: float) -> str: if not np.isfinite(value): return str(value) abs_value = abs(value) if abs_value == 0: return "0" if abs_value >= 1e4 or abs_value < 1e-3: return f"{value:.3e}" return f"{value:.4g}" def _format_with_unit(value: float, unit: str) -> str: unit = (unit or "").strip() if not unit: return _format_numeric(value) if unit in _PREFIXABLE_UNITS and np.isfinite(value) and value != 0: abs_value = abs(value) for scale, prefix in _SI_PREFIXES: scaled = abs_value / scale if 1 <= scaled < 1000: signed = value / scale return f"{_format_numeric(signed)} {prefix}{unit}" return f"{_format_numeric(value)} {unit}" def _nice_length(target: float) -> float: if not np.isfinite(target) or target <= 0: return 0.0 exponent = np.floor(np.log10(target)) base = 10.0 ** exponent for step in (5.0, 2.0, 1.0): candidate = step * base if candidate <= target: return candidate return base def _display_value_range(field) -> tuple[float, float, float]: data = np.asarray(field.data, dtype=np.float64) dmin = float(data.min()) dmax = float(data.max()) if not np.isfinite(dmin) or not np.isfinite(dmax) or dmax <= dmin: return dmin, dmin, dmin offset = float(field.display_offset) scale = float(field.display_scale) if not np.isfinite(offset): offset = 0.0 if not np.isfinite(scale) or scale <= 0.0: scale = 1.0 low_norm = float(np.clip(offset, 0.0, 1.0)) high_norm = float(np.clip(offset + scale, 0.0, 1.0)) if high_norm < low_norm: low_norm, high_norm = high_norm, low_norm mid_norm = 0.5 * (low_norm + high_norm) span = dmax - dmin return ( dmin + low_norm * span, dmin + mid_norm * span, dmin + high_norm * span, ) def _render_annotation_text(text: str, size_px: int, color: tuple[int, int, int]): from PIL import Image, ImageDraw, ImageFont size_px = max(8, int(round(size_px))) try: font = ImageFont.truetype("DejaVuSans.ttf", size_px) probe = Image.new("RGBA", (1, 1), (0, 0, 0, 0)) probe_draw = ImageDraw.Draw(probe) bbox = probe_draw.textbbox((0, 0), text, font=font) width = max(1, bbox[2] - bbox[0]) height = max(1, bbox[3] - bbox[1]) text_image = Image.new("RGBA", (width, height), (0, 0, 0, 0)) text_draw = ImageDraw.Draw(text_image) text_draw.text((-bbox[0], -bbox[1]), text, font=font, fill=(*color, 255)) return text_image except Exception: font = ImageFont.load_default() probe = Image.new("L", (1, 1), 0) probe_draw = ImageDraw.Draw(probe) bbox = probe_draw.textbbox((0, 0), text, font=font) width = max(1, bbox[2] - bbox[0]) height = max(1, bbox[3] - bbox[1]) mask = Image.new("L", (width, height), 0) mask_draw = ImageDraw.Draw(mask) mask_draw.text((-bbox[0], -bbox[1]), text, font=font, fill=255) scale = max(1.0, size_px / max(1, height)) scaled_width = max(1, int(round(width * scale))) scaled_height = max(1, int(round(height * scale))) resampling = getattr(Image, "Resampling", Image) scaled_mask = mask.resize((scaled_width, scaled_height), resample=resampling.BILINEAR) text_image = Image.new("RGBA", (scaled_width, scaled_height), (*color, 0)) text_image.putalpha(scaled_mask) return text_image def _import_ibw_loader(): """Import igor's binary wave loader with NumPy 2 compatibility.""" if not hasattr(np, "complex"): # igor 0.3 still references np.complex at import time. setattr(np, "complex", complex) try: from igor.binarywave import load as load_ibw except ImportError: raise ImportError("Install 'igor' package to load .ibw files: pip install igor") return load_ibw # --------------------------------------------------------------------------- # Markup helpers (from display.py — used by Markup) # --------------------------------------------------------------------------- def _normalize_markup_color(color: object, default: str = "#ffd54f") -> str: if isinstance(color, str): text = color.strip() if len(text) == 4 and text.startswith("#"): text = "#" + "".join(ch * 2 for ch in text[1:]) if len(text) == 7 and text.startswith("#"): try: int(text[1:], 16) return text.lower() except ValueError: pass return default def _parse_markup_shapes(raw_shapes) -> list[dict]: if isinstance(raw_shapes, str): try: raw_shapes = json.loads(raw_shapes or "[]") except json.JSONDecodeError: raw_shapes = [] if not isinstance(raw_shapes, list): return [] parsed = [] for shape in raw_shapes: if not isinstance(shape, dict): continue kind = str(shape.get("kind", "")).strip().lower() if kind not in {"line", "rectangle", "circle", "arrow"}: continue try: x1 = float(shape.get("x1")) y1 = float(shape.get("y1")) x2 = float(shape.get("x2")) y2 = float(shape.get("y2")) width = int(round(float(shape.get("width", 3)))) except (TypeError, ValueError): continue coords = [x1, y1, x2, y2] if not all(np.isfinite(value) for value in coords): continue parsed.append({ "kind": kind, "x1": float(np.clip(x1, 0.0, 1.0)), "y1": float(np.clip(y1, 0.0, 1.0)), "x2": float(np.clip(x2, 0.0, 1.0)), "y2": float(np.clip(y2, 0.0, 1.0)), "width": max(1, min(128, width)), "color": _normalize_markup_color(shape.get("color")), }) return parsed def _draw_arrow(draw, start, end, color, width): dx = end[0] - start[0] dy = end[1] - start[1] length = float(np.hypot(dx, dy)) if length <= 1e-6: radius = max(1.0, width / 2.0) draw.ellipse( (start[0] - radius, start[1] - radius, start[0] + radius, start[1] + radius), fill=color, ) return ux = dx / length uy = dy / length head_length = max(10.0, width * 4.0) head_width = max(8.0, width * 3.0) shaft_end = ( end[0] - ux * head_length, end[1] - uy * head_length, ) draw.line((start, shaft_end), fill=color, width=width) px = -uy py = ux left = ( shaft_end[0] + px * head_width / 2.0, shaft_end[1] + py * head_width / 2.0, ) right = ( shaft_end[0] - px * head_width / 2.0, shaft_end[1] - py * head_width / 2.0, ) draw.polygon([end, left, right], fill=color) def _render_markup_image(image, shapes): from PIL import Image as PILImage, ImageDraw from backend.data_types import image_to_uint8 base = image_to_uint8(image) if base.ndim == 2: base = np.repeat(base[:, :, np.newaxis], 3, axis=2) canvas = PILImage.fromarray(base.copy()) draw = ImageDraw.Draw(canvas) height, width = base.shape[:2] for shape in shapes: x1 = float(shape["x1"]) * width y1 = float(shape["y1"]) * height x2 = float(shape["x2"]) * width y2 = float(shape["y2"]) * height color = str(shape["color"]) stroke_width = int(shape["width"]) kind = str(shape["kind"]) if kind == "line": draw.line(((x1, y1), (x2, y2)), fill=color, width=stroke_width) elif kind == "rectangle": draw.rectangle((x1, y1, x2, y2), outline=color, width=stroke_width) elif kind == "circle": draw.ellipse((x1, y1, x2, y2), outline=color, width=stroke_width) elif kind == "arrow": _draw_arrow(draw, (x1, y1), (x2, y2), color, stroke_width) return np.asarray(canvas, dtype=np.uint8) # --------------------------------------------------------------------------- # Mask helpers (from mask.py — used by multiple mask nodes) # --------------------------------------------------------------------------- def _mask_overlay(field, mask): from backend.data_types import datafield_to_uint8 grey = datafield_to_uint8(field, "gray") mask_bool = mask > 127 if not np.any(mask_bool): return grey overlay = grey.copy() red = overlay[..., 0] green = overlay[..., 1] blue = overlay[..., 2] red_vals = red[mask_bool].astype(np.uint16) green_vals = green[mask_bool].astype(np.uint16) blue_vals = blue[mask_bool].astype(np.uint16) red[mask_bool] = ((red_vals * 55) + (255 * 45) + 50) // 100 green[mask_bool] = ((green_vals * 55) + 50) // 100 blue[mask_bool] = ((blue_vals * 55) + 50) // 100 return overlay @lru_cache(maxsize=128) def _mask_structure(radius: int, shape: str): radius = max(1, int(radius)) if shape == "disk": y, x = np.ogrid[-radius:radius + 1, -radius:radius + 1] struct = (x * x + y * y) <= radius * radius else: size = 2 * radius + 1 struct = np.ones((size, size), dtype=bool) struct.setflags(write=False) return struct def _clamp_fraction(value) -> float: try: numeric = float(value) except (TypeError, ValueError): return 0.0 return max(0.0, min(1.0, numeric)) def _parse_mask_strokes(mask_paths) -> list[dict]: if isinstance(mask_paths, list): raw_strokes = mask_paths elif isinstance(mask_paths, str) and mask_paths.strip(): try: parsed = json.loads(mask_paths) except json.JSONDecodeError: return [] raw_strokes = parsed if isinstance(parsed, list) else [] else: return [] strokes = [] for stroke in raw_strokes: if not isinstance(stroke, dict): continue raw_points = stroke.get("points") if not isinstance(raw_points, list): continue points = [] for point in raw_points: if not isinstance(point, dict): continue if "x" not in point or "y" not in point: continue points.append({ "x": _clamp_fraction(point.get("x")), "y": _clamp_fraction(point.get("y")), }) if not points: continue try: size = max(1, int(round(float(stroke.get("size", 1))))) except (TypeError, ValueError): size = 1 strokes.append({ "size": size, "points": points, }) return strokes def _rasterize_mask(width, height, strokes, default_pen_size): from PIL import Image as PILImage, ImageDraw width = max(1, int(width)) height = max(1, int(height)) default_pen_size = max(1, int(default_pen_size)) mask_image = PILImage.new("L", (width, height), 0) draw = ImageDraw.Draw(mask_image) for stroke in strokes: points = stroke.get("points") or [] if not points: continue size = stroke.get("size", default_pen_size) try: size = max(1, int(round(float(size)))) except (TypeError, ValueError): size = default_pen_size pixel_points = [] for point in points: px = int(round(_clamp_fraction(point.get("x")) * (width - 1))) py = int(round(_clamp_fraction(point.get("y")) * (height - 1))) pixel_points.append((px, py)) radius = max(0.5, size / 2.0) if len(pixel_points) == 1: x, y = pixel_points[0] draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill=255) continue draw.line(pixel_points, fill=255, width=size) for x, y in pixel_points: draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill=255) return np.asarray(mask_image, dtype=np.uint8) # --------------------------------------------------------------------------- # Path / directory helpers (from io.py) # --------------------------------------------------------------------------- DEMO_DIR = demo_dir() INPUT_DIR = input_dir() OUTPUT_DIR = output_dir() _MAX_SAVE_FIELDS = 8 _DEMO_EXTENSIONS = {".png", ".jpg", ".jpeg", ".tiff", ".tif", ".npy", ".npz", ".gwy", ".sxm", ".ibw"} _SPM_EXTENSIONS = {".gwy", ".sxm", ".ibw"} _IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".tiff", ".tif", ".bmp"} _ARRAY_EXTENSIONS = {".npy", ".npz"} _PATH_COMPATIBLE_EXTENSIONS = _IMAGE_EXTENSIONS | _ARRAY_EXTENSIONS | _SPM_EXTENSIONS def _resolve_path(filepath: str): path = Path(filepath) if path.is_absolute(): return path candidate = INPUT_DIR / filepath if candidate.exists(): return candidate candidate = DEMO_DIR / filepath if candidate.exists(): return candidate return INPUT_DIR / filepath def list_channels(filepath: str) -> list[dict]: path = _resolve_path(filepath) if not path.exists(): return [{"name": "field", "type": "DATA_FIELD"}] ext = path.suffix.lower() if ext == ".gwy": try: import gwyfile obj = gwyfile.load(str(path)) channels = gwyfile.util.get_datafields(obj) if channels: return [{"name": k, "type": "DATA_FIELD"} for k in channels] except Exception: pass return [{"name": "field", "type": "DATA_FIELD"}] if ext == ".sxm": try: import nanonispy as nap sxm = nap.read.Scan(str(path)) if sxm.signals: return [{"name": k, "type": "DATA_FIELD"} for k in sxm.signals] except Exception: pass return [{"name": "field", "type": "DATA_FIELD"}] if ext == ".ibw": try: load_ibw = _import_ibw_loader() wave = load_ibw(str(path)) raw = wave["wave"]["wData"] labels = wave["wave"].get("labels", None) if raw.ndim >= 3 and labels: dim_idx = min(2, len(labels) - 1) if dim_idx >= 0 and labels[dim_idx]: decoded = [] for lbl in labels[dim_idx]: if lbl: name = lbl.split(b"\x00")[0].decode("ascii", errors="replace").strip() if name: decoded.append(name) if decoded: return [{"name": n, "type": "DATA_FIELD"} for n in decoded] if raw.ndim >= 3 and raw.shape[2] > 1: return [{"name": f"ch{i}", "type": "DATA_FIELD"} for i in range(raw.shape[2])] except Exception: pass return [{"name": "field", "type": "DATA_FIELD"}] return [{"name": "field", "type": "DATA_FIELD"}] def list_folder_paths(folderpath: str) -> list[dict]: path = _resolve_path(folderpath) if not path.exists() or not path.is_dir(): return [] resolved_dir = str(path.resolve()) results = [{"name": "directory", "type": "DIRECTORY", "path": resolved_dir}] for entry in sorted(path.iterdir(), key=lambda p: p.name.lower()): if not entry.is_file() or entry.name.startswith("."): continue if entry.suffix.lower() not in _PATH_COMPATIBLE_EXTENSIONS: continue results.append({"name": entry.name, "type": "FILE_PATH", "path": str(entry.resolve())}) return results def _list_demo_files() -> list[str]: if not DEMO_DIR.exists(): return [] return sorted( f.name for f in DEMO_DIR.iterdir() if f.is_file() and not f.name.startswith(".") and f.suffix.lower() in _DEMO_EXTENSIONS ) # --------------------------------------------------------------------------- # Butterworth / FFT helpers (from filters.py — used by FFTFilter # --------------------------------------------------------------------------- def _butterworth_lp(freq, cutoff, order): with np.errstate(divide="ignore", over="ignore"): return 1.0 / (1.0 + (freq / cutoff) ** (2 * order)) def _butterworth_hp(freq, cutoff, order): with np.errstate(divide="ignore", invalid="ignore"): h = 1.0 / (1.0 + (cutoff / freq) ** (2 * order)) h = np.where(np.isfinite(h), h, 0.0) return h def _build_1d_transfer(n, filter_type, cutoff, cutoff_high, order): freq = np.linspace(0, 1, n // 2 + 1) if filter_type == "lowpass": H = _butterworth_lp(freq, cutoff, order) elif filter_type == "highpass": H = _butterworth_hp(freq, cutoff, order) elif filter_type == "bandpass": H = _butterworth_hp(freq, cutoff, order) * _butterworth_lp(freq, cutoff_high, order) elif filter_type == "notch": bp = _butterworth_hp(freq, cutoff, order) * _butterworth_lp(freq, cutoff_high, order) H = 1.0 - bp else: H = np.ones_like(freq) return H @lru_cache(maxsize=64) def _cached_1d_transfer(n, filter_type, cutoff, cutoff_high, order): transfer = _build_1d_transfer(n, filter_type, cutoff, cutoff_high, order) transfer.setflags(write=False) return transfer @lru_cache(maxsize=32) def _fft_radius_grid(yres, xres): fy = np.fft.fftfreq(yres)[:, np.newaxis] * 2.0 fx = np.fft.rfftfreq(xres)[np.newaxis, :] * 2.0 radius = np.sqrt(fx * fx + fy * fy) / np.sqrt(2.0) np.clip(radius, 0.0, 1.0, out=radius) radius.setflags(write=False) return radius @lru_cache(maxsize=128) def _cached_2d_transfer(yres, xres, filter_type, cutoff, cutoff_high, order): radius = _fft_radius_grid(yres, xres) if filter_type == "lowpass": transfer = _butterworth_lp(radius, cutoff, order) elif filter_type == "highpass": transfer = _butterworth_hp(radius, cutoff, order) elif filter_type == "bandpass": transfer = _butterworth_hp(radius, cutoff, order) * _butterworth_lp(radius, cutoff_high, order) elif filter_type == "notch": band = _butterworth_hp(radius, cutoff, order) * _butterworth_lp(radius, cutoff_high, order) transfer = 1.0 - band else: transfer = np.ones_like(radius) transfer.setflags(write=False) return transfer # --------------------------------------------------------------------------- # Cross-section and stats helpers (from analysis.py) # --------------------------------------------------------------------------- def _extend_to_edges(x1, y1, x2, y2): dx = x2 - x1 dy = y2 - y1 t_candidates = [] if abs(dx) > 1e-12: for bx in (0.0, 1.0): t = (bx - x1) / dx y_at_t = y1 + t * dy if -1e-9 <= y_at_t <= 1.0 + 1e-9: t_candidates.append(t) if abs(dy) > 1e-12: for by in (0.0, 1.0): t = (by - y1) / dy x_at_t = x1 + t * dx if -1e-9 <= x_at_t <= 1.0 + 1e-9: t_candidates.append(t) if len(t_candidates) < 2: return x1, y1, x2, y2 t_min = min(t_candidates) t_max = max(t_candidates) return ( np.clip(x1 + t_min * dx, 0, 1), np.clip(y1 + t_min * dy, 0, 1), np.clip(x1 + t_max * dx, 0, 1), np.clip(y1 + t_max * dy, 0, 1), ) def _safe_rq(d): return float(np.sqrt(np.mean(d * d))) LINE_OPS: dict[str, tuple] = {} def _line_op(name, unit=""): def decorator(fn): LINE_OPS[name] = (fn, unit) return fn return decorator @_line_op("min") def _op_min(z): return float(z.min()) @_line_op("max") def _op_max(z): return float(z.max()) @_line_op("mean") def _op_mean(z): return float(z.mean()) @_line_op("median") def _op_median(z): return float(np.median(z)) @_line_op("sum") def _op_sum(z): return float(z.sum()) @_line_op("range") def _op_range(z): return float(z.max() - z.min()) @_line_op("length", unit="pts") def _op_length(z): return float(len(z)) @_line_op("rms") def _op_rms(z): return float(np.sqrt(np.mean(z * z))) @_line_op("Ra") def _op_ra(z): return float(np.mean(np.abs(z - z.mean()))) @_line_op("Rq") def _op_rq(z): d = z - z.mean() return _safe_rq(d) @_line_op("Rsk") def _op_rsk(z): d = z - z.mean() rq = _safe_rq(d) return float(np.mean(d**3) / rq**3) if rq > 0 else 0.0 @_line_op("Rku") def _op_rku(z): d = z - z.mean() rq = _safe_rq(d) return float(np.mean(d**4) / rq**4) if rq > 0 else 0.0 @_line_op("Rp") def _op_rp(z): return float((z - z.mean()).max()) @_line_op("Rv") def _op_rv(z): return float(-(z - z.mean()).min()) @_line_op("Rt") def _op_rt(z): d = z - z.mean() return float(d.max() - d.min()) @_line_op("Dq") def _op_dq(z): dz = np.diff(z) return float(np.sqrt(np.mean(dz * dz))) @_line_op("Da") def _op_da(z): return float(np.mean(np.abs(np.diff(z)))) TABLE_OPS: dict[str, Callable[[np.ndarray], float]] = { "min": lambda values: float(np.min(values)), "max": lambda values: float(np.max(values)), "avg": lambda values: float(np.mean(values)), "mean": lambda values: float(np.mean(values)), "median": lambda values: float(np.median(values)), "sum": lambda values: float(np.sum(values)), "range": lambda values: float(np.max(values) - np.min(values)), "std": lambda values: float(np.std(values)), "variance": lambda values: float(np.var(values)), "count": lambda values: float(len(values)), } ARRAY_OPS: dict[str, Callable[[np.ndarray], float]] = { "min": lambda values: float(np.min(values)), "max": lambda values: float(np.max(values)), "avg": lambda values: float(np.mean(values)), "mean": lambda values: float(np.mean(values)), "median": lambda values: float(np.median(values)), "sum": lambda values: float(np.sum(values)), "range": lambda values: float(np.max(values) - np.min(values)), "std": lambda values: float(np.std(values)), "variance": lambda values: float(np.var(values)), "rms": lambda values: float(np.sqrt(np.mean(values * values))), "count": lambda values: float(values.size), } def _square_unit(unit: str) -> str: unit = str(unit or "").strip() if not unit: return "" if any(token in unit for token in ("^", "(", ")", "/", "*", " ")): return f"({unit})^2" return f"{unit}^2" def _apply_scalar_unit(base_unit: str, operation: str) -> str: unit = str(base_unit or "").strip() if operation == "count": return "count" if not unit: return "" if operation == "variance": return _square_unit(unit) return unit def _common_table_unit(table: list, column: str) -> str: candidates = [] seen = set() unit_key = f"{column}_unit" for row in table: if not isinstance(row, dict): continue unit = None if unit_key in row and isinstance(row.get(unit_key), str): unit = row.get(unit_key) elif column == "value" and isinstance(row.get("unit"), str): unit = row.get("unit") if unit is None: continue unit = unit.strip() if not unit or unit in seen: continue seen.add(unit) candidates.append(unit) if len(candidates) == 1: return candidates[0] return "" def extract_numeric_table_values(table: list, column: str) -> list[float]: values = [] for row in table: if not isinstance(row, dict) or column not in row: continue value = row[column] if isinstance(value, bool): continue try: numeric = float(value) except (TypeError, ValueError): continue if np.isfinite(numeric): values.append(numeric) return values def resolve_table_column_name(table: list, column: str) -> str: requested = str(column or "").strip() if requested: return requested if extract_numeric_table_values(table, "value"): return "value" numeric_columns = [] seen = set() for row in table: if not isinstance(row, dict): continue for key in row.keys(): if key in seen: continue seen.add(key) if extract_numeric_table_values(table, key): numeric_columns.append(key) if len(numeric_columns) == 1: return numeric_columns[0] if not numeric_columns: raise ValueError("Stats could not find any numeric columns in the input table.") raise ValueError( "Stats found multiple numeric columns; set the column name explicitly." )