improve back and frontend testing

This commit is contained in:
2026-03-29 19:58:06 -07:00
parent 29eee8a42c
commit e9215a64ff
70 changed files with 13441 additions and 134 deletions

View File

@@ -23,3 +23,17 @@ def test_color_map_node():
assert custom["stops"][0]["position"] == 0.0
assert custom["stops"][-1]["position"] == 1.0
assert len(custom["stops"]) == 3
# invalid JSON raises ValueError
try:
node.build(mode="custom", preset="viridis", stops_json="not valid json{")
assert False, "Expected ValueError"
except ValueError:
pass
# fewer than 2 stops raises ValueError
try:
node.build(mode="custom", preset="viridis", stops_json=json.dumps([{"position": 0.0, "color": "#000000"}]))
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -0,0 +1,100 @@
import numpy as np
from backend.data_types import DataField
from tests.node_tests._shared import make_field
def _make_freq_field(N=32):
"""Return a frequency-domain DataField built from a real spatial field."""
from backend.nodes.fft_2d import FFT2D
field = make_field(data=np.random.default_rng(0).standard_normal((N, N)), xreal=1e-6, yreal=1e-6)
spectrum, spec_mag, spec_phase, spec_psdf = FFT2D().process(field, windowing="none", level="none")
return spectrum, spec_mag, spec_phase, spec_psdf, field
def test_fft2d_inverse_magnitude():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
spectrum, spec_mag, spec_phase, _, original = _make_freq_field()
result, = node.process(spec_mag, representation="magnitude", phase=spec_phase)
assert isinstance(result, DataField)
assert result.domain == "spatial"
assert result.data.shape == original.data.shape
assert np.allclose(result.data, original.data, atol=1e-9)
def test_fft2d_inverse_log_magnitude():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, spec_mag, spec_phase, _, original = _make_freq_field()
from backend.nodes.fft_2d import FFT2D
field = make_field(data=np.random.default_rng(1).standard_normal((32, 32)), xreal=1e-6, yreal=1e-6)
spectrum, spec_mag2, spec_phase2, _ = FFT2D().process(field, windowing="none", level="none")
# log_magnitude = log1p(magnitude), so inverse should recover original
result, = node.process(spec_mag2, representation="log_magnitude", phase=spec_phase2)
assert result.domain == "spatial"
assert result.data.shape == field.data.shape
def test_fft2d_inverse_psdf():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, _, _, spec_psdf, original = _make_freq_field()
result, = node.process(spec_psdf, representation="psdf")
assert result.domain == "spatial"
assert result.data.shape == original.data.shape
def test_fft2d_inverse_no_phase():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, spec_mag, _, _, original = _make_freq_field()
result, = node.process(spec_mag, representation="magnitude")
assert result.domain == "spatial"
assert result.data.shape == original.data.shape
def test_fft2d_inverse_spatial_domain_raises():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
spatial_field = make_field(data=np.ones((16, 16)))
assert spatial_field.domain == "spatial"
try:
node.process(spatial_field, representation="magnitude")
assert False, "Expected ValueError"
except ValueError:
pass
def test_fft2d_inverse_unsupported_representation():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, spec_mag, _, _, _ = _make_freq_field()
try:
node.process(spec_mag, representation="invalid_repr")
assert False, "Expected ValueError"
except ValueError:
pass
def test_fft2d_inverse_phase_shape_mismatch():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, spec_mag, spec_phase, _, _ = _make_freq_field(N=32)
_, spec_mag_big, spec_phase_big, _, _ = _make_freq_field(N=64)
try:
node.process(spec_mag, representation="magnitude", phase=spec_phase_big)
assert False, "Expected ValueError"
except ValueError:
pass
def test_fft2d_inverse_phase_not_frequency_domain():
from backend.nodes.fft_2d_inverse import FFT2DInverse
node = FFT2DInverse()
_, spec_mag, _, _, _ = _make_freq_field()
spatial_phase = make_field(data=np.zeros((32, 32)))
try:
node.process(spec_mag, representation="magnitude", phase=spatial_phase)
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -16,3 +16,15 @@ def test_fix_zero():
result_median, = node.process(field, method="median")
assert abs(np.median(result_median.data)) < 1e-10
def test_fix_zero_unknown_method():
from backend.nodes.fix_zero import FixZero
import pytest
node = FixZero()
field = make_field(data=np.array([[1.0, 2.0], [3.0, 4.0]]))
try:
node.process(field, method="invalid_method")
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -50,3 +50,19 @@ def test_flip_field():
raise AssertionError("Expected invalid flip axis to raise ValueError")
except ValueError:
pass
# _flip_overlays with non-list input → returns []
assert FlipField._flip_overlays(None, "x") == []
assert FlipField._flip_overlays("not_a_list", "y") == []
# Non-dict item in overlays → skipped
field_bad_overlay = DataField(data=data, overlays=["not_a_dict", markup_overlay])
flipped_bad, = node.process(field_bad_overlay, axis="x")
assert len(flipped_bad.overlays) == 1
# Shape with None coordinates → returns shape unchanged
shape_no_coords = {"kind": "line", "x1": None, "y1": None, "x2": None, "y2": None}
overlay_no_coords = {"kind": "markup", "shapes": [shape_no_coords]}
field_no_coords = DataField(data=data, overlays=[overlay_no_coords])
flipped_nc, = node.process(field_no_coords, axis="x")
assert len(flipped_nc.overlays) == 1

