Pydantic Guardrails for LLM Pipelines: Harnessing Cognitive Drift (Part 2)
Introduction: From Principles to Practice
In Part 1, we discussed how Pydantic helps manage cognitive drift in LLM pipelines by enforcing structure and consistency. In this part, we’ll focus on building modular pipelines that leverage Pydantic’s validation capabilities for scalability, maintainability, and reliability.
A modular pipeline can be broken into four layers:
- Data Layer: schemas for validating inputs, outputs, and intermediate data
- Integration Layer: API wrapping and response validation
- Resource Layer: prompt templates
- Business Logic Layer: task-specific workflows
In this part, we’ll focus on the Data Layer and Integration Layer, which form the foundation for validating and structuring data.
Data Layer: Defining the Core Schema
The Data Layer defines the schema for validating LLM inputs, outputs, and intermediate data structures. Pydantic models simplify validation, reduce redundancy, and ensure consistency across the pipeline.
Core Model: BaseResponseModel
The BaseResponseModel serves as a base class for response models. It provides common fields like status and message to ensure uniformity across all response types.
from pydantic import BaseModel
from typing import Optional
class BaseResponseModel(BaseModel):
    status: str = "success"  # Default to "success" unless specified
    message: Optional[str] = None  # Optional feedback or context

    class Config:
        arbitrary_types_allowed = True
Specialized Models for Different Response Types
Extend BaseResponseModel to handle specific response formats.
# Text Response: For plain text outputs
class TextResponse(BaseResponseModel):
    content: str

# JSON Response: For structured JSON data
from typing import Union, Dict, List, Any

class JSONResponse(BaseResponseModel):
    data: Union[Dict[str, Any], List[Dict[str, Any]]]

# Tabular Response: For tabular data using pandas
import pandas as pd

class TabularResponse(BaseResponseModel):
    data: pd.DataFrame

    class Config:
        arbitrary_types_allowed = True

# Code Response: For responses containing code snippets
class CodeResponse(BaseResponseModel):
    code: str
These models:
- inherit status and message from BaseResponseModel, keeping all responses uniform
- each add a single payload field (content, data, or code) for their format

Special Notes:
- TabularResponse requires arbitrary_types_allowed = True so Pydantic accepts a pandas DataFrame, for which it has no built-in validator.
Integration Layer: API Wrapping and Validation
The Integration Layer connects your pipeline to external APIs like OpenAI or Claude.
It ensures:
- a single, consistent interface for calling different providers
- provider-specific response formats are normalized before reaching the rest of the pipeline
- every response is validated into a structured Pydantic model
Unified API Handler
The call_api_async function abstracts provider-specific calls to LLM APIs. It ensures a unified interface for interacting with different providers.
async def call_api_async(client, model_id: str, prompt: str, temperature: float = 0.4, max_tokens: int = 1056):
    """
    Conceptual function to demonstrate unified API handling and response validation.
    Replace 'generate_response' with provider-specific methods.

    Args:
        client: API client instance (e.g., OpenAI, Claude).
        model_id: The LLM model ID.
        prompt: The input prompt.
        temperature: Sampling temperature.
        max_tokens: Token limit.

    Returns:
        Validated structured response.
    """
    # Conceptual call - use provider-specific method here
    response = await client.generate_response(prompt, model_id, temperature, max_tokens)

    # Parse and validate the response (example logic)
    return validate_response_type(response.content, expected_res_type="json")
Note: This code is for demonstration purposes only and focuses on illustrating key workflows (e.g., API calling, response validation). It is not fully functional or provider-specific.
Response Type Validation
The validate_response_type function converts raw responses into structured Pydantic models based on the expected type.
from typing import Union
from io import StringIO
import pandas as pd
def validate_response_type(response_content: str, expected_res_type: str):
    …
    if expected_res_type == "json":
        cleaned_content = clean_and_extract_json(response_content)
        return JSONResponse(data=cleaned_content)
    elif expected_res_type == "str":
        return TextResponse(content=response_content)
    elif expected_res_type == "tabular":
        df = pd.read_csv(StringIO(response_content))
        return TabularResponse(data=df)
    elif expected_res_type == "code":
        return CodeResponse(code=response_content)
    else:
        raise ValueError(f"Unsupported response type: {expected_res_type}")
Note: This code is for demonstration purposes only and focuses on illustrating key workflows. See Appendix for full runnable code examples.
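For instance, assuming the demo models above, a hypothetical round trip might look like this:

# Hypothetical usage of the demo function above:
resp = validate_response_type('{"score": 0.9}', expected_res_type="json")
print(type(resp).__name__, resp.data)   # JSONResponse {'score': 0.9}

csv_resp = validate_response_type("col_a,col_b\n1,2\n3,4", expected_res_type="tabular")
print(csv_resp.data.shape)              # (2, 2)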
Data Cleaning: Handling Malformed JSON
Raw responses often include malformed JSON or extraneous text. Use clean_and_extract_json to safely extract valid JSON.
import json
import re
def clean_and_extract_json(response_content: str):
    try:
        return json.loads(response_content)
    except json.JSONDecodeError:
        # Fallback: extract a JSON-like object or array from the surrounding text
        match = re.search(r"({.*}|\[.*\])", response_content)
        if match:
            # Strip trailing commas before closing braces/brackets
            clean_content = re.sub(r",\s*([}\]])", r"\1", match.group(0))
            return json.loads(clean_content)
        return None
Note: This code is for demonstration purposes only and focuses on illustrating key workflows. See Appendix for full runnable code examples.
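To make the behavior concrete, here is a hypothetical messy response and what the function recovers from it:

# Typical messy LLM output: a preamble plus a trailing comma inside the JSON
raw = 'Sure! Here is the result: {"name": "Ada", "score": 95,}'
print(clean_and_extract_json(raw))   # {'name': 'Ada', 'score': 95}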
Provider-Specific Handling
Different LLM providers may return responses in varying formats. The Integration Layer handles these nuances.
Examples of these nuances are covered in the Appendix.
By encapsulating these differences in the Integration Layer, the rest of your pipeline remains clean and consistent.
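As a condensed sketch of that encapsulation (the attribute paths follow the appendix code, but the helper name is illustrative), content extraction can be isolated in one place:

def extract_content(llm_provider: str, response) -> str:
    """Illustrative helper: normalize provider-specific response shapes to a string."""
    if llm_provider == "openai":
        return response.choices[0].message.content        # single-block response
    if llm_provider == "claude":
        block = response.content[0]                       # first of possibly many blocks
        return block.text if hasattr(block, "text") else str(block)
    if llm_provider == "llama3":
        return response["response"]                       # ollama returns a dict
    raise ValueError(f"Unknown provider: {llm_provider}")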
Pipeline in Action
Here’s how the Data Layer and Integration Layer work together in a modular LLM pipeline:
What’s Next?
In future parts, I will show examples from the Resource Layer (prompt templates) and the Business Logic Layer (task-specific workflows).
Stay tuned!
Appendix: Runnable Code Examples & Detailed Explanation (Optional Read)
LLM_Response Models (Pydantic Models)
Each model is a class. Pydantic models can be stacked into a tree-like hierarchy through inheritance. This approach is consistent, intuitive, and efficient: models share fields and configuration, which cuts down on duplicated error-handling code.
BaseResponseModel
class BaseResponseModel(BaseModel):
    """
    Base model that provides common fields for various response models.

    Attributes:
        status (str): Indicates the success status of the response, defaults to "success".
        message (Optional[str]): Optional field to provide additional feedback or a message.

    Config:
        arbitrary_types_allowed (bool): Allows non-standard types like pandas DataFrame.

    *Allows validation functions to add status and message easily!
    """

    status: str = "success"
    message: Optional[str] = None

    class Config:
        arbitrary_types_allowed = True
BaseResponseModel is the base class for the response models here. It provides two common fields (status and message) so that all LLM responses stay consistent. It serves as a "shell" that already has the status and message fields attached.
Fields:
- status (str): success status of the response; defaults to "success"
- message (Optional[str]): optional feedback or context

Pydantic Configuration
arbitrary_types_allowed = True lets new fields use custom or non-standard types, such as a pandas.DataFrame, datetime, or any user-defined class, which Pydantic would otherwise reject for lacking a built-in validator.
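A minimal sketch of what the flag changes (Pydantic v1-style Config, matching the article's code):

import pandas as pd
from pydantic import BaseModel

# Without the flag, Pydantic has no validator for DataFrame and rejects
# the model at class-definition time:
try:
    class Strict(BaseModel):
        data: pd.DataFrame
except RuntimeError as e:
    print(f"Rejected: {e}")   # "no validator found for ... DataFrame ..."

class Relaxed(BaseModel):
    data: pd.DataFrame

    class Config:
        arbitrary_types_allowed = True   # DataFrame now passes through as-is

print(Relaxed(data=pd.DataFrame({"a": [1]})).data.shape)   # (1, 1)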
TextResponseModel
class TextResponse(BaseResponseModel):
    """
    Model for plain text responses.

    Attributes:
        content (str): Holds plain text content of the response.

    Config:
        json_schema_extra (dict): Provides an example structure for documentation.
    """

    content: str

    class Config:
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Text response processed.",
                "content": "This is the plain text content.",
            }
        }
This class, TextResponse, is a Pydantic model that inherits from BaseResponseModel and is specifically designed to handle plain text responses. By extending BaseResponseModel, it automatically inherits the status field (e.g., "success" or "error") and the message field (optional feedback or context about the response).
TextResponse adds its own field, "content", which holds the plain text content as a string.
The Config class in Pydantic allows for customization of the model’s behavior and metadata. Here, the json_schema_extra dictionary provides additional details for schema generation, typically used for API documentation or OpenAPI specifications. For instance, the example key in json_schema_extra provides a sample structure for TextResponse that showcases how the model might appear in API responses.
The model now has three key fields: status, message, and content. These are attributes of the TextResponse model, not just keys in a dictionary (e.g., response.content rather than response["content"]). Attribute access is more intuitive and works better with tools like pylint, which can validate attribute access but not dictionary key access.
It is a common convention to organize the core data or content of a model under a single attribute. For plain text responses, it is usually content, while more complex formats may use data (easy for your colleagues to know where to look.)
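A quick illustration of that attribute access (hypothetical values):

resp = TextResponse(content="Hello!", message="Text response processed.")
print(resp.status)    # "success" (inherited default from BaseResponseModel)
print(resp.content)   # "Hello!"
# resp["content"] would raise TypeError: Pydantic models are not subscriptable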
CodeResponseModel
class CodeResponse(BaseResponseModel):
    """
    Model for responses containing code snippets.

    Attributes:
        code (str): Holds the code as a string.

    Config:
        json_schema_extra (dict): Example structure for code response documentation.
    """

    code: str

    class Config:
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Code response processed.",
                "code": "print('Hello, world!')",
            }
        }
The CodeResponse class is a Pydantic model designed for API responses containing code snippets. It inherits the common fields from BaseResponseModel, such as status (e.g., "success" or "error") and message (optional feedback), and adds a code field to store the code snippet as a string.
TabularResponseModel
class TabularResponse(BaseResponseModel):
    """
    Model for handling tabular data responses using pandas DataFrame.

    Attributes:
        data (pd.DataFrame): Contains the tabular data.

    Config:
        - arbitrary_types_allowed (bool): Allows DataFrame as a valid type.
        - json_schema_extra (dict): Example structure for tabular data documentation.
    """

    data: pd.DataFrame

    class Config:
        arbitrary_types_allowed = True
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Tabular data processed.",
                "data": "Pandas DataFrame object",
            }
        }
The TabularResponse class handles API responses containing tabular data stored in a Pandas DataFrame. It inherits common fields from BaseResponseModel, such as status (e.g., "success" or "error") and message (optional feedback or context). The data field is a required attribute that holds the tabular data in the form of a Pandas DataFrame.
JSONResponseModel
class JSONResponse(BaseResponseModel):
    """
    General-purpose model for handling JSON-based responses.

    Attributes:
        data (Union[Dict[str, Any], List[Dict[str, Any]]]): Holds JSON data,
        which can be either a dictionary or a list of dictionaries.

    Config:
        arbitrary_types_allowed (bool): Allows non-standard types in
        JSON responses.
    """

    data: Union[Dict[str, Any], List[Dict[str, Any]]]

    class Config:
        arbitrary_types_allowed = True
The JSONResponse class is a Pydantic model designed for handling general-purpose JSON-based responses. It includes a data field that can store either a dictionary or a list of dictionaries, making it versatile for various JSON response formats.
The Config class enables arbitrary_types_allowed, allowing the model to handle non-standard types in JSON responses.
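For example, both a single object and a list of objects validate cleanly (hypothetical values):

single = JSONResponse(data={"task": "edit", "score": 0.87})
many = JSONResponse(data=[{"id": 1}, {"id": 2}])
print(single.data["task"], len(many.data))   # edit 2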
These four models (Text, Tabular, Code, and JSON) are deliberately flexible, making them easy to reuse and share across projects. (The domain-specific models, which will be covered in future parts, are much "tighter.")
LLM API Calling Functions
These are utility functions for making LLM API calls (mostly async versions).

Functions:
- run_in_executor_async: runs synchronous calls in a thread pool for async compatibility
- call_openai_api_async / call_claude_api_async / call_llama3_async: provider-specific wrappers
- call_api_async: the unified handler that the wrappers delegate to
Dependencies (Core LLM API Calling Utils Functions)
# Built-in & External libraries
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Union, Optional, cast
import json
import logging
import logging_config
from pydantic import ValidationError

# LLM imports
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import ollama  # ollama remains synchronous as there's no async client yet

# From own modules
from models.llm_response_models import (
    CodeResponse,
    JSONResponse,
    TabularResponse,
    TextResponse,
    EditingResponseModel,
    JobSiteResponseModel,
)
from utils.llm_api_utils import (
    validate_json_type,
    validate_response_type,
    get_claude_api_key,
    get_openai_api_key,
)
from project_config import (
    GPT_35_TURBO,
    GPT_4,
    GPT_4_TURBO,
    CLAUDE_HAIKU,
    CLAUDE_SONNET,
    CLAUDE_OPUS,
)

# Module-level logger (setup assumed to come from logging_config)
logger = logging.getLogger(__name__)
Helper for Asynchronous Operation
# Helper function to run synchronous API calls in an executor
async def run_in_executor_async(func, *args):
    """
    Runs a synchronous function in a ThreadPoolExecutor for async compatibility.

    Args:
        func (Callable): The synchronous function to execute.
        *args: Arguments to pass to the function.

    Returns:
        Any: The result of the synchronous function.
    """
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, func, *args)
This function lets synchronous functions, such as call_llama3, run seamlessly within an asynchronous module. Since LLaMA runs locally and does not go over a network API, asynchronous execution is not inherently needed; the executor simply lets it fit into the otherwise async pipeline.
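A small usage sketch (with a stand-in blocking function) shows how the helper keeps the event loop free:

import asyncio
import time

def slow_call(text: str) -> str:
    time.sleep(1)              # stand-in for a blocking client call like ollama.generate
    return text.upper()

async def main():
    result = await run_in_executor_async(slow_call, "hello")
    print(result)              # "HELLO", produced without blocking the event loop

# asyncio.run(main())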
API Wrapper Functions (Call GPT, Call Claude, Call Llama3)
OpenAI (GPT)
# Async wrapper for OpenAI
async def call_openai_api_async(
    prompt: str,
    model_id: str = GPT_4_TURBO,
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
    client: Optional[AsyncOpenAI] = None,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls OpenAI API and parses the response."""
    openai_client = client or AsyncOpenAI(api_key=get_openai_api_key())
    logger.info("OpenAI client ready for async API call.")
    return await call_api_async(
        openai_client,
        model_id,
        prompt,
        expected_res_type,
        json_type,
        temperature,
        max_tokens,
        "openai",
    )
Anthropic (Claude)
# Async wrapper for Claude
async def call_claude_api_async(
    prompt: str,
    model_id: str = CLAUDE_SONNET,
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
    client: Optional[AsyncAnthropic] = None,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls the Claude API to generate responses based on a given prompt."""
    claude_client = client or AsyncAnthropic(api_key=get_claude_api_key())
    logger.info("Claude client ready for async API call.")
    return await call_api_async(
        claude_client,
        model_id,
        prompt,
        expected_res_type,
        json_type,
        temperature,
        max_tokens,
        "claude",
    )
LLaMA
# Async wrapper for Llama 3
async def call_llama3_async(
    prompt: str,
    model_id: str = "llama3",
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls the Llama 3 API and parses the response."""
    return await call_api_async(
        client=None,
        model_id=model_id,
        prompt=prompt,
        expected_res_type=expected_res_type,
        json_type=json_type,
        temperature=temperature,
        max_tokens=max_tokens,
        llm_provider="llama3",
    )
These are the high-level async functions that wrap specific LLM APIs (OpenAI, Claude, and Llama3). They are the entry points of the module: whenever you need to make an LLM query, you call one of them directly.
Note: dotenv/.env is the most common API key management method.
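The key-helper functions imported earlier (get_openai_api_key, get_claude_api_key) are not shown in this article; a plausible minimal version using dotenv might look like this (the environment variable name is an assumption):

import os
from dotenv import load_dotenv

def get_openai_api_key() -> str:
    """Hypothetical sketch: load the API key from a .env file or the environment."""
    load_dotenv()                             # reads .env into os.environ
    key = os.getenv("OPENAI_API_KEY")         # assumed variable name
    if not key:
        raise ValueError("OPENAI_API_KEY not found in environment or .env file")
    return key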
expected_res_type refers to "labels" that classify and validate LLM responses. You can request responses in these four formats:
- "str": plain text (TextResponse)
- "json": structured JSON (JSONResponse, or a domain model via json_type)
- "tabular": CSV-style data parsed into a DataFrame (TabularResponse)
- "code": code snippets (CodeResponse)
These formats ensure LLM outputs are tailored, parsable, and ready for downstream processes. While multi-modal options (like video/audio) are possible, the current focus is on text.
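Illustrative calls (inside an async function), one per format:

text = await call_openai_api_async("Summarize this paragraph.", expected_res_type="str")
table = await call_openai_api_async("Return the results as CSV.", expected_res_type="tabular")
code = await call_openai_api_async("Write a hello-world script.", expected_res_type="code")
parsed = await call_openai_api_async(
    "Return edits as a JSON object.", expected_res_type="json", json_type="editing"
)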
Core Unified API Handler
The wrapper functions then delegate to call_api_async to execute the call.
# Unified async API calling function
async def call_api_async(
    client: Optional[Union[AsyncOpenAI, AsyncAnthropic]],
    model_id: str,
    prompt: str,
    expected_res_type: str,
    json_type: str,
    temperature: float,
    max_tokens: int,
    llm_provider: str,
) -> Union[
    JSONResponse,
    TabularResponse,
    CodeResponse,
    TextResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """
    Asynchronous function for handling API calls to OpenAI, Claude, and Llama.

    This method handles provider-specific nuances (e.g., multi-block responses for Claude)
    and validates responses against expected types and Pydantic models.

    Args:
        client (Optional[Union[AsyncOpenAI, AsyncAnthropic]]):
            The API client instance for the respective provider.
            If None, a new client is instantiated.
        model_id (str):
            The model ID to use for the API call (e.g., "gpt-4-turbo" for OpenAI).
        prompt (str):
            The input prompt for the LLM.
        expected_res_type (str):
            The expected type of response (e.g., "json", "tabular", "str", "code").
        json_type (str):
            Specifies the type of JSON model for validation (e.g., "job_site", "editing").
        temperature (float):
            Sampling temperature for the LLM.
        max_tokens (int):
            Maximum number of tokens for the response.
        llm_provider (str):
            The name of the LLM provider ("openai", "claude", or "llama3").

    Returns:
        Union[JSONResponse, TabularResponse, CodeResponse, TextResponse,
        EditingResponseModel, JobSiteResponseModel]:
            The validated and structured response.

    Raises:
        ValueError: If the response cannot be validated or parsed.
        TypeError: If the response type does not match the expected format.
        Exception: For other unexpected errors during API interaction.

    Notes:
        - OpenAI & Llama3 always return single-block responses, while Claude may
          return multi-block responses, which need special treatment.
        - The Llama3 API is synchronous and is executed via an async executor.
        #* Therefore, the API call for each LLM provider needs to remain separate:
        #* combining them into a single code block would cause complications;
        #* keeping them separate per provider is cleaner and more modular.

    Examples:
        >>> await call_api_async(
                client=openai_client,
                model_id="gpt-4-turbo",
                prompt="Translate this text to French",
                expected_res_type="json",
                json_type="editing",
                temperature=0.5,
                max_tokens=100,
                llm_provider="openai"
            )
    """
    try:
        logger.info(f"Making API call with expected response type: {expected_res_type}")
        response_content = ""

        if llm_provider == "openai":
            openai_client = cast(AsyncOpenAI, client)
            response = await openai_client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                temperature=temperature,
                max_tokens=max_tokens,
            )
            response_content = response.choices[0].message.content

        elif llm_provider == "claude":
            claude_client = cast(AsyncAnthropic, client)
            system_instruction = (
                "You are a helpful assistant who adheres to instructions."
            )
            response = await claude_client.messages.create(
                model=model_id,
                max_tokens=max_tokens,
                messages=[{"role": "user", "content": system_instruction + prompt}],
                temperature=temperature,
            )
            # *Add an extra step to extract content from the response object's TextBlocks
            # *(Unlike GPT and LLaMA, Claude uses multiple blocks in its responses:
            # *the content attribute of Message is a list of TextBlock objects,
            # *whereas others wrap everything into a single block.)
            response_content = (
                response.content[0].text
                if hasattr(response.content[0], "text")
                else str(response.content[0])
            )
            # logger.info(f"claude api response raw output: {response_content}")

        elif llm_provider == "llama3":
            # Llama3 remains synchronous, so run it in an executor
            options = {
                "temperature": temperature,
                "max_tokens": max_tokens,
                "batch_size": 10,
                "retry_enabled": True,
            }
            response = await run_in_executor_async(
                ollama.generate, model_id, prompt, options
            )
            response_content = response["response"]

        logger.info(f"Raw {llm_provider} Response: {response_content}")

        # Validation 1: validate response content and return a structured response
        validated_response_model = validate_response_type(
            response_content, expected_res_type
        )
        logger.info(
            f"validated response content after validate_response_type: \n{validated_response_model}"
        )  # TODO: debugging; delete afterwards

        # Validation 2: further validate JSONResponse -> editing or job site response models
        if expected_res_type == "json":
            if isinstance(validated_response_model, JSONResponse):
                # Pass directly to validate_json_type for further validation
                validated_response_model = validate_json_type(
                    response_model=validated_response_model, json_type=json_type
                )
            else:
                raise TypeError(
                    "Expected validated response content to be a JSONResponse model."
                )
            logger.info(
                f"validated response content after validate_json_type: \n{validated_response_model}"
            )  # TODO: debugging; delete afterwards

        return validated_response_model

    except (json.JSONDecodeError, ValidationError) as e:
        logger.error(f"Validation or parsing error: {e}")
        raise ValueError(f"Invalid format received from {llm_provider} API: {e}")
    except Exception as e:
        logger.error(f"{llm_provider} API call failed: {e}")
        raise
Step 1: Input Parameters
The function accepts client, model_id, prompt, expected_res_type, json_type, temperature, max_tokens, and llm_provider (see the docstring above for details).
Step 2: Provider-Specific API Calls
OpenAI: client.chat.completions.create()
Claude: client.messages.create()
Llama3: ollama.generate(), run synchronously inside the async executor
Step 3: Handling Claude's Multi-Block Responses
Unlike other providers (e.g., OpenAI, Meta), which deliver single-block outputs, Claude splits responses into multiple blocks.
Block 1 (Metadata): {"blocks": ["text", "table", "image"]}
Block 2 (Text): "Results indicate a trend."
Block 3 (Table):
Condition | Value
A | 12.5
B | 15.3
Block 4 (Image): Base64-encoded image data.
Challenges: the block structure varies from response to response, so code cannot assume a fixed layout and must detect each block's type at runtime.
Solution: Handle responses dynamically:
response_content = (
    response.content[0].text
    if hasattr(response.content[0], "text")
    else str(response.content[0])
)
Future Improvement: A more robust solution iterates through all blocks to capture essential data (see the sketch after the note below).
Note on Claude’s multi-block response:
Most LLM providers deliver complex or multi-part outputs in a single block. Anthropic's API is unusual in using a multi-block format, which makes it easier to process distinct data types independently, reduces token usage, and is more flexible. However, it also adds complexity to API interactions, as developers must dynamically handle different block formats without relying on predefined block types.
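A hedged sketch of that "iterate through all blocks" improvement (mirroring the extraction logic above, not the article's shipped code):

def extract_claude_content(response) -> str:
    """Concatenate the text of every TextBlock instead of reading only the first."""
    parts = []
    for block in response.content:        # Message.content is a list of blocks
        if hasattr(block, "text"):        # TextBlock
            parts.append(block.text)
        else:                             # non-text blocks: keep a readable marker
            parts.append(str(block))
    return "\n".join(parts)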
Step 4: First Validation – validate_response_type()
The raw response string is wrapped into the matching Pydantic model based on expected_res_type (detailed under Guardrails below).
Step 5: Second Validation – validate_json_type() (For JSON Only)
If the first validation outputs a JSONResponse, further validate its structure against the specific JSON models: EditingResponseModel or JobSiteResponseModel.
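validate_json_type itself is imported from utils.llm_api_utils but not shown in this article; a minimal sketch of what it plausibly does is a dispatch from json_type to the stricter domain model (the data field pass-through is an assumption):

def validate_json_type(
    response_model: JSONResponse, json_type: str
) -> Union[JSONResponse, EditingResponseModel, JobSiteResponseModel]:
    """Hypothetical sketch: narrow a generic JSONResponse to a domain-specific model."""
    json_model_map = {
        "editing": EditingResponseModel,
        "job_site": JobSiteResponseModel,
    }
    model_cls = json_model_map.get(json_type)
    if model_cls is None:
        return response_model                    # no stricter schema requested
    return model_cls(data=response_model.data)   # re-validate against the domain model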
Step 6: Return Structured Response
Final output types: TextResponse, TabularResponse, CodeResponse, or JSONResponse (further narrowed to EditingResponseModel or JobSiteResponseModel). Each wraps the original content extracted from the raw LLM response in a validated Pydantic model.
Guardrails:
Response Type Validation: validate_response_type
# Response type validation
def validate_response_type(
    response_content: Union[str, Any], expected_res_type: str
) -> Union[
    CodeResponse,
    JSONResponse,
    TabularResponse,
    TextResponse,
]:
    """
    Validates and structures the response content based on
    the expected response type.

    Args:
        response_content (Any): The raw response content from the LLM API.
        expected_res_type (str): The expected type of the response
            (e.g., "json", "tabular", "str", "code").

    Returns:
        Union[CodeResponse, JSONResponse, TabularResponse, TextResponse]:
            The validated and structured response as a Pydantic model instance.
            - CodeResponse: Returned when expected_res_type is "code"; wraps code content.
            - JSONResponse: Returned when expected_res_type is "json"
              (later narrowed to JobSiteResponseModel or EditingResponseModel
              by validate_json_type, based on json_type).
            - TabularResponse: Returned when expected_res_type is "tabular"; wraps a DataFrame.
            - TextResponse: Returned when expected_res_type is "str"; wraps plain text content.
    """
    if expected_res_type == "json":
        # Check if response_content is a string that needs parsing
        if isinstance(response_content, str):
            # Only parse if it's a string
            cleaned_content = clean_and_extract_json(response_content)
            if cleaned_content is None:
                raise ValueError("Failed to extract valid JSON from the response.")
        else:
            # If it's already a dict or list, use it directly
            cleaned_content = response_content

        # Create a JSONResponse instance with the cleaned content
        if isinstance(cleaned_content, (dict, list)):
            return JSONResponse(data=cleaned_content)
        else:
            raise TypeError(
                f"Expected dict or list for JSON response, got {type(cleaned_content)}"
            )
    elif expected_res_type == "tabular":
        try:
            # Parse as DataFrame and wrap in TabularResponse model
            df = pd.read_csv(StringIO(response_content))
            return TabularResponse(data=df)
        except Exception as e:
            logger.error(f"Error parsing tabular data: {e}")
            raise ValueError("Response is not valid tabular data.")
    elif expected_res_type == "str":
        # Wrap text response in TextResponse model
        return TextResponse(content=response_content)
    elif expected_res_type == "code":
        # Wrap code response in CodeResponse model
        return CodeResponse(code=response_content)
    else:
        raise ValueError(f"Unsupported response type: {expected_res_type}")
This function is fairly straightforward, except for the JSON-cleaning part.
Data Cleaning and Parsing: clean_and_extract_json
# Function to clean/extract JSON content
def clean_and_extract_json(
    response_content: str,
) -> Optional[Union[Dict[str, Any], List[Any]]]:
    """
    Extracts, cleans, and parses JSON content from the API response.
    Strips out any non-JSON content like extra text before the JSON block.
    Also removes trailing commas.

    Args:
        response_content (str): Raw response content.

    Returns:
        Optional[Union[Dict[str, Any], List[Any]]]: Parsed JSON data as a dictionary
        or list, or None if parsing fails.
    """
    try:
        # Attempt direct parsing
        return json.loads(response_content)
    except json.JSONDecodeError:
        logger.warning("Initial JSON parsing failed. Attempting fallback extraction.")

        # Extract JSON-like structure (object or array)
        match = re.search(r"({.*}|\[.*\])", response_content, re.DOTALL)
        if not match:
            logger.error("No JSON-like content found.")
            return None

        try:
            # Remove trailing commas
            clean_content = re.sub(r",\s*([}\]])", r"\1", match.group(0))
            return json.loads(clean_content)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON in fallback: {e}")
            return None
This function supports validate_response_type by parsing and cleaning JSON content. Despite precise prompting, LLMs often return imperfect JSON, a common challenge in the field. Responses frequently include extra content outside the JSON structure (e.g., explanations, preambles, or trailing text), making direct parsing unreliable. This function transforms such messy output into valid JSON (a dictionary or list).
Step 1. Direct Parsing
Attempts to parse the input string using json.loads.
Step 2. Fallback with Regex
If parsing fails, it uses the re library to extract JSON-like structures ({...} or [...]) from unstructured text.
Regex Pattern: r"({.*}|\[.*\])"
Step 3. Cleaning
Removes JavaScript-style trailing commas from the extracted JSON to ensure validity.
Step 4. Final Parsing
Parses the cleaned string with json.loads again; if it still fails, the error is logged and the function returns None.