- [Introduction](#introduction)
Table of Contents
- Introduction
- Prerequisites
- Basic Usage
- API Modes
- Authentication
- Your First Pipe (per mode)
- Third-Party Providers
- Structured Outputs
- Function Calling
- Reasoning Models
- Streaming
- Anthropic-Style Caching
- Multimodal Content
- Comparison with Other Providers
- Troubleshooting
- Next Steps
Introduction
TPipe-GenericOpenAI is a single TPipe pipe that talks to any provider implementing an OpenAI-compatible API surface. That includes the obvious targets — OpenAI, Azure OpenAI — but also Anthropic’s /v1/messages endpoint, DeepSeek, Groq, Together, MiniMax, and any custom in-house proxy that speaks the OpenAI Chat Completions, Anthropic Messages, or OpenAI Responses wire spec. You pick the wire format with setApiMode(ApiMode.…), set the endpoint with setBaseUrl(…), and the rest of TPipe’s pipeline/orchestration surface (pipelines, junctions, distribution grids, PCP tools) stays identical to every other TPipe provider.
Prerequisites
- API key for your target provider. OpenAI keys come from platform.openai.com, Anthropic keys from console.anthropic.com, and so on. You can also rely on the
GENERIC_OPENAI_API_KEYenvironment variable and skip programmatic key management entirely. - A Gradle Kotlin DSL project. Maven and Groovy DSL are not supported by TPipe.
- Java 24+ (GraalVM CE 24 recommended). Matches the rest of the TPipe toolchain.
- Project dependency. Add the
:TPipe-GenericOpenAImodule:// settings.gradle.kts include(":TPipe-GenericOpenAI") // build.gradle.kts dependencies { implementation(project(":TPipe-GenericOpenAI")) }
Basic Usage
The shortest path to a working pipe: build it, configure the API key, pick a model, init, and execute. The default ApiMode.OpenAI mode is used unless you call setApiMode(...).
import genericOpenAIPipe.GenericOpenAIPipe
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setSystemPrompt("You are a helpful assistant.")
.setTemperature(0.7)
.init()
val result = pipe.execute("What is the capital of France?")
println(result.text)
}
If you would rather resolve the key once for the whole process, set the GENERIC_OPENAI_API_KEY environment variable and skip setApiKey():
export GENERIC_OPENAI_API_KEY="sk-..."
init() reads it automatically and throws IllegalStateException if neither path is configured.
API Modes
GenericOpenAIPipe is wire-format polymorphic. The ApiMode sealed class selects which endpoint, auth header set, request serializer, and SSE parser the pipe uses. The mode is locked after the first API call — see the warning box below.
| Mode | Endpoint | Auth header(s) | When to use |
|---|---|---|---|
ApiMode.OpenAI (default) | ${baseUrl}/chat/completions | Authorization: Bearer <key> | OpenAI, Azure OpenAI, DeepSeek, Groq, Together, MiniMax, local llama.cpp servers, any OpenAI Chat-Completions-compatible proxy |
ApiMode.Anthropic | ${baseUrl}/anthropic/v1/messages | x-api-key: <key>, anthropic-version: 2023-06-01 | Anthropic Claude (direct or via a Messages-API proxy) |
ApiMode.OpenAIResponses | ${baseUrl}/responses | Authorization: Bearer <key> | OpenAI’s newer Responses wire spec (response.created / response.output_text.delta / response.completed events) |
The getAuthHeaders() and getEndpoint() methods on the pipe select the right values for the active mode at the moment the request is built, so the rest of your configuration is identical across modes.
Authentication
There are three ways to provide credentials, checked in this order by init():
- Programmatic via the pipe’s builder:
val pipe = GenericOpenAIPipe() .setApiKey("sk-...") - Process-global via the
GenericOpenAIEnvsingleton:import genericOpenAIPipe.env.genericOpenAIEnv genericOpenAIEnv.setApiKey("sk-...") val pipe = GenericOpenAIPipe() // no setApiKey needed - Environment variable
GENERIC_OPENAI_API_KEY:export GENERIC_OPENAI_API_KEY="sk-..."
init() walks the chain in this order: pipe-level apiKey field → GenericOpenAIEnv.resolveApiKey() (programmatic + env) → IllegalStateException if nothing resolves. Use genericOpenAIEnv.hasApiKey() to probe before constructing a pipe.
Endpoint overrides for proxies and enterprise gateways
Most providers expose the OpenAI Chat Completions surface at a different base URL. Use setBaseUrl(...) to point the pipe at it. The URL must be HTTPS — the builder enforces this and throws IllegalArgumentException for plain HTTP:
.setBaseUrl("https://api.openai.com/v1") // OpenAI
.setBaseUrl("https://api.anthropic.com") // Anthropic
.setBaseUrl("https://api.deepseek.com/v1") // DeepSeek
.setBaseUrl("https://api.groq.com/openai/v1") // Groq
.setBaseUrl("https://api.together.xyz/v1") // Together
.setBaseUrl("https://api.MiniMax.io/v1") // MiniMax
.setBaseUrl("https://openai.myenterprise.com") // internal proxy
The trailing slash is stripped automatically.
Your First Pipe (per mode)
The same pipe class behaves differently per ApiMode. Three minimal, end-to-end working examples:
OpenAI mode (default)
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.api.ApiMode
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setSystemPrompt("You are a helpful assistant.")
.init()
println(pipe.execute("What is TPipe?").text)
}
Anthropic mode (Claude via /messages)
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.api.ApiMode
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("ANTHROPIC_API_KEY"))
.setBaseUrl("https://api.anthropic.com")
.setModel("claude-3-5-sonnet-20241022")
.setApiMode(ApiMode.Anthropic)
.setMaxTokens(1024)
.init()
println(pipe.execute("Explain quantum entanglement in one sentence.").text)
}
The Anthropic mode swaps the auth header set to x-api-key + anthropic-version automatically, routes through AnthropicRequestSerializer to produce an AnthropicMessagesRequest, and uses AnthropicSseParser for streaming.
OpenAI Responses mode
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.api.ApiMode
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o-2025-04-16")
.setApiMode(ApiMode.OpenAIResponses)
.init()
println(pipe.execute("Summarize the plot of Hamlet in three bullets.").text)
}
⚠
apiModeis locked after the first API call. CallingsetApiMode(...)afterexecute()/generateText()/generateContent()has been invoked throwsIllegalStateException. Set the mode up front, before the first call. If you need to switch modes, build a new pipe instance.
Third-Party Providers
Every provider on this list is reached by combining setApiMode(ApiMode.OpenAI) with a provider-specific setBaseUrl(...). The wire format is the same; only the URL and the key change.
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.api.ApiMode
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// DeepSeek
val deepseek = GenericOpenAIPipe()
.setApiKey(System.getenv("DEEPSEEK_API_KEY"))
.setBaseUrl("https://api.deepseek.com/v1")
.setModel("deepseek-chat")
.setApiMode(ApiMode.OpenAI)
.init()
// Groq
val groq = GenericOpenAIPipe()
.setApiKey(System.getenv("GROQ_API_KEY"))
.setBaseUrl("https://api.groq.com/openai/v1")
.setModel("llama-3.1-70b-versatile")
.setApiMode(ApiMode.OpenAI)
.init()
// Together
val together = GenericOpenAIPipe()
.setApiKey(System.getenv("TOGETHER_API_KEY"))
.setBaseUrl("https://api.together.xyz/v1")
.setModel("meta-llama/Llama-3-70b-chat-hf")
.setApiMode(ApiMode.OpenAI)
.init()
// MiniMax
val MiniMax = GenericOpenAIPipe()
.setApiKey(System.getenv("MiniMax_API_KEY"))
.setBaseUrl("https://api.MiniMax.io/v1")
.setModel("MiniMax-text-01")
.setApiMode(ApiMode.OpenAI)
.init()
// Azure-style proxy (OpenAI Responses spec)
val azureResponses = GenericOpenAIPipe()
.setApiKey(System.getenv("AZURE_OPENAI_KEY"))
.setBaseUrl("https://my-resource.openai.azure.com/openai/deployments/gpt-4o")
.setModel("gpt-4o")
.setApiMode(ApiMode.OpenAIResponses)
.init()
println(deepseek.execute("Hello").text)
println(groq.execute("Hello").text)
}
If you want to point the pipe at an internal proxy, set its base URL with setBaseUrl("https://openai.myenterprise.com") — as long as it speaks one of the three supported wire formats, no other configuration is needed.
Structured Outputs
Constrain the model to a JSON shape using setResponseFormat(...) together with setStructuredOutputs(true). The schema is a standard JSON Schema JsonObject:
import genericOpenAIPipe.GenericOpenAIPipe
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
fun main() = runBlocking {
val schema = Json.parseToJsonElement("""
{
"type": "object",
"properties": {
"name": {"type": "string"},
"score": {"type": "number"}
},
"required": ["name", "score"]
}
""").jsonObject
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setResponseFormat("json_schema", schema)
.setStructuredOutputs(true)
.init()
println(pipe.execute("Score the name 'Ada Lovelace' from 0 to 10 and explain why.").text)
}
Three type values are supported: "text" (default, plain prose), "json_object" (free-form JSON object mode), and "json_schema" (validated against the supplied schema — jsonSchema becomes required for that type).
Function Calling
Register function-calling tools with setTools(...), choose the model’s tool-selection mode with setToolChoice(...), and optionally allow parallel calls with setParallelToolCalls(true):
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.env.ToolDefinition
import genericOpenAIPipe.env.FunctionSchema
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
fun main() = runBlocking {
val weatherParams = Json.parseToJsonElement("""
{
"type": "object",
"properties": {
"location": {"type": "string"},
"unit": {"type": "string", "enum": ["c", "f"]}
},
"required": ["location"]
}
""").jsonObject
val tools = listOf(
ToolDefinition(
type = "function",
function = FunctionSchema(
name = "get_weather",
description = "Get current weather for a location.",
parameters = weatherParams
)
)
)
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setTools(tools)
.setToolChoice("auto")
.setParallelToolCalls(true)
.init()
val reply = pipe.execute("What is the weather in Paris and Tokyo?")
println(reply.text)
}
Valid setToolChoice(...) values are "auto", "none", and "required". setParallelToolCalls(true) lets the model emit multiple tool calls in one response (default behaviour of most modern tool-calling models).
Reasoning Models
Reasoning-capable models (OpenAI o3 / o4-mini, DeepSeek-R1, etc.) accept a ReasoningConfig with effort, max-tokens, and visibility flags. The fields are serialized into the request body for the active mode:
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.env.ReasoningConfig
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("o4-mini")
.setReasoningConfig(
ReasoningConfig(
effort = "high", // "xhigh", "high", "medium", "low", "minimal", "none"
maxTokens = 8192,
exclude = false, // include reasoning in the final output
enabled = true
)
)
.init()
val result = pipe.execute("Plan a 3-day trip to Kyoto in JSON.")
println(result.text)
// result.modelReasoning carries the chain-of-thought when supported
println(result.modelReasoning)
}
Reasoning effort enum values: "xhigh" | "high" | "medium" | "low" | "minimal" | "none". The Responses API mode (OpenAI) additionally populates streamingReasoningTokens from the wire and exposes them via tracing metadata.
Streaming
Both setStreamingEnabled(true) and setStreamingCallback { ... } enable Server-Sent Events streaming. The callback receives text chunks as they arrive and the pipe also accumulates them into the final result:
import genericOpenAIPipe.GenericOpenAIPipe
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setStreamingCallback { chunk ->
print(chunk)
kotlin.io.stdout.flush()
}
.init()
pipe.execute("Stream a haiku about distributed systems.")
println()
}
SSE format is mode-specific:
ApiMode.OpenAI— standarddata: {...}lines parsed bySseParser.ApiMode.Anthropic—event:+data:lines parsed byAnthropicSseParser.ApiMode.OpenAIResponses—response.created/response.output_text.delta/response.completedevents parsed byOpenAIResponsesSseParser.
The pipe automatically routes to the right parser based on the active ApiMode.
Anthropic-Style Caching
The CacheControl data class carries Anthropic-style prompt-caching hints (cache type + optional TTL). It is defined in genericOpenAIPipe.env and serialized into the request body as cache_control when present:
import genericOpenAIPipe.env.CacheControl
val caching = CacheControl(
type = "ephemeral", // required: e.g., "ephemeral"
ttl = "5m" // optional: e.g., "5m", "1h", "24h"
)
CacheControl is a wire-level field on GenericOpenAIChatRequest (set to cache_control in the JSON body), so you use it when you construct a request directly or wire it into a custom request pipeline. A dedicated setCacheControl(...) builder is not currently exposed on the pipe — for a higher-level ergonomic API, use setApiMode(ApiMode.Anthropic) and the Anthropic-specific caching behaviour that the provider supports.
Multimodal Content
Pass images and documents to the pipe via MultimodalContent.binaryContent. The pipe converts each BinaryContent variant to the right content block shape for the active mode at request-build time:
import com.TTT.Pipe.MultimodalContent
import com.TTT.Pipe.BinaryContent
import genericOpenAIPipe.GenericOpenAIPipe
import genericOpenAIPipe.api.ApiMode
import kotlinx.coroutines.runBlocking
import java.io.File
fun main() = runBlocking {
val imageBytes = File("paris.png").readBytes()
val pipe = GenericOpenAIPipe()
.setApiKey(System.getenv("OPENAI_API_KEY"))
.setModel("gpt-4o")
.setApiMode(ApiMode.OpenAI)
.init()
val content = MultimodalContent(
text = "What is in this image?",
binaryContent = mutableListOf(
BinaryContent.Bytes(
data = imageBytes,
mimeType = "image/png"
)
)
)
println(pipe.execute(content).text)
}
The conversion matrix for each BinaryContent variant across the three modes:
BinaryContent type | OpenAI mode | Anthropic mode | OpenAI Responses mode |
|---|---|---|---|
Bytes | base64 data URI image block | base64 image block | base64 input item |
Base64String | base64 data URI image block | base64 image block | base64 input item |
CloudReference | URL image block | URL image block | URL input item |
TextDocument | text block | text block | text input item |
Bytes and Base64String are equivalent on the wire — both end up as a data:<mime>;base64,... image URL. CloudReference passes the URL through as-is. TextDocument injects the text into the content array.
Comparison with Other Providers
| Feature | GenericOpenAIPipe | OllamaPipe | OpenRouterPipe | BedrockPipe |
|---|---|---|---|---|
| Provider access | Any OpenAI-compatible API (OpenAI, Azure, Anthropic via /messages, DeepSeek, Groq, Together, MiniMax, custom proxies) | Local Ollama runtime | 300+ models through OpenRouter | AWS Bedrock (Claude, Titan, Llama, Cohere, …) |
| API modes | 3 (OpenAI, Anthropic, OpenAIResponses) | 2 (/api/chat, legacy /api/generate) | 1 (OpenAI Chat Completions) | 1 (Bedrock Converse API) |
| Tool calling | setTools(...) + parallel + setToolChoice(...) | Native tool calling | OpenAI-compatible | Converse API tools |
| Reasoning | setReasoningConfig(ReasoningConfig(...)) | enableThink() extracts <think> | setReasoningConfig(...) / setReasoningEffort(...) | Native reasoningContent |
| Streaming | SSE callback, mode-specific parsers | Ktor async + enableStreaming(...) | Server-Sent Events | Converse stream handler |
| Multimodal | MultimodalContent.binaryContent with mode-specific conversion | Base64 images | OpenAI-compatible | Base64 images |
| Caching | CacheControl(type, ttl) (Anthropic-style) | n/a | setCacheControl(ttl) | n/a |
| Free tier | Depends on provider | Yes (local) | Yes (limited free models) | No |
| Auth pattern | setApiKey / GenericOpenAIEnv / GENERIC_OPENAI_API_KEY | Local socket | setApiKey / OPENROUTER_API_KEY | AWS credentials / IAM roles |
| Endpoint override | setBaseUrl(https://...) | setIP / setPort | setBaseUrl(...) | setRegion(...) |
Use GenericOpenAIPipe when the provider you want is not first-class in TPipe, or when you need the Anthropic /messages or OpenAI Responses surface from a single pipe class.
Troubleshooting
IllegalStateException: GenericOpenAI API key is required
You called init() without a key. Fix in one of three ways:
- Call
.setApiKey("sk-...")on the pipe beforeinit(). - Call
genericOpenAIEnv.setApiKey("sk-...")before constructing the pipe. - Export
GENERIC_OPENAI_API_KEYin the process environment.
Probe at startup with genericOpenAIEnv.hasApiKey() if you want to fail fast with a friendlier error.
IllegalArgumentException: baseUrl must use HTTPS for security
setBaseUrl(...) enforces HTTPS. If you are behind a TLS-terminating proxy, configure it to forward HTTPS upstream; for local development against a plaintext Ollama / llama.cpp OpenAI-compatible server, use a self-signed cert and the https:// scheme.
IllegalStateException: apiMode cannot be changed after the first API request
You called setApiMode(...) after execute(), generateText(), or generateContent(). The mode is locked at first use. Build a second GenericOpenAIPipe with the new mode, or set the mode before the first call.
401 / 403 / 429 / 5xx responses
GenericOpenAIPipe maps these to P2PError types — see the Error Mapping table in the API reference. The most common root causes:
- 401 — wrong provider key, or key and base URL are mismatched (DeepSeek key against OpenAI’s URL, for example).
- 403 — region / model access not granted. AWS Bedrock and Azure both gate specific models; request access in the provider console.
- 429 — rate limit. Back off and retry, or use a higher service tier / quota.
- 5xx — provider-side fault. Retry with exponential backoff.
Output truncated / finishReason = "length"
You hit the model’s max-output-tokens limit. Raise setMaxTokens(...) or shorten the prompt. For very long completions, prefer streaming with setStreamingCallback(...) so partial output is visible.
Streaming callback never fires
You called setStreamingEnabled(false) (or didn’t call setStreamingCallback/setStreamingEnabled(true)). The pipe defaults to non-streaming mode. Either set setStreamingEnabled(true) explicitly, or pass a setStreamingCallback { ... } — registering a callback auto-enables streaming.
apiMode does not change wire format mid-conversation
Even if you re-call setApiMode(...) between calls, the second call throws. Re-create the pipe. Internally, apiModeLocked is set to true inside sendRequest(...) on the first request, before the response is even read.
Next Steps
GenericOpenAIPipeClass API — Full builder reference, env singleton, error mapping, andApiModedetails.PipeClass API — Core pipe abstraction and base-class builders (setModel,setTemperature,setMaxTokens,setSystemPrompt, etc.).- Pipe Context Protocol — Attach PCP tools to a
GenericOpenAIPipefor sandboxed multi-language tool execution.