View File

@@ -147,22 +147,58 @@ def test_load_file_warning():
def test_load_file_ibw():
from backend.nodes.image import Image
node = Image()
ibw_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "demo", "BR_New20012.ibw"))
if not os.path.exists(ibw_path):
demo_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "demo"))
# Use first available .ibw file in demo
ibw_path = None
for candidate in ["Calcite.ibw", "DNA.ibw", "nanoparticles.npy"]:
p = os.path.join(demo_dir, candidate)
if os.path.exists(p):
ibw_path = p
break
if ibw_path is None:
return
result = node.load(filename=ibw_path)
assert len(result) == 4
assert len(result) >= 2
for item in result[1:]:
assert isinstance(item, DataField)
assert item.data.dtype == np.float64
assert item.xreal > 0
for i, field in enumerate(result):
assert isinstance(field, DataField)
assert field.data.shape == (512, 1024)
assert field.data.dtype == np.float64
assert field.xreal > 1e-8
assert field.yreal > 1e-8
assert field.si_unit_xy == "m"
assert field.si_unit_z == "m"
assert result[0].xreal == result[1].xreal
assert result[0].yreal == result[1].yreal
assert not np.array_equal(result[0].data, result[1].data)
def test_load_empty_filename_raises():
from backend.nodes.image import Image
node = Image()
try:
node.load(filename="")
assert False, "Expected ValueError"
except ValueError:
pass
def test_load_directory_raises():
from backend.nodes.image import Image
node = Image()
with tempfile.TemporaryDirectory() as tmpdir:
try:
node.load(filename=tmpdir)
assert False, "Expected IsADirectoryError"
except IsADirectoryError:
pass
def test_load_float_tiff():
"""float32 TIFF images should be converted to float64 (covers non-uint8 branch)."""
import tifffile
from backend.nodes.image import Image
node = Image()
Image._load_fields_cached.cache_clear()
with tempfile.TemporaryDirectory() as tmpdir:
arr = np.random.default_rng(42).random((16, 16)).astype(np.float32)
path = os.path.join(tmpdir, "float.tiff")
tifffile.imwrite(path, arr)
result = node.load(filename=path)
assert len(result) == 2
assert result[1].data.dtype == np.float64
Image._load_fields_cached.cache_clear()

View File

@@ -15,3 +15,21 @@ def test_mask_invert():
double, = node.process(inverted)
assert np.array_equal(double, mask)
def test_mask_invert_with_field():
from backend.nodes.mask_invert import MaskInvert
from backend.execution_context import active_node, execution_callbacks
from tests.node_tests._shared import make_field
node = MaskInvert()
mask = np.zeros((32, 32), dtype=np.uint8)
mask[8:24, 8:24] = 255
field = make_field(data=np.ones((32, 32)))
previews = []
with execution_callbacks(preview=lambda nid, d: previews.append(d)), active_node("test"):
inverted, = node.process(mask, field=field)
assert inverted.dtype == np.uint8
assert len(previews) == 1
assert previews[0].startswith("data:image/png;base64,")

