| | | 1 | | using System.Text; |
| | | 2 | | |
| | | 3 | | using Microsoft.Extensions.AI.Evaluation; |
| | | 4 | | |
| | | 5 | | namespace NexusLabs.Needlr.AgentFramework.Langfuse; |
| | | 6 | | |
| | | 7 | | /// <summary> |
| | | 8 | | /// Maps typed scores and <see cref="EvaluationResult"/> metrics to Langfuse score payloads, posts |
| | | 9 | | /// them via <see cref="LangfuseScoreApiClient"/>, and routes failures through a |
| | | 10 | | /// <see cref="LangfuseScoreFailureSink"/>. Shared by <see cref="LangfuseScenario"/> (session path) |
| | | 11 | | /// and <see cref="LangfuseScoreClient"/> (host path) so the mapping lives in one place. |
| | | 12 | | /// </summary> |
| | | 13 | | internal sealed class LangfuseScoreRecorder |
| | | 14 | | { |
| | | 15 | | private const string NumericDataType = "NUMERIC"; |
| | | 16 | | private const string BooleanDataType = "BOOLEAN"; |
| | | 17 | | private const string CategoricalDataType = "CATEGORICAL"; |
| | | 18 | | |
| | | 19 | | private readonly LangfuseScoreApiClient _apiClient; |
| | | 20 | | private readonly LangfuseScoreFailureSink _failureSink; |
| | | 21 | | private readonly bool _normalizeNames; |
| | | 22 | | |
/// <summary>
/// Creates a recorder bound to the given API client and failure sink.
/// </summary>
/// <param name="apiClient">Client stored for creating scores.</param>
/// <param name="failureSink">Sink stored for reporting failed or skipped scores.</param>
/// <param name="normalizeNames">
/// When <see langword="true"/>, score names are converted by <c>ToSnakeCase</c>; otherwise they
/// are used exactly as supplied.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="apiClient"/> or <paramref name="failureSink"/> is <see langword="null"/>.
/// </exception>
public LangfuseScoreRecorder(
    LangfuseScoreApiClient apiClient,
    LangfuseScoreFailureSink failureSink,
    bool normalizeNames)
{
    _apiClient = apiClient ?? throw new ArgumentNullException(nameof(apiClient));
    _failureSink = failureSink ?? throw new ArgumentNullException(nameof(failureSink));
    _normalizeNames = normalizeNames;
}
| | | 35 | | |
| | | 36 | | public Task RecordNumericAsync(string traceId, string name, double value, string? comment, CancellationToken cancell |
| | 6 | 37 | | RecordNumericAsync(LangfuseScoreTarget.Trace(traceId), name, value, comment, cancellationToken); |
| | | 38 | | |
| | | 39 | | public Task RecordBooleanAsync(string traceId, string name, bool value, string? comment, CancellationToken cancellat |
| | 1 | 40 | | RecordBooleanAsync(LangfuseScoreTarget.Trace(traceId), name, value, comment, cancellationToken); |
| | | 41 | | |
| | | 42 | | public Task RecordCategoricalAsync(string traceId, string name, string value, string? comment, CancellationToken can |
| | 1 | 43 | | RecordCategoricalAsync(LangfuseScoreTarget.Trace(traceId), name, value, comment, cancellationToken); |
| | | 44 | | |
| | | 45 | | public Task RecordNumericAsync(LangfuseScoreTarget target, string name, double value, string? comment, CancellationT |
| | 7 | 46 | | SendAsync(target, name, value, NumericDataType, comment, cancellationToken); |
| | | 47 | | |
| | | 48 | | public Task RecordBooleanAsync(LangfuseScoreTarget target, string name, bool value, string? comment, CancellationTok |
| | 3 | 49 | | SendAsync(target, name, value ? 1.0 : 0.0, BooleanDataType, comment, cancellationToken); |
| | | 50 | | |
| | | 51 | | public Task RecordCategoricalAsync(LangfuseScoreTarget target, string name, string value, string? comment, Cancellat |
| | | 52 | | { |
| | 1 | 53 | | ArgumentNullException.ThrowIfNull(value); |
| | 1 | 54 | | return SendAsync(target, name, value, CategoricalDataType, comment, cancellationToken); |
| | | 55 | | } |
| | | 56 | | |
| | | 57 | | public async Task RecordEvaluationAsync(string traceId, EvaluationResult result, CancellationToken cancellationToken |
| | | 58 | | { |
| | 1 | 59 | | ArgumentNullException.ThrowIfNull(result); |
| | | 60 | | |
| | 10 | 61 | | foreach (var metric in result.Metrics.Values) |
| | | 62 | | { |
| | | 63 | | switch (metric) |
| | | 64 | | { |
| | | 65 | | case NumericMetric { Value: { } numeric }: |
| | 1 | 66 | | await RecordNumericAsync(traceId, metric.Name, numeric, metric.Reason, cancellationToken).ConfigureA |
| | 1 | 67 | | break; |
| | | 68 | | case BooleanMetric { Value: { } boolean }: |
| | 1 | 69 | | await RecordBooleanAsync(traceId, metric.Name, boolean, metric.Reason, cancellationToken).ConfigureA |
| | 1 | 70 | | break; |
| | | 71 | | case StringMetric { Value: { Length: > 0 } category }: |
| | 1 | 72 | | await RecordCategoricalAsync(traceId, metric.Name, category, metric.Reason, cancellationToken).Confi |
| | | 73 | | break; |
| | | 74 | | } |
| | | 75 | | } |
| | 1 | 76 | | } |
| | | 77 | | |
| | | 78 | | private async Task SendAsync(LangfuseScoreTarget target, string name, object value, string dataType, string? comment |
| | | 79 | | { |
| | 11 | 80 | | ArgumentException.ThrowIfNullOrWhiteSpace(name); |
| | | 81 | | |
| | 11 | 82 | | var score = new LangfuseScore |
| | 11 | 83 | | { |
| | 11 | 84 | | TraceId = target.TraceId, |
| | 11 | 85 | | ObservationId = target.ObservationId, |
| | 11 | 86 | | SessionId = target.SessionId, |
| | 11 | 87 | | Name = NormalizeName(name), |
| | 11 | 88 | | Value = value, |
| | 11 | 89 | | DataType = dataType, |
| | 11 | 90 | | Comment = comment, |
| | 11 | 91 | | }; |
| | | 92 | | |
| | | 93 | | try |
| | | 94 | | { |
| | 11 | 95 | | await _apiClient.CreateAsync(score, cancellationToken).ConfigureAwait(false); |
| | 9 | 96 | | } |
| | 2 | 97 | | catch (LangfuseException ex) |
| | | 98 | | { |
| | 2 | 99 | | _failureSink.Record(name, target.TraceId, ex); |
| | 1 | 100 | | } |
| | 10 | 101 | | } |
| | | 102 | | |
/// <summary>
/// Reports a score that could not be attached because there was no sampled trace to attach it
/// to (for example, head sampling dropped the scenario span). The skip is forwarded to the
/// message-taking overload with a fixed explanatory message, so it is surfaced through the
/// failure sink instead of disappearing silently.
/// </summary>
/// <param name="name">The score name; it is embedded in the generated message.</param>
/// <param name="cancellationToken">A cancellation token, passed through unchanged.</param>
public Task RecordSkippedAsync(string name, CancellationToken cancellationToken)
{
    var reason =
        $"Cannot record score '{name}': no sampled trace was available to attach it to. " +
        "Ensure the Langfuse session is enabled and sampling is not dropping the scenario span.";

    return RecordSkippedAsync(name, reason, cancellationToken);
}
| | | 114 | | |
/// <summary>
/// Reports a score that was skipped for the given reason by handing a
/// <see cref="LangfuseException"/> carrying that reason to the failure sink, with no trace id.
/// </summary>
/// <param name="name">The score name.</param>
/// <param name="message">The reason the score was skipped.</param>
/// <param name="cancellationToken">A cancellation token; this method does not read it.</param>
/// <returns>
/// A completed task when the sink accepts the record. If the sink throws a
/// <see cref="LangfuseException"/>, a faulted task carrying that exception is returned instead
/// of the exception being thrown synchronously.
/// </returns>
public Task RecordSkippedAsync(string name, string message, CancellationToken cancellationToken)
{
    var skipped = new LangfuseException(message);

    try
    {
        _failureSink.Record(name, null, skipped);
    }
    catch (LangfuseException sinkFailure)
    {
        // Keep the Task-returning contract: callers observe the failure when they await.
        return Task.FromException(sinkFailure);
    }

    return Task.CompletedTask;
}
| | | 136 | | |
/// <summary>
/// Returns <paramref name="name"/> converted by <c>ToSnakeCase</c> when name normalization is
/// enabled, or unchanged when it is not.
/// </summary>
private string NormalizeName(string name)
{
    if (!_normalizeNames)
    {
        return name;
    }

    return ToSnakeCase(name);
}
| | | 138 | | |
/// <summary>
/// Lower-cases every letter and digit of <paramref name="name"/> and collapses each run of any
/// other characters into a single underscore. The result never starts or ends with an
/// underscore. Case boundaries are not treated as separators, so <c>"FooBar"</c> becomes
/// <c>"foobar"</c>.
/// </summary>
private static string ToSnakeCase(string name)
{
    var output = new StringBuilder(name.Length);
    var trimmed = name.Trim();

    for (var index = 0; index < trimmed.Length; index++)
    {
        var current = trimmed[index];

        if (char.IsLetterOrDigit(current))
        {
            output.Append(char.ToLowerInvariant(current));
            continue;
        }

        // Separator character: emit at most one underscore per run, and never as the first
        // character of the result.
        var endsWithSeparator = output.Length > 0 && output[output.Length - 1] == '_';
        if (output.Length > 0 && !endsWithSeparator)
        {
            output.Append('_');
        }
    }

    // A trailing run of separators leaves one underscore behind; strip it.
    return output.ToString().Trim('_');
}
| | | 160 | | } |