< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Progress.ChannelProgressReporter
Assembly: NexusLabs.Needlr.AgentFramework
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Progress/ChannelProgressReporter.cs
Line coverage
91%
Covered lines: 51
Uncovered lines: 5
Coverable lines: 56
Total lines: 149
Line coverage: 91%
Branch coverage
100%
Covered branches: 6
Total branches: 6
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%22100%
get_WorkflowId()100%11100%
get_AgentId()100%210%
get_Depth()100%11100%
NextSequence()100%11100%
Report(...)100%11100%
CreateChild(...)100%11100%
DisposeAsync()100%11100%
ConsumeAsync()100%4483.33%
.ctor(...)100%11100%
get_WorkflowId()100%11100%
get_AgentId()100%11100%
get_Depth()100%11100%
NextSequence()100%210%
Report(...)100%11100%
CreateChild(...)100%210%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework/Progress/ChannelProgressReporter.cs

#LineLine coverage
 1using System.Threading.Channels;
 2
 3namespace NexusLabs.Needlr.AgentFramework.Progress;
 4
 5/// <summary>
 6/// Non-blocking <see cref="IProgressReporter"/> that pushes events to a
 7/// <see cref="Channel{T}"/> and drains them to sinks on a background task.
 8/// Use this when sinks do I/O (database, network) and you don't want to
 9/// block the agent pipeline.
 10/// </summary>
 11/// <remarks>
 12/// <para>
 13/// <see cref="Report"/> writes to the channel and returns immediately.
 14/// A background consumer drains events to all sinks. Events are delivered
 15/// in order but asynchronously.
 16/// </para>
 17/// <para>
 18/// <see cref="CreateChild"/> returns a lightweight wrapper that shares
 19/// the parent's channel — no additional background tasks are created.
 20/// </para>
 21/// <para>
 22/// Call <see cref="DisposeAsync"/> to drain remaining events and stop
 23/// the background consumer.
 24/// </para>
 25/// </remarks>
 26[DoNotAutoRegister]
 27public sealed class ChannelProgressReporter : IProgressReporter, IAsyncDisposable
 28{
 29    private readonly Channel<IProgressEvent> _channel;
 30    private readonly IProgressSequence _sequence;
 31    private readonly IProgressReporterErrorHandler _errorHandler;
 32    private readonly Task _consumer;
 33
 34    /// <summary>
 35    /// Creates a channel-based reporter with the given sinks.
 36    /// Starts a background consumer immediately.
 37    /// </summary>
 638    public ChannelProgressReporter(
 639        string workflowId,
 640        IReadOnlyList<IProgressSink> sinks,
 641        IProgressSequence sequence,
 642        IProgressReporterErrorHandler? errorHandler = null,
 643        string? agentId = null,
 644        string? parentAgentId = null,
 645        int depth = 0,
 646        int capacity = 1000)
 47    {
 648        WorkflowId = workflowId;
 649        _sequence = sequence;
 650        _errorHandler = errorHandler ?? new NullProgressReporterErrorHandler();
 651        AgentId = agentId;
 652        Depth = depth;
 53
 654        _channel = Channel.CreateBounded<IProgressEvent>(new BoundedChannelOptions(capacity)
 655        {
 656            FullMode = BoundedChannelFullMode.Wait,
 657            SingleReader = true,
 658            SingleWriter = false,
 659        });
 60
 1261        _consumer = Task.Run(() => ConsumeAsync(sinks));
 662    }
 63
 64    /// <inheritdoc />
 165    public string WorkflowId { get; }
 66
 67    /// <inheritdoc />
 068    public string? AgentId { get; }
 69
 70    /// <inheritdoc />
 171    public int Depth { get; }
 72
 73    /// <inheritdoc />
 274    public long NextSequence() => _sequence.Next();
 75
 76    /// <inheritdoc />
 77    public void Report(IProgressEvent progressEvent)
 78    {
 1779        _channel.Writer.TryWrite(progressEvent);
 1780    }
 81
 82    /// <inheritdoc />
 83    public IProgressReporter CreateChild(string agentId) =>
 184        new ChannelChildReporter(this, agentId);
 85
 86    /// <summary>
 87    /// Completes the channel and waits for the background consumer to drain
 88    /// all remaining events to sinks.
 89    /// </summary>
 90    public async ValueTask DisposeAsync()
 91    {
 692        _channel.Writer.TryComplete();
 693        await _consumer;
 694    }
 95
 96    private async Task ConsumeAsync(IReadOnlyList<IProgressSink> sinks)
 97    {
 98        try
 99        {
 46100            await foreach (var evt in _channel.Reader.ReadAllAsync())
 101            {
 74102                for (int i = 0; i < sinks.Count; i++)
 103                {
 104                    try
 105                    {
 20106                        await sinks[i].OnEventAsync(evt, CancellationToken.None);
 19107                    }
 1108                    catch (Exception ex)
 109                    {
 1110                        _errorHandler.OnSinkException(sinks[i], evt, ex);
 1111                    }
 112                }
 17113            }
 6114        }
 0115        catch (OperationCanceledException)
 116        {
 117            // Shutdown
 0118        }
 6119    }
 120
 121    /// <summary>
 122    /// Lightweight child reporter that shares the parent's channel.
 123    /// No background task — writes go directly to the parent's channel.
 124    /// </summary>
 125    private sealed class ChannelChildReporter : IProgressReporter
 126    {
 127        private readonly ChannelProgressReporter _root;
 128
 1129        internal ChannelChildReporter(ChannelProgressReporter root, string agentId)
 130        {
 1131            _root = root;
 1132            WorkflowId = root.WorkflowId;
 1133            AgentId = agentId;
 1134            Depth = root.Depth + 1;
 1135        }
 136
 1137        public string WorkflowId { get; }
 1138        public string? AgentId { get; }
 1139        public int Depth { get; }
 140
 0141        public long NextSequence() => _root.NextSequence();
 142
 143        public void Report(IProgressEvent progressEvent) =>
 1144            _root.Report(progressEvent);
 145
 146        public IProgressReporter CreateChild(string agentId) =>
 0147            new ChannelChildReporter(_root, agentId);
 148    }
 149}