View File

@@ -27,3 +27,31 @@ def test_mask_morphology():
dilated_disk, = node.process(mask, operation="dilate", radius=2, shape="disk")
assert np.count_nonzero(dilated_disk) > orig_count
def test_mask_morphology_with_field():
from backend.nodes.mask_morphology import MaskMorphology
from backend.execution_context import active_node, execution_callbacks
from tests.node_tests._shared import make_field
node = MaskMorphology()
mask = np.zeros((32, 32), dtype=np.uint8)
mask[10:22, 10:22] = 255
field = make_field(data=np.ones((32, 32)))
previews = []
with execution_callbacks(preview=lambda nid, d: previews.append(d)), active_node("test"):
dilated, = node.process(mask, operation="dilate", radius=1, shape="square", field=field)
assert dilated.dtype == np.uint8
assert len(previews) == 1
def test_mask_morphology_unknown_operation():
from backend.nodes.mask_morphology import MaskMorphology
node = MaskMorphology()
mask = np.zeros((32, 32), dtype=np.uint8)
try:
node.process(mask, operation="invalid_op", radius=1, shape="square")
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -42,3 +42,15 @@ def test_mask_operations():
assert result_xnor[5, 5] == 255
assert result_xnor[15, 15] == 0
assert result_xnor[35, 35] == 0
def test_mask_operations_unknown_operation():
from backend.nodes.mask_operations import MaskOperations
node = MaskOperations()
a = np.zeros((16, 16), dtype=np.uint8)
b = np.zeros((16, 16), dtype=np.uint8)
try:
node.process(a, b, operation="invalid_op")
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -34,3 +34,14 @@ def test_threshold_mask():
assert mask_otsu[:, 32:].sum() > mask_otsu[:, :32].sum()
ThresholdMask._broadcast_fn = None
def test_threshold_mask_unknown_method():
from backend.nodes.mask_threshold import ThresholdMask
node = ThresholdMask()
field = make_field(data=np.zeros((16, 16)))
try:
node.process(field, method="invalid", threshold=0.5, direction="above")
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -0,0 +1,32 @@
"""
Calls get_node_info() for every registered node to ensure INPUT_TYPES()
is exercised and the registry metadata is well-formed.
"""
import backend.nodes # noqa: F401 — registers all nodes
from backend.node_registry import get_node_info, NODE_CLASS_MAPPINGS
_REQUIRED_KEYS = {"name", "display_name", "category", "input", "output", "output_name", "description"}
def test_all_nodes_have_valid_info():
for class_name in NODE_CLASS_MAPPINGS:
info = get_node_info(class_name)
missing = _REQUIRED_KEYS - info.keys()
assert not missing, f"{class_name} info missing keys: {missing}"
assert isinstance(info["input"], dict), f"{class_name} input must be a dict"
assert isinstance(info["output"], list), f"{class_name} output must be a list"
assert isinstance(info["output_name"], list), f"{class_name} output_name must be a list"
assert len(info["output"]) == len(info["output_name"]), (
f"{class_name} output/output_name length mismatch"
)
def test_all_nodes_input_types_callable():
"""INPUT_TYPES() must return a dict with at least a 'required' or 'optional' key."""
for class_name, cls in NODE_CLASS_MAPPINGS.items():
result = cls.INPUT_TYPES()
assert isinstance(result, dict), f"{class_name}.INPUT_TYPES() must return a dict"
assert "required" in result or "optional" in result, (
f"{class_name}.INPUT_TYPES() must have 'required' or 'optional'"
)

119
tests/node_tests/note.py Normal file
View File

