From 5d4c6dfceadf8ea97c96f62dda41629ea5af0614 Mon Sep 17 00:00:00 2001 From: matei jordache Date: Fri, 3 Apr 2026 22:09:19 -0700 Subject: [PATCH] low pri features --- backend/node_menu.py | 84 +++++++------ backend/nodes/__init__.py | 87 +------------- backend/nodes/affine_correction.py | 66 ++++++++++ backend/nodes/deconvolution.py | 80 +++++++++++++ backend/nodes/drift_correction.py | 81 +++++++++++++ backend/nodes/facet_analysis.py | 80 +++++++++++++ backend/nodes/feature_detection.py | 99 +++++++++++++++ backend/nodes/hough_transform.py | 102 ++++++++++++++++ backend/nodes/image_stitch.py | 131 ++++++++++++++++++++ backend/nodes/lattice_measurement.py | 116 ++++++++++++++++++ backend/nodes/mfm_analysis.py | 89 ++++++++++++++ backend/nodes/shape_fitting.py | 132 ++++++++++++++++++++ backend/nodes/synthetic_surface.py | 152 ++++++++++++++++++++++++ tests/node_tests/affine_correction.py | 45 +++++++ tests/node_tests/deconvolution.py | 42 +++++++ tests/node_tests/drift_correction.py | 53 +++++++++ tests/node_tests/facet_analysis.py | 42 +++++++ tests/node_tests/feature_detection.py | 50 ++++++++ tests/node_tests/hough_transform.py | 50 ++++++++ tests/node_tests/image_stitch.py | 45 +++++++ tests/node_tests/lattice_measurement.py | 46 +++++++ tests/node_tests/mfm_analysis.py | 47 ++++++++ tests/node_tests/scar_removal.py | 3 +- tests/node_tests/shape_fitting.py | 45 +++++++ tests/node_tests/synthetic_surface.py | 57 +++++++++ 25 files changed, 1707 insertions(+), 117 deletions(-) create mode 100644 backend/nodes/affine_correction.py create mode 100644 backend/nodes/deconvolution.py create mode 100644 backend/nodes/drift_correction.py create mode 100644 backend/nodes/facet_analysis.py create mode 100644 backend/nodes/feature_detection.py create mode 100644 backend/nodes/hough_transform.py create mode 100644 backend/nodes/image_stitch.py create mode 100644 backend/nodes/lattice_measurement.py create mode 100644 backend/nodes/mfm_analysis.py create mode 100644 backend/nodes/shape_fitting.py create mode 100644 backend/nodes/synthetic_surface.py create mode 100644 tests/node_tests/affine_correction.py create mode 100644 tests/node_tests/deconvolution.py create mode 100644 tests/node_tests/drift_correction.py create mode 100644 tests/node_tests/facet_analysis.py create mode 100644 tests/node_tests/feature_detection.py create mode 100644 tests/node_tests/hough_transform.py create mode 100644 tests/node_tests/image_stitch.py create mode 100644 tests/node_tests/lattice_measurement.py create mode 100644 tests/node_tests/mfm_analysis.py create mode 100644 tests/node_tests/shape_fitting.py create mode 100644 tests/node_tests/synthetic_surface.py diff --git a/backend/node_menu.py b/backend/node_menu.py index ae7c916..b395273 100644 --- a/backend/node_menu.py +++ b/backend/node_menu.py @@ -14,75 +14,93 @@ from typing import Any MENU_LAYOUT: dict[str, list[str]] = { "Input": [ "Image", - "Note", - "ImageDemo", "Folder", + "ImageDemo", + "SyntheticSurface", + "Note", + "TextNote", + "Number", "RangeSlider", "Coordinate", "CoordinatePair", ], "Display": [ - "ColorMap", - "Font", - "ColormapAdjust", "PreviewImage", - "ValueIO", "View3D", + "ColorMap", + "ColormapAdjust", + "Font", + "ValueIO", + "PrintTable", "Save", "SaveImage", - "PrintTable", ], "Overlay": [ "Markup", "Annotations", "AngleMeasure", + "Cursors", ], "Geometry": [ "CropResizeField", "RotateField", "FlipField", "Resample", + "AffineCorrection", + "ImageStitch", "FieldArithmetic", ], - "Filter": [ - "GaussianFilter", - "MedianFilter", - "EdgeDetect", - "Gradient", - "FFTFilter", - "ScarRemoval", - ], - "Spectral": [ - "FFT2D", - "FFT2DInverse", - "FFTFilter", - "ACF2D", - "ACF1D", - "PSDF", - ], "Level & Correct": [ "FixZero", "PlaneLevelField", "PolyLevelField", "FacetLevelField", "LineCorrection", + "DriftCorrection", "ScarRemoval", + "SpotRemoval", ], - "Measure": [ + "Filter": [ + "GaussianFilter", + "MedianFilter", + "KuwaharaFilter", + "WaveletDenoise", + "LocalContrast", + "CustomConvolution", + "Deconvolution", + "Gradient", + "EdgeDetect", + ], + "Spectral": [ + "FFT2D", + "FFT2DInverse", + "FFTFilter", "FFT1D", - "AngleMeasure", - "CrossSection", - "Histogram", - "Cursors", - "Curvature", - "FractalDimension", "ACF2D", "ACF1D", "PSDF", - "SlopeDistribution", - "RadialProfile", + "CrossCorrelate", + ], + "Measure": [ + "CrossSection", + "Histogram", "Statistics", "Stats", + "Curvature", + "ShapeFitting", + "FractalDimension", + "Entropy", + "SlopeDistribution", + "RadialProfile", + "LatticeMeasurement", + "AngleMeasure", + ], + "Detect": [ + "FeatureDetection", + "HoughTransform", + "TemplateMatch", + "FacetAnalysis", + "MFMAnalysis", ], "Mask": [ "DrawMask", @@ -90,8 +108,6 @@ MENU_LAYOUT: dict[str, list[str]] = { "MaskMorphology", "MaskInvert", "MaskOperations", - "GrainDistanceTransform", - "WatershedSegmentation", ], "Grains": [ "GrainDistanceTransform", diff --git a/backend/nodes/__init__.py b/backend/nodes/__init__.py index 08c7990..7d0eb17 100644 --- a/backend/nodes/__init__.py +++ b/backend/nodes/__init__.py @@ -1,81 +1,6 @@ -# Import all node modules to trigger @register_node decorators. -from backend.nodes import ( - # IO - colormap, - crop_resize, - fft_2d_inverse, - filter_fft, - filter_gaussian, - filter_median, - flip, - font, - image, - image_demo, - folder, - coordinate, - coordinate_pair, - level_facet, - level_plane, - level_poly, - mask_draw, - mask_threshold, - note, - number, - text_note, - range_slider, - rotate, - save, - edge_detect, - colormap_adjust, - fix_zero, - line_correction, - # Mask - mask_morphology, - mask_invert, - mask_operations, - grain_distance_transform, - save_layers, - # Correction - scar_removal, - # Display - annotations, - angle_measure, - markup, - preview_image, - statistics, - value_io, - view_3d, - print_table, - curvature, - fractal_dimension, - histogram, - acf_2d, - acf_1d, - cursors, - fft_2d, - psdf, - cross_section, - stats, - watershed_segmentation, - grain_analysis, - fft_1d, - # Medium-value nodes (Gwyddion feature-gap) - field_arithmetic, - gradient, - resample, - slope_distribution, - grain_filter, - radial_profile, - tip_model, - tip_deconvolution, - tip_blind_estimate, - # New: remaining Gwyddion feature-gap nodes - filter_kuwahara, - entropy, - local_contrast, - filter_custom, - spot_removal, - wavelet_denoise, - cross_correlate, - template_match, -) +# Auto-import all node modules to trigger @register_node decorators. +import importlib +import pkgutil + +for _finder, _name, _ispkg in pkgutil.iter_modules(__path__): + importlib.import_module(f"{__name__}.{_name}") diff --git a/backend/nodes/affine_correction.py b/backend/nodes/affine_correction.py new file mode 100644 index 0000000..d9fe8bb --- /dev/null +++ b/backend/nodes/affine_correction.py @@ -0,0 +1,66 @@ +"""Affine correction — fix geometric distortions from scanner nonlinearity.""" + +from __future__ import annotations + +import numpy as np +from scipy.ndimage import affine_transform + +from backend.node_registry import register_node +from backend.data_types import DataField + + +@register_node(display_name="Affine Correction") +class AffineCorrection: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "shear_x": ("FLOAT", {"default": 0.0, "min": -1.0, "max": 1.0, "step": 0.01}), + "shear_y": ("FLOAT", {"default": 0.0, "min": -1.0, "max": 1.0, "step": 0.01}), + "scale_x": ("FLOAT", {"default": 1.0, "min": 0.1, "max": 10.0, "step": 0.01}), + "scale_y": ("FLOAT", {"default": 1.0, "min": 0.1, "max": 10.0, "step": 0.01}), + "angle": ("FLOAT", {"default": 0.0, "min": -45.0, "max": 45.0, "step": 0.1}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'corrected'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Apply an affine correction to fix geometric distortions from scanner " + "nonlinearity. Parameters specify shear, scale, and rotation corrections. " + "The transform is applied about the centre of the field. " + "Equivalent to Gwyddion's correct_affine.c module." + ) + + def process( + self, + field: DataField, + shear_x: float, + shear_y: float, + scale_x: float, + scale_y: float, + angle: float, + ) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + yres, xres = data.shape + cy, cx = yres / 2.0, xres / 2.0 + + theta = np.radians(angle) + cos_t, sin_t = np.cos(theta), np.sin(theta) + + # Build the correction matrix: Scale * Shear * Rotation + rotation = np.array([[cos_t, -sin_t], [sin_t, cos_t]]) + shear = np.array([[1.0, shear_x], [shear_y, 1.0]]) + scale = np.array([[1.0 / scale_x, 0.0], [0.0, 1.0 / scale_y]]) + + matrix = scale @ shear @ rotation + + # Offset so the transform is centred + offset = np.array([cy, cx]) - matrix @ np.array([cy, cx]) + + corrected = affine_transform(data, matrix, offset=offset, order=3, mode="reflect") + return (field.replace(data=corrected),) diff --git a/backend/nodes/deconvolution.py b/backend/nodes/deconvolution.py new file mode 100644 index 0000000..fcb4bf8 --- /dev/null +++ b/backend/nodes/deconvolution.py @@ -0,0 +1,80 @@ +"""Deconvolution — image restoration via regularised deconvolution.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField + + +@register_node(display_name="Deconvolution") +class Deconvolution: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "method": (["wiener", "richardson_lucy"], {"default": "wiener"}), + "sigma": ("FLOAT", {"default": 2.0, "min": 0.1, "max": 50.0, "step": 0.1}), + "regularisation": ("FLOAT", {"default": 0.01, "min": 1e-6, "max": 1.0, "step": 0.001}), + "iterations": ("INT", {"default": 10, "min": 1, "max": 200}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'restored'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Restore an image via regularised deconvolution. Assumes the image was " + "blurred by a Gaussian PSF with the given sigma (in pixels). " + "Wiener filtering is fast and works in one pass. " + "Richardson-Lucy is iterative and preserves positivity. " + "Equivalent to Gwyddion's deconvolve.c module." + ) + + def process( + self, + field: DataField, + method: str, + sigma: float, + regularisation: float, + iterations: int, + ) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + yres, xres = data.shape + + # Build Gaussian PSF in frequency domain + kx = np.fft.fftfreq(xres) + ky = np.fft.fftfreq(yres) + KX, KY = np.meshgrid(kx, ky) + K2 = KX**2 + KY**2 + H = np.exp(-2.0 * (np.pi * sigma)**2 * K2) + + if method == "wiener": + fft_data = np.fft.fft2(data) + H2 = H * H + # Wiener filter: H* / (|H|² + λ) + wiener = np.conj(H) / (H2 + regularisation) + restored = np.real(np.fft.ifft2(fft_data * wiener)) + + elif method == "richardson_lucy": + # Richardson-Lucy iterative deconvolution + estimate = data.copy() + H_conj = np.conj(H) + + for _ in range(iterations): + estimate = np.maximum(estimate, 1e-30) # positivity + blurred = np.real(np.fft.ifft2(np.fft.fft2(estimate) * H)) + blurred = np.maximum(blurred, 1e-30) + ratio = data / blurred + correction = np.real(np.fft.ifft2(np.fft.fft2(ratio) * H_conj)) + estimate = estimate * correction + + restored = estimate + else: + raise ValueError(f"Unknown deconvolution method: {method!r}") + + return (field.replace(data=restored),) diff --git a/backend/nodes/drift_correction.py b/backend/nodes/drift_correction.py new file mode 100644 index 0000000..bdfb3da --- /dev/null +++ b/backend/nodes/drift_correction.py @@ -0,0 +1,81 @@ +"""Drift correction — compensate for thermal/piezo drift between scan lines.""" + +from __future__ import annotations + +import numpy as np +from scipy.ndimage import shift as ndimage_shift + +from backend.node_registry import register_node +from backend.data_types import DataField + + +def _estimate_drift(data: np.ndarray, reference: str) -> np.ndarray: + """Estimate per-row horizontal drift via cross-correlation with a reference. + + Returns an array of shape (yres,) with the estimated x-shift (in pixels) + for each row. + """ + yres, xres = data.shape + shifts = np.zeros(yres) + + if reference == "previous_row": + for i in range(1, yres): + corr = np.correlate(data[i - 1], data[i], mode="full") + peak = np.argmax(corr) - (xres - 1) + shifts[i] = shifts[i - 1] + peak + elif reference == "mean_row": + mean_row = data.mean(axis=0) + for i in range(yres): + corr = np.correlate(mean_row, data[i], mode="full") + peak = np.argmax(corr) - (xres - 1) + shifts[i] = peak + else: + raise ValueError(f"Unknown reference: {reference!r}") + + # Remove the overall mean to centre the correction + shifts -= shifts.mean() + return shifts + + +@register_node(display_name="Drift Correction") +class DriftCorrection: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "reference": (["previous_row", "mean_row"], {"default": "previous_row"}), + "direction": (["horizontal", "vertical"], {"default": "horizontal"}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'corrected'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Compensate for thermal or piezo drift between scan lines. " + "Cross-correlates each row (or column) against a reference to estimate " + "the drift offset, then shifts lines to correct. " + "Equivalent to Gwyddion's drift.c module." + ) + + def process(self, field: DataField, reference: str, direction: str) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + + if direction == "vertical": + data = data.T + + shifts = _estimate_drift(data, reference) + corrected = np.empty_like(data) + for i, s in enumerate(shifts): + if abs(s) < 0.01: + corrected[i] = data[i] + else: + ndimage_shift(data[i], -s, output=corrected[i], mode="reflect") + + if direction == "vertical": + corrected = corrected.T + + return (field.replace(data=corrected),) diff --git a/backend/nodes/facet_analysis.py b/backend/nodes/facet_analysis.py new file mode 100644 index 0000000..aa7f4bb --- /dev/null +++ b/backend/nodes/facet_analysis.py @@ -0,0 +1,80 @@ +"""Facet analysis — orientation distribution of surface facets.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField + + +@register_node(display_name="Facet Analysis") +class FacetAnalysis: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "n_bins": ("INT", {"default": 180, "min": 30, "max": 720}), + "kernel_size": ("INT", {"default": 3, "min": 3, "max": 9, "step": 2}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'facet_map'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Compute the facet orientation distribution of a surface. " + "Outputs a 2D histogram (stereographic projection) where the x-axis " + "is the azimuthal angle (phi) and y-axis is the inclination (theta). " + "Intensity represents how much surface area faces each orientation. " + "Equivalent to Gwyddion's facet_analysis.c module." + ) + + def process(self, field: DataField, n_bins: int, kernel_size: int) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + + # Compute gradients with Sobel-like kernel for robustness + from scipy.ndimage import sobel + gx = sobel(data, axis=1) / (kernel_size * field.dx) + gy = sobel(data, axis=0) / (kernel_size * field.dy) + + # Convert to spherical angles + theta = np.arctan(np.sqrt(gx**2 + gy**2)) # inclination from vertical + phi = np.arctan2(gy, gx) # azimuthal angle + + # Map to [0, pi/2] and [0, 2*pi] + theta_flat = theta.ravel() + phi_flat = (phi.ravel() + 2 * np.pi) % (2 * np.pi) + + # Build 2D histogram (stereographic projection) + theta_max = float(theta_flat.max()) if theta_flat.size > 0 else np.pi / 4 + theta_max = max(theta_max, 0.01) + + n_theta = max(1, n_bins // 4) + n_phi = n_bins + + hist, _, _ = np.histogram2d( + theta_flat, + phi_flat, + bins=[n_theta, n_phi], + range=[[0.0, theta_max], [0.0, 2 * np.pi]], + ) + + # Normalise so it represents a probability density + total = hist.sum() + if total > 0: + hist = hist / total + + facet_field = DataField( + data=hist, + xreal=360.0, + yreal=np.degrees(theta_max), + si_unit_xy="deg", + si_unit_z="", + domain="spatial", + ) + + return (facet_field,) diff --git a/backend/nodes/feature_detection.py b/backend/nodes/feature_detection.py new file mode 100644 index 0000000..9638ce5 --- /dev/null +++ b/backend/nodes/feature_detection.py @@ -0,0 +1,99 @@ +"""Feature detection — Canny edge and Harris corner detection.""" + +from __future__ import annotations + +import numpy as np +from skimage.feature import canny, corner_harris, corner_peaks + +from backend.node_registry import register_node +from backend.data_types import DataField, RecordTable + + +@register_node(display_name="Feature Detection") +class FeatureDetection: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "method": (["canny", "harris"], {"default": "canny"}), + "sigma": ("FLOAT", {"default": 1.0, "min": 0.1, "max": 20.0, "step": 0.1}), + }, + "optional": { + "low_threshold": ("FLOAT", {"default": 0.1, "min": 0.0, "max": 1.0, "step": 0.01}), + "high_threshold": ("FLOAT", {"default": 0.2, "min": 0.0, "max": 1.0, "step": 0.01}), + "harris_k": ("FLOAT", {"default": 0.05, "min": 0.01, "max": 0.5, "step": 0.01}), + "min_distance": ("INT", {"default": 5, "min": 1, "max": 100}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'result'), + ('RECORD_TABLE', 'features'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Detect edges or corners in a surface. " + "Canny: multi-stage edge detector with hysteresis thresholding. " + "Harris: corner/interest point detector based on structure tensor. " + "Outputs a feature map and a table of detected feature locations. " + "Equivalent to Gwyddion's edge/corner detection in filters.c." + ) + + def process( + self, + field: DataField, + method: str, + sigma: float, + low_threshold: float = 0.1, + high_threshold: float = 0.2, + harris_k: float = 0.05, + min_distance: int = 5, + ) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + + # Normalise to [0, 1] + dmin, dmax = data.min(), data.max() + if dmax > dmin: + norm = (data - dmin) / (dmax - dmin) + else: + norm = np.zeros_like(data) + + records: RecordTable = RecordTable() + + if method == "canny": + edges = canny( + norm, + sigma=sigma, + low_threshold=low_threshold, + high_threshold=high_threshold, + ) + result = edges.astype(np.float64) + n_edge_pixels = int(edges.sum()) + edge_fraction = n_edge_pixels / data.size + records.append({"quantity": "Edge pixels", "value": str(n_edge_pixels), "unit": ""}) + records.append({"quantity": "Edge fraction", "value": f"{edge_fraction:.4f}", "unit": ""}) + + elif method == "harris": + response = corner_harris(norm, sigma=sigma, k=harris_k) + result = response.astype(np.float64) + corners = corner_peaks( + response, + min_distance=min_distance, + threshold_rel=0.1, + ) + records.append({"quantity": "Corners detected", "value": str(len(corners)), "unit": ""}) + for i, (row, col) in enumerate(corners[:20]): + x_phys = col * field.dx + y_phys = row * field.dy + records.append({ + "quantity": f"Corner {i + 1}", + "value": f"({x_phys:.4g}, {y_phys:.4g})", + "unit": field.si_unit_xy, + }) + + else: + raise ValueError(f"Unknown method: {method!r}") + + return (field.replace(data=result, si_unit_z=""), records) diff --git a/backend/nodes/hough_transform.py b/backend/nodes/hough_transform.py new file mode 100644 index 0000000..614c17d --- /dev/null +++ b/backend/nodes/hough_transform.py @@ -0,0 +1,102 @@ +"""Hough transform — detect lines and circles in images.""" + +from __future__ import annotations + +import numpy as np +from skimage.transform import hough_line, hough_line_peaks, hough_circle, hough_circle_peaks +from skimage.feature import canny + +from backend.node_registry import register_node +from backend.data_types import DataField, RecordTable + + +@register_node(display_name="Hough Transform") +class HoughTransform: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "detect": (["lines", "circles"], {"default": "lines"}), + "n_features": ("INT", {"default": 5, "min": 1, "max": 50}), + "canny_sigma": ("FLOAT", {"default": 1.0, "min": 0.1, "max": 10.0, "step": 0.1}), + "min_radius_px": ("INT", {"default": 10, "min": 2, "max": 500}), + "max_radius_px": ("INT", {"default": 100, "min": 5, "max": 1000}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'accumulator'), + ('RECORD_TABLE', 'features'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Detect lines or circles using the Hough transform. " + "First applies Canny edge detection, then accumulates votes in " + "Hough parameter space. Reports detected features with their parameters. " + "For lines: angle and distance from origin. " + "For circles: centre coordinates and radius. " + "Equivalent to Gwyddion's hough.c module." + ) + + def process( + self, + field: DataField, + detect: str, + n_features: int, + canny_sigma: float, + min_radius_px: int, + max_radius_px: int, + ) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + + # Normalise to [0, 1] for edge detection + dmin, dmax = data.min(), data.max() + if dmax > dmin: + norm = (data - dmin) / (dmax - dmin) + else: + norm = np.zeros_like(data) + + edges = canny(norm, sigma=canny_sigma) + + records: RecordTable = RecordTable() + + if detect == "lines": + tested_angles = np.linspace(-np.pi / 2, np.pi / 2, 180, endpoint=False) + h, theta, d = hough_line(edges, theta=tested_angles) + + accum = field.replace(data=h.astype(np.float64), si_unit_z="", domain="frequency") + + peaks = hough_line_peaks(h, theta, d, num_peaks=n_features) + for i, (hval, angle, dist) in enumerate(zip(*peaks)): + angle_deg = np.degrees(angle) + dist_phys = dist * field.dx + records.append({"quantity": f"Line {i + 1} angle", "value": f"{angle_deg:.1f}", "unit": "deg"}) + records.append({"quantity": f"Line {i + 1} distance", "value": f"{dist_phys:.4g}", "unit": field.si_unit_xy}) + records.append({"quantity": f"Line {i + 1} votes", "value": f"{hval:.0f}", "unit": ""}) + + elif detect == "circles": + max_radius_px = max(min_radius_px + 1, max_radius_px) + radii = np.arange(min_radius_px, max_radius_px + 1) + h = hough_circle(edges, radii) + + # Sum over radii for the accumulator visualisation + accum_data = h.sum(axis=0).astype(np.float64) + accum = field.replace(data=accum_data, si_unit_z="") + + accum_list, cx_list, cy_list, rad_list = hough_circle_peaks( + h, radii, total_num_peaks=n_features, + ) + for i, (hval, cx, cy, r) in enumerate(zip(accum_list, cx_list, cy_list, rad_list)): + cx_phys = cx * field.dx + cy_phys = cy * field.dy + r_phys = r * field.dx + records.append({"quantity": f"Circle {i + 1} x", "value": f"{cx_phys:.4g}", "unit": field.si_unit_xy}) + records.append({"quantity": f"Circle {i + 1} y", "value": f"{cy_phys:.4g}", "unit": field.si_unit_xy}) + records.append({"quantity": f"Circle {i + 1} radius", "value": f"{r_phys:.4g}", "unit": field.si_unit_xy}) + + else: + raise ValueError(f"Unknown detect mode: {detect!r}") + + return (accum, records) diff --git a/backend/nodes/image_stitch.py b/backend/nodes/image_stitch.py new file mode 100644 index 0000000..c1e0c5d --- /dev/null +++ b/backend/nodes/image_stitch.py @@ -0,0 +1,131 @@ +"""Image stitching — combine two overlapping scans into one image.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField + + +def _find_overlap_shift(a: np.ndarray, b: np.ndarray) -> tuple[int, int]: + """Estimate the (dy, dx) pixel shift of b relative to a via cross-correlation.""" + fa = np.fft.fft2(a - a.mean()) + fb = np.fft.fft2(b - b.mean()) + cross = np.fft.ifft2(fa * np.conj(fb)) + cc = np.abs(np.fft.fftshift(cross)) + cy, cx = np.array(cc.shape) // 2 + peak = np.unravel_index(np.argmax(cc), cc.shape) + dy = peak[0] - cy + dx = peak[1] - cx + return int(dy), int(dx) + + +def _blend_overlap(a: np.ndarray, b: np.ndarray, axis: int) -> np.ndarray: + """Linear blend along the overlap axis.""" + length = a.shape[axis] + weight = np.linspace(1.0, 0.0, length) + if axis == 0: + weight = weight[:, np.newaxis] + return a * weight + b * (1.0 - weight) + + +@register_node(display_name="Image Stitch") +class ImageStitch: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field_a": ("DATA_FIELD",), + "field_b": ("DATA_FIELD",), + "direction": (["right", "below", "auto"], {"default": "auto"}), + "blend": (["linear", "none"], {"default": "linear"}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'stitched'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Combine two overlapping scans into a single image. " + "Uses cross-correlation to align the images and blends the overlap region. " + "Direction specifies how field_b is positioned relative to field_a. " + "'auto' uses cross-correlation to determine the best placement. " + "Equivalent to Gwyddion's merge.c / stitch.c modules." + ) + + def process(self, field_a: DataField, field_b: DataField, direction: str, blend: str) -> tuple: + a = np.asarray(field_a.data, dtype=np.float64) + b = np.asarray(field_b.data, dtype=np.float64) + + if direction == "auto": + # Pad b to match a's shape for cross-correlation + shape = (max(a.shape[0], b.shape[0]), max(a.shape[1], b.shape[1])) + a_pad = np.zeros(shape) + b_pad = np.zeros(shape) + a_pad[:a.shape[0], :a.shape[1]] = a + b_pad[:b.shape[0], :b.shape[1]] = b + dy, dx = _find_overlap_shift(a_pad, b_pad) + direction = "right" if abs(dx) >= abs(dy) else "below" + + if direction == "right": + # b is to the right of a + dy, dx = _find_overlap_shift( + a[:, -min(a.shape[1], b.shape[1]):], + b[:, :min(a.shape[1], b.shape[1])], + ) + overlap = max(0, min(a.shape[1], b.shape[1]) - abs(dx)) + if overlap <= 0: + # No overlap, just concatenate + out_h = max(a.shape[0], b.shape[0]) + out = np.zeros((out_h, a.shape[1] + b.shape[1])) + out[:a.shape[0], :a.shape[1]] = a + out[:b.shape[0], a.shape[1]:] = b + else: + out_h = max(a.shape[0], b.shape[0]) + out_w = a.shape[1] + b.shape[1] - overlap + out = np.zeros((out_h, out_w)) + out[:a.shape[0], :a.shape[1]] = a + b_start = a.shape[1] - overlap + if blend == "linear" and overlap > 1: + ov_a = a[:min(a.shape[0], b.shape[0]), a.shape[1] - overlap:] + ov_b = b[:min(a.shape[0], b.shape[0]), :overlap] + blended = _blend_overlap(ov_a, ov_b, axis=1) + out[:blended.shape[0], b_start:b_start + overlap] = blended + out[:b.shape[0], b_start + overlap:b_start + b.shape[1]] = b[:, overlap:] + else: + out[:b.shape[0], b_start:b_start + b.shape[1]] = b + + elif direction == "below": + dy, dx = _find_overlap_shift( + a[-min(a.shape[0], b.shape[0]):, :], + b[:min(a.shape[0], b.shape[0]), :], + ) + overlap = max(0, min(a.shape[0], b.shape[0]) - abs(dy)) + if overlap <= 0: + out_w = max(a.shape[1], b.shape[1]) + out = np.zeros((a.shape[0] + b.shape[0], out_w)) + out[:a.shape[0], :a.shape[1]] = a + out[a.shape[0]:, :b.shape[1]] = b + else: + out_w = max(a.shape[1], b.shape[1]) + out_h = a.shape[0] + b.shape[0] - overlap + out = np.zeros((out_h, out_w)) + out[:a.shape[0], :a.shape[1]] = a + b_start = a.shape[0] - overlap + if blend == "linear" and overlap > 1: + ov_a = a[a.shape[0] - overlap:, :min(a.shape[1], b.shape[1])] + ov_b = b[:overlap, :min(a.shape[1], b.shape[1])] + blended = _blend_overlap(ov_a, ov_b, axis=0) + out[b_start:b_start + overlap, :blended.shape[1]] = blended + out[b_start + overlap:b_start + b.shape[0], :b.shape[1]] = b[overlap:, :] + else: + out[b_start:b_start + b.shape[0], :b.shape[1]] = b + else: + raise ValueError(f"Unknown direction: {direction!r}") + + xreal = out.shape[1] * field_a.dx + yreal = out.shape[0] * field_a.dy + return (field_a.replace(data=out, xreal=xreal, yreal=yreal),) diff --git a/backend/nodes/lattice_measurement.py b/backend/nodes/lattice_measurement.py new file mode 100644 index 0000000..1d8170f --- /dev/null +++ b/backend/nodes/lattice_measurement.py @@ -0,0 +1,116 @@ +"""Lattice measurement — detect and measure periodic lattice structures.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField, RecordTable + + +def _find_acf_peaks(acf: np.ndarray, dx: float, dy: float, n_peaks: int = 6): + """Find the strongest off-centre peaks in a 2D ACF. + + Returns list of (row, col, distance, angle_deg) tuples sorted by + correlation strength. + """ + yres, xres = acf.shape + cy, cx = yres // 2, xres // 2 + + # Mask the central DC region (within ~5% of the field size) + mask_radius = max(3, min(yres, xres) // 20) + yy, xx = np.ogrid[:yres, :xres] + dc_mask = ((yy - cy) ** 2 + (xx - cx) ** 2) <= mask_radius ** 2 + acf_masked = acf.copy() + acf_masked[dc_mask] = -np.inf + + peaks = [] + for _ in range(n_peaks): + idx = np.argmax(acf_masked) + r, c = np.unravel_index(idx, acf.shape) + if acf_masked[r, c] == -np.inf: + break + # Physical distance from centre + dist_x = (c - cx) * dx + dist_y = (r - cy) * dy + distance = np.hypot(dist_x, dist_y) + angle = np.degrees(np.arctan2(dist_y, dist_x)) + peaks.append((r, c, distance, angle, float(acf[r, c]))) + # Suppress neighbourhood + sup_r = max(0, r - mask_radius), min(yres, r + mask_radius + 1) + sup_c = max(0, c - mask_radius), min(xres, c + mask_radius + 1) + acf_masked[sup_r[0]:sup_r[1], sup_c[0]:sup_c[1]] = -np.inf + + return peaks + + +@register_node(display_name="Lattice Measurement") +class LatticeMeasurement: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "method": (["acf", "fft"], {"default": "acf"}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'correlation'), + ('RECORD_TABLE', 'lattice'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Detect and measure periodic lattice structures from a surface. " + "Computes the 2D ACF or FFT power spectrum, finds the strongest peaks, " + "and reports lattice vectors (spacing and angle). " + "Equivalent to Gwyddion's measure_lattice.c module." + ) + + def process(self, field: DataField, method: str) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + data = data - data.mean() + + if method == "acf": + fft_data = np.fft.fft2(data) + power = np.abs(fft_data) ** 2 + acf = np.real(np.fft.ifft2(power)) + acf = np.fft.fftshift(acf) + acf /= acf.max() if acf.max() != 0 else 1.0 + corr_field = field.replace(data=acf, si_unit_z="", domain="spatial") + elif method == "fft": + fft_data = np.fft.fft2(data) + power = np.fft.fftshift(np.abs(fft_data) ** 2) + power = np.log1p(power) + corr_field = field.replace(data=power, si_unit_z="", domain="frequency") + else: + raise ValueError(f"Unknown method: {method!r}") + + # Find lattice peaks from ACF + if method == "fft": + # Convert FFT peaks to ACF for lattice measurement + fft_data = np.fft.fft2(data) + power_raw = np.abs(fft_data) ** 2 + acf = np.real(np.fft.ifft2(power_raw)) + acf = np.fft.fftshift(acf) + acf /= acf.max() if acf.max() != 0 else 1.0 + + peaks = _find_acf_peaks(acf, field.dx, field.dy) + + records: RecordTable = RecordTable() + if len(peaks) >= 1: + _, _, d1, a1, s1 = peaks[0] + records.append({"quantity": "Vector 1 spacing", "value": f"{d1:.4g}", "unit": field.si_unit_xy}) + records.append({"quantity": "Vector 1 angle", "value": f"{a1:.1f}", "unit": "deg"}) + records.append({"quantity": "Vector 1 strength", "value": f"{s1:.4g}", "unit": ""}) + if len(peaks) >= 2: + _, _, d2, a2, s2 = peaks[1] + records.append({"quantity": "Vector 2 spacing", "value": f"{d2:.4g}", "unit": field.si_unit_xy}) + records.append({"quantity": "Vector 2 angle", "value": f"{a2:.1f}", "unit": "deg"}) + records.append({"quantity": "Vector 2 strength", "value": f"{s2:.4g}", "unit": ""}) + if len(peaks) >= 2: + angle_diff = abs(peaks[1][3] - peaks[0][3]) + records.append({"quantity": "Lattice angle", "value": f"{angle_diff:.1f}", "unit": "deg"}) + + return (corr_field, records) diff --git a/backend/nodes/mfm_analysis.py b/backend/nodes/mfm_analysis.py new file mode 100644 index 0000000..05905d8 --- /dev/null +++ b/backend/nodes/mfm_analysis.py @@ -0,0 +1,89 @@ +"""MFM analysis — magnetic force microscopy field calculations.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField + + +@register_node(display_name="MFM Analysis") +class MFMAnalysis: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "operation": (["phase_to_force_gradient", "force_gradient_to_field", + "charge_density", "magnetisation"],), + "lift_height": ("FLOAT", { + "default": 50e-9, "min": 1e-9, "max": 1e-6, "step": 1e-9, + }), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'result'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Magnetic force microscopy analysis. Converts MFM phase or frequency " + "shift data into physical quantities using Fourier-domain transfer " + "functions. Operations: phase_to_force_gradient converts phase to " + "d²F/dz²; force_gradient_to_field recovers the stray field Hz; " + "charge_density computes the effective magnetic charge; " + "magnetisation estimates the z-component of sample magnetisation. " + "Equivalent to Gwyddion's mfm_*.c modules." + ) + + def process(self, field: DataField, operation: str, lift_height: float) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + yres, xres = data.shape + + # Build frequency grids + kx = np.fft.fftfreq(xres, d=field.dx) * 2 * np.pi + ky = np.fft.fftfreq(yres, d=field.dy) * 2 * np.pi + KX, KY = np.meshgrid(kx, ky) + K = np.sqrt(KX**2 + KY**2) + + # Avoid division by zero at DC + K_safe = np.where(K == 0, 1.0, K) + + fft_data = np.fft.fft2(data) + + if operation == "phase_to_force_gradient": + # Phase ∝ F'' / k_cantilever, output is proportional to d²F/dz² + # Just propagate to the surface by multiplying by exp(k*h) + transfer = np.exp(K * lift_height) + transfer[0, 0] = 1.0 + result = np.real(np.fft.ifft2(fft_data * transfer)) + z_unit = field.si_unit_z + + elif operation == "force_gradient_to_field": + # Recover Hz from d²F/dz² using the relation in Fourier space: + # Hz(k) = F''(k) * exp(k*h) / (μ₀ * k²) + transfer = np.exp(K * lift_height) / (K_safe**2) + transfer[0, 0] = 0.0 # remove DC + result = np.real(np.fft.ifft2(fft_data * transfer)) + z_unit = "A/m" + + elif operation == "charge_density": + # σ_m = -dHz/dz ∝ k * Hz in Fourier space + transfer = K * np.exp(K * lift_height) / (K_safe**2) + transfer[0, 0] = 0.0 + result = np.real(np.fft.ifft2(fft_data * transfer)) + z_unit = "A/m²" + + elif operation == "magnetisation": + # Mz ∝ σ_m, further divide by k to integrate + transfer = np.exp(K * lift_height) / K_safe + transfer[0, 0] = 0.0 + result = np.real(np.fft.ifft2(fft_data * transfer)) + z_unit = "A/m" + + else: + raise ValueError(f"Unknown MFM operation: {operation!r}") + + return (field.replace(data=result, si_unit_z=z_unit),) diff --git a/backend/nodes/shape_fitting.py b/backend/nodes/shape_fitting.py new file mode 100644 index 0000000..d788740 --- /dev/null +++ b/backend/nodes/shape_fitting.py @@ -0,0 +1,132 @@ +"""Shape fitting — fit geometric primitives to surface data.""" + +from __future__ import annotations + +import numpy as np +from scipy.optimize import least_squares + +from backend.node_registry import register_node +from backend.data_types import DataField, RecordTable + + +def _fit_sphere(x, y, z): + """Fit z = z0 - sqrt(R² - (x-cx)² - (y-cy)²) via least squares.""" + cx0 = x.mean() + cy0 = y.mean() + r0 = max(x.max() - x.min(), y.max() - y.min()) * 2 + + def residuals(params): + cx, cy, z0, R = params + r2 = (x - cx)**2 + (y - cy)**2 + valid = r2 < R**2 + model = np.where(valid, z0 - np.sqrt(np.maximum(R**2 - r2, 0)), z0) + return z - model + + result = least_squares(residuals, [cx0, cy0, z.max(), r0], method="lm") + cx, cy, z0, R = result.x + return {"cx": cx, "cy": cy, "z0": z0, "R": abs(R)}, result.fun + + +def _fit_paraboloid(x, y, z): + """Fit z = z0 + a*(x-cx)² + b*(y-cy)² via least squares.""" + cx0 = x.mean() + cy0 = y.mean() + + def residuals(params): + cx, cy, z0, a, b = params + model = z0 + a * (x - cx)**2 + b * (y - cy)**2 + return z - model + + result = least_squares(residuals, [cx0, cy0, z.mean(), 0.0, 0.0], method="lm") + cx, cy, z0, a, b = result.x + return {"cx": cx, "cy": cy, "z0": z0, "a": a, "b": b}, result.fun + + +def _fit_cylinder(x, y, z): + """Fit z = z0 + a*(x*cos(θ) + y*sin(θ) - d)² (cylinder along one axis).""" + def residuals(params): + z0, a, theta, d = params + u = x * np.cos(theta) + y * np.sin(theta) - d + model = z0 + a * u**2 + return z - model + + result = least_squares(residuals, [z.mean(), 0.0, 0.0, 0.0], method="lm") + z0, a, theta, d = result.x + R = abs(0.5 / a) if abs(a) > 1e-20 else float("inf") + return {"z0": z0, "curvature": a, "angle_deg": np.degrees(theta), "R": R}, result.fun + + +@register_node(display_name="Shape Fitting") +class ShapeFitting: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "field": ("DATA_FIELD",), + "shape": (["sphere", "paraboloid", "cylinder"], {"default": "sphere"}), + "output": (["residual", "fitted"], {"default": "residual"}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'result'), + ('RECORD_TABLE', 'parameters'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Fit a geometric primitive (sphere, paraboloid, or cylinder) to the " + "surface data. Outputs either the fitted surface or the residual " + "(original minus fit). Reports fitted parameters including radius " + "of curvature, centre position, etc. " + "Equivalent to Gwyddion's fit-shape.c module." + ) + + def process(self, field: DataField, shape: str, output: str) -> tuple: + data = np.asarray(field.data, dtype=np.float64) + yres, xres = data.shape + + # Build physical coordinate grids + x = np.arange(xres) * field.dx + field.xoff + y = np.arange(yres) * field.dy + field.yoff + X, Y = np.meshgrid(x, y) + x_flat = X.ravel() + y_flat = Y.ravel() + z_flat = data.ravel() + + if shape == "sphere": + params, residuals = _fit_sphere(x_flat, y_flat, z_flat) + elif shape == "paraboloid": + params, residuals = _fit_paraboloid(x_flat, y_flat, z_flat) + elif shape == "cylinder": + params, residuals = _fit_cylinder(x_flat, y_flat, z_flat) + else: + raise ValueError(f"Unknown shape: {shape!r}") + + # Reconstruct the fitted surface + residual_map = residuals.reshape(data.shape) + fitted_map = data - residual_map + + if output == "residual": + out_data = residual_map + else: + out_data = fitted_map + + # Build result table + records: RecordTable = RecordTable() + rms = float(np.sqrt(np.mean(residuals**2))) + records.append({"quantity": "RMS residual", "value": f"{rms:.4g}", "unit": field.si_unit_z}) + + unit_xy = field.si_unit_xy + unit_z = field.si_unit_z + for key, val in params.items(): + if key in ("cx", "cy", "R", "d"): + records.append({"quantity": key, "value": f"{val:.4g}", "unit": unit_xy}) + elif key in ("z0",): + records.append({"quantity": key, "value": f"{val:.4g}", "unit": unit_z}) + elif key == "angle_deg": + records.append({"quantity": "angle", "value": f"{val:.2f}", "unit": "deg"}) + else: + records.append({"quantity": key, "value": f"{val:.4g}", "unit": ""}) + + return (field.replace(data=out_data), records) diff --git a/backend/nodes/synthetic_surface.py b/backend/nodes/synthetic_surface.py new file mode 100644 index 0000000..f5dc57b --- /dev/null +++ b/backend/nodes/synthetic_surface.py @@ -0,0 +1,152 @@ +"""Synthetic surface generation — create test surfaces for development and calibration.""" + +from __future__ import annotations + +import numpy as np + +from backend.node_registry import register_node +from backend.data_types import DataField + + +def _fbm_surface(shape, rng, H=0.7): + """Fractional Brownian motion surface via spectral synthesis.""" + yres, xres = shape + kx = np.fft.fftfreq(xres) + ky = np.fft.fftfreq(yres) + KX, KY = np.meshgrid(kx, ky) + K = np.sqrt(KX**2 + KY**2) + K[0, 0] = 1.0 # avoid division by zero + power = K ** (-(H + 1.0)) + power[0, 0] = 0.0 + + phases = rng.uniform(0, 2 * np.pi, shape) + amplitudes = rng.standard_normal(shape) + fft_data = amplitudes * np.sqrt(power) * np.exp(1j * phases) + surface = np.real(np.fft.ifft2(fft_data)) + return surface + + +def _lattice_surface(shape, xreal, yreal, spacing, angle_deg): + """Periodic lattice (sinusoidal grid).""" + yres, xres = shape + x = np.linspace(0, xreal, xres, endpoint=False) + y = np.linspace(0, yreal, yres, endpoint=False) + X, Y = np.meshgrid(x, y) + + theta = np.radians(angle_deg) + k = 2 * np.pi / spacing + surface = np.cos(k * X) + np.cos(k * (X * np.cos(theta) + Y * np.sin(theta))) + return surface + + +def _steps_surface(shape, n_steps): + """Terraced step structure.""" + yres, xres = shape + ramp = np.linspace(0, n_steps, xres, endpoint=False) + steps = np.floor(ramp) + return np.tile(steps, (yres, 1)).astype(np.float64) + + +def _particles_surface(shape, rng, n_particles, radius_px): + """Random spherical particles on a flat background.""" + yres, xres = shape + surface = np.zeros(shape) + yy, xx = np.ogrid[:yres, :xres] + for _ in range(n_particles): + cy = rng.integers(0, yres) + cx = rng.integers(0, xres) + r2 = (yy - cy)**2 + (xx - cx)**2 + height = np.sqrt(np.maximum(radius_px**2 - r2, 0.0)) + surface = np.maximum(surface, height) + return surface + + +@register_node(display_name="Synthetic Surface") +class SyntheticSurface: + @classmethod + def INPUT_TYPES(cls): + return { + "required": { + "pattern": (["fbm", "white_noise", "lattice", "steps", "particles", "flat"], + {"default": "fbm"}), + "xres": ("INT", {"default": 256, "min": 16, "max": 2048}), + "yres": ("INT", {"default": 256, "min": 16, "max": 2048}), + "xreal": ("FLOAT", {"default": 1e-6, "min": 1e-9, "max": 1.0, "step": 1e-9}), + "yreal": ("FLOAT", {"default": 1e-6, "min": 1e-9, "max": 1.0, "step": 1e-9}), + "amplitude": ("FLOAT", {"default": 1e-9, "min": 0.0, "max": 1e-3, "step": 1e-10}), + "seed": ("INT", {"default": 42, "min": 0, "max": 999999}), + }, + "optional": { + "hurst_exponent": ("FLOAT", {"default": 0.7, "min": 0.0, "max": 1.0, "step": 0.05}), + "lattice_spacing": ("FLOAT", { + "default": 100e-9, "min": 1e-9, "max": 1e-3, "step": 1e-9, + }), + "lattice_angle": ("FLOAT", {"default": 90.0, "min": 0.0, "max": 180.0, "step": 1.0}), + "n_steps": ("INT", {"default": 5, "min": 1, "max": 100}), + "n_particles": ("INT", {"default": 20, "min": 1, "max": 500}), + "particle_radius_px": ("INT", {"default": 10, "min": 2, "max": 100}), + } + } + + OUTPUTS = ( + ('DATA_FIELD', 'surface'), + ) + FUNCTION = "process" + + DESCRIPTION = ( + "Generate synthetic test surfaces for development, calibration, and " + "algorithm testing. Patterns: fbm (fractional Brownian motion), " + "white_noise, lattice (periodic grid), steps (terraced), " + "particles (spherical bumps on flat), flat (zero surface). " + "Equivalent to Gwyddion's *_synth.c modules." + ) + + def process( + self, + pattern: str, + xres: int, + yres: int, + xreal: float, + yreal: float, + amplitude: float, + seed: int, + hurst_exponent: float = 0.7, + lattice_spacing: float = 100e-9, + lattice_angle: float = 90.0, + n_steps: int = 5, + n_particles: int = 20, + particle_radius_px: int = 10, + ) -> tuple: + shape = (yres, xres) + rng = np.random.default_rng(seed) + + if pattern == "fbm": + data = _fbm_surface(shape, rng, H=hurst_exponent) + elif pattern == "white_noise": + data = rng.standard_normal(shape) + elif pattern == "lattice": + data = _lattice_surface(shape, xreal, yreal, lattice_spacing, lattice_angle) + elif pattern == "steps": + data = _steps_surface(shape, n_steps) + elif pattern == "particles": + data = _particles_surface(shape, rng, n_particles, particle_radius_px) + elif pattern == "flat": + data = np.zeros(shape) + else: + raise ValueError(f"Unknown pattern: {pattern!r}") + + # Normalise and scale by amplitude + drange = data.max() - data.min() + if drange > 0: + data = (data - data.min()) / drange * amplitude + else: + data = np.zeros(shape) + + field = DataField( + data=data, + xreal=xreal, + yreal=yreal, + si_unit_xy="m", + si_unit_z="m", + ) + return (field,) diff --git a/tests/node_tests/affine_correction.py b/tests/node_tests/affine_correction.py new file mode 100644 index 0000000..ca7bf7c --- /dev/null +++ b/tests/node_tests/affine_correction.py @@ -0,0 +1,45 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_identity_transform(): + """No shear, unit scale, zero rotation should return near-identical data.""" + from backend.nodes.affine_correction import AffineCorrection + + node = AffineCorrection() + field = make_field(shape=(32, 32)) + result, = node.process(field, shear_x=0.0, shear_y=0.0, scale_x=1.0, scale_y=1.0, angle=0.0) + assert result.data.shape == (32, 32) + assert np.allclose(result.data, field.data, atol=1e-10) + + +def test_scale_changes_values(): + from backend.nodes.affine_correction import AffineCorrection + + node = AffineCorrection() + field = make_field(shape=(32, 32)) + result, = node.process(field, shear_x=0.0, shear_y=0.0, scale_x=2.0, scale_y=1.0, angle=0.0) + assert result.data.shape == (32, 32) + # Scaled field should differ from original + assert not np.allclose(result.data, field.data) + + +def test_rotation_preserves_shape(): + from backend.nodes.affine_correction import AffineCorrection + + node = AffineCorrection() + field = make_field(shape=(48, 64)) + result, = node.process(field, shear_x=0.0, shear_y=0.0, scale_x=1.0, scale_y=1.0, angle=10.0) + assert result.data.shape == (48, 64) + + +def test_shear_changes_values(): + from backend.nodes.affine_correction import AffineCorrection + + node = AffineCorrection() + # Use a non-symmetric field so shear has a visible effect + data = np.outer(np.arange(32), np.ones(32)) + field = make_field(data=data) + result, = node.process(field, shear_x=0.3, shear_y=0.0, scale_x=1.0, scale_y=1.0, angle=0.0) + assert not np.allclose(result.data, field.data) diff --git a/tests/node_tests/deconvolution.py b/tests/node_tests/deconvolution.py new file mode 100644 index 0000000..bf33b2a --- /dev/null +++ b/tests/node_tests/deconvolution.py @@ -0,0 +1,42 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_wiener_preserves_shape(): + from backend.nodes.deconvolution import Deconvolution + + node = Deconvolution() + field = make_field(shape=(32, 32)) + result, = node.process(field, "wiener", 2.0, 0.01, 10) + assert result.data.shape == (32, 32) + assert np.isfinite(result.data).all() + + +def test_richardson_lucy_preserves_shape(): + from backend.nodes.deconvolution import Deconvolution + + node = Deconvolution() + field = make_field(data=np.abs(np.random.default_rng(42).standard_normal((32, 32))) + 0.1) + result, = node.process(field, "richardson_lucy", 2.0, 0.01, 5) + assert result.data.shape == (32, 32) + assert np.isfinite(result.data).all() + + +def test_wiener_flat_field(): + from backend.nodes.deconvolution import Deconvolution + + node = Deconvolution() + field = make_field(data=np.ones((32, 32))) + result, = node.process(field, "wiener", 2.0, 0.01, 10) + # A flat field convolved with anything is still flat; Wiener should preserve it + assert result.data.shape == (32, 32) + + +def test_unknown_method(): + from backend.nodes.deconvolution import Deconvolution + + node = Deconvolution() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "unknown", 2.0, 0.01, 10) diff --git a/tests/node_tests/drift_correction.py b/tests/node_tests/drift_correction.py new file mode 100644 index 0000000..7c2f917 --- /dev/null +++ b/tests/node_tests/drift_correction.py @@ -0,0 +1,53 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_drift_correction_flat(): + from backend.nodes.drift_correction import DriftCorrection + + node = DriftCorrection() + field = make_field(data=np.zeros((32, 32))) + result, = node.process(field, "previous_row", "horizontal") + assert result.data.shape == (32, 32) + assert np.allclose(result.data, 0.0, atol=1e-10) + + +def test_drift_correction_preserves_shape(): + from backend.nodes.drift_correction import DriftCorrection + + node = DriftCorrection() + field = make_field(shape=(48, 64)) + for ref in ("previous_row", "mean_row"): + for direction in ("horizontal", "vertical"): + result, = node.process(field, ref, direction) + assert result.data.shape == (48, 64) + + +def test_drift_correction_reduces_drift(): + """A field with artificial row-by-row drift should have less variance after correction.""" + from backend.nodes.drift_correction import DriftCorrection + + node = DriftCorrection() + rng = np.random.default_rng(42) + base = rng.standard_normal((32, 64)) + # Add artificial drift: shift each row by cumulative offset + drifted = base.copy() + for i in range(1, 32): + drifted[i] = np.roll(base[i], i) + + field = make_field(data=drifted) + result, = node.process(field, "previous_row", "horizontal") + # The corrected field should have lower inter-row variance + row_means_before = np.var(np.diff(drifted, axis=0)) + row_means_after = np.var(np.diff(result.data, axis=0)) + assert row_means_after <= row_means_before + + +def test_drift_correction_mean_row_reference(): + from backend.nodes.drift_correction import DriftCorrection + + node = DriftCorrection() + field = make_field(shape=(32, 32)) + result, = node.process(field, "mean_row", "horizontal") + assert result.data.shape == (32, 32) diff --git a/tests/node_tests/facet_analysis.py b/tests/node_tests/facet_analysis.py new file mode 100644 index 0000000..f840d7b --- /dev/null +++ b/tests/node_tests/facet_analysis.py @@ -0,0 +1,42 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_facet_analysis_basic(): + from backend.nodes.facet_analysis import FacetAnalysis + + node = FacetAnalysis() + field = make_field(shape=(64, 64)) + result, = node.process(field, 180, 3) + assert result.data.ndim == 2 + assert result.si_unit_xy == "deg" + + +def test_facet_analysis_flat_field(): + from backend.nodes.facet_analysis import FacetAnalysis + + node = FacetAnalysis() + field = make_field(data=np.zeros((32, 32))) + result, = node.process(field, 180, 3) + assert result.data.ndim == 2 + + +def test_facet_analysis_density_normalised(): + from backend.nodes.facet_analysis import FacetAnalysis + + node = FacetAnalysis() + field = make_field(shape=(64, 64)) + result, = node.process(field, 180, 3) + # Should be a normalised probability density + assert np.isclose(result.data.sum(), 1.0, atol=1e-10) + + +def test_facet_analysis_bin_count(): + from backend.nodes.facet_analysis import FacetAnalysis + + node = FacetAnalysis() + field = make_field(shape=(64, 64)) + result, = node.process(field, 360, 3) + # phi bins = n_bins, theta bins = n_bins // 4 + assert result.data.shape == (90, 360) diff --git a/tests/node_tests/feature_detection.py b/tests/node_tests/feature_detection.py new file mode 100644 index 0000000..6491095 --- /dev/null +++ b/tests/node_tests/feature_detection.py @@ -0,0 +1,50 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_canny_edge_detection(): + from backend.nodes.feature_detection import FeatureDetection + + node = FeatureDetection() + # Create a field with a sharp edge + data = np.zeros((64, 64)) + data[:, 32:] = 1.0 + field = make_field(data=data) + result, records = node.process(field, "canny", 1.0) + assert result.data.shape == (64, 64) + # Should detect some edge pixels + assert result.data.sum() > 0 + assert isinstance(records, list) + + +def test_harris_corner_detection(): + from backend.nodes.feature_detection import FeatureDetection + + node = FeatureDetection() + # Create a field with a sharp corner (L-shape) + data = np.zeros((64, 64)) + data[16:48, 16:48] = 1.0 + field = make_field(data=data) + result, records = node.process(field, "harris", 1.0) + assert result.data.shape == (64, 64) + assert isinstance(records, list) + + +def test_canny_flat_field(): + from backend.nodes.feature_detection import FeatureDetection + + node = FeatureDetection() + field = make_field(data=np.ones((32, 32))) + result, records = node.process(field, "canny", 1.0) + # Flat field should have no edges + assert result.data.sum() == 0 + + +def test_unknown_method(): + from backend.nodes.feature_detection import FeatureDetection + + node = FeatureDetection() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "unknown", 1.0) diff --git a/tests/node_tests/hough_transform.py b/tests/node_tests/hough_transform.py new file mode 100644 index 0000000..d7f97d4 --- /dev/null +++ b/tests/node_tests/hough_transform.py @@ -0,0 +1,50 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_hough_lines_basic(): + from backend.nodes.hough_transform import HoughTransform + + node = HoughTransform() + # Create a field with a horizontal line + data = np.zeros((64, 64)) + data[32, :] = 1.0 + field = make_field(data=data) + accum, records = node.process(field, "lines", 3, 1.0, 10, 30) + assert accum.data.ndim == 2 + assert isinstance(records, list) + + +def test_hough_circles_basic(): + from backend.nodes.hough_transform import HoughTransform + + node = HoughTransform() + # Create a field with a circle + data = np.zeros((64, 64)) + yy, xx = np.ogrid[:64, :64] + r2 = (yy - 32)**2 + (xx - 32)**2 + data[(r2 > 144) & (r2 < 196)] = 1.0 # ring at radius ~13 + field = make_field(data=data) + accum, records = node.process(field, "circles", 3, 1.0, 8, 20) + assert accum.data.shape == (64, 64) + assert isinstance(records, list) + + +def test_hough_preserves_output_types(): + from backend.nodes.hough_transform import HoughTransform + + node = HoughTransform() + field = make_field(shape=(32, 32)) + accum, records = node.process(field, "lines", 2, 1.0, 5, 15) + assert hasattr(accum, 'data') + assert isinstance(records, list) + + +def test_hough_unknown_mode(): + from backend.nodes.hough_transform import HoughTransform + + node = HoughTransform() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "unknown", 1, 1.0, 5, 15) diff --git a/tests/node_tests/image_stitch.py b/tests/node_tests/image_stitch.py new file mode 100644 index 0000000..cacfa01 --- /dev/null +++ b/tests/node_tests/image_stitch.py @@ -0,0 +1,45 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_stitch_right_no_overlap(): + from backend.nodes.image_stitch import ImageStitch + + node = ImageStitch() + a = make_field(data=np.ones((32, 32))) + b = make_field(data=np.ones((32, 32)) * 2) + result, = node.process(a, b, "right", "none") + assert result.data.shape[0] == 32 + assert result.data.shape[1] >= 32 + + +def test_stitch_below(): + from backend.nodes.image_stitch import ImageStitch + + node = ImageStitch() + a = make_field(data=np.ones((32, 32))) + b = make_field(data=np.ones((32, 32)) * 2) + result, = node.process(a, b, "below", "none") + assert result.data.shape[1] == 32 + assert result.data.shape[0] >= 32 + + +def test_stitch_auto_direction(): + from backend.nodes.image_stitch import ImageStitch + + node = ImageStitch() + a = make_field(data=np.random.default_rng(0).standard_normal((32, 32))) + b = make_field(data=np.random.default_rng(1).standard_normal((32, 32))) + result, = node.process(a, b, "auto", "linear") + assert result.data.ndim == 2 + + +def test_stitch_unknown_direction(): + from backend.nodes.image_stitch import ImageStitch + + node = ImageStitch() + a = make_field(shape=(16, 16)) + b = make_field(shape=(16, 16)) + with pytest.raises(ValueError): + node.process(a, b, "unknown", "none") diff --git a/tests/node_tests/lattice_measurement.py b/tests/node_tests/lattice_measurement.py new file mode 100644 index 0000000..50be67a --- /dev/null +++ b/tests/node_tests/lattice_measurement.py @@ -0,0 +1,46 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_lattice_acf_returns_outputs(): + from backend.nodes.lattice_measurement import LatticeMeasurement + + node = LatticeMeasurement() + field = make_field(shape=(64, 64)) + corr, records = node.process(field, "acf") + assert corr.data.shape == (64, 64) + assert isinstance(records, list) + + +def test_lattice_fft_returns_outputs(): + from backend.nodes.lattice_measurement import LatticeMeasurement + + node = LatticeMeasurement() + field = make_field(shape=(64, 64)) + corr, records = node.process(field, "fft") + assert corr.data.shape == (64, 64) + + +def test_lattice_detects_periodic_structure(): + """A simple cosine grid should produce lattice measurements.""" + from backend.nodes.lattice_measurement import LatticeMeasurement + + node = LatticeMeasurement() + x = np.linspace(0, 4 * np.pi, 64, endpoint=False) + X, Y = np.meshgrid(x, x) + data = np.cos(X) + np.cos(Y) + field = make_field(data=data) + + corr, records = node.process(field, "acf") + # Should detect at least one vector + assert len(records) >= 3 + + +def test_lattice_unknown_method(): + from backend.nodes.lattice_measurement import LatticeMeasurement + + node = LatticeMeasurement() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "unknown") diff --git a/tests/node_tests/mfm_analysis.py b/tests/node_tests/mfm_analysis.py new file mode 100644 index 0000000..153452c --- /dev/null +++ b/tests/node_tests/mfm_analysis.py @@ -0,0 +1,47 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_mfm_all_operations(): + from backend.nodes.mfm_analysis import MFMAnalysis + + node = MFMAnalysis() + field = make_field(shape=(32, 32)) + + for op in ("phase_to_force_gradient", "force_gradient_to_field", + "charge_density", "magnetisation"): + result, = node.process(field, op, 50e-9) + assert result.data.shape == (32, 32) + assert np.isfinite(result.data).all() + + +def test_mfm_flat_field(): + from backend.nodes.mfm_analysis import MFMAnalysis + + node = MFMAnalysis() + field = make_field(data=np.zeros((32, 32))) + result, = node.process(field, "phase_to_force_gradient", 50e-9) + assert np.allclose(result.data, 0.0, atol=1e-10) + + +def test_mfm_units(): + from backend.nodes.mfm_analysis import MFMAnalysis + + node = MFMAnalysis() + field = make_field(shape=(32, 32)) + + result, = node.process(field, "force_gradient_to_field", 50e-9) + assert result.si_unit_z == "A/m" + + result, = node.process(field, "charge_density", 50e-9) + assert result.si_unit_z == "A/m²" + + +def test_mfm_unknown_operation(): + from backend.nodes.mfm_analysis import MFMAnalysis + + node = MFMAnalysis() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "unknown_op", 50e-9) diff --git a/tests/node_tests/scar_removal.py b/tests/node_tests/scar_removal.py index be85bd5..27e3450 100644 --- a/tests/node_tests/scar_removal.py +++ b/tests/node_tests/scar_removal.py @@ -8,8 +8,7 @@ def test_scar_removal(): node = ScarRemoval() info = get_node_info("ScarRemoval") - assert info["category"] == "Filter" - assert {entry["category"] for entry in info["menu_categories"]} == {"Filter", "Level & Correct"} + assert info["category"] == "Level & Correct" rows = 96 cols = 128 diff --git a/tests/node_tests/shape_fitting.py b/tests/node_tests/shape_fitting.py new file mode 100644 index 0000000..3aef7e8 --- /dev/null +++ b/tests/node_tests/shape_fitting.py @@ -0,0 +1,45 @@ +import numpy as np +import pytest +from tests.node_tests._shared import make_field + + +def test_fit_sphere_residual(): + from backend.nodes.shape_fitting import ShapeFitting + + node = ShapeFitting() + # Create a spherical surface + y, x = np.mgrid[:32, :32] + data = 100.0 - np.sqrt(np.maximum(500**2 - (x - 16)**2 - (y - 16)**2, 0)) + field = make_field(data=data) + result, records = node.process(field, "sphere", "residual") + assert result.data.shape == (32, 32) + assert isinstance(records, list) + + +def test_fit_paraboloid(): + from backend.nodes.shape_fitting import ShapeFitting + + node = ShapeFitting() + field = make_field(shape=(32, 32)) + result, records = node.process(field, "paraboloid", "fitted") + assert result.data.shape == (32, 32) + assert isinstance(records, list) + + +def test_fit_cylinder(): + from backend.nodes.shape_fitting import ShapeFitting + + node = ShapeFitting() + field = make_field(shape=(32, 32)) + result, records = node.process(field, "cylinder", "residual") + assert result.data.shape == (32, 32) + assert isinstance(records, list) + + +def test_fit_unknown_shape(): + from backend.nodes.shape_fitting import ShapeFitting + + node = ShapeFitting() + field = make_field(shape=(32, 32)) + with pytest.raises(ValueError): + node.process(field, "cone", "residual") diff --git a/tests/node_tests/synthetic_surface.py b/tests/node_tests/synthetic_surface.py new file mode 100644 index 0000000..1fa68e8 --- /dev/null +++ b/tests/node_tests/synthetic_surface.py @@ -0,0 +1,57 @@ +import numpy as np +import pytest + + +def test_all_patterns_produce_correct_shape(): + from backend.nodes.synthetic_surface import SyntheticSurface + + node = SyntheticSurface() + for pattern in ("fbm", "white_noise", "lattice", "steps", "particles", "flat"): + result, = node.process( + pattern=pattern, xres=64, yres=48, xreal=1e-6, yreal=1e-6, + amplitude=1e-9, seed=42, + ) + assert result.data.shape == (48, 64), f"Failed for {pattern}" + + +def test_flat_is_zero(): + from backend.nodes.synthetic_surface import SyntheticSurface + + node = SyntheticSurface() + result, = node.process( + pattern="flat", xres=32, yres=32, xreal=1e-6, yreal=1e-6, + amplitude=1e-9, seed=0, + ) + assert np.allclose(result.data, 0.0) + + +def test_seed_reproducibility(): + from backend.nodes.synthetic_surface import SyntheticSurface + + node = SyntheticSurface() + kwargs = dict(pattern="fbm", xres=32, yres=32, xreal=1e-6, yreal=1e-6, + amplitude=1e-9, seed=123) + r1, = node.process(**kwargs) + r2, = node.process(**kwargs) + assert np.array_equal(r1.data, r2.data) + + +def test_amplitude_scaling(): + from backend.nodes.synthetic_surface import SyntheticSurface + + node = SyntheticSurface() + result, = node.process( + pattern="white_noise", xres=64, yres=64, xreal=1e-6, yreal=1e-6, + amplitude=5e-9, seed=42, + ) + assert result.data.max() <= 5e-9 + 1e-15 + assert result.data.min() >= -1e-15 + + +def test_unknown_pattern(): + from backend.nodes.synthetic_surface import SyntheticSurface + + node = SyntheticSurface() + with pytest.raises(ValueError): + node.process(pattern="unknown", xres=32, yres=32, xreal=1e-6, yreal=1e-6, + amplitude=1e-9, seed=0)