< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.StreamingRunWorkflowExtensions
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/StreamingRunWorkflowExtensions.cs
Line coverage
64%
Covered lines: 70
Uncovered lines: 38
Coverable lines: 108
Total lines: 278
Line coverage: 64.8%
Branch coverage
71%
Covered branches: 40
Total branches: 56
Branch coverage: 71.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

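| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| RunAsync() | 100% | 1 | 1 | 50% |
| RunAsync() | 100% | 2 | 1 | 0% |
| RunAsync() | 0% | 24 | 6 | 20% |
| RunAsync() | 0% | 110 | 10 | 0% |
| CollectAgentResponsesAsync(...) | 100% | 2 | 1 | 0% |
| CollectFromEventsAsync() | 100% | 12 | 12 | 100% |
| CollectWithTerminationAsync() | 100% | 24 | 24 | 100% |
| ShouldTerminate(...) | 100% | 4 | 4 | 100% |
| FinalizeResponses(...) | 100% | 1 | 1 | 100% |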

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/StreamingRunWorkflowExtensions.cs

# | Line | Line coverage
 1using Microsoft.Agents.AI.Workflows;
 2using Microsoft.Extensions.AI;
 3using NexusLabs.Needlr.AgentFramework;
 4
 5namespace NexusLabs.Needlr.AgentFramework.Workflows;
 6
 7/// <summary>
 8/// Extension methods on <see cref="StreamingRun"/> and <see cref="Workflow"/> for collecting agent responses.
 9/// </summary>
 10public static class StreamingRunWorkflowExtensions
 11{
 12    /// <summary>
 13    /// Creates a streaming execution of the workflow, sends the message, and collects all agent responses.
 14    /// </summary>
 15    /// <param name="workflow">The workflow to execute.</param>
 16    /// <param name="message">The user message to send to the workflow.</param>
 17    /// <param name="cancellationToken">Optional cancellation token.</param>
 18    /// <returns>
 19    /// A dictionary mapping each agent's executor ID to its complete response text.
 20    /// Agents that emitted no text produce no entry.
 21    /// </returns>
 22    public static async Task<IReadOnlyDictionary<string, string>> RunAsync(
 23        this Workflow workflow,
 24        string message,
 25        CancellationToken cancellationToken = default)
 26    {
 327        ArgumentNullException.ThrowIfNull(workflow);
 228        ArgumentException.ThrowIfNullOrEmpty(message);
 029        return await workflow.RunAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
 030    }
 31
 32    /// <summary>
 33    /// Creates a streaming execution of the workflow, sends the message, collects all agent
 34    /// responses, and stops early when any termination condition is met.
 35    /// </summary>
 36    /// <param name="workflow">The workflow to execute.</param>
 37    /// <param name="message">The user message to send to the workflow.</param>
 38    /// <param name="terminationConditions">
 39    /// Conditions evaluated after each completed agent turn. The first condition that returns
 40    /// <see langword="true"/> causes the loop to stop and remaining responses to be discarded.
 41    /// Pass an empty collection (or <see langword="null"/>) to disable Layer 2 termination.
 42    /// </param>
 43    /// <param name="cancellationToken">Optional cancellation token.</param>
 44    /// <returns>
 45    /// A dictionary mapping each agent's executor ID to its complete response text up to the
 46    /// point of termination. Agents that emitted no text produce no entry.
 47    /// </returns>
 48    public static async Task<IReadOnlyDictionary<string, string>> RunAsync(
 49        this Workflow workflow,
 50        string message,
 51        IReadOnlyList<IWorkflowTerminationCondition>? terminationConditions,
 52        CancellationToken cancellationToken = default)
 53    {
 054        ArgumentNullException.ThrowIfNull(workflow);
 055        ArgumentException.ThrowIfNullOrEmpty(message);
 056        return await workflow.RunAsync(
 057            new ChatMessage(ChatRole.User, message),
 058            terminationConditions,
 059            cancellationToken);
 060    }
 61
 62    /// <summary>
 63    /// Creates a streaming execution of the workflow, sends the message, and collects all agent responses.
 64    /// </summary>
 65    /// <param name="workflow">The workflow to execute.</param>
 66    /// <param name="message">The chat message to send to the workflow.</param>
 67    /// <param name="cancellationToken">Optional cancellation token.</param>
 68    /// <returns>
 69    /// A dictionary mapping each agent's executor ID to its complete response text.
 70    /// Agents that emitted no text produce no entry.
 71    /// </returns>
 72    public static async Task<IReadOnlyDictionary<string, string>> RunAsync(
 73        this Workflow workflow,
 74        ChatMessage message,
 75        CancellationToken cancellationToken = default)
 76    {
 277        ArgumentNullException.ThrowIfNull(workflow);
 178        ArgumentNullException.ThrowIfNull(message);
 079        await using var run = await InProcessExecution.RunStreamingAsync(workflow, message, cancellationToken: cancellat
 080        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
 81
 82        // Register CancelRunAsync on the cancellation token so the workflow
 83        // actually stops (e.g., when a token budget is exceeded).
 084        await using var registration = cancellationToken.CanBeCanceled
 085            ? cancellationToken.Register(() => _ = run.CancelRunAsync())
 086            : default(CancellationTokenRegistration?);
 87
 088        var result = await run.CollectAgentResponsesAsync(cancellationToken);
 89
 90        // If the cancellation token fired during execution (e.g., budget exceeded),
 91        // throw now. MAF may have swallowed the cancellation internally.
 092        cancellationToken.ThrowIfCancellationRequested();
 93
 094        return result;
 095    }
 96
 97    /// <summary>
 98    /// Creates a streaming execution of the workflow, sends the message, collects all agent
 99    /// responses, and stops early when any termination condition is met.
 100    /// </summary>
 101    /// <param name="workflow">The workflow to execute.</param>
 102    /// <param name="message">The chat message to send to the workflow.</param>
 103    /// <param name="terminationConditions">
 104    /// Conditions evaluated after each completed agent turn. The first condition that returns
 105    /// <see langword="true"/> causes the loop to stop and remaining responses to be discarded.
 106    /// Pass an empty collection (or <see langword="null"/>) to disable Layer 2 termination.
 107    /// </param>
 108    /// <param name="cancellationToken">Optional cancellation token.</param>
 109    /// <returns>
 110    /// A dictionary mapping each agent's executor ID to its complete response text up to the
 111    /// point of termination. Agents that emitted no text produce no entry.
 112    /// </returns>
 113    public static async Task<IReadOnlyDictionary<string, string>> RunAsync(
 114        this Workflow workflow,
 115        ChatMessage message,
 116        IReadOnlyList<IWorkflowTerminationCondition>? terminationConditions,
 117        CancellationToken cancellationToken = default)
 118    {
 0119        ArgumentNullException.ThrowIfNull(workflow);
 0120        ArgumentNullException.ThrowIfNull(message);
 0121        await using var run = await InProcessExecution.RunStreamingAsync(workflow, message, cancellationToken: cancellat
 0122        await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
 123
 0124        await using var registration = cancellationToken.CanBeCanceled
 0125            ? cancellationToken.Register(() => _ = run.CancelRunAsync())
 0126            : default(CancellationTokenRegistration?);
 127
 0128        if (terminationConditions is null || terminationConditions.Count == 0)
 129        {
 0130            var result = await run.CollectAgentResponsesAsync(cancellationToken);
 0131            cancellationToken.ThrowIfCancellationRequested();
 0132            return result;
 133        }
 134
 0135        var terminationResult = await CollectWithTerminationAsync(
 0136            run.WatchStreamAsync(cancellationToken),
 0137            terminationConditions,
 0138            cancellationToken);
 0139        cancellationToken.ThrowIfCancellationRequested();
 0140        return terminationResult;
 0141    }
 142
 143    /// <summary>
 144    /// Collects all agent response text from a streaming run, grouped by executor ID.
 145    /// </summary>
 146    /// <returns>
 147    /// A dictionary mapping each agent's executor ID to its complete response text.
 148    /// Agents that emitted no text produce no entry.
 149    /// </returns>
 150    public static Task<IReadOnlyDictionary<string, string>> CollectAgentResponsesAsync(
 151        this StreamingRun run,
 152        CancellationToken cancellationToken = default)
 153    {
 0154        ArgumentNullException.ThrowIfNull(run);
 0155        return CollectFromEventsAsync(run.WatchStreamAsync(cancellationToken));
 156    }
 157
 158    internal static async Task<IReadOnlyDictionary<string, string>> CollectFromEventsAsync(
 159        IAsyncEnumerable<WorkflowEvent> events)
 160    {
 5161        var responses = new Dictionary<string, System.Text.StringBuilder>();
 162
 30163        await foreach (var evt in events)
 164        {
 10165            if (evt is AgentResponseUpdateEvent update
 10166                && update.ExecutorId is not null
 10167                && update.Data is not null)
 168            {
 10169                var text = update.Data.ToString();
 10170                if (string.IsNullOrEmpty(text))
 171                    continue;
 172
 9173                if (!responses.TryGetValue(update.ExecutorId, out var sb))
 6174                    responses[update.ExecutorId] = sb = new System.Text.StringBuilder();
 175
 9176                sb.Append(text);
 177            }
 178        }
 179
 5180        return responses.ToDictionary(
 6181            kv => kv.Key,
 11182            kv => kv.Value.ToString());
 5183    }
 184
 185    internal static async Task<IReadOnlyDictionary<string, string>> CollectWithTerminationAsync(
 186        IAsyncEnumerable<WorkflowEvent> events,
 187        IReadOnlyList<IWorkflowTerminationCondition> conditions,
 188        CancellationToken cancellationToken)
 189    {
 8190        var responses = new Dictionary<string, System.Text.StringBuilder>();
 8191        var history = new List<ChatMessage>();
 8192        var turnCount = 0;
 193
 8194        string? currentExecutorId = null;
 195
 48196        await foreach (var evt in events.WithCancellation(cancellationToken))
 197        {
 18198            if (evt is not AgentResponseUpdateEvent update
 18199                || update.ExecutorId is null
 18200                || update.Data is null)
 201            {
 202                continue;
 203            }
 204
 18205            var text = update.Data.ToString();
 18206            if (string.IsNullOrEmpty(text))
 207                continue;
 208
 209            // Detect executor change — previous agent's turn is complete
 18210            if (currentExecutorId is not null
 18211                && currentExecutorId != update.ExecutorId
 18212                && responses.TryGetValue(currentExecutorId, out var completedSb))
 213            {
 9214                var responseText = completedSb.ToString();
 9215                turnCount++;
 9216                var completedMessage = new ChatMessage(ChatRole.Assistant, responseText);
 9217                history.Add(completedMessage);
 218
 9219                var ctx = new TerminationContext
 9220                {
 9221                    AgentId = currentExecutorId,
 9222                    LastMessage = completedMessage,
 9223                    TurnCount = turnCount,
 9224                    ConversationHistory = history,
 9225                };
 226
 9227                if (ShouldTerminate(ctx, conditions))
 4228                    return FinalizeResponses(responses);
 229            }
 230
 14231            currentExecutorId = update.ExecutorId;
 232
 14233            if (!responses.TryGetValue(update.ExecutorId, out var sb))
 12234                responses[update.ExecutorId] = sb = new System.Text.StringBuilder();
 235
 14236            sb.Append(text);
 237        }
 238
 239        // Check the last agent's turn
 4240        if (currentExecutorId is not null
 4241            && responses.TryGetValue(currentExecutorId, out var lastSb))
 242        {
 3243            var responseText = lastSb.ToString();
 3244            turnCount++;
 3245            var lastMessage = new ChatMessage(ChatRole.Assistant, responseText);
 3246            history.Add(lastMessage);
 247
 3248            var ctx = new TerminationContext
 3249            {
 3250                AgentId = currentExecutorId,
 3251                LastMessage = lastMessage,
 3252                TurnCount = turnCount,
 3253                ConversationHistory = history,
 3254            };
 255
 3256            ShouldTerminate(ctx, conditions); // evaluate but don't stop — stream already ended
 257        }
 258
 4259        return FinalizeResponses(responses);
 8260    }
 261
 262    private static bool ShouldTerminate(
 263        TerminationContext ctx,
 264        IReadOnlyList<IWorkflowTerminationCondition> conditions)
 265    {
 40266        foreach (var condition in conditions)
 267        {
 10268            if (condition.ShouldTerminate(ctx))
 4269                return true;
 270        }
 8271        return false;
 4272    }
 273
 274    private static IReadOnlyDictionary<string, string> FinalizeResponses(
 275        Dictionary<string, System.Text.StringBuilder> responses)
 32276        => responses.ToDictionary(kv => kv.Key, kv => kv.Value.ToString());
 277}
 278