39 changes: 38 additions & 1 deletion concore.py
@@ -6,6 +6,9 @@
import re
import zmq
import numpy as np
import atexit
import signal

logging.basicConfig(
level=logging.INFO,
format='%(levelname)s - %(message)s'
@@ -72,6 +75,7 @@ def recv_json_with_retry(self):

# Global ZeroMQ ports registry
zmq_ports = {}
_cleanup_in_progress = False

def init_zmq_port(port_name, port_type, address, socket_type_str):
"""
@@ -98,12 +102,45 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq():
for port in zmq_ports.values():
"""Clean up all ZMQ sockets and contexts before exit."""
global _cleanup_in_progress

if _cleanup_in_progress:
return # Already cleaning up, prevent reentrant calls

if not zmq_ports:
return # No ports to clean up

_cleanup_in_progress = True
print("\nCleaning up ZMQ resources...")
for port_name, port in zmq_ports.items():
try:
port.socket.close()
port.context.term()
print(f"Closed ZMQ port: {port_name}")
except Exception as e:
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
zmq_ports.clear()
_cleanup_in_progress = False

def signal_handler(sig, frame):
"""Handle interrupt signals gracefully."""
print(f"\nReceived signal {sig}, shutting down gracefully...")
# Prevent terminate_zmq from being called twice: once here and once via atexit
try:
atexit.unregister(terminate_zmq)
except Exception:
# If unregister fails for any reason, proceed with explicit cleanup anyway
pass
terminate_zmq()
sys.exit(0)

# Register cleanup handlers
atexit.register(terminate_zmq)
signal.signal(signal.SIGINT, signal_handler) # Handle Ctrl+C
if not hasattr(sys, 'getwindowsversion'):
signal.signal(signal.SIGTERM, signal_handler) # Handle termination (Unix only)
Comment on lines +138 to +142 (Copilot AI, Feb 6, 2026):

Registering signal handlers at import time can raise ValueError when concore is imported from a non-main thread, and it also overrides the host application's signal handling just by importing the module. Consider making this opt-in (e.g., an init/enable_signal_handlers function) or guarding registration so it only happens in the main thread / when running as the top-level process.
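A minimal sketch of the opt-in, main-thread-guarded registration suggested above; the function name `enable_signal_handlers` and its placement in concore.py are illustrative, not part of this PR:

```python
import atexit
import signal
import sys
import threading

def enable_signal_handlers():
    """Opt-in alternative to registering handlers at import time (sketch only).

    Embedding applications can simply not call this, so importing concore
    leaves the host's signal handling untouched.
    """
    atexit.register(terminate_zmq)
    # signal.signal() raises ValueError outside the main thread, so guard it.
    if threading.current_thread() is threading.main_thread():
        signal.signal(signal.SIGINT, signal_handler)       # Handle Ctrl+C
        if not hasattr(sys, 'getwindowsversion'):           # SIGTERM on Unix only
            signal.signal(signal.SIGTERM, signal_handler)
```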
Comment on lines +138 to +142 (Copilot AI, Feb 6, 2026):

The PR description is focused on improving concore validate, but this change set also modifies concore.py to add ZMQ cleanup + signal/atexit handling. If intentional, please mention it in the PR description (and why), or split it into a separate PR so the validate changes can be reviewed/merged independently.

# --- ZeroMQ Integration End ---


205 changes: 199 additions & 6 deletions concore_cli/commands/validate.py
@@ -2,7 +2,134 @@
from bs4 import BeautifulSoup
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
import re
from collections import defaultdict, deque
from typing import List, Set, Tuple, Dict
Comment on lines 4 to +8 (Copilot AI, Feb 6, 2026):

Unused imports were added here (e.g., Tree, Table, Set). They aren't referenced anywhere in this module, which will trip linters and adds noise—please remove them or use them as intended.

Suggested change:
-from rich.table import Table
-from rich.tree import Tree
-import re
-from collections import defaultdict, deque
-from typing import List, Set, Tuple, Dict
+import re
+from collections import defaultdict, deque
+from typing import List, Tuple, Dict


def detect_cycles(nodes_dict: Dict[str, str], edges: List[Tuple[str, str]]) -> List[List[str]]:
"""
Detect all cycles in the workflow graph using DFS.

Args:
nodes_dict: Mapping of node IDs to labels
edges: List of (source_id, target_id) tuples

Returns:
List of cycles, where each cycle is a list of node IDs
Comment on lines +13 to +20 (Copilot AI, Feb 6, 2026):

The docstring says this function "Detect[s] all cycles", but this DFS implementation only records cycles when it encounters a back-edge and does not reliably enumerate all distinct simple cycles (and can emit duplicates in some graphs). Either adjust the wording/return contract or switch to a proper cycle-enumeration approach (e.g., Johnson's algorithm) / SCC-based reporting.

Suggested change:
-Detect all cycles in the workflow graph using DFS.
-Args:
-    nodes_dict: Mapping of node IDs to labels
-    edges: List of (source_id, target_id) tuples
-Returns:
-    List of cycles, where each cycle is a list of node IDs
+Detect cycles in the workflow graph using a DFS-based approach.
+Note:
+    This function records cycles when a back-edge is encountered during DFS.
+    It is intended for detecting the presence and structure of cycles and may
+    not enumerate all distinct simple cycles in the graph, and some cycles
+    may be reported more than once in certain graph topologies.
+Args:
+    nodes_dict: Mapping of node IDs to labels
+    edges: List of (source_id, target_id) tuples
+Returns:
+    List of detected cycles, where each cycle is a list of node IDs
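For reference, a sketch of the cycle-enumeration alternative the comment mentions, using Johnson's algorithm via networkx; networkx is not currently a dependency of this module, so treat it as an assumption:

```python
import networkx as nx  # assumed extra dependency; not imported by validate.py today

def enumerate_simple_cycles(nodes_dict, edges):
    """Sketch: enumerate every distinct simple cycle with Johnson's algorithm."""
    graph = nx.DiGraph()
    graph.add_nodes_from(nodes_dict)   # node IDs
    graph.add_edges_from(edges)        # (source_id, target_id) tuples
    # nx.simple_cycles yields each simple cycle exactly once, as a list of
    # node IDs without repeating the starting node at the end.
    return list(nx.simple_cycles(graph))
```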
"""
# Build adjacency list
graph = defaultdict(list)
for source, target in edges:
graph[source].append(target)

cycles = []
visited = set()
rec_stack = set()
path = []

def dfs(node):
visited.add(node)
rec_stack.add(node)
path.append(node)

for neighbor in graph[node]:
if neighbor not in visited:
dfs(neighbor)
elif neighbor in rec_stack:
# Found a cycle
cycle_start = path.index(neighbor)
cycle = path[cycle_start:] + [neighbor]
cycles.append(cycle)

path.pop()
rec_stack.remove(node)

# Run DFS from each unvisited node
for node in nodes_dict.keys():
if node not in visited:
dfs(node)

return cycles


def analyze_control_loop(cycle: List[str], nodes_dict: Dict[str, str]) -> Dict:
"""
Analyze if a cycle represents a valid control loop.

A valid control loop typically has:
- A controller node (contains 'control', 'controller', 'pid', 'mpc')
- A plant/PM node (contains 'pm', 'plant', 'model')
- At least 2 nodes (for feedback)

Returns:
Dict with analysis results
"""
# Get unique nodes in cycle (cycle has duplicate first/last node)
unique_nodes = []
seen = set()
for node_id in cycle:
if node_id not in seen:
unique_nodes.append(node_id)
seen.add(node_id)

node_labels = [nodes_dict.get(node_id, '').lower() for node_id in unique_nodes if node_id in nodes_dict]

# Keywords for different node types
controller_keywords = ['control', 'controller', 'pid', 'mpc', 'observer', 'regulator']
plant_keywords = ['pm', 'plant', 'model', 'physio', 'cardiac', 'neural']

has_controller = any(any(keyword in label for keyword in controller_keywords) for label in node_labels)
has_plant = any(any(keyword in label for keyword in plant_keywords) for label in node_labels)

cycle_length = len(unique_nodes)

analysis = {
'is_valid_control_loop': has_controller and has_plant and cycle_length >= 2,
'has_controller': has_controller,
'has_plant': has_plant,
'length': cycle_length,
'nodes': [nodes_dict.get(nid, nid) for nid in unique_nodes]
}

return analysis


def check_graph_connectivity(nodes_dict: Dict[str, str], edges: List[Tuple[str, str]]) -> Tuple[bool, List[str]]:
"""
Check if all nodes are reachable in the graph.

Returns:
(is_fully_connected, list_of_unreachable_nodes)
"""
if not nodes_dict:
return True, []

# Build adjacency list (undirected for connectivity check)
graph = defaultdict(set)
for source, target in edges:
graph[source].add(target)
graph[target].add(source)
Comment on lines +109 to +113 (Copilot AI, Feb 6, 2026):

Although this is described as a reachability check, the implementation explicitly treats edges as undirected (adds both source->target and target->source). That makes "unreachable"/"reachable" wording misleading for directed workflows; consider renaming/rewording to "disconnected" (weak connectivity) or implement directed reachability from a defined entrypoint.
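If directed reachability is preferred, one possible sketch starts from entry nodes (nodes with no incoming edges); the entry-point convention here is an assumption, not something the workflow format defines:

```python
from collections import defaultdict, deque

def check_directed_reachability(nodes_dict, edges):
    """Sketch: reachability along edge direction from zero-indegree entry nodes."""
    graph = defaultdict(list)
    indegree = {node_id: 0 for node_id in nodes_dict}
    for source, target in edges:
        graph[source].append(target)
        if target in indegree:
            indegree[target] += 1

    # Assumed convention: nodes with no incoming edges are entry points.
    # If every node sits on a cycle, fall back to treating all nodes as entries.
    entry_nodes = [n for n, deg in indegree.items() if deg == 0] or list(nodes_dict)

    visited = set()
    queue = deque(entry_nodes)
    while queue:
        node = queue.popleft()
        if node in visited:
            continue
        visited.add(node)
        queue.extend(graph[node])

    unreachable = [nodes_dict[n] for n in nodes_dict if n not in visited]
    return len(unreachable) == 0, unreachable
```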

# BFS from first node
start_node = next(iter(nodes_dict.keys()))
visited = set()
queue = deque([start_node])

while queue:
node = queue.popleft()
if node in visited:
continue
visited.add(node)
for neighbor in graph[node]:
if neighbor not in visited:
queue.append(neighbor)

unreachable = [nodes_dict[nid] for nid in nodes_dict.keys() if nid not in visited]

return len(unreachable) == 0, unreachable


def validate_workflow(workflow_file, console):
workflow_path = Path(workflow_file)
@@ -104,6 +231,72 @@ def validate_workflow(workflow_file, console):
if file_edges > 0:
info.append(f"File-based edges: {file_edges}")

# NEW: Advanced graph analysis
# Build edge list for cycle detection
edge_list = []
for edge in edges:
source = edge.get('source')
target = edge.get('target')
if source and target and source in node_ids and target in node_ids:
edge_list.append((source, target))

# Build node dictionary (id -> label)
node_id_to_label = {}
for node in nodes:
node_id = node.get('id')
if node_id:
label_tag = node.find('y:NodeLabel')
if label_tag and label_tag.text:
node_id_to_label[node_id] = label_tag.text.strip()
else:
node_id_to_label[node_id] = node_id

# Check connectivity
is_connected, unreachable = check_graph_connectivity(node_id_to_label, edge_list)
if not is_connected:
for node_label in unreachable:
warnings.append(f"Unreachable node: {node_label}")
Copilot AI (Feb 6, 2026):

These warnings say "Unreachable node", but the connectivity check is weak/undirected from an arbitrary node, so nodes in other components are really "disconnected" rather than unreachable in the directed sense. Rewording the warning (or changing the connectivity algorithm) would avoid confusing users.

Suggested change:
-warnings.append(f"Unreachable node: {node_label}")
+warnings.append(f"Disconnected node (not connected to main component): {node_label}")

# Detect cycles
cycles = detect_cycles(node_id_to_label, edge_list)

if cycles:
info.append(f"Found {len(cycles)} cycle(s) in workflow")

# Analyze each cycle
control_loops = []
other_cycles = []

for cycle in cycles:
analysis = analyze_control_loop(cycle, node_id_to_label)
if analysis['is_valid_control_loop']:
control_loops.append(analysis)
else:
other_cycles.append(analysis)

if control_loops:
info.append(f"Valid control loops: {len(control_loops)}")
console.print()
console.print("[green]Control Loops Detected:[/green]")
for i, loop in enumerate(control_loops, 1):
console.print(f" [green]Loop {i}:[/green] {' -> '.join(loop['nodes'])} -> [cycle]")

if other_cycles:
warnings.append(f"Non-standard cycles detected: {len(other_cycles)}")
console.print()
console.print("[yellow]! Non-Standard Cycles:[/yellow]")
for i, cycle_info in enumerate(other_cycles, 1):
cycle_desc = ' -> '.join(cycle_info['nodes'])
console.print(f" [yellow]Cycle {i}:[/yellow] {cycle_desc} -> [cycle]")

if not cycle_info['has_controller']:
console.print(f" [dim]Missing controller node[/dim]")
if not cycle_info['has_plant']:
console.print(f" [dim]Missing plant/PM node[/dim]")
else:
info.append("No cycles detected (DAG workflow)")
warnings.append("Workflow has no feedback loops - not a control system")

show_results(console, errors, warnings, info)

except FileNotFoundError:
@@ -113,27 +306,27 @@ def validate_workflow(workflow_file, console):

def show_results(console, errors, warnings, info):
if errors:
console.print("[red] Validation failed[/red]\n")
console.print("[red]X Validation failed[/red]\n")
for error in errors:
console.print(f" [red][/red] {error}")
console.print(f" [red]X[/red] {error}")
else:
Comment on lines 308 to 312 (Copilot AI, Feb 6, 2026):

This switches validation output symbols to ASCII (X/OK/!/i), but other CLI commands consistently use Rich/unicode symbols (e.g., run.py/stop.py use ✓/✗/⚠). Unless there's a specific compatibility requirement, this makes CLI output inconsistent; consider keeping the same symbols across commands or centralizing the style choice.
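If the team wants a single place to decide between unicode and ASCII symbols, a small shared constants module is one option; the module name and encoding check below are assumptions, not existing concore_cli code:

```python
# concore_cli/ui.py (hypothetical shared module)
import sys

_encoding = (sys.stdout.encoding or "").lower()
UNICODE_OK = "utf" in _encoding

CHECK = "✓" if UNICODE_OK else "OK"
CROSS = "✗" if UNICODE_OK else "X"
WARN = "⚠" if UNICODE_OK else "!"
INFO = "ℹ" if UNICODE_OK else "i"

# Usage in show_results():
#   console.print(f"[red]{CROSS} Validation failed[/red]\n")
```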
console.print("[green] Validation passed[/green]\n")
console.print("[green]OK Validation passed[/green]\n")

if warnings:
console.print()
for warning in warnings:
console.print(f" [yellow][/yellow] {warning}")
console.print(f" [yellow]![/yellow] {warning}")

if info:
console.print()
for item in info:
console.print(f" [blue][/blue] {item}")
console.print(f" [blue]i[/blue] {item}")

console.print()

if not errors:
console.print(Panel.fit(
"[green][/green] Workflow is valid and ready to run",
"[green]OK[/green] Workflow is valid and ready to run",
border_style="green"
))
else:
Expand Down