remaining med value features

This commit is contained in:
2026-03-30 22:31:04 -07:00
parent ea749938bb
commit ced43bec4f
21 changed files with 4257 additions and 9 deletions

View File

@@ -69,4 +69,13 @@ from backend.nodes import (
tip_model,
tip_deconvolution,
tip_blind_estimate,
# New: remaining Gwyddion feature-gap nodes
filter_kuwahara,
entropy,
local_contrast,
filter_custom,
spot_removal,
wavelet_denoise,
cross_correlate,
template_match,
)

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
import numpy as np
from backend.data_types import DataField
from backend.node_registry import register_node
@register_node(display_name="Cross-Correlate")
class CrossCorrelate:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field_a": ("DATA_FIELD",),
"field_b": ("DATA_FIELD",),
"mode": (["full", "same", "valid"], {"default": "same"}),
"normalize": ("BOOLEAN", {"default": True}),
}
}
OUTPUTS = (
('DATA_FIELD', 'correlation'),
)
FUNCTION = "process"
DESCRIPTION = (
"Compute 2D cross-correlation between two fields. The correlation peak indicates "
"the offset where the two fields best match. Useful for drift measurement and feature "
"alignment. Equivalent to Gwyddion crosscor.c."
)
def process(
self,
field_a: DataField,
field_b: DataField,
mode: str,
normalize: bool,
) -> tuple:
from scipy.signal import fftconvolve
a = field_a.data - field_a.data.mean()
b = field_b.data - field_b.data.mean()
# Cross-correlation via FFT: correlate(a,b) = ifft(fft(a) * conj(fft(b)))
# Achieved by convolving a with the flipped b
corr = fftconvolve(a, b[::-1, ::-1], mode=mode)
if normalize:
denom = np.sqrt((a ** 2).sum() * (b ** 2).sum())
if denom > 0:
corr = corr / denom
if mode == "same":
# Output is the same shape as field_a — reuse its physical dimensions
return (field_a.replace(data=corr),)
# For "full" mode: output shape is (Na+Nb-1, Ma+Mb-1)
# Scale physical dimensions proportionally
na, ma = field_a.data.shape
nb, mb = field_b.data.shape
out_y, out_x = corr.shape
# Physical size per pixel stays the same as field_a; total physical size scales
new_xreal = field_a.xreal * out_x / ma if ma > 0 else field_a.xreal
new_yreal = field_a.yreal * out_y / na if na > 0 else field_a.yreal
return (field_a.replace(data=corr, xreal=new_xreal, yreal=new_yreal),)

74
backend/nodes/entropy.py Normal file
View File

@@ -0,0 +1,74 @@
from __future__ import annotations
import numpy as np
from backend.node_registry import register_node
from backend.data_types import DataField, RecordTable
from backend.execution_context import emit_table
@register_node(display_name="Entropy")
class Entropy:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"mode": (["height values", "slope magnitude"], {"default": "height values"}),
"n_bins": ("INT", {"default": 256, "min": 16, "max": 1024}),
}
}
OUTPUTS = (
('FLOAT', 'entropy'),
('FLOAT', 'normalised_entropy'),
)
FUNCTION = "process"
DESCRIPTION = (
"Shannon entropy of the height or slope distribution. "
"H = -\u03a3 p\u00b7ln(p). Equivalent to Gwyddion entropy.c."
)
def process(self, field: DataField, mode: str, n_bins: int) -> tuple:
n_bins = max(16, int(n_bins))
data = np.asarray(field.data, dtype=np.float64)
if mode == "slope magnitude":
# Compute slope magnitude from Sobel-like finite differences.
# Central differences along x and y (axis=1 and axis=0).
# np.gradient uses central differences, same spirit as Sobel.
dy, dx = np.gradient(data)
values = np.hypot(dx, dy).ravel()
else:
values = data.ravel()
# Remove non-finite values before binning.
values = values[np.isfinite(values)]
if values.size == 0:
h = 0.0
h_norm = 0.0
else:
counts, _ = np.histogram(values, bins=n_bins)
total = counts.sum()
if total == 0:
h = 0.0
h_norm = 0.0
else:
# Probability distribution; skip zero bins.
p = counts[counts > 0].astype(np.float64) / float(total)
h = float(-np.sum(p * np.log(p)))
# Maximum possible entropy for n_bins equally occupied bins is ln(n_bins).
h_max = float(np.log(n_bins))
h_norm = h / h_max if h_max > 0.0 else 0.0
table = RecordTable([
{"quantity": "entropy", "value": h, "unit": "nat"},
{"quantity": "normalised entropy", "value": h_norm, "unit": ""},
{"quantity": "mode", "value": mode, "unit": ""},
{"quantity": "n_bins", "value": n_bins, "unit": ""},
])
emit_table(table)
return (h, h_norm)

