• Uncategorised

Mutli node interrup example in langGraph4j

Your agents flow might require interruption or human-in-loop at multiple node executions. Here is the code sample to do so

package ai;

import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.state.StateSnapshot;

import java.util.*;
import java.util.concurrent.CompletableFuture;

import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;

/**
 * Multi-Stage Approval Workflow with 3 Interrupt Points
 * 
 * Shows how to:
 * - Handle multiple checkpoints
 * - Identify which checkpoint you're at
 * - Handle different logic for each stage
 */
public class MultiStageApprovalExample {

    // Nodes
    private static Map<String, Object> submitRequest(AgentState state) {
        System.out.println("\n[SUBMIT] Request submitted");
        int amount = (int) state.data().get("amount");
        
        Map<String, Object> updates = new HashMap<>();
        updates.put("status", "submitted");
        updates.put("current_stage", "manager_review");
        updates.put("amount", amount);
        return updates;
    }

    private static Map<String, Object> managerReview(AgentState state) {
        int amount = (int) state.data().get("amount");
        System.out.println("\n[MANAGER] Reviewing $" + amount);
        
        Map<String, Object> updates = new HashMap<>();
        updates.put("status", "pending_manager_approval");
        updates.put("current_stage", "manager_review");
        updates.put("message", "Awaiting manager approval");
        return updates;
    }

    private static Map<String, Object> financeReview(AgentState state) {
        int amount = (int) state.data().get("amount");
        System.out.println("\n[FINANCE] Reviewing $" + amount);
        
        Map<String, Object> updates = new HashMap<>();
        updates.put("status", "pending_finance_approval");
        updates.put("current_stage", "finance_review");
        updates.put("message", "Awaiting finance approval");
        return updates;
    }

    private static Map<String, Object> ceoReview(AgentState state) {
        int amount = (int) state.data().get("amount");
        System.out.println("\n[CEO] Reviewing $" + amount);
        
        Map<String, Object> updates = new HashMap<>();
        updates.put("status", "pending_ceo_approval");
        updates.put("current_stage", "ceo_review");
        updates.put("message", "Awaiting CEO approval");
        return updates;
    }

    private static Map<String, Object> executeRequest(AgentState state) {
        System.out.println("\n[EXECUTE] Processing approved request");
        
        Map<String, Object> updates = new HashMap<>();
        updates.put("status", "completed");
        updates.put("current_stage", "completed");
        return updates;
    }

    // Build workflow with 3 interrupts
    public static CompiledGraph<AgentState> buildWorkflow() throws Exception {
        StateGraph<AgentState> workflow = new StateGraph<>(AgentState::new);
        
        workflow.addNode("submit", state -> CompletableFuture.completedFuture(submitRequest(state)));
        workflow.addNode("manager_review", state -> CompletableFuture.completedFuture(managerReview(state)));
        workflow.addNode("finance_review", state -> CompletableFuture.completedFuture(financeReview(state)));
        workflow.addNode("ceo_review", state -> CompletableFuture.completedFuture(ceoReview(state)));
        workflow.addNode("execute", state -> CompletableFuture.completedFuture(executeRequest(state)));
        
        workflow.addEdge(START, "submit");
        workflow.addEdge("submit", "manager_review");
        workflow.addEdge("manager_review", "finance_review");
        workflow.addEdge("finance_review", "ceo_review");
        workflow.addEdge("ceo_review", "execute");
        workflow.addEdge("execute", END);
        
        // THREE INTERRUPT POINTS!
        CompileConfig config = CompileConfig.builder()
            .checkpointSaver(new MemorySaver())
            .interruptAfter("manager_review")   // Checkpoint 1
            .interruptAfter("finance_review")   // Checkpoint 2
            .interruptAfter("ceo_review")       // Checkpoint 3
            .build();
        
        return workflow.compile(config);
    }

