Chapter 7: Interrupt/Resume (Human-in-the-Loop)
Goal of this chapter: understand the Interrupt/Resume mechanism and implement a Tool approval flow so users can confirm before sensitive operations.
Code Location
- Entry code: cmd/ch07/main.go
Prerequisites
Same as Chapter 1: you need a configured and available ChatModel (OpenAI or Ark). Also, like Chapter 4, set PROJECT_ROOT:
export PROJECT_ROOT=/path/to/eino # Eino core library root directory (defaults to current directory if not set)
Running
In the examples/quickstart/chatwitheino directory:
# Set project root directory
export PROJECT_ROOT=/path/to/your/project
go run ./cmd/ch07
Example output:
you> Please execute the command echo hello
⚠️ Approval Required ⚠️
Tool: execute
Arguments: {"command":"echo hello"}
Approve this action? (y/n): y
[tool result] hello
hello
From Automatic Execution to Human Approval: Why We Need Interrupt
In previous chapters, the Agent automatically executed all Tool calls, but in certain scenarios this is dangerous:
Risks of automatic execution:
- Deleting files: accidentally deleting important data
- Sending emails: sending incorrect content
- Executing commands: running dangerous operations
- Modifying config: breaking system settings
Interrupt’s role:
- Interrupt is the Agent’s pause mechanism: pauses before critical operations, waiting for user confirmation
- Interrupt carries information: shows the user the operation about to be executed
- Interrupt is resumable: continues after user confirmation, or returns an error on rejection
Simple analogy:
- Automatic execution = “autopilot” (fully trusting the system)
- Interrupt = “manual override” (critical decisions made by humans)
Key Concepts
Interrupt Mechanism
Interrupt is the core mechanism in Eino for implementing human-agent collaboration.
Core idea: pause before executing critical operations, wait for user confirmation, then continue.
A Tool that requires approval has its execution split into two phases:
- First call (triggers interrupt): the Tool saves the current arguments, then returns an interrupt signal. Runner pauses execution and returns an Interrupt event to the caller.
- Resume after user approval: Runner re-invokes the Tool; this time the Tool detects it was “previously interrupted”, reads the user’s approval result, and executes (or rejects).
Simplified pseudocode:
func myTool(ctx, args):
if first_call:
save args
return interrupt_signal // Runner pauses, shows approval prompt
else: // Second call after Resume
if user_approved:
return execute_operation(saved_args)
else:
return "Operation rejected by user"
Full code with key field explanations:
// Trigger interrupt in a Tool
func myTool(ctx context.Context, args string) (string, error) {
// wasInterrupted: whether this is the second call after Resume (false on first call, true after Resume)
// storedArgs: arguments saved via StatefulInterrupt on first call, retrievable after Resume
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
if !wasInterrupted {
// First call: trigger interrupt, saving args for use after Resume
return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: "my_tool",
ArgumentsInJSON: args,
}, args) // Third parameter is the state to save (retrieved via storedArgs after Resume)
}
// Second call after Resume: read user's approval result
// isTarget: whether this Resume targets the current Tool (each Resume targets only one Tool)
// hasData: whether Resume carried approval result data
// data: the user's approval result
isTarget, hasData, data := tool.GetResumeContext[*ApprovalResult](ctx)
if isTarget && hasData {
if data.Approved {
return doSomething(storedArgs) // Execute actual operation with saved args
}
return "Operation rejected by user", nil
}
// Other cases (isTarget=false means this Resume's target is not the current Tool): re-interrupt
return "", tool.StatefulInterrupt(ctx, &ApprovalInfo{
ToolName: "my_tool",
ArgumentsInJSON: storedArgs,
}, storedArgs)
}
ApprovalMiddleware
ApprovalMiddleware is a generic approval middleware that intercepts specific Tool calls:
type approvalMiddleware struct {
*adk.BaseChatModelAgentMiddleware
}
func (m *approvalMiddleware) WrapInvokableToolCall(
_ context.Context,
endpoint adk.InvokableToolCallEndpoint,
tCtx *adk.ToolContext,
) (adk.InvokableToolCallEndpoint, error) {
// Only intercept Tools that require approval
if tCtx.Name != "execute" {
return endpoint, nil
}
return func(ctx context.Context, args string, opts ...tool.Option) (string, error) {
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
if !wasInterrupted {
return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
ToolName: tCtx.Name,
ArgumentsInJSON: args,
}, args)
}
isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
if isTarget && hasData {
if data.Approved {
return endpoint(ctx, storedArgs, opts...)
}
if data.DisapproveReason != nil {
return fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason), nil
}
return fmt.Sprintf("tool '%s' disapproved", tCtx.Name), nil
}
isTarget, _, _ = tool.GetResumeContext[any](ctx)
if !isTarget {
return "", tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
ToolName: tCtx.Name,
ArgumentsInJSON: storedArgs,
}, storedArgs)
}
return endpoint(ctx, storedArgs, opts...)
}, nil
}
func (m *approvalMiddleware) WrapStreamableToolCall(
_ context.Context,
endpoint adk.StreamableToolCallEndpoint,
tCtx *adk.ToolContext,
) (adk.StreamableToolCallEndpoint, error) {
// If the agent is configured with StreamingShell, execute goes through streaming calls;
// this method must be implemented to intercept it
if tCtx.Name != "execute" {
return endpoint, nil
}
return func(ctx context.Context, args string, opts ...tool.Option) (*schema.StreamReader[string], error) {
wasInterrupted, _, storedArgs := tool.GetInterruptState[string](ctx)
if !wasInterrupted {
return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
ToolName: tCtx.Name,
ArgumentsInJSON: args,
}, args)
}
isTarget, hasData, data := tool.GetResumeContext[*commontool.ApprovalResult](ctx)
if isTarget && hasData {
if data.Approved {
return endpoint(ctx, storedArgs, opts...)
}
if data.DisapproveReason != nil {
return singleChunkReader(fmt.Sprintf("tool '%s' disapproved: %s", tCtx.Name, *data.DisapproveReason)), nil
}
return singleChunkReader(fmt.Sprintf("tool '%s' disapproved", tCtx.Name)), nil
}
isTarget, _, _ = tool.GetResumeContext[any](ctx)
if !isTarget {
return nil, tool.StatefulInterrupt(ctx, &commontool.ApprovalInfo{
ToolName: tCtx.Name,
ArgumentsInJSON: storedArgs,
}, storedArgs)
}
return endpoint(ctx, storedArgs, opts...)
}, nil
}
CheckPointStore
CheckPointStore is a key component for implementing interrupt/resume:
type CheckPointStore interface {
// Save checkpoint
Put(ctx context.Context, key string, checkpoint *Checkpoint) error
// Get checkpoint
Get(ctx context.Context, key string) (*Checkpoint, error)
}
Why do we need CheckPointStore?
- Save state on interrupt: Tool arguments, execution position, etc.
- Load state on resume: continue execution from the interrupt point
- Support cross-process resume: can still resume after process restart
Interrupt/Resume Implementation
1. Configure Runner with CheckPointStore
runner := adk.NewTypedRunner[M](adk.TypedRunnerConfig[M]{
Agent: agent,
EnableStreaming: true,
CheckPointStore: adkstore.NewInMemoryStore(), // In-memory storage
})
2. Configure Agent with ApprovalMiddleware
agent, err := deep.NewTyped[M](ctx, &deep.TypedConfig[M]{
// ... other config
Handlers: []adk.TypedChatModelAgentMiddleware[M]{
newApprovalMiddleware[M](), // Add approval middleware
newSafeToolMiddleware[M](), // Convert Tool errors to strings (interrupt errors propagate upward)
},
})
3. Handle Interrupt Events
checkPointID := sessionID
events := runner.Run(ctx, msgops.NormalizeMessagesForModelInput(history), adk.WithCheckPointID(checkPointID))
result, err := helpers.PrintAndCollect[M](events, helpers.PrintOptions{
ShowToolCalls: true,
ShowToolResults: true,
CaptureInterrupt: true,
})
if err != nil {
return err
}
assistantText := result.AssistantText
if result.InterruptInfo != nil {
// Note: it's recommended to use the same stdin reader for both "user input" and "approval y/n"
// to avoid approval input being treated as the next round's you> message
assistantText, err = handleInterrupt[M](ctx, runner, checkPointID, result.InterruptInfo, reader)
if err != nil {
return err
}
}
_ = session.Append(msgops.NewAssistant[M](assistantText, nil))
Interrupt/Resume Execution Flow
┌─────────────────────────────────────────┐
│ User: execute command echo hello │
└─────────────────────────────────────────┘
↓
┌──────────────────────┐
│ Agent analyzes intent│
│ Decides to call │
│ execute │
└──────────────────────┘
↓
┌──────────────────────┐
│ ApprovalMiddleware │
│ Intercepts Tool call │
└──────────────────────┘
↓
┌──────────────────────┐
│ Trigger Interrupt │
│ Save state to Store │
└──────────────────────┘
↓
┌──────────────────────┐
│ Return Interrupt │
│ event; await user │
│ approval │
└──────────────────────┘
↓
┌──────────────────────┐
│ User inputs y/n │
└──────────────────────┘
↓
┌──────────────────────┐
│ runner.ResumeWith... │
│ Resume execution │
└──────────────────────┘
↓
┌──────────────────────┐
│ Execute command │
│ or return rejection │
└──────────────────────┘
Chapter Summary
- Interrupt: the Agent’s pause mechanism, pausing before critical operations to await confirmation
- Resume: resume execution — continue after user confirmation, or return an error on rejection
- ApprovalMiddleware: a generic approval middleware that intercepts specific Tool calls
- CheckPointStore: saves interrupt state, supporting cross-process resume
- Human-agent collaboration: critical decisions are confirmed by humans, improving safety
Extended Thinking
Other Interrupt scenarios:
- Multi-option approval: user selects one of several options
- Parameter completion: user provides missing parameters
- Conditional branching: user decides the execution path
Approval strategies:
- Allowlist: only approve sensitive operations
- Blocklist: approve all operations except safe ones
- Dynamic rules: decide whether to require approval based on argument content