Flow control valves allow you to halt, pause, or redirect the flow of data through your Pipeline based on conditional logic.
Advanced Pipeline Flow Control
💡 Tip: Flow control valves allow you to halt, pause, or redirect the flow of data through your Pipeline based on conditional logic.
Table of Contents
- Flow Control Overview
- Pipe Jumping
- Pipe Repeating
- Pipeline Termination
- Reasoning Skip
- Complex Flow Control Patterns
- Flow Control Best Practices
- Monitoring Flow Control
TPipe provides sophisticated flow control mechanisms that allow pipes to dynamically alter pipeline execution through jumping, repeating, and termination. These features enable complex conditional logic and adaptive processing workflows.
Flow Control Overview
The Problem
Linear pipeline execution is often insufficient for:
- Conditional processing based on content analysis
- Error handling and retry logic
- Multi-step reasoning that requires iteration
- Early termination when tasks are complete
The Solution
TPipe provides flow control mechanisms:
- Pipe Jumping: Skip to specific pipes or jump forward/backward
- Pipe Repeating: Execute the same pipe multiple times
- Pipeline Termination: Exit the pipeline early (success or failure)
- Reasoning Skip: Discard reasoning pipe output so the parent pipe runs without it
All flow control is managed through the MultimodalContent object that flows between pipes.
Pipe Jumping
Basic Pipe Jumping
// Inside a pipe's processing logic or validation function
content.jumpToPipe("error-handler") // Jump to named pipe
content.jumpToPipe("quality-check") // Jump to specific processing stage
How it works: The pipeline checks for jump instructions after each pipe execution and redirects to the specified pipe instead of continuing sequentially.
Conditional Jumping Example
val analysisPipe = BedrockPipe()
.setPipeName("content-analyzer")
.setValidatorFunction { content ->
val analysisResult = analyzeContent(content.text)
when {
analysisResult.hasErrors -> {
content.jumpToPipe("error-handler")
false // Validation failed, trigger jump
}
analysisResult.needsEnhancement -> {
content.jumpToPipe("enhancement-stage")
false // Jump to enhancement
}
else -> true // Continue normal flow
}
}
val errorHandler = BedrockPipe()
.setPipeName("error-handler")
.setSystemPrompt("Fix the identified errors in the content.")
val enhancer = BedrockPipe()
.setPipeName("enhancement-stage")
.setSystemPrompt("Enhance the content based on analysis results.")
pipeline.add(analysisPipe)
.add(errorHandler)
.add(enhancer)
Pipe Repeating
Basic Pipe Repeating
// Inside a pipe's processing logic
content.repeat() // Execute this pipe again with the current output
How it works: After pipe execution, the pipeline checks the repeatPipe flag and re-executes the same pipe with the updated content until the flag is cleared.
Iterative Refinement Example
val refinementPipe = BedrockPipe()
.setPipeName("iterative-refiner")
.setSystemPrompt("""
Refine the provided content. If the content quality score is below 8/10,
improve it and indicate that another iteration is needed.
If quality is 8/10 or higher, finalize the content.
""")
.setValidatorFunction { content ->
val qualityScore = assessQuality(content.text)
if (qualityScore < 8.0 && getIterationCount(content) < 5) {
content.repeat() // Try again with improved content
incrementIterationCount(content)
}
true
}
Custom Reasoning Loop
val reasoningPipe = BedrockPipe()
.setPipeName("reasoning-loop")
.setSystemPrompt("""
Think through this problem step by step.
If you need more thinking time, indicate "CONTINUE_REASONING".
If you have a final answer, provide it clearly.
""")
.setValidatorFunction { content ->
if (content.text.contains("CONTINUE_REASONING") && getReasoningSteps(content) < 10) {
content.repeat() // Continue reasoning
incrementReasoningSteps(content)
}
true
}
Pipeline Termination
Terminate vs Pass Pipeline
TPipe provides two ways to exit a pipeline early with different semantics:
Pass Pipeline (Success Exit)
content.passPipeline = true // Exit pipeline early as successful completion
What this does:
- Stops pipeline execution immediately
- Preserves current content as the final result
- Treats the early exit as successful completion
- Returns content to caller as valid result
Use cases: Task completed early, no further processing needed, successful shortcut.
Terminate Pipeline (Error Exit)
content.terminate() // Exit pipeline due to error or failure
What this does:
- Stops pipeline execution immediately
- Clears content (sets to empty)
- Treats the exit as an error/failure condition
- Returns empty content to indicate failure
Use cases: Unrecoverable errors, invalid input detected, processing cannot continue
Termination Examples
Successful Early Completion
val taskCompletionChecker = BedrockPipe()
.setPipeName("completion-checker")
.setSystemPrompt("Determine if the task is complete or needs further processing.")
.setValidatorFunction { content ->
val isComplete = checkTaskCompletion(content.text)
if (isComplete) {
content.passPipeline = true // Task done successfully, exit with current result
}
true
}
Error Exit
val inputValidator = BedrockPipe()
.setPipeName("input-validator")
.setValidatorFunction { content ->
if (isInvalidInput(content.text)) {
content.terminate() // Invalid input, exit as error
return false
}
true
}
Automatic Termination
// Pipeline automatically terminates if content becomes empty
if (content.isEmpty()) {
// Pipeline terminates automatically (treated as failure)
}
// Check termination status
if (content.shouldTerminate()) {
// Pipeline will terminate after this pipe
// This checks both terminate() and empty content conditions
}
Reasoning Skip
Skipping Reasoning Injection
// Inside a transformation or validation function
content.skipReasoning() // Reasoning output will not be injected
How it works: When skipReasoningPipe is set to true on the content object, the reasoning pipe’s output is discarded before injection. The parent pipe’s main LLM call proceeds as if no reasoning pipe was attached.
Conditional Reasoning Example
val analysisPipe = BedrockPipe()
.setPipeName("conditional-analyzer")
.setReasoningPipe(reasoningPipe)
.enableSemanticCompression()
.setTransformationFunction { content ->
// Skip decompression reasoning when compression didn't fire
if (content.metadata["semanticCompressionApplied"] != true) {
content.skipReasoning()
}
content
}
Use cases: Conditional reasoning based on runtime state, skipping decompression reasoning when semantic compression wasn’t needed, cost control for simple inputs.
💡 Tip:
skipReasoning()prevents injection, not execution. To skip the entire pipe including reasoning, usesetPreInvokeFunctioninstead. For full details see Skipping Reasoning Pipes.
Complex Flow Control Patterns
Multi-Stage Validation with Retry
val processor = BedrockPipe()
.setPipeName("main-processor")
.setValidatorFunction { content ->
val validationResult = validateOutput(content.text)
when {
validationResult.isValid -> true // Continue to next pipe
validationResult.canRetry && getRetryCount(content) < 3 -> {
content.repeat() // Try processing again
incrementRetryCount(content)
false
}
validationResult.isCriticalError -> {
content.terminate() // Unrecoverable error, fail pipeline
false
}
else -> {
content.jumpToPipe("fallback-processor") // Jump to fallback
false
}
}
}
val fallbackProcessor = BedrockPipe()
.setPipeName("fallback-processor")
.setSystemPrompt("Process using alternative approach.")
Conditional Pipeline Routing
val routingPipe = BedrockPipe()
.setPipeName("content-router")
.setSystemPrompt("Analyze content type and determine processing path.")
.setValidatorFunction { content ->
val contentType = determineContentType(content.text)
when (contentType) {
ContentType.TECHNICAL -> content.jumpToPipe("technical-processor")
ContentType.CREATIVE -> content.jumpToPipe("creative-processor")
ContentType.ANALYTICAL -> content.jumpToPipe("analytical-processor")
ContentType.COMPLETE -> {
content.passPipeline = true // Already complete, successful exit
}
ContentType.INVALID -> {
content.terminate() // Invalid content, fail pipeline
}
}
false // Always jump, terminate, or fail - never continue sequentially
}
Quality Assurance Loop
val generator = BedrockPipe()
.setPipeName("content-generator")
val qualityChecker = BedrockPipe()
.setPipeName("quality-checker")
.setValidatorFunction { content ->
val quality = assessQuality(content.text)
val attempts = getAttemptCount(content)
when {
quality.isAcceptable -> true // Continue to next stage
attempts < 3 -> {
content.jumpToPipe("content-generator") // Jump back to regenerate
incrementAttemptCount(content)
false
}
quality.isUnrecoverable -> {
content.terminate() // Cannot be fixed, fail pipeline
false
}
else -> {
content.jumpToPipe("manual-review") // Escalate to manual review
false
}
}
}
val manualReview = BedrockPipe()
.setPipeName("manual-review")
Flow Control Best Practices
1. Prevent Infinite Loops
// Always include iteration limits
val maxIterations = 5
var currentIteration = getIterationCount(content)
if (needsRepeat && currentIteration < maxIterations) {
content.repeat()
incrementIterationCount(content)
} else if (currentIteration >= maxIterations) {
content.terminate() // Too many attempts, fail
}
2. Clear Jump Targets
// Use descriptive pipe names for jumping
pipe.setPipeName("error-recovery-stage")
pipe.setPipeName("quality-enhancement")
pipe.setPipeName("fallback-processor")
// Jump to clear, descriptive targets
content.jumpToPipe("error-recovery-stage")
3. Meaningful Final Content
// Before successful early exit, ensure content is meaningful
content.text = "Analysis complete: ${results.summary}"
content.passPipeline = true
// Error exits will clear content automatically
content.terminate() // Content becomes empty
Monitoring Flow Control
Tracing Flow Changes
pipeline.enableTracing() // Captures all jumps, repeats, and terminations
// After execution
val traceReport = pipeline.getTraceReport()
// Shows complete execution path including flow control decisions
val failureAnalysis = pipeline.getFailureAnalysis()
// Provides details on any failures or early exits
Advanced flow control enables sophisticated pipeline behaviors that adapt dynamically to content and processing requirements, providing both successful shortcuts and graceful failure handling for complex AI workflows.
Next Steps
Now that you understand dynamic pipeline control, learn about monitoring and troubleshooting:
→ Error Handling and Propagation - Programmatic error capture and debugging