< Summary

Information
Class: NexusLabs.Needlr.AgentFramework.Langfuse.LangfuseTraceAttributeProcessor
Assembly: NexusLabs.Needlr.AgentFramework.Langfuse
File(s): /home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Langfuse/LangfuseTraceAttributeProcessor.cs
Line coverage
87%
Covered lines: 51
Uncovered lines: 7
Coverable lines: 58
Total lines: 161
Line coverage: 87.9%
Branch coverage
81%
Covered branches: 52
Total branches: 64
Branch coverage: 81.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%44100%
OnStart(...)100%11100%
OnEnd(...)100%11100%
SetContextAttributes(...)100%88100%
CopyBaggageToTags(...)100%66100%
SetObservationType(...)90%101090.9%
SetUsageDetails(...)95.45%222293.75%
ReadTokenTag(...)28.57%391450%

File(s)

/home/runner/work/needlr/needlr/src/NexusLabs.Needlr.AgentFramework.Langfuse/LangfuseTraceAttributeProcessor.cs

#LineLine coverage
 1using System.Diagnostics;
 2using System.Text.Json;
 3
 4using OpenTelemetry;
 5
 6namespace NexusLabs.Needlr.AgentFramework.Langfuse;
 7
 8/// <summary>
 9/// Enriches spans with Langfuse-specific attributes as they flow through the export pipeline.
 10/// </summary>
 11/// <remarks>
 12/// <list type="bullet">
 13/// <item>
 14/// Copies <see cref="Activity.Baggage"/> entries (trace-level <c>session.id</c> / <c>user.id</c>
 15/// set on the scenario root) onto each span's tags so per-observation filtering works.
 16/// </item>
 17/// <item>
 18/// Sets <c>langfuse.observation.type</c> explicitly — <c>generation</c> for chat-completion spans
 19/// and <c>span</c> for tool-call spans — instead of relying on Langfuse's implicit
 20/// "has a model attribute ⇒ generation" inference.
 21/// </item>
 22/// <item>
 23/// Projects Needlr's <c>gen_ai.usage.*</c> tags into a <c>langfuse.observation.usage_details</c>
 24/// JSON attribute so token usage (including <c>cache_read_input_tokens</c> and
 25/// <c>reasoning_tokens</c>) reliably lands rather than depending on auto-mapping of non-standard
 26/// keys.
 27/// </item>
 28/// </list>
 29/// <para>
 30/// This type derives from the OpenTelemetry SDK's <see cref="BaseProcessor{T}"/>, which is the
 31/// only supported extension point for span processing; subclassing it is a framework requirement,
 32/// not a design choice. It is registered before the OTLP exporter so its <see cref="OnEnd"/>
 33/// mutations are visible to the exporter.
 34/// </para>
 35/// </remarks>
 36internal sealed class LangfuseTraceAttributeProcessor : BaseProcessor<Activity>
 37{
 38    private const string ChatActivityName = "agent.chat";
 39    private const string ChatStreamActivityName = "agent.chat.stream";
 40    private const string ToolActivityPrefix = "agent.tool";
 41
 42    private readonly string? _environment;
 43    private readonly string? _release;
 44
 45    /// <summary>
 46    /// Initializes the processor with optional trace-level context propagated to every span.
 47    /// </summary>
 48    /// <param name="environment">
 49    /// The Langfuse environment emitted as <c>langfuse.environment</c>, or <see langword="null"/>
 50    /// to leave it unset.
 51    /// </param>
 52    /// <param name="release">
 53    /// The release identifier emitted as <c>langfuse.release</c>, or <see langword="null"/> to
 54    /// leave it unset.
 55    /// </param>
 1656    public LangfuseTraceAttributeProcessor(string? environment = null, string? release = null)
 57    {
 1658        _environment = string.IsNullOrWhiteSpace(environment) ? null : environment;
 1659        _release = string.IsNullOrWhiteSpace(release) ? null : release;
 1660    }
 61
 62    /// <inheritdoc />
 63    public override void OnStart(Activity data)
 64    {
 1465        ArgumentNullException.ThrowIfNull(data);
 66
 1467        CopyBaggageToTags(data);
 1468        SetObservationType(data);
 1469        SetContextAttributes(data);
 1470    }
 71
 72    /// <inheritdoc />
 73    public override void OnEnd(Activity data)
 74    {
 275        ArgumentNullException.ThrowIfNull(data);
 76
 277        SetUsageDetails(data);
 278    }
 79
 80    private void SetContextAttributes(Activity data)
 81    {
 1482        if (_environment is not null && data.GetTagItem("langfuse.environment") is null)
 83        {
 184            data.SetTag("langfuse.environment", _environment);
 85        }
 86
 1487        if (_release is not null && data.GetTagItem("langfuse.release") is null)
 88        {
 189            data.SetTag("langfuse.release", _release);
 90        }
 1491    }
 92
 93    private static void CopyBaggageToTags(Activity data)
 94    {
 4095        foreach (var entry in data.Baggage)
 96        {
 697            if (entry.Value is not null && data.GetTagItem(entry.Key) is null)
 98            {
 599                data.SetTag(entry.Key, entry.Value);
 100            }
 101        }
 14102    }
 103
 104    private static void SetObservationType(Activity data)
 105    {
 14106        if (data.GetTagItem("langfuse.observation.type") is not null)
 107        {
 0108            return;
 109        }
 110
 14111        var type = data.OperationName switch
 14112        {
 6113            ChatActivityName or ChatStreamActivityName => "generation",
 12114            _ when data.OperationName.StartsWith(ToolActivityPrefix, StringComparison.Ordinal) => "span",
 4115            _ => null,
 14116        };
 117
 14118        if (type is not null)
 119        {
 10120            data.SetTag("langfuse.observation.type", type);
 121        }
 14122    }
 123
 124    private static void SetUsageDetails(Activity data)
 125    {
 2126        if (data.GetTagItem("langfuse.observation.usage_details") is not null)
 127        {
 0128            return;
 129        }
 130
 2131        var input = ReadTokenTag(data, "gen_ai.usage.input_tokens");
 2132        var output = ReadTokenTag(data, "gen_ai.usage.output_tokens");
 2133        var cacheRead = ReadTokenTag(data, "gen_ai.usage.cached_input_tokens");
 2134        var reasoning = ReadTokenTag(data, "gen_ai.usage.reasoning_tokens");
 135
 2136        if (input is null && output is null && cacheRead is null && reasoning is null)
 137        {
 1138            return;
 139        }
 140
 1141        var usage = new Dictionary<string, long>(StringComparer.Ordinal);
 2142        if (input is { } i) usage["input"] = i;
 2143        if (output is { } o) usage["output"] = o;
 2144        if (cacheRead is { } c) usage["cache_read_input_tokens"] = c;
 2145        if (reasoning is { } r) usage["reasoning_tokens"] = r;
 2146        if (input is { } ti && output is { } to) usage["total"] = ti + to;
 147
 1148        data.SetTag("langfuse.observation.usage_details", JsonSerializer.Serialize(usage));
 1149    }
 150
 8151    private static long? ReadTokenTag(Activity data, string key) => data.GetTagItem(key) switch
 8152    {
 4153        null => null,
 0154        long l => l,
 4155        int n => n,
 0156        short s => s,
 0157        byte b => b,
 0158        string str when long.TryParse(str, out var parsed) => parsed,
 0159        _ => null,
 8160    };
 161}