View File

@@ -0,0 +1,127 @@
from __future__ import annotations
import numpy as np
from backend.node_registry import register_node
from backend.data_types import DataField
from backend.execution_context import emit_warning
_DEFAULT_KERNEL = "0 -1 0\n-1 5 -1\n0 -1 0"
_MAX_KERNEL_DIM = 51
def _parse_kernel(kernel_str: str) -> np.ndarray | None:
"""Parse a multi-line kernel string into a 2-D float64 array.
Returns *None* and issues a warning via emit_warning if the string is
invalid. The returned array is always at least 1×1 and at most
_MAX_KERNEL_DIM × _MAX_KERNEL_DIM.
"""
lines = [ln for ln in kernel_str.splitlines() if ln.strip()]
if not lines:
emit_warning("Custom Convolution: kernel string is empty. Using identity.")
return None
rows = []
for ln in lines:
try:
row = [float(v) for v in ln.split()]
except ValueError:
emit_warning(
f"Custom Convolution: could not parse kernel row {ln!r}. "
"Input returned unchanged."
)
return None
if not row:
continue
rows.append(row)
if not rows:
emit_warning("Custom Convolution: kernel has no valid rows. Input returned unchanged.")
return None
# All rows must have the same length.
ncols = len(rows[0])
for i, row in enumerate(rows):
if len(row) != ncols:
emit_warning(
f"Custom Convolution: row {i} has {len(row)} values but row 0 has {ncols}. "
"All rows must be the same length. Input returned unchanged."
)
return None
arr = np.array(rows, dtype=np.float64)
if arr.ndim != 2 or arr.size == 0:
emit_warning("Custom Convolution: kernel is empty after parsing. Input returned unchanged.")
return None
nrows, ncols = arr.shape
if nrows > _MAX_KERNEL_DIM or ncols > _MAX_KERNEL_DIM:
emit_warning(
f"Custom Convolution: kernel size {nrows}×{ncols} exceeds maximum "
f"{_MAX_KERNEL_DIM}×{_MAX_KERNEL_DIM}. Input returned unchanged."
)
return None
if not np.all(np.isfinite(arr)):
emit_warning("Custom Convolution: kernel contains non-finite values. Input returned unchanged.")
return None
return arr
@register_node(display_name="Custom Convolution")
class CustomConvolution:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"kernel": ("STRING", {
"multiline": True,
"default": _DEFAULT_KERNEL,
"placeholder": "kernel rows, space-separated",
}),
"normalize": ("BOOLEAN", {"default": True}),
"boundary": (["reflect", "nearest", "wrap"], {"default": "reflect"}),
}
}
OUTPUTS = (
('DATA_FIELD', 'result'),
)
FUNCTION = "process"
DESCRIPTION = (
"Apply a user-defined convolution kernel. "
"Enter rows of space-separated numbers. "
"Example sharpen: '0 -1 0 / -1 5 -1 / 0 -1 0' (use newlines, not slashes). "
"Equivalent to Gwyddion convolution_filter.c."
)
def process(
self,
field: DataField,
kernel: str,
normalize: bool,
boundary: str,
) -> tuple:
from scipy.ndimage import convolve
kernel_arr = _parse_kernel(kernel)
if kernel_arr is None:
# Fallback: return input unchanged.
return (field.replace(data=field.data.copy()),)
data = np.asarray(field.data, dtype=np.float64)
# scipy.ndimage.convolve boundary mode names match our choices directly.
result = convolve(data, kernel_arr, mode=boundary)
if normalize:
abs_sum = float(np.sum(np.abs(kernel_arr)))
if abs_sum > 0.0:
result = result / abs_sum
return (field.replace(data=result),)

