P2P Interface

These are the public methods of the P2PInterface contract that PumpStation overrides:

| Function | Description |
| --- | --- |
| setParentInterface(parent: P2PInterface) | Sets the parent container reference. |
| getParentP2PInterface(): P2PInterface? | Returns the parent container reference. |
| getPaths(): String | Serializes the visible path descriptor list. |
| getContextWindowFromInterface(): ContextWindow? | Returns the station’s contextWindow. |
| getMiniBankFromInterface(): MiniBank? | Returns the station’s miniBank. |
| setTokenBudgetRecursive(budget: TokenBudgetSettings) | Propagates the budget to all child agents. |
| getTokenBudgetSettings(): TokenBudgetSettings? | Returns the station’s budget. |
| setPipeSettingsRecursively(settings: PipeSettings) | Propagates pipe settings. |
| P2PInit() (suspend) | Initializes the harness. |
| executeLocal(content: MultimodalContent): MultimodalContent (suspend) | Primary execution entry. |
| executeP2PRequest(request: P2PRequest): P2PResponse? (suspend) | P2P entry. |

PathObject Class

PathObject lives in Pipeline/PumpStation.kt:225. It represents a single path the dispatch agent can invoke.

class PathObject(override var killSwitch: KillSwitch? = null) : P2PInterface

PathObject Public Properties

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| pathName | String | "" | Unique identifier. Required. The DSL throws IllegalArgumentException at build time if blank. |
| pathDescription | String | "" | LLM-facing description. Injected into the dispatch prompt. |
| pathSchema | String | "" | Free-form JSON schema for the path’s input. |
| pcpSchema | PcpContext? | null | PCP context for PCP-bound paths. |
| riskLevel | PathRiskLevel | Low | Low, Medium, or High. Triggers the path-safety gate at Medium and above. |
| dispatchHint | String | "" | Soft advisory surfaced in the dispatch prompt as "Hint: ...". |
| revealWhen | (taskState, externalContext) -> Boolean | { _, _ -> false } | Predicate for reserve path visibility. Sticky once revealed. |
| pathMetadata | MutableMap<Any, Any> | empty map | Developer-supplied metadata. |
| isInternalAgentSet | Boolean (getter) | false | True if an internal agent is configured. |
| isExecutionFunctionSet | Boolean (getter) | false | True if an execution function is bound. |
| isRunsInBackground | Boolean (getter) | false | True if the path runs in background. |
| isSuppressHistoryEmit | Boolean (getter) | false | True if the async path’s completion is suppressed from being merged into turnHistory. Only takes effect when isRunsInBackground is also true. |
| killSwitch | KillSwitch? (override) | null | Per-path token cap. Propagated by the station. |
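
The "sticky once revealed" behaviour of revealWhen can be pictured with a small latch. The sketch below is illustrative only: ReservePathSketch, isVisible, and the String type used for taskState are hypothetical stand-ins, not part of the PumpStation API, and the real reveal bookkeeping may live elsewhere.

```kotlin
// Hypothetical stand-in for a reserve path; only the reveal logic is modelled.
class ReservePathSketch(
    val pathName: String,
    // Assumed shape: task state as a String, external context as a map.
    private val revealWhen: (taskState: String, externalContext: Map<String, Any>) -> Boolean = { _, _ -> false }
) {
    // Latches to true the first time the predicate passes and never resets.
    var revealed: Boolean = false
        private set

    // Returns whether the path should be visible on this turn.
    fun isVisible(taskState: String, externalContext: Map<String, Any>): Boolean {
        if (!revealed && revealWhen(taskState, externalContext)) {
            revealed = true
        }
        return revealed
    }
}

fun main() {
    val path = ReservePathSketch("sandboxed") { taskState, _ -> taskState == "needs-sandbox" }
    println(path.isVisible("planning", emptyMap()))      // false
    println(path.isVisible("needs-sandbox", emptyMap())) // true
    println(path.isVisible("planning", emptyMap()))      // true (sticky)
}
```

With the default predicate ({ _, _ -> false }) the latch never fires, which matches a reserve path that stays hidden until a predicate is supplied.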

PathObject Public Functions

| Function | Description |
| --- | --- |
| setInternalAgent(agent: P2PInterface) | Sets the internal agent. Overrides any agent builder. |
| setExecutionFunction(function: (suspend (MultimodalContent, PumpStation, ConverseHistory?, String) -> MultimodalContent)?) | Sets the execution function. |
| setRunsInBackground(value: Boolean) | Marks the path as background. |
| setSuppressHistoryEmit(value: Boolean) | When true, an async path’s PathCompleted event is still emitted, but the foreground drain skips merging the result into turnHistory. Only meaningful when the path also has isRunsInBackground = true. |
| P2PInit() (suspend, override) | Delegates to init(). |
| init(): PathDescriptionData (suspend) | Validates configuration and returns the PathDescriptionData record. |
| getPathTokenUsage(): com.TTT.Pipe.TokenUsage? | Reads the path’s token usage when the internal agent is a Pipeline. |
| getPathLegacyTokenUsage(): Pair<Int, Int> | Reads legacy (input, output) token counters. |
| setParentInterface(parent: P2PInterface) (override) | Sets the parent container reference. |
| getParentP2PInterface(): P2PInterface? (override) | Returns the parent container reference. |

PathObject Extension Functions

In Pipeline/PumpStationPathObjectExtensions.kt:

fun PathObject.bindFunction(name: String, function: KFunction<*>): PathObject
fun PathObject.getStashContent(stashId: String, station: PumpStation?): ConverseData?

bindFunction registers a Kotlin function in the global FunctionRegistry and populates pcpSchema with the function’s TPipeContextOptions. Throws IllegalArgumentException on blank name. Returns this for chaining.

getStashContent retrieves a stashed ConverseData by ID from the parent station’s stash. Returns null if the station is null or no entry exists.
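
The bindFunction contract described above (reject a blank name, register the function, return the receiver for chaining) might look roughly like the following. RegistrySketch, PathSketch, bindFunctionSketch, and lookupWeather are hypothetical names used for illustration; the real FunctionRegistry and the pcpSchema population step are not modelled here.

```kotlin
import kotlin.reflect.KFunction

// Hypothetical stand-in for the global function registry.
object RegistrySketch {
    val functions = mutableMapOf<String, KFunction<*>>()
}

// Hypothetical stand-in for PathObject; only the binding state is modelled.
class PathSketch(val pathName: String) {
    var boundFunctionName: String? = null
}

// Mirrors the documented contract: blank names are rejected, the receiver is returned.
fun PathSketch.bindFunctionSketch(name: String, function: KFunction<*>): PathSketch {
    require(name.isNotBlank()) { "Function name must not be blank" } // throws IllegalArgumentException
    RegistrySketch.functions[name] = function
    boundFunctionName = name
    return this
}

// Example function to bind.
fun lookupWeather(city: String): String = "sunny in $city"

fun main() {
    val path = PathSketch("weather").bindFunctionSketch("lookupWeather", ::lookupWeather)
    println(path.boundFunctionName) // lookupWeather

    val failure = runCatching { path.bindFunctionSketch(" ", ::lookupWeather) }.exceptionOrNull()
    println(failure is IllegalArgumentException) // true
}
```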

TurnResult Sealed Class

TurnResult lives in Pipeline/PumpStationModels.kt:844:

sealed class TurnResult {
    object Continue : TurnResult()
    data class Halt(val reason: PumpStationExitReason) : TurnResult()
}

runTurn returns this to the outer runHarnessLoop. Continue re-enters the loop; Halt(reason) exits with the given reason. The outer loop also catches KillSwitchException at the loop boundary and transitions to a Halt(KillSwitchTripped) state.
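
As a rough picture of that control flow, the sketch below loops until a turn halts and converts a kill-switch exception into a halt at the loop boundary. All names ending in Sketch are stand-ins, the enum is trimmed to three members, and mapping an exhausted turn cap to MaxTurnsHit is an assumption rather than something this page states.

```kotlin
// Stand-in types: the real enum has more members and the real exception may carry context.
enum class ExitReasonSketch { JudgeComplete, MaxTurnsHit, KillSwitchTripped }

class KillSwitchExceptionSketch(message: String) : RuntimeException(message)

sealed class TurnResultSketch {
    object Continue : TurnResultSketch()
    data class Halt(val reason: ExitReasonSketch) : TurnResultSketch()
}

// Runs turns until one halts, the turn cap is reached, or the kill switch trips.
fun runLoopSketch(maxTurns: Int, runTurn: (turn: Int) -> TurnResultSketch): ExitReasonSketch {
    for (turn in 1..maxTurns) {
        val result: TurnResultSketch = try {
            runTurn(turn)
        } catch (e: KillSwitchExceptionSketch) {
            // Caught at the loop boundary and converted into a halt.
            TurnResultSketch.Halt(ExitReasonSketch.KillSwitchTripped)
        }
        if (result is TurnResultSketch.Halt) return result.reason
        // Continue: fall through and re-enter the loop.
    }
    return ExitReasonSketch.MaxTurnsHit // assumption: cap exhaustion maps to this reason
}

fun main() {
    val finished = runLoopSketch(10) { turn ->
        if (turn < 3) TurnResultSketch.Continue
        else TurnResultSketch.Halt(ExitReasonSketch.JudgeComplete)
    }
    println(finished) // JudgeComplete

    val tripped = runLoopSketch(10) { throw KillSwitchExceptionSketch("token cap exceeded") }
    println(tripped) // KillSwitchTripped
}
```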

PumpStationBuilder Class

PumpStationBuilder<S : PumpStationStage> lives in Pipeline/PumpStationDsl.kt:48. The generic type parameter tracks the build stage:

sealed class PumpStationStage {
    object Initial   : PumpStationStage()
    object HasPaths : PumpStationStage()
    object Ready    : PumpStationStage()
}

PumpStationBuilder<PumpStationStage.Initial> is the entry type. After at least one path { } call, the builder is promoted to the HasPaths stage. After the build block returns, the builder is promoted to Ready, and build() produces the PumpStation.
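
Tracking the build stage in a type parameter is a general Kotlin technique (a phantom type). The following is a simplified sketch of the idea, not the actual PumpStationBuilder: it models only two stages, omits Ready, and uses hypothetical names throughout.

```kotlin
sealed class StageSketch {
    object Initial : StageSketch()
    object HasPaths : StageSketch()
}

// The type parameter carries no data; it only records compile-time state.
class StationBuilderSketch<S : StageSketch> internal constructor(
    internal val name: String,
    internal val paths: List<String>
)

// Entry point: a fresh builder starts in the Initial stage.
fun stationBuilderSketch(name: String): StationBuilderSketch<StageSketch.Initial> =
    StationBuilderSketch(name, emptyList())

// Adding a path is allowed in any stage and always yields a HasPaths builder.
fun <S : StageSketch> StationBuilderSketch<S>.path(pathName: String): StationBuilderSketch<StageSketch.HasPaths> {
    require(pathName.isNotBlank()) { "pathName must not be blank" }
    return StationBuilderSketch(name, paths + pathName)
}

// build() is only defined once at least one path has been registered.
fun StationBuilderSketch<StageSketch.HasPaths>.build(): String =
    "$name(${paths.joinToString()})"

fun main() {
    val station = stationBuilderSketch("research").path("search").path("summarize").build()
    println(station) // research(search, summarize)
    // stationBuilderSketch("empty").build() // would not compile: no build() on the Initial stage
}
```

The benefit of this design is that a station with no paths is rejected by the compiler rather than at run time.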

Builder Block Methods

The pumpStation { } DSL surface is large. The most important blocks and properties are:

pumpStation("name") {
    // Core configuration
    personality = "..."
    systemTask = "..."
    userGuidelines = "..."
    entryUserPrompt = "..."

    // Direct agent assignment
    judgeAgent = pipeline()
    dispatchAgent = pipeline()
    interventionAgent = pipeline()
    healthAgent = pipeline()
    lorebookAgent = pipeline()
    summaryAgent = pipeline()
    goalAgent = pipeline()
    preInitAgent = pipeline()
    pathSafetyAgent = pipeline()

    // Builder-function assignment
    judgeAgentBuilderFunction = { station -> pipeline() }
    // ... same for all agents

    // Path registration
    path("research") {
        description = "..."
        risk = PathRiskLevel.Low
        schema = "{}"
        pcpSchema = PcpContext()
        runsInBackground = false
        suppressHistoryEmit = false
        dispatchHint = "..."
        pathMetadata = mutableMapOf<Any, Any>()
        setInternalAgent(pipeline())
        setExecutionFunction { content, station, history, summary -> ... }
        bindFunction("fn", ::fn)
    }

    // Reserve path
    reservePath("sandboxed") {
        // same surface as path { }
        revealWhen { taskState, ctx -> ... }
    }

    // Memory and concurrency
    memory { mode = PumpStationMemoryManagementMode.Hybrid }
    setConcurrencyMode(PumpStationConcurrencyMode.Async)
    setMemoryManagementMode(PumpStationMemoryManagementMode.Compaction)
    setCompactionStrategy(PumpStationCompactionStrategy.Whole)
    setCompactionThreshold(0.8)
    setCompactionFanoutMode(ChunkFanoutMode.Sequential)
    setMaxCompactionAttempts(2)
    setChunkTokenBudget(2000)
    setMaxChunks(16)
    setMaxParallelChunks(4)
    setMaxCompactionBackups(3)
    setHybridWholeHeadroom(0.3)
    setPrePruneTransform { turns, station -> turns }
    appendPrePruneTransform { turns, station -> turns }
    setCompactionRolledBackFunction { backup, reason, station -> null }

    // Loop guards
    setMaxHarnessTurns(10)
    setMaxTurns(10)
    setMaxGoalFailAttempts(3)
    setMaxConsecutiveSamePath(3)
    setMaxTotalPathCallsPerPath(10)
    setPathLimitExceededPolicy(PathLimitExceededPolicy.Skip)
    setMaxRawTurnHistorySize(1000)
    setBlowoutThreshold(0.9)
    setMemoryUpdateTimeoutMs(30_000L)
    setMaxBlowoutRecoveries(3)
    setMaxRepairPromptTokens(500)
    setStopHarnessOnInvalidPathRequest(false)
    setMaxTurnHistorySize(50)
    setJudgeJsonContractEnabled(true)
    setPathSafetyJsonContractEnabled(true)
    setJudgeRunMode(PumpStationJudgeRunMode.Always)

    // Health
    setHealthAgentTurnInterval(10)
    setHealthAgentErrorRatioThreshold(0.2)
    setHealthAgentConcurrencyMode(PumpStationConcurrencyMode.Blocking)

    // DITL hooks
    setPreInitFunction { content, station -> content }
    setPreValidationJudgeFunction { content, miniBank, station -> miniBank }
    setPreInvokeFunction { ctx, miniBank, station -> true }
    setPreValidationDispatchFunction { content, ctx, miniBank, station -> miniBank }
    setPostGenerateFunction { content, station -> station }
    setPathValidationFunction { content, station -> true }
    setPathTransformationFunction { content, station -> content }
    setPostMemoryFunction { content, station -> content }
    setPreCompactionFunction { content, overflow, history, station -> content }
    setPostCompactionFunction { content, history, station -> content }
    setOnContextTruncated { wasTruncated, remaining -> }
    setPathSafetyFunction { path, schema, station -> true }
    setPathLimitExceededFunction { path, reason, station -> PathLimitExceededResult(...) }
    setCompactionRolledBackFunction { backup, reason, station -> null }
    setExternalContextProvider { taskState -> mutableMapOf() }

    // Custom prompts
    setCustomJudgeSystemPrompt("...")
    setCustomDispatchSystemPrompt("...")
    setCustomPathSafetySystemPrompt("...")
    setCustomHealthSystemPrompt("...")
    setCustomLorebookSystemPrompt("...")
    setCustomGoalSystemPrompt("...")

    // Additional harness agent slots
    harnessAgent {
        agent = pipeline()
        concurrency = PumpStationConcurrencyMode.Blocking
        interval = 5
    }
    harnessAgentBuilder {
        builderFunction = { station -> pipeline() }
        concurrency = PumpStationConcurrencyMode.Async
        interval = 5
    }

    // Kill switch
    killSwitch {
        inputTokenLimit = 50_000
        outputTokenLimit = 10_000
        onTripped = { ctx -> throw KillSwitchException(ctx) }
    }

    // Tracing
    tracing {
        enabled()
        maxHistory(1000)
        outputFormat(TraceFormat.HTML)
        detailLevel(TraceDetailLevel.STANDARD)
        autoExport(enabled = true, path = "~/.tpipe-traces/")
        includeContext(true)
        includeMetadata(true)
    }

    // Async substrate (paths that opt into runsInBackground; background harness agents)
    asyncPathsAppendToTurnHistory = true
    asyncAgentsAppendToTurnHistory = false
    asyncJobGracePeriodMs = null
    asyncJobsScopedToStation = true

    // Pause phases
    pause { phase(PumpStationPausePhase.BeforeJudge) }
    pause { phase(PumpStationPausePhase.BeforePathExecution) }

    // Pipeline names (reserved for future use)
    pipelineNames { }

    // Dispatcher rules (reserved for future use)
    dispatcherRules { }
}

The pumpStation("name") { ... } function returns a fully built PumpStation. There is also pumpStationBuilder("name") for callers who want to obtain the builder first and call build() explicitly.

Async Substrate

PumpStation exposes a thread-safe substrate for code that runs outside the foreground turn loop. Async paths (isRunsInBackground = true) and async harness agents (HarnessAgentSlot with concurrency = Async) launch coroutines on a station-scoped CoroutineScope and need a way to land their results into the harness conversation history without corrupting it. The async substrate is that interface.

The full design — what the substrate guarantees, why the default grace period is null, and the rationale for monotonic seq ordering — is documented in PumpStation Container Doc. This section is the public API surface.

Async Substrate Properties

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| asyncScope | kotlinx.coroutines.CoroutineScope | station-scoped SupervisorJob() + Dispatchers.Default | The scope backing every async coroutine launched by the harness. Cancelled by cancelAsyncJobs() at the end of executeLocal. |

Async Substrate Public Functions

| Function | Description |
| --- | --- |
| appendTurnEntryAsync(entry: ConverseData, source: String = "agent") (suspend) | The single thread-safe access point for async producers to push a ConverseData into turnHistory. Acquires the harness history mutex, appends to turnHistory and rawTurnHistory, and emits an AsyncTurnAppended event. Direct mutation of turnHistory from async code is NOT safe; always use this method. |
| appendTurnEntriesAsync(entries: List<ConverseData>, source: String = "agent") (suspend) | Batch version of appendTurnEntryAsync. Acquires the history mutex once and emits a single trailing AsyncTurnAppended event for the last entry. |
| cancelAsyncJobs(gracePeriodMs: Long? = asyncJobGracePeriodMs) | Cancels in-flight async coroutines. When gracePeriodMs is null (the default), the wait is unbounded: the harness yields once and then cancels. When set, coroutines that do not finish within the window are cancelled. Safe to call multiple times. Called automatically by runFinalizationPhase before executeLocal returns. |
| isAsyncScopeActive(): Boolean | Returns true if asyncScope is still active (not yet cancelled). Useful for tests and DITL tooling. |
| setAsyncPathsAppendToTurnHistory(value: Boolean): PumpStation | Station-wide default for whether async paths append their result to turnHistory on completion. Default true. Per-path opt-out via PathObject.setSuppressHistoryEmit. |
| isAsyncPathsAppendToTurnHistory(): Boolean | Returns the current default. |
| setAsyncAgentsAppendToTurnHistory(value: Boolean): PumpStation | Station-wide default for whether async harness agents append their result to turnHistory on completion. Default false (fire-and-forget). Per-slot opt-in via HarnessAgentSlot.appendsToTurnHistory. |
| isAsyncAgentsAppendToTurnHistory(): Boolean | Returns the current default. |
| setAsyncJobGracePeriodMs(ms: Long?): PumpStation | Optional millisecond grace period given to in-flight async coroutines after runFinalizationPhase before cancelAsyncJobs cancels asyncScope. When null (the default), the cancel is unbounded. TPipe intentionally does not impose arbitrary timeouts on user work; developers who need a hard upper bound should set this to a value that matches their worst-case LLM round-trip plus safety margin. |
| getAsyncJobGracePeriodMs(): Long? | Returns the current grace period, or null if the cancel is unbounded. |
| setAsyncJobsScopedToStation(value: Boolean): PumpStation | When true (the default), async work runs on the station-scoped asyncScope so cancelAsyncJobs can guarantee no coroutine outlives executeLocal. When false, async work runs on GlobalScope (the pre-substrate fire-and-forget behavior). |
| isAsyncJobsScopedToStation(): Boolean | Returns whether async work runs on the station-scoped asyncScope. |

Drain Ordering

The foreground calls drainPendingAsyncResults() at safe phase boundaries (start of judge, start of finalization). The drain merges pending entries into turnHistory in monotonic seq order, where seq is assigned at enqueue time by an AtomicLong counter. Out-of-order async completions still produce a deterministic merge order from the LLM’s perspective.

Per-path opt-out via PathObject.setSuppressHistoryEmit is honoured during the drain. Per-slot opt-in via HarnessAgentSlot.appendsToTurnHistory is honoured at enqueue time. The two station-wide defaults (asyncPathsAppendToTurnHistory, asyncAgentsAppendToTurnHistory) act as the umbrella switches; the per-path / per-slot flags override the station defaults.
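
The seq-ordered merge can be sketched with plain JVM concurrency primitives. This is an illustration under stated assumptions, not the harness implementation: DrainSketch, PendingSketch, reserveSeq, and complete are hypothetical names, and the sketch interprets "enqueue time" as the moment a job is dispatched, so that completion order does not affect merge order.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong

// Hypothetical pending-result record; field names are illustrative.
data class PendingSketch(val seq: Long, val source: String, val text: String, val suppress: Boolean)

class DrainSketch {
    private val nextSeq = AtomicLong(0)
    private val pending = ConcurrentLinkedQueue<PendingSketch>()
    val turnHistory = mutableListOf<String>()

    // Assumption: the sequence number is fixed when work is dispatched, not when it finishes.
    fun reserveSeq(): Long = nextSeq.getAndIncrement()

    // Called by async producers when they finish, in any order.
    fun complete(seq: Long, source: String, text: String, suppress: Boolean = false) {
        pending.add(PendingSketch(seq, source, text, suppress))
    }

    // Called by the foreground at a safe phase boundary.
    fun drain() {
        // Empty the queue, then merge in ascending seq order.
        val batch = generateSequence { pending.poll() }.toList().sortedBy { it.seq }
        for (entry in batch) {
            // Suppressed entries are skipped during the drain (per-path opt-out).
            if (!entry.suppress) turnHistory.add("${entry.source}: ${entry.text}")
        }
    }
}

fun main() {
    val station = DrainSketch()
    val a = station.reserveSeq()
    val b = station.reserveSeq()
    val c = station.reserveSeq()
    station.complete(c, "path:c", "third")                    // finishes first
    station.complete(a, "path:a", "first")
    station.complete(b, "path:b", "hidden", suppress = true)  // opted out of history
    station.drain()
    println(station.turnHistory) // [path:a: first, path:c: third]
}
```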

Pushing From Custom Async Agents

Custom async harness agents and DITL hooks running on asyncScope should use appendTurnEntryAsync (or appendTurnEntriesAsync for batches) to land results in the conversation. The corresponding AsyncTurnAppended event carries the source identifier and the seq so observers can correlate the merge back to the dispatch. Direct mutation of turnHistory from async code is unsafe because ConverseHistory is a plain data class with no internal lock.

pumpStation("research") {
    asyncAgentsAppendToTurnHistory = true
    asyncJobGracePeriodMs = 30 * 60 * 1000L  // 30 minutes

    // Block form, matching the harnessAgentBuilder { } surface shown in Builder Block Methods
    harnessAgentBuilder {
        builderFunction = { station -> MyAsyncResearchAgent(station) }
        concurrency = PumpStationConcurrencyMode.Async
    }
}
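
To see why a single locked entry point matters, consider the sketch below. It substitutes a plain JVM ReentrantLock and threads for the coroutine mutex the harness is described as using, purely so the example runs with the standard library alone; HistorySketch and its methods are hypothetical and events are not modelled.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

class HistorySketch {
    private val lock = ReentrantLock()
    private val turnHistory = mutableListOf<String>()
    private val rawTurnHistory = mutableListOf<String>()

    // Single entry point for producers: both lists change together under one lock.
    fun appendEntry(entry: String, source: String = "agent") {
        lock.withLock {
            turnHistory.add("$source: $entry")
            rawTurnHistory.add("$source: $entry")
        }
    }

    // Batch variant: takes the lock once for the whole list.
    fun appendEntries(entries: List<String>, source: String = "agent") {
        lock.withLock {
            for (entry in entries) {
                turnHistory.add("$source: $entry")
                rawTurnHistory.add("$source: $entry")
            }
        }
    }

    // Reads both sizes under the same lock so the pair is consistent.
    fun sizes(): Pair<Int, Int> = lock.withLock { turnHistory.size to rawTurnHistory.size }
}

fun main() {
    val history = HistorySketch()
    val workers = (1..8).map { id ->
        thread { repeat(1_000) { history.appendEntry("result $it", source = "worker-$id") } }
    }
    workers.forEach { it.join() }
    println(history.sizes()) // (8000, 8000)
}
```

Without the lock, concurrent add calls on the unsynchronized lists could lose entries or throw, which is the failure mode the single access point is meant to rule out.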

Enums

The enums referenced by the API are documented in full in PumpStation Models API:

  • PumpStationConcurrencyMode — Async, Blocking
  • PumpStationMemoryManagementMode — Compaction, Truncation, Hybrid
  • PumpStationCompactionStrategy — Whole, Chunked, Hybrid
  • PumpStationJudgeRunMode — Always, FlagTriggered
  • PathRiskLevel — Low, Medium, High
  • PumpStationStatus — NotStarted, Running, WaitingOnBackground, Suspended, Completed, Failed, Terminated
  • PumpStationPhase — PreInit, HealthCheck, Judge, Dispatch, PathSafety, PathExecution, PathValidation, Intervention, ForegroundAgents, MemoryUpdate, Compaction, GoalValidation, Exit
  • PumpStationError — UnknownPath, InvalidPathRequest, DispatchJsonRepairFailed, PathExecutionException, TokenBudgetExceeded, MemoryBlowout, KillSwitchTripped, MaxTurnsExceeded, LoopGuardTriggered, P2PRequestInvalid, InitNotCalled, CompactionInflated, CompactionRolledBack
  • PumpStationExitReason — JudgeComplete, PassSignal, TerminateSignal, MaxTurnsHit, KillSwitchTripped, GoalValidationFailed, InterventionTerminated, Error
  • PumpStationPausePhase — BeforeJudge, AfterJudge, BeforeDispatch, AfterDispatch, BeforePathSafety, BeforePathExecution, AfterPathExecution, BeforeMemoryUpdate, BeforeCompaction, BeforeGoalValidation, BeforeExit
  • StashReason — TokenOverflow, BinaryPayload, ErrorLog, UnsafeForPrompt, DeveloperRequested, BackgroundResult
  • PathLimitExceededPolicy — Skip, Halt, Continue
  • HealthStatus — Healthy, Degraded, Critical, Unknown
  • WarningCode — NoExitSignalConfigured
  • ExitMechanism — JudgeAlways, JudgeFlagTriggered, PathPassPipeline, PathTerminatePipeline
  • ChunkFanoutMode — Sequential, Parallel
  • LorebookOperation — Merge, Replace

Default prompts are documented in PumpStation Magic Contracts.
