| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using System.Diagnostics.CodeAnalysis; |
| | | 3 | | using System.Reflection; |
| | | 4 | | using Microsoft.Agents.AI; |
| | | 5 | | using Microsoft.Agents.AI.Workflows; |
| | | 6 | | using Microsoft.Extensions.AI; |
| | | 7 | | |
| | | 8 | | namespace NexusLabs.Needlr.AgentFramework; |
| | | 9 | | |
| | | 10 | | /// <summary> |
| | | 11 | | /// Default implementation of <see cref="IWorkflowFactory"/> that assembles MAF workflows from |
| | | 12 | | /// topology declared via <see cref="AgentHandoffsToAttribute"/> and |
| | | 13 | | /// <see cref="AgentGroupChatMemberAttribute"/>. |
| | | 14 | | /// </summary> |
| | | 15 | | /// <remarks> |
| | | 16 | | /// When the source generator bootstrap is registered (i.e., the generator package is included |
| | | 17 | | /// and <c>UsingAgentFramework()</c> detected a <c>[ModuleInitializer]</c>-emitted registration), |
| | | 18 | | /// topology data is read from the compile-time registry for zero-allocation discovery. |
| | | 19 | | /// When no bootstrap data is available, topology is discovered via reflection at workflow |
| | | 20 | | /// creation time; that path is annotated <see cref="RequiresUnreferencedCodeAttribute"/>. |
| | | 21 | | /// </remarks> |
| | | 22 | | internal sealed class WorkflowFactory : IWorkflowFactory |
| | | 23 | | { |
| | | 24 | | private readonly IAgentFactory _agentFactory; |
| | | 25 | | |
/// <summary>
/// Creates a workflow factory that uses <paramref name="agentFactory"/> to create
/// every agent it places into a workflow.
/// </summary>
/// <param name="agentFactory">The agent factory to delegate agent creation to.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="agentFactory"/> is <see langword="null"/>.
/// </exception>
public WorkflowFactory(IAgentFactory agentFactory)
{
    _agentFactory = agentFactory ?? throw new ArgumentNullException(nameof(agentFactory));
}
| | | 31 | | |
| | | 32 | | /// <inheritdoc/> |
public Workflow CreateHandoffWorkflow<TInitialAgent>() where TInitialAgent : class
{
    // Targets are resolved first, so a resolution failure surfaces before the
    // initial agent is created.
    var handoffTargets = ResolveHandoffTargets(typeof(TInitialAgent));
    var entryAgent = _agentFactory.CreateAgent<TInitialAgent>();

    return BuildHandoff(entryAgent, handoffTargets);
}
| | | 39 | | |
| | | 40 | | /// <inheritdoc/> |
public Workflow CreateGroupChatWorkflow(string groupName, int maxIterations = 10)
{
    // This overload applies no per-agent configuration callback.
    return CreateGroupChatWorkflowCore(groupName, maxIterations, configureAgent: null);
}
| | | 43 | | |
| | | 44 | | /// <inheritdoc/> |
public Workflow CreateGroupChatWorkflow(string groupName, int maxIterations, Action<Type, AgentFactoryOptions> configureAgent)
{
    // Unlike the two-argument overload, a callback is mandatory here.
    if (configureAgent is null)
    {
        throw new ArgumentNullException(nameof(configureAgent));
    }

    return CreateGroupChatWorkflowCore(groupName, maxIterations, configureAgent);
}
| | | 50 | | |
/// <summary>
/// Shared implementation behind both <c>CreateGroupChatWorkflow</c> overloads: resolves
/// the members of <paramref name="groupName"/>, orders them, creates one agent per member
/// and wires them into a round-robin group chat.
/// </summary>
/// <param name="groupName">Group whose members take part in the chat.</param>
/// <param name="maxIterations">Value assigned to the manager's <c>MaximumIterationCount</c>.</param>
/// <param name="configureAgent">
/// Optional callback invoked with the member type and its factory options while each
/// agent is created; <see langword="null"/> creates agents without customization.
/// </param>
/// <exception cref="InvalidOperationException">Fewer than two members were found for the group.</exception>
private Workflow CreateGroupChatWorkflowCore(string groupName, int maxIterations, Action<Type, AgentFactoryOptions>? configureAgent)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(groupName);

    // Primary key: the Order declared for this specific group (default value when the
    // type carries no matching attribute). Secondary key: ordinal type name.
    var orderedMembers = ResolveGroupChatMembers(groupName)
        .OrderBy(member => member
            .GetCustomAttributes<AgentGroupChatMemberAttribute>()
            .Where(membership => string.Equals(membership.GroupName, groupName, StringComparison.Ordinal))
            .Select(membership => membership.Order)
            .FirstOrDefault())
        .ThenBy(member => member.Name, StringComparer.Ordinal)
        .ToList();

    if (orderedMembers.Count < 2)
    {
        throw new InvalidOperationException(
            $"CreateGroupChatWorkflow(\"{groupName}\") failed: {orderedMembers.Count} agent(s) are " +
            $"registered as members of group \"{groupName}\". At least two are required. " +
            $"Add [AgentGroupChatMember(\"{groupName}\")] to the agent classes and ensure their " +
            $"assemblies are scanned.");
    }

    // Agents are created in member order, keyed by full type name when available.
    var participants = new List<AIAgent>(orderedMembers.Count);
    foreach (var member in orderedMembers)
    {
        var agentName = member.FullName ?? member.Name;
        participants.Add(configureAgent is null
            ? _agentFactory.CreateAgent(agentName)
            : _agentFactory.CreateAgent(agentName, options => configureAgent(member, options)));
    }

    var terminationConditions = BuildTerminationConditions(orderedMembers);

    // Only attach a termination callback when at least one condition was declared.
    Func<IReadOnlyList<AIAgent>, RoundRobinGroupChatManager> managerFactory;
    if (terminationConditions.Count > 0)
    {
        managerFactory = chatAgents =>
            new RoundRobinGroupChatManager(chatAgents, ShouldTerminateAsync(terminationConditions))
            {
                MaximumIterationCount = maxIterations,
            };
    }
    else
    {
        managerFactory = chatAgents =>
            new RoundRobinGroupChatManager(chatAgents)
            {
                MaximumIterationCount = maxIterations,
            };
    }

    var groupChatBuilder = AgentWorkflowBuilder.CreateGroupChatBuilderWith(managerFactory);
    return groupChatBuilder.AddParticipants(participants).Build();
}
| | | 90 | | |
| | | 91 | | /// <inheritdoc/> |
public Workflow CreateSequentialWorkflow(params AIAgent[] agents)
{
    if (agents is null)
    {
        throw new ArgumentNullException(nameof(agents));
    }

    // An empty pipeline is rejected up front.
    if (agents.Length < 1)
    {
        throw new ArgumentException("At least one agent is required.", nameof(agents));
    }

    return AgentWorkflowBuilder.BuildSequential(agents);
}
| | | 101 | | |
| | | 102 | | /// <inheritdoc/> |
public Workflow CreateSequentialWorkflow(string pipelineName)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(pipelineName);

    var stageTypes = ResolveSequentialMembers(pipelineName);

    // One agent per stage, created in pipeline order and keyed by full type name
    // (falling back to the short name when FullName is unavailable).
    var stageAgents = new AIAgent[stageTypes.Count];
    for (var index = 0; index < stageAgents.Length; index++)
    {
        var stageType = stageTypes[index];
        stageAgents[index] = _agentFactory.CreateAgent(stageType.FullName ?? stageType.Name);
    }

    return AgentWorkflowBuilder.BuildSequential(stageAgents);
}
| | | 111 | | |
| | | 112 | | [UnconditionalSuppressMessage("TrimAnalysis", "IL2026", Justification = "Reflection fallback is unreachable in AOT b |
// Returns the handoff targets declared for initialAgentType. The generated bootstrap
// topology is consulted first; when no bootstrap is registered, or the type is absent
// from it (e.g. its assembly did not run the generator), reflection is used instead.
private IReadOnlyList<(Type TargetType, string? HandoffReason)> ResolveHandoffTargets(Type initialAgentType)
{
    if (AgentFrameworkGeneratedBootstrap.TryGetHandoffTopology(out var topologyProvider)
        && topologyProvider().TryGetValue(initialAgentType, out var registeredTargets))
    {
        return registeredTargets;
    }

    return ResolveHandoffTargetsViaReflection(initialAgentType);
}
| | | 128 | | |
| | | 129 | | [RequiresUnreferencedCode("Reflection-based topology discovery may not work after trimming. Use the source generator |
// Reads every [AgentHandoffsTo] attribute on initialAgentType and returns the
// (target type, reason) pairs as a read-only list. Throws InvalidOperationException
// when the type declares no handoff attributes at all.
private static IReadOnlyList<(Type TargetType, string? HandoffReason)> ResolveHandoffTargetsViaReflection(Type initialAgentType)
{
    var declaredTargets = new List<(Type TargetType, string? HandoffReason)>();
    foreach (var handoff in initialAgentType.GetCustomAttributes<AgentHandoffsToAttribute>())
    {
        declaredTargets.Add((handoff.TargetAgentType, handoff.HandoffReason));
    }

    if (declaredTargets.Count == 0)
    {
        throw new InvalidOperationException(
            $"CreateHandoffWorkflow<{initialAgentType.Name}>() failed: {initialAgentType.Name} has no " +
            $"[AgentHandoffsTo] attributes. Declare at least one " +
            $"[AgentHandoffsTo(typeof(TargetAgent))] on {initialAgentType.Name} to specify its handoff targets.");
    }

    return declaredTargets.AsReadOnly();
}
| | | 147 | | |
| | | 148 | | [UnconditionalSuppressMessage("TrimAnalysis", "IL2026", Justification = "Reflection fallback is unreachable in AOT b |
// Returns the member types of the named group chat. The generated bootstrap registry
// is consulted first; when no bootstrap is registered, or the group is absent from it
// (e.g. its assembly did not run the generator), reflection is used instead.
private IReadOnlyList<Type> ResolveGroupChatMembers(string groupName)
{
    if (AgentFrameworkGeneratedBootstrap.TryGetGroupChatGroups(out var groupsProvider)
        && groupsProvider().TryGetValue(groupName, out var registeredMembers))
    {
        return registeredMembers;
    }

    return ResolveGroupChatMembersViaReflection(groupName);
}
| | | 164 | | |
| | | 165 | | [RequiresUnreferencedCode("Reflection-based group chat discovery may not work after trimming. Use the source generat |
// Scans every assembly loaded in the current AppDomain for types carrying an
// [AgentGroupChatMember] attribute whose GroupName equals groupName (ordinal) and
// returns them as a read-only list (empty when nothing matches).
private static IReadOnlyList<Type> ResolveGroupChatMembersViaReflection(string groupName)
{
    return AppDomain.CurrentDomain.GetAssemblies()
        .SelectMany(GetLoadableTypes)
        .Where(candidate => candidate
            .GetCustomAttributes<AgentGroupChatMemberAttribute>()
            .Any(membership => string.Equals(membership.GroupName, groupName, StringComparison.Ordinal)))
        .ToList()
        .AsReadOnly();

    // Best-effort type enumeration: an assembly that cannot be inspected must not
    // abort discovery for the remaining assemblies.
    static IEnumerable<Type> GetLoadableTypes(Assembly assembly)
    {
        try
        {
            return assembly.GetTypes();
        }
        catch (ReflectionTypeLoadException ex)
        {
            // Some types failed to load; the ones that did load are still usable.
            // Types holds a null entry for every type that could not be loaded.
            return ex.Types.OfType<Type>();
        }
        catch (Exception)
        {
            // Any other failure: deliberately skip this assembly.
            return [];
        }
    }
}
| | | 179 | | |
| | | 180 | | [UnconditionalSuppressMessage("TrimAnalysis", "IL2026", Justification = "Reflection fallback is unreachable in AOT b |
// Returns the ordered member types of the named sequential pipeline. A non-empty entry
// in the generated bootstrap topology wins; a missing bootstrap, a missing pipeline or
// an empty member list all fall back to reflection-based discovery.
private static IReadOnlyList<Type> ResolveSequentialMembers(string pipelineName)
{
    if (AgentFrameworkGeneratedBootstrap.TryGetSequentialTopology(out var topologyProvider)
        && topologyProvider().TryGetValue(pipelineName, out var registeredStages)
        && registeredStages.Count > 0)
    {
        return registeredStages;
    }

    return ResolveSequentialMembersViaReflection(pipelineName);
}
| | | 194 | | |
| | | 195 | | [RequiresUnreferencedCode("Reflection-based sequential pipeline discovery may not work after trimming. Use the sourc |
| | | 196 | | private static IReadOnlyList<Type> ResolveSequentialMembersViaReflection(string pipelineName) |
| | | 197 | | { |
| | 6 | 198 | | var members = AppDomain.CurrentDomain.GetAssemblies() |
| | 6 | 199 | | .SelectMany(a => |
| | 6 | 200 | | { |
| | 550 | 201 | | try { return a.GetTypes(); } |
| | 0 | 202 | | catch { return []; } |
| | 550 | 203 | | }) |
| | 103413 | 204 | | .SelectMany(t => t.GetCustomAttributes<AgentSequenceMemberAttribute>() |
| | 30 | 205 | | .Where(attr => string.Equals(attr.PipelineName, pipelineName, StringComparison.Ordinal)) |
| | 103427 | 206 | | .Select(attr => (Type: t, attr.Order))) |
| | 14 | 207 | | .OrderBy(x => x.Order) |
| | 14 | 208 | | .Select(x => x.Type) |
| | 6 | 209 | | .ToList() |
| | 6 | 210 | | .AsReadOnly(); |
| | | 211 | | |
| | 6 | 212 | | if (members.Count == 0) |
| | 1 | 213 | | throw new InvalidOperationException( |
| | 1 | 214 | | $"No agents found for sequential pipeline '{pipelineName}'. " + |
| | 1 | 215 | | $"Decorate agent classes with [AgentSequenceMember(\"{pipelineName}\", order)] and ensure their assembli |
| | | 216 | | |
| | 5 | 217 | | return members; |
| | | 218 | | } |
| | | 219 | | |
| | | 220 | | private Workflow BuildHandoff( |
| | | 221 | | AIAgent initialAgent, |
| | | 222 | | IReadOnlyList<(Type TargetType, string? HandoffReason)> targets) |
| | | 223 | | { |
| | 8 | 224 | | if (targets.Count == 0) |
| | | 225 | | { |
| | 0 | 226 | | throw new InvalidOperationException( |
| | 0 | 227 | | $"Cannot build handoff workflow for agent '{initialAgent.Name ?? initialAgent.Id}': no handoff targets f |
| | | 228 | | } |
| | | 229 | | |
| | 8 | 230 | | var targetPairs = targets |
| | 16 | 231 | | .Select(t => (_agentFactory.CreateAgent(t.TargetType.FullName ?? t.TargetType.Name), t.HandoffReason)) |
| | 8 | 232 | | .ToArray(); |
| | | 233 | | |
| | 8 | 234 | | var builder = AgentWorkflowBuilder.CreateHandoffBuilderWith(initialAgent); |
| | | 235 | | |
| | 8 | 236 | | var withoutReason = targetPairs |
| | 16 | 237 | | .Where(t => string.IsNullOrEmpty(t.HandoffReason)) |
| | 16 | 238 | | .Select(t => t.Item1) |
| | 8 | 239 | | .ToArray(); |
| | | 240 | | |
| | 8 | 241 | | if (withoutReason.Length > 0) |
| | 8 | 242 | | builder.WithHandoffs(initialAgent, withoutReason); |
| | | 243 | | |
| | 32 | 244 | | foreach (var (target, reason) in targetPairs.Where(t => !string.IsNullOrEmpty(t.HandoffReason))) |
| | 0 | 245 | | builder.WithHandoff(initialAgent, target, reason!); |
| | | 246 | | |
| | 8 | 247 | | return builder.Build(); |
| | | 248 | | } |
| | | 249 | | |
/// <summary>
/// Instantiates one termination condition per <see cref="AgentTerminationConditionAttribute"/>
/// found on the given member types, pairing each with the short name of the declaring type.
/// </summary>
/// <param name="memberTypes">Group chat member types to inspect, in order.</param>
/// <returns>The created conditions in discovery order; empty when none are declared.</returns>
private static IReadOnlyList<(string AgentName, IWorkflowTerminationCondition Condition)> BuildTerminationConditions(
    IReadOnlyList<Type> memberTypes)
{
    return memberTypes
        .SelectMany(
            memberType => memberType.GetCustomAttributes<AgentTerminationConditionAttribute>(),
            (memberType, declaration) => (
                AgentName: memberType.Name,
                // The condition is constructed reflectively from the attribute's type and ctor args.
                Condition: (IWorkflowTerminationCondition)Activator.CreateInstance(
                    declaration.ConditionType, declaration.CtorArgs)!))
        .ToList();
}
| | | 265 | | |
/// <summary>
/// Creates the termination callback handed to <see cref="RoundRobinGroupChatManager"/>.
/// The callback builds a <see cref="TerminationContext"/> from the chat history and reports
/// <see langword="true"/> as soon as any supplied condition asks to terminate.
/// </summary>
/// <param name="conditions">Conditions to evaluate, in order, on every callback invocation.</param>
/// <returns>A callback that completes synchronously with the termination decision.</returns>
private static Func<RoundRobinGroupChatManager, IEnumerable<Microsoft.Extensions.AI.ChatMessage>, CancellationToken, ValueTask<bool>> ShouldTerminateAsync(
    IReadOnlyList<(string AgentName, IWorkflowTerminationCondition Condition)> conditions)
{
    return Evaluate;

    ValueTask<bool> Evaluate(
        RoundRobinGroupChatManager manager,
        IEnumerable<Microsoft.Extensions.AI.ChatMessage> history,
        CancellationToken ct)
    {
        // Snapshot the history once; an empty conversation never terminates.
        var transcript = history.ToList();
        if (transcript.Count == 0)
        {
            return ValueTask.FromResult(false);
        }

        var latest = transcript[^1];

        // Names of the function calls carried by the latest message (blank names dropped).
        var calledTools = latest.Contents
            .OfType<FunctionCallContent>()
            .Select(call => call.Name)
            .Where(name => !string.IsNullOrEmpty(name))
            .ToList();

        var context = new TerminationContext
        {
            AgentId = latest.AuthorName ?? string.Empty,
            LastMessage = latest,
            TurnCount = transcript.Count,
            ConversationHistory = transcript,
            ToolCallNames = calledTools,
        };

        // Any() stops at the first condition that requests termination.
        var terminate = conditions.Any(entry => entry.Condition.ShouldTerminate(context));
        return ValueTask.FromResult(terminate);
    }
}
| | | 302 | | |
| | | 303 | | /// <inheritdoc /> |
| | | 304 | | [RequiresUnreferencedCode("Graph workflow discovery uses reflection when source-generated bootstrap data is unavaila |
| | | 305 | | public Workflow CreateGraphWorkflow(string graphName) |
| | | 306 | | { |
| | 20 | 307 | | ArgumentException.ThrowIfNullOrWhiteSpace(graphName); |
| | | 308 | | |
| | 18 | 309 | | var entryType = FindGraphEntryType(graphName); |
| | 17 | 310 | | var entryAttr = entryType.GetCustomAttributes<AgentGraphEntryAttribute>() |
| | 34 | 311 | | .First(a => string.Equals(a.GraphName, graphName, StringComparison.Ordinal)); |
| | | 312 | | |
| | 17 | 313 | | var edges = DiscoverGraphEdges(graphName); |
| | 17 | 314 | | if (edges.Count == 0) |
| | | 315 | | { |
| | 1 | 316 | | throw new InvalidOperationException( |
| | 1 | 317 | | $"Cannot build graph workflow '{graphName}': no edges found."); |
| | | 318 | | } |
| | | 319 | | |
| | 16 | 320 | | var allAgentTypes = new HashSet<Type> { entryType }; |
| | 134 | 321 | | foreach (var edge in edges) |
| | | 322 | | { |
| | 51 | 323 | | allAgentTypes.Add(edge.SourceType); |
| | 51 | 324 | | allAgentTypes.Add(edge.TargetType); |
| | | 325 | | } |
| | | 326 | | |
| | | 327 | | // Create agents and bind executors ONCE per agent to avoid duplicate bindings. |
| | 16 | 328 | | var agents = new Dictionary<Type, AIAgent>(); |
| | 16 | 329 | | var executorBindings = new Dictionary<Type, ExecutorBinding>(); |
| | 146 | 330 | | foreach (var type in allAgentTypes) |
| | | 331 | | { |
| | 57 | 332 | | var agent = _agentFactory.CreateAgent(type.FullName ?? type.Name); |
| | 57 | 333 | | agents[type] = agent; |
| | 57 | 334 | | executorBindings[type] = agent.BindAsExecutor(); |
| | | 335 | | } |
| | | 336 | | |
| | | 337 | | // Discover [AgentGraphNode] attributes for JoinMode metadata. |
| | | 338 | | // WaitAll (default) maps to MAF's barrier-style edges (default AddEdge behavior). |
| | | 339 | | // WaitAny is supported via RunGraphAsync which uses Needlr's own executor. |
| | | 340 | | // CreateGraphWorkflow only supports WaitAll since it returns a MAF Workflow. |
| | 16 | 341 | | var nodeJoinModes = DiscoverNodeJoinModes(graphName, allAgentTypes); |
| | 53 | 342 | | foreach (var (type, joinMode) in nodeJoinModes) |
| | | 343 | | { |
| | 11 | 344 | | if (joinMode == GraphJoinMode.WaitAny) |
| | | 345 | | { |
| | 1 | 346 | | throw new NotSupportedException( |
| | 1 | 347 | | $"GraphJoinMode.WaitAny on '{type.FullName ?? type.Name}' in graph '{graphName}' is not compatible " |
| | 1 | 348 | | $"with CreateGraphWorkflow (which returns a MAF Workflow using BSP execution). " + |
| | 1 | 349 | | $"Use RunGraphAsync(\"{graphName}\", input) instead — it handles WaitAny via " + |
| | 1 | 350 | | $"Needlr's own graph executor."); |
| | | 351 | | } |
| | | 352 | | } |
| | | 353 | | |
| | 15 | 354 | | var builder = new WorkflowBuilder(executorBindings[entryType]); |
| | | 355 | | |
| | | 356 | | // Discover reducer bindings before wiring edges so fan-in edges can be |
| | | 357 | | // routed through the reducer FunctionExecutor node. |
| | 15 | 358 | | var reducerBinding = DiscoverReducerBinding(graphName); |
| | | 359 | | |
| | | 360 | | // Identify fan-in targets (agent types with two or more incoming edges) |
| | | 361 | | // so their inbound edges can be redirected through the reducer. |
| | 15 | 362 | | var fanInSources = new Dictionary<Type, List<Type>>(); |
| | 128 | 363 | | foreach (var edge in edges) |
| | | 364 | | { |
| | 49 | 365 | | if (!fanInSources.TryGetValue(edge.TargetType, out var sources)) |
| | | 366 | | { |
| | 39 | 367 | | sources = []; |
| | 39 | 368 | | fanInSources[edge.TargetType] = sources; |
| | | 369 | | } |
| | | 370 | | |
| | 49 | 371 | | sources.Add(edge.SourceType); |
| | | 372 | | } |
| | | 373 | | |
| | 15 | 374 | | var fanInTargets = reducerBinding is not null |
| | 15 | 375 | | ? fanInSources |
| | 18 | 376 | | .Where(kv => kv.Value.Count >= 2) |
| | 6 | 377 | | .Select(kv => kv.Key) |
| | 15 | 378 | | .ToHashSet() |
| | 15 | 379 | | : []; |
| | | 380 | | |
| | | 381 | | // Compute effective routing mode per source node: per-node override wins, |
| | | 382 | | // then graph-wide default from the entry attribute. |
| | 15 | 383 | | var graphRoutingMode = entryAttr.RoutingMode; |
| | 132 | 384 | | var edgesBySource = edges.GroupBy(e => e.SourceType).ToDictionary(g => g.Key, g => g.ToList()); |
| | 15 | 385 | | var effectiveRoutingModes = new Dictionary<Type, GraphRoutingMode>(); |
| | 98 | 386 | | foreach (var (sourceType, sourceEdges) in edgesBySource) |
| | | 387 | | { |
| | 34 | 388 | | var nodeOverride = sourceEdges |
| | 49 | 389 | | .Select(e => e.NodeRoutingModeOverride) |
| | 83 | 390 | | .FirstOrDefault(m => m is not null); |
| | 34 | 391 | | effectiveRoutingModes[sourceType] = nodeOverride ?? graphRoutingMode; |
| | | 392 | | } |
| | | 393 | | |
| | | 394 | | // Validate: LlmChoice is not supported in the BSP path — it requires |
| | | 395 | | // async LLM calls that CreateGraphWorkflow (synchronous build) cannot provide. |
| | 97 | 396 | | foreach (var (sourceType, routingMode) in effectiveRoutingModes) |
| | | 397 | | { |
| | 34 | 398 | | if (routingMode == GraphRoutingMode.LlmChoice) |
| | | 399 | | { |
| | 1 | 400 | | throw new NotSupportedException( |
| | 1 | 401 | | $"GraphRoutingMode.LlmChoice on '{sourceType.FullName ?? sourceType.Name}' in graph '{graphName}' is |
| | 1 | 402 | | $"with CreateGraphWorkflow (which returns a MAF Workflow using BSP execution). " + |
| | 1 | 403 | | $"Use RunGraphAsync(\"{graphName}\", input) instead — it handles LlmChoice via " + |
| | 1 | 404 | | $"Needlr's own graph executor with an IChatClient."); |
| | | 405 | | } |
| | | 406 | | } |
| | | 407 | | |
| | 14 | 408 | | if (reducerBinding is not null && fanInTargets.Count > 0) |
| | | 409 | | { |
| | 6 | 410 | | builder.BindExecutor(reducerBinding); |
| | 6 | 411 | | var wiredFanInTargets = new HashSet<Type>(); |
| | | 412 | | |
| | 60 | 413 | | foreach (var edge in edges) |
| | | 414 | | { |
| | 24 | 415 | | if (fanInTargets.Contains(edge.TargetType)) |
| | | 416 | | { |
| | | 417 | | // Redirect fan-in edges through the reducer function node: |
| | | 418 | | // source → reducer (instead of source → fan-in agent) |
| | 12 | 419 | | AddRoutedEdge(builder, executorBindings[edge.SourceType], reducerBinding, |
| | 12 | 420 | | edge, effectiveRoutingModes, edgesBySource); |
| | | 421 | | |
| | | 422 | | // reducer → original fan-in agent (wired once per target) |
| | 12 | 423 | | if (wiredFanInTargets.Add(edge.TargetType)) |
| | | 424 | | { |
| | 6 | 425 | | builder.AddEdge(reducerBinding, executorBindings[edge.TargetType]); |
| | | 426 | | } |
| | | 427 | | } |
| | | 428 | | else |
| | | 429 | | { |
| | 12 | 430 | | AddRoutedEdge(builder, executorBindings[edge.SourceType], executorBindings[edge.TargetType], |
| | 12 | 431 | | edge, effectiveRoutingModes, edgesBySource); |
| | | 432 | | } |
| | | 433 | | } |
| | | 434 | | } |
| | | 435 | | else |
| | | 436 | | { |
| | 62 | 437 | | foreach (var edge in edges) |
| | | 438 | | { |
| | 23 | 439 | | AddRoutedEdge(builder, executorBindings[edge.SourceType], executorBindings[edge.TargetType], |
| | 23 | 440 | | edge, effectiveRoutingModes, edgesBySource); |
| | | 441 | | } |
| | | 442 | | } |
| | | 443 | | |
| | 14 | 444 | | return builder.Build(); |
| | | 445 | | } |
| | | 446 | | |
/// <summary>
/// Collects the join mode declared via <see cref="AgentGraphNodeAttribute"/> for each
/// agent type that has such an attribute for <paramref name="graphName"/>.
/// </summary>
/// <param name="graphName">Graph whose node attributes are considered (ordinal match).</param>
/// <param name="agentTypes">Agent types to inspect.</param>
/// <returns>
/// A map from agent type to its declared join mode; types without a matching attribute
/// are not present in the map.
/// </returns>
private static Dictionary<Type, GraphJoinMode> DiscoverNodeJoinModes(
    string graphName,
    IEnumerable<Type> agentTypes)
{
    var joinModes = new Dictionary<Type, GraphJoinMode>();

    foreach (var agentType in agentTypes)
    {
        foreach (var node in agentType.GetCustomAttributes<AgentGraphNodeAttribute>())
        {
            if (!string.Equals(node.GraphName, graphName, StringComparison.Ordinal))
            {
                continue;
            }

            // Only the first attribute matching this graph is honored.
            joinModes[agentType] = node.JoinMode;
            break;
        }
    }

    return joinModes;
}
| | | 464 | | |
| | | 465 | | /// <summary> |
| | | 466 | | /// Discovers a single <see cref="AgentGraphReducerAttribute"/> for the graph and |
| | | 467 | | /// creates a <see cref="FunctionExecutor{TInput, TOutput}"/> wrapped in an |
| | | 468 | | /// <see cref="ExecutorBinding"/> so it can participate as a node in the |
| | | 469 | | /// <see cref="WorkflowBuilder"/> DAG. |
| | | 470 | | /// </summary> |
| | | 471 | | /// <returns> |
| | | 472 | | /// An <see cref="ExecutorBinding"/> for the reducer, or <c>null</c> if no reducer |
| | | 473 | | /// is declared for this graph. |
| | | 474 | | /// </returns> |
| | | 475 | | [RequiresUnreferencedCode("Reducer discovery uses reflection to find [AgentGraphReducer] and invoke static methods." |
| | | 476 | | private static ExecutorBinding? DiscoverReducerBinding(string graphName) |
| | | 477 | | { |
| | 1696 | 478 | | foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies()) |
| | | 479 | | { |
| | | 480 | | Type[] types; |
| | 1672 | 481 | | try { types = assembly.GetTypes(); } |
| | 0 | 482 | | catch { continue; } |
| | | 483 | | |
| | 347524 | 484 | | foreach (var type in types) |
| | | 485 | | { |
| | 345972 | 486 | | foreach (var attr in type.GetCustomAttributes<AgentGraphReducerAttribute>()) |
| | | 487 | | { |
| | 60 | 488 | | if (!string.Equals(attr.GraphName, graphName, StringComparison.Ordinal)) |
| | | 489 | | continue; |
| | | 490 | | |
| | 6 | 491 | | if (string.IsNullOrWhiteSpace(attr.ReducerMethod)) |
| | | 492 | | continue; |
| | | 493 | | |
| | 6 | 494 | | var method = type.GetMethod( |
| | 6 | 495 | | attr.ReducerMethod, |
| | 6 | 496 | | BindingFlags.Public | BindingFlags.Static, |
| | 6 | 497 | | null, |
| | 6 | 498 | | [typeof(IReadOnlyList<string>)], |
| | 6 | 499 | | null); |
| | | 500 | | |
| | 6 | 501 | | if (method is null || method.ReturnType != typeof(string)) |
| | | 502 | | { |
| | 0 | 503 | | throw new InvalidOperationException( |
| | 0 | 504 | | $"[AgentGraphReducer] on {type.FullName ?? type.Name} references method '{attr.ReducerMethod |
| | 0 | 505 | | $"but no matching 'public static string {attr.ReducerMethod}(IReadOnlyList<string>)' was fou |
| | | 506 | | } |
| | | 507 | | |
| | 6 | 508 | | return CreateReducerExecutorBinding(type, method); |
| | | 509 | | } |
| | | 510 | | } |
| | | 511 | | } |
| | | 512 | | |
| | 9 | 513 | | return null; |
| | 6 | 514 | | } |
| | | 515 | | |
/// <summary>
/// Wraps a static reducer method in a <c>FunctionExecutor&lt;string, string&gt;</c> and
/// returns it as an <see cref="ExecutorBinding"/>.
/// </summary>
/// <param name="reducerType">Type declaring the reducer; used only to derive the executor id.</param>
/// <param name="reducerMethod">Static method invoked reflectively with a read-only list of inputs.</param>
/// <returns>An instance binding around the created executor.</returns>
private static ExecutorBinding CreateReducerExecutorBinding(Type reducerType, MethodInfo reducerMethod)
{
    var executorId = $"reducer:{reducerType.FullName ?? reducerType.Name}";

    // Each invocation receives exactly one string message, so the reducer is called
    // with a single-element list and the executor keeps no state between calls.
    var reducerExecutor = new FunctionExecutor<string, string>(
        executorId,
        (message, _, _) => new ValueTask<string>(Reduce(message)),
        options: null,
        sentMessageTypes: null,
        outputTypes: null,
        declareCrossRunShareable: false);

    return new ExecutorInstanceBinding(reducerExecutor);

    string Reduce(string message)
    {
        var singleInput = new List<string> { message }.AsReadOnly();
        return (string)reducerMethod.Invoke(null, [singleInput])!;
    }
}
| | | 542 | | |
/// <summary>
/// Scans the assemblies loaded in the current AppDomain for the first type carrying an
/// <see cref="AgentGraphEntryAttribute"/> whose graph name equals
/// <paramref name="graphName"/> (ordinal comparison).
/// </summary>
/// <param name="graphName">Name of the graph whose entry point is wanted.</param>
/// <returns>The first matching type encountered; the scan stops at that point.</returns>
/// <exception cref="InvalidOperationException">No type declares an entry point for the graph.</exception>
private Type FindGraphEntryType(string graphName)
{
    foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
    {
        IEnumerable<Type> candidates;
        try
        {
            candidates = assembly.GetTypes();
        }
        catch (ReflectionTypeLoadException ex)
        {
            // Partially loadable assembly: keep the types that did load instead of
            // discarding the whole assembly (Types holds null for each failed type).
            candidates = ex.Types.OfType<Type>();
        }
        catch (Exception)
        {
            // Best-effort discovery: an assembly that cannot be inspected is skipped.
            continue;
        }

        foreach (var candidate in candidates)
        {
            var declaresEntry = candidate
                .GetCustomAttributes<AgentGraphEntryAttribute>()
                .Any(entry => string.Equals(entry.GraphName, graphName, StringComparison.Ordinal));

            if (declaresEntry)
            {
                return candidate;
            }
        }
    }

    throw new InvalidOperationException(
        $"Cannot build graph workflow '{graphName}': no entry point found. " +
        $"Ensure exactly one agent class has [AgentGraphEntry(\"{graphName}\")].");
}
| | | 566 | | |
/// <summary>
/// Scans the assemblies loaded in the current AppDomain and collects one
/// <see cref="GraphEdgeInfo"/> per <see cref="AgentGraphEdgeAttribute"/> whose graph
/// name equals <paramref name="graphName"/> (ordinal comparison).
/// </summary>
/// <param name="graphName">Name of the graph whose edges are wanted.</param>
/// <returns>The discovered edges in scan order; empty when the graph declares none.</returns>
private static List<GraphEdgeInfo> DiscoverGraphEdges(string graphName)
{
    var edges = new List<GraphEdgeInfo>();

    foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies())
    {
        IEnumerable<Type> candidates;
        try
        {
            candidates = assembly.GetTypes();
        }
        catch (ReflectionTypeLoadException ex)
        {
            // Partially loadable assembly: keep the types that did load instead of
            // discarding the whole assembly (Types holds null for each failed type).
            candidates = ex.Types.OfType<Type>();
        }
        catch (Exception)
        {
            // Best-effort discovery: an assembly that cannot be inspected is skipped.
            continue;
        }

        foreach (var sourceType in candidates)
        {
            foreach (var attr in sourceType.GetCustomAttributes<AgentGraphEdgeAttribute>())
            {
                if (!string.Equals(attr.GraphName, graphName, StringComparison.Ordinal))
                {
                    continue;
                }

                // The declaring type is the edge source; a per-node routing mode is
                // recorded only when the attribute explicitly carries one.
                edges.Add(new GraphEdgeInfo(
                    sourceType,
                    attr.TargetAgentType,
                    attr.Condition,
                    attr.IsRequired,
                    attr.HasNodeRoutingMode ? attr.NodeRoutingMode : null));
            }
        }
    }

    return edges;
}
| | | 596 | | |
| | | 597 | | /// <summary> |
| | | 598 | | /// Wires a single edge into the <see cref="WorkflowBuilder"/>, applying |
| | | 599 | | /// condition functions according to the effective routing mode for the |
| | | 600 | | /// source node. |
| | | 601 | | /// </summary> |
| | | 602 | | /// <remarks> |
| | | 603 | | /// <para> |
| | | 604 | | /// <see cref="GraphEdgeInfo.IsRequired"/> is intentionally NOT wired in the BSP path. |
| | | 605 | | /// MAF's <see cref="WorkflowBuilder.AddEdge(ExecutorBinding, ExecutorBinding)"/> API |
| | | 606 | | /// has no concept of optional vs. required edges — all edges are implicitly required. |
| | | 607 | | /// The <c>IsRequired</c> semantic is a Needlr-native-executor-only feature handled by |
| | | 608 | | /// <c>RunGraphAsync</c>, which uses Needlr's own graph executor. |
| | | 609 | | /// </para> |
| | | 610 | | /// </remarks> |
private static void AddRoutedEdge(
    WorkflowBuilder builder,
    ExecutorBinding source,
    ExecutorBinding target,
    GraphEdgeInfo edge,
    Dictionary<Type, GraphRoutingMode> effectiveRoutingModes,
    Dictionary<Type, List<GraphEdgeInfo>> edgesBySource)
{
    // A source type with no recorded routing mode is treated as Deterministic.
    var routingMode = effectiveRoutingModes.GetValueOrDefault(edge.SourceType, GraphRoutingMode.Deterministic);

    // Edges without a condition are wired as plain edges, whatever the routing mode.
    if (edge.Condition is not { } condition)
    {
        builder.AddEdge(source, target);
        return;
    }

    switch (routingMode)
    {
        case GraphRoutingMode.Deterministic:
        case GraphRoutingMode.AllMatching:
            // Each conditional edge is judged on its own predicate only.
            builder.AddEdge<object>(source, target,
                input => EvaluateEdgeCondition(edge.SourceType, condition, input));
            break;

        case GraphRoutingMode.FirstMatching:
        {
            var sourceEdges = edgesBySource[edge.SourceType];

            // Locate this edge by reference identity (the same identity check the
            // ExclusiveChoice path uses). GraphEdgeInfo is a record, so IndexOf would
            // compare by value and could resolve to an earlier, value-equal edge.
            var edgeIndex = sourceEdges.FindIndex(e => ReferenceEquals(e, edge));

            // Snapshot the conditions declared before this edge; Take() yields nothing
            // when the edge is not in the list (index -1).
            var earlierConditions = sourceEdges
                .Take(edgeIndex)
                .Where(e => e.Condition is not null)
                .Select(e => e.Condition!)
                .ToList();

            builder.AddEdge<object>(source, target,
                input => IsFirstMatchingEdge(edge.SourceType, condition, earlierConditions, input));
            break;
        }

        case GraphRoutingMode.ExclusiveChoice:
        {
            var sourceEdges = edgesBySource[edge.SourceType];
            builder.AddEdge<object>(source, target,
                input => IsExclusiveChoiceEdge(edge, sourceEdges, input));
            break;
        }

        default:
            // Unrecognised routing mode: the edge is added without its condition.
            builder.AddEdge(source, target);
            break;
    }
}

/// <summary>
/// FirstMatching predicate: the edge is followed only when its own condition passes
/// and none of the conditions declared before it on the same source pass.
/// </summary>
[RequiresUnreferencedCode("Condition evaluation uses reflection to invoke static predicate methods on agent types.")]
private static bool IsFirstMatchingEdge(
    Type sourceType,
    string condition,
    List<string> earlierConditions,
    object? input)
{
    if (!EvaluateEdgeCondition(sourceType, condition, input))
    {
        return false;
    }

    foreach (var earlierCondition in earlierConditions)
    {
        if (EvaluateEdgeCondition(sourceType, earlierCondition, input))
        {
            return false;
        }
    }

    return true;
}

/// <summary>
/// ExclusiveChoice predicate: evaluates every conditional edge of the source once and
/// returns whether <paramref name="edge"/> is the single edge whose condition passed.
/// </summary>
/// <exception cref="InvalidOperationException">
/// No conditional edge matched, or more than one matched.
/// </exception>
[RequiresUnreferencedCode("Condition evaluation uses reflection to invoke static predicate methods on agent types.")]
private static bool IsExclusiveChoiceEdge(
    GraphEdgeInfo edge,
    List<GraphEdgeInfo> sourceEdges,
    object? input)
{
    // Matched target names are collected during the one and only evaluation pass, so
    // the error message below reports exactly the edges that were counted and no
    // predicate is invoked a second time.
    var matchedTargets = new List<string>();
    var thisMatches = false;

    foreach (var candidate in sourceEdges)
    {
        if (candidate.Condition is null)
        {
            continue;
        }

        if (!EvaluateEdgeCondition(edge.SourceType, candidate.Condition, input))
        {
            continue;
        }

        matchedTargets.Add(candidate.TargetType.Name);
        if (ReferenceEquals(candidate, edge))
        {
            thisMatches = true;
        }
    }

    if (matchedTargets.Count == 0)
    {
        throw new InvalidOperationException(
            $"ExclusiveChoice routing on '{edge.SourceType.Name}': no edge condition matched. " +
            $"Exactly one must match.");
    }

    if (matchedTargets.Count > 1)
    {
        var names = string.Join(", ", matchedTargets);
        throw new InvalidOperationException(
            $"ExclusiveChoice routing on '{edge.SourceType.Name}': {matchedTargets.Count} edges matched " +
            $"({names}). Exactly one must match.");
    }

    return thisMatches;
}
| | | 709 | | |
/// <summary>
/// Evaluates an edge condition by invoking the static predicate method named
/// <paramref name="conditionMethodName"/> on <paramref name="sourceType"/>.
/// </summary>
/// <remarks>
/// A static method (public or non-public) taking a single <c>object</c> parameter is
/// looked up first. When none exists, static single-parameter overloads of that name
/// returning <c>bool</c> are considered; an overload whose parameter type can accept
/// <paramref name="upstreamOutput"/> is preferred, otherwise the first one found is used.
/// </remarks>
/// <param name="sourceType">Type that declares the predicate method.</param>
/// <param name="conditionMethodName">Name of the static predicate method.</param>
/// <param name="upstreamOutput">Value passed to the predicate as its only argument.</param>
/// <returns>The <c>bool</c> returned by the predicate.</returns>
/// <exception cref="InvalidOperationException">
/// No suitable static method returning <c>bool</c> was found.
/// </exception>
[RequiresUnreferencedCode("Condition evaluation uses reflection to invoke static predicate methods on agent types.")]
private static bool EvaluateEdgeCondition(Type sourceType, string conditionMethodName, object? upstreamOutput)
{
    const BindingFlags StaticMembers = BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Static;

    // Preferred shape: static bool Name(object? upstreamOutput).
    var method = sourceType.GetMethod(
        conditionMethodName,
        StaticMembers,
        null,
        [typeof(object)],
        null);

    if (method is null)
    {
        // Fallback for predicates with a more specific parameter type. Only bool-returning
        // overloads qualify, so an unrelated same-named overload cannot shadow a valid one.
        // OrderByDescending is a stable sort: overloads that accept the runtime value come
        // first and ties keep the order reflection returned them in.
        method = sourceType.GetMethods(StaticMembers)
            .Where(m => m.Name == conditionMethodName
                && m.ReturnType == typeof(bool)
                && m.GetParameters().Length == 1)
            .OrderByDescending(m => Accepts(m.GetParameters()[0].ParameterType, upstreamOutput))
            .FirstOrDefault();
    }

    if (method is null || method.ReturnType != typeof(bool))
    {
        throw new InvalidOperationException(
            $"Condition '{conditionMethodName}' on '{sourceType.Name}' must be a static method " +
            $"with signature 'static bool {conditionMethodName}(object? upstreamOutput)'.");
    }

    return (bool)method.Invoke(null, [upstreamOutput])!;

    // True when a parameter of the given type can receive the value without conversion:
    // null fits reference types and Nullable<T>; anything else must be an instance of the type.
    static bool Accepts(Type parameterType, object? value) =>
        value is null
            ? !parameterType.IsValueType || Nullable.GetUnderlyingType(parameterType) is not null
            : parameterType.IsInstanceOfType(value);
}
| | | 739 | | |
| | 611 | 740 | | private sealed record GraphEdgeInfo(Type SourceType, Type TargetType, string? Condition, bool IsRequired, GraphRouti |
| | | 741 | | } |