PumpStation

💡 Tip: PumpStation is TPipe’s runtime agentic harness. It drives a judge → dispatch → path → memory cycle over multiple turns, routes through named PathObject units, and applies memory management, lorebook, DITL hooks, and a per-turn KillSwitch to keep the loop bounded and observable.

What PumpStation Is

PumpStation is a runtime agentic harness: a class that drives an LLM-powered loop over multiple turns. Each turn runs a judge, a dispatch, and a path; between turns the harness updates memory, fires background agents, and (optionally) compacts the conversation history.

The contract surface is wide but the build surface is narrow. PumpStation auto-injects system prompts when the developer does not supply them, reads MultimodalContent flags for loop control, and treats PathObject as the only thing the dispatch agent can call. There are no mandatory JSON contracts the developer must remember to inject: the harness surfaces the prompts for the agents it owns (judge, dispatch, goal, path-safety, health, lorebook, summary) and auto-injects the path descriptor protocol into the dispatch pipe.

PumpStation implements P2PInterface. A pump station can sit inside another pump station as a path’s internal agent, and a path can be exposed to the wider P2P registry for distributed dispatch.

When to Use It

Use PumpStation when:

  • The task is non-trivial and may require multiple LLM calls to complete
  • You want an LLM to decide which step to take next, rather than a fixed state machine
  • You want the harness to manage context overflow, memory compaction, and lorebook selection for you
  • You want a unified kill switch and tracing surface across the agent loop
  • You need path-safety validation or goal-validation (Ralph-loop) semantics

Don’t use it when:

  • A single LLM call is enough: use a plain Pipeline or Pipe
  • The control flow is fully deterministic: use a Manifold (state machine) or Junction (workflow recipes)
  • The agents live on different machines and need registry discovery: use DistributionGrid

Architecture at a Glance

The runtime has two scopes. The outer loop (runHarnessLoop) drives turns via while (turnIndex < maxTurns), calling the inner cycle (runTurn) each iteration, then runs finalization once. The inner cycle runs the per-turn phases (health, judge, dispatch, path, foreground agents, background agents, memory update, compaction) and returns Continue or Halt(reason). A separate transition phase, runExitFlow, runs goal validation when the judge says the task is complete or a path signals pass.

┌────────────────────────────────────────────────────────────────────────┐
│  runHarnessLoop (outer)                                                │
│                                                                        │
│  runPreInitPhase(content)                  ← ONCE                      │
│  while (turnIndex < maxTurns) {                                        │
│      runTurn()                             ← inner cycle               │
│          healthCheck → judge → dispatch → path → fg agents             │
│            → bg agents → memory update → compaction                    │
│          return TurnResult.Continue | Halt(reason)                     │
│  }                                                                     │
│  runFinalizationPhase()                    ← ONCE, returns content     │
└────────────────────────────────────────────────────────────────────────┘
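
The two scopes can be sketched in a few lines of Kotlin. This is a minimal illustration under stated assumptions, not the harness’s code: TurnResult, runTurn, and runHarnessLoop are named after the text, but the signatures, the string log, and the "MaxTurnsReached" label are hypothetical.

```kotlin
// Hypothetical stand-ins for the harness types named in the text.
sealed class TurnResult {
    object Continue : TurnResult()
    data class Halt(val reason: String) : TurnResult()
}

// Outer loop: pre-init once, a bounded number of turns, finalization once.
// `runTurn` stands in for the inner cycle (health, judge, dispatch, path, ...).
fun runHarnessLoop(maxTurns: Int, runTurn: (turnIndex: Int) -> TurnResult): String {
    val log = mutableListOf("preInit")          // runs once
    var turnIndex = 0
    var exitReason = "MaxTurnsReached"          // assumed label for hitting the cap
    while (turnIndex < maxTurns) {
        val result = runTurn(turnIndex)
        log += "turn$turnIndex"
        turnIndex++
        if (result is TurnResult.Halt) {
            exitReason = result.reason
            break
        }
    }
    log += "finalize($exitReason)"              // runs once, on every exit route
    return log.joinToString(" -> ")
}
```

The point of the shape is that finalization sits outside the while loop, so it runs exactly once whether the loop ends by a Halt or by reaching maxTurns.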

The path flow itself:

input → preInit → JUDGE → DISPATCH → PathObject → postGenerate → pathValidation
         ↑                                                              ↓
         ←←←←←←←←←←←←←←←← loop ←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←←
                       background:
                       lorebookAgent (mutex-queued)
                       summaryAgent  (mutex-queued)

Per-turn, the harness (in order):

  1. Refreshes agent instances, system prompts, and propagated settings
  2. Runs the optional healthAgent if interval or error-ratio is exceeded
  3. Runs the judgeAgent (or skips it in FlagTriggered mode without a flag)
  4. If the judge says complete or terminates, transitions to runExitFlow
  5. Runs the dispatchAgent and parses a PathRequest
  6. Repairs malformed dispatch output up to failurePolicy.maxDispatchRepairAttempts times
  7. Resolves the path by name (case-insensitive across pathList and reservePaths)
  8. Enforces loop guards (consecutive same path, total calls per path)
  9. For medium/high risk paths, runs the path-safety gate
  10. Emits PathSelected/PathSafetyStarted/PathStarted events
  11. Calls PathObject.execute (PCP function, execution function, internal agent, or agent builder, in priority order)
  12. Reads the path’s token usage; folds it into the station accumulator; runs the per-path kill switch
  13. Runs the pathValidationFunction DITL hook
  14. Runs the pathTransformationFunction DITL hook
  15. Appends the transformed result to turnHistory and rawTurnHistory
  16. Records the result in taskState.lastPathResult and taskState.latestContent
  17. Checks passPipeline / terminatePipeline flags on the path’s result
  18. Prunes turn history if it exceeds maxTurnHistorySize
  19. Runs the foreground (Blocking) harness agent slots
  20. Queues background (Async) harness agent slots on the station-scoped asyncScope (coroutine scope owned by the harness; cancelled by cancelAsyncJobs at finalization)
  21. Runs the memory update phase (queues lorebook + summary if interval hit or fill pressure)
  22. Runs the compaction phase
  23. Returns Continue
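
Step 8 (the loop guards) is easy to get subtly wrong, so here is a minimal sketch of one way to track it. LoopGuard is a hypothetical class; its parameter names mirror the maxConsecutiveSamePath and maxTotalPathCallsPerPath settings, but the exact comparison the harness uses (> versus >=) and the case-folding of names are assumptions.

```kotlin
// Hypothetical tracker for the two loop guards described in step 8.
class LoopGuard(
    private val maxConsecutiveSamePath: Int = 3,
    private val maxTotalPathCallsPerPath: Int? = null   // null disables the total cap
) {
    private var lastPath: String? = null
    private var consecutive = 0
    private val totals = mutableMapOf<String, Int>()

    /** Records a selection and returns true if the path may still run. */
    fun allow(pathName: String): Boolean {
        val key = pathName.lowercase()                   // names are matched case-insensitively
        consecutive = if (key == lastPath) consecutive + 1 else 1
        lastPath = key
        val total = (totals[key] ?: 0) + 1
        totals[key] = total
        val totalCap = maxTotalPathCallsPerPath
        return consecutive <= maxConsecutiveSamePath &&
            (totalCap == null || total <= totalCap)
    }
}
```

Selecting a different path resets the consecutive counter but not the per-path total, which is why the two limits are tracked separately.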

On the next turn, the judge phase starts by calling drainPendingAsyncResults(), which merges any PendingTurnEntry items produced by async paths or async harness agents during the previous turn into turnHistory in monotonic seq order, then emits one AsyncTurnAppended event per merged entry. The finalization phase performs a second drain immediately before cancelling asyncScope.
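
The drain step can be pictured as a sort-then-append over a pending queue. The sketch below is illustrative only: PendingTurnEntry is reduced to a seq and a text field, TurnLog is a hypothetical holder, and the real harness presumably synchronizes access to the queue, which this single-threaded version does not.

```kotlin
// Hypothetical shape of a pending entry; only the `seq` ordering comes from the text.
data class PendingTurnEntry(val seq: Long, val text: String)

class TurnLog {
    val turnHistory = mutableListOf<String>()
    val events = mutableListOf<String>()
    private val pending = mutableListOf<PendingTurnEntry>()

    fun enqueue(entry: PendingTurnEntry) { pending += entry }

    /** Merges pending entries in ascending seq order and emits one event per entry. */
    fun drainPendingAsyncResults(): Int {
        val batch = pending.sortedBy { it.seq }   // copy, so clearing below is safe
        pending.clear()
        for (entry in batch) {
            turnHistory += entry.text
            events += "AsyncTurnAppended(seq=${entry.seq})"
        }
        return batch.size
    }
}
```

Because the queue is cleared on every drain, the second drain at finalization only picks up entries that arrived after the last judge phase.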

Context blowout detection runs at every phase boundary. The kill switch check runs after each judge, dispatch, and path phase.

Agent Contracts

PumpStation owns up to seven distinct agents. Each one has a contract: what it receives as input, what it must output, and how the harness interprets its output. The contracts are documented individually below; the consolidated reference (with field-level JSON schemas) is in PumpStation Magic Contracts.

Every agent slot except the dispatch agent is optional. A pump station with no judge, no path-safety agent, and no goal agent still runs; it relies on passPipeline / terminatePipeline flags on path results to exit. The harness emits a HarnessWarning event with code = NoExitSignalConfigured when none of the legitimate exit mechanisms are wired (see Pre-Init Advisory below).

The Judge Agent Contract

Type: Pipeline?
Location: PumpStation.judgeAgent / setJudgeAgent(...) / PumpStationBuilder.judgeAgent
Default prompt: DEFAULT_JUDGE_PROMPT in Pipeline/PumpStationDefaults.kt

What the judge receives:

A MultimodalContent built by buildTurnContent() (see Pipeline/PumpStationHelpers.kt:677). The text carries turnSummary (if non-blank) prefixed to a phase-specific question, currently "Is the task complete? Decide based on the conversation history." for the Judge phase. The context.converseHistory is the curated turnHistory. The miniBankContext is the current miniBank. The metadata map contains taskState, phase, turnIndex, runId, isInitialTurn, and visiblePaths.

What the judge must output:

By default, JSON matching the schema documented in DEFAULT_JUDGE_PROMPT:

{
  "isComplete": boolean,
  "shouldTerminate": boolean,
  "reason": string
}

isComplete: true transitions the harness into the goal-validation exit flow. shouldTerminate: true halts the harness immediately with PumpStationExitReason.TerminateSignal; there is no goal gate on this path.

The harness parses the JSON via parseJudgeVerdict (Pipeline/PumpStationHelpers.kt:476). On parse failure, an empty verdict is returned (treated as isComplete: false).

Flag-based override (no JSON):

The judge can drive the loop entirely via MultimodalContent flags. Set setJudgeJsonContractEnabled(false) (or judgeJsonContractEnabled = false in the DSL). With the JSON contract disabled, the parser is skipped and the verdict comes solely from the flag check:

  • terminatePipeline = true → shouldHalt = true, exit with TerminateSignal
  • passPipeline = true → isComplete = true, enter goal validation
  • neither set → continue

JudgeVerdict is a data class defined in Pipeline/PumpStationModels.kt:

data class JudgeVerdict(
    val isComplete: Boolean = false,
    val shouldTerminate: Boolean = false,
    val shouldHalt: Boolean = false,
    val reason: PumpStationExitReason? = null
) {
    companion object {
        fun empty(): JudgeVerdict = JudgeVerdict()
    }
}
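
As a rough illustration of the flag-based override, the mapping from flags to a verdict fits in a single when expression. FlagVerdict is a hypothetical, trimmed-down copy of the fields involved, and the precedence between the two flags when both are set is an assumption (terminate is checked first here).

```kotlin
// Hypothetical local copy of the two fields used by the flag check.
data class FlagVerdict(
    val isComplete: Boolean = false,
    val shouldHalt: Boolean = false
)

// Assumption: terminatePipeline is checked before passPipeline when both are set.
fun verdictFromFlags(passPipeline: Boolean, terminatePipeline: Boolean): FlagVerdict =
    when {
        terminatePipeline -> FlagVerdict(shouldHalt = true)   // exit with TerminateSignal
        passPipeline -> FlagVerdict(isComplete = true)        // enter goal validation
        else -> FlagVerdict()                                 // continue the loop
    }
```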

Judge run mode (FlagTriggered):

Set setJudgeRunMode(PumpStationJudgeRunMode.FlagTriggered). The judge is skipped on every turn except turns where requestJudgeNextTurn() was called. The flag is one-shot: the judge clears it after consuming it. A path’s setExecutionFunction typically calls station.requestJudgeNextTurn() when it believes the task is done, deferring the cost of judge LLM calls to the turns that actually need them.

In FlagTriggered mode, maxTurns is the only safety net if the dispatch agent never signals. Set it conservatively.
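
The one-shot behaviour maps naturally onto an atomic read-and-clear. The sketch below assumes nothing about how the harness stores the flag; JudgeGate and JudgeRunMode are hypothetical local names, and only requestJudgeNextTurn is taken from the text.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

enum class JudgeRunMode { Always, FlagTriggered }   // hypothetical local enum

class JudgeGate(private val mode: JudgeRunMode) {
    private val requested = AtomicBoolean(false)

    fun requestJudgeNextTurn() = requested.set(true)

    /** True if the judge should run this turn; reading the flag also clears it. */
    fun shouldRunJudge(): Boolean = when (mode) {
        JudgeRunMode.Always -> true
        JudgeRunMode.FlagTriggered -> requested.getAndSet(false)
    }
}
```

Using getAndSet(false) makes the check and the reset a single step, so a request made from a background path between two turns is neither lost nor consumed twice.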

The Dispatch Agent Contract

Type: Pipeline? (required; P2PInitInternal throws if dispatchAgent is not a Pipeline)
Location: PumpStation.dispatchAgent / setDispatchAgent(...) / PumpStationBuilder.dispatchAgent
Default prompt: DEFAULT_DISPATCH_PROMPT in Pipeline/PumpStationDefaults.kt

What the dispatch agent receives:

A MultimodalContent built by buildTurnContent() (same builder as the judge). The text is turnSummary + "Select the next path to invoke.". The dispatch pipe additionally has enableHarnessMode() called on it, which auto-injects the path descriptor protocol into its system prompt so the visible paths and their schemas are presented to the model.

What the dispatch agent must output:

A PathRequest JSON object:

{
  "pathName": "the exact path name from the visible list",
  "pathSchema": "free-form input schema string for the path"
}

pathName is matched case-insensitively against pathList and reservePaths. An empty or blank pathName ends the turn without calling a path (the loop continues). pathSchema is passed to the path as the input (see The Path Execution Contract).

PathRequest is a data class in Pipeline/PumpStationModels.kt:

@Serializable
data class PathRequest(
    var pathName: String = "",
    var pathSchema: String = ""
)
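
The name-matching rule can be sketched as follows. NamedPath and resolvePath are hypothetical; the search order (pathList before reservePaths) and the trimming of surrounding whitespace are assumptions, since the text only states that matching is case-insensitive and that a blank name skips the path call.

```kotlin
// Hypothetical minimal path type; only the name matters for resolution.
data class NamedPath(val pathName: String)

// Returns null for a blank name (the turn ends without a path call) or an unknown name.
// Assumption: pathList is searched before reservePaths.
fun resolvePath(
    requested: String,
    pathList: List<NamedPath>,
    reservePaths: List<NamedPath>
): NamedPath? {
    val name = requested.trim()
    if (name.isEmpty()) return null
    return (pathList + reservePaths)
        .firstOrNull { it.pathName.equals(name, ignoreCase = true) }
}
```

A caller would treat a null from a blank name as "skip this turn" and a null from a non-blank name as an unknown-path error to report back to the dispatch agent.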

Repair on parse failure:

If the harness cannot parse the dispatch output as PathRequest, it invokes buildRepairPrompt (up to failurePolicy.maxDispatchRepairAttempts times) to ask the model to fix its malformed JSON. After the repair budget is exhausted:

  • If failurePolicy.stopHarnessOnInvalidPathRequest is true, the harness records lastError = DispatchJsonRepairFailed and the finalization phase emits a HarnessFailed event.
  • Otherwise the turn continues without a path call.
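
The repair budget amounts to a bounded retry loop. The following sketch is hedged: DispatchOutcome, dispatchWithRepair, and the parse and repair callbacks are hypothetical stand-ins, and RepairFailed merely represents the lastError = DispatchJsonRepairFailed branch.

```kotlin
// Hypothetical outcome type for one dispatch phase.
sealed class DispatchOutcome {
    data class Parsed(val pathName: String) : DispatchOutcome()
    object SkippedTurn : DispatchOutcome()
    object RepairFailed : DispatchOutcome()   // stands in for DispatchJsonRepairFailed
}

fun dispatchWithRepair(
    firstOutput: String,
    maxDispatchRepairAttempts: Int,
    stopHarnessOnInvalidPathRequest: Boolean,
    parse: (String) -> String?,     // returns the path name, or null on parse failure
    repair: (String) -> String      // asks the model to fix its malformed output
): DispatchOutcome {
    var output = firstOutput
    var attempts = 0
    while (true) {
        parse(output)?.let { return DispatchOutcome.Parsed(it) }
        if (attempts >= maxDispatchRepairAttempts) break   // budget exhausted
        output = repair(output)
        attempts++
    }
    return if (stopHarnessOnInvalidPathRequest) DispatchOutcome.RepairFailed
    else DispatchOutcome.SkippedTurn
}
```

With a budget of 0 the first parse failure is final, which makes the stop-or-skip policy the only thing that decides what happens next.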

The error message the harness injects for an unknown path or invalid path request is built by buildLlmErrorMessage (Pipeline/PumpStationHelpers.kt:743). It is natural-language, names the available paths, and explains what a valid PathRequest looks like; the dispatch agent sees this on the next turn as part of turnHistory.

The Path Execution Contract

Type: PathObject (see PathObject and PathRequest)
Location: PumpStation.pathList / addPath(...) / PumpStationBuilder.path("name") { }

A path receives a MultimodalContent and returns a MultimodalContent. There is no required JSON contract on the result. The path’s content goes through the dispatch path and the loop sees it as the path’s output. The harness reads the result’s flags:

  • passPipeline = true → enter goal validation (or exit with PassSignal if no goal agent is configured)
  • terminatePipeline = true → halt with TerminateSignal directly, no goal gate
  • neither set → result is appended to turnHistory, loop continues

PathObject is defined in Pipeline/PumpStation.kt:225. The path must have at least one execution mechanism configured (see PathObject and PathRequest).

The Goal Agent Contract

Type: P2PInterface?
Location: PumpStation.goalAgent / setGoalAgent(...) / PumpStationBuilder.goalAgent
Default prompt: DEFAULT_GOAL_PROMPT in Pipeline/PumpStationDefaults.kt

What the goal agent receives:

A MultimodalContent built by buildGoalContent(). This is the same shape as the judge/dispatch content but the context.converseHistory is overridden with rawTurnHistory (the full event log, not the curated turn history), and metadata includes judgeVerdict and rawHistorySize. The text is turnSummary + "Verify the work was done.". The full event log is included so the goal agent can do thorough deep verification.

What the goal agent must output:

By default, prose text. The goal agent’s pass/fail signal is read from the terminatePipeline flag on the result:

  • terminatePipeline = false (default) → goal passed → exit with JudgeComplete
  • terminatePipeline = true → goal failed → append the result to turnHistory, increment taskState.goalFailCount, and continue the loop (subject to maxGoalFailAttempts)

There is no JSON contract on the goal agent. The goal’s text is treated as critique feedback when the goal fails; the next turn’s judge and dispatch see it in the history.

If maxGoalFailAttempts is exceeded, the harness halts with PumpStationExitReason.GoalValidationFailed.
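
The pass/fail handling can be summarized in a small state holder. GoalGate and ExitDecision are hypothetical names; the sketch assumes "exceeded" means the fail count has gone past maxGoalFailAttempts, which the text does not spell out.

```kotlin
// Hypothetical result of the goal-validation exit flow.
sealed class ExitDecision {
    data class Exit(val reason: String) : ExitDecision()
    object ContinueLoop : ExitDecision()
}

class GoalGate(private val maxGoalFailAttempts: Int = 3) {
    var goalFailCount = 0
        private set

    // Assumption: the harness halts once the count goes past the limit.
    fun onGoalResult(terminatePipeline: Boolean): ExitDecision {
        if (!terminatePipeline) return ExitDecision.Exit("JudgeComplete")   // goal passed
        goalFailCount++                                                     // goal failed
        return if (goalFailCount > maxGoalFailAttempts)
            ExitDecision.Exit("GoalValidationFailed")
        else ExitDecision.ContinueLoop
    }
}
```

Note the inverted sense of the flag: for the goal agent, terminatePipeline = true means "reject and keep working", not "stop the harness".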

The Health Agent Contract

Type: P2PInterface?
Location: PumpStation.healthAgent / setHealthAgent(...) / PumpStationBuilder.healthAgent
Default prompt: DEFAULT_HEALTH_PROMPT in Pipeline/PumpStationDefaults.kt

The health agent fires before the judge on a turn if either healthAgentTurnInterval turns have passed since the last health check, or the running error ratio exceeds healthAgentErrorRatioThreshold. It is proactive (not reactive like interventionAgent).
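
The trigger condition is an OR of two checks, sketched below. shouldRunHealthCheck is a hypothetical helper; treating a null setting as "trigger disabled", computing the error ratio as errors divided by path calls, and the choice of >= for the interval versus > for the ratio are all assumptions rather than documented behaviour.

```kotlin
// Assumptions: a null setting disables that trigger; the interval uses >= and
// the error ratio uses > ("exceeds"), as the prose suggests.
fun shouldRunHealthCheck(
    turnsSinceLastCheck: Int,
    healthAgentTurnInterval: Int?,
    errorCount: Int,
    totalPathCalls: Int,
    healthAgentErrorRatioThreshold: Double?
): Boolean {
    val intervalDue = healthAgentTurnInterval != null &&
        turnsSinceLastCheck >= healthAgentTurnInterval
    // Guard against division by zero before any path has run.
    val errorRatio = if (totalPathCalls == 0) 0.0 else errorCount.toDouble() / totalPathCalls
    val ratioExceeded = healthAgentErrorRatioThreshold != null &&
        errorRatio > healthAgentErrorRatioThreshold
    return intervalDue || ratioExceeded
}
```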

What the health agent receives:

A MultimodalContent whose text is the JSON-serialized HealthContext:

@Serializable
data class HealthContext(
    val runId: String,
    val turnIndex: Int,
    val harnessStatus: PumpStationStatus,
    val lastError: String?,
    val consecutivePathCount: Int,
    val lastSelectedPathName: String?,
    val pathCallCounts: Map<String, Int>,
    val visiblePathNames: List<String>,
    val reservePathNames: List<String>,
    val contextFillPercent: Double,
    val turnHistorySummary: List<String>,
    val recentErrors: List<String>
)

What the health agent must output:

A HealthReport JSON object:

@Serializable
data class HealthReport(
    val status: HealthStatus = HealthStatus.Unknown,
    val warnings: List<String> = emptyList(),
    val suggestions: List<String> = emptyList(),
    val metrics: Map<String, Double> = emptyMap(),
    val suggestedNextPath: String? = null,
    val terminateHarness: Boolean = false
)

HealthStatus is the enum Healthy, Degraded, Critical, or Unknown. terminateHarness: true causes the harness to set taskState.latestContent.terminatePipeline = true, which propagates to the next phase boundary and halts the loop.

On parse failure, the harness falls back to HealthReport() (all defaults, Unknown).

The Path-Safety Agent Contract

Type: P2PInterface?
Location: PumpStation.pathSafetyAgent / setPathSafetyAgent(...) / PumpStationBuilder.pathSafetyAgent
Default prompt: DEFAULT_PATH_SAFETY_PROMPT in Pipeline/PumpStationDefaults.kt

The path-safety agent runs only for paths with PathRiskLevel.Medium or PathRiskLevel.High. It sits between PathSelected and PathStarted and gates whether the path actually runs.

What the path-safety agent receives:

The path’s input MultimodalContent. The path’s name, description, schema, and risk level are surfaced in the conversation history.

What the path-safety agent must output:

A PathSafetyVerdict JSON object:

{
  "safe": boolean,
  "reason": string
}

The parser is strict on the safe field: it must be a JSON boolean literal (true or false). Strings like "true", numbers, and null are rejected; the harness falls back to the flag-based check on the result. To use the strict JSON contract, do nothing; to disable it and rely on MultimodalContent flags only, set pathSafetyJsonContractEnabled = false (or setPathSafetyJsonContractEnabled(false)).

Flag-based fallback (when JSON parse fails or the contract is disabled):

  • terminatePipeline = true → reject the path, return the original input unchanged
  • passPipeline = true → approve the path, run it normally
  • neither set → reject the path (defensive default)

When a path is rejected by safety, the harness does not call the path. The original input flows through as the turn’s result so the loop can continue.
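
The strict-then-fallback decision can be sketched without any JSON dependency. This is only an approximation of the described behaviour: isPathApproved and SAFE_LITERAL are hypothetical, and a regular expression stands in for the real parser purely to keep the example self-contained.

```kotlin
// Strict check: the "safe" key must be followed by a bare true/false literal.
// A quoted "true", a number, or null will not match and falls through to the flags.
val SAFE_LITERAL = Regex("\"safe\"\\s*:\\s*(true|false)\\b")

fun isPathApproved(
    agentText: String,
    passPipeline: Boolean,
    terminatePipeline: Boolean,
    jsonContractEnabled: Boolean = true
): Boolean {
    if (jsonContractEnabled) {
        val match = SAFE_LITERAL.find(agentText)
        if (match != null) return match.groupValues[1] == "true"
    }
    // Flag-based fallback: reject unless the agent explicitly passed.
    return when {
        terminatePipeline -> false
        passPipeline -> true
        else -> false          // defensive default
    }
}
```

The defensive default matters most for medium and high risk paths: an agent that answers in prose and sets no flag blocks the path rather than letting it through.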

The Lorebook Agent Contract

Type: P2PInterface?
Location: PumpStation.lorebookAgent / setLorebookAgent(...) / PumpStationBuilder.lorebookAgent
Default prompt: DEFAULT_LOREBOOK_PROMPT in Pipeline/PumpStationDefaults.kt

The lorebook agent updates contextWindow.loreBookKeys with new entries, updates, and removals based on recent turns.

What the lorebook agent receives:

A MultimodalContent whose text is the JSON-serialized LorebookAgentInput:

@Serializable
data class LorebookAgentInput(
    val turnsSinceLastUpdate: List<ConverseData>,
    val lastLorebookUpdateTurnIndex: Int,
    val currentLorebook: List<LoreBook>,
    val taskContext: LorebookTaskContext,
    val harnessGeneration: Long
)

What the lorebook agent must output:

A LorebookAgentOutput JSON object:

@Serializable
data class LorebookAgentOutput(
    val compactedThroughTurn: Int,
    val updates: List<LorebookUpdate>,
    val deletions: List<String>
)

Each LorebookUpdate carries key, value, weight, linkedKeys, aliasKeys, requiredKeys, and an operation (either LorebookOperation.Merge or LorebookOperation.Replace). The cursor is advanced to compactedThroughTurn on success; updates whose compactedThroughTurn is not strictly greater than the current cursor are discarded silently (pre-emption detection).
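
The cursor rule is what makes late or duplicate outputs harmless. The sketch below models only that rule: LorebookOutputSketch and LorebookCursor are hypothetical, updates are flattened to a key/value map with replace semantics, and the initial cursor value of -1 is an assumption.

```kotlin
// Hypothetical, trimmed-down output type: only the cursor and keys are modelled.
data class LorebookOutputSketch(
    val compactedThroughTurn: Int,
    val updates: Map<String, String> = emptyMap(),
    val deletions: List<String> = emptyList()
)

class LorebookCursor(initial: Int = -1) {
    var cursor = initial
        private set
    val entries = mutableMapOf<String, String>()

    /** Applies the output and advances the cursor; returns false if it was stale. */
    fun applyOutput(output: LorebookOutputSketch): Boolean {
        if (output.compactedThroughTurn <= cursor) return false   // pre-empted: drop silently
        entries.putAll(output.updates)                            // replace semantics only
        output.deletions.forEach { entries.remove(it) }
        cursor = output.compactedThroughTurn
        return true
    }
}
```

An output that covers turns up to 3 is rejected once the cursor is already at 3, so a slow agent run cannot overwrite the result of a newer one.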

If the agent returns a LoreBook JSON object (or an array of them) without the typed envelope, the harness falls back to the legacy free-form path (applyLorebookUpdates).

The lorebook agent runs under lorebookMutex so concurrent turns queue their updates in chronological order.

The Summary Agent Contract

Type: P2PInterface?
Location: PumpStation.summaryAgent / setSummaryAgent(...) / PumpStationBuilder.summaryAgent

The summary agent produces the turnSummary string injected at the top of the judge and dispatch user messages on subsequent turns.

What the summary agent receives:

taskState.latestContent (the most recent content object, typically the path’s output).

What the summary agent must output:

A MultimodalContent whose text becomes the new turnSummary. The harness replaces the previous summary wholesale. There is no JSON contract; the agent is expected to produce prose.

The summary agent runs under summaryMutex so concurrent turns queue their updates in chronological order.

If the agent signals terminatePipeline = true, the harness records lastError = P2PRequestInvalid and skips the summary update. If the agent signals passPipeline = true, the harness skips the update but does not error.

PathObject and PathRequest

PathObject is the harness’s atom. A pump station owns a set of PathObject instances (in pathList and reservePaths), and the dispatch agent chooses one to invoke by name. The class lives in Pipeline/PumpStation.kt:225.

PathObject Construction

val path = PathObject().apply {
    pathName = "researcher"
    pathDescription = "Performs a single research query and returns the result."
    pathSchema = "{\"query\": \"the question to research\"}"
    riskLevel = PathRiskLevel.Medium
    pcpSchema = PcpContext()  // optional
    setExecutionFunction { content, station, turnHistory, turnSummary ->
        // ... do work ...
        MultimodalContent(text = "research result", passPipeline = false)
    }
}

A path is only valid when it has at least one execution mechanism. The priorities at execution time are:

  1. PCP function: if pcpSchema.tpipeOptions is non-empty and the dispatch content carries a functionName, the path delegates to PcpExecutionDispatcher
  2. Execution function: setExecutionFunction { ... } is called directly with (content, station, turnHistory, turnSummary)
  3. Internal agent: setInternalAgent(agent) is called via agent.executeLocal(content). Can be any P2PInterface including another PumpStation
  4. Agent builder function: setAgentBuilderFunction { _ -> ... } is called per invocation, producing a fresh agent. Useful for clean-slate stateless runs

If none of these are configured, PathObject.init() throws IllegalStateException at harness startup.
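
The priority order and the startup check can be condensed into one small type. PathConfigSketch and ExecutionMechanism are hypothetical; the booleans stand in for "this mechanism is configured", and the real PathObject inspects its own fields rather than flags like these.

```kotlin
enum class ExecutionMechanism { PcpFunction, ExecutionFunction, InternalAgent, AgentBuilder }

// Hypothetical flags describing what a path has configured.
data class PathConfigSketch(
    val hasPcpOptions: Boolean = false,
    val hasExecutionFunction: Boolean = false,
    val hasInternalAgent: Boolean = false,
    val hasAgentBuilder: Boolean = false
) {
    /** Mirrors the init-time rule: at least one mechanism must be configured. */
    fun validate() = check(
        hasPcpOptions || hasExecutionFunction || hasInternalAgent || hasAgentBuilder
    ) { "Path has no execution mechanism" }   // check() throws IllegalStateException

    /** Picks the mechanism in the priority order listed above. */
    fun select(requestHasFunctionName: Boolean): ExecutionMechanism = when {
        hasPcpOptions && requestHasFunctionName -> ExecutionMechanism.PcpFunction
        hasExecutionFunction -> ExecutionMechanism.ExecutionFunction
        hasInternalAgent -> ExecutionMechanism.InternalAgent
        hasAgentBuilder -> ExecutionMechanism.AgentBuilder
        else -> throw IllegalStateException("No mechanism can handle this request")
    }
}
```

The PCP branch needs both halves of its condition: a path with bound functions still falls through to its execution function when the request names no function.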

PathDescriptionData

When the harness boots, every path runs PathObject.init(). The result is captured in a PathDescriptionData and stored in the path descriptor list. This is what the dispatch agent’s system prompt sees:

@Serializable
data class PathDescriptionData(
    val name: String,
    val description: String,
    val inputSchema: String,
    val pcpSchema: PcpContext?,
    val hasInternalAgent: Boolean,
    val hasExecutionFunction: Boolean,
    val isRunsInBackground: Boolean,
    val agentTypeName: String? = null
)

PathDescriptionList is a wrapper that serializes the full set of visible paths to JSON for prompt injection:

@Serializable
data class PathDescriptionList(
    var paths: MutableList<PathDescriptionData> = mutableListOf()
)

Binding a Function as a PCP Tool

PathObject can bind a Kotlin function so the dispatch agent sees it as a callable PCP tool. The pattern mirrors Pipe.bindNativeFunction from PipeContextProtocol/PcpFunctionExtensions.kt:

import com.TTT.Pipeline.bindFunction

val path = PathObject().apply {
    pathName = "code_review"
    pathDescription = "Process a code review for a given repository."
}
path.bindFunction("processCodeReview", ::processCodeReview)

The extension function PathObject.bindFunction(name, function):

  1. Calls FunctionRegistry.registerFunction(name, function) to create a FunctionSignature via reflection
  2. Builds TPipeContextOptions from the signature (LLM-readable parameter schema: type, description, enumValues, isRequired)
  3. Initializes pcpSchema if null and appends the new tpipeOption to it

The result is additive: multiple bindFunction calls accumulate options in pcpSchema.tpipeOptions. The schema is serialized into the dispatch agent’s prompt context at injection time.

The path is then resolved at execution time by PathObject.execute(content, ...) which inspects content.tools.tPipeContextOptions.functionName and dispatches via PcpExecutionDispatcher.

DSL Builder

If you want the pump station assembled, validated, and initialized in one place, prefer the Kotlin DSL:

import com.TTT.Pipeline.PumpStation
import com.TTT.Pipeline.pumpStation
import com.TTT.Pipeline.PumpStationMemoryManagementMode
import com.TTT.Pipe.MultimodalContent

val station = pumpStation("research-station") {
    personality = "You are a meticulous research analyst."
    systemTask = "Complete the user's research request to the highest standard."
    userGuidelines = "Cite every claim. Prefer primary sources."
    entryUserPrompt = "Research the history of distributed consensus algorithms."

    judgeAgent = Pipeline().apply {
        pipelineName = "judge"
        add(OpenRouterPipe().apply {
            setModel("openai/gpt-4o-mini")
            setApiKey(System.getenv("OPENROUTER_API_KEY") ?: "")
        })
    }

    dispatchAgent = Pipeline().apply {
        pipelineName = "dispatcher"
        add(OpenRouterPipe().apply {
            setModel("openai/gpt-4o-mini")
            setApiKey(System.getenv("OPENROUTER_API_KEY") ?: "")
        })
    }

    path("research") {
        description = "Performs a single research query."
        risk = PathRiskLevel.Low
        schema = "{\"query\": \"the question to research\"}"
        setExecutionFunction { content, station, history, summary ->
            // The harness fills content.text with the request schema
            // ... do work ...
            MultimodalContent(text = "research result").apply {
                passPipeline = true
            }
        }
    }

    setMaxHarnessTurns(10)
    setConcurrencyMode(PumpStationConcurrencyMode.Async)
    setMemoryManagementMode(PumpStationMemoryManagementMode.Compaction)
    setCompactionThreshold(0.8)
    setCompactionStrategy(PumpStationCompactionStrategy.Whole)
    setKillSwitch(KillSwitch(inputTokenLimit = 50_000, outputTokenLimit = 10_000))
}

The DSL returns a fully initialized PumpStation ready for executeLocal. The DSL stages are:

| Stage | Description |
| --- | --- |
| Initial | Nothing configured yet. pumpStation("name") { ... } enters this stage. |
| HasPaths | At least one path { } has been called. Required before build(). |
| Ready | All required and optional configuration is complete. build() is called. |

The stage transitions are tracked by the PumpStationStage sealed class in Pipeline/PumpStationDsl.kt:31. The compiler enforces the “at least one path” requirement via the builder’s generic type parameter: build() is only available on PumpStationBuilder<PumpStationStage.HasPaths>.
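
For readers unfamiliar with this type-state technique, here is a minimal, self-contained sketch of the idea. It is not the library’s builder: Stage, StationBuilder, and the string returned by build() are hypothetical, and only the Initial and HasPaths stages are modelled.

```kotlin
// Phantom stage markers, modelled on the stages in the table above.
sealed class Stage {
    class Initial private constructor() : Stage()
    class HasPaths private constructor() : Stage()
}

class StationBuilder<S : Stage> private constructor(
    val name: String,
    val paths: List<String>
) {
    // Adding a path always yields a builder in the HasPaths stage.
    fun path(pathName: String): StationBuilder<Stage.HasPaths> =
        StationBuilder(name, paths + pathName)

    companion object {
        fun start(name: String): StationBuilder<Stage.Initial> =
            StationBuilder(name, emptyList())
    }
}

// build() exists only for builders that already have at least one path.
fun StationBuilder<Stage.HasPaths>.build(): String =
    "$name(${paths.joinToString()})"
```

With this shape, StationBuilder.start("s").path("a").build() compiles, while StationBuilder.start("s").build() is rejected by the compiler because the extension receiver type does not match. The stage parameter is never instantiated; it exists only to carry the state in the type.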

DSL Block Reference

The pumpStation { } builder supports these top-level blocks and setters.

Top-Level Configuration

| Property / Block | Type | Default | Description |
| --- | --- | --- | --- |
| name | String | (none) | Unique name for the station. Required. |
| personality | String | "" | Injected into judge and dispatch prompts ahead of all other instructions. |
| systemTask | String | "" | “System prompt” layer, second-priority. |
| userGuidelines | String | "" | Third-priority guidelines. Equivalent to “skills” in other harnesses. |
| entryUserPrompt | String | "" | Core task, fourth-priority. Set programmatically or via the input content. |
| judgeRunMode | PumpStationJudgeRunMode | Always | Always (judge every turn) or FlagTriggered (judge only when requestJudgeNextTurn() is set). |
| maxHarnessTurns | Int | 50 | Safety cap on loop iterations. |
| maxTurns | Int | 50 | Canonical setter. maxHarnessTurns is a delegating alias. |
| maxGoalFailAttempts | Int | 3 | Max consecutive goal-validation failures before GoalValidationFailed exit. |
| maxRawTurnHistorySize | Int? | null | Cap on rawTurnHistory.history; null disables. |
| blowoutThreshold | Double | 0.9 | Context-window fill ratio that triggers blowout detection. |
| memoryUpdateTimeoutMs | Long | 30_000L | Timeout in ms for memory phase to await in-flight background jobs. |
| maxBlowoutRecoveries | Int | 3 | Max blowout recovery attempts before forced halt. |
| maxRepairPromptTokens | Int | 500 | Token cap for repair/regeneration prompts. |
| stopHarnessOnInvalidPathRequest | Boolean | false | Throw on parse failure instead of skipping the turn. |
| failurePolicy | PumpStationFailurePolicy | defaults | See Failure Policy. |
| concurrencyMode | PumpStationConcurrencyMode? | null | Default for harness agents; falls back to Async. |
| maxConcurrentBackgroundAgents | Int | 3 | Cap on concurrent background agents. |
| maxConcurrentForegroundAgents | Int | 3 | Cap on concurrent foreground agents (hint to paths). |
| foregroundTurnInterval | Int | 0 | Turns between foreground agent firings; 0 disables. |
| backgroundTurnInterval | Int | 5 | Turns between background agent firings. |
| memoryManagementMode | PumpStationMemoryManagementMode | Compaction | Compaction, Truncation, or Hybrid. |
| compactionThreshold | Double | 0.8 | Context-window fill ratio that triggers compaction. |
| compactionStrategy | PumpStationCompactionStrategy | Whole | Whole, Chunked, or Hybrid. |
| maxTurnHistorySize | Int | 50 | Cap on turnHistory.history; excess entries are summarized. |
| judgeJsonContractEnabled | Boolean | true | When false, judge verdict comes from flags only. |
| pathSafetyJsonContractEnabled | Boolean | true | When false, path-safety verdict comes from flags only. |
| maxConsecutiveSamePath | Int | 3 | Loop guard on consecutive same-path dispatch. |
| maxTotalPathCallsPerPath | Int? | null | Loop guard on total calls per path; null disables. |
| pathLimitExceededPolicy | PathLimitExceededPolicy | Skip | Skip, Halt, or Continue when the per-path limit is hit. |
| asyncPathsAppendToTurnHistory | Boolean | true | Station-wide default for whether async paths append their result to turnHistory on completion. Per-path opt-out via suppressHistoryEmit on the path block. |
| asyncAgentsAppendToTurnHistory | Boolean | false | Station-wide default for whether async harness agents append their result to turnHistory on completion. Per-slot opt-in via HarnessAgentSlot.appendsToTurnHistory. |
| asyncJobGracePeriodMs | Long? | null | Optional millisecond grace period given to in-flight async coroutines after finalization before cancelAsyncJobs cancels the asyncScope. null (the default) is unbounded. See Async Substrate below. |
| asyncJobsScopedToStation | Boolean | true | When true, async work runs on the station-scoped asyncScope; when false, async work runs on GlobalScope (the pre-substrate fire-and-forget behavior). |

Agent Setters

| Property | Type | Description |
| --- | --- | --- |
| judgeAgent | P2PInterface? | The judge. Should be a Pipeline. |
| dispatchAgent | P2PInterface? | Required. Must be a Pipeline (P2PInit throws otherwise). |
| interventionAgent | P2PInterface? | Reactive: fires after a path failure or loop guard trip. |
| healthAgent | P2PInterface? | Proactive: fires before the judge based on interval or error ratio. |
| healthAgentTurnInterval | Int? | Turns between health checks. |
| healthAgentErrorRatioThreshold | Double? | Error-ratio trigger. |
| healthAgentConcurrencyMode | PumpStationConcurrencyMode? | Blocking (judge waits) or Async. |
| lorebookAgent | P2PInterface? | Updates lorebook entries from recent turns. |
| summaryAgent | P2PInterface? | Generates turnSummary. |
| goalAgent | P2PInterface? | Validates work when judge says complete or path says pass. |
| preInitAgent | P2PInterface? | Runs once at startup before the main loop. |
| pathSafetyAgent | P2PInterface? | Gates medium/high risk path calls. |

Each agent has a matching *BuilderFunction setter for (suspend (PumpStation) -> Pipeline) (or P2PInterface) that produces a fresh agent per turn. When set, the builder function overrides the static agent reference.

path("name") { } Block

Registers a PathObject in pathList. The block exposes:

| Member | Type | Description |
| --- | --- | --- |
| description | String | Human-readable description; injected into the dispatch prompt. |
| risk | PathRiskLevel | Low, Medium, or High. Triggers path-safety gate at Medium+. |
| dispatchHint | String | Soft guidance surfaced in the dispatch prompt as "Hint: ...". |
| runsInBackground | Boolean | When true, the path runs on the station’s asyncScope instead of the foreground path flow. |
| suppressHistoryEmit | Boolean | When true (and runsInBackground is also true), the path’s PathCompleted event is still emitted, but the foreground drain skips merging the result into turnHistory. |
| schema | String | Free-form JSON schema for the path’s input. |
| pcpSchema | PcpContext? | Pre-built PCP context for PCP-bound paths. |
| pathMetadata | MutableMap<Any, Any> | Advisory metadata; travels with the path object. |
| setInternalAgent(agent) | (P2PInterface) -> Unit | Set the path’s internal agent (any P2PInterface, including another PumpStation). |
| setExecutionFunction(fn) | (... ) -> Unit | Set the path’s execution function. |
| bindFunction(function) | (KFunction<*>) -> Unit | Register a Kotlin function as a PCP tool under the function’s own name. |
| bindFunction(name, function) | (String, KFunction<*>) -> Unit | Register a Kotlin function under a custom name. |
| build() | () -> PathObject | Build and add the path. Called automatically at end of block. |

reservePath("name") { } Block

Same surface as path { } but stores the PathObject in reservePaths instead of pathList. The reserve path is not visible to the dispatch agent until its revealWhen predicate evaluates to true. Sticky once revealed.

| Member | Type | Description |
|---|---|---|
| revealWhen(predicate) | (taskState, externalContext) -> Boolean | Predicate evaluated each dispatch turn. |

tracing { } Block

Configures TraceConfig for the station:

| Method | Description |
|---|---|
| enabled(enabled = true) | Enable or disable tracing. The argument defaults to true. |
| maxHistory(count) | Max trace events retained per trace. |
| outputFormat(format) | TraceFormat.HTML, JSON, MARKDOWN, or CONSOLE. |
| detailLevel(level) | TraceDetailLevel.VERBOSE, DETAILED, STANDARD, etc. |
| autoExport(enabled, path) | Auto-export after each run to a path. |
| includeContext(include) | Include context snapshot on every event. |
| includeMetadata(include) | Include event metadata. |
| config(configuration) | Replace the entire configuration. |

killSwitch { } Block

Configures the KillSwitch:

| Field | Type | Description |
|---|---|---|
| inputTokenLimit | Int? | Max input tokens; null disables. |
| outputTokenLimit | Int? | Max output tokens; null disables. |
| onTripped | (KillSwitchContext) -> Unit | Callback on trip; default throws KillSwitchException. |

compaction { } Block

A thin alias for the v3 compaction configuration:

| Field | Type | Description |
|---|---|---|
| mode | PumpStationMemoryManagementMode | Read/write accessor for the builder's memoryManagementMode. |
| compactionThreshold | Double | Read/write accessor for the builder's compactionThreshold. |
| strategy | PumpStationCompactionStrategy | Read/write accessor for the builder's compactionStrategy. |

pause { } Block

Configures pause phases for external inspection/intervention:

pumpStation("name") {
    pause { phase(PumpStationPausePhase.BeforeJudge) }
    pause { phase(PumpStationPausePhase.BeforeGoalValidation) }
}

The harness suspends at the named phase boundaries until resume() is called.

harnessAgent { } and harnessAgentBuilder { } Blocks

Register additional agents that fire during the foreground or background phase. Each block produces a HarnessAgentSlot:

| Member | Type | Description |
|---|---|---|
| agent | P2PInterface? | The agent instance. |
| builderFunction | (suspend (PumpStation) -> P2PInterface)? | Builder producing a fresh agent per turn. |
| concurrency | PumpStationConcurrencyMode | Blocking (foreground) or Async (background). |
| interval | Int | Turns between firings. |

Blocking slots fire synchronously during the foreground phase. Async slots are queued and launched on the station's asyncScope; the result is captured into turnHistory when the slot's appendsToTurnHistory field is true OR the station-wide asyncAgentsAppendToTurnHistory is true. See Async Substrate below.

memory { } Block

Read/write accessors for memory-management fields. See compaction { }, which has the same shape.

pipelineNames { } Block

A scope for declaring named pipe references that can be re-bound in the path block.

dispatcherRules { } Block

Reserved for future use. Currently a placeholder. Path selection today is fully driven by the dispatch agent's PathRequest output.

Failure Policy

@Serializable
data class PumpStationFailurePolicy(
    var repairInvalidDispatchJson: Boolean = true,
    var maxDispatchRepairAttempts: Int = 1,
    var stashOversizedOutputs: Boolean = true,
    var callInterventionOnPathFailure: Boolean = true,
    var stopHarnessOnInvalidPathRequest: Boolean = false
)

| Field | Default | Description |
|---|---|---|
| repairInvalidDispatchJson | true | Repair malformed dispatch output up to maxDispatchRepairAttempts times. |
| maxDispatchRepairAttempts | 1 | Max repair attempts. |
| stashOversizedOutputs | true | Stash oversized path outputs to keep prompt budgets bounded. |
| callInterventionOnPathFailure | true | Invoke interventionAgent after a path failure. |
| stopHarnessOnInvalidPathRequest | false | When true, set lastError = DispatchJsonRepairFailed after repair budget is exhausted. |
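
To make the repair budget concrete, here is a minimal sketch of a bounded repair loop. It is an illustration under stated assumptions, not TPipe's implementation: `parseWithRepair`, `parse`, and `repair` are hypothetical names, and the loop only models how `maxDispatchRepairAttempts` caps the number of repair passes before the caller has to treat the output as invalid.

```kotlin
// Hypothetical model of a bounded repair loop; not TPipe's implementation.
fun <T : Any> parseWithRepair(
    raw: String,
    maxRepairAttempts: Int,
    parse: (String) -> T?,
    repair: (String) -> String
): T? {
    var candidate = raw
    var attempts = 0
    while (true) {
        // Accept the first candidate that parses.
        parse(candidate)?.let { return it }
        // Repair budget exhausted: report failure to the caller.
        if (attempts >= maxRepairAttempts) return null
        candidate = repair(candidate)
        attempts++
    }
}

fun main() {
    // Stand-ins: "parsing" is Int conversion, "repair" strips whitespace.
    val parse: (String) -> Int? = { it.toIntOrNull() }
    val repair: (String) -> String = { it.trim() }
    println(parseWithRepair(" 42 ", maxRepairAttempts = 1, parse = parse, repair = repair)) // 42
    println(parseWithRepair(" 42 ", maxRepairAttempts = 0, parse = parse, repair = repair)) // null
}
```

A null return corresponds to the point where, with stopHarnessOnInvalidPathRequest set to true, the table above says lastError becomes DispatchJsonRepairFailed.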

Execution Flow: The Two-Scope Loop

PumpStation's loop has two scopes. Confusing them is the most common documentation error.

Outer loop: runHarnessLoop

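runHarnessLoop drives turns via while (turnIndex < maxTurns && status == Running). Each iteration calls runTurn(). After the while loop exits, runFinalizationPhase() runs once; that call is not part of the excerpt below, which returns only the captured KillSwitchException (if any).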

internal suspend fun PumpStation.runHarnessLoop(): KillSwitchException? {
    var tripException: KillSwitchException? = null
    while (taskState.turnIndex < maxTurns && taskState.status == PumpStationStatus.Running) {
        if (!checkPauseGuards(PumpStationPausePhase.BeforeJudge)) break
        val result = try { runTurn() }
        catch (e: KillSwitchException) {
            taskState.lastError = PumpStationError.KillSwitchTripped
            taskState.exitReason = PumpStationExitReason.KillSwitchTripped
            tripException = e
            break
        }
        if (result is TurnResult.Halt) {
            taskState.exitReason = result.reason
            break
        }
        taskState.turnIndex++
    }
    if (taskState.turnIndex >= maxTurns && taskState.lastError == null) {
        taskState.lastError = PumpStationError.MaxTurnsExceeded
        taskState.exitReason = PumpStationExitReason.MaxTurnsHit
    }
    return tripException
}

TurnResult is a sealed class in Pipeline/PumpStationModels.kt:844:

sealed class TurnResult {
    object Continue : TurnResult()
    data class Halt(val reason: PumpStationExitReason) : TurnResult()
}
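
The outer loop's control flow can be exercised in isolation with a small model. The sketch below is not TPipe code: `ExitReason`, `LoopOutcome`, and `runLoop` are hypothetical stand-ins that keep only the turn counter and the Continue/Halt decision, and drop pause guards, kill-switch handling, and task state.

```kotlin
// Hypothetical stand-ins; not TPipe's real types.
enum class ExitReason { JudgeComplete, PassSignal, TerminateSignal, MaxTurnsHit }

sealed class TurnResult {
    object Continue : TurnResult()
    data class Halt(val reason: ExitReason) : TurnResult()
}

data class LoopOutcome(val turnsRun: Int, val exitReason: ExitReason)

// Runs `turn` until it halts or the turn budget is spent.
fun runLoop(maxTurns: Int, turn: (Int) -> TurnResult): LoopOutcome {
    var turnIndex = 0
    while (turnIndex < maxTurns) {
        val result = turn(turnIndex)
        // A halting turn ends the loop without advancing the index.
        if (result is TurnResult.Halt) return LoopOutcome(turnIndex + 1, result.reason)
        turnIndex++
    }
    // Only reached when every turn returned Continue.
    return LoopOutcome(turnIndex, ExitReason.MaxTurnsHit)
}

fun main() {
    val halted = runLoop(maxTurns = 10) { i ->
        if (i == 2) TurnResult.Halt(ExitReason.PassSignal) else TurnResult.Continue
    }
    println(halted) // LoopOutcome(turnsRun=3, exitReason=PassSignal)

    val exhausted = runLoop(maxTurns = 3) { TurnResult.Continue }
    println(exhausted) // LoopOutcome(turnsRun=3, exitReason=MaxTurnsHit)
}
```

As in the excerpt above, a turn that halts on the last allowed index is reported with its own reason rather than as a max-turns failure, because the index is not incremented on a halt.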

Inner cycle: runTurn

runTurn runs the per-turn phases and returns Continue or Halt(reason). The function lives at Pipeline/PumpStationLoop.kt:1898.

internal suspend fun PumpStation.runTurn(): TurnResult {
    refreshAgentInstances()
    refreshPipelinesPrompts()
    refreshSettingsPropagation()

    runHealthCheckPhase()
    detectAndHandleContextBlowout(PumpStationPhase.HealthCheck)

    val judgeVerdict = runJudgePhase()
    detectAndHandleContextBlowout(PumpStationPhase.Judge)
    if (judgeVerdict.shouldHalt) {
        return TurnResult.Halt(judgeVerdict.reason ?: PumpStationExitReason.TerminateSignal)
    }
    if (judgeVerdict.isComplete) return runExitFlow()

    val pathRequest = runDispatchPhase() ?: return TurnResult.Continue
    detectAndHandleContextBlowout(PumpStationPhase.Dispatch)
    if (pathRequest.pathName.isBlank()) return TurnResult.Continue

    if (!checkPauseGuards(PumpStationPausePhase.BeforePathExecution)) {
        return TurnResult.Halt(PumpStationExitReason.KillSwitchTripped)
    }
    val pathResult = runPathFlow(pathRequest)
    detectAndHandleContextBlowout(PumpStationPhase.PathExecution)

    if (pathResult != null) {
        taskState.latestContent = pathResult
        if (pathResult.passPipeline) {
            return if (goalAgent == null) {
                TurnResult.Halt(PumpStationExitReason.PassSignal)
            } else {
                runExitFlow()
            }
        }
        if (pathResult.terminatePipeline) {
            return TurnResult.Halt(PumpStationExitReason.TerminateSignal)
        }
    }
    pruneTurnHistory()
    pruneRawTurnHistory()

    runForegroundAgentsPhase()
    detectAndHandleContextBlowout(PumpStationPhase.ForegroundAgents)

    runBackgroundAgentsPhase()
    runMemoryUpdatePhase()
    runCompactionPhase()

    return TurnResult.Continue
}

Goal-validation transition: runExitFlow

runExitFlow is the goal-validation phase. It is called by runTurn when the judge says complete OR when a path returns passPipeline: true and a goal agent is configured. With no goal agent, the loop exits with JudgeComplete or PassSignal and the goal gate is skipped.

internal suspend fun PumpStation.runExitFlow(): TurnResult {
    if (!checkPauseGuards(PumpStationPausePhase.BeforeGoalValidation)) {
        return TurnResult.Halt(PumpStationExitReason.KillSwitchTripped)
    }
    if (goalAgent == null) {
        return TurnResult.Halt(PumpStationExitReason.JudgeComplete)
    }

    val agent = goalAgentBuilderFunction?.invoke(this) ?: goalAgent!!
    agent.setParentInterface(this)
    agent.P2PInit()
    refreshPipelinesPrompts()

    emitEventInternal(GoalValidationStarted(...))
    val goalContent = buildGoalContent()
    val result = agent.executeLocal(goalContent)
    val passed = !result.terminatePipeline
    emitEventInternal(GoalValidationCompleted(..., passed = passed, ...))

    if (!passed) {
        turnHistory.add(ConverseData(role = ConverseRole.assistant, content = result))
        taskState.goalFailCount++
        if (taskState.goalFailCount > maxGoalFailAttempts) {
            return TurnResult.Halt(PumpStationExitReason.GoalValidationFailed)
        }
        return TurnResult.Continue
    }

    return TurnResult.Halt(PumpStationExitReason.JudgeComplete)
}

The judge saying isComplete: true is not a terminal signal. It is a transition into goal validation. With no goal agent, the harness exits. With a goal agent, the goal agent validates the work: pass means deliver, fail means re-loop with the goal agent's critique appended to history (up to maxGoalFailAttempts).
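
The failure counter in runExitFlow increments before it is compared with `>`, so the harness tolerates maxGoalFailAttempts failed validations and halts on the next one. The sketch below models only that counter; `GoalGate` and `GateOutcome` are hypothetical names, and `passed` stands in for `!result.terminatePipeline`.

```kotlin
// Hypothetical model of the goal gate's failure counter; not TPipe code.
enum class GateOutcome { Deliver, Reloop, GiveUp }

class GoalGate(private val maxGoalFailAttempts: Int) {
    var goalFailCount = 0
        private set

    // `passed` stands in for `!result.terminatePipeline` from the goal agent.
    fun onVerdict(passed: Boolean): GateOutcome {
        if (passed) return GateOutcome.Deliver
        goalFailCount++
        return if (goalFailCount > maxGoalFailAttempts) GateOutcome.GiveUp else GateOutcome.Reloop
    }
}

fun main() {
    val gate = GoalGate(maxGoalFailAttempts = 2)
    // Three failed validations in a row.
    println(listOf(false, false, false).map { gate.onVerdict(it) }) // [Reloop, Reloop, GiveUp]
}
```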

Finalization: runFinalizationPhase

After the while loop exits, runFinalizationPhase runs once. In order, it drains any pending async results (merging them into turnHistory with AsyncTurnAppended events), cancels the station's asyncScope so async coroutines cannot outlive executeLocal, drains background events, runs a final compaction if the fill ratio is still above compactionThreshold, emits either HarnessCompleted or HarnessFailed, and returns the harness's deliverable.

internal suspend fun PumpStation.runFinalizationPhase(): MultimodalContent {
    drainPendingAsyncResults()
    cancelAsyncJobs(asyncJobGracePeriodMs)
    drainBackgroundEventQueue()
    if (contextFillRatio() > compactionThreshold) runCompactionPhase()

    val isFailure = taskState.lastError in listOf(
        PumpStationError.MaxTurnsExceeded,
        PumpStationError.KillSwitchTripped,
        PumpStationError.P2PRequestInvalid,
        PumpStationError.InitNotCalled,
        PumpStationError.CompactionInflated
    )
    if (isFailure) {
        emitEventInternal(HarnessFailed(...))
        taskState.status = PumpStationStatus.Failed
    } else {
        emitEventInternal(HarnessCompleted(...))
        taskState.status = PumpStationStatus.Completed
    }

    if (tracingEnabled && RemoteTraceConfig.dispatchAutomatically) {
        PipeTracer.exportTrace(taskState.runId, TraceFormat.HTML)
    }

    return taskState.lastPathResult ?: (taskState.latestContent ?: MultimodalContent())
}

executeLocal then re-throws the KillSwitchException (if any) so the caller sees the trip, and otherwise returns the finalization's content.

Pre-Init Advisory

runPreInitPhase emits a HarnessWarning event with code = NoExitSignalConfigured when all of these are true:

  • judgeAgent == null AND judgeAgentBuilderFunction == null
  • judgeRunModeInternal != PumpStationJudgeRunMode.FlagTriggered
  • maxTurnsInternal > 1

The advisory is non-blocking: the harness continues. The mechanisms field of the HarnessWarning lists the four legitimate exit mechanisms (JudgeAlways, JudgeFlagTriggered, PathPassPipeline, PathTerminatePipeline).
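
The three conditions combine with a plain logical AND. The sketch below mirrors them as a standalone predicate; `shouldWarnNoExitSignal` and the `JudgeRunMode` enum are hypothetical (only FlagTriggered is named in the text, and `Always` is an assumed placeholder for any other mode).

```kotlin
// Hypothetical mirror of the advisory conditions; not TPipe's types.
enum class JudgeRunMode { Always, FlagTriggered }

fun shouldWarnNoExitSignal(
    hasJudgeAgent: Boolean,
    hasJudgeBuilder: Boolean,
    judgeRunMode: JudgeRunMode,
    maxTurns: Int
): Boolean =
    !hasJudgeAgent && !hasJudgeBuilder &&          // no judge configured at all
        judgeRunMode != JudgeRunMode.FlagTriggered && // judge is not flag-driven
        maxTurns > 1                                   // a multi-turn run

fun main() {
    println(shouldWarnNoExitSignal(false, false, JudgeRunMode.Always, 10)) // true
    println(shouldWarnNoExitSignal(true, false, JudgeRunMode.Always, 10))  // false
    println(shouldWarnNoExitSignal(false, false, JudgeRunMode.Always, 1))  // false
}
```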

Phase Reference

| PumpStationPhase | When | What runs |
|---|---|---|
| PreInit | Once at start | preInitAgent, preInitFunction, then HarnessStarted event |
| HealthCheck | Top of each turn (if interval or error ratio hit) | healthAgent |
| Judge | Top of each turn (or skipped in FlagTriggered) | judgeAgent |
| Dispatch | After judge, if not complete and not halt | dispatchAgent |
| PathSafety | For medium/high risk paths | pathSafetyAgent (or pathSafetyFunction) |
| PathExecution | After dispatch + safety | PathObject.execute |
| PathValidation | After path returns | pathValidationFunction |
| Intervention | After path failure or loop guard trip | interventionAgent |
| ForegroundAgents | After path execution (if interval hit) | Blocking harness agent slots |
| MemoryUpdate | After foreground (if interval hit or pressure) | lorebookAgent, summaryAgent |
| Compaction | After memory update (if fill ratio hit) | summaryAgent (compaction strategy) |
| GoalValidation | Inside runExitFlow | goalAgent |
| Exit | Final event (HarnessCompleted or HarnessFailed) | none |

PumpStationPhase is an enum in Pipeline/PumpStationModels.kt:37. Each PumpStationEvent carries the phase that produced it; the trace funnel maps it to a more general TracePhase (see Pipeline/PumpStationHelpers.kt:435).

Memory Management

PumpStation supports three memory management modes:

| Mode | Behavior |
|---|---|
| Compaction | Default. When contextFillRatio() > compactionThreshold, the summary agent condenses turnHistory into a new summary. |
| Truncation | Leverages TPipe's TokenBudget and lorebook selection. Truncates context and selects lorebook entries to fit. |
| Hybrid | Uses both. Truncation is applied when context grows suddenly, before the background agents can catch up. |

PumpStationMemoryManagementMode is in Pipeline/PumpStation.kt:70. When the harness is configured with a lorebook or summary agent, P2PInitInternal auto-promotes Compaction to Hybrid if the developer did not explicitly set the mode.

Compaction Strategies

PumpStationCompactionStrategy (Pipeline/PumpStation.kt:92):

| Strategy | Behavior |
|---|---|
| Whole | Entire turnHistory is passed to the summary agent. Requires it to fit in the agent's context window. On overflow, branchFailureFunction is called. |
| Chunked | History is split into token-bounded chunks. Each chunk is summarized (sequentially with running summary, or in parallel with bounded concurrency), then folded. Survives very large contexts. |
| Hybrid | Dynamically chooses between Whole and Chunked based on headroom. |

ChunkFanoutMode (Pipeline/PumpStationV3Models.kt:45):

| Mode | Behavior |
|---|---|
| Sequential | Default. Each chunk's summary carries forward the previous chunk's running summary. Causal order preserved. |
| Parallel | Chunks summarized concurrently with bounded concurrency (maxParallelChunks). All chunks folded by a second summary call. Cancellation through coroutineScope propagates on kill-switch trip. |
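
The Chunked strategy with Sequential fan-out can be pictured as a greedy split followed by a left fold. The following is a rough sketch under stated assumptions, not the harness's algorithm: `tokenEstimate`, `chunkByBudget`, and `summarizeSequentially` are hypothetical helpers, turns are plain strings, and token counts are approximated by counting whitespace-separated words.

```kotlin
// Hypothetical sketch: token counts are approximated by whitespace-separated words.
fun tokenEstimate(text: String): Int = text.split(Regex("\\s+")).count { it.isNotEmpty() }

// Greedily packs turns into chunks whose estimated size stays within `budget`.
// A single turn larger than the budget still gets a chunk of its own.
fun chunkByBudget(turns: List<String>, budget: Int): List<List<String>> {
    val chunks = mutableListOf<MutableList<String>>()
    var used = 0
    for (turn in turns) {
        val cost = tokenEstimate(turn)
        if (chunks.isEmpty() || used + cost > budget) {
            chunks.add(mutableListOf())
            used = 0
        }
        chunks.last().add(turn)
        used += cost
    }
    return chunks
}

// Sequential fan-out: each call sees the running summary plus one chunk.
fun summarizeSequentially(
    chunks: List<List<String>>,
    summarize: (runningSummary: String, chunk: List<String>) -> String
): String = chunks.fold("") { running, chunk -> summarize(running, chunk) }

fun main() {
    val turns = listOf("a b c", "d e", "f g h i", "j")
    val chunks = chunkByBudget(turns, budget = 5)
    println(chunks) // [[a b c, d e], [f g h i, j]]

    // Stand-in summarizer: keeps the first word of each turn.
    val summary = summarizeSequentially(chunks) { running, chunk ->
        (running + " " + chunk.joinToString(" ") { it.substringBefore(' ') }).trim()
    }
    println(summary) // a d f j
}
```

The fold is what preserves causal order: chunk N is never summarized without the running summary of chunks 1 to N-1. A Parallel fan-out would summarize the chunks independently and need a second call to merge them.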

Compaction Result Sealed Type

CompactionResult (Pipeline/PumpStationV3Models.kt:117) communicates the outcome of every compaction attempt:

sealed class CompactionResult {
    object SkippedBelowThreshold : CompactionResult()
    object SkippedNoAgent : CompactionResult()
    object SkippedCursorAlreadyAdvanced : CompactionResult()
    data class Applied(val inputTokens: Int, val outputTokens: Int, val generation: Long, val fanout: ChunkFanoutMode? = null) : CompactionResult()
    data class Inflated(val inputTokens: Int, val outputTokens: Int, val attempt: Int) : CompactionResult()
    data class DiscardedPreEmpted(val observedGeneration: Long, val currentGeneration: Long) : CompactionResult()
    data class RolledBack(val backupGeneration: Long, val reason: String) : CompactionResult()
    data class HandedOffToTruncation(val contextWindowBefore: Int, val contextWindowAfter: Int) : CompactionResult()
}

  • SkippedBelowThreshold: fill ratio did not exceed compactionThreshold
  • SkippedNoAgent: no summary agent bound
  • SkippedCursorAlreadyAdvanced: another compaction already covered this range
  • Applied: turnHistory was replaced with the summary
  • Inflated: LLM summary was larger than input; orchestrator restores backup and retries with smaller scope
  • DiscardedPreEmpted: cursor moved while LLM was in flight; result dropped without mutation
  • RolledBack: DITL hook or orchestrator restored a backup
  • HandedOffToTruncation: retry budget exhausted; failure policy's truncation path takes over

Cursors

Pre-emption detection relies on two cursors:

@Serializable
data class CompactionCursor(
    val generation: Long = 0L,
    val lastCompactedTurnIndex: Int = -1,
    val lastCompactionStrategy: PumpStationCompactionStrategy? = null,
    val lastCompactionInputTokens: Int = 0,
    val lastCompactionOutputTokens: Int = 0,
    val lastCompactionTimestamp: Long = 0L,
    val lastFanoutMode: ChunkFanoutMode? = null
)

@Serializable
data class LorebookCursor(
    val lastUpdatedTurnIndex: Int = -1,
    val lastUpdateTimestamp: Long = 0L,
    val lastUpdateGeneration: Long = 0L
)

A strategy function captures the current generation on entry, performs the LLM work, and CAS-applies the result only if the generation is still unchanged when the LLM returns. If a concurrent compaction has advanced the generation, the first caller's work is stale and is returned as DiscardedPreEmpted without mutating turnHistory.
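
The capture-then-compare pattern can be sketched with a single atomic counter. This is an illustration only: `History`, `Outcome`, and `applyIfUnchanged` are hypothetical, and `java.util.concurrent.atomic.AtomicLong` is swapped in for whatever mechanism the harness actually uses to guard the cursor.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model of generation-checked apply; not TPipe's implementation.
sealed class Outcome {
    data class Applied(val generation: Long) : Outcome()
    data class DiscardedPreEmpted(val observed: Long, val current: Long) : Outcome()
}

class History(initial: List<String>) {
    private val generation = AtomicLong(0L)
    var turns: List<String> = initial
        private set

    fun currentGeneration(): Long = generation.get()

    // Applies `summary` only if nobody advanced the generation since `observed`.
    // Simplification: the swap of `turns` is not itself atomic with the counter.
    fun applyIfUnchanged(observed: Long, summary: String): Outcome =
        if (generation.compareAndSet(observed, observed + 1)) {
            turns = listOf(summary)
            Outcome.Applied(observed + 1)
        } else {
            Outcome.DiscardedPreEmpted(observed, generation.get())
        }
}

fun main() {
    val history = History(listOf("t1", "t2", "t3"))
    // Two compactions start from the same generation.
    val seenByA = history.currentGeneration()
    val seenByB = history.currentGeneration()
    println(history.applyIfUnchanged(seenByB, "summary from B")) // Applied(generation=1)
    println(history.applyIfUnchanged(seenByA, "summary from A")) // DiscardedPreEmpted(observed=0, current=1)
    println(history.turns) // [summary from B]
}
```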

Backups

@Serializable
data class CompactionBackup(
    val generation: Long,
    val turnIndex: Int,
    val turnHistory: List<ConverseData>,
    val latestContent: MultimodalContent?,
    val contextWindow: ContextWindow,
    val miniBank: MiniBank,
    val createdAt: Long = System.currentTimeMillis()
)

Captured before every compaction attempt; pushed to a ring buffer bounded by maxCompactionBackups (default 3). Used by restoreFromBackup to roll back Inflated or pre-empted attempts.
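
A bounded ring buffer of this kind can be sketched with a deque that evicts its oldest entry. The sketch makes no claim about the harness's actual data structure: `BackupRing` is hypothetical, and `Backup` is a trimmed stand-in for CompactionBackup.

```kotlin
// Hypothetical bounded backup buffer; `Backup` is a trimmed stand-in for CompactionBackup.
data class Backup(val generation: Long, val turnHistory: List<String>)

class BackupRing(private val capacity: Int) {
    init { require(capacity > 0) { "capacity must be positive" } }

    private val buffer = ArrayDeque<Backup>()

    // Adds the newest backup, evicting the oldest once capacity is reached.
    fun push(backup: Backup) {
        if (buffer.size == capacity) buffer.removeFirst()
        buffer.addLast(backup)
    }

    fun latest(): Backup? = buffer.lastOrNull()
    fun generations(): List<Long> = buffer.map { it.generation }
}

fun main() {
    val ring = BackupRing(capacity = 3) // matches the default of maxCompactionBackups
    for (g in 1L..5L) ring.push(Backup(g, listOf("turn at generation $g")))
    println(ring.generations())        // [3, 4, 5]
    println(ring.latest()?.generation) // 5
}
```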

Concurrency Modes

PumpStationConcurrencyMode (Pipeline/PumpStation.kt:49):

| Mode | Behavior |
|---|---|
| Async | Background agents fire as soon as possible; queued via backgroundMutex. Harness advances without waiting. |
| Blocking | Each background agent blocks the harness until completion; agents run in sequence. |

Only some harness agents can run as Async. The path-safety, judge, dispatch, goal, and intervention agents are always Blocking by design.

concurrencyMode is the default for harness agent slots. Individual slots can override via setConcurrencyMode(...) on the slot.

Foreground vs Background Agent Slots

HarnessAgentSlot (Pipeline/PumpStationModels.kt:855):

data class HarnessAgentSlot(
    val agent: P2PInterface?,
    val concurrency: PumpStationConcurrencyMode,
    val builderFunction: (suspend (harness: PumpStation) -> P2PInterface)? = null,
    val appendsToTurnHistory: Boolean = false
)

  • Blocking slots fire synchronously during the foreground phase, in registration order. The harness awaits each result.
  • Async slots are queued via backgroundMutex and launched on the station-scoped asyncScope (SupervisorJob() + Dispatchers.Default). The harness continues to the next phase. Failures are isolated. The scope is cancelled by cancelAsyncJobs at the end of executeLocal, so async coroutines cannot outlive the harness run.

appendsToTurnHistory (default false) is honoured only for concurrency = Async slots. When true, the agent's result is captured into a PendingTurnEntry and merged into turnHistory by the foreground drain at the next safe phase boundary. When false and the station default is also false, the result is discarded (the historical fire-and-forget behavior). The station-wide asyncAgentsAppendToTurnHistory flag acts as an umbrella default, and a slot's flag is OR-ed with it, so a slot can opt in even when the station default is false. See Async Substrate below for the full contract.

Both slot types respect interval: a slot with interval = 5 only fires every 5 turns (modulo). The foregroundTurnInterval and backgroundTurnInterval builder fields are shorthand for setting intervals on the default slot list.

Background Jobs and Mutexes

The file-scope backgroundJobs, backgroundMutex, lorebookMutex, and summaryMutex are all declared in Pipeline/PumpStationLoop.kt:464. The lorebook and summary mutexes serialize their respective agent invocations to keep updates chronological; the background mutex serializes enqueueing (so concurrent turns do not race on the jobs list).

The lorebook agent is invoked under lorebookMutex.withLock in updateLorebook (PumpStationLoop.kt:1336). The summary agent is invoked under summaryMutex.withLock in updateSummary (PumpStationLoop.kt:1504).

DITL Hooks

PumpStation exposes fifteen Developer-in-the-Loop (DITL) hooks and provider functions, each with a matching setter. The full lifecycle:

| Hook | Type | When it fires | Receives | Return |
|---|---|---|---|---|
| preInitFunction | (suspend (MultimodalContent, PumpStation) -> MultimodalContent)? | Once at startup, after preInitAgent | Original content | (Optional) transformed content |
| preValidationJudgeFunction | (suspend (MultimodalContent, MiniBank, PumpStation) -> MiniBank)? | Before judge fires | Base input + current MiniBank | Replacement MiniBank |
| preInvokeFunction | (suspend (ContextWindow, MiniBank, PumpStation) -> Boolean)? | Before judge fires | Context + MiniBank | false to abort the run with InterventionTerminated |
| preValidationDispatchFunction | (suspend (MultimodalContent, ContextWindow, MiniBank, PumpStation) -> MiniBank)? | Before dispatch fires | Base input + ContextWindow + MiniBank | Replacement MiniBank |
| postGenerateFunction | (suspend (MultimodalContent, PumpStation) -> P2PInterface)? | After dispatch produces a path request | Dispatch result | Optional P2PInterface surfaced in metadata |
| pathValidationFunction | (suspend (MultimodalContent, PumpStation) -> Boolean)? | After path executes | Path result | false to reject the result; original input flows through |
| pathTransformationFunction | (suspend (MultimodalContent, PumpStation) -> MultimodalContent)? | After path validation passes | Path result | Replacement content for turnHistory |
| postMemoryFunction | (suspend (MultimodalContent, PumpStation) -> MultimodalContent)? | After lorebook/summary agents | Latest content | Replacement content |
| preCompactionFunction | (suspend (MultimodalContent, ConverseData, ConverseHistory, PumpStation) -> MultimodalContent)? | Before compaction | Latest content + overflow turn + history | Replacement content (advisory) |
| postCompactionFunction | (suspend (MultimodalContent, ConverseHistory, PumpStation) -> MultimodalContent)? | After compaction | Latest content + new history | Replacement content (advisory) |
| onContextTruncated | (suspend (Boolean, Int) -> Unit)? | When an agent truncates due to token budgeting | Whether truncated + free space | none |
| pathSafetyFunction | (suspend (PathObject, String, PumpStation) -> Boolean)? | For medium/high risk paths (alternative to pathSafetyAgent) | Path + schema | true to approve, false to reject |
| pathLimitExceededFunction | (suspend (PathObject, String, PumpStation) -> PathLimitExceededResult)? | When maxTotalPathCallsPerPath is exceeded | Path + reason | Dynamic policy result |
| compactionRolledBackFunction | (suspend (CompactionBackup, String, PumpStation) -> CompactionBackup?)? | When a backup is restored | Backup + reason | Replacement backup (or null to use as-is) |
| externalContextProvider | ((PumpStationTaskState) -> MutableMap<String, Any>)? | Per dispatch turn (passed to reserve path revealWhen) | Task state | External context map |

The DITL hook order per turn:

  1. preInvokeFunction (gates the turn)
  2. preValidationJudgeFunction
  3. judgeAgent runs
  4. preValidationDispatchFunction
  5. dispatchAgent runs
  6. postGenerateFunction (surfaces optional agent in metadata)
  7. (if path selected) pathSafetyFunction / pathSafetyAgent for medium/high risk
  8. PathObject.execute
  9. pathValidationFunction (gates result)
  10. pathTransformationFunction
  11. result appended to turnHistory and rawTurnHistory
  12. (foreground phase) blocking harness agent slots
  13. (memory phase) postMemoryFunction
  14. (compaction phase) preCompactionFunction, then summaryAgent, then postCompactionFunction

Setters on PumpStation:

fun setPreInitFunction(fn: (suspend (MultimodalContent, PumpStation) -> MultimodalContent)?): PumpStation
fun setPreValidationJudgeFunction(fn: (suspend (MultimodalContent, MiniBank, PumpStation) -> MiniBank)?): PumpStation
fun setPreInvokeFunction(fn: (suspend (ContextWindow, MiniBank, PumpStation) -> Boolean)?): PumpStation
fun setPreValidationDispatchFunction(fn: (suspend (MultimodalContent, ContextWindow, MiniBank, PumpStation) -> MiniBank)?): PumpStation
fun setPostGenerateFunction(fn: (suspend (MultimodalContent, PumpStation) -> P2PInterface)?): PumpStation
fun setPathValidationFunction(fn: (suspend (MultimodalContent, PumpStation) -> Boolean)?): PumpStation
fun setPathTransformationFunction(fn: (suspend (MultimodalContent, PumpStation) -> MultimodalContent)?): PumpStation
fun setPostMemoryFunction(fn: (suspend (MultimodalContent, PumpStation) -> MultimodalContent)?): PumpStation
fun setPreCompactionFunction(fn: (suspend (MultimodalContent, ConverseData, ConverseHistory, PumpStation) -> MultimodalContent)?): PumpStation
fun setPostCompactionFunction(fn: (suspend (MultimodalContent, ConverseHistory, PumpStation) -> MultimodalContent)?): PumpStation
fun setOnContextTruncated(fn: (suspend (Boolean, Int) -> Unit)?): PumpStation
fun setPathSafetyFunction(fn: (suspend (PathObject, String, PumpStation) -> Boolean)?): PumpStation
fun setPathLimitExceededFunction(fn: (suspend (PathObject, String, PumpStation) -> PathLimitExceededResult)?): PumpStation
fun setCompactionRolledBackFunction(fn: (suspend (CompactionBackup, String, PumpStation) -> CompactionBackup?)?): PumpStation
fun setExternalContextProvider(provider: ((PumpStationTaskState) -> MutableMap<String, Any>)?): PumpStation

All setters return this for chaining. The DSL exposes these as setXyz { ... } builder setters via the top-level PumpStationBuilder.

Path Risk Levels

PathRiskLevel (Pipeline/PumpStation.kt:132):

| Level | Behavior |
|---|---|
| Low | Path runs without safety gate. Default for every path. |
| Medium | Path-safety agent / function fires before the path runs. |
| High | Path-safety gate fires. The path is rejected on safe: false. |

The safety gate calls pathSafetyFunction if set, then falls back to pathSafetyAgent. If neither is set, paths above Low risk are approved automatically. To force a manual review for medium/high risk paths, configure at least one of them.
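
The fallback order can be written out as a small decision function. This is a sketch, not the harness's gate: `isPathApproved` and `Risk` are hypothetical, both gates are modeled as plain lambdas, and it assumes that a configured function's answer is final (the agent is consulted only when no function is set).

```kotlin
// Hypothetical stand-in for PathRiskLevel.
enum class Risk { Low, Medium, High }

// Hypothetical model: both gates are plain lambdas returning true to approve.
fun isPathApproved(
    risk: Risk,
    pathName: String,
    safetyFunction: ((String) -> Boolean)?,
    safetyAgent: ((String) -> Boolean)?
): Boolean {
    if (risk == Risk.Low) return true           // Low risk skips the gate entirely
    safetyFunction?.let { return it(pathName) } // function takes precedence when set
    safetyAgent?.let { return it(pathName) }    // otherwise fall back to the agent
    return true                                 // no gate configured: approved automatically
}

fun main() {
    val denyAll: (String) -> Boolean = { false }
    val allowAll: (String) -> Boolean = { true }
    println(isPathApproved(Risk.High, "shell", denyAll, allowAll)) // false
    println(isPathApproved(Risk.High, "shell", null, allowAll))    // true
    println(isPathApproved(Risk.High, "shell", null, null))        // true
    println(isPathApproved(Risk.Low, "shell", denyAll, denyAll))   // true
}
```

The third call is the case the paragraph above warns about: with neither gate configured, a High-risk path is approved without review.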

Risk level is set on the path object:

path("dangerous") {
    risk = PathRiskLevel.High
    // ...
}

Loop Guards

Two guards run before each path call:

maxConsecutiveSamePath

Default 3. When the dispatch agent selects the same path N consecutive turns, the harness emits a LoopGuardTripped event, optionally invokes interventionAgent (if set), and proceeds with the call. The developer can detect this in DITL hooks or in their intervention agent.

maxTotalPathCallsPerPath

Default null (disabled). When set, after the call count for a path exceeds the cap, the harness emits LoopGuardTripped and consults the policy:

| PathLimitExceededPolicy | Behavior |
|---|---|
| Skip | Move the path to reservePaths (effectively hiding it from dispatch). The path is no longer visible until revealed. |
| Halt | Set lastError = MaxTurnsExceeded and return the input unchanged. The finalization phase emits a HarnessFailed event. |
| Continue | Log the violation via a PathFailed event and proceed with the call anyway. |

The pathLimitExceededFunction DITL hook can override the static policy with a dynamic result.

Both loop guard events are emitted on PumpStationPhase.PathExecution.
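
The consecutive-path guard amounts to tracking a streak. The sketch below is a hypothetical tracker, not the harness's code: `ConsecutivePathGuard` is an invented name, and it assumes the guard trips once the streak reaches maxConsecutiveSamePath (the real comparison may differ by one).

```kotlin
// Hypothetical tracker for the consecutive-same-path guard; not TPipe code.
class ConsecutivePathGuard(private val maxConsecutiveSamePath: Int = 3) {
    private var lastPath: String? = null
    private var streak = 0

    // Returns true when this selection should raise a LoopGuardTripped-style event.
    // Assumption: the guard trips when the streak reaches the configured maximum.
    fun record(pathName: String): Boolean {
        streak = if (pathName == lastPath) streak + 1 else 1
        lastPath = pathName
        return streak >= maxConsecutiveSamePath
    }
}

fun main() {
    val guard = ConsecutivePathGuard()
    val selections = listOf("search", "search", "search", "write", "search")
    println(selections.map { guard.record(it) }) // [false, false, true, false, false]
}
```

Selecting a different path resets the streak, which is why the final "search" does not trip the guard again.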

Reserve Paths

Reserve paths are stored in reservePaths and are not visible to the dispatch agent until their revealWhen predicate evaluates to true. This is useful for:

  • Token-cost control: a rarely-needed path is hidden from the dispatch prompt until the situation warrants it
  • Capability gating: a path only becomes available after a specific event in the harness
  • Progressive disclosure: a path's description can be revealed based on the current turn index, error ratio, or task state

reservePath("sandboxed-shell") {
    description = "Runs a shell command in a sandbox."
    risk = PathRiskLevel.High
    revealWhen { taskState, _ ->
        taskState.turnIndex > 3 && taskState.lastSelectedPathName != "sandboxed-shell"
    }
    setExecutionFunction { content, station, _, _ ->
        // ... run shell command ...
        MultimodalContent(text = "shell output")
    }
}

Once a reserve path is revealed, it stays visible for the remainder of the run (sticky). The getVisiblePathDescriptorsInternal method (Pipeline/PumpStation.kt:1739) is responsible for filtering reserve paths based on revealWhen. The set of revealed reserve paths is tracked in the internal revealedReservePaths set.

A ReservePathRevealed event is emitted on the first turn a reserve path becomes visible, with pathName and the full list of reserve paths.
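
Sticky reveal means the predicate only needs to be true once. A minimal model of that behavior, assuming nothing about the internals of getVisiblePathDescriptorsInternal: `PathVisibility`, `ReservePath`, and `TaskState` below are hypothetical stand-ins, and the predicate takes only a task state.

```kotlin
// Hypothetical model of sticky reserve-path reveal; not TPipe's implementation.
data class TaskState(val turnIndex: Int)

class ReservePath(val name: String, val revealWhen: (TaskState) -> Boolean)

class PathVisibility(private val reservePaths: List<ReservePath>) {
    // Names revealed so far; entries are never removed.
    private val revealed = linkedSetOf<String>()

    // Returns the reserve paths visible this turn; once revealed, a path stays revealed.
    fun visibleReservePaths(state: TaskState): List<String> {
        for (path in reservePaths) {
            if (path.name !in revealed && path.revealWhen(state)) revealed.add(path.name)
        }
        return revealed.toList()
    }
}

fun main() {
    // The predicate is true on turn 4 only, yet the path stays visible afterwards.
    val visibility = PathVisibility(
        listOf(ReservePath("sandboxed-shell") { it.turnIndex == 4 })
    )
    println((3..6).map { visibility.visibleReservePaths(TaskState(it)) })
    // [[], [sandboxed-shell], [sandboxed-shell], [sandboxed-shell]]
}
```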

Async Substrate

Async work in PumpStation comes from two sources: paths marked with isRunsInBackground = true, and additional harness agent slots configured with concurrency = PumpStationConcurrencyMode.Async. Both sources run as coroutines outside the foreground turn loop, and both need a way to land their results back into the harness conversation without corrupting it. The async substrate is the thread-safe interface that makes that possible.

State and Lifetime

The substrate is anchored by four pieces of state, all declared on PumpStation:

| Field | Type | Purpose |
|---|---|---|
| asyncScope | CoroutineScope (SupervisorJob() + Dispatchers.Default) | The scope backing every async coroutine launched by the harness. Replaces the pre-substrate GlobalScope.launch calls so async work cannot outlive executeLocal. |
| pendingAsyncResults | Channel<PendingTurnEntry> (UNLIMITED) | Queue of async-produced turn entries awaiting foreground drain. Bounded by maxConcurrentBackgroundAgents in practice. |
| historyMutex | Mutex | Serializes async-origin writes to turnHistory, rawTurnHistory, turnSummary, contextWindow, and taskState. Foreground code paths keep their single-coroutine funnel and do not need to acquire this mutex. |
| asyncSeqCounter | AtomicLong | Monotonic id assigned to every PendingTurnEntry at enqueue time. The foreground drain sorts pending entries by seq so out-of-order async completions still produce a deterministic merge order from the LLM's perspective. |

ConverseHistory is a plain data class with no internal lock, so direct mutation of turnHistory from async code is NOT safe. The single thread-safe access point is appendTurnEntryAsync(entry, source) (and its batch sibling appendTurnEntriesAsync), which acquires historyMutex, appends the entry, and emits an AsyncTurnAppended event.

Drain Ordering

The foreground calls drainPendingAsyncResults() at two safe phase boundaries:

  1. Start of runJudgePhase, so the judge sees async completions from prior turns
  2. Start of runFinalizationPhase, so any final-turn async work lands before finalization completes

The drain batch-pulls everything currently buffered in pendingAsyncResults, sorts by seq, and merges into turnHistory (and rawTurnHistory) under historyMutex. Producers may continue to enqueue after the drain returns; the next drain picks them up. Per-path opt-out via PathObject.setSuppressHistoryEmit is honoured during the merge.

The seq-based ordering is chosen over wall-clock completion order because, from the LLM's perspective inside the harness, intent order (the order producers declared their work) is more stable than completion order under concurrent completions. A Channel preserves submission order naturally; sorting by seq makes the property explicit and observable.
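
The drain's batch-pull-then-sort step can be modeled without coroutines. In the sketch below, `AsyncInbox` and `PendingEntry` are hypothetical, and a `java.util.concurrent.ConcurrentLinkedQueue` is swapped in for the harness's Channel to keep the example dependency-free. Taking a seq and inserting into the queue are separate steps here, which is how queue order can drift from seq order when producers interleave.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for PendingTurnEntry.
data class PendingEntry(val seq: Long, val source: String, val text: String)

class AsyncInbox {
    private val seqCounter = AtomicLong(0L)
    private val pending = ConcurrentLinkedQueue<PendingEntry>()

    // Step 1 of enqueueing: take the next monotonic id.
    fun nextSeq(): Long = seqCounter.getAndIncrement()

    // Step 2: insert. Another producer may slip in between the two steps.
    fun offer(entry: PendingEntry) {
        pending.add(entry)
    }

    // Pulls everything buffered right now and returns it in seq order.
    fun drain(): List<PendingEntry> {
        val batch = mutableListOf<PendingEntry>()
        while (true) {
            val next = pending.poll() ?: break
            batch.add(next)
        }
        return batch.sortedBy { it.seq }
    }
}

fun main() {
    val inbox = AsyncInbox()
    val first = inbox.nextSeq()
    val second = inbox.nextSeq()
    // The producer holding the later seq inserts first.
    inbox.offer(PendingEntry(second, "agent-b", "result B"))
    inbox.offer(PendingEntry(first, "agent-a", "result A"))
    println(inbox.drain().map { it.source }) // [agent-a, agent-b]
    println(inbox.drain())                   // []
}
```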

Per-Producer Opt-In and Opt-Out

Four flags govern whether an async result is captured into turnHistory:

| Flag | Default | Scope | Effect |
|---|---|---|---|
| asyncPathsAppendToTurnHistory (station) | true | All async paths | Umbrella switch. When false, the drain returns 0 regardless of per-path opt-out. |
| PathObject.setSuppressHistoryEmit (per-path) | false | One async path | Per-path override. When true, the path's PathCompleted event still fires, but the drain skips the history merge. |
| asyncAgentsAppendToTurnHistory (station) | false | All async harness agents | Umbrella switch. When false, async agents run fire-and-forget (results discarded) unless a slot opts in. |
| HarnessAgentSlot.appendsToTurnHistory (per-slot) | false | One async agent | Per-slot override. The slot's value is OR-ed with the station default. When true, the result is captured into a PendingTurnEntry and merged. |
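
Read as boolean logic, the four flags reduce to one AND and one OR. The two helpers below are hypothetical and exist only to spell out the table; they are not functions on PumpStation.

```kotlin
// Hypothetical helpers mirroring the flag table above; not TPipe functions.

// Async path: the station umbrella must be on AND the path must not opt out.
fun asyncPathResultIsMerged(
    asyncPathsAppendToTurnHistory: Boolean,
    suppressHistoryEmit: Boolean
): Boolean = asyncPathsAppendToTurnHistory && !suppressHistoryEmit

// Async agent: captured when the station default OR the slot flag is on.
fun asyncAgentResultIsCaptured(
    asyncAgentsAppendToTurnHistory: Boolean,
    slotAppendsToTurnHistory: Boolean
): Boolean = asyncAgentsAppendToTurnHistory || slotAppendsToTurnHistory

fun main() {
    println(asyncPathResultIsMerged(true, false))     // true  (the defaults)
    println(asyncPathResultIsMerged(false, false))    // false (umbrella off)
    println(asyncAgentResultIsCaptured(false, false)) // false (the defaults)
    println(asyncAgentResultIsCaptured(false, true))  // true  (slot opts in)
}
```

The asymmetry is deliberate in the table: async paths are merged by default and opt out, while async agents are discarded by default and opt in.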

Cancellation Lifecycle

runFinalizationPhase calls drainPendingAsyncResults() first, then cancelAsyncJobs(), then drainBackgroundEventQueue() last (so trace events fired during cancellation still flush). cancelAsyncJobs honors asyncJobGracePeriodMs:

  • null (the default): the wait is unbounded. cancelAsyncJobs yields once so any in-flight suspend point can make progress, then cancels asyncScope. Long-running async paths (e.g. an async path that wraps a multi-minute LLM call) are not artificially timeboxed.
  • A millisecond value: the wait is bounded by withTimeoutOrNull(gracePeriodMs) { yield() }. Coroutines that do not finish within the window are cancelled; their partial results are NOT merged into turnHistory.

The default is null because TPipe intentionally does not impose arbitrary timeouts on user work. Developers who need a hard upper bound should set this to a value that matches their worst-case LLM round-trip plus safety margin (e.g. 30 minutes for production agent harnesses).

asyncJobsScopedToStation defaults to true so async coroutines cannot outlive executeLocal. Setting it to false restores the pre-substrate GlobalScope.launch behavior, which is preserved for back-compat but should be considered legacy.

Pushing From Custom Async Agents

Custom async harness agents and DITL hooks running on asyncScope should use appendTurnEntryAsync (or appendTurnEntriesAsync for batches) to land results in the conversation. The corresponding AsyncTurnAppended event carries the source identifier and the seq so observers can correlate the merge back to the dispatch. Direct mutation of turnHistory from async code is unsafe because ConverseHistory is a plain data class with no internal lock.

pumpStation("research") {
    asyncAgentsAppendToTurnHistory = true
    asyncJobGracePeriodMs = 30 * 60 * 1000L  // 30 minutes

    harnessAgentBuilder({ station ->
        MyAsyncResearchAgent(station)
    }, concurrency = PumpStationConcurrencyMode.Async)
}
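
To illustrate why async code should queue results rather than mutate history directly, here is a minimal standalone sketch of a lock-guarded pending buffer with sequence numbers. GuardedTurnBuffer and PendingEntry are hypothetical stand-ins; TPipe's own PendingTurnEntry, ConverseHistory, and drain logic are not reproduced here.

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-ins; TPipe's real PendingTurnEntry and ConverseHistory differ.
data class PendingEntry(val seq: Long, val source: String, val content: String)

class GuardedTurnBuffer {
    private val lock = Any()
    private val seqCounter = AtomicLong(0)
    private val pending = mutableListOf<PendingEntry>()
    private val history = mutableListOf<String>() // plain list, no internal lock

    /** Safe from any thread: entries are queued under a lock, never merged directly. */
    fun appendAsync(source: String, content: String): Long {
        val seq = seqCounter.incrementAndGet()
        synchronized(lock) { pending += PendingEntry(seq, source, content) }
        return seq
    }

    /** Merges queued entries into history in seq order and returns how many were merged. */
    fun drain(): Int = synchronized(lock) {
        val batch = pending.sortedBy { it.seq }
        pending.clear()
        batch.forEach { history += "[${it.source}#${it.seq}] ${it.content}" }
        batch.size
    }

    fun snapshot(): List<String> = synchronized(lock) { history.toList() }
}
```

The returned seq plays the role described for AsyncTurnAppended: an observer can correlate a merged entry back to the call that produced it.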

The full public API surface (setters, getters, drain internals) is documented in PumpStation API Reference.

Stash, Snapshots, and Pause/Resume

Stash

PumpStation.stash: MutableMap<String, ConverseData> is an in-memory escape hatch for content that would blow out the prompt budget. DITL hooks (typically pathTransformationFunction) can move oversized content into the stash and replace the turnHistory entry with a placeholder. A path designed to handle the stashed data can call station.retrieveStash(stashId) to pull it back.

The stash manifest (station.getStashManifest(): List<StashEntry>) surfaces rich metadata without loading the full content:

@Serializable
data class StashEntry(
    val id: String,
    val sourcePath: String?,
    val createdTurn: Int,
    val reason: StashReason,
    val tokenEstimate: Int?,
    val byteSize: Long?,
    val preview: String
)

StashReason (Pipeline/PumpStationModels.kt:115):

enum class StashReason {
    TokenOverflow,
    BinaryPayload,
    ErrorLog,
    UnsafeForPrompt,
    DeveloperRequested,
    BackgroundResult
}

A path can reach back into the stash via the extension PathObject.getStashContent(stashId, station) (Pipeline/PumpStationPathObjectExtensions.kt:55).
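
The stash-and-placeholder flow described in this section can be sketched as follows. This is a simplified standalone model: SketchStash and StashRecord are hypothetical, content is a plain String rather than ConverseData, and the placeholder format and character-based size check are assumptions.

```kotlin
// Hypothetical simplification: content is a String here, whereas the real stash holds ConverseData.
data class StashRecord(val id: String, val createdTurn: Int, val preview: String, val byteSize: Long)

class SketchStash(private val maxInlineChars: Int) {
    private val stash = mutableMapOf<String, String>()
    private val entries = mutableListOf<StashRecord>()

    /** Returns the text to keep in history: the original, or a placeholder if it was stashed. */
    fun stashIfOversized(content: String, turn: Int): String {
        if (content.length <= maxInlineChars) return content
        val id = "stash-${entries.size + 1}"
        stash[id] = content
        // The manifest keeps only metadata and a short preview, not the full payload.
        entries += StashRecord(id, turn, content.take(40), content.toByteArray().size.toLong())
        return "[stashed:$id]"
    }

    fun retrieve(id: String): String? = stash[id]
    fun manifest(): List<StashRecord> = entries.toList()
}
```

A path that knows how to handle the payload would look up the ID from the placeholder and call retrieve, which mirrors the role of retrieveStash in the text above.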

Snapshots

saveSnapshot() and restoreSnapshot(snapshot) save and restore station state around high-risk boundaries. The snapshot captures the full task state plus the curated and raw history, the turn summary, context window, MiniBank, stash manifest, and the visible and reserve path names.

@Serializable
data class PumpStationSnapshot(
    val taskState: PumpStationTaskState,
    val turnHistory: ConverseHistory,
    val rawTurnHistory: ConverseHistory,
    val turnSummary: String,
    val contextWindow: ContextWindow,
    val miniBank: MiniBank,
    val stashManifest: List<StashEntry>,
    val visiblePathNames: List<String>,
    val reservePathNames: List<String>
)

After restoreSnapshot, harnessIsReady is set to true and a HarnessResumed event is emitted.
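
A common way to use a save/restore pair is to wrap a risky step so that a failure rolls state back. The sketch below shows that pattern on a tiny stand-in class; SketchStation, SketchSnapshot, and withRollback are hypothetical and only mimic the shape of saveSnapshot() and restoreSnapshot(snapshot).

```kotlin
// Hypothetical stand-in for a station: only the state needed to show the pattern.
data class SketchSnapshot(val turnIndex: Int, val history: List<String>)

class SketchStation {
    var turnIndex = 0
        private set
    private val history = mutableListOf<String>()

    fun record(entry: String) {
        history += entry
        turnIndex++
    }

    fun saveSnapshot() = SketchSnapshot(turnIndex, history.toList()) // defensive copy

    fun restoreSnapshot(snapshot: SketchSnapshot) {
        turnIndex = snapshot.turnIndex
        history.clear()
        history += snapshot.history
    }

    fun entries(): List<String> = history.toList()
}

/** Runs a risky step and rolls the station back to the pre-step snapshot if it throws. */
fun <T> SketchStation.withRollback(risky: () -> T): Result<T> {
    val before = saveSnapshot()
    return runCatching(risky).onFailure { restoreSnapshot(before) }
}
```

Taking the snapshot before the step, rather than after detecting a problem, is what makes the rollback independent of how far the failed step got.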

Pause and Resume

pauseAt(vararg phases) registers pause phases. The harness calls checkPauseGuards(phase) at the named phase boundaries; if the current phase is in the pause set, the harness emits HarnessSuspended and suspends on a rendezvous channel.

resume() clears the pause phases, emits HarnessResumed, and sends a signal to the suspended checkPauseGuards call.

Pause phases: BeforeJudge, AfterJudge, BeforeDispatch, AfterDispatch, BeforePathSafety, BeforePathExecution, AfterPathExecution, BeforeMemoryUpdate, BeforeCompaction, BeforeGoalValidation, BeforeExit.
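
The pause/resume handshake can be modelled with a blocking rendezvous. The sketch below uses threads and java.util.concurrent.SynchronousQueue in place of a suspending channel, so it is an analogy rather than the harness's coroutine code; SketchPauseGate, SketchPhase, and the event strings are hypothetical, and the one-second timeout in resume is an assumption added so the sketch cannot block forever.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.TimeUnit

// Hypothetical subset of phases and a blocking stand-in for the suspending harness call.
enum class SketchPhase { BeforeJudge, BeforePathExecution, BeforeExit }

class SketchPauseGate {
    private val pausePhases: MutableSet<SketchPhase> = ConcurrentHashMap.newKeySet<SketchPhase>()
    private val signal = SynchronousQueue<Unit>() // rendezvous: a sender waits for a receiver
    val events = CopyOnWriteArrayList<String>()

    fun pauseAt(vararg phases: SketchPhase) {
        pausePhases.addAll(phases)
    }

    /** Blocks the calling thread if the given phase is registered as a pause phase. */
    fun checkPauseGuards(phase: SketchPhase) {
        if (phase !in pausePhases) return
        events += "HarnessSuspended:$phase"
        signal.take()
    }

    /** Clears pause phases, records the resume, and releases a waiting checkPauseGuards call. */
    fun resume(): Boolean {
        pausePhases.clear()
        events += "HarnessResumed"
        return signal.offer(Unit, 1, TimeUnit.SECONDS) // false if nobody was waiting
    }
}
```

Because resume clears the pause set before signalling, the released caller does not pause again at later boundaries unless pauseAt is called again.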

Tracing Support

PumpStation tracing is keyed by taskState.runId (e.g. ps-1718371200000-1234). Every PumpStationEvent is mirrored into the global PipeTracer when tracing is enabled. The trace funnel (Pipeline/PumpStationHelpers.kt:75) maps each sealed event subtype to a PUMP_STATION_* TraceEventType.

The visualizer reads the phase and turnIndex from the trace event metadata, so each event carries a consistent turnIndex regardless of where it was emitted. The original PumpStationPhase is preserved in metadata["phase"] even when the TracePhase is a more generic bucket.
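
The funnel idea, one sealed event subtype mapped to one trace type with turnIndex and phase always present in metadata, can be sketched as below. All types here are hypothetical, trimmed-down stand-ins; the real PumpStationEvent hierarchy and TraceEventType enum are larger and are not reproduced.

```kotlin
// Hypothetical, trimmed-down event and trace types; the real sealed hierarchy is much larger.
enum class SketchTraceType { PUMP_STATION_STARTED, PUMP_STATION_PATH_COMPLETED, PUMP_STATION_FAILED }

sealed interface SketchEvent {
    val turnIndex: Int
    val phase: String
}

data class SketchHarnessStarted(override val turnIndex: Int, override val phase: String) : SketchEvent
data class SketchPathCompleted(override val turnIndex: Int, override val phase: String, val pathName: String) : SketchEvent
data class SketchHarnessFailed(override val turnIndex: Int, override val phase: String, val errorMessage: String) : SketchEvent

data class SketchTraceRecord(val type: SketchTraceType, val metadata: Map<String, String>)

/** Maps each event subtype to a trace type; turnIndex and phase are always in the metadata. */
fun toTraceRecord(event: SketchEvent): SketchTraceRecord {
    val common = mapOf("turnIndex" to event.turnIndex.toString(), "phase" to event.phase)
    return when (event) {
        is SketchHarnessStarted ->
            SketchTraceRecord(SketchTraceType.PUMP_STATION_STARTED, common)
        is SketchPathCompleted ->
            SketchTraceRecord(SketchTraceType.PUMP_STATION_PATH_COMPLETED, common + ("pathName" to event.pathName))
        is SketchHarnessFailed ->
            SketchTraceRecord(SketchTraceType.PUMP_STATION_FAILED, common + ("errorMessage" to event.errorMessage))
    }
}
```

Using an exhaustive when over a sealed type means that adding a new event subtype without a mapping fails at compile time.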

Trace events emitted by PumpStation:

| TraceEventType | Emitted on | Metadata fields |
| --- | --- | --- |
| PUMP_STATION_STARTED | HarnessStarted and PreInitCompleted | runId, turnIndex, phase |
| PUMP_STATION_HARNESS_WARNING | HarnessWarning | warningCode, mechanisms |
| PUMP_STATION_COMPLETED | HarnessCompleted | runId, turnIndex, exitReason, finalOutput |
| PUMP_STATION_FAILED | HarnessFailed | error, errorMessage, exitReason |
| PUMP_STATION_SUSPENDED | HarnessSuspended | pausedAt, reason |
| PUMP_STATION_RESUMED | HarnessResumed | runId, turnIndex, phase |
| PUMP_STATION_HEALTH_CHECK_STARTED | HealthCheckStarted | runId, turnIndex |
| PUMP_STATION_HEALTH_CHECK_COMPLETED | HealthCheckCompleted | status, warnings, terminateHarness |
| PUMP_STATION_JUDGE_STARTED | JudgeStarted | runId, turnIndex |
| PUMP_STATION_JUDGE_SKIPPED | JudgeSkipped | reason, judgeRunMode |
| PUMP_STATION_JUDGE_COMPLETED | JudgeCompleted | isComplete, shouldTerminate, contentPreview, token usage |
| PUMP_STATION_DISPATCH_STARTED | DispatchStarted | runId, turnIndex |
| PUMP_STATION_DISPATCH_COMPLETED | DispatchCompleted | selectedPathName, pathRequest, contentPreview, token usage |
| PUMP_STATION_PATH_SELECTED | PathSelected | pathName, riskLevel |
| PUMP_STATION_PATH_SAFETY_STARTED | PathSafetyStarted | pathName, riskLevel |
| PUMP_STATION_PATH_SAFETY_COMPLETED | PathSafetyCompleted | pathName, riskLevel, approved, reason |
| PUMP_STATION_PATH_STARTED | PathStarted | pathName, riskLevel |
| PUMP_STATION_PATH_COMPLETED | PathCompleted | pathName, riskLevel, contentPreview, token usage |
| PUMP_STATION_PATH_FAILED | PathFailed | pathName, riskLevel, error, errorMessage |
| PUMP_STATION_PATH_HIDDEN | PathHidden | pathName, reason |
| PUMP_STATION_PATH_VALIDATION_COMPLETED | PathValidationCompleted | pathName, approved, reason |
| PUMP_STATION_INTERVENTION_STARTED | InterventionStarted | pathName, trigger |
| PUMP_STATION_INTERVENTION_COMPLETED | InterventionCompleted | nudges, shouldContinue, contentPreview, token usage |
| PUMP_STATION_FOREGROUND_AGENT_COMPLETED | ForegroundAgentCompleted | agentName, contentPreview, token usage |
| PUMP_STATION_MEMORY_UPDATE_STARTED | MemoryUpdateStarted | memoryMode |
| PUMP_STATION_MEMORY_UPDATE_COMPLETED | MemoryUpdateCompleted | memoryMode, compactionPercent, loreBookActive, summaryActive |
| PUMP_STATION_STASH_CREATED | StashCreated | stashId, sourcePath, reason, tokenEstimate |
| PUMP_STATION_COMPACTION_STARTED | CompactionStarted | strategy, memoryMode |
| PUMP_STATION_COMPACTION_COMPLETED | CompactionCompleted and CompactionAttemptCompleted | strategy, memoryMode, attempt, fanout, result |
| PUMP_STATION_COMPACTION_INFLATED | CompactionInflated | inputTokens, outputTokens, attempt, willRetry |
| PUMP_STATION_COMPACTION_ROLLED_BACK | CompactionRolledBack | backupGeneration, reason |
| PUMP_STATION_COMPACTION_HANDED_OFF | CompactionHandedOffToTruncation | contextWindowBefore, contextWindowAfter |
| PUMP_STATION_GOAL_VALIDATION_STARTED | GoalValidationStarted | runId, turnIndex |
| PUMP_STATION_GOAL_VALIDATION_COMPLETED | GoalValidationCompleted | passed, reason |
| PUMP_STATION_RESERVE_PATH_REVEALED | ReservePathRevealed | pathName, reservePathNames |
| PUMP_STATION_LOOP_GUARD_TRIPPED | LoopGuardTripped | guard, pathName, detail |
| PUMP_STATION_CONTEXT_BLOWOUT_DETECTED | ContextBlowoutDetected | fillRatio, threshold, afterPhase |
| PUMP_STATION_BACKGROUND_AGENT_QUEUED | BackgroundAgentQueued | agentName |
| PUMP_STATION_ASYNC_TURN_APPENDED | AsyncTurnAppended | source, pathName, agentName, seq, contentPreview |
| PUMP_STATION_NESTED_P2P_COMPLETED | NestedP2PCompleted | pathName, agentName, contentPreview, token usage |

Enable tracing:

val station = pumpStation("name") {
    // ...
    tracing {
        enabled()
        outputFormat(TraceFormat.HTML)
        autoExport(enabled = true, path = "~/.tpipe-traces/")
    }
}

Or programmatically: station.enableTracing(TraceConfig(enabled = true)). getTraceReport(format) returns the rendered trace as a string; getTraceId() returns the run ID.

Kill Switch Integration

PumpStation carries a KillSwitch (Pipeline/PumpStation.kt:677) that propagates to every PathObject in pathList and reservePaths. The harness checks the kill switch at every phase boundary via recordAndCheckKillSwitch(agent) (after judge, dispatch, and path execution).

The kill switch enforces inputTokenLimit and outputTokenLimit. When tripped, the default KillSwitch.onTripped callback throws KillSwitchException. The harness loop catches the exception at the runHarnessLoop boundary and sets both taskState.lastError and taskState.exitReason to KillSwitchTripped before runFinalizationPhase emits a HarnessFailed event.

Each PathObject also has its own kill switch slot, populated by the station's addPath and killSwitch setter. Per-path limits are checked independently against the path's own token usage.
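
The trip-and-catch flow can be sketched in isolation. SketchKillSwitch, SketchKillSwitchException, and runWithKillSwitch below are hypothetical stand-ins, and treating a limit as tripped only when usage is strictly greater than the cap is an assumption; consult the KillSwitch documentation for the real semantics.

```kotlin
// Hypothetical stand-ins; the real KillSwitch and KillSwitchException live in TPipe.
class SketchKillSwitchException(message: String) : RuntimeException(message)

class SketchKillSwitch(private val inputTokenLimit: Int, private val outputTokenLimit: Int) {
    private var inputUsed = 0
    private var outputUsed = 0

    /** Accumulates usage and throws once either cap is exceeded (strictly greater: an assumption). */
    fun recordAndCheck(inputTokens: Int, outputTokens: Int) {
        inputUsed += inputTokens
        outputUsed += outputTokens
        if (inputUsed > inputTokenLimit) {
            throw SketchKillSwitchException("input cap exceeded: $inputUsed > $inputTokenLimit")
        }
        if (outputUsed > outputTokenLimit) {
            throw SketchKillSwitchException("output cap exceeded: $outputUsed > $outputTokenLimit")
        }
    }
}

/** Loop boundary: each pair is one phase's (input, output) token usage. */
fun runWithKillSwitch(killSwitch: SketchKillSwitch, phases: List<Pair<Int, Int>>): String =
    try {
        phases.forEach { (input, output) -> killSwitch.recordAndCheck(input, output) }
        "Completed"
    } catch (e: SketchKillSwitchException) {
        "KillSwitchTripped" // the harness would record this as the exit reason
    }
```

Catching at the loop boundary, rather than at each phase, keeps the failure path in one place regardless of which phase crossed the cap.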

KillSwitch is documented in full in KillSwitch - Token Limit Enforcement. The propagation pattern in PumpStation mirrors the one used by Manifold, Splitter, Junction, Connector, MultiConnector, and DistributionGrid.

P2P Integration

PumpStation implements P2PInterface (Pipeline/PumpStation.kt:667). The station exposes its child agents and context through the P2P surface:

override fun getContextWindowFromInterface(): ContextWindow? = contextWindow
override fun getMiniBankFromInterface(): MiniBank? = miniBank
override fun getPaths(): String = serialize(getVisiblePathDescriptorsInternal(), false)

executeP2PRequest(request: P2PRequest): P2PResponse? delegates to executeLocal(content) and emits a NestedP2PCompleted event annotated with the parent path name. This lets a path issue a nested P2P call (e.g. dispatching to a child agent) and have the visualizer render the nested call as a sub-entry under the parent path's content panel.

setParentInterface(parent: P2PInterface) and getParentP2PInterface(): P2PInterface? track the parent container when a pump station is nested inside another. The PathObject.setParentInterface(this) call is made automatically at path execution time so nested calls are tagged with the station as their parent.

P2P Concurrency: PumpStation is stateful: turn index, history, and task state are mutable during a run. When exposed via P2P, register with P2PConcurrencyMode.ISOLATED so each inbound request gets a fresh clone. See P2P Registry and Routing for details.

Common Startup Failures

dispatchAgent must be a Pipeline

PumpStation.init() failed: dispatchAgent must be a Pipeline.
Agents requiring PathRequest schema output must use Pipeline.

Cause: dispatchAgent is null or not a Pipeline.

Fix: assign a Pipeline to dispatchAgent (or pumpStationBuilder.dispatchAgent).

PathObject.init() failed: pathName is required

Cause: a PathObject was added without a pathName.

Fix: set pathName on every PathObject before adding to the station.

PathObject.init() failed for path 'X': no execution mechanism configured

Cause: the path has none of executionFunction, internalAgent, agentBuilderFunction, or a bound PCP function.

Fix: wire at least one execution mechanism on the path.

MaxTurnsExceeded

The loop hit maxTurns (default 50) before any of the legitimate exit signals fired. Inspect taskState.lastError and taskState.exitReason to confirm. Common causes:

  • The judge never returns isComplete: true (in Always mode), or requestJudgeNextTurn is never set (in FlagTriggered mode)
  • No path returns passPipeline = true
  • Nothing sets terminatePipeline either

If none of these exit signals is configured, the harness emits a HarnessWarning with code = NoExitSignalConfigured at startup.

Fix: increase maxTurns conservatively, wire an exit signal, or supply a goal agent if you want the loop to keep verifying until the goal accepts.
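
The relationship between maxTurns and the exit signals listed above can be sketched as a bounded loop. Everything here is a hypothetical model: SketchTurn, SketchExit, and runBoundedLoop are not TPipe types, and the order in which the signals are checked is an assumption.

```kotlin
// Hypothetical model of one turn's outcome and the loop's exit reason.
data class SketchTurn(
    val judgeComplete: Boolean = false,
    val passPipeline: Boolean = false,
    val terminatePipeline: Boolean = false
)

enum class SketchExit { JudgeComplete, PathPassed, Terminated, MaxTurnsExceeded }

/** Runs turns until an exit signal fires or maxTurns is reached; returns the reason and turns used. */
fun runBoundedLoop(maxTurns: Int = 50, turn: (Int) -> SketchTurn): Pair<SketchExit, Int> {
    for (index in 0 until maxTurns) {
        val result = turn(index)
        // Signal precedence below is an assumption made for this sketch.
        when {
            result.terminatePipeline -> return SketchExit.Terminated to index + 1
            result.passPipeline -> return SketchExit.PathPassed to index + 1
            result.judgeComplete -> return SketchExit.JudgeComplete to index + 1
        }
    }
    return SketchExit.MaxTurnsExceeded to maxTurns
}
```

A turn function that never sets any flag always ends in MaxTurnsExceeded, which is the situation this failure describes.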

KillSwitchTripped

Token usage exceeded the configured input or output cap. The loop boundary catches the KillSwitchException and transitions to failure. Increase the cap on the KillSwitch (e.g. KillSwitch(inputTokenLimit = 100_000, outputTokenLimit = 20_000)) or optimize the agents to use fewer tokens.

CompactionInflated

The summary agent returned a summary whose estimated token count exceeded the input. The orchestrator restored the most recent CompactionBackup and retried with a smaller scope until the retry budget was exhausted. The harness then handed off to the truncation path. Increase maxCompactionAttempts (default 2) or switch to Truncation mode for cost-controlled runs.
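
The inflate, roll back, retry, hand off sequence can be sketched as follows. This is a hypothetical model: token counts are approximated by word counts, the summary agent is a lambda, halving the scope on each retry is an assumed policy, and the truncation step simply keeps the last few entries.

```kotlin
// Hypothetical model: tokens are approximated by word counts and the summarizer is a lambda.
enum class SketchCompactionOutcome { Compacted, HandedOffToTruncation }

data class SketchCompactionResult(
    val outcome: SketchCompactionOutcome,
    val history: List<String>,
    val attempts: Int
)

fun estimateTokens(entries: List<String>): Int =
    entries.sumOf { entry -> entry.split(" ").count { it.isNotBlank() } }

fun compactWithRollback(
    history: List<String>,
    maxCompactionAttempts: Int = 2,
    keepLastOnTruncate: Int = 2,
    summarize: (List<String>) -> String
): SketchCompactionResult {
    val backup = history.toList()   // restored whenever a summary inflates
    var scope = backup.size         // number of oldest entries to summarize
    var attempts = 0
    while (attempts < maxCompactionAttempts && scope >= 1) {
        attempts++
        val head = backup.take(scope)
        val summary = summarize(head)
        if (estimateTokens(listOf(summary)) <= estimateTokens(head)) {
            val compacted = listOf(summary) + backup.drop(scope)
            return SketchCompactionResult(SketchCompactionOutcome.Compacted, compacted, attempts)
        }
        scope /= 2                  // inflated: discard the summary, retry with a smaller scope
    }
    // Retry budget exhausted: fall back to plain truncation of the backup.
    val truncated = backup.takeLast(keepLastOnTruncate)
    return SketchCompactionResult(SketchCompactionOutcome.HandedOffToTruncation, truncated, attempts)
}
```

Because every attempt starts from the untouched backup, an inflated summary never leaks into the history that the next attempt or the truncation fallback sees.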

ContextBlowoutDetected

The context-window fill ratio exceeded blowoutThreshold (default 0.9) at a phase boundary. The harness's recovery policy kicks in (up to maxBlowoutRecoveries attempts); after that the run halts.
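
As a quick illustration of the threshold check, the sketch below computes a fill ratio and compares it with the threshold. The helper names fillRatio and isBlowout are hypothetical, and defining the ratio as used tokens divided by window size is an assumption about how the harness measures fill.

```kotlin
// Hypothetical helpers; the 0.9 default mirrors the blowoutThreshold default quoted above.
fun fillRatio(usedTokens: Int, windowTokens: Int): Double {
    require(windowTokens > 0) { "windowTokens must be positive" }
    return usedTokens.toDouble() / windowTokens
}

/** True when the fill ratio strictly exceeds the threshold. */
fun isBlowout(usedTokens: Int, windowTokens: Int, blowoutThreshold: Double = 0.9): Boolean =
    fillRatio(usedTokens, windowTokens) > blowoutThreshold
```

For an 8,000-token window, 7,000 used tokens gives a ratio of 0.875 and stays under the default threshold, while 7,600 gives 0.95 and would count as a blowout.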

Dispatch agent emits invalid JSON for PathRequest

By default, the harness attempts up to maxDispatchRepairAttempts (default 1) repair rounds. After the budget is exhausted, the turn continues without a path call. To halt on parse failure, set stopHarnessOnInvalidPathRequest = true (or failurePolicy.stopHarnessOnInvalidPathRequest = true).
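
The repair budget and the two fallback behaviours can be sketched as a small function. This is a hypothetical model, not the harness's parser: resolveDispatch and the outcome types are invented for illustration, parse returns a path name or null on invalid output, and repair stands in for asking the dispatch agent to fix its output.

```kotlin
// Hypothetical sketch: parse returns null on invalid output, repair asks the agent to fix it.
sealed interface SketchDispatchOutcome
data class SketchPathCall(val pathName: String) : SketchDispatchOutcome
object SketchNoPathCall : SketchDispatchOutcome      // turn continues without a path call
object SketchHarnessStopped : SketchDispatchOutcome  // stopHarnessOnInvalidPathRequest = true

fun resolveDispatch(
    raw: String,
    maxDispatchRepairAttempts: Int = 1,
    stopHarnessOnInvalidPathRequest: Boolean = false,
    parse: (String) -> String?,
    repair: (String) -> String
): SketchDispatchOutcome {
    var candidate = raw
    // One initial parse plus up to maxDispatchRepairAttempts repaired parses.
    repeat(maxDispatchRepairAttempts + 1) { attempt ->
        parse(candidate)?.let { return SketchPathCall(it) }
        if (attempt < maxDispatchRepairAttempts) candidate = repair(candidate)
    }
    return if (stopHarnessOnInvalidPathRequest) SketchHarnessStopped else SketchNoPathCall
}
```

With the default budget of 1, the output is parsed at most twice: once as emitted and once after a single repair.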

Path-safety agent rejects a path

The path was not called. The original input flows through as the turn's result so the loop can continue. Inspect PathSafetyCompleted.approved in the trace to see the reason.

Best Practices

  • Start with PumpStationDefaults.withOpenRouter(config) { ... } from TPipe-Defaults for a pre-wired station. Override slots only when you need custom behavior.
  • Wire at least one exit signal before running: a judge with the default prompt, a path that returns passPipeline = true, or FlagTriggered mode with a path-bound requestJudgeNextTurn().
  • Set maxTurns conservatively in FlagTriggered mode. Without a judge on every turn, maxTurns is the main bound on a runaway dispatch agent.
  • Use a goalAgent for Ralph-loop semantics. The goal agent forces a re-loop with critique if the work isn't acceptable, up to maxGoalFailAttempts.
  • Use reserve paths for expensive capabilities. Hide them from the dispatch prompt until needed; their revealWhen predicate gates visibility.
  • Bind functions as PCP tools rather than writing full setExecutionFunction closures. The dispatch agent gets typed parameter schema and the path stays lightweight.
  • Set a KillSwitch with a non-zero token cap on every station. The default recommendedKillSwitchConfig() (50K input / 10K output) is a safe starting point.
  • Configure tracing during development. The visualizer's turn-centric layout is the fastest way to understand what the harness is doing.
  • Use saveSnapshot() / restoreSnapshot() for rollback at high-risk boundaries (e.g. before a destructive path call).
  • Don't share a single PumpStation instance across concurrent executeLocal calls. Build a fresh station per concurrent run.
  • Tune compaction strategy per workload. Whole is the cheapest when history fits; Chunked is the only option when history has blown past the agent’s context window.
  • Test the magic contracts. The judge, dispatch, and path-safety agents have structured JSON outputs. Validate them in your test suite: parseJudgeVerdict, parseDispatchOutput, and parsePathSafetyVerdict are accessible for direct invocation.

Cross-References