security improvements
This commit is contained in:
@@ -25,7 +25,7 @@ import hashlib
|
||||
import json
|
||||
import uuid
|
||||
from copy import deepcopy
|
||||
from collections import defaultdict, deque
|
||||
from collections import OrderedDict, defaultdict, deque
|
||||
from math import isfinite
|
||||
from threading import RLock
|
||||
from time import perf_counter
|
||||
@@ -55,9 +55,12 @@ class NodeExecutionError(Exception):
|
||||
class ExecutionEngine:
|
||||
"""Synchronous (blocking) graph executor. Run inside a thread pool from async code."""
|
||||
|
||||
NODE_CACHE_LIMIT = 256
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._node_cache: dict[str, dict[str, Any]] = {}
|
||||
self._node_cache: OrderedDict[str, dict[str, Any]] = OrderedDict()
|
||||
self._cache_lock = RLock()
|
||||
self._cache_warning_emitted = False
|
||||
|
||||
def execute(
|
||||
self,
|
||||
@@ -154,6 +157,7 @@ class ExecutionEngine:
|
||||
input_signature=input_signature,
|
||||
output_signatures=output_signatures,
|
||||
outputs=self._clone_cached_outputs(result),
|
||||
on_warning=on_warning,
|
||||
)
|
||||
|
||||
# Auto-preview: broadcast a thumbnail for any DATA_FIELD,
|
||||
@@ -275,6 +279,8 @@ class ExecutionEngine:
|
||||
return None
|
||||
if entry.get("input_signature") != input_signature:
|
||||
return None
|
||||
# Move to end for LRU ordering
|
||||
self._node_cache.move_to_end(node_id)
|
||||
return entry
|
||||
|
||||
def _store_cache_entry(
|
||||
@@ -285,14 +291,27 @@ class ExecutionEngine:
|
||||
input_signature: str,
|
||||
output_signatures: tuple[str, ...],
|
||||
outputs: tuple,
|
||||
on_warning: Callable[[str, str], None] | None = None,
|
||||
) -> None:
|
||||
with self._cache_lock:
|
||||
if node_id in self._node_cache:
|
||||
self._node_cache.move_to_end(node_id)
|
||||
self._node_cache[node_id] = {
|
||||
"class_name": class_name,
|
||||
"input_signature": input_signature,
|
||||
"output_signatures": output_signatures,
|
||||
"outputs": outputs,
|
||||
}
|
||||
if len(self._node_cache) > self.NODE_CACHE_LIMIT:
|
||||
self._node_cache.popitem(last=False)
|
||||
if not self._cache_warning_emitted and on_warning is not None:
|
||||
self._cache_warning_emitted = True
|
||||
on_warning(
|
||||
node_id,
|
||||
f"Node cache exceeded {self.NODE_CACHE_LIMIT} entries — "
|
||||
"oldest cached results are being evicted. "
|
||||
"Very large workflows may re-compute nodes that would otherwise be cached.",
|
||||
)
|
||||
|
||||
def _build_input_signature(
|
||||
self,
|
||||
|
||||
Reference in New Issue
Block a user