    /**
     * Helper: Identify which stage we're at
     */
    public static String identifyCurrentStage(StateSnapshot<AgentState> snapshot) {
        AgentState state = snapshot.state();
        
        // Option 1: Check the "current_stage" field we set in nodes
        String currentStage = (String) state.data().get("current_stage");
        if (currentStage != null) {
            return currentStage;
        }
        
        // Option 2: Check the status field
        String status = (String) state.data().get("status");
        if (status != null) {
            if (status.contains("manager")) return "manager_review";
            if (status.contains("finance")) return "finance_review";
            if (status.contains("ceo")) return "ceo_review";
        }
        
        // Option 3: Check what's next
        List<String> nextNodes = snapshot.next();
        if (!nextNodes.isEmpty()) {
            String next = nextNodes.get(0);
            // We're paused just before this next node
            return "before_" + next;
        }
        
        return "unknown";
    }

    /**
     * Handle approval for specific stage
     */
    public static void handleApprovalAtStage(
        CompiledGraph<AgentState> app,
        RunnableConfig config,
        String stage,
        String approverName,
        boolean approved
    ) throws Exception {
        
        System.out.println("\n" + "=".repeat(70));
        System.out.println("HANDLING APPROVAL AT STAGE: " + stage);
        System.out.println("=".repeat(70));
        
        Map<String, Object> updates = new HashMap<>();
        
        switch (stage) {
            case "manager_review":
                updates.put("manager_approved", approved);
                updates.put("manager_name", approverName);
                updates.put("manager_timestamp", System.currentTimeMillis());
                app.updateState(config, updates, "manager_review");
                break;
                
            case "finance_review":
                updates.put("finance_approved", approved);
                updates.put("finance_name", approverName);
                updates.put("finance_timestamp", System.currentTimeMillis());
                app.updateState(config, updates, "finance_review");
                break;
                
            case "ceo_review":
                updates.put("ceo_approved", approved);
                updates.put("ceo_name", approverName);
                updates.put("ceo_timestamp", System.currentTimeMillis());
                app.updateState(config, updates, "ceo_review");
                break;
                
            default:
                throw new IllegalStateException("Unknown stage: " + stage);
        }
        
        System.out.println("✓ Updated checkpoint at stage: " + stage);
    }