View File

@@ -0,0 +1,86 @@
from __future__ import annotations
import numpy as np
from backend.node_registry import register_node
from backend.data_types import DataField
def _kuwahara_pass(data: np.ndarray) -> np.ndarray:
"""Single pass of the 5x5 Kuwahara filter.
Divides a 5x5 neighbourhood around each pixel into four overlapping 3x3
quadrants (TL, TR, BL, BR), computes the mean and variance of each quadrant,
and replaces the centre pixel with the mean of the quadrant that has the
smallest variance. Boundary pixels are handled by reflecting the image.
"""
# Pad with reflect so every pixel has a full 5x5 neighbourhood.
padded = np.pad(data, pad_width=2, mode="reflect")
rows, cols = data.shape
# For each of the four 3x3 quadrant offsets we need the per-pixel mean and
# variance. The quadrant window positions (relative to the padded array)
# for a centre pixel at (r+2, c+2) are:
# TL : rows r..r+2, cols c..c+2 → offset (0,0) in padded
# TR : rows r..r+2, cols c+2..c+4 → offset (0,2)
# BL : rows r+2..r+4, cols c..c+2 → offset (2,0)
# BR : rows r+2..r+4, cols c+2..c+4 → offset (2,2)
# Each window is 3×3 = 9 pixels.
quadrant_offsets = [(0, 0), (0, 2), (2, 0), (2, 2)]
n = 9 # 3×3 quadrant
# Accumulate sum and sum-of-squares for each quadrant using a simple nested
# index loop over the 3×3 window positions.
quad_sum = np.zeros((4, rows, cols), dtype=np.float64)
quad_sum2 = np.zeros((4, rows, cols), dtype=np.float64)
for qi, (dr0, dc0) in enumerate(quadrant_offsets):
for drow in range(3):
for dcol in range(3):
patch = padded[dr0 + drow: dr0 + drow + rows,
dc0 + dcol: dc0 + dcol + cols]
quad_sum[qi] += patch
quad_sum2[qi] += patch * patch
quad_mean = quad_sum / n
# var = E[x^2] - (E[x])^2, clamped to 0 to avoid floating-point negatives.
quad_var = np.maximum(quad_sum2 / n - quad_mean * quad_mean, 0.0)
# Select the quadrant index with minimum variance for each pixel.
best_qi = np.argmin(quad_var, axis=0) # shape (rows, cols)
# Gather the mean from the winning quadrant.
result = np.choose(best_qi, quad_mean)
return result.astype(np.float64)
@register_node(display_name="Kuwahara Filter")
class KuwaharaFilter:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"iterations": ("INT", {"default": 1, "min": 1, "max": 20}),
}
}
OUTPUTS = (
('DATA_FIELD', 'filtered'),
)
FUNCTION = "process"
DESCRIPTION = (
"Edge-preserving smoothing using Kuwahara's minimum-variance quadrant method. "
"Unlike Gaussian blur, sharp boundaries are preserved. "
"Equivalent to Gwyddion's Kuwahara filter."
)
def process(self, field: DataField, iterations: int) -> tuple:
data = np.asarray(field.data, dtype=np.float64)
iterations = max(1, int(iterations))
for _ in range(iterations):
data = _kuwahara_pass(data)
return (field.replace(data=data),)

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import numpy as np
from backend.node_registry import register_node
from backend.data_types import DataField
@register_node(display_name="Local Contrast")
class LocalContrast:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"kernel_size": ("INT", {"default": 10, "min": 2, "max": 100}),
"weight": ("FLOAT", {"default": 0.5, "min": 0.0, "max": 1.0, "step": 0.05}),
}
}
OUTPUTS = (
('DATA_FIELD', 'result'),
)
FUNCTION = "process"
DESCRIPTION = (
"Expand the local dynamic range at each pixel. "
"Reveals fine surface features that are hidden by global contrast range. "
"Equivalent to Gwyddion local_contrast.c."
)
def process(self, field: DataField, kernel_size: int, weight: float) -> tuple:
from scipy.ndimage import minimum_filter, maximum_filter
data = np.asarray(field.data, dtype=np.float64)
kernel_size = max(2, int(kernel_size))
weight = float(np.clip(weight, 0.0, 1.0))
local_min = minimum_filter(data, size=kernel_size, mode="reflect")
local_max = maximum_filter(data, size=kernel_size, mode="reflect")
global_min = float(data.min())
global_max = float(data.max())
local_range = local_max - local_min
eps = np.finfo(np.float64).eps * max(abs(global_max), abs(global_min), 1.0)
# Remap each pixel from its local range to the global range.
# Where local_range is near zero, the pixel is already flat: leave it
# unchanged (enhancement factor = 1).
safe_range = np.where(local_range > eps, local_range, 1.0)
global_span = global_max - global_min
if global_span <= eps:
# Uniform field nothing to enhance.
return (field.replace(data=data.copy()),)
enhancement_factor = global_span / safe_range
# Locally enhanced value: remap v from [local_min, local_max] → [global_min, global_max]
v_enhanced = global_min + enhancement_factor * (data - local_min)
# Blend between original and enhanced.
result = (1.0 - weight) * data + weight * v_enhanced
return (field.replace(data=result),)