@@ -0,0 +1,119 @@
import os
import tempfile
from pathlib import Path
def test_parse_ibw_note():
from backend.nodes.note import _parse_ibw_note
# key=value pairs
note = b"ScanSize=500e-9\nSetPoint=1.0\nScanRate=1.0\n"
rows = _parse_ibw_note(note)
assert len(rows) == 3
assert rows[0] == {"key": "ScanSize", "value": "500e-9"}
# key:value pairs
note_colon = b"Operator: alice\nDate: 2025-01-01\n"
rows2 = _parse_ibw_note(note_colon)
assert len(rows2) == 2
assert rows2[0]["key"] == "Operator"
assert rows2[0]["value"] == "alice"
# empty bytes → empty result
assert _parse_ibw_note(b"") == []
# blank lines are skipped (covers 'if not line: continue')
note_blanks = b"\n\nKey=Val\n\n"
rows_blank = _parse_ibw_note(note_blanks)
assert len(rows_blank) == 1
# lines without separator are skipped
note_mixed = b"NoSeparatorLine\nKey=Value\n"
rows3 = _parse_ibw_note(note_mixed)
assert len(rows3) == 1
assert rows3[0]["key"] == "Key"
# non-bytes input → exception handler returns []
rows_exc = _parse_ibw_note(None)
assert rows_exc == []
def test_note_load_no_file():
from backend.nodes.note import Note
node = Note()
try:
node.load(filename="")
assert False, "Expected ValueError for empty filename"
except ValueError:
pass
def test_note_load_file_not_found():
from backend.nodes.note import Note
node = Note()
try:
node.load(filename="/nonexistent/path/file.ibw")
assert False, "Expected FileNotFoundError"
except FileNotFoundError:
pass
def test_note_load_wrong_extension():
from backend.nodes.note import Note
node = Note()
with tempfile.TemporaryDirectory() as tmpdir:
txt_path = os.path.join(tmpdir, "notes.txt")
Path(txt_path).write_text("Key=Value")
try:
node.load(filename=txt_path)
assert False, "Expected ValueError for non-ibw file"
except ValueError as e:
assert ".txt" in str(e)
def test_note_load_empty_note():
"""An .ibw file with no parseable note entries raises ValueError."""
from backend.nodes.note import Note
from unittest.mock import patch
import numpy as np
node = Note()
with tempfile.TemporaryDirectory() as tmpdir:
ibw_path = os.path.join(tmpdir, "empty_note.ibw")
Path(ibw_path).write_bytes(b"fake_ibw")
# Mock the IBW loader to return a wave with empty note
fake_wave = {"wave": {"wave_header": {}, "wData": np.zeros((4, 4)), "note": b""}}
with patch("backend.nodes.note._import_ibw_loader", return_value=lambda path: fake_wave):
try:
node.load(filename=ibw_path)
assert False, "Expected ValueError for empty note"
except ValueError as e:
assert "No metadata" in str(e)
def test_note_load_ibw():
"""Load from a real .ibw file if available in the demo directory."""
from backend.nodes.note import Note
from backend.data_types import DataTable
demo_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "demo"))
ibw_path = None
for candidate in ["Calcite.ibw", "DNA.ibw", "APL_Figure4.ibw"]:
p = os.path.join(demo_dir, candidate)
if os.path.exists(p):
ibw_path = p
break
if ibw_path is None:
return
node = Note()
try:
result, = node.load(filename=ibw_path)
assert isinstance(result, DataTable)
assert len(result) > 0
assert "key" in result[0]
assert "value" in result[0]
except ValueError:
# Some .ibw files have no note metadata — that's acceptable
pass

View File

