< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Diagnostics.DiagnosticsChatClientMiddleware
Assembly: NexusLabs.Needlr.AgentFramework
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Diagnostics/DiagnosticsChatClientMiddleware.cs
Line coverage
100%
Covered lines: 293
Uncovered lines: 0
Coverable lines: 293
Total lines: 479
Line coverage: 100%
Branch coverage
90%
Covered branches: 201
Total branches: 222
Branch coverage: 90.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
DrainCompletions() | 100% | 2 | 2 | 100%
HandleAsync() | 87.8% | 82 | 82 | 100%
HandleStreamingAsync() | 90.56% | 106 | 106 | 100%
EmitGenAiTokenUsage(...) | 95.83% | 24 | 24 | 100%
StartChatActivity(...) | 100% | 8 | 8 | 100%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Diagnostics/DiagnosticsChatClientMiddleware.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using System.Diagnostics;
 3using System.Runtime.CompilerServices;
 4
 5using Microsoft.Extensions.AI;
 6
 7using NexusLabs.Needlr.AgentFramework.Progress;
 8
 9namespace NexusLabs.Needlr.AgentFramework.Diagnostics;
 10
 11/// <summary>
 12/// Single writer for chat completion diagnostics. Wraps each
 13/// <c>IChatClient.GetResponseAsync()</c> call to capture per-completion timing,
 14/// token usage, and full request/response payloads. Records to the AsyncLocal
 15/// <see cref="AgentRunDiagnosticsBuilder"/> and a thread-safe collection (for
 16/// workflow runs where AsyncLocal doesn't propagate). Optionally emits
 17/// <see cref="LlmCallStartedEvent"/>/<see cref="LlmCallCompletedEvent"/> to the
 18/// progress reporter and OTel metrics via <see cref="IAgentMetrics"/>.
 19/// </summary>
 20/// <remarks>
 21/// <para>
 22/// <c>IterativeAgentLoop</c> wraps its chat client with this middleware
 23/// internally, making it the sole writer for <see cref="ChatCompletionDiagnostics"/>.
 24/// No other code should call <see cref="AgentRunDiagnosticsBuilder.AddChatCompletion"/>
 25/// for calls that pass through this middleware.
 26/// </para>
 27/// <para>
 28/// <see cref="IAgentMetrics"/> and <see cref="IProgressReporterAccessor"/> are optional.
 29/// When null, recording still occurs but OTel metrics and progress events are skipped.
 30/// </para>
 31/// </remarks>
 32[DoNotAutoRegister]
 33internal sealed class DiagnosticsChatClientMiddleware : IChatCompletionCollector
 34{
 35    private readonly IAgentMetrics? _metrics;
 36    private readonly IGenAiTokenMetrics? _genAiTokenMetrics;
 37    private readonly IProgressReporterAccessor? _progressAccessor;
 38    private readonly ChatCompletionActivityMode _activityMode;
 20639    private readonly ConcurrentQueue<ChatCompletionDiagnostics> _allCompletions = new();
 40    private int _sequenceCounter;
 41
 20642    internal DiagnosticsChatClientMiddleware(
 20643        IAgentMetrics? metrics = null,
 20644        IProgressReporterAccessor? progressAccessor = null,
 20645        ChatCompletionActivityMode activityMode = ChatCompletionActivityMode.Always,
 20646        IGenAiTokenMetrics? genAiTokenMetrics = null)
 47    {
 20648        _metrics = metrics;
 20649        _genAiTokenMetrics = genAiTokenMetrics;
 20650        _progressAccessor = progressAccessor;
 20651        _activityMode = activityMode;
 20652    }
 53
 54    /// <summary>
 55    /// Drains all captured completions since the last drain. Thread-safe.
 56    /// </summary>
 57    public IReadOnlyList<ChatCompletionDiagnostics> DrainCompletions()
 58    {
 2359        var results = new List<ChatCompletionDiagnostics>();
 4460        while (_allCompletions.TryDequeue(out var completion))
 61        {
 2162            results.Add(completion);
 2163        }
 2364        return results;
 65    }
 66
 67    internal async Task<ChatResponse> HandleAsync(
 68        IEnumerable<ChatMessage> messages,
 69        ChatOptions? options,
 70        IChatClient innerChatClient,
 71        CancellationToken cancellationToken)
 72    {
 30473        var builder = AgentRunDiagnosticsBuilder.GetCurrent();
 30474        var sequence = builder?.NextChatCompletionSequence()
 30475            ?? Interlocked.Increment(ref _sequenceCounter) - 1;
 30476        var startedAt = DateTimeOffset.UtcNow;
 30477        var stopwatch = Stopwatch.StartNew();
 78
 30479        var (ownedActivity, targetActivity) = StartChatActivity("agent.chat");
 30480        using var _ = ownedActivity;
 81
 30482        if (_progressAccessor is not null)
 83        {
 3584            _progressAccessor.Current.Report(new LlmCallStartedEvent(
 3585                Timestamp: startedAt,
 3586                WorkflowId: _progressAccessor.Current.WorkflowId,
 3587                AgentId: _progressAccessor.Current.AgentId,
 3588                ParentAgentId: builder?.ParentAgentName,
 3589                Depth: _progressAccessor.Current.Depth,
 3590                SequenceNumber: _progressAccessor.Current.NextSequence(),
 3591                CallSequence: sequence));
 92        }
 93
 94        try
 95        {
 30496            var response = await innerChatClient.GetResponseAsync(messages, options, cancellationToken)
 30497                .ConfigureAwait(false);
 98
 29399            stopwatch.Stop();
 100
 293101            var model = response.ModelId ?? "unknown";
 102
 293103            targetActivity?.SetTag("gen_ai.response.model", model);
 293104            targetActivity?.SetTag("agent.chat.sequence", sequence);
 293105            targetActivity?.SetTag("status", "success");
 106
 293107            _metrics?.RecordChatCompletion(model, stopwatch.Elapsed, succeeded: true, agentName: builder?.AgentName);
 108
 293109            var usage = response.Usage;
 293110            var tokens = new TokenUsage(
 293111                InputTokens: usage?.InputTokenCount ?? 0,
 293112                OutputTokens: usage?.OutputTokenCount ?? 0,
 293113                TotalTokens: usage?.TotalTokenCount ?? 0,
 293114                CachedInputTokens:
 293115                    usage?.CachedInputTokenCount
 293116                    ?? usage?.AdditionalCounts?.GetValueOrDefault("CachedInputTokens")
 293117                    ?? 0,
 293118                ReasoningTokens:
 293119                    usage?.ReasoningTokenCount
 293120                    ?? usage?.AdditionalCounts?.GetValueOrDefault("ReasoningTokens")
 293121                    ?? 0);
 122
 293123            targetActivity?.SetTag("gen_ai.usage.input_tokens", tokens.InputTokens);
 293124            targetActivity?.SetTag("gen_ai.usage.output_tokens", tokens.OutputTokens);
 293125            targetActivity?.SetTag("gen_ai.usage.cached_input_tokens", tokens.CachedInputTokens);
 293126            targetActivity?.SetTag("gen_ai.usage.reasoning_tokens", tokens.ReasoningTokens);
 127
 293128            EmitGenAiTokenUsage(tokens, options?.ModelId, response, innerChatClient);
 129
 293130            var messageList = messages as ICollection<ChatMessage> ?? messages.ToList();
 131
 293132            var diagnostics = new ChatCompletionDiagnostics(
 293133                Sequence: sequence,
 293134                Model: model,
 293135                Tokens: tokens,
 293136                InputMessageCount: messageList.Count,
 293137                Duration: stopwatch.Elapsed,
 293138                Succeeded: true,
 293139                ErrorMessage: null,
 293140                StartedAt: startedAt,
 293141                CompletedAt: DateTimeOffset.UtcNow)
 293142            {
 293143                AgentName = builder?.AgentName,
 293144                RequestMessages = messageList as IReadOnlyList<ChatMessage> ?? messageList.ToList(),
 293145                Response = response,
 293146                RequestCharCount = DiagnosticsCharCounter.ChatMessagesLength(messageList as IReadOnlyList<ChatMessage> ?
 293147                ResponseCharCount = DiagnosticsCharCounter.ChatResponseLength(response),
 293148            };
 149
 293150            builder?.AddChatCompletion(diagnostics);
 293151            _allCompletions.Enqueue(diagnostics);
 152
 293153            if (_progressAccessor is not null)
 154            {
 34155                _progressAccessor.Current.Report(new LlmCallCompletedEvent(
 34156                    Timestamp: DateTimeOffset.UtcNow,
 34157                    WorkflowId: _progressAccessor.Current.WorkflowId,
 34158                    AgentId: _progressAccessor.Current.AgentId,
 34159                    ParentAgentId: builder?.ParentAgentName,
 34160                    Depth: _progressAccessor.Current.Depth,
 34161                    SequenceNumber: _progressAccessor.Current.NextSequence(),
 34162                    CallSequence: sequence,
 34163                    Model: model,
 34164                    Duration: stopwatch.Elapsed,
 34165                    InputTokens: tokens.InputTokens,
 34166                    OutputTokens: tokens.OutputTokens,
 34167                    TotalTokens: tokens.TotalTokens));
 168            }
 169
 293170            return response;
 171        }
 11172        catch (Exception ex)
 173        {
 11174            stopwatch.Stop();
 175
 11176            targetActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);
 11177            targetActivity?.SetTag("status", "failed");
 178
 11179            _metrics?.RecordChatCompletion("unknown", stopwatch.Elapsed, succeeded: false, agentName: builder?.AgentName
 180
 11181            var failedMessageList = messages as IReadOnlyList<ChatMessage> ?? messages.ToList();
 182
 11183            var diagnostics = new ChatCompletionDiagnostics(
 11184                Sequence: sequence,
 11185                Model: "unknown",
 11186                Tokens: new TokenUsage(0, 0, 0, 0, 0),
 11187                InputMessageCount: 0,
 11188                Duration: stopwatch.Elapsed,
 11189                Succeeded: false,
 11190                ErrorMessage: ex.Message,
 11191                StartedAt: startedAt,
 11192                CompletedAt: DateTimeOffset.UtcNow)
 11193            {
 11194                AgentName = builder?.AgentName,
 11195                RequestMessages = failedMessageList,
 11196                RequestCharCount = DiagnosticsCharCounter.ChatMessagesLength(failedMessageList),
 11197            };
 198
 11199            builder?.AddChatCompletion(diagnostics);
 11200            _allCompletions.Enqueue(diagnostics);
 201
 11202            if (_progressAccessor is not null)
 203            {
 1204                _progressAccessor.Current.Report(new LlmCallFailedEvent(
 1205                    Timestamp: DateTimeOffset.UtcNow,
 1206                    WorkflowId: _progressAccessor.Current.WorkflowId,
 1207                    AgentId: _progressAccessor.Current.AgentId,
 1208                    ParentAgentId: builder?.ParentAgentName,
 1209                    Depth: _progressAccessor.Current.Depth,
 1210                    SequenceNumber: _progressAccessor.Current.NextSequence(),
 1211                    CallSequence: sequence,
 1212                    ErrorMessage: ex.Message,
 1213                    Duration: stopwatch.Elapsed));
 214            }
 215
 11216            throw;
 217        }
 293218    }
 219
 220    internal async IAsyncEnumerable<ChatResponseUpdate> HandleStreamingAsync(
 221        IEnumerable<ChatMessage> messages,
 222        ChatOptions? options,
 223        IChatClient innerChatClient,
 224        [EnumeratorCancellation] CancellationToken cancellationToken)
 225    {
 33226        var builder = AgentRunDiagnosticsBuilder.GetCurrent();
 33227        var sequence = builder?.NextChatCompletionSequence()
 33228            ?? Interlocked.Increment(ref _sequenceCounter) - 1;
 33229        var startedAt = DateTimeOffset.UtcNow;
 33230        var stopwatch = Stopwatch.StartNew();
 231
 33232        var (ownedStreamActivity, targetActivity) = StartChatActivity("agent.chat.stream");
 33233        using var _s = ownedStreamActivity;
 234
 33235        if (_progressAccessor is not null)
 236        {
 26237            _progressAccessor.Current.Report(new LlmCallStartedEvent(
 26238                Timestamp: startedAt,
 26239                WorkflowId: _progressAccessor.Current.WorkflowId,
 26240                AgentId: _progressAccessor.Current.AgentId,
 26241                ParentAgentId: builder?.ParentAgentName,
 26242                Depth: _progressAccessor.Current.Depth,
 26243                SequenceNumber: _progressAccessor.Current.NextSequence(),
 26244                CallSequence: sequence));
 245        }
 246
 33247        var messageList = messages as IReadOnlyList<ChatMessage> ?? messages.ToList();
 33248        var buffered = new List<ChatResponseUpdate>();
 33249        Exception? failure = null;
 250
 33251        var enumerable = innerChatClient.GetStreamingResponseAsync(messages, options, cancellationToken);
 32252        var enumerator = enumerable.GetAsyncEnumerator(cancellationToken);
 253
 254        try
 255        {
 256            while (true)
 257            {
 258                ChatResponseUpdate update;
 259                try
 260                {
 73261                    if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
 262                    {
 27263                        break;
 264                    }
 41265                    update = enumerator.Current;
 41266                }
 5267                catch (Exception ex)
 268                {
 5269                    failure = ex;
 5270                    break;
 271                }
 272
 41273                buffered.Add(update);
 41274                yield return update;
 275            }
 276        }
 277        finally
 278        {
 32279            await enumerator.DisposeAsync().ConfigureAwait(false);
 280        }
 281
 32282        stopwatch.Stop();
 283
 32284        var aggregated = buffered.ToChatResponse();
 285
 32286        if (failure is null)
 287        {
 27288            var model = aggregated.ModelId ?? "unknown";
 289
 27290            targetActivity?.SetTag("gen_ai.response.model", model);
 27291            targetActivity?.SetTag("agent.chat.sequence", sequence);
 27292            targetActivity?.SetTag("status", "success");
 293
 27294            _metrics?.RecordChatCompletion(model, stopwatch.Elapsed, succeeded: true, agentName: builder?.AgentName);
 295
 27296            var usage = aggregated.Usage;
 27297            var tokens = new TokenUsage(
 27298                InputTokens: usage?.InputTokenCount ?? 0,
 27299                OutputTokens: usage?.OutputTokenCount ?? 0,
 27300                TotalTokens: usage?.TotalTokenCount ?? 0,
 27301                CachedInputTokens:
 27302                    usage?.CachedInputTokenCount
 27303                    ?? usage?.AdditionalCounts?.GetValueOrDefault("CachedInputTokens")
 27304                    ?? 0,
 27305                ReasoningTokens:
 27306                    usage?.ReasoningTokenCount
 27307                    ?? usage?.AdditionalCounts?.GetValueOrDefault("ReasoningTokens")
 27308                    ?? 0);
 309
 27310            targetActivity?.SetTag("gen_ai.usage.input_tokens", tokens.InputTokens);
 27311            targetActivity?.SetTag("gen_ai.usage.output_tokens", tokens.OutputTokens);
 27312            targetActivity?.SetTag("gen_ai.usage.cached_input_tokens", tokens.CachedInputTokens);
 27313            targetActivity?.SetTag("gen_ai.usage.reasoning_tokens", tokens.ReasoningTokens);
 314
 27315            EmitGenAiTokenUsage(tokens, options?.ModelId, aggregated, innerChatClient);
 316
 27317            var diagnostics = new ChatCompletionDiagnostics(
 27318                Sequence: sequence,
 27319                Model: model,
 27320                Tokens: tokens,
 27321                InputMessageCount: messageList.Count,
 27322                Duration: stopwatch.Elapsed,
 27323                Succeeded: true,
 27324                ErrorMessage: null,
 27325                StartedAt: startedAt,
 27326                CompletedAt: DateTimeOffset.UtcNow)
 27327            {
 27328                AgentName = builder?.AgentName,
 27329                RequestMessages = messageList,
 27330                Response = aggregated,
 27331                RequestCharCount = DiagnosticsCharCounter.ChatMessagesLength(messageList),
 27332                ResponseCharCount = DiagnosticsCharCounter.ChatResponseLength(aggregated),
 27333            };
 334
 27335            builder?.AddChatCompletion(diagnostics);
 27336            _allCompletions.Enqueue(diagnostics);
 337
 27338            if (_progressAccessor is not null)
 339            {
 22340                _progressAccessor.Current.Report(new LlmCallCompletedEvent(
 22341                    Timestamp: DateTimeOffset.UtcNow,
 22342                    WorkflowId: _progressAccessor.Current.WorkflowId,
 22343                    AgentId: _progressAccessor.Current.AgentId,
 22344                    ParentAgentId: builder?.ParentAgentName,
 22345                    Depth: _progressAccessor.Current.Depth,
 22346                    SequenceNumber: _progressAccessor.Current.NextSequence(),
 22347                    CallSequence: sequence,
 22348                    Model: model,
 22349                    Duration: stopwatch.Elapsed,
 22350                    InputTokens: tokens.InputTokens,
 22351                    OutputTokens: tokens.OutputTokens,
 22352                    TotalTokens: tokens.TotalTokens));
 353            }
 354        }
 355        else
 356        {
 5357            targetActivity?.SetStatus(ActivityStatusCode.Error, failure.Message);
 5358            targetActivity?.SetTag("status", "failed");
 359
 5360            _metrics?.RecordChatCompletion("unknown", stopwatch.Elapsed, succeeded: false, agentName: builder?.AgentName
 361
 5362            var failureUsage = aggregated.Usage;
 5363            var failureTokens = new TokenUsage(
 5364                InputTokens: failureUsage?.InputTokenCount ?? 0,
 5365                OutputTokens: failureUsage?.OutputTokenCount ?? 0,
 5366                TotalTokens: failureUsage?.TotalTokenCount ?? 0,
 5367                CachedInputTokens:
 5368                    failureUsage?.CachedInputTokenCount
 5369                    ?? failureUsage?.AdditionalCounts?.GetValueOrDefault("CachedInputTokens")
 5370                    ?? 0,
 5371                ReasoningTokens:
 5372                    failureUsage?.ReasoningTokenCount
 5373                    ?? failureUsage?.AdditionalCounts?.GetValueOrDefault("ReasoningTokens")
 5374                    ?? 0);
 375
 5376            EmitGenAiTokenUsage(failureTokens, options?.ModelId, aggregated, innerChatClient);
 377
 5378            var diagnostics = new ChatCompletionDiagnostics(
 5379                Sequence: sequence,
 5380                Model: aggregated.ModelId ?? "unknown",
 5381                Tokens: failureTokens,
 5382                InputMessageCount: messageList.Count,
 5383                Duration: stopwatch.Elapsed,
 5384                Succeeded: false,
 5385                ErrorMessage: failure.Message,
 5386                StartedAt: startedAt,
 5387                CompletedAt: DateTimeOffset.UtcNow)
 5388            {
 5389                AgentName = builder?.AgentName,
 5390                RequestMessages = messageList,
 5391                Response = aggregated,
 5392                RequestCharCount = DiagnosticsCharCounter.ChatMessagesLength(messageList),
 5393                ResponseCharCount = DiagnosticsCharCounter.ChatResponseLength(aggregated),
 5394            };
 395
 5396            builder?.AddChatCompletion(diagnostics);
 5397            _allCompletions.Enqueue(diagnostics);
 398
 5399            if (_progressAccessor is not null)
 400            {
 3401                _progressAccessor.Current.Report(new LlmCallFailedEvent(
 3402                    Timestamp: DateTimeOffset.UtcNow,
 3403                    WorkflowId: _progressAccessor.Current.WorkflowId,
 3404                    AgentId: _progressAccessor.Current.AgentId,
 3405                    ParentAgentId: builder?.ParentAgentName,
 3406                    Depth: _progressAccessor.Current.Depth,
 3407                    SequenceNumber: _progressAccessor.Current.NextSequence(),
 3408                    CallSequence: sequence,
 3409                    ErrorMessage: failure.Message,
 3410                    Duration: stopwatch.Elapsed));
 411            }
 412
 5413            throw failure;
 414        }
 27415    }
 416
 417    /// <summary>
 418    /// Records <c>cache_read</c> and/or <c>reasoning</c> measurements on the
 419    /// <c>gen_ai.client.token.usage</c> histogram (the same histogram MEAI's
 420    /// <see cref="Microsoft.Extensions.AI.OpenTelemetryChatClient"/> emits <c>input</c> and
 421    /// <c>output</c> on). Short-circuits BEFORE any tag construction or
 422    /// <see cref="Microsoft.Extensions.AI.ChatClientMetadata"/> resolution when both
 423    /// counts are zero — that is the common path for non-cached, non-reasoning calls.
 424    /// </summary>
 425    private void EmitGenAiTokenUsage(
 426        TokenUsage tokens,
 427        string? requestModel,
 428        ChatResponse response,
 429        IChatClient innerChatClient)
 430    {
 325431        if (_genAiTokenMetrics is null)
 255432            return;
 433
 70434        if (tokens.CachedInputTokens <= 0 && tokens.ReasoningTokens <= 0)
 53435            return;
 436
 17437        var metadata = innerChatClient.GetService(typeof(ChatClientMetadata)) as ChatClientMetadata;
 17438        var tags = new GenAiTokenUsageTags(
 17439            OperationName: "chat",
 17440            RequestModel: requestModel ?? metadata?.DefaultModelId,
 17441            ResponseModel: response.ModelId,
 17442            ProviderName: metadata?.ProviderName,
 17443            ServerAddress: metadata?.ProviderUri?.Host,
 17444            ServerPort: metadata?.ProviderUri is { } uri ? uri.Port : null);
 445
 17446        if (tokens.CachedInputTokens > 0)
 14447            _genAiTokenMetrics.RecordTokenUsage(GenAiTokenTypes.CacheRead, tokens.CachedInputTokens, tags);
 448
 17449        if (tokens.ReasoningTokens > 0)
 8450            _genAiTokenMetrics.RecordTokenUsage(GenAiTokenTypes.Reasoning, tokens.ReasoningTokens, tags);
 17451    }
 452
 453    /// <summary>
 454    /// Creates a chat completion activity respecting <see cref="_activityMode"/>.
 455    /// When <see cref="ChatCompletionActivityMode.EnrichParent"/> is active and a
 456    /// parent <c>gen_ai.*</c> activity exists, returns <c>created = null</c> and
 457    /// <c>target = parent</c> so callers enrich the parent span without creating a
 458    /// duplicate child. The caller must only dispose <c>created</c>, never <c>target</c>.
 459    /// </summary>
 460    private (Activity? Created, Activity? Target) StartChatActivity(string operationName)
 461    {
 337462        if (_metrics is null)
 463        {
 259464            return (null, null);
 465        }
 466
 78467        if (_activityMode == ChatCompletionActivityMode.EnrichParent)
 468        {
 7469            var parent = Activity.Current;
 7470            if (parent?.OperationName.StartsWith("gen_ai.", StringComparison.Ordinal) == true)
 471            {
 5472                return (Created: null, Target: parent);
 473            }
 474        }
 475
 73476        var created = _metrics.ActivitySource.StartActivity(operationName, ActivityKind.Client);
 73477        return (Created: created, Target: created);
 478    }
 479}