View File

@@ -0,0 +1,122 @@
from __future__ import annotations
import numpy as np
from backend.data_types import DataField
from backend.node_registry import register_node
@register_node(display_name="Spot Removal")
class SpotRemoval:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"method": (["laplace", "mean", "zero"], {"default": "laplace"}),
"max_iter": ("INT", {"default": 100, "min": 1, "max": 2000, "step": 1}),
},
"optional": {
"mask": ("IMAGE", {"label": "defects"}),
},
}
OUTPUTS = (
('DATA_FIELD', 'result'),
)
FUNCTION = "process"
DESCRIPTION = (
"Fill defect pixels (hot pixels, dropouts, scan artifacts) by interpolation. "
"The mask defines defect locations. Laplace method solves the 2D Laplace equation "
"for smooth inpainting. Equivalent to Gwyddion spotremove.c."
)
def process(
self,
field: DataField,
method: str,
max_iter: int,
mask: np.ndarray | None = None,
) -> tuple:
if mask is None:
return (field,)
mask_array = np.asarray(mask)
# Reshape mask to match field shape if it has extra dimensions (e.g. HxWx1)
if mask_array.ndim == 3:
mask_array = mask_array[:, :, 0]
if mask_array.shape != field.data.shape:
raise ValueError(
f"Mask shape {mask_array.shape} does not match field shape {field.data.shape}."
)
defect = mask_array > 0
if not np.any(defect):
return (field,)
data = np.asarray(field.data, dtype=np.float64)
if method == "zero":
result = data.copy()
result[defect] = 0.0
return (field.replace(data=result),)
if method == "mean":
result = _mean_fill(data, defect)
return (field.replace(data=result),)
# method == "laplace"
result = _laplace_fill(data, defect, int(max_iter))
return (field.replace(data=result),)
def _mean_fill(data: np.ndarray, defect: np.ndarray) -> np.ndarray:
"""Fill defect pixels with the mean of non-defect neighbours in a 3x3 window."""
result = data.copy()
yres, xres = data.shape
# Global fallback: mean of all non-defect pixels
non_defect_vals = data[~defect]
global_mean = float(non_defect_vals.mean()) if non_defect_vals.size > 0 else 0.0
defect_coords = np.argwhere(defect)
for y, x in defect_coords:
y0 = max(y - 1, 0)
y1 = min(y + 2, yres)
x0 = max(x - 1, 0)
x1 = min(x + 2, xres)
neighbourhood_data = data[y0:y1, x0:x1]
neighbourhood_defect = defect[y0:y1, x0:x1]
good = neighbourhood_data[~neighbourhood_defect]
if good.size > 0:
result[y, x] = float(good.mean())
else:
result[y, x] = global_mean
return result
def _laplace_fill(data: np.ndarray, defect: np.ndarray, max_iter: int) -> np.ndarray:
"""Iterative Laplace solver: set defect pixels to neighbour average each iteration."""
non_defect_vals = data[~defect]
init_val = float(non_defect_vals.mean()) if non_defect_vals.size > 0 else 0.0
result = data.copy()
result[defect] = init_val
for _ in range(max_iter):
# Compute neighbour averages via rolled arrays
neighbour_sum = (
np.roll(result, -1, axis=0)
+ np.roll(result, 1, axis=0)
+ np.roll(result, -1, axis=1)
+ np.roll(result, 1, axis=1)
)
new_vals = neighbour_sum / 4.0
result[defect] = new_vals[defect]
return result

