< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Providers.TieredProviderSelector<T1, T2>
Assembly: NexusLabs.Needlr.AgentFramework
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Providers/TieredProviderSelector.cs
Line coverage
100%
Covered lines: 71
Uncovered lines: 0
Coverable lines: 71
Total lines: 208
Line coverage: 100%
Branch coverage
100%
Covered branches: 38
Total branches: 38
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

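| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 12 | 12 | 100% |
| .cctor() | 100% | 2 | 2 | 100% |
| ExecuteAsync() | 100% | 22 | 22 | 100% |
| ComputeSkipUntil(...) | 100% | 2 | 2 | 100% |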

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Providers/TieredProviderSelector.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2
 3using NexusLabs.Needlr.AgentFramework.Context;
 4
 5namespace NexusLabs.Needlr.AgentFramework.Providers;
 6
 7/// <summary>
 8/// Default <see cref="ITieredProviderSelector{TQuery, TResult}"/> that iterates providers
 9/// in ascending <see cref="ITieredProvider{TQuery, TResult}.Priority"/> order, gated by
 10/// an <see cref="IQuotaGate"/>. Exception handling is configurable via
 11/// <see cref="TieredProviderSelectorOptions.FailurePolicies"/>; the default options
 12/// (<see cref="TieredProviderSelectorOptions.Default"/>) preserve the framework's
 13/// historical behaviour of falling through to the next provider on
 14/// <see cref="ProviderUnavailableException"/>.
 15/// </summary>
 16/// <remarks>
 17/// <para>
 18/// The quota partition key is resolved from the ambient
 19/// <see cref="IAgentExecutionContextAccessor"/> using a <see cref="QuotaPartitionSelector"/>.
 20/// By default, <see cref="IAgentExecutionContext.UserId"/> is used as the partition.
 21/// Consumers that need a different partitioning strategy (e.g., tenant ID, API key)
 22/// can provide a custom <see cref="QuotaPartitionSelector"/> via the constructor.
 23/// </para>
 24/// <para>
 25/// When no execution context is active (e.g., during integration tests that don't
 26/// establish a scope), the partition is <see langword="null"/> and quota is global.
 27/// </para>
 28/// <para>
 29/// <b>Failure policies and skip cache.</b> When a provider throws,
 30/// <see cref="TieredProviderSelectorOptions.FailurePolicies"/> are evaluated in order
 31/// against the thrown exception (first match wins). A matching policy causes the
 32/// selector to fall through to the next provider; if the policy specifies a
 33/// <see cref="ProviderFailurePolicy.SkipDuration"/>, an entry is added to a per-instance
 34/// in-memory skip cache so subsequent calls bypass the failing provider until the
 35/// skip-until timestamp elapses. The cache is a thread-safe
 36/// <see cref="ConcurrentDictionary{TKey, TValue}"/> keyed by provider name. Skip state
 37/// is per-selector-instance and lives only in the host process; it is not persisted.
 38/// </para>
 39/// <para>
 40/// <b>Quota release.</b> Quota release happens in a single <see langword="finally"/>
 41/// block so it runs on the success path, on a matched-policy fall-through, on an
 42/// unmatched-exception re-throw, and even when a
 43/// <see cref="ProviderFailurePolicy.OnHit"/> callback throws. The release records
 44/// <c>succeeded: true</c> only when the provider returned a value.
 45/// </para>
 46/// <para>
 47/// <b>Cancellation.</b> When the active <see cref="CancellationToken"/> is cancelled,
 48/// the selector skips policy evaluation entirely and propagates the
 49/// <see cref="OperationCanceledException"/> directly. Cancelled calls do not apply
 50/// skip mode and do not fall through to the next provider.
 51/// </para>
 52/// </remarks>
 53[DoNotAutoRegister]
 54public sealed class TieredProviderSelector<TQuery, TResult> : ITieredProviderSelector<TQuery, TResult>
 55{
 56    private readonly IReadOnlyList<ITieredProvider<TQuery, TResult>> _providers;
 57    private readonly IQuotaGate _quotaGate;
 58    private readonly IAgentExecutionContextAccessor _contextAccessor;
 59    private readonly QuotaPartitionSelector _partitionSelector;
 60    private readonly TieredProviderSelectorOptions _options;
 61    private readonly TimeProvider _timeProvider;
 5462    private readonly ConcurrentDictionary<string, DateTimeOffset> _skipUntil =
 5463        new(StringComparer.OrdinalIgnoreCase);
 64
 65    /// <summary>
 66    /// The default partition selector: uses <see cref="IAgentExecutionContext.UserId"/>
 67    /// from the ambient context.
 68    /// </summary>
 269    public static readonly QuotaPartitionSelector DefaultPartitionSelector =
 5470        context => context?.UserId;
 71
 72    /// <param name="providers">All registered providers (filtering and ordering is handled internally).</param>
 73    /// <param name="quotaGate">Quota gate for reservation/release. Use <see cref="AlwaysGrantQuotaGate"/> for no-op.</p
 74    /// <param name="contextAccessor">Accessor for ambient execution context (provides partition identity).</param>
 75    /// <param name="partitionSelector">
 76    /// Custom partition selector. Defaults to <see cref="DefaultPartitionSelector"/>
 77    /// (<see cref="IAgentExecutionContext.UserId"/>).
 78    /// </param>
 79    /// <param name="options">
 80    /// Failure-handling policy configuration. Defaults to
 81    /// <see cref="TieredProviderSelectorOptions.Default"/>, which falls through on
 82    /// <see cref="ProviderUnavailableException"/> with no skip and no callback.
 83    /// </param>
 84    /// <param name="timeProvider">
 85    /// Time source used for skip-cache "now" comparisons. Defaults to
 86    /// <see cref="TimeProvider.System"/>. Inject a fake time provider in tests to
 87    /// drive deterministic skip-mode behaviour.
 88    /// </param>
 5489    public TieredProviderSelector(
 5490        IEnumerable<ITieredProvider<TQuery, TResult>> providers,
 5491        IQuotaGate quotaGate,
 5492        IAgentExecutionContextAccessor contextAccessor,
 5493        QuotaPartitionSelector? partitionSelector = null,
 5494        TieredProviderSelectorOptions? options = null,
 5495        TimeProvider? timeProvider = null)
 96    {
 5497        ArgumentNullException.ThrowIfNull(providers);
 5398        ArgumentNullException.ThrowIfNull(quotaGate);
 5299        ArgumentNullException.ThrowIfNull(contextAccessor);
 100
 51101        _providers = providers
 80102            .Where(p => p.IsEnabled)
 61103            .OrderBy(p => p.Priority)
 61104            .ThenBy(p => p.Name, StringComparer.OrdinalIgnoreCase)
 51105            .ToList();
 106
 51107        _quotaGate = quotaGate;
 51108        _contextAccessor = contextAccessor;
 51109        _partitionSelector = partitionSelector ?? DefaultPartitionSelector;
 51110        _options = options ?? TieredProviderSelectorOptions.Default;
 51111        _timeProvider = timeProvider ?? TimeProvider.System;
 51112    }
 113
 114    /// <inheritdoc />
 115    public async Task<TResult> ExecuteAsync(TQuery query, CancellationToken cancellationToken)
 116    {
 58117        if (_providers.Count == 0)
 5118            throw new NoProvidersRegisteredException();
 119
 53120        var partition = _partitionSelector(_contextAccessor.Current);
 53121        var attempts = new List<string>();
 122
 229123        foreach (var provider in _providers)
 124        {
 84125            var now = _timeProvider.GetUtcNow();
 84126            if (_skipUntil.TryGetValue(provider.Name, out var skipUntilCached) &&
 84127                skipUntilCached > now)
 128            {
 4129                attempts.Add(skipUntilCached == DateTimeOffset.MaxValue
 4130                    ? $"{provider.Name}: skipped indefinitely"
 4131                    : $"{provider.Name}: skipped until {skipUntilCached:o}");
 4132                continue;
 133            }
 134
 80135            if (!await _quotaGate.TryReserveAsync(provider.Name, partition, cancellationToken)
 80136                .ConfigureAwait(false))
 137            {
 4138                attempts.Add($"{provider.Name}: quota denied");
 4139                continue;
 140            }
 141
 76142            var succeeded = false;
 143            try
 144            {
 76145                var result = await provider.ExecuteAsync(query, cancellationToken)
 76146                    .ConfigureAwait(false);
 40147                succeeded = true;
 40148                return result;
 149            }
 36150            catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
 151            {
 152                // Filter intent: cancellation MUST NOT participate in policy matching.
 153                // If the active token is cancelled, fall through this catch entirely so
 154                // OperationCanceledException propagates raw, no skip cache entry is
 155                // added, and subsequent providers are not attempted. A future maintainer
 156                // who "improves" this filter to e.g. `when (ex is not OperationCanceledException)`
 157                // would silently let cancellation participate in policy matching, which
 158                // breaks linked-CTS patterns where a sibling cancels the in-flight call.
 35159                ProviderFailurePolicy? matched = null;
 118160                foreach (var policy in _options.FailurePolicies)
 161                {
 40162                    if (policy.Match(ex))
 163                    {
 32164                        matched = policy;
 32165                        break;
 166                    }
 167                }
 168
 35169                if (matched is null)
 170                {
 3171                    throw;
 172                }
 173
 32174                attempts.Add($"{provider.Name}: {ex.Message}");
 175
 32176                DateTimeOffset? skipUntil = null;
 32177                if (matched.SkipDuration is { } duration)
 178                {
 9179                    skipUntil = ComputeSkipUntil(now, duration);
 9180                    _skipUntil[provider.Name] = skipUntil.Value;
 181                }
 182
 32183                if (matched.OnHit is { } onHit)
 184                {
 6185                    await onHit(new ProviderFailureContext(provider.Name, ex, skipUntil))
 6186                        .ConfigureAwait(false);
 187                }
 188            }
 189            finally
 190            {
 76191                await _quotaGate
 76192                    .ReleaseAsync(provider.Name, partition, succeeded, cancellationToken)
 76193                    .ConfigureAwait(false);
 194            }
 31195        }
 196
 8197        throw new AllProvidersFailedException(attempts);
 40198    }
 199
 200    private static DateTimeOffset ComputeSkipUntil(DateTimeOffset now, TimeSpan duration)
 201    {
 9202        if (DateTimeOffset.MaxValue - now <= duration)
 203        {
 2204            return DateTimeOffset.MaxValue;
 205        }
 7206        return now + duration;
 207    }
 208}