@@ -56,3 +56,43 @@ def test_preview_image():
node.preview(colormap="auto", input=annotated_image)
assert len(captured) == 1
assert captured[0].startswith("data:image/png;base64,")
def test_preview_no_input_raises():
from backend.nodes.preview_image import PreviewImage
node = PreviewImage()
with execution_callbacks(preview=lambda nid, d: None), active_node("test"):
try:
node.preview(colormap="gray", input=None)
assert False, "Expected ValueError"
except ValueError:
pass
def test_preview_invalid_type_raises():
from backend.nodes.preview_image import PreviewImage
node = PreviewImage()
with execution_callbacks(preview=lambda nid, d: None), active_node("test"):
try:
node.preview(colormap="gray", input="not_an_image")
assert False, "Expected TypeError"
except TypeError:
pass
def test_preview_float_grayscale():
from backend.nodes.preview_image import PreviewImage
node = PreviewImage()
captured = []
with execution_callbacks(preview=lambda nid, d: captured.append(d)), active_node("test"):
# float32 2-D array — covers the float normalization branch
float_arr = np.random.default_rng(7).random((16, 16)).astype(np.float32)
node.preview(colormap="viridis", input=float_arr)
assert len(captured) == 1
assert captured[0].startswith("data:image/png;base64,")
captured.clear()
# constant float array → normalized = zeros branch
const_arr = np.ones((8, 8), dtype=np.float32)
node.preview(colormap="viridis", input=const_arr)
assert len(captured) == 1

View File

@@ -58,3 +58,14 @@ def test_rotate_field_overlay_warning():
assert "clears annotation/markup overlays" in warnings[0]
RotateField._broadcast_warning_fn = None
def test_rotate_unknown_interpolation():
from backend.nodes.rotate import RotateField
node = RotateField()
field = DataField(data=np.arange(9, dtype=np.float64).reshape(3, 3))
try:
node.process(field, angle=0.0, interpolation="invalid", expand_canvas=False)
assert False, "Expected ValueError"
except ValueError:
pass

View File

@@ -136,3 +136,55 @@ def test_save_generic():
assert False, "DATA_FIELD should reject unsupported save formats"
except ValueError:
pass
# 1-D ndarray → _save_line path
arr_1d = np.array([1.0, 2.0, 3.0])
node.save(filename="line_1d", directory_path=tmpdir, format="CSV", value=arr_1d)
assert Path(tmpdir, "line_1d.csv").exists()
# Unsupported input type
try:
node.save(filename="bad_type", directory_path=tmpdir, format="JSON", value=object())
assert False, "Expected ValueError for unsupported type"
except ValueError:
pass
# Unsupported IMAGE format
try:
node.save(filename="img_bad", directory_path=tmpdir, format="JSON", value=image)
assert False, "Expected ValueError for IMAGE + JSON"
except ValueError:
pass
# Unsupported LINE format
try:
node.save(filename="line_bad", directory_path=tmpdir, format="TIFF", value=line)
assert False, "Expected ValueError for LINE + TIFF"
except ValueError:
pass
# Unsupported table format
try:
node.save(filename="table_bad", directory_path=tmpdir, format="TIFF", value=list(measure_table))
assert False, "Expected ValueError for table + TIFF"
except ValueError:
pass
# Unsupported scalar format
try:
node.save(filename="scalar_bad", directory_path=tmpdir, format="NPZ", value=3.14)
assert False, "Expected ValueError for scalar + NPZ"
except ValueError:
pass
def test_save_no_filename():
from backend.nodes.save import Save
import tempfile
node = Save()
with tempfile.TemporaryDirectory() as tmpdir:
try:
node.save(filename="", directory_path=tmpdir, format="JSON", value=1.0)
assert False, "Expected ValueError for empty filename"
except ValueError:
pass

View File

@@ -66,3 +66,42 @@ def test_stats():
pass
Stats._broadcast_value_fn = None
def test_stats_empty_inputs():
from backend.nodes.stats import Stats
from backend.data_types import DataTable
node = Stats()
# empty record table (DataTable with no rows)
try:
node.process(DataTable([]), operation="mean", column="value")
assert False, "Expected ValueError for empty table"
except ValueError:
pass
# empty ndarray
try:
node.process(np.array([]), operation="mean", column="value")
assert False, "Expected ValueError for empty ndarray"
except ValueError:
pass
# empty LINE (1-D array)
try:
node.process(np.array([], dtype=np.float64), operation="Rq", column="value")
assert False, "Expected ValueError for empty LINE"
except ValueError:
pass
def test_stats_unsupported_type():
from backend.nodes.stats import Stats
node = Stats()
try:
node.process("not_a_valid_input", operation="mean", column="value")
assert False, "Expected ValueError for unsupported type"
except ValueError:
pass