View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import numpy as np
from backend.data_types import DataField
from backend.node_registry import register_node
@register_node(display_name="Template Match")
class TemplateMatch:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"image": ("DATA_FIELD",),
"template": ("DATA_FIELD",),
"threshold": (
"FLOAT",
{"default": 0.8, "min": 0.0, "max": 1.0, "step": 0.05},
),
}
}
OUTPUTS = (
('DATA_FIELD', 'score'),
('IMAGE', 'detections'),
)
FUNCTION = "process"
DESCRIPTION = (
"Find a template pattern within a larger data field using normalised cross-correlation. "
"The score output shows match quality (1 = perfect match). Detections mask marks positions "
"above the threshold. Equivalent to Gwyddion maskcor.c."
)
def process(
self,
image: DataField,
template: DataField,
threshold: float,
) -> tuple:
from skimage.feature import match_template
score = match_template(image.data, template.data, pad_input=True)
# Clip to [0, 1] for display (match_template returns values in [-1, 1])
score_clipped = np.clip(score, 0.0, 1.0)
detections = (score_clipped >= float(threshold)).astype(np.uint8) * 255
score_field = image.replace(data=score_clipped)
return (score_field, detections)

View File

@@ -0,0 +1,74 @@
from __future__ import annotations
import numpy as np
from backend.data_types import DataField
from backend.node_registry import register_node
@register_node(display_name="Wavelet Denoise")
class WaveletDenoise:
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"field": ("DATA_FIELD",),
"wavelet": (
["db1", "db2", "db4", "db8", "sym4", "coif1", "bior1.3"],
{"default": "db4"},
),
"method": (["BayesShrink", "VisuShrink"], {"default": "BayesShrink"}),
"sigma": (
"FLOAT",
{"default": 0.0, "min": 0.0, "max": 1.0, "step": 0.01},
),
"mode": (["soft", "hard"], {"default": "soft"}),
}
}
OUTPUTS = (
('DATA_FIELD', 'denoised'),
)
FUNCTION = "process"
DESCRIPTION = (
"Denoise using wavelet coefficient thresholding. BayesShrink adapts the threshold "
"per sub-band; VisuShrink uses a global threshold. Equivalent to applying DWT from "
"Gwyddion dwt.c with coefficient thresholding."
)
def process(
self,
field: DataField,
wavelet: str,
method: str,
sigma: float,
mode: str,
) -> tuple:
from skimage.restoration import denoise_wavelet
d = field.data
dmin = float(d.min())
drange = float(np.ptp(d))
if drange == 0:
return (field,)
norm = (d - dmin) / drange
sigma_val = float(sigma) if sigma > 0 else None
# `mode` is a Python builtin name; use threshold_mode locally to avoid shadowing
threshold_mode = mode
denoised_norm = denoise_wavelet(
norm,
wavelet=wavelet,
method=method,
mode=threshold_mode,
sigma=sigma_val,
rescale_sigma=True,
channel_axis=None,
)
result = denoised_norm * drange + dmin
return (field.replace(data=result),)