366 lines
14 KiB
Python
366 lines
14 KiB
Python
"""
|
|
Exporter for DATA_FIELD values (single layer or multi-layer stacks).
|
|
|
|
Format choices:
|
|
|
|
* **TIFF** — 8-bit RGB colormap preview. *Not* round-trippable and single-layer
|
|
only; connect multiple channels and pick "TIFF (data)" for a stack.
|
|
* **TIFF (data)** — float64 pixels with tono metadata JSON-embedded in the
|
|
TIFF ImageDescription tag. Round-trips and supports multi-page stacks: one
|
|
IFD per layer, the first page's description carries a
``{"tono": {"version": 1, "layers": [...]}}`` document.
|
|
* **PNG** — 8-bit RGB colormap preview. Single-layer only.
|
|
* **NPZ** — for a single layer, writes a plain ``field=...`` key. For a stack,
|
|
each layer gets its own key derived from its display name (identifier-safe,
|
|
deduplicated).
|
|
* **GWY** — Gwyddion native format via the ``gwyfile`` package. A multi-layer
|
|
save writes one channel per layer (``/0/data``, ``/1/data``, …), each with
|
|
its own title, producing a true multi-channel .gwy file.
|
|
* **HDF5** — generic HDF5 with one ``data`` dataset per layer and physical
|
|
dimensions as dataset attrs. Round-trips via our generic ``hdf5`` importer,
|
|
which picks up every 2-D numeric dataset.
|
|
* **HDF5 (Ergo)** — Asylum Research / Ergo layout, one dataset per layer under
|
|
``Image/DataSet/Resolution 0/Frame 0/<title>/Image`` plus a matching sidecar
|
|
group ``Image/DataSetInfo/Global/Channels/<title>/ImageDims``. Round-trips
|
|
via our ``ergo_hdf5`` importer and opens in Ergo / Igor.
|
|
|
|
Mixed layer stacks (DataField + Image) are supported for TIFF (data) and NPZ
|
|
only; the physics-carrying formats (GWY, HDF5, HDF5 Ergo) require every layer
|
|
to be a DataField and raise a clear error otherwise.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Sequence
|
|
|
|
import numpy as np
|
|
|
|
from backend.data_types import DataField, datafield_to_uint8, image_to_uint8
|
|
from backend.exporters._base import FormatSpec
|
|
|
|
# Type tags handled by this exporter module.
accepted_types: tuple[str, ...] = ("DATA_FIELD",)

# One row per selectable format: (format name, file extension, round-trips?, label).
FORMATS: dict[str, FormatSpec] = {
    name: FormatSpec(ext=ext, round_trip=round_trip, label=label)
    for name, ext, round_trip, label in (
        ("TIFF", ".tiff", False, "TIFF (preview)"),
        ("TIFF (data)", ".tiff", True, "TIFF (calibrated data)"),
        ("PNG", ".png", False, "PNG (preview)"),
        ("NPZ", ".npz", False, "NumPy (.npz)"),
        ("GWY", ".gwy", True, "Gwyddion (.gwy)"),
        ("HDF5", ".h5", True, "HDF5 (generic)"),
        ("HDF5 (Ergo)", ".h5", True, "HDF5 (Asylum Research / Ergo)"),
    )
}

# Preview formats can only hold one layer. The Save node is expected to reject
# extra layers before calling into this module; the set lets ``save`` enforce
# the same rule defensively at the exporter boundary.
_SINGLE_LAYER_ONLY: frozenset[str] = frozenset(("TIFF", "PNG"))
|
|
|
|
|
|
def save(
    path: Path,
    value: DataField,
    format_name: str,
    *,
    extra_layers: Sequence[Any] | None = None,
    layer_names: Sequence[str] | None = None,
    **_opts,
) -> None:
    """Write ``value`` (plus any extra layers) to ``path`` as ``format_name``.

    Args:
        path: Destination file; its stem is the default name of the primary layer.
        value: Primary layer.
        format_name: One of the keys of ``FORMATS``.
        extra_layers: Additional layers stacked after ``value``.
        layer_names: Optional display names, positionally matched to the stack;
            blank or missing entries fall back to defaults.
        **_opts: Accepted and ignored.

    Raises:
        ValueError: If a single-layer format is given extra layers, if a
            physics-carrying format receives a non-DataField layer, or if
            ``format_name`` is unknown.
    """
    stack: list[Any] = [value, *(extra_layers or [])]
    names = _resolve_layer_names(stack, layer_names, default_primary=path.stem or "field")

    if len(stack) > 1 and format_name in _SINGLE_LAYER_ONLY:
        raise ValueError(
            f"{format_name} only supports a single layer. Use 'TIFF (data)', "
            "'NPZ', 'GWY', or an HDF5 format for multi-layer saves."
        )

    if format_name == "TIFF":
        # Preview formats render only the primary layer.
        _save_tiff_preview(path, value)
    elif format_name == "PNG":
        _save_png_preview(path, value)
    elif format_name == "TIFF (data)":
        # These two accept mixed DataField / image stacks.
        _save_tiff_data(path, stack, names)
    elif format_name == "NPZ":
        _save_npz(path, stack, names)
    elif format_name in ("GWY", "HDF5", "HDF5 (Ergo)"):
        # Physics-carrying formats: every layer must be a DataField.
        physics_writers = {
            "GWY": _save_gwy,
            "HDF5": _save_hdf5_generic,
            "HDF5 (Ergo)": _save_hdf5_ergo,
        }
        fields = _require_all_datafields(stack, format_name)
        physics_writers[format_name](path, fields, names)
    else:
        raise ValueError(f"Format {format_name!r} is not supported for DATA_FIELD.")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Layer helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _resolve_layer_names(
|
|
layers: Sequence[Any],
|
|
raw_names: Sequence[str] | None,
|
|
*,
|
|
default_primary: str,
|
|
) -> list[str]:
|
|
"""Fill in layer names, falling back to defaults for blank/missing entries.
|
|
|
|
The primary layer (index 0) defaults to ``default_primary`` (usually the
|
|
file stem), and each extra layer defaults to ``layer_N+1`` (1-indexed for
|
|
humans: "layer 2", "layer 3", …).
|
|
"""
|
|
raw_names = list(raw_names or [])
|
|
out: list[str] = []
|
|
for i in range(len(layers)):
|
|
raw = str(raw_names[i]).strip() if i < len(raw_names) and raw_names[i] is not None else ""
|
|
if raw:
|
|
out.append(raw)
|
|
elif i == 0:
|
|
out.append(default_primary)
|
|
else:
|
|
out.append(f"layer_{i + 1}")
|
|
return out
|
|
|
|
|
|
def _require_all_datafields(layers: Sequence[Any], format_label: str) -> list[DataField]:
    """Return ``layers`` as a new list after checking each one is a DataField.

    Raises:
        ValueError: On the first layer that is not a DataField; the message
            names ``format_label``, the 1-based layer position and the
            offending type.
    """
    for position, candidate in enumerate(layers, start=1):
        if isinstance(candidate, DataField):
            continue
        raise ValueError(
            f"{format_label} only supports DataField layers; layer {position} "
            f"is a {type(candidate).__name__}. Use TIFF (data) or NPZ for mixed stacks."
        )
    return list(layers)
|
|
|
|
|
|
def _safe_identifier(name: str, index: int) -> str:
|
|
"""Turn a free-form layer name into a safe identifier (used as an NPZ key)."""
|
|
key = re.sub(r"[^0-9A-Za-z_]+", "_", str(name).strip()).strip("_")
|
|
if not key:
|
|
key = f"layer_{index + 1}"
|
|
if key[0].isdigit():
|
|
key = f"layer_{key}"
|
|
return key
|
|
|
|
|
|
def _dedupe_keys(raw_keys: Sequence[str]) -> list[str]:
|
|
used: set[str] = set()
|
|
result: list[str] = []
|
|
for k in raw_keys:
|
|
candidate = k
|
|
suffix = 2
|
|
while candidate in used:
|
|
candidate = f"{k}_{suffix}"
|
|
suffix += 1
|
|
used.add(candidate)
|
|
result.append(candidate)
|
|
return result
|
|
|
|
|
|
def _layer_to_float_array(layer: Any) -> np.ndarray:
    """Return the C-contiguous pixel array written for a layer in TIFF (data).

    DataField pixels are converted to float64. Plain ndarray layers (images)
    keep their dtype and shape so multi-channel RGB pages survive the write.

    Raises:
        ValueError: If ``layer`` is neither a DataField nor an ndarray.
    """
    if isinstance(layer, DataField):
        pixels, target_dtype = layer.data, np.float64
    elif isinstance(layer, np.ndarray):
        # dtype=None leaves the image's own dtype untouched.
        pixels, target_dtype = layer, None
    else:
        raise ValueError(f"Unsupported layer type for TIFF (data): {type(layer).__name__}")
    return np.ascontiguousarray(pixels, dtype=target_dtype)
|
|
|
|
|
|
def _layer_to_npz_array(layer: Any) -> np.ndarray:
    """Return the array stored in an NPZ archive for a layer.

    A DataField contributes its ``data``; an ndarray is stored as-is.

    Raises:
        ValueError: If ``layer`` is neither a DataField nor an ndarray.
    """
    if isinstance(layer, DataField):
        source = layer.data
    elif isinstance(layer, np.ndarray):
        source = layer
    else:
        raise ValueError(f"Unsupported layer type for NPZ: {type(layer).__name__}")
    return np.asarray(source)
|
|
|
|
|
|
def _datafield_meta(field: DataField) -> dict:
|
|
"""Build the JSON-serializable physics metadata dict for a DataField."""
|
|
return {
|
|
"xreal": float(field.xreal),
|
|
"yreal": float(field.yreal),
|
|
"xoff": float(field.xoff),
|
|
"yoff": float(field.yoff),
|
|
"si_unit_xy": str(field.si_unit_xy),
|
|
"si_unit_z": str(field.si_unit_z),
|
|
"domain": str(field.domain),
|
|
"colormap": field.colormap if isinstance(field.colormap, str) else "viridis",
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-format writers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _save_tiff_preview(path: Path, field: DataField) -> None:
    """Render ``field`` through its colormap and write the result as a TIFF."""
    import tifffile

    rendered = datafield_to_uint8(field, field.colormap)
    tifffile.imwrite(str(path), rendered)
|
|
|
|
|
|
def _save_tiff_data(path: Path, layers: Sequence[Any], names: Sequence[str]) -> None:
    """Write the raw pixels as a multi-page TIFF with tono metadata.

    The ImageDescription tag on the first page carries a JSON document of
    shape ``{"tono": {"version": 1, "layers": [{...}, {...}]}}``. Each entry in
    ``layers`` gives the per-layer physics (xreal/yreal/xoff/yoff/units/domain)
    and its display name so a future multi-layer importer can reconstruct the
    whole stack. Non-DataField layers (plain images) get a minimal entry with
    just the name, dtype and shape — they're pixels, not physics.

    Later pages carry only their display name as the description. TIFF
    description strings must be 7-bit ASCII, so non-ASCII characters in those
    names are written as backslash escapes; the JSON on the first page keeps
    the exact name (as ``\\uXXXX`` escapes).

    Raises:
        ValueError: If a layer is neither a DataField nor an ndarray. All
            layers are converted before the file is opened, so this is raised
            without creating or truncating ``path``.
    """
    import tifffile

    per_layer_meta: list[dict] = []
    for layer, layer_name in zip(layers, names):
        if isinstance(layer, DataField):
            entry = {"name": layer_name, "kind": "data_field", **_datafield_meta(layer)}
        else:
            arr = np.asarray(layer)
            entry = {"name": layer_name, "kind": "image", "dtype": str(arr.dtype), "shape": list(arr.shape)}
        per_layer_meta.append(entry)

    # json.dumps escapes non-ASCII by default, so this string is ASCII-safe.
    description = json.dumps(
        {"tono": {"version": 1, "layers": per_layer_meta}},
        separators=(",", ":"),
    )

    # Convert (and thereby validate) every layer up front: an unsupported
    # layer must not leave a truncated or partially written file behind.
    arrays = [_layer_to_float_array(layer) for layer in layers]

    with tifffile.TiffWriter(str(path)) as tif:
        for i, (arr, layer_name) in enumerate(zip(arrays, names)):
            if i == 0:
                # Full metadata document lives on the first page.
                page_desc = description
            else:
                # Subsequent pages carry only their display name so readers
                # that walk IFDs see something meaningful per channel. The
                # name is escaped to ASCII because tifffile rejects non-ASCII
                # description strings; ASCII names pass through unchanged.
                page_desc = str(layer_name).encode("ascii", "backslashreplace").decode("ascii")
            tif.write(arr, description=page_desc)
|
|
|
|
|
|
def _save_png_preview(path: Path, field: DataField) -> None:
    """Render ``field`` through its colormap and write the result as a PNG."""
    from PIL import Image

    rendered = datafield_to_uint8(field, field.colormap)
    preview = Image.fromarray(rendered)
    preview.save(str(path))
|
|
|
|
|
|
def _save_npz(path: Path, layers: Sequence[Any], names: Sequence[str]) -> None:
    """Write the layers to a NumPy ``.npz`` archive.

    A single layer is stored under the historical ``field`` key. A stack
    stores each layer under a key derived from its display name
    (identifier-safe, deduplicated).

    Keys that would collide with ``np.savez``'s own parameter names
    (``file``, ``allow_pickle``) are prefixed with ``layer_``. Without this a
    layer named "file" makes the call fail with a ``TypeError``, and on NumPy
    versions that accept ``allow_pickle`` such a layer would be consumed as an
    option instead of being stored.

    Raises:
        ValueError: If a layer is neither a DataField nor an ndarray.
    """
    if len(layers) == 1:
        # Single-layer: keep the historical `field` key so nothing that reads
        # existing tono .npz outputs breaks.
        np.savez(str(path), field=_layer_to_npz_array(layers[0]))
        return

    # Arrays are passed to np.savez as keyword arguments, so its own parameter
    # names cannot be used as archive keys.
    reserved = {"file", "allow_pickle"}
    raw_keys = []
    for i, name in enumerate(names):
        key = _safe_identifier(name, i)
        raw_keys.append(f"layer_{key}" if key in reserved else key)
    # Dedupe after renaming so a renamed key cannot shadow a genuine one.
    keys = _dedupe_keys(raw_keys)
    arrays = {key: _layer_to_npz_array(layer) for key, layer in zip(keys, layers)}
    np.savez(str(path), **arrays)
|
|
|
|
|
|
def _save_gwy(path: Path, fields: list[DataField], names: Sequence[str]) -> None:
    """Write the fields as channels of one .gwy file via the gwyfile package.

    Channel ``i`` is stored under ``/i/data`` with its title under
    ``/i/data/title``. Pixels are written as float64; a missing unit becomes
    an empty unit string.
    """
    from gwyfile.objects import GwyContainer, GwyDataField, GwySIUnit

    channels: dict[str, Any] = {}
    for index, (layer, channel_title) in enumerate(zip(fields, names)):
        pixels = np.ascontiguousarray(layer.data, dtype=np.float64)
        extents = {
            attr: float(getattr(layer, attr))
            for attr in ("xreal", "yreal", "xoff", "yoff")
        }
        channels[f"/{index}/data"] = GwyDataField(
            pixels,
            **extents,
            si_unit_xy=GwySIUnit(unitstr=str(layer.si_unit_xy or "")),
            si_unit_z=GwySIUnit(unitstr=str(layer.si_unit_z or "")),
        )
        channels[f"/{index}/data/title"] = channel_title

    container = GwyContainer(channels)
    container.tofile(str(path))
|
|
|
|
|
|
def _save_hdf5_generic(path: Path, fields: list[DataField], names: Sequence[str]) -> None:
    """Write one top-level HDF5 dataset per layer, with physical dims as attrs.

    A single-layer save keeps the original ``/data`` layout. A multi-layer
    save names each dataset after the identifier-safe, deduplicated form of
    the layer's display name.
    """
    import h5py

    with h5py.File(str(path), "w") as h5:
        if len(fields) == 1:
            # Backward-compatible layout for the single-layer case.
            dataset_names = ["data"]
        else:
            dataset_names = _dedupe_keys(
                [_safe_identifier(name, i) for i, name in enumerate(names)]
            )
        for dataset_name, layer in zip(dataset_names, fields):
            _write_hdf5_dataset(h5, dataset_name, layer)
|
|
|
|
|
|
def _write_hdf5_dataset(h5file: Any, name: str, field: DataField) -> None:
|
|
arr = np.ascontiguousarray(field.data, dtype=np.float64)
|
|
ds = h5file.create_dataset(name, data=arr)
|
|
ds.attrs["xreal"] = float(field.xreal)
|
|
ds.attrs["yreal"] = float(field.yreal)
|
|
ds.attrs["xoff"] = float(field.xoff)
|
|
ds.attrs["yoff"] = float(field.yoff)
|
|
ds.attrs["si_unit_xy"] = str(field.si_unit_xy or "")
|
|
ds.attrs["si_unit_z"] = str(field.si_unit_z or "")
|
|
|
|
|
|
def _save_hdf5_ergo(path: Path, fields: list[DataField], names: Sequence[str]) -> None:
    """Write the fields in the Asylum Research / Ergo HDF5 layout.

    For every channel the pixels go to
    ``Image/DataSet/Resolution 0/Frame 0/<title>/Image`` and a sidecar group
    ``Image/DataSetInfo/Global/Channels/<title>/ImageDims`` receives the
    ``DimScaling`` / ``DimUnits`` / ``DataUnits`` attributes. ``<title>`` is
    the identifier-safe, deduplicated form of the layer name. A missing
    lateral unit is written as ``"m"``, a missing data unit as ``""``.
    Intended to round-trip through :mod:`backend.importers.ergo_hdf5`.
    """
    import h5py

    titles = _dedupe_keys([_safe_identifier(name, i) for i, name in enumerate(names)])
    frame_root = "Image/DataSet/Resolution 0/Frame 0"
    channel_root = "Image/DataSetInfo/Global/Channels"

    with h5py.File(str(path), "w") as h5:
        for layer, title in zip(fields, titles):
            image = h5.create_dataset(
                f"{frame_root}/{title}/Image",
                data=np.ascontiguousarray(layer.data, dtype=np.float64),
            )

            width, height = float(layer.xreal), float(layer.yreal)
            x0, y0 = float(layer.xoff), float(layer.yoff)
            image.attrs["xreal"] = width
            image.attrs["yreal"] = height
            image.attrs["xoff"] = x0
            image.attrs["yoff"] = y0

            lateral_unit = str(layer.si_unit_xy or "m")
            value_unit = str(layer.si_unit_z or "")
            image.attrs["si_unit_xy"] = lateral_unit
            image.attrs["si_unit_z"] = value_unit

            # Row order is Y first, then X, each as [start, end]; the existing
            # note says this matches what the ergo_hdf5 importer reads.
            scaling = np.array(
                [[y0, y0 + height], [x0, x0 + width]],
                dtype=np.float64,
            )
            axis_units = np.array([lateral_unit] * 2, dtype=h5py.string_dtype())

            dims = h5.create_group(f"{channel_root}/{title}/ImageDims")
            dims.attrs["DimScaling"] = scaling
            dims.attrs["DimUnits"] = axis_units
            dims.attrs["DataUnits"] = value_unit
|