< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.Budget.TokenBudgetChatMiddleware
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Budget/TokenBudgetChatMiddleware.cs
Line coverage
100%
Covered lines: 59
Uncovered lines: 0
Coverable lines: 59
Total lines: 140
Line coverage: 100%
Branch coverage
78%
Covered branches: 22
Total branches: 28
Branch coverage: 78.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

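| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| GetResponseAsync() | 100% | 10 | 10 | 100% |
| IsBudgetExceeded() | 80% | 10 | 10 | 100% |
| EmitBudgetUpdatedEvent() | 100% | 1 | 1 | 100% |
| EmitBudgetExceededEvent() | 50% | 8 | 8 | 100% |
| ThrowBudgetExceeded(...) | 100% | 1 | 1 | 100% |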

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Budget/TokenBudgetChatMiddleware.cs

#LineLine coverage
 1using Microsoft.Extensions.AI;
 2
 3using NexusLabs.Needlr.AgentFramework.Budget;
 4using NexusLabs.Needlr.AgentFramework.Progress;
 5
 6namespace NexusLabs.Needlr.AgentFramework.Workflows.Budget;
 7
/// <summary>
/// <see cref="DelegatingChatClient"/> that enforces token budget limits by aborting
/// when <see cref="ITokenBudgetTracker"/> thresholds are reached or exceeded. Depends on
/// <see cref="TokenUsageRecordingMiddleware"/> (wired as an inner middleware) to
/// keep the tracker's token counts up to date.
/// </summary>
/// <remarks>
/// <para>
/// This middleware does NOT record token usage — that is handled by
/// <see cref="TokenUsageRecordingMiddleware"/>, which is wired inside this middleware
/// and has therefore already updated the tracker by the time the inner call returns
/// here. Use <c>UsingTokenBudget()</c> to wire both correctly.
/// </para>
/// <para>
/// A limit counts as hit when the current count is greater than or equal to the
/// configured maximum. Limits that are not configured (<see langword="null"/>) are ignored.
/// </para>
/// <para>
/// Budget enforcement uses two mechanisms:
/// <list type="number">
///   <item>
///     <see cref="OperationCanceledException"/> wrapping <see cref="TokenBudgetExceededException"/>
///     thrown from the middleware (works for direct agent runs).
///   </item>
///   <item>
///     <see cref="ITokenBudgetTracker.BudgetCancellationToken"/> cancelled when tokens are recorded
///     past the limit (works for MAF workflow runs — pass this token to the workflow).
///   </item>
/// </list>
/// </para>
/// <para>
/// Only <see cref="GetResponseAsync"/> is overridden in this class.
/// NOTE(review): streaming calls appear to bypass the budget checks — confirm whether
/// streaming enforcement is handled elsewhere.
/// </para>
/// </remarks>
public sealed class TokenBudgetChatMiddleware : DelegatingChatClient
{
    private readonly ITokenBudgetTracker _tracker;
    private readonly IProgressReporterAccessor _progressAccessor;

    /// <summary>
    /// Creates the middleware around <paramref name="innerClient"/>.
    /// </summary>
    /// <param name="innerClient">The inner chat client to delegate to.</param>
    /// <param name="tracker">The token budget tracker scoped to the current pipeline run.</param>
    /// <param name="progressAccessor">Progress reporter accessor for emitting budget events.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="tracker"/> or <paramref name="progressAccessor"/> is <see langword="null"/>.
    /// </exception>
    public TokenBudgetChatMiddleware(
        IChatClient innerClient,
        ITokenBudgetTracker tracker,
        IProgressReporterAccessor progressAccessor)
        : base(innerClient)
    {
        ArgumentNullException.ThrowIfNull(tracker);
        ArgumentNullException.ThrowIfNull(progressAccessor);
        _tracker = tracker;
        _progressAccessor = progressAccessor;
    }

    /// <summary>
    /// <see langword="true"/> when at least one of the total, input, or output limits is configured.
    /// </summary>
    private bool HasAnyLimit =>
        _tracker.MaxTokens.HasValue ||
        _tracker.MaxInputTokens.HasValue ||
        _tracker.MaxOutputTokens.HasValue;

    /// <inheritdoc />
    /// <exception cref="OperationCanceledException">
    /// A configured limit is already hit before the call, or is hit once the inner call
    /// has returned. The inner exception is a <see cref="TokenBudgetExceededException"/>.
    /// </exception>
    public override async Task<ChatResponse> GetResponseAsync(
        IEnumerable<ChatMessage> messages,
        ChatOptions? options,
        CancellationToken cancellationToken)
    {
        // Pre-call gate: abort if any budget is already exhausted, without
        // spending another request on the inner client.
        if (IsBudgetExceeded())
        {
            ReportAndThrowBudgetExceeded();
        }

        var response = await base.GetResponseAsync(messages, options, cancellationToken)
            .ConfigureAwait(false);

        // Post-call check: the recording middleware (inner) has already updated
        // the tracker, so this sees the usage of the call that just completed.
        if (IsBudgetExceeded())
        {
            ReportAndThrowBudgetExceeded();
        }

        // Only publish a progress update when a budget scope is actually active.
        if (HasAnyLimit)
        {
            EmitBudgetUpdatedEvent();
        }

        return response;
    }

    /// <summary>
    /// Returns <see langword="true"/> when any configured limit (total, input, or output)
    /// has been reached or exceeded.
    /// </summary>
    private bool IsBudgetExceeded() =>
        (_tracker.MaxTokens.HasValue && _tracker.CurrentTokens >= _tracker.MaxTokens.Value) ||
        (_tracker.MaxInputTokens.HasValue && _tracker.CurrentInputTokens >= _tracker.MaxInputTokens.Value) ||
        (_tracker.MaxOutputTokens.HasValue && _tracker.CurrentOutputTokens >= _tracker.MaxOutputTokens.Value);

    /// <summary>
    /// Reports a <see cref="BudgetUpdatedEvent"/> carrying the tracker's current counts and limits.
    /// </summary>
    private void EmitBudgetUpdatedEvent()
    {
        // Capture the reporter once so the IDs, the sequence number, and the
        // Report call all go through the same instance.
        var reporter = _progressAccessor.Current;
        reporter.Report(new BudgetUpdatedEvent(
            Timestamp: DateTimeOffset.UtcNow,
            WorkflowId: reporter.WorkflowId,
            AgentId: reporter.AgentId,
            ParentAgentId: null,
            Depth: reporter.Depth,
            SequenceNumber: reporter.NextSequence(),
            CurrentInputTokens: _tracker.CurrentInputTokens,
            CurrentOutputTokens: _tracker.CurrentOutputTokens,
            CurrentTotalTokens: _tracker.CurrentTokens,
            MaxInputTokens: _tracker.MaxInputTokens,
            MaxOutputTokens: _tracker.MaxOutputTokens,
            MaxTotalTokens: _tracker.MaxTokens));
    }

    /// <summary>
    /// Determines which limit was hit, reports a <see cref="BudgetExceededEvent"/> for it,
    /// and then throws. This method never returns normally.
    /// </summary>
    /// <remarks>
    /// The violated limit is resolved once and reused for both the event and the exception,
    /// so the two always describe the same limit. Precedence is input, then output, then total.
    /// </remarks>
    /// <exception cref="OperationCanceledException">Always thrown.</exception>
    private void ReportAndThrowBudgetExceeded()
    {
        var reporter = _progressAccessor.Current;
        var (limitType, current, max) =
            _tracker.MaxInputTokens.HasValue && _tracker.CurrentInputTokens >= _tracker.MaxInputTokens.Value
                ? ("input", _tracker.CurrentInputTokens, _tracker.MaxInputTokens.Value)
            : _tracker.MaxOutputTokens.HasValue && _tracker.CurrentOutputTokens >= _tracker.MaxOutputTokens.Value
                ? ("output", _tracker.CurrentOutputTokens, _tracker.MaxOutputTokens.Value)
            : ("total", _tracker.CurrentTokens, _tracker.MaxTokens ?? 0);

        reporter.Report(new BudgetExceededEvent(
            Timestamp: DateTimeOffset.UtcNow,
            WorkflowId: reporter.WorkflowId,
            AgentId: reporter.AgentId,
            ParentAgentId: null,
            Depth: reporter.Depth,
            SequenceNumber: reporter.NextSequence(),
            LimitType: limitType,
            CurrentValue: current,
            MaxValue: max));

        // Pass the values of the limit that was actually hit. Passing the total
        // counters unconditionally would report a max of 0 whenever only an
        // input or output limit is configured.
        ThrowBudgetExceeded(current, max);
    }

    /// <summary>
    /// Throws an <see cref="OperationCanceledException"/> that wraps a
    /// <see cref="TokenBudgetExceededException"/> and carries the tracker's
    /// <see cref="ITokenBudgetTracker.BudgetCancellationToken"/>.
    /// </summary>
    /// <param name="currentTokens">Current count for the limit that was hit.</param>
    /// <param name="maxTokens">Configured maximum for the limit that was hit.</param>
    private void ThrowBudgetExceeded(long currentTokens, long maxTokens)
    {
        var budgetException = new TokenBudgetExceededException(currentTokens, maxTokens);
        throw new OperationCanceledException(
            budgetException.Message,
            budgetException,
            _tracker.BudgetCancellationToken);
    }
}