concept ~3 min
Connector

The Connector is a conditional Valve. It tests the water flow against a condition and routes it to one pipe or the other—like a smart Y-joint.

Connector

💡 Tip: The Connector is a conditional Valve. It tests the water flow against a condition and routes it to one pipe or the other—like a smart Y-joint.

Table of Contents

Connector provides key-based routing to different pipelines. Content is routed to a specific pipeline based on a routing key, enabling branching logic in your pipeline architecture.

Basic Usage

val connector = Connector()
    .add("process", processingPipeline)
    .add("analyze", analysisPipeline)
    .add("summarize", summaryPipeline)

// Route content to specific pipeline
val result = connector.execute("process", content)

Key Methods

Pipeline Management

// Add pipeline with routing key
fun add(key: Any, pipeline: Pipeline): Connector

// Get pipeline by key
fun get(key: Any): Pipeline?

// Set default path for P2P requests
fun setDefaultPath(path: Any): Connector

Execution

// Execute with explicit key
suspend fun execute(path: Any, content: MultimodalContent): MultimodalContent

Error Handling

Connector does not throw exceptions on invalid paths. Instead, it sets terminatePipeline = true:

suspend fun execute(path: Any, content: MultimodalContent): MultimodalContent {
    try {
        val connection = branches[path]
        if (connection != null) {
            return connection.execute(content)
        }
        content.terminatePipeline = true
        return content
    } catch (e: Exception) {
        content.terminatePipeline = true
        return content
    }
}

Check the termination flag instead of catching exceptions:

val result = connector.execute("unknown-key", content)
if (result.terminatePipeline) {
    // Handle invalid routing key
}

Connector Path Metadata

Content objects can carry routing information:

// Extension functions in Connector.kt
fun MultimodalContent.setConnectorPath(path: Any) {
    metadata["connectorPath"] = path
}

fun MultimodalContent.getConnectorPath(): Any? {
    return metadata["connectorPath"]
}

P2P Integration

P2P Request Handling

P2P requests route to the last connected pipeline:

override suspend fun executeP2PRequest(request: P2PRequest): P2PResponse? {
    val pipeline = branches[lastConnection]
    return pipeline?.executeP2PRequest(request)
}

P2P Registration

fun registerAsP2PAgent() {
    // Set descriptor
    connector.setP2pDescription(P2PDescriptor(
        agentName = "document-processor",
        agentDescription = "Routes documents to type-specific processors",
        transport = P2PTransport(Transport.Tpipe, "document-processor"),
        requiresAuth = false,
        usesConverse = false,
        allowsAgentDuplication = false,
        allowsCustomContext = false,
        allowsCustomAgentJson = false,
        recordsInteractionContext = false,
        recordsPromptContent = false,
        allowsExternalContext = false,
        contextProtocol = ContextProtocol.none
    ))
    
    // Set requirements
    connector.setP2pRequirements(P2PRequirements(
        allowExternalConnections = true
    ))
    
    // Set transport
    connector.setP2pTransport(P2PTransport(Transport.Tpipe, "document-processor"))
    
    // Register with P2P system
    P2PRegistry.register(connector)
}

Tracing Support

Enable tracing across all child pipelines:

connector.enableTracing(TraceConfig(
    detailLevel = TraceDetailLevel.DETAILED,
    includeContent = true
))

// Get trace from last executed pipeline
val trace = connector.getTrace(TraceFormat.HTML)

Complete Example

class DocumentProcessor {
    private val connector = Connector()
        .add("pdf", pdfPipeline)
        .add("docx", docxPipeline) 
        .add("txt", textPipeline)
        .setDefaultPath("txt")
        .enableTracing()
    
    suspend fun processDocument(content: MultimodalContent, type: String): MultimodalContent {
        val result = connector.execute(type, content)
        
        if (result.terminatePipeline) {
            // Handle unsupported document type
            return handleUnsupportedType(content, type)
        }
        
        return result
    }
    
    private fun handleUnsupportedType(content: MultimodalContent, type: String): MultimodalContent {
        return MultimodalContent().apply {
            addText("Unsupported document type: $type")
            terminatePipeline = false // Reset flag
        }
    }
}

Best Practices

  • Check termination flag instead of catching exceptions
  • Set default paths for P2P integration
  • Use meaningful keys that describe the routing logic
  • Enable tracing for debugging complex routing scenarios
  • Validate pipeline existence before execution if needed
  • Handle unsupported routes gracefully with fallback logic

Previous: ← Container Overview | Next: MultiConnector →

Next Steps