< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.GraphWorkflowRunner
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/GraphWorkflowRunner.cs
Line coverage
91%
Covered lines: 536
Uncovered lines: 51
Coverable lines: 587
Total lines: 820
Line coverage: 91.3%
Branch coverage
67%
Covered branches: 183
Total branches: 270
Branch coverage: 67.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| RunGraphAsync() | 100% | 2 | 2 | 100% |
| RunWaitAllWithDiagnosticsAsync() | 55.76% | 143 | 104 | 84.65% |
| RunWithNeedlrExecutorAsync() | 81.66% | 60 | 60 | 98.92% |
| <RunWithNeedlrExecutorAsync() | 81.81% | 66 | 66 | 97.54% |
| IsNodeRequiredByAllIncomingEdges(...) | 100% | 8 | 8 | 100% |
| IsOptionalEdge(...) | 50% | 2 | 2 | 66.66% |

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/GraphWorkflowRunner.cs

# (hits) | Line | Line coverage
 1using System.Collections.Concurrent;
 2using System.Diagnostics;
 3using System.Text;
 4
 5using Microsoft.Agents.AI;
 6using Microsoft.Agents.AI.Workflows;
 7using Microsoft.Extensions.AI;
 8
 9using NexusLabs.Needlr.AgentFramework;
 10using NexusLabs.Needlr.AgentFramework.Diagnostics;
 11using NexusLabs.Needlr.AgentFramework.Iterative;
 12
 13using ProgressEvents = NexusLabs.Needlr.AgentFramework.Progress;
 14
 15namespace NexusLabs.Needlr.AgentFramework.Workflows;
 16
 17/// <summary>
 18/// Executes DAG/graph workflows using either MAF's native BSP engine or the
 19/// Needlr-native executor, depending on declared topology.
 20/// </summary>
 21/// <remarks>
 22/// All dependencies are resolved via DI — no reflection into private fields.
 23/// </remarks>
 24internal sealed class GraphWorkflowRunner : IGraphWorkflowRunner
 25{
 26    private readonly IWorkflowFactory _workflowFactory;
 27    private readonly IAgentFactory _agentFactory;
 28    private readonly IChatClientAccessor _chatClientAccessor;
 29    private readonly IAgentDiagnosticsAccessor? _diagnosticsAccessor;
 30    private readonly GraphTopologyProvider _topologyProvider;
 31    private readonly GraphEdgeRouter _edgeRouter;
 32
 3733    public GraphWorkflowRunner(
 3734        IWorkflowFactory workflowFactory,
 3735        IAgentFactory agentFactory,
 3736        IChatClientAccessor chatClientAccessor,
 3737        GraphTopologyProvider topologyProvider,
 3738        GraphEdgeRouter edgeRouter,
 3739        IAgentDiagnosticsAccessor? diagnosticsAccessor = null)
 40    {
 3741        _workflowFactory = workflowFactory;
 3742        _agentFactory = agentFactory;
 3743        _chatClientAccessor = chatClientAccessor;
 3744        _topologyProvider = topologyProvider;
 3745        _edgeRouter = edgeRouter;
 3746        _diagnosticsAccessor = diagnosticsAccessor;
 3747    }
 48
 49    public async Task<IDagRunResult> RunGraphAsync(
 50        string graphName,
 51        string input,
 52        ProgressEvents.IProgressReporter? progress = null,
 53        CancellationToken cancellationToken = default)
 54    {
 3155        ArgumentException.ThrowIfNullOrWhiteSpace(graphName);
 3156        ArgumentException.ThrowIfNullOrWhiteSpace(input);
 57
 3158        var topology = _topologyProvider.GetTopology(graphName);
 59
 3160        if (!topology.RequiresNeedlrExecutor)
 61        {
 162            return await RunWaitAllWithDiagnosticsAsync(
 163                topology, graphName, input, progress, cancellationToken);
 64        }
 65
 3066        return await RunWithNeedlrExecutorAsync(
 3067            topology, graphName, input, progress, cancellationToken);
 3168    }
 69
 70    private async Task<IDagRunResult> RunWaitAllWithDiagnosticsAsync(
 71        GraphTopology topology,
 72        string graphName,
 73        string input,
 74        ProgressEvents.IProgressReporter? progress,
 75        CancellationToken cancellationToken)
 76    {
 177        var workflow = _workflowFactory.CreateGraphWorkflow(graphName);
 178        var dagStart = Stopwatch.GetTimestamp();
 79
 180        var responses = new Dictionary<string, StringBuilder>();
 181        var invocationTimestamps = new List<(string ExecutorId, DateTimeOffset At)>();
 182        bool succeeded = true;
 183        string? errorMessage = null;
 184        Exception? caughtException = null;
 85
 186        var collector = _diagnosticsAccessor?.CompletionCollector;
 187        var toolCollector = _diagnosticsAccessor?.ToolCallCollector;
 188        collector?.DrainCompletions();
 189        toolCollector?.DrainToolCalls();
 90
 191        progress?.Report(new ProgressEvents.WorkflowStartedEvent(
 192            DateTimeOffset.UtcNow,
 193            progress.WorkflowId,
 194            progress.AgentId,
 195            null,
 196            progress.Depth,
 197            progress.NextSequence()));
 98
 99        try
 100        {
 1101            IDisposable? captureScope = _diagnosticsAccessor?.BeginCapture();
 102            try
 103            {
 1104                await using var run = await InProcessExecution.RunStreamingAsync(
 1105                    workflow,
 1106                    new ChatMessage(ChatRole.User, input),
 1107                    cancellationToken: cancellationToken);
 108
 1109                await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
 110
 1111                await using var budgetReg = cancellationToken.CanBeCanceled
 0112                    ? cancellationToken.Register(() => _ = run.CancelRunAsync())
 1113                    : default(CancellationTokenRegistration?);
 114
 16115                await foreach (var evt in run.WatchStreamAsync(cancellationToken))
 116                {
 7117                    if (evt is ExecutorInvokedEvent invoked)
 118                    {
 2119                        var id = invoked.ExecutorId ?? "unknown";
 2120                        invocationTimestamps.Add((id, DateTimeOffset.UtcNow));
 121
 2122                        progress?.Report(new ProgressEvents.AgentInvokedEvent(
 2123                            DateTimeOffset.UtcNow,
 2124                            progress.WorkflowId,
 2125                            id,
 2126                            null,
 2127                            progress.Depth + 1,
 2128                            progress.NextSequence(),
 2129                            AgentName: id,
 2130                            GraphName: graphName,
 2131                            NodeId: id));
 0132                        continue;
 133                    }
 134
 5135                    if (evt is ExecutorFailedEvent executorFailed)
 136                    {
 1137                        succeeded = false;
 1138                        errorMessage = executorFailed.Data?.Message;
 1139                        var failedId = executorFailed.ExecutorId ?? "unknown";
 140
 1141                        progress?.Report(new ProgressEvents.AgentFailedEvent(
 1142                            DateTimeOffset.UtcNow,
 1143                            progress.WorkflowId,
 1144                            failedId,
 1145                            null,
 1146                            progress.Depth + 1,
 1147                            progress.NextSequence(),
 1148                            AgentName: failedId,
 1149                            ErrorMessage: executorFailed.Data?.Message ?? "unknown error"));
 0150                        continue;
 151                    }
 152
 4153                    if (evt is WorkflowErrorEvent workflowError)
 154                    {
 1155                        succeeded = false;
 1156                        errorMessage = workflowError.Exception?.Message;
 1157                        continue;
 158                    }
 159
 3160                    if (evt is not AgentResponseUpdateEvent update
 3161                        || update.ExecutorId is null
 3162                        || update.Data is null)
 163                    {
 164                        continue;
 165                    }
 166
 0167                    var text = update.Data.ToString();
 0168                    if (string.IsNullOrEmpty(text))
 169                    {
 170                        continue;
 171                    }
 172
 0173                    if (!responses.TryGetValue(update.ExecutorId, out var sb))
 174                    {
 0175                        responses[update.ExecutorId] = sb = new StringBuilder();
 176                    }
 177
 0178                    sb.Append(text);
 179
 0180                    progress?.Report(new ProgressEvents.AgentResponseChunkEvent(
 0181                        DateTimeOffset.UtcNow,
 0182                        progress.WorkflowId,
 0183                        update.ExecutorId,
 0184                        null,
 0185                        progress.Depth + 1,
 0186                        progress.NextSequence(),
 0187                        AgentName: update.ExecutorId,
 0188                        Text: text));
 189                }
 1190            }
 191            finally
 192            {
 1193                captureScope?.Dispose();
 194            }
 1195        }
 0196        catch (Exception ex)
 197        {
 0198            succeeded = false;
 0199            errorMessage = ex.Message;
 0200            caughtException = ex;
 0201        }
 202
 1203        cancellationToken.ThrowIfCancellationRequested();
 204
 1205        var totalDuration = Stopwatch.GetElapsedTime(dagStart);
 206
 1207        var allCompletions = collector?.DrainCompletions()
 0208            ?.OrderBy(c => c.StartedAt).ToList()
 1209            ?? [];
 1210        var allToolCalls = toolCollector?.DrainToolCalls()
 0211            ?.OrderBy(t => t.StartedAt).ToList()
 1212            ?? [];
 213
 3214        var invokedIds = invocationTimestamps.Select(inv => inv.ExecutorId).ToHashSet();
 1215        var respondedIds = responses.Keys.ToHashSet();
 1216        var agentIds = invokedIds.Union(respondedIds).Distinct().ToList();
 217
 218        // Build a mapping from agent IDs (executor IDs from MAF) to their
 219        // corresponding Type for namespace-safe edge lookups.
 1220        var agentIdToType = new Dictionary<string, Type>(StringComparer.Ordinal);
 4221        foreach (var id in agentIds)
 222        {
 1223            var matchedType = topology.AllTypes.FirstOrDefault(t =>
 2224                id.Equals(t.Name, StringComparison.Ordinal) ||
 2225                id.StartsWith(t.Name + "_", StringComparison.Ordinal));
 1226            if (matchedType is not null)
 227            {
 1228                agentIdToType[id] = matchedType;
 229            }
 230        }
 231
 1232        var completionsByAgent = new Dictionary<string, List<ChatCompletionDiagnostics>>();
 1233        var toolCallsByAgent = new Dictionary<string, List<ToolCallDiagnostics>>();
 4234        foreach (var id in agentIds)
 235        {
 1236            completionsByAgent[id] = [];
 1237            toolCallsByAgent[id] = [];
 238        }
 239
 2240        foreach (var c in allCompletions)
 241        {
 0242            var matched = agentIds.FirstOrDefault(id =>
 0243                c.AgentName is not null &&
 0244                (id.Equals(c.AgentName, StringComparison.Ordinal) ||
 0245                 id.StartsWith(c.AgentName + "_", StringComparison.Ordinal)));
 0246            if (matched is not null)
 247            {
 0248                completionsByAgent[matched].Add(c);
 249            }
 250        }
 251
 2252        foreach (var tc in allToolCalls)
 253        {
 0254            var matched = agentIds.FirstOrDefault(id =>
 0255                tc.AgentName is not null &&
 0256                (id.Equals(tc.AgentName, StringComparison.Ordinal) ||
 0257                 id.StartsWith(tc.AgentName + "_", StringComparison.Ordinal)));
 0258            if (matched is not null)
 259            {
 0260                toolCallsByAgent[matched].Add(tc);
 261            }
 262        }
 263
 1264        var nodeResults = new Dictionary<string, IDagNodeResult>();
 1265        var stages = new List<IAgentStageResult>();
 1266        var branchResults = new Dictionary<string, IReadOnlyList<IAgentStageResult>>();
 267
 4268        foreach (var agentId in agentIds)
 269        {
 1270            var responseText = responses.TryGetValue(agentId, out var respSb)
 1271                ? respSb.ToString()
 1272                : string.Empty;
 273
 1274            ChatResponse? finalResponse = !string.IsNullOrEmpty(responseText)
 1275                ? new ChatResponse(new ChatMessage(ChatRole.Assistant, responseText))
 1276                : null;
 277
 1278            var agentCompletions = completionsByAgent.GetValueOrDefault(agentId, []);
 1279            var agentToolCalls = toolCallsByAgent.GetValueOrDefault(agentId, []);
 280
 281            TimeSpan nodeDuration;
 282            DateTimeOffset nodeStartedAt;
 1283            if (agentCompletions.Count > 0)
 284            {
 0285                nodeStartedAt = agentCompletions[0].StartedAt;
 0286                nodeDuration = agentCompletions[^1].CompletedAt - nodeStartedAt;
 287            }
 288            else
 289            {
 1290                var invTs = invocationTimestamps
 2291                    .FirstOrDefault(x => x.ExecutorId == agentId).At;
 1292                nodeStartedAt = invTs != default ? invTs : DateTimeOffset.UtcNow;
 1293                nodeDuration = totalDuration / Math.Max(agentIds.Count, 1);
 294            }
 295
 1296            var dagStartTime = DateTimeOffset.UtcNow - totalDuration;
 1297            var startOffset = nodeStartedAt - dagStartTime;
 1298            if (startOffset < TimeSpan.Zero)
 299            {
 0300                startOffset = TimeSpan.Zero;
 301            }
 302
 1303            var tokenUsage = new TokenUsage(
 0304                InputTokens: agentCompletions.Sum(c => c.Tokens.InputTokens),
 0305                OutputTokens: agentCompletions.Sum(c => c.Tokens.OutputTokens),
 0306                TotalTokens: agentCompletions.Sum(c => c.Tokens.TotalTokens),
 0307                CachedInputTokens: agentCompletions.Sum(c => c.Tokens.CachedInputTokens),
 1308                ReasoningTokens: agentCompletions.Sum(c => c.Tokens.ReasoningTokens));
 309
 1310            IAgentRunDiagnostics diag = new AgentRunDiagnostics(
 1311                AgentName: agentId,
 1312                TotalDuration: nodeDuration,
 1313                AggregateTokenUsage: tokenUsage,
 1314                ChatCompletions: agentCompletions,
 1315                ToolCalls: agentToolCalls,
 1316                TotalInputMessages: 0,
 1317                TotalOutputMessages: 0,
 1318                InputMessages: [],
 1319                OutputResponse: null,
 1320                Succeeded: true,
 1321                ErrorMessage: null,
 1322                StartedAt: nodeStartedAt,
 1323                CompletedAt: nodeStartedAt + nodeDuration);
 324
 325            // Resolve Type-based edges to FullName strings for the public interface.
 1326            var resolvedType = agentIdToType.GetValueOrDefault(agentId);
 1327            var inEdges = resolvedType is not null
 1328                ? topology.InboundEdges.GetValueOrDefault(resolvedType, [])
 0329                    .Select(t => t.FullName ?? t.Name).ToList()
 1330                : (IReadOnlyList<string>)[];
 1331            var outEdges = resolvedType is not null
 1332                ? topology.OutboundEdges.GetValueOrDefault(resolvedType, [])
 2333                    .Select(t => t.FullName ?? t.Name).ToList()
 1334                : (IReadOnlyList<string>)[];
 335
 1336            var nodeResult = new DagNodeResult(
 1337                nodeId: agentId,
 1338                agentName: agentId,
 1339                kind: NodeKind.Agent,
 1340                diagnostics: diag,
 1341                finalResponse: finalResponse,
 1342                inboundEdges: inEdges,
 1343                outboundEdges: outEdges,
 1344                startOffset: startOffset,
 1345                duration: nodeDuration);
 1346            nodeResults[agentId] = nodeResult;
 347
 1348            var stageResult = new AgentStageResult(agentId, finalResponse, diag);
 1349            stages.Add(stageResult);
 350
 1351            progress?.Report(new ProgressEvents.AgentCompletedEvent(
 1352                DateTimeOffset.UtcNow,
 1353                progress.WorkflowId,
 1354                agentId,
 1355                null,
 1356                progress.Depth + 1,
 1357                progress.NextSequence(),
 1358                AgentName: agentId,
 1359                Duration: nodeDuration,
 1360                TotalTokens: tokenUsage.TotalTokens,
 1361                InputTokens: tokenUsage.InputTokens,
 1362                OutputTokens: tokenUsage.OutputTokens));
 363        }
 364
 1365        var branchIndex = 0;
 1366        var nodesByInbound = stages
 1367            .Where(s => nodeResults.ContainsKey(s.AgentName))
 1368            .GroupBy(s => string.Join(",", nodeResults[s.AgentName].InboundEdges))
 2369            .Where(g => g.Count() > 1);
 2370        foreach (var group in nodesByInbound)
 371        {
 0372            branchResults[$"branch-{branchIndex++}"] = group.ToList();
 373        }
 374
 1375        progress?.Report(new ProgressEvents.WorkflowCompletedEvent(
 1376            DateTimeOffset.UtcNow,
 1377            progress.WorkflowId,
 1378            progress.AgentId,
 1379            null,
 1380            progress.Depth,
 1381            progress.NextSequence(),
 1382            Succeeded: succeeded,
 1383            ErrorMessage: errorMessage,
 1384            TotalDuration: totalDuration));
 385
 1386        return new DagRunResult(
 1387            stages: stages,
 1388            nodeResults: nodeResults,
 1389            branchResults: branchResults,
 1390            totalDuration: totalDuration,
 1391            succeeded: succeeded,
 1392            errorMessage: errorMessage,
 1393            exception: caughtException);
 1394    }
 395
 396    private async Task<IDagRunResult> RunWithNeedlrExecutorAsync(
 397        GraphTopology topology,
 398        string graphName,
 399        string input,
 400        ProgressEvents.IProgressReporter? progress,
 401        CancellationToken cancellationToken)
 402    {
 30403        if (topology.EntryType is null)
 404        {
 0405            throw new InvalidOperationException(
 0406                $"Cannot run graph workflow '{graphName}': no entry point found.");
 407        }
 408
 30409        var agents = new Dictionary<Type, AIAgent>();
 282410        foreach (var type in topology.AllTypes)
 411        {
 111412            agents[type] = _agentFactory.CreateAgent(type.FullName ?? type.Name);
 413        }
 414
 30415        var completionSources = new Dictionary<Type, TaskCompletionSource<string>>();
 282416        foreach (var type in topology.AllTypes)
 417        {
 111418            completionSources[type] = new TaskCompletionSource<string>();
 419        }
 420
 30421        var skippedNodes = new ConcurrentDictionary<Type, bool>();
 30422        var dagStart = Stopwatch.GetTimestamp();
 30423        var nodeTimings = new ConcurrentDictionary<Type, (TimeSpan StartOffset, TimeSpan Duration)>();
 30424        var nodeDiagnostics = new ConcurrentDictionary<Type, IAgentRunDiagnostics?>();
 30425        var nodeExceptions = new ConcurrentDictionary<Type, Exception>();
 426
 30427        progress?.Report(new ProgressEvents.WorkflowStartedEvent(
 30428            DateTimeOffset.UtcNow,
 30429            progress?.WorkflowId ?? string.Empty,
 30430            progress?.AgentId,
 30431            null,
 30432            progress?.Depth ?? 0,
 30433            progress?.NextSequence() ?? 0));
 434
 30435        var routingChatClient = _chatClientAccessor.ChatClient;
 30436        var nodeTasks = new List<Task>();
 437
 438        // Create a linked CTS for each WaitAny join node so that remaining
 439        // branches can be cancelled once the first valid result arrives.
 30440        var waitAnyCtsMap = new ConcurrentDictionary<Type, CancellationTokenSource>();
 282441        foreach (var type in topology.AllTypes)
 442        {
 111443            if (topology.JoinModes.GetValueOrDefault(type, GraphJoinMode.WaitAll) == GraphJoinMode.WaitAny)
 444            {
 19445                waitAnyCtsMap[type] = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 446            }
 447        }
 448
 449        // Pre-compute the effective cancellation token for each node.
 450        // Nodes that are dependencies of a WaitAny join use the linked token
 451        // so they can be cancelled when the winning branch completes.
 30452        var nodeEffectiveTokens = new Dictionary<Type, CancellationToken>();
 282453        foreach (var type in topology.AllTypes)
 454        {
 111455            CancellationToken effectiveToken = cancellationToken;
 344456            foreach (var (waitAnyType, cts) in waitAnyCtsMap)
 457            {
 78458                var waitAnyDeps = topology.IncomingTypes.GetValueOrDefault(waitAnyType, []);
 78459                if (waitAnyDeps.Contains(type))
 460                {
 34461                    effectiveToken = cts.Token;
 34462                    break;
 463                }
 464            }
 465
 111466            nodeEffectiveTokens[type] = effectiveToken;
 467        }
 468
 282469        foreach (var type in topology.AllTypes)
 470        {
 111471            var nodeType = type;
 111472            var deps = topology.IncomingTypes.GetValueOrDefault(nodeType, []);
 111473            var joinMode = topology.JoinModes.GetValueOrDefault(nodeType, GraphJoinMode.WaitAll);
 474
 111475            nodeTasks.Add(Task.Run(async () =>
 111476            {
 111477                try
 111478                {
 111479                    string nodeInput;
 107480                    if (nodeType == topology.EntryType)
 111481                    {
 29482                        nodeInput = input;
 111483                    }
 78484                    else if (deps.Count == 0)
 111485                    {
 0486                        nodeInput = input;
 111487                    }
 111488                    else
 111489                    {
 78490                        if (joinMode == GraphJoinMode.WaitAny)
 111491                        {
 18492                            var taskToDepType = new Dictionary<Task<string>, Type>();
 100493                            foreach (var dep in deps)
 111494                            {
 32495                                taskToDepType[completionSources[dep].Task] = dep;
 111496                            }
 111497
 18498                            var remaining = new HashSet<Task<string>>(taskToDepType.Keys);
 18499                            nodeInput = input;
 111500
 25501                            while (remaining.Count > 0)
 111502                            {
 25503                                var first = await Task.WhenAny(remaining).WaitAsync(cancellationToken);
 25504                                remaining.Remove(first);
 111505
 25506                                var depType = taskToDepType[first];
 25507                                var result = await first;
 111508
 23509                                if (!skippedNodes.ContainsKey(depType) && !string.IsNullOrWhiteSpace(result))
 111510                                {
 16511                                    nodeInput = result;
 111512
 111513                                    // Cancel remaining branches for this WaitAny scope.
 16514                                    if (waitAnyCtsMap.TryGetValue(nodeType, out var waitAnyCts))
 111515                                    {
 16516                                        waitAnyCts.Cancel();
 111517                                    }
 111518
 16519                                    break;
 111520                                }
 7521                            }
 16522                        }
 111523                        else
 111524                        {
 60525                            var pendingResults = new List<string>();
 245526                            foreach (var dep in deps)
 111527                            {
 65528                                if (skippedNodes.ContainsKey(dep))
 111529                                    continue;
 111530
 111531                                try
 111532                                {
 65533                                    var depResult = await completionSources[dep].Task.WaitAsync(cancellationToken);
 60534                                    if (!string.IsNullOrEmpty(depResult))
 60535                                        pendingResults.Add(depResult);
 60536                                }
 5537                                catch when (IsOptionalEdge(dep, nodeType, topology))
 111538                                {
 111539                                    // Optional upstream failed — treat as degraded.
 0540                                }
 60541                            }
 111542
 55543                            if (pendingResults.Count >= 2 && topology.ReducerFunc is not null)
 111544                            {
 5545                                var reducerStart = Stopwatch.GetTimestamp();
 5546                                nodeInput = topology.ReducerFunc(pendingResults);
 4547                                var reducerDuration = Stopwatch.GetElapsedTime(reducerStart);
 111548
 4549                                progress?.Report(new ProgressEvents.ReducerNodeInvokedEvent(
 4550                                    DateTimeOffset.UtcNow,
 4551                                    progress.WorkflowId,
 4552                                    progress.AgentId,
 4553                                    null,
 4554                                    progress.Depth + 1,
 4555                                    progress.NextSequence(),
 4556                                    NodeId: topology.ReducerType?.FullName ?? topology.ReducerType?.Name ?? "reducer",
 4557                                    GraphName: graphName,
 4558                                    BranchId: null,
 4559                                    InputBranchCount: pendingResults.Count,
 4560                                    Duration: reducerDuration));
 111561                            }
 50562                            else if (pendingResults.Count > 0)
 111563                            {
 50564                                nodeInput = string.Join("\n\n---\n\n", pendingResults);
 111565                            }
 111566                            else
 111567                            {
 0568                                nodeInput = input;
 111569                            }
 54570                        }
 111571                    }
 111572
 99573                    if (skippedNodes.ContainsKey(nodeType))
 111574                    {
 12575                        return;
 111576                    }
 111577
 87578                    var agent = agents[nodeType];
 87579                    var agentName = agent.Name ?? nodeType.Name;
 111580
 87581                    progress?.Report(new ProgressEvents.AgentInvokedEvent(
 87582                        DateTimeOffset.UtcNow,
 87583                        progress.WorkflowId,
 87584                        agentName,
 87585                        null,
 87586                        progress.Depth + 1,
 87587                        progress.NextSequence(),
 87588                        AgentName: agentName,
 87589                        GraphName: graphName,
 87590                        NodeId: nodeType.FullName ?? nodeType.Name));
 111591
 87592                    var nodeStart = Stopwatch.GetTimestamp();
 87593                    using var diagnosticsBuilder = AgentRunDiagnosticsBuilder.StartNew(agentName);
 87594                    var nodeToken = nodeEffectiveTokens[nodeType];
 87595                    var response = await agent.RunAsync(nodeInput, cancellationToken: nodeToken);
 81596                    var nodeElapsed = Stopwatch.GetElapsedTime(nodeStart);
 81597                    var startOffset = Stopwatch.GetElapsedTime(dagStart, nodeStart);
 111598
 81599                    var diag = diagnosticsBuilder.Build();
 81600                    nodeDiagnostics[nodeType] = diag;
 81601                    nodeTimings[nodeType] = (startOffset, nodeElapsed);
 111602
 81603                    var text = string.Join("\n", response.Messages
 81604                        .Where(m => !string.IsNullOrEmpty(m.Text))
 162605                        .Select(m => m.Text));
 111606
 81607                    if (topology.OutgoingEdgesBySource.TryGetValue(nodeType, out var outEdges) && outEdges.Count > 0)
 111608                    {
 59609                        var conditionInput = !string.IsNullOrWhiteSpace(text) ? text : nodeInput;
 59610                        var resolvedEdges = await _edgeRouter.ResolveOutgoingEdgesAsync(
 59611                            nodeType, conditionInput, topology, routingChatClient, nodeToken);
 124612                        var resolvedTargets = resolvedEdges.Select(e => e.Target).ToHashSet();
 111613
 272614                        foreach (var edge in outEdges)
 111615                        {
 80616                            if (!resolvedTargets.Contains(edge.Target))
 111617                            {
 12618                                skippedNodes[edge.Target] = true;
 12619                                completionSources[edge.Target].TrySetResult(string.Empty);
 111620                            }
 111621                        }
 111622                    }
 111623
 78624                    var totalTokens = diag.AggregateTokenUsage.TotalTokens;
 78625                    progress?.Report(new ProgressEvents.AgentCompletedEvent(
 78626                        DateTimeOffset.UtcNow,
 78627                        progress.WorkflowId,
 78628                        agentName,
 78629                        null,
 78630                        progress.Depth + 1,
 78631                        progress.NextSequence(),
 78632                        AgentName: agentName,
 78633                        Duration: nodeElapsed,
 78634                        TotalTokens: totalTokens,
 78635                        InputTokens: diag.AggregateTokenUsage.InputTokens,
 78636                        OutputTokens: diag.AggregateTokenUsage.OutputTokens));
 111637
 78638                    completionSources[nodeType].TrySetResult(text);
 78639                }
 1640                catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
 111641                {
 111642                    // Graceful cancellation from a WaitAny scope — treat as skip.
 1643                    skippedNodes[nodeType] = true;
 1644                    completionSources[nodeType].TrySetResult(string.Empty);
 1645                }
 16646                catch (Exception ex)
 111647                {
 16648                    var agentName = agents.TryGetValue(nodeType, out var a)
 16649                        ? a.Name ?? nodeType.Name
 16650                        : nodeType.Name;
 111651
 16652                    nodeExceptions[nodeType] = ex;
 111653
 16654                    progress?.Report(new ProgressEvents.AgentFailedEvent(
 16655                        DateTimeOffset.UtcNow,
 16656                        progress.WorkflowId,
 16657                        agentName,
 16658                        null,
 16659                        progress.Depth + 1,
 16660                        progress.NextSequence(),
 16661                        AgentName: agentName,
 16662                        ErrorMessage: ex.Message));
 111663
 16664                    if (IsNodeRequiredByAllIncomingEdges(nodeType, topology))
 111665                    {
 14666                        completionSources[nodeType].TrySetException(ex);
 111667                    }
 111668                    else
 111669                    {
 2670                        completionSources[nodeType].TrySetResult(string.Empty);
 111671                    }
 16672                }
 218673            }, cancellationToken));
 674        }
 675
 30676        Exception? dagException = null;
 677        bool succeeded;
 30678        string? errorMessage = null;
 679
 680        try
 681        {
 30682            await Task.WhenAll(nodeTasks).WaitAsync(cancellationToken);
 683
 29684            var requiredFailures = nodeExceptions
 16685                .Where(kv => IsNodeRequiredByAllIncomingEdges(kv.Key, topology))
 29686                .ToList();
 687
 29688            succeeded = requiredFailures.Count == 0;
 29689            if (!succeeded)
 690            {
 6691                var firstError = requiredFailures.First().Value;
 6692                errorMessage = firstError.Message;
 20693                dagException = new AggregateException(requiredFailures.Select(kv => kv.Value));
 694            }
 29695        }
 1696        catch (Exception ex)
 697        {
 1698            succeeded = false;
 1699            errorMessage = ex.Message;
 1700            dagException = ex;
 1701        }
 702        finally
 703        {
 98704            foreach (var cts in waitAnyCtsMap.Values)
 705            {
 19706                cts.Dispose();
 707            }
 708        }
 709
 30710        var totalDuration = Stopwatch.GetElapsedTime(dagStart);
 711
 30712        var nodeResultsDict = new Dictionary<string, IDagNodeResult>();
 30713        var stagesList = new List<IAgentStageResult>();
 714
 282715        foreach (var type in topology.AllTypes)
 716        {
 111717            if (skippedNodes.ContainsKey(type))
 718                continue;
 719
 98720            var agentName = agents.TryGetValue(type, out var ag)
 98721                ? ag.Name ?? type.Name
 98722                : type.Name;
 98723            var (startOffsetVal, duration) = nodeTimings.GetValueOrDefault(type, (TimeSpan.Zero, TimeSpan.Zero));
 98724            var diag = nodeDiagnostics.GetValueOrDefault(type);
 725
 98726            ChatResponse? finalResponse = null;
 98727            if (completionSources[type].Task.IsCompletedSuccessfully)
 728            {
 80729                var text = completionSources[type].Task.Result;
 80730                if (!string.IsNullOrEmpty(text))
 731                {
 78732                    finalResponse = new ChatResponse(new ChatMessage(ChatRole.Assistant, text));
 733                }
 734            }
 735
 98736            var nodeResult = new DagNodeResult(
 98737                nodeId: type.FullName ?? type.Name,
 98738                agentName: agentName,
 98739                kind: NodeKind.Agent,
 98740                diagnostics: diag,
 98741                finalResponse: finalResponse,
 98742                inboundEdges: topology.InboundEdges.GetValueOrDefault(type, [])
 88743                    .Select(t => t.FullName ?? t.Name).ToList(),
 98744                outboundEdges: topology.OutboundEdges.GetValueOrDefault(type, [])
 94745                    .Select(t => t.FullName ?? t.Name).ToList(),
 98746                startOffset: startOffsetVal,
 98747                duration: duration);
 748
 98749            nodeResultsDict[type.FullName ?? type.Name] = nodeResult;
 98750            stagesList.Add(new AgentStageResult(agentName, finalResponse, diag));
 751        }
 752
 30753        var branchResults = new Dictionary<string, IReadOnlyList<IAgentStageResult>>();
 30754        var branchIndex = 0;
 30755        var nodesByInbound = topology.AllTypes
 111756            .Where(t => !skippedNodes.ContainsKey(t) &&
 111757                        topology.InboundEdges.ContainsKey(t) &&
 111758                        topology.InboundEdges[t].Count > 0)
 68759            .GroupBy(t => string.Join(",",
 68760                topology.InboundEdges[t]
 88761                    .Select(dep => dep.FullName ?? dep.Name)
 156762                    .OrderBy(n => n)))
 83763            .Where(g => g.Count() > 1);
 90764        foreach (var group in nodesByInbound)
 765        {
 15766            var groupStages = group
 30767                .Select(t => stagesList.FirstOrDefault(s =>
 105768                    s.AgentName == (agents.TryGetValue(t, out var a)
 105769                        ? a.Name ?? t.Name
 105770                        : t.Name)))
 30771                .Where(s => s is not null)
 15772                .Cast<IAgentStageResult>()
 15773                .ToList();
 15774            if (groupStages.Count > 1)
 15775                branchResults[$"branch-{branchIndex++}"] = groupStages;
 776        }
 777
 30778        progress?.Report(new ProgressEvents.WorkflowCompletedEvent(
 30779            DateTimeOffset.UtcNow,
 30780            progress.WorkflowId,
 30781            progress.AgentId,
 30782            null,
 30783            progress.Depth,
 30784            progress.NextSequence(),
 30785            Succeeded: succeeded,
 30786            ErrorMessage: errorMessage,
 30787            TotalDuration: totalDuration));
 788
 30789        return new DagRunResult(
 30790            stages: stagesList,
 30791            nodeResults: nodeResultsDict,
 30792            branchResults: branchResults,
 30793            totalDuration: totalDuration,
 30794            succeeded: succeeded,
 30795            errorMessage: errorMessage,
 30796            exception: dagException);
 30797    }
 798
 799    private static bool IsNodeRequiredByAllIncomingEdges(Type nodeType, GraphTopology topology)
 800    {
 32801        var incomingDeps = topology.IncomingTypes.GetValueOrDefault(nodeType, []);
 32802        if (incomingDeps.Count == 0)
 6803            return true;
 804
 112805        foreach (var dep in incomingDeps)
 806        {
 32807            if (topology.EdgeIsRequired.TryGetValue((dep, nodeType), out var isReq) && !isReq)
 4808                return false;
 809        }
 810
 22811        return true;
 4812    }
 813
 814    private static bool IsOptionalEdge(Type sourceType, Type targetType, GraphTopology topology)
 815    {
 5816        if (topology.EdgeIsRequired.TryGetValue((sourceType, targetType), out var isReq))
 5817            return !isReq;
 0818        return false;
 819    }
 820}