< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.Diagnostics.PipelineRunExtensions
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Diagnostics/PipelineRunExtensions.cs
Line coverage
73%
Covered lines: 210
Uncovered lines: 76
Coverable lines: 286
Total lines: 494
Line coverage: 73.4%
Branch coverage
63%
Covered branches: 74
Total branches: 116
Branch coverage: 63.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
RunWithDiagnosticsAsync(...) | 100% | 1 | 1 | 100%
RunWithDiagnosticsAsync(...) | 100% | 2 | 1 | 0%
RunWithDiagnosticsAsync(...) | 100% | 2 | 1 | 0%
RunWithDiagnosticsAsync() | 75% | 83 | 80 | 92.26%
BuildStageResultFromCompletions(...) | 50% | 12 | 4 | 20.68%
PartitionCompletionsByAgent(...) | 62.5% | 25 | 16 | 66.66%
PartitionToolCallsByAgent(...) | 12.5% | 192 | 16 | 11.76%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Diagnostics/PipelineRunExtensions.cs

#LineLine coverage
 1using System.Diagnostics;
 2using System.Text;
 3
 4using Microsoft.Agents.AI.Workflows;
 5using Microsoft.Extensions.AI;
 6
 7using NexusLabs.Needlr.AgentFramework.Diagnostics;
 8
 9using ProgressEvents = NexusLabs.Needlr.AgentFramework.Progress;
 10
 11namespace NexusLabs.Needlr.AgentFramework.Workflows.Diagnostics;
 12
 13/// <summary>
 14/// Extension methods for running workflows with per-stage diagnostics and real-time
 15/// progress reporting.
 16/// </summary>
 17public static class PipelineRunExtensions
 18{
 19    /// <summary>
 20    /// Executes the workflow with per-stage diagnostics (no progress reporting).
 21    /// </summary>
 22    public static Task<IPipelineRunResult> RunWithDiagnosticsAsync(
 23        this Workflow workflow,
 24        string message,
 25        IAgentDiagnosticsAccessor diagnosticsAccessor,
 26        CancellationToken cancellationToken = default) =>
 827        RunWithDiagnosticsAsync(workflow, message, new WorkflowRunOptions
 828        {
 829            DiagnosticsAccessor = diagnosticsAccessor,
 830            CancellationToken = cancellationToken,
 831        });
 32
 33    /// <summary>
 34    /// Executes the workflow with per-stage diagnostics and real-time progress reporting.
 35    /// </summary>
 36    public static Task<IPipelineRunResult> RunWithDiagnosticsAsync(
 37        this Workflow workflow,
 38        string message,
 39        IAgentDiagnosticsAccessor diagnosticsAccessor,
 40        ProgressEvents.IProgressReporter? progressReporter,
 41        CancellationToken cancellationToken = default) =>
 042        RunWithDiagnosticsAsync(workflow, message, new WorkflowRunOptions
 043        {
 044            DiagnosticsAccessor = diagnosticsAccessor,
 045            ProgressReporter = progressReporter,
 046            CancellationToken = cancellationToken,
 047        });
 48
 49    /// <summary>
 50    /// Executes the workflow with per-stage diagnostics, real-time progress reporting,
 51    /// and per-LLM-call completion draining via the provided collector.
 52    /// </summary>
 53    public static Task<IPipelineRunResult> RunWithDiagnosticsAsync(
 54        this Workflow workflow,
 55        string message,
 56        IAgentDiagnosticsAccessor diagnosticsAccessor,
 57        ProgressEvents.IProgressReporter? progressReporter,
 58        IChatCompletionCollector? completionCollector,
 59        ProgressEvents.IProgressReporterAccessor? progressReporterAccessor = null,
 60        CancellationToken cancellationToken = default) =>
 061        RunWithDiagnosticsAsync(workflow, message, new WorkflowRunOptions
 062        {
 063            DiagnosticsAccessor = diagnosticsAccessor,
 064            ProgressReporter = progressReporter,
 065            CompletionCollector = completionCollector,
 066            ProgressReporterAccessor = progressReporterAccessor,
 067            CancellationToken = cancellationToken,
 068        });
 69
 70    /// <summary>
 71    /// Executes the workflow with per-stage diagnostics and progress reporting
 72    /// configured via <see cref="WorkflowRunOptions"/>.
 73    /// </summary>
 74    /// <param name="workflow">The workflow to execute.</param>
 75    /// <param name="message">The user message to send.</param>
 76    /// <param name="options">Configuration for diagnostics, progress, and completion collection.</param>
 77    public static async Task<IPipelineRunResult> RunWithDiagnosticsAsync(
 78        this Workflow workflow,
 79        string message,
 80        WorkflowRunOptions options)
 81    {
 982        ArgumentNullException.ThrowIfNull(workflow);
 983        ArgumentException.ThrowIfNullOrEmpty(message);
 984        ArgumentNullException.ThrowIfNull(options);
 85
 986        var diagnosticsAccessor = options.DiagnosticsAccessor;
 987        var reporter = options.ProgressReporter ?? ProgressEvents.NullProgressReporter.Instance;
 988        var collector = options.CompletionCollector
 989            ?? diagnosticsAccessor.CompletionCollector
 990            ?? NullChatCompletionCollector.Instance;
 991        var toolCollector = diagnosticsAccessor.ToolCallCollector;
 992        var progressReporterAccessor = options.ProgressReporterAccessor;
 993        var cancellationToken = options.CancellationToken;
 994        var pipelineStart = Stopwatch.StartNew();
 995        var stages = new List<IAgentStageResult>();
 996        var responses = new Dictionary<string, StringBuilder>();
 997        var invocations = new List<(string ExecutorId, DateTimeOffset InvokedAt)>();
 998        string? currentExecutorId = null;
 999        bool succeeded = true;
 9100        string? errorMessage = null;
 9101        Exception? caughtException = null;
 9102        int superStepCount = 0;
 103
 9104        collector.DrainCompletions(); // drain stale
 9105        toolCollector?.DrainToolCalls(); // drain stale
 106
 107        // Set the progress reporter on the AsyncLocal accessor so chat/tool middleware
 108        // can emit LLM call and tool call events in real-time.
 9109        var progressScope = progressReporterAccessor?.BeginScope(reporter);
 110
 9111        reporter.Report(new ProgressEvents.WorkflowStartedEvent(
 9112            Timestamp: DateTimeOffset.UtcNow,
 9113            WorkflowId: reporter.WorkflowId,
 9114            AgentId: null,
 9115            ParentAgentId: null,
 9116            Depth: 0,
 9117            SequenceNumber: reporter.NextSequence()));
 118
 119        try
 120        {
 9121            using (diagnosticsAccessor.BeginCapture())
 122            {
 9123                await using var run = await InProcessExecution.RunStreamingAsync(
 9124                    workflow,
 9125                    new ChatMessage(ChatRole.User, message),
 9126                    cancellationToken: cancellationToken);
 127
 9128                await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
 129
 9130                CancellationTokenRegistration? budgetRegistration = null;
 9131                if (cancellationToken.CanBeCanceled)
 132                {
 9133                    budgetRegistration = cancellationToken.Register(() =>
 9134                    {
 0135                        _ = run.CancelRunAsync();
 9136                    });
 137                }
 138
 139                try
 140                {
 648141                await foreach (var evt in run.WatchStreamAsync(cancellationToken))
 142                {
 315143                    if (evt is ExecutorInvokedEvent invoked)
 144                    {
 98145                        var invokedId = invoked.ExecutorId ?? "unknown";
 98146                        invocations.Add((invokedId, DateTimeOffset.UtcNow));
 147
 98148                        reporter.Report(new ProgressEvents.AgentInvokedEvent(
 98149                            Timestamp: DateTimeOffset.UtcNow,
 98150                            WorkflowId: reporter.WorkflowId,
 98151                            AgentId: invokedId,
 98152                            ParentAgentId: null,
 98153                            Depth: 1,
 98154                            SequenceNumber: reporter.NextSequence(),
 98155                            AgentName: invokedId));
 98156                        continue;
 157                    }
 158
 217159                    if (evt is ExecutorFailedEvent executorFailed)
 160                    {
 1161                        succeeded = false;
 1162                        errorMessage = executorFailed.Data?.Message;
 1163                        reporter.Report(new ProgressEvents.AgentFailedEvent(
 1164                            Timestamp: DateTimeOffset.UtcNow,
 1165                            WorkflowId: reporter.WorkflowId,
 1166                            AgentId: executorFailed.ExecutorId,
 1167                            ParentAgentId: null,
 1168                            Depth: 1,
 1169                            SequenceNumber: reporter.NextSequence(),
 1170                            AgentName: executorFailed.ExecutorId ?? "unknown",
 1171                            ErrorMessage: executorFailed.Data?.Message ?? "unknown error"));
 1172                        continue;
 173                    }
 174
 216175                    if (evt is WorkflowErrorEvent workflowError)
 176                    {
 1177                        succeeded = false;
 1178                        errorMessage = workflowError.Exception?.Message;
 1179                        continue;
 180                    }
 181
 215182                    if (evt is SuperStepStartedEvent)
 183                    {
 41184                        superStepCount++;
 41185                        reporter.Report(new ProgressEvents.SuperStepStartedProgressEvent(
 41186                            Timestamp: DateTimeOffset.UtcNow,
 41187                            WorkflowId: reporter.WorkflowId,
 41188                            AgentId: null,
 41189                            ParentAgentId: null,
 41190                            Depth: 0,
 41191                            SequenceNumber: reporter.NextSequence(),
 41192                            StepNumber: superStepCount));
 41193                        continue;
 194                    }
 195
 174196                    if (evt is SuperStepCompletedEvent)
 197                    {
 40198                        reporter.Report(new ProgressEvents.SuperStepCompletedProgressEvent(
 40199                            Timestamp: DateTimeOffset.UtcNow,
 40200                            WorkflowId: reporter.WorkflowId,
 40201                            AgentId: null,
 40202                            ParentAgentId: null,
 40203                            Depth: 0,
 40204                            SequenceNumber: reporter.NextSequence(),
 40205                            StepNumber: superStepCount));
 40206                        continue;
 207                    }
 208
 134209                    if (evt is not AgentResponseUpdateEvent update
 134210                        || update.ExecutorId is null
 134211                        || update.Data is null)
 212                    {
 213                        continue;
 214                    }
 215
 20216                    var text = update.Data.ToString();
 20217                    if (string.IsNullOrEmpty(text))
 218                        continue;
 219
 220                    // Emit handoff progress event at turn boundaries
 16221                    if (currentExecutorId is not null
 16222                        && currentExecutorId != update.ExecutorId)
 223                    {
 8224                        reporter.Report(new ProgressEvents.AgentHandoffEvent(
 8225                            Timestamp: DateTimeOffset.UtcNow,
 8226                            WorkflowId: reporter.WorkflowId,
 8227                            AgentId: null,
 8228                            ParentAgentId: null,
 8229                            Depth: 0,
 8230                            SequenceNumber: reporter.NextSequence(),
 8231                            FromAgentId: currentExecutorId,
 8232                            ToAgentId: update.ExecutorId));
 233                    }
 234
 16235                    currentExecutorId = update.ExecutorId;
 236
 16237                    if (!responses.TryGetValue(update.ExecutorId, out var sb))
 16238                        responses[update.ExecutorId] = sb = new StringBuilder();
 239
 16240                    sb.Append(text);
 241                }
 242
 243                // Drain all completions and partition them across agent stages.
 244                // Event loop timestamps are unreliable (events are buffered), so we use
 245                // completion timestamps (captured at actual LLM call time) for both
 246                // duration calculation and agent attribution.
 9247                var allCompletions = collector.DrainCompletions()
 16248                    .OrderBy(c => c.StartedAt)
 9249                    .ToList();
 250
 251                // Drain tool calls from the collector for the fallback path.
 9252                var allToolCalls = toolCollector?.DrainToolCalls()
 0253                    ?.OrderBy(t => t.StartedAt)
 9254                    .ToList()
 9255                    ?? [];
 256
 257                // Filter invocations to only real agents (executors that produced response
 258                // text), de-duplicated. Skips non-agent executors like "GroupChatHost".
 9259                var agentInvocations = invocations
 98260                    .Where(inv => responses.ContainsKey(inv.ExecutorId))
 32261                    .Select(inv => inv.ExecutorId)
 9262                    .Distinct()
 9263                    .ToList();
 264
 265                // Partition completions by agent using name matching or round-robin fallback.
 9266                var partitioned = PartitionCompletionsByAgent(
 9267                    allCompletions, agentInvocations);
 268
 269                // Partition tool calls by agent using AgentName attribution.
 9270                var partitionedToolCalls = PartitionToolCallsByAgent(
 9271                    allToolCalls, agentInvocations);
 272
 50273                for (int i = 0; i < agentInvocations.Count; i++)
 274                {
 16275                    var executorId = agentInvocations[i];
 16276                    var stageCompletions = i < partitioned.Count
 16277                        ? partitioned[i] : [];
 16278                    var stageToolCalls = i < partitionedToolCalls.Count
 16279                        ? partitionedToolCalls[i] : [];
 280
 16281                    var responseText = responses.TryGetValue(executorId, out var respSb)
 16282                        ? respSb.ToString()
 16283                        : string.Empty;
 284
 285                    // Duration from completion timestamps (reliable), not event timestamps.
 16286                    var duration = stageCompletions.Count > 0
 16287                        ? stageCompletions[^1].CompletedAt - stageCompletions[0].StartedAt
 16288                        : TimeSpan.Zero;
 16289                    var startedAt = stageCompletions.Count > 0
 16290                        ? stageCompletions[0].StartedAt
 16291                        : DateTimeOffset.UtcNow;
 292
 16293                    stages.Add(BuildStageResultFromCompletions(
 16294                        executorId,
 16295                        responseText,
 16296                        diagnosticsAccessor,
 16297                        stageCompletions,
 16298                        stageToolCalls,
 16299                        duration,
 16300                        startedAt));
 301
 16302                    reporter.Report(new ProgressEvents.AgentCompletedEvent(
 16303                        Timestamp: DateTimeOffset.UtcNow,
 16304                        WorkflowId: reporter.WorkflowId,
 16305                        AgentId: executorId,
 16306                        ParentAgentId: null,
 16307                        Depth: 1,
 16308                        SequenceNumber: reporter.NextSequence(),
 16309                        AgentName: executorId,
 16310                        Duration: duration,
 16311                        TotalTokens: stages[^1].Diagnostics?.AggregateTokenUsage.TotalTokens ?? 0));
 312                }
 9313                }
 314                finally
 315                {
 9316                    budgetRegistration?.Dispose();
 317                }
 9318            }
 9319        }
 0320        catch (Exception ex)
 321        {
 0322            succeeded = false;
 0323            errorMessage = ex.ToString();
 0324            caughtException = ex;
 325
 326            // If we know which agent was running when the exception propagated
 327            // out of the stream, emit an AgentFailedEvent for it so sinks see
 328            // the per-agent failure before the trailing WorkflowCompletedEvent.
 0329            if (currentExecutorId is not null)
 330            {
 0331                reporter.Report(new ProgressEvents.AgentFailedEvent(
 0332                    Timestamp: DateTimeOffset.UtcNow,
 0333                    WorkflowId: reporter.WorkflowId,
 0334                    AgentId: currentExecutorId,
 0335                    ParentAgentId: null,
 0336                    Depth: 1,
 0337                    SequenceNumber: reporter.NextSequence(),
 0338                    AgentName: currentExecutorId,
 0339                    ErrorMessage: ex.Message));
 340            }
 0341        }
 342
 9343        pipelineStart.Stop();
 344
 9345        reporter.Report(new ProgressEvents.WorkflowCompletedEvent(
 9346            Timestamp: DateTimeOffset.UtcNow,
 9347            WorkflowId: reporter.WorkflowId,
 9348            AgentId: null,
 9349            ParentAgentId: null,
 9350            Depth: 0,
 9351            SequenceNumber: reporter.NextSequence(),
 9352            Succeeded: succeeded,
 9353            ErrorMessage: errorMessage,
 9354            TotalDuration: pipelineStart.Elapsed));
 355
 9356        progressScope?.Dispose();
 357
 9358        return new PipelineRunResult(
 9359            stages: stages,
 9360            totalDuration: pipelineStart.Elapsed,
 9361            succeeded: succeeded,
 9362            errorMessage: errorMessage,
 9363            exception: caughtException);
 9364    }
 365
 366    private static IAgentStageResult BuildStageResultFromCompletions(
 367        string agentName,
 368        string responseText,
 369        IAgentDiagnosticsAccessor diagnosticsAccessor,
 370        IReadOnlyList<ChatCompletionDiagnostics> completions,
 371        IReadOnlyList<ToolCallDiagnostics> toolCalls,
 372        TimeSpan turnDuration,
 373        DateTimeOffset turnStartedAt)
 374    {
 16375        var finalResponse = string.IsNullOrEmpty(responseText)
 16376            ? null
 16377            : new ChatResponse(new ChatMessage(ChatRole.Assistant, responseText));
 378
 16379        var middlewareDiag = diagnosticsAccessor.LastRunDiagnostics;
 16380        if (middlewareDiag is not null)
 381        {
 16382            return new AgentStageResult(agentName, finalResponse, middlewareDiag);
 383        }
 384
 0385        var totalTokens = new TokenUsage(
 0386            InputTokens: completions.Sum(c => c.Tokens.InputTokens),
 0387            OutputTokens: completions.Sum(c => c.Tokens.OutputTokens),
 0388            TotalTokens: completions.Sum(c => c.Tokens.TotalTokens),
 0389            CachedInputTokens: completions.Sum(c => c.Tokens.CachedInputTokens),
 0390            ReasoningTokens: completions.Sum(c => c.Tokens.ReasoningTokens));
 391
 0392        return new AgentStageResult(
 0393            agentName,
 0394            finalResponse,
 0395            new AgentRunDiagnostics(
 0396                AgentName: agentName,
 0397                TotalDuration: turnDuration,
 0398                AggregateTokenUsage: totalTokens,
 0399                ChatCompletions: completions,
 0400                ToolCalls: toolCalls,
 0401                TotalInputMessages: 0,
 0402                TotalOutputMessages: 0,
 0403                InputMessages: [],
 0404                OutputResponse: null,
 0405                Succeeded: true,
 0406                ErrorMessage: null,
 0407                StartedAt: turnStartedAt,
 0408                CompletedAt: DateTimeOffset.UtcNow));
 409    }
 410
 411    /// <summary>
 412    /// Partitions an ordered list of completions into groups, one per agent invocation.
 413    /// First attempts name-based matching. When any agent ends up with no name-matched
 414    /// completions, falls back to round-robin interleaving: the completion at index i of
 415    /// the ordered list is assigned to agent i % N (where N is the agent count).
 416    /// </summary>
 417    private static IReadOnlyList<IReadOnlyList<ChatCompletionDiagnostics>> PartitionCompletionsByAgent(
 418        List<ChatCompletionDiagnostics> sorted,
 419        IReadOnlyList<string> agentExecutorIds)
 420    {
 9421        if (sorted.Count == 0 || agentExecutorIds.Count == 0)
 1422            return agentExecutorIds.Select(_ => (IReadOnlyList<ChatCompletionDiagnostics>)[]).ToList();
 423
 424        // Try name-based partitioning first.
 8425        var byName = new List<IReadOnlyList<ChatCompletionDiagnostics>>();
 8426        bool allMatched = true;
 48427        foreach (var executorId in agentExecutorIds)
 428        {
 16429            var matched = sorted
 32430                .Where(c => c.AgentName is not null
 32431                    && (executorId.Equals(c.AgentName, StringComparison.Ordinal)
 32432                        || executorId.StartsWith(c.AgentName + "_", StringComparison.Ordinal)))
 16433                .ToList();
 16434            byName.Add(matched);
 16435            if (matched.Count == 0)
 0436                allMatched = false;
 437        }
 438
 8439        if (allMatched)
 8440            return byName;
 441
 442        // Fall back to round-robin interleaving: in a RoundRobinGroupChatManager,
 443        // agents alternate turns. Completion[i] belongs to agent[i % N].
 0444        var interleaved = agentExecutorIds
 0445            .Select(_ => new List<ChatCompletionDiagnostics>())
 0446            .ToList();
 447
 0448        for (int i = 0; i < sorted.Count; i++)
 449        {
 0450            interleaved[i % agentExecutorIds.Count].Add(sorted[i]);
 451        }
 452
 0453        return interleaved.Select(l => (IReadOnlyList<ChatCompletionDiagnostics>)l).ToList();
 454    }
 455
 456    /// <summary>
 457    /// Partitions tool calls by agent using the <see cref="ToolCallDiagnostics.AgentName"/>
 458    /// field. Tool calls with no agent name, or whose name matches no agent, go to the first bucket.
 459    /// </summary>
 460    private static IReadOnlyList<IReadOnlyList<ToolCallDiagnostics>> PartitionToolCallsByAgent(
 461        List<ToolCallDiagnostics> sorted,
 462        IReadOnlyList<string> agentExecutorIds)
 463    {
 9464        if (sorted.Count == 0 || agentExecutorIds.Count == 0)
 25465            return agentExecutorIds.Select(_ => (IReadOnlyList<ToolCallDiagnostics>)[]).ToList();
 466
 0467        var buckets = agentExecutorIds
 0468            .Select(_ => new List<ToolCallDiagnostics>())
 0469            .ToList();
 470
 0471        foreach (var tc in sorted)
 472        {
 0473            var matched = false;
 0474            for (int i = 0; i < agentExecutorIds.Count; i++)
 475            {
 0476                if (tc.AgentName is not null
 0477                    && (agentExecutorIds[i].Equals(tc.AgentName, StringComparison.Ordinal)
 0478                        || agentExecutorIds[i].StartsWith(tc.AgentName + "_", StringComparison.Ordinal)))
 479                {
 0480                    buckets[i].Add(tc);
 0481                    matched = true;
 0482                    break;
 483                }
 484            }
 485
 0486            if (!matched)
 487            {
 0488                buckets[0].Add(tc);
 489            }
 490        }
 491
 0492        return buckets.Select(l => (IReadOnlyList<ToolCallDiagnostics>)l).ToList();
 493    }
 494}