    /**
     * Main workflow orchestration
     */
    public static void main(String[] args) {
        try {
            System.out.println("╔════════════════════════════════════════════════════════════╗");
            System.out.println("║      Multi-Stage Approval Workflow (3 Checkpoints)        ║");
            System.out.println("╚════════════════════════════════════════════════════════════╝");
            
            CompiledGraph<AgentState> app = buildWorkflow();
            
            // Initial request
            String requestId = "REQ-MULTI-001";
            RunnableConfig config = RunnableConfig.builder()
                .threadId(requestId)
                .build();
            
            Map<String, Object> initialData = new HashMap<>();
            initialData.put("amount", 50000);
            initialData.put("requester", "Alice");
            
            // ================================================================
            // EXECUTION 1: Start workflow, pause at manager_review
            // ================================================================
            System.out.println("\n" + "=".repeat(70));
            System.out.println("EXECUTION #1: Initial Request");
            System.out.println("=".repeat(70));
            
            app.stream(initialData, config).forEach(output -> {
                System.out.println("  → " + output.node());
            });
            
            // Check where we are
            StateSnapshot<AgentState> snapshot1 = app.getState(config);
            String stage1 = identifyCurrentStage(snapshot1);
            
            System.out.println("\n✋ PAUSED at stage: " + stage1);
            System.out.println("   Status: " + snapshot1.state().data().get("status"));
            System.out.println("   Message: " + snapshot1.state().data().get("message"));
            System.out.println("   Next nodes: " + snapshot1.next());
            
            // ================================================================
            // EXECUTION 2: Manager approves, pause at finance_review
            // ================================================================
            System.out.println("\n\n" + "=".repeat(70));
            System.out.println("EXECUTION #2: Manager Approval");
            System.out.println("=".repeat(70));
            
            Thread.sleep(500);
            System.out.println("\n👤 Manager Bob Smith reviews and approves");
            
            // Handle manager approval
            handleApprovalAtStage(app, config, stage1, "Bob Smith", true);
            
            // Resume
            System.out.println("\n▶️  Resuming workflow...");
            app.stream(null, config).forEach(output -> {
                System.out.println("  → " + output.node());
            });
            
            // Check where we are now
            StateSnapshot<AgentState> snapshot2 = app.getState(config);
            String stage2 = identifyCurrentStage(snapshot2);
            
            System.out.println("\n✋ PAUSED at stage: " + stage2);
            System.out.println("   Status: " + snapshot2.state().data().get("status"));
            System.out.println("   Message: " + snapshot2.state().data().get("message"));
            System.out.println("   Manager: " + snapshot2.state().data().get("manager_name") + " ✓");
            
            // ================================================================
            // EXECUTION 3: Finance approves, pause at ceo_review
            // ================================================================
            System.out.println("\n\n" + "=".repeat(70));
            System.out.println("EXECUTION #3: Finance Approval");
            System.out.println("=".repeat(70));
            
            Thread.sleep(500);
            System.out.println("\n👤 Finance Director Carol approves");
            
            // Handle finance approval
            handleApprovalAtStage(app, config, stage2, "Carol Johnson", true);
            
            // Resume
            System.out.println("\n▶️  Resuming workflow...");
            app.stream(null, config).forEach(output -> {
                System.out.println("  → " + output.node());
            });
            
            // Check where we are now
            StateSnapshot<AgentState> snapshot3 = app.getState(config);
            String stage3 = identifyCurrentStage(snapshot3);
            
            System.out.println("\n✋ PAUSED at stage: " + stage3);
            System.out.println("   Status: " + snapshot3.state().data().get("status"));
            System.out.println("   Message: " + snapshot3.state().data().get("message"));
            System.out.println("   Manager: " + snapshot3.state().data().get("manager_name") + " ✓");
            System.out.println("   Finance: " + snapshot3.state().data().get("finance_name") + " ✓");
            
            // ================================================================
            // EXECUTION 4: CEO approves, completes workflow
            // ================================================================
            System.out.println("\n\n" + "=".repeat(70));
            System.out.println("EXECUTION #4: CEO Approval (Final)");
            System.out.println("=".repeat(70));
            
            Thread.sleep(500);
            System.out.println("\n👤 CEO David approves");
            
            // Handle CEO approval
            handleApprovalAtStage(app, config, stage3, "David CEO", true);
            
            // Resume - should complete now
            System.out.println("\n▶️  Resuming workflow...");
            app.stream(null, config).forEach(output -> {
                System.out.println("  → " + output.node());
            });
            
            // Final state
            StateSnapshot<AgentState> finalSnapshot = app.getState(config);
            
            System.out.println("\n" + "=".repeat(70));
            System.out.println("✓ WORKFLOW COMPLETED!");
            System.out.println("=".repeat(70));
            
            System.out.println("\n📊 Final State:");
            System.out.println("   Status: " + finalSnapshot.state().data().get("status"));
            System.out.println("   Amount: $" + finalSnapshot.state().data().get("amount"));
            System.out.println("   Approved by:");
            System.out.println("     • Manager: " + finalSnapshot.state().data().get("manager_name"));
            System.out.println("     • Finance: " + finalSnapshot.state().data().get("finance_name"));
            System.out.println("     • CEO: " + finalSnapshot.state().data().get("ceo_name"));
            
            // ================================================================
            // SUMMARY
            // ================================================================
            System.out.println("\n" + "=".repeat(70));
            System.out.println("KEY TECHNIQUES FOR MULTIPLE INTERRUPTS");
            System.out.println("=".repeat(70));
            
            System.out.println("\n1️⃣  Store current_stage in state:");
            System.out.println("   updates.put(\"current_stage\", \"manager_review\");");
            
            System.out.println("\n2️⃣  Check where you are:");
            System.out.println("   String stage = (String) state.data().get(\"current_stage\");");
            
            System.out.println("\n3️⃣  Handle each stage differently:");
            System.out.println("   switch(stage) {");
            System.out.println("     case \"manager_review\": ...");
            System.out.println("     case \"finance_review\": ...");
            System.out.println("     case \"ceo_review\": ...");
            System.out.println("   }");
            
            System.out.println("\n4️⃣  Update at correct node:");
            System.out.println("   app.updateState(config, updates, stage);");
            
            System.out.println("\n5️⃣  Resume from checkpoint:");
            System.out.println("   app.stream(null, config);");
            
        } catch (Exception e) {
            System.err.println("Error: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

You may also like...