diff --git a/backend/execution.py b/backend/execution.py index 5858da8..935a138 100644 --- a/backend/execution.py +++ b/backend/execution.py @@ -45,6 +45,13 @@ def _is_link(value: Any) -> bool: ) +class NodeExecutionError(Exception): + """Wraps an error that occurred while executing a specific node.""" + def __init__(self, node_id: str, original: Exception): + self.node_id = node_id + super().__init__(str(original)) + + class ExecutionEngine: """Synchronous (blocking) graph executor. Run inside a thread pool from async code.""" @@ -99,12 +106,15 @@ class ExecutionEngine: class_name = node_def["class_type"] if class_name not in NODE_CLASS_MAPPINGS: - raise ValueError(f"Unknown node type: '{class_name}'") + raise NodeExecutionError(node_id, ValueError(f"Unknown node type: '{class_name}'")) cls = NODE_CLASS_MAPPINGS[class_name] raw_inputs = node_def.get("inputs", {}) input_types = cls.INPUT_TYPES() - inputs = self._resolve_inputs(raw_inputs, node_outputs, input_types) + try: + inputs = self._resolve_inputs(raw_inputs, node_outputs, input_types) + except Exception as exc: + raise NodeExecutionError(node_id, exc) from exc input_signature = self._build_input_signature(class_name, raw_inputs, node_output_signatures) cache_entry = self._get_cached_entry(node_id, class_name, input_signature) @@ -118,8 +128,13 @@ class ExecutionEngine: instance = cls() func = getattr(instance, cls.FUNCTION) start_time = perf_counter() - with active_node(node_id): - result = func(**inputs) + try: + with active_node(node_id): + result = func(**inputs) + except NodeExecutionError: + raise + except Exception as exc: + raise NodeExecutionError(node_id, exc) from exc elapsed_ms = (perf_counter() - start_time) * 1000.0 # Nodes must return a tuple; coerce single values just in case diff --git a/backend/server.py b/backend/server.py index b74050e..0c065bd 100644 --- a/backend/server.py +++ b/backend/server.py @@ -116,7 +116,7 @@ def create_app( from backend.plugin_loader import load_plugins load_plugins(plugins_dir()) - from backend.execution import ExecutionEngine, new_prompt_id + from backend.execution import ExecutionEngine, NodeExecutionError, new_prompt_id from backend.node_registry import NODE_CLASS_MAPPINGS, get_all_node_info ensure_runtime_dirs(with_plugins=_plugins_on) @@ -522,6 +522,12 @@ def create_app( ), ) broadcast(session_id, {"type": "execution_complete", "data": {"prompt_id": prompt_id}}) + except NodeExecutionError as exc: + log.exception("Execution error on node %s", exc.node_id) + broadcast(session_id, { + "type": "execution_error", + "data": {"node_id": exc.node_id, "message": str(exc)}, + }) except Exception as exc: log.exception("Execution error") broadcast(session_id, { diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx index 6a1ecf0..bf082e5 100644 --- a/frontend/src/App.jsx +++ b/frontend/src/App.jsx @@ -1300,7 +1300,7 @@ function Flow() { case 'execution_start': setNodes((ns) => ns.map((n) => ({ ...n, - data: { ...n.data, processingTimeMs: null }, + data: { ...n.data, processingTimeMs: null, error: null }, }))); setExecutingNodeId(null); setStatus({ text: 'Running workflow…', level: 'info' }); @@ -1315,7 +1315,12 @@ function Flow() { break; case 'execution_error': setExecutingNodeId(null); - setStatus({ text: 'Error: ' + msg.data.message, level: 'error' }); + if (msg.data.node_id) { + updateNodeData(msg.data.node_id, { error: msg.data.message }); + } + if (!msg.data.node_id) { + setStatus({ text: 'Error: ' + msg.data.message, level: 'error' }); + } console.error('[tono] execution error', msg.data); break; case 'preview': diff --git a/frontend/src/CustomNode.jsx b/frontend/src/CustomNode.jsx index 6f03464..41842df 100644 --- a/frontend/src/CustomNode.jsx +++ b/frontend/src/CustomNode.jsx @@ -1182,7 +1182,7 @@ function CustomNode({ id, data }) { return ( <> {ctx?.executingNodeId === id &&
} -