< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineRunner
Assembly: NexusLabs.Needlr.AgentFramework.Workflows
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Sequential/SequentialPipelineRunner.cs
Line coverage
86%
Covered lines: 435
Uncovered lines: 68
Coverable lines: 503
Total lines: 876
Line coverage: 86.4%
Branch coverage
82%
Covered branches: 146
Total branches: 176
Branch coverage: 82.9%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
RunAsync(...)100%11100%
RunAsync(...)100%11100%
RunCoreAsync()87.5%646498.98%
RunPhasedAsync(...)100%11100%
RunPhasedAsync(...)100%11100%
RunPhasedCoreAsync()75.6%1958274.41%
ReportCompleted(...)100%11100%
ResolvePipelineName(...)100%44100%
StartPipelineScope(...)100%22100%
ReportPipelineCompletion(...)83.33%66100%
StartStageScope(...)100%88100%
EmitStageMetricsAndDisposeActivity(...)90%1010100%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Workflows/Sequential/SequentialPipelineRunner.cs

#LineLine coverage
 1using System.Diagnostics;
 2
 3using Microsoft.Extensions.AI;
 4
 5using NexusLabs.Needlr.AgentFramework.Budget;
 6using NexusLabs.Needlr.AgentFramework.Diagnostics;
 7using NexusLabs.Needlr.AgentFramework.Progress;
 8using NexusLabs.Needlr.AgentFramework.Workspace;
 9
 10namespace NexusLabs.Needlr.AgentFramework.Workflows.Sequential;
 11
 12/// <summary>
 13/// Executes a linear sequence of <see cref="PipelineStage"/> instances,
 14/// evaluating policies (skip, retry, budget) and producing an
 15/// <see cref="IPipelineRunResult"/> with per-stage diagnostics.
 16/// </summary>
 17/// <remarks>
 18/// <para>
 19/// This runner is a peer of <see cref="GraphWorkflowRunner"/> for linear pipelines.
 20/// It supports hybrid agent/programmatic stages via <see cref="IStageExecutor"/>,
 21/// conditional skipping, post-validation with retries, per-stage and overall token
 22/// budgets, and structured progress reporting.
 23/// </para>
 24/// </remarks>
 25/// <example>
 26/// <code>
 27/// var runner = new SequentialPipelineRunner(diagnosticsAccessor, budgetTracker, progressFactory, pipelineMetrics);
 28/// var stages = new[]
 29/// {
 30///     new PipelineStage("Writer", new AgentStageExecutor(writerAgent, ctx =&gt; "Write a draft.")),
 31///     new PipelineStage("Editor", new AgentStageExecutor(editorAgent, ctx =&gt; "Edit the draft.")),
 32/// };
 33/// var result = await runner.RunAsync(workspace, stages, options: null, cancellationToken);
 34/// </code>
 35/// </example>
 36[DoNotAutoRegister]
 37public sealed class SequentialPipelineRunner
 38{
 39    private readonly IAgentDiagnosticsAccessor _diagnosticsAccessor;
 40    private readonly ITokenBudgetTracker _budgetTracker;
 41    private readonly IProgressReporterFactory _progressReporterFactory;
 42    private readonly IPipelineMetrics _pipelineMetrics;
 43
 44    /// <summary>
 45    /// Initializes a new <see cref="SequentialPipelineRunner"/>.
 46    /// </summary>
 47    /// <param name="diagnosticsAccessor">Accessor for capturing per-stage agent diagnostics.</param>
 48    /// <param name="budgetTracker">Token budget tracker for scoping per-stage and pipeline-level budgets.</param>
 49    /// <param name="progressReporterFactory">Factory for creating progress reporters.</param>
 50    /// <param name="pipelineMetrics">
 51    /// Pipeline-shape metrics sink used to emit per-pipeline and per-stage instruments
 52    /// + spans. Resolved from DI; defaults to <see cref="NoOpPipelineMetrics"/> when no
 53    /// <see cref="PipelineMetricsOptions"/> was configured via
 54    /// <c>ConfigurePipelineMetrics</c> on the agent-framework syringe — observability
 55    /// is opt-in with zero overhead by default.
 56    /// </param>
 8157    public SequentialPipelineRunner(
 8158        IAgentDiagnosticsAccessor diagnosticsAccessor,
 8159        ITokenBudgetTracker budgetTracker,
 8160        IProgressReporterFactory progressReporterFactory,
 8161        IPipelineMetrics pipelineMetrics)
 62    {
 8163        _diagnosticsAccessor = diagnosticsAccessor;
 8164        _budgetTracker = budgetTracker;
 8165        _progressReporterFactory = progressReporterFactory;
 8166        _pipelineMetrics = pipelineMetrics;
 8167    }
 68
 69    /// <summary>
 70    /// Runs all pipeline stages sequentially, applying policies and collecting results.
 71    /// </summary>
 72    /// <param name="workspace">The shared workspace for file I/O across stages.</param>
 73    /// <param name="stages">The ordered list of stages to execute.</param>
 74    /// <param name="options">Optional pipeline-level configuration.</param>
 75    /// <param name="cancellationToken">Token to observe for cancellation.</param>
 76    /// <returns>An <see cref="IPipelineRunResult"/> describing the pipeline outcome.</returns>
 77    public Task<IPipelineRunResult> RunAsync(
 78        IWorkspace workspace,
 79        IReadOnlyList<PipelineStage> stages,
 80        SequentialPipelineOptions? options,
 81        CancellationToken cancellationToken) =>
 4582        RunCoreAsync(workspace, stages, pipelineState: null, options, cancellationToken);
 83
 84    /// <summary>
 85    /// Runs all pipeline stages sequentially with a shared typed state object,
 86    /// applying policies and collecting results.
 87    /// </summary>
 88    /// <typeparam name="TState">The type of the shared pipeline state.</typeparam>
 89    /// <param name="workspace">The shared workspace for file I/O across stages.</param>
 90    /// <param name="stages">The ordered list of stages to execute.</param>
 91    /// <param name="state">A shared state object accessible to all stages via
 92    /// <see cref="StageExecutionContext.GetRequiredState{T}"/>.</param>
 93    /// <param name="options">Optional pipeline-level configuration.</param>
 94    /// <param name="cancellationToken">Token to observe for cancellation.</param>
 95    /// <returns>An <see cref="IPipelineRunResult"/> describing the pipeline outcome.</returns>
 96    public Task<IPipelineRunResult> RunAsync<TState>(
 97        IWorkspace workspace,
 98        IReadOnlyList<PipelineStage> stages,
 99        TState state,
 100        SequentialPipelineOptions? options,
 101        CancellationToken cancellationToken) where TState : class =>
 2102        RunCoreAsync(workspace, stages, state, options, cancellationToken);
 103
 104    private async Task<IPipelineRunResult> RunCoreAsync(
 105        IWorkspace workspace,
 106        IReadOnlyList<PipelineStage> stages,
 107        object? pipelineState,
 108        SequentialPipelineOptions? options,
 109        CancellationToken cancellationToken)
 110    {
 47111        var stopwatch = Stopwatch.StartNew();
 47112        var reporter = _progressReporterFactory.Create(Guid.NewGuid().ToString("N"));
 47113        var stageResults = new List<IAgentStageResult>();
 47114        var pipelineName = ResolvePipelineName(options, reporter);
 115
 47116        reporter.Report(new WorkflowStartedEvent(
 47117            DateTimeOffset.UtcNow,
 47118            reporter.WorkflowId,
 47119            reporter.AgentId,
 47120            ParentAgentId: null,
 47121            reporter.Depth,
 47122            reporter.NextSequence()));
 123
 47124        var pipelineActivity = StartPipelineScope(pipelineName);
 47125        IDisposable? pipelineBudgetScope = null;
 126        try
 127        {
 47128            if (options?.TotalTokenBudget is { } totalBudget)
 129            {
 0130                pipelineBudgetScope = _budgetTracker.BeginScope(totalBudget);
 131            }
 132
 198133            for (var i = 0; i < stages.Count; i++)
 134            {
 65135                cancellationToken.ThrowIfCancellationRequested();
 136
 64137                var stage = stages[i];
 64138                var policy = stage.Policy;
 139
 64140                var context = new StageExecutionContext(
 64141                    workspace,
 64142                    _diagnosticsAccessor,
 64143                    reporter,
 64144                    StageIndex: i,
 64145                    TotalStages: stages.Count,
 64146                    StageName: stage.Name,
 64147                    CallerCancellationToken: cancellationToken,
 64148                    PipelineState: pipelineState);
 149
 150                // Evaluate ShouldSkip
 64151                if (policy?.ShouldSkip?.Invoke(context) == true)
 152                {
 5153                    var skipResult = new AgentStageResult(
 5154                        stage.Name,
 5155                        FinalResponse: null,
 5156                        Diagnostics: null,
 5157                        Outcome: StageOutcome.Skipped,
 5158                        Termination: new StageTermination.Skipped());
 5159                    stageResults.Add(skipResult);
 5160                    _pipelineMetrics.RecordStageCompleted(pipelineName, skipResult, TimeSpan.Zero);
 5161                    continue;
 162                }
 163
 59164                var (stageStopwatch, stageActivity) = StartStageScope(pipelineName, stage.Name, phaseName: null);
 165
 59166                reporter.Report(new AgentInvokedEvent(
 59167                    DateTimeOffset.UtcNow,
 59168                    reporter.WorkflowId,
 59169                    stage.Name,
 59170                    ParentAgentId: null,
 59171                    reporter.Depth,
 59172                    reporter.NextSequence(),
 59173                    stage.Name));
 174
 59175                var maxAttempts = policy?.MaxAttempts ?? 1;
 59176                StageExecutionResult? stageResult = null;
 59177                string? validationError = null;
 178
 59179                IDisposable? stageBudgetScope = null;
 180                try
 181                {
 59182                    if (policy?.TokenBudget is { } stageBudget)
 183                    {
 0184                        stageBudgetScope = _budgetTracker.BeginChildScope(stage.Name, stageBudget);
 185                    }
 186
 126187                    for (var attempt = 0; attempt < maxAttempts; attempt++)
 188                    {
 62189                        cancellationToken.ThrowIfCancellationRequested();
 62190                        stageResult = await stage.Executor.ExecuteAsync(context, cancellationToken);
 191
 56192                        if (policy?.PostValidation is { } validate)
 193                        {
 5194                            validationError = validate(stageResult);
 5195                            if (validationError is null)
 196                            {
 197                                break;
 198                            }
 199
 200                            // Last attempt failed — will throw after loop
 4201                            if (attempt < maxAttempts - 1)
 202                            {
 3203                                validationError = null;
 204                            }
 205                        }
 206                        else
 207                        {
 208                            break;
 209                        }
 210                    }
 53211                }
 6212                catch (Exception ex)
 213                {
 214                    // Always record the failed stage so it appears in diagnostics.
 215                    // Capture any partial diagnostics the stage may have produced.
 6216                    var partialDiag = _diagnosticsAccessor.LastRunDiagnostics;
 6217                    var failedStageResult = new AgentStageResult(
 6218                        stage.Name,
 6219                        FinalResponse: null,
 6220                        Diagnostics: partialDiag,
 6221                        Outcome: StageOutcome.Failed,
 6222                        Termination: new StageTermination.Failed(ex));
 6223                    stageResults.Add(failedStageResult);
 6224                    EmitStageMetricsAndDisposeActivity(pipelineName, failedStageResult, stageStopwatch, stageActivity);
 225
 6226                    reporter.Report(new AgentFailedEvent(
 6227                        DateTimeOffset.UtcNow,
 6228                        reporter.WorkflowId,
 6229                        stage.Name,
 6230                        ParentAgentId: null,
 6231                        reporter.Depth,
 6232                        reporter.NextSequence(),
 6233                        AgentName: stage.Name,
 6234                        ErrorMessage: ex.Message));
 235
 6236                    throw;
 237                }
 238                finally
 239                {
 59240                    stageBudgetScope?.Dispose();
 241                }
 242
 53243                if (stageResult is not null && policy?.AfterExecution is { } afterExec)
 244                {
 3245                    await afterExec(stageResult, context);
 246                }
 247
 53248                if (validationError is not null)
 249                {
 1250                    throw new StageValidationException(stage.Name, validationError);
 251                }
 252
 253                // Handle explicit failure results from the stage executor.
 52254                if (!stageResult!.Succeeded)
 255                {
 9256                    var failedExecResult = new AgentStageResult(
 9257                        stage.Name,
 9258                        FinalResponse: null,
 9259                        Diagnostics: stageResult.Diagnostics,
 9260                        Outcome: StageOutcome.Failed,
 9261                        Termination: stageResult.Termination);
 9262                    stageResults.Add(failedExecResult);
 9263                    EmitStageMetricsAndDisposeActivity(pipelineName, failedExecResult, stageStopwatch, stageActivity);
 264
 9265                    reporter.Report(new AgentFailedEvent(
 9266                        DateTimeOffset.UtcNow,
 9267                        reporter.WorkflowId,
 9268                        stage.Name,
 9269                        ParentAgentId: null,
 9270                        reporter.Depth,
 9271                        reporter.NextSequence(),
 9272                        AgentName: stage.Name,
 9273                        ErrorMessage: stageResult.Exception?.Message ?? "Stage failed"));
 274
 9275                    if (stageResult.FailureDisposition == FailureDisposition.AbortPipeline)
 276                    {
 5277                        stopwatch.Stop();
 5278                        var errorMsg = stageResult.Exception?.Message
 5279                            ?? $"Stage '{stage.Name}' failed";
 5280                        ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded:
 5281                        return new PipelineRunResult(
 5282                            stageResults,
 5283                            stopwatch.Elapsed,
 5284                            succeeded: false,
 5285                            errorMessage: errorMsg,
 5286                            exception: stageResult.Exception,
 5287                            plannedStageCount: stages.Count);
 288                    }
 289
 290                    // ContinueAdvisory — proceed to the next stage.
 291                    continue;
 292                }
 293
 43294                ChatResponse? chatResponse = stageResult!.ResponseText is not null
 43295                    ? new ChatResponse(new ChatMessage(ChatRole.Assistant, stageResult.ResponseText))
 43296                    : null;
 297
 43298                var successResult = new AgentStageResult(
 43299                    stage.Name,
 43300                    chatResponse,
 43301                    stageResult.Diagnostics,
 43302                    Termination: stageResult.Termination);
 43303                stageResults.Add(successResult);
 43304                EmitStageMetricsAndDisposeActivity(pipelineName, successResult, stageStopwatch, stageActivity);
 305
 43306                reporter.Report(new AgentCompletedEvent(
 43307                    DateTimeOffset.UtcNow,
 43308                    reporter.WorkflowId,
 43309                    stage.Name,
 43310                    ParentAgentId: null,
 43311                    reporter.Depth,
 43312                    reporter.NextSequence(),
 43313                    stage.Name,
 43314                    Duration: stopwatch.Elapsed,
 43315                    TotalTokens: stageResult.Diagnostics?.AggregateTokenUsage.TotalTokens ?? 0));
 43316            }
 317
 34318            stopwatch.Stop();
 319
 34320            var pipelineResult = new PipelineRunResult(
 34321                stageResults,
 34322                stopwatch.Elapsed,
 34323                succeeded: true,
 34324                errorMessage: null,
 34325                plannedStageCount: stages.Count);
 326
 34327            if (options?.CompletionGate is { } gate)
 328            {
 1329                var gateError = gate(pipelineResult);
 1330                if (gateError is not null)
 331                {
 1332                    var failedResult = new PipelineRunResult(
 1333                        stageResults,
 1334                        stopwatch.Elapsed,
 1335                        succeeded: false,
 1336                        errorMessage: gateError,
 1337                        plannedStageCount: stages.Count);
 338
 1339                    ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: fal
 1340                    return failedResult;
 341                }
 342            }
 343
 33344            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: true, error
 33345            return pipelineResult;
 346        }
 4347        catch (OperationCanceledException ex) when (ex.InnerException is TokenBudgetExceededException budgetEx)
 348        {
 1349            stopwatch.Stop();
 1350            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: false, budg
 1351            return new PipelineRunResult(
 1352                stageResults,
 1353                stopwatch.Elapsed,
 1354                succeeded: false,
 1355                errorMessage: budgetEx.Message,
 1356                exception: budgetEx,
 1357                plannedStageCount: stages.Count);
 358        }
 3359        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 360        {
 1361            stopwatch.Stop();
 1362            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: false, "Can
 1363            throw;
 364        }
 2365        catch (OperationCanceledException ex)
 366        {
 367            // HTTP timeouts and other non-user cancellations — treat as stage failure,
 368            // not as user cancellation. HttpClient.Timeout throws TaskCanceledException
 369            // which is OperationCanceledException, but the caller's token is NOT cancelled.
 2370            stopwatch.Stop();
 2371            var message = ex.InnerException is TimeoutException
 2372                ? $"Stage timed out: {ex.InnerException.Message}"
 2373                : $"Operation cancelled (not by caller): {ex.Message}";
 2374            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: false, mess
 2375            return new PipelineRunResult(
 2376                stageResults,
 2377                stopwatch.Elapsed,
 2378                succeeded: false,
 2379                errorMessage: message,
 2380                exception: ex,
 2381                plannedStageCount: stages.Count);
 382        }
 4383        catch (Exception ex)
 384        {
 4385            stopwatch.Stop();
 4386            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, stopwatch.Elapsed, succeeded: false, ex.M
 4387            return new PipelineRunResult(
 4388                stageResults,
 4389                stopwatch.Elapsed,
 4390                succeeded: false,
 4391                errorMessage: ex.Message,
 4392                exception: ex,
 4393                plannedStageCount: stages.Count);
 394        }
 395        finally
 396        {
 47397            pipelineBudgetScope?.Dispose();
 398        }
 46399    }
 400
 401    /// <summary>
 402    /// Runs a phased pipeline where stages are grouped into named phases with
 403    /// lifecycle hooks and optional phase-level token budgets.
 404    /// </summary>
 405    /// <param name="workspace">The shared workspace for file I/O across stages.</param>
 406    /// <param name="phases">The ordered list of phases, each containing stages.</param>
 407    /// <param name="options">Optional pipeline-level configuration.</param>
 408    /// <param name="cancellationToken">Token to observe for cancellation.</param>
 409    /// <returns>An <see cref="IPipelineRunResult"/> describing the pipeline outcome.</returns>
 410    public Task<IPipelineRunResult> RunPhasedAsync(
 411        IWorkspace workspace,
 412        IReadOnlyList<PipelinePhase> phases,
 413        SequentialPipelineOptions? options,
 414        CancellationToken cancellationToken) =>
 31415        RunPhasedCoreAsync(workspace, phases, pipelineState: null, options, cancellationToken);
 416
 417    /// <summary>
 418    /// Runs a phased pipeline with a shared typed state object accessible to both
 419    /// phase lifecycle hooks and stage executors.
 420    /// </summary>
 421    /// <typeparam name="TState">The type of the shared pipeline state.</typeparam>
 422    /// <param name="workspace">The shared workspace for file I/O across stages.</param>
 423    /// <param name="phases">The ordered list of phases, each containing stages.</param>
 424    /// <param name="state">A shared state object accessible via
 425    /// <see cref="PhaseContext.GetRequiredState{T}"/> and
 426    /// <see cref="StageExecutionContext.GetRequiredState{T}"/>.</param>
 427    /// <param name="options">Optional pipeline-level configuration.</param>
 428    /// <param name="cancellationToken">Token to observe for cancellation.</param>
 429    /// <returns>An <see cref="IPipelineRunResult"/> describing the pipeline outcome.</returns>
 430    public Task<IPipelineRunResult> RunPhasedAsync<TState>(
 431        IWorkspace workspace,
 432        IReadOnlyList<PipelinePhase> phases,
 433        TState state,
 434        SequentialPipelineOptions? options,
 435        CancellationToken cancellationToken) where TState : class =>
 1436        RunPhasedCoreAsync(workspace, phases, state, options, cancellationToken);
 437
 438    private async Task<IPipelineRunResult> RunPhasedCoreAsync(
 439        IWorkspace workspace,
 440        IReadOnlyList<PipelinePhase> phases,
 441        object? pipelineState,
 442        SequentialPipelineOptions? options,
 443        CancellationToken cancellationToken)
 444    {
 32445        var pipelineStopwatch = Stopwatch.StartNew();
 32446        var reporter = _progressReporterFactory.Create(Guid.NewGuid().ToString("N"));
 32447        var allStageResults = new List<IAgentStageResult>();
 80448        var totalStages = phases.Sum(p => p.Stages.Count);
 32449        var globalStageIndex = 0;
 32450        var pipelineName = ResolvePipelineName(options, reporter);
 451
 32452        reporter.Report(new WorkflowStartedEvent(
 32453            DateTimeOffset.UtcNow,
 32454            reporter.WorkflowId,
 32455            reporter.AgentId,
 32456            ParentAgentId: null,
 32457            reporter.Depth,
 32458            reporter.NextSequence()));
 459
 32460        var pipelineActivity = StartPipelineScope(pipelineName);
 32461        IDisposable? pipelineBudgetScope = null;
 462        try
 463        {
 32464            if (options?.TotalTokenBudget is { } totalBudget)
 465            {
 1466                pipelineBudgetScope = _budgetTracker.BeginScope(totalBudget);
 467            }
 468
 154469            for (var phaseIndex = 0; phaseIndex < phases.Count; phaseIndex++)
 470            {
 48471                cancellationToken.ThrowIfCancellationRequested();
 472
 48473                var phase = phases[phaseIndex];
 48474                var phasePolicy = phase.Policy;
 48475                var phaseStopwatch = Stopwatch.StartNew();
 48476                var phaseSucceeded = true;
 477
 48478                reporter.Report(new PhaseStartedEvent(
 48479                    DateTimeOffset.UtcNow,
 48480                    reporter.WorkflowId,
 48481                    reporter.AgentId,
 48482                    ParentAgentId: null,
 48483                    reporter.Depth,
 48484                    reporter.NextSequence(),
 48485                    phase.Name,
 48486                    phaseIndex,
 48487                    phases.Count,
 48488                    phase.Stages.Count));
 489
 48490                var phaseContext = new PhaseContext(
 48491                    phase.Name,
 48492                    phaseIndex,
 48493                    phases.Count,
 48494                    workspace,
 48495                    pipelineState);
 496
 48497                IDisposable? phaseBudgetScope = null;
 498                try
 499                {
 48500                    if (phasePolicy?.TokenBudget is { } phaseBudget)
 501                    {
 3502                        phaseBudgetScope = _budgetTracker.BeginScope(phaseBudget);
 503                    }
 504
 48505                    if (phasePolicy?.OnEnterAsync is { } onEnter)
 506                    {
 10507                        await onEnter(phaseContext, cancellationToken);
 508                    }
 509
 202510                    for (var stageInPhase = 0; stageInPhase < phase.Stages.Count; stageInPhase++)
 511                    {
 56512                        cancellationToken.ThrowIfCancellationRequested();
 513
 56514                        var stage = phase.Stages[stageInPhase];
 56515                        var policy = stage.Policy;
 516
 56517                        var context = new StageExecutionContext(
 56518                            workspace,
 56519                            _diagnosticsAccessor,
 56520                            reporter,
 56521                            StageIndex: globalStageIndex,
 56522                            TotalStages: totalStages,
 56523                            StageName: stage.Name,
 56524                            CallerCancellationToken: cancellationToken,
 56525                            PipelineState: pipelineState,
 56526                            PhaseName: phase.Name,
 56527                            PhaseIndex: phaseIndex,
 56528                            StageIndexInPhase: stageInPhase,
 56529                            TotalStagesInPhase: phase.Stages.Count);
 530
 56531                        if (policy?.ShouldSkip?.Invoke(context) == true)
 532                        {
 3533                            var skipResult = new AgentStageResult(
 3534                                stage.Name,
 3535                                FinalResponse: null,
 3536                                Diagnostics: null,
 3537                                Outcome: StageOutcome.Skipped,
 3538                                PhaseName: phase.Name,
 3539                                Termination: new StageTermination.Skipped());
 3540                            allStageResults.Add(skipResult);
 3541                            _pipelineMetrics.RecordStageCompleted(pipelineName, skipResult, TimeSpan.Zero);
 3542                            globalStageIndex++;
 3543                            continue;
 544                        }
 545
 53546                        var (stageStopwatch, stageActivity) = StartStageScope(pipelineName, stage.Name, phase.Name);
 547
 53548                        reporter.Report(new AgentInvokedEvent(
 53549                            DateTimeOffset.UtcNow,
 53550                            reporter.WorkflowId,
 53551                            stage.Name,
 53552                            ParentAgentId: null,
 53553                            reporter.Depth,
 53554                            reporter.NextSequence(),
 53555                            stage.Name));
 556
 53557                        var maxAttempts = policy?.MaxAttempts ?? 1;
 53558                        StageExecutionResult? stageResult = null;
 53559                        string? validationError = null;
 560
 53561                        IDisposable? stageBudgetScope = null;
 562                        try
 563                        {
 53564                            if (policy?.TokenBudget is { } stageBudget)
 565                            {
 1566                                stageBudgetScope = _budgetTracker.BeginChildScope(stage.Name, stageBudget);
 567                            }
 568
 106569                            for (var attempt = 0; attempt < maxAttempts; attempt++)
 570                            {
 53571                                cancellationToken.ThrowIfCancellationRequested();
 53572                                stageResult = await stage.Executor.ExecuteAsync(context, cancellationToken);
 573
 51574                                if (policy?.PostValidation is { } validate)
 575                                {
 0576                                    validationError = validate(stageResult);
 0577                                    if (validationError is null)
 578                                    {
 579                                        break;
 580                                    }
 581
 0582                                    if (attempt < maxAttempts - 1)
 583                                    {
 0584                                        validationError = null;
 585                                    }
 586                                }
 587                                else
 588                                {
 589                                    break;
 590                                }
 591                            }
 51592                        }
 2593                        catch (Exception ex)
 594                        {
 2595                            var partialDiag = _diagnosticsAccessor.LastRunDiagnostics;
 2596                            var failedPhasedResult = new AgentStageResult(
 2597                                stage.Name,
 2598                                FinalResponse: null,
 2599                                Diagnostics: partialDiag,
 2600                                Outcome: StageOutcome.Failed,
 2601                                PhaseName: phase.Name,
 2602                                Termination: new StageTermination.Failed(ex));
 2603                            allStageResults.Add(failedPhasedResult);
 2604                            EmitStageMetricsAndDisposeActivity(pipelineName, failedPhasedResult, stageStopwatch, stageAc
 605
 2606                            reporter.Report(new AgentFailedEvent(
 2607                                DateTimeOffset.UtcNow,
 2608                                reporter.WorkflowId,
 2609                                stage.Name,
 2610                                ParentAgentId: null,
 2611                                reporter.Depth,
 2612                                reporter.NextSequence(),
 2613                                AgentName: stage.Name,
 2614                                ErrorMessage: ex.Message));
 615
 2616                            throw;
 617                        }
 618                        finally
 619                        {
 53620                            stageBudgetScope?.Dispose();
 621                        }
 622
 51623                        if (stageResult is not null && policy?.AfterExecution is { } afterExec)
 624                        {
 0625                            await afterExec(stageResult, context);
 626                        }
 627
 51628                        if (validationError is not null)
 629                        {
 0630                            throw new StageValidationException(stage.Name, validationError);
 631                        }
 632
 51633                        if (!stageResult!.Succeeded)
 634                        {
 0635                            var failedPhasedExecResult = new AgentStageResult(
 0636                                stage.Name,
 0637                                FinalResponse: null,
 0638                                Diagnostics: stageResult.Diagnostics,
 0639                                Outcome: StageOutcome.Failed,
 0640                                PhaseName: phase.Name,
 0641                                Termination: stageResult.Termination);
 0642                            allStageResults.Add(failedPhasedExecResult);
 0643                            EmitStageMetricsAndDisposeActivity(pipelineName, failedPhasedExecResult, stageStopwatch, sta
 644
 0645                            reporter.Report(new AgentFailedEvent(
 0646                                DateTimeOffset.UtcNow,
 0647                                reporter.WorkflowId,
 0648                                stage.Name,
 0649                                ParentAgentId: null,
 0650                                reporter.Depth,
 0651                                reporter.NextSequence(),
 0652                                AgentName: stage.Name,
 0653                                ErrorMessage: stageResult.Exception?.Message ?? "Stage failed"));
 654
 0655                            if (stageResult.FailureDisposition == FailureDisposition.AbortPipeline)
 656                            {
 0657                                phaseSucceeded = false;
 0658                                pipelineStopwatch.Stop();
 0659                                var errorMsg = stageResult.Exception?.Message
 0660                                    ?? $"Stage '{stage.Name}' failed";
 0661                                ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Ela
 0662                                return new PipelineRunResult(
 0663                                    allStageResults,
 0664                                    pipelineStopwatch.Elapsed,
 0665                                    succeeded: false,
 0666                                    errorMessage: errorMsg,
 0667                                    exception: stageResult.Exception,
 0668                                    plannedStageCount: totalStages);
 669                            }
 670
 0671                            globalStageIndex++;
 0672                            continue;
 673                        }
 674
 51675                        ChatResponse? chatResponse = stageResult.ResponseText is not null
 51676                            ? new ChatResponse(new ChatMessage(ChatRole.Assistant, stageResult.ResponseText))
 51677                            : null;
 678
 51679                        allStageResults.Add(new AgentStageResult(
 51680                            stage.Name,
 51681                            chatResponse,
 51682                            stageResult.Diagnostics,
 51683                            PhaseName: phase.Name,
 51684                            Termination: stageResult.Termination));
 51685                        EmitStageMetricsAndDisposeActivity(
 51686                            pipelineName,
 51687                            allStageResults[^1],
 51688                            stageStopwatch,
 51689                            stageActivity);
 690
 51691                        reporter.Report(new AgentCompletedEvent(
 51692                            DateTimeOffset.UtcNow,
 51693                            reporter.WorkflowId,
 51694                            stage.Name,
 51695                            ParentAgentId: null,
 51696                            reporter.Depth,
 51697                            reporter.NextSequence(),
 51698                            stage.Name,
 51699                            Duration: pipelineStopwatch.Elapsed,
 51700                            TotalTokens: stageResult.Diagnostics?.AggregateTokenUsage.TotalTokens ?? 0));
 701
 51702                        globalStageIndex++;
 51703                    }
 704                }
 705                finally
 706                {
 48707                    if (phasePolicy?.OnExitAsync is { } onExit)
 708                    {
 5709                        await onExit(phaseContext, cancellationToken);
 710                    }
 711
 48712                    phaseBudgetScope?.Dispose();
 713
 48714                    phaseStopwatch.Stop();
 48715                    reporter.Report(new PhaseCompletedEvent(
 48716                        DateTimeOffset.UtcNow,
 48717                        reporter.WorkflowId,
 48718                        reporter.AgentId,
 48719                        ParentAgentId: null,
 48720                        reporter.Depth,
 48721                        reporter.NextSequence(),
 48722                        phase.Name,
 48723                        phaseIndex,
 48724                        phases.Count,
 48725                        phaseSucceeded,
 48726                        phaseStopwatch.Elapsed));
 727                }
 45728            }
 729
 29730            pipelineStopwatch.Stop();
 731
 29732            var pipelineResult = new PipelineRunResult(
 29733                allStageResults,
 29734                pipelineStopwatch.Elapsed,
 29735                succeeded: true,
 29736                errorMessage: null,
 29737                plannedStageCount: totalStages);
 738
 29739            if (options?.CompletionGate is { } gate)
 740            {
 1741                var gateError = gate(pipelineResult);
 1742                if (gateError is not null)
 743                {
 1744                    ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succee
 1745                    return new PipelineRunResult(
 1746                        allStageResults,
 1747                        pipelineStopwatch.Elapsed,
 1748                        succeeded: false,
 1749                        errorMessage: gateError,
 1750                        plannedStageCount: totalStages);
 751                }
 752            }
 753
 28754            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succeeded: tru
 28755            return pipelineResult;
 756        }
 0757        catch (OperationCanceledException ex) when (ex.InnerException is TokenBudgetExceededException budgetEx)
 758        {
 0759            pipelineStopwatch.Stop();
 0760            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succeeded: fal
 0761            return new PipelineRunResult(
 0762                allStageResults,
 0763                pipelineStopwatch.Elapsed,
 0764                succeeded: false,
 0765                errorMessage: budgetEx.Message,
 0766                exception: budgetEx,
 0767                plannedStageCount: totalStages);
 768        }
 0769        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 770        {
 0771            pipelineStopwatch.Stop();
 0772            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succeeded: fal
 0773            throw;
 774        }
 0775        catch (OperationCanceledException ex)
 776        {
 0777            pipelineStopwatch.Stop();
 0778            var message = ex.InnerException is TimeoutException
 0779                ? $"Stage timed out: {ex.InnerException.Message}"
 0780                : $"Operation cancelled (not by caller): {ex.Message}";
 0781            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succeeded: fal
 0782            return new PipelineRunResult(
 0783                allStageResults,
 0784                pipelineStopwatch.Elapsed,
 0785                succeeded: false,
 0786                errorMessage: message,
 0787                exception: ex,
 0788                plannedStageCount: totalStages);
 789        }
 3790        catch (Exception ex)
 791        {
 3792            pipelineStopwatch.Stop();
 3793            ReportPipelineCompletion(reporter, pipelineActivity, pipelineName, pipelineStopwatch.Elapsed, succeeded: fal
 3794            return new PipelineRunResult(
 3795                allStageResults,
 3796                pipelineStopwatch.Elapsed,
 3797                succeeded: false,
 3798                errorMessage: ex.Message,
 3799                exception: ex,
 3800                plannedStageCount: totalStages);
 801        }
 802        finally
 803        {
 32804            pipelineBudgetScope?.Dispose();
 805        }
 32806    }
 807
 808    private static void ReportCompleted(
 809        IProgressReporter reporter,
 810        TimeSpan duration,
 811        bool succeeded,
 812        string? errorMessage)
 813    {
 79814        reporter.Report(new WorkflowCompletedEvent(
 79815            DateTimeOffset.UtcNow,
 79816            reporter.WorkflowId,
 79817            reporter.AgentId,
 79818            ParentAgentId: null,
 79819            reporter.Depth,
 79820            reporter.NextSequence(),
 79821            succeeded,
 79822            errorMessage,
 79823            duration));
 79824    }
 825
 826    private static string ResolvePipelineName(SequentialPipelineOptions? options, IProgressReporter reporter) =>
 79827        options?.PipelineName ?? reporter.WorkflowId;
 828
 829    private Activity? StartPipelineScope(string pipelineName)
 830    {
 79831        var activity = _pipelineMetrics.ActivitySource.StartActivity("pipeline.run");
 79832        activity?.SetTag("pipeline_name", pipelineName);
 79833        _pipelineMetrics.RecordPipelineStarted(pipelineName);
 79834        return activity;
 835    }
 836
 837    private void ReportPipelineCompletion(
 838        IProgressReporter reporter,
 839        Activity? pipelineActivity,
 840        string pipelineName,
 841        TimeSpan duration,
 842        bool succeeded,
 843        string? errorMessage)
 844    {
 79845        ReportCompleted(reporter, duration, succeeded, errorMessage);
 79846        pipelineActivity?.SetTag("outcome", succeeded ? "Succeeded" : "Failed");
 79847        pipelineActivity?.Dispose();
 79848        _pipelineMetrics.RecordPipelineCompleted(pipelineName, succeeded, duration);
 79849    }
 850
 851    private (Stopwatch stopwatch, Activity? activity) StartStageScope(
 852        string pipelineName,
 853        string stageName,
 854        string? phaseName)
 855    {
 112856        var stopwatch = Stopwatch.StartNew();
 112857        var activity = _pipelineMetrics.ActivitySource.StartActivity("pipeline.stage");
 112858        activity?.SetTag("pipeline_name", pipelineName);
 112859        activity?.SetTag("stage_name", stageName);
 112860        activity?.SetTag("phase_name", phaseName ?? "(none)");
 112861        return (stopwatch, activity);
 862    }
 863
 864    private void EmitStageMetricsAndDisposeActivity(
 865        string pipelineName,
 866        IAgentStageResult stage,
 867        Stopwatch stageStopwatch,
 868        Activity? stageActivity)
 869    {
 111870        stageStopwatch.Stop();
 111871        stageActivity?.SetTag("outcome", stage.Outcome.ToString());
 111872        stageActivity?.SetTag("termination_cause", stage.Termination?.ToTagValue() ?? "Unspecified");
 111873        stageActivity?.Dispose();
 111874        _pipelineMetrics.RecordStageCompleted(pipelineName, stage, stageStopwatch.Elapsed);
 111875    }
 876}

Methods/Properties

.ctor(NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentDiagnosticsAccessor,NexusLabs.Needlr.AgentFramework.Budget.ITokenBudgetTracker,NexusLabs.Needlr.AgentFramework.Progress.IProgressReporterFactory,NexusLabs.Needlr.AgentFramework.Diagnostics.IPipelineMetrics)
RunAsync(NexusLabs.Needlr.AgentFramework.Workspace.IWorkspace,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Workflows.Sequential.PipelineStage>,NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineOptions,System.Threading.CancellationToken)
RunAsync(NexusLabs.Needlr.AgentFramework.Workspace.IWorkspace,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Workflows.Sequential.PipelineStage>,TState,NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineOptions,System.Threading.CancellationToken)
RunCoreAsync()
RunPhasedAsync(NexusLabs.Needlr.AgentFramework.Workspace.IWorkspace,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Workflows.Sequential.PipelinePhase>,NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineOptions,System.Threading.CancellationToken)
RunPhasedAsync(NexusLabs.Needlr.AgentFramework.Workspace.IWorkspace,System.Collections.Generic.IReadOnlyList`1<NexusLabs.Needlr.AgentFramework.Workflows.Sequential.PipelinePhase>,TState,NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineOptions,System.Threading.CancellationToken)
RunPhasedCoreAsync()
ReportCompleted(NexusLabs.Needlr.AgentFramework.Progress.IProgressReporter,System.TimeSpan,System.Boolean,System.String)
ResolvePipelineName(NexusLabs.Needlr.AgentFramework.Workflows.Sequential.SequentialPipelineOptions,NexusLabs.Needlr.AgentFramework.Progress.IProgressReporter)
StartPipelineScope(System.String)
ReportPipelineCompletion(NexusLabs.Needlr.AgentFramework.Progress.IProgressReporter,System.Diagnostics.Activity,System.String,System.TimeSpan,System.Boolean,System.String)
StartStageScope(System.String,System.String,System.String)
EmitStageMetricsAndDisposeActivity(System.String,NexusLabs.Needlr.AgentFramework.Diagnostics.IAgentStageResult,System.Diagnostics.Stopwatch,System.Diagnostics.Activity)