Methods/Properties

RunWithDiagnosticsAsync(Microsoft.Agents.AI.Workflows.Workflow,System.String,NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentDiagnosticsAccessor,System.Threading.CancellationToken)
RunWithDiagnosticsAsync(Microsoft.Agents.AI.Workflows.Workflow,System.String,NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentDiagnosticsAccessor,NexusLabs.Needlr.AgentFramework.Progress.IProgressReporter,System.Threading.CancellationToken)
RunWithDiagnosticsAsync(Microsoft.Agents.AI.Workflows.Workflow,System.String,NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentDiagnosticsAccessor,NexusLabs.Needlr.AgentFramework.Progress.IProgressReporter,NexusLabs.Needlr.AgentFramework.Diagnostics.IChatCompletionCollector,NexusLabs.Needlr.AgentFramework.Progress.IProgressReporterAccessor,System.Threading.CancellationToken)
RunWithDiagnosticsAsync()
BuildStageResultFromCompletions(System.String,System.String,NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentDiagnosticsAccessor,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Diagnostics.ChatCompletionDiagnostics>,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Diagnostics.ToolCallDiagnostics>,System.TimeSpan,System.DateTimeOffset)
PartitionCompletionsByAgent(System.Collections.Generic.List`1<NexusLabs.Needlr.AgentFramework.Diagnostics.ChatCompletionDiagnostics>,System.Collections.Generic.IReadOnlyList`1<System.String>)
PartitionToolCallsByAgent(System.Collections.Generic.List`1<NexusLabs.Needlr.AgentFramework.Diagnostics.ToolCallDiagnostics>,System.Collections.Generic.IReadOnlyList`1<System.String>)