< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.Diagnostics.DiagnosticsAgentRunMiddleware
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Diagnostics/DiagnosticsAgentRunMiddleware.cs
Line coverage
95%
Covered lines: 92
Uncovered lines: 4
Coverable lines: 96
Total lines: 220
Line coverage: 95.8%
Branch coverage
66%
Covered branches: 44
Total branches: 66
Branch coverage: 66.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
HandleStreamingAsync()64.28%2828100%
HandleAsync()40%222084%
SynthesizeResponse(...)100%1818100%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Diagnostics/DiagnosticsAgentRunMiddleware.cs

#LineLine coverage
 1using System.Diagnostics;
 2using System.Runtime.CompilerServices;
 3
 4using Microsoft.Agents.AI;
 5using Microsoft.Extensions.AI;
 6
 7using NexusLabs.Needlr.AgentFramework.Diagnostics;
 8
 9namespace NexusLabs.Needlr.AgentFramework.Workflows.Diagnostics;
 10
 11/// <summary>
 12/// Outermost middleware layer: wraps <c>agent.RunAsync()</c> and
 13/// <c>agent.RunStreamingAsync()</c> to capture per-run diagnostics including
 14/// total duration, message counts, and success/failure state. Emits
 15/// <see cref="IAgentMetrics"/> counters on start and completion.
 16/// </summary>
 17/// <remarks>
 18/// Both the non-streaming and streaming paths produce equivalent
 19/// <see cref="IAgentRunDiagnostics"/> via <see cref="IAgentDiagnosticsWriter.Set"/>.
 20/// </remarks>
 21internal sealed class DiagnosticsAgentRunMiddleware
 22{
 23    private readonly string _agentName;
 24    private readonly IAgentDiagnosticsWriter _writer;
 25    private readonly IAgentMetrics _metrics;
 26
 4627    internal DiagnosticsAgentRunMiddleware(
 4628        string agentName,
 4629        IAgentDiagnosticsWriter writer,
 4630        IAgentMetrics metrics)
 31    {
 4632        _agentName = agentName;
 4633        _writer = writer;
 4634        _metrics = metrics;
 4635    }
 36
 37    internal async IAsyncEnumerable<AgentResponseUpdate> HandleStreamingAsync(
 38        IEnumerable<ChatMessage> messages,
 39        AgentSession? session,
 40        AgentRunOptions? options,
 41        AIAgent innerAgent,
 42        [EnumeratorCancellation] CancellationToken cancellationToken)
 43    {
 2544        var resolvedName = !string.IsNullOrEmpty(innerAgent.Name) ? innerAgent.Name : _agentName;
 45
 2546        _metrics.RecordRunStarted(resolvedName);
 2547        using var activity = _metrics.ActivitySource.StartActivity($"agent.run {resolvedName}", ActivityKind.Internal);
 2548        activity?.SetTag("gen_ai.agent.name", resolvedName);
 2549        activity?.SetTag("gen_ai.agent.streaming", true);
 50
 2551        using var builder = AgentRunDiagnosticsBuilder.StartNew(resolvedName);
 52
 2553        var messageList = messages as IReadOnlyList<ChatMessage> ?? messages.ToList();
 2554        builder.RecordInputMessageCount(messageList.Count);
 2555        builder.RecordInputMessages(messageList);
 56
 2557        var messageIds = new HashSet<string>(StringComparer.Ordinal);
 2558        var accumulated = new List<AgentResponseUpdate>();
 2559        Exception? failure = null;
 60
 2561        var enumerator = innerAgent
 2562            .RunStreamingAsync(messageList, session, options, cancellationToken)
 2563            .GetAsyncEnumerator(cancellationToken);
 64        try
 65        {
 66            while (true)
 67            {
 68                AgentResponseUpdate update;
 69                try
 70                {
 5871                    if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
 72                    {
 2273                        break;
 74                    }
 3375                    update = enumerator.Current;
 3376                }
 377                catch (Exception ex)
 78                {
 379                    failure = ex;
 380                    break;
 81                }
 82
 3383                if (!string.IsNullOrEmpty(update.MessageId))
 84                {
 1285                    messageIds.Add(update.MessageId);
 86                }
 3387                accumulated.Add(update);
 88
 3389                yield return update;
 90            }
 91        }
 92        finally
 93        {
 2594            await enumerator.DisposeAsync().ConfigureAwait(false);
 95        }
 96
 2597        builder.RecordOutputMessageCount(messageIds.Count);
 2598        builder.RecordOutputResponse(SynthesizeResponse(accumulated));
 99
 25100        if (failure is not null)
 101        {
 3102            builder.RecordFailure(failure.Message);
 3103            activity?.SetStatus(ActivityStatusCode.Error, failure.Message);
 104        }
 105
 25106        var diagnostics = builder.Build();
 25107        _writer.Set(diagnostics);
 25108        _metrics.RecordRunCompleted(diagnostics);
 109
 25110        activity?.SetTag("status", diagnostics.Succeeded ? "success" : "failed");
 25111        activity?.SetTag("gen_ai.usage.input_tokens", diagnostics.AggregateTokenUsage.InputTokens);
 25112        activity?.SetTag("gen_ai.usage.output_tokens", diagnostics.AggregateTokenUsage.OutputTokens);
 25113        activity?.SetTag("gen_ai.usage.total_tokens", diagnostics.AggregateTokenUsage.TotalTokens);
 114
 25115        if (failure is not null)
 116        {
 3117            throw failure;
 118        }
 22119    }
 120
 121    internal async Task<AgentResponse> HandleAsync(
 122        IEnumerable<ChatMessage> messages,
 123        AgentSession? session,
 124        AgentRunOptions? options,
 125        AIAgent innerAgent,
 126        CancellationToken cancellationToken)
 127    {
 128        // Resolve the agent name at runtime from the inner agent. The plugin creates
 129        // this middleware before the agent is fully built, so the name passed at
 130        // construction time is a fallback.
 21131        var resolvedName = !string.IsNullOrEmpty(innerAgent.Name) ? innerAgent.Name : _agentName;
 132
 21133        _metrics.RecordRunStarted(resolvedName);
 21134        using var activity = _metrics.ActivitySource.StartActivity($"agent.run {resolvedName}", ActivityKind.Internal);
 21135        activity?.SetTag("gen_ai.agent.name", resolvedName);
 136
 21137        using var builder = AgentRunDiagnosticsBuilder.StartNew(resolvedName);
 138
 139        try
 140        {
 21141            var messageList = messages as IReadOnlyList<ChatMessage> ?? messages.ToList();
 21142            builder.RecordInputMessageCount(messageList.Count);
 21143            builder.RecordInputMessages(messageList);
 144
 21145            var response = await innerAgent.RunAsync(messageList, session, options, cancellationToken)
 21146                .ConfigureAwait(false);
 147
 21148            builder.RecordOutputMessageCount(response.Messages?.Count ?? 0);
 21149            builder.RecordOutputResponse(response);
 150
 21151            return response;
 152        }
 0153        catch (Exception ex)
 154        {
 0155            builder.RecordFailure(ex.Message);
 0156            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 0157            throw;
 158        }
 159        finally
 160        {
 21161            var diagnostics = builder.Build();
 21162            _writer.Set(diagnostics);
 21163            _metrics.RecordRunCompleted(diagnostics);
 164
 21165            activity?.SetTag("status", diagnostics.Succeeded ? "success" : "failed");
 21166            activity?.SetTag("gen_ai.usage.input_tokens", diagnostics.AggregateTokenUsage.InputTokens);
 21167            activity?.SetTag("gen_ai.usage.output_tokens", diagnostics.AggregateTokenUsage.OutputTokens);
 21168            activity?.SetTag("gen_ai.usage.total_tokens", diagnostics.AggregateTokenUsage.TotalTokens);
 169        }
 21170    }
 171
 172    /// <summary>
 173    /// Synthesizes an <see cref="AgentResponse"/> from the raw stream of
 174    /// <see cref="AgentResponseUpdate"/> items observed during a streaming run.
 175    /// Groups contents by <c>MessageId</c> so each logical message becomes one
 176    /// <see cref="ChatMessage"/>. Updates with no <c>MessageId</c> are grouped
 177    /// positionally so partial streams (mid-failure) still capture what was
 178    /// observed. Returns <see langword="null"/> when no updates were received.
 179    /// </summary>
 180    private static AgentResponse? SynthesizeResponse(List<AgentResponseUpdate> updates)
 181    {
 25182        if (updates.Count == 0)
 183        {
 1184            return null;
 185        }
 186
 24187        var order = new List<string>();
 24188        var groups = new Dictionary<string, (ChatRole Role, List<AIContent> Contents, string? AuthorName)>(StringCompare
 189
 114190        for (var i = 0; i < updates.Count; i++)
 191        {
 33192            var u = updates[i];
 33193            var key = !string.IsNullOrEmpty(u.MessageId) ? u.MessageId : $"__ordinal_{i}";
 33194            if (!groups.TryGetValue(key, out var entry))
 195            {
 29196                entry = (u.Role ?? ChatRole.Assistant, new List<AIContent>(), u.AuthorName);
 29197                groups[key] = entry;
 29198                order.Add(key);
 199            }
 33200            if (u.Contents is { Count: > 0 })
 201            {
 33202                entry.Contents.AddRange(u.Contents);
 203            }
 204        }
 205
 24206        var messages = new List<ChatMessage>(order.Count);
 106207        foreach (var k in order)
 208        {
 29209            var (role, contents, authorName) = groups[k];
 29210            var msg = new ChatMessage(role, contents);
 29211            if (!string.IsNullOrEmpty(authorName))
 212            {
 29213                msg.AuthorName = authorName;
 214            }
 29215            messages.Add(msg);
 216        }
 217
 24218        return new AgentResponse(messages);
 219    }
 220}