Pydantic Guardrails for LLM Pipelines: Harnessing Cognitive Drift (Part 2)

Pydantic Guardrails for LLM Pipelines: Harnessing Cognitive Drift (Part 2)

Introduction: From Principles to Practice

In Part 1, we discussed how Pydantic helps manage cognitive drift in LLM pipelines by enforcing structure and consistency. In this part, we’ll focus on building modular pipelines that leverage Pydantic’s validation capabilities for scalability, maintainability, and reliability.

A modular pipeline can be broken into four layers:

  • Data Layer: Schema definitions using Pydantic.
  • Business Logic Layer: Functions that orchestrate workflows (to be covered in later parts).
  • Resource Layer: Reusable prompt templates for generating instructions (to be covered in later parts).
  • Integration Layer: Utility functions for interacting with external APIs.

In this part, we’ll focus on the Data Layer and Integration Layer, which form the foundation for validating and structuring data.


Data Layer: Defining the Core Schema

The Data Layer defines the schema for validating LLM inputs, outputs, and intermediate data structures. Pydantic models simplify validation, reduce redundancy, and ensure consistency across the pipeline.

Core Model: BaseResponseModel

The BaseResponseModel serves as a base class for response models. It provides common fields like status and message to ensure uniformity across all response types.

from pydantic import BaseModel
from typing import Optional

class BaseResponseModel(BaseModel):
    status: str = "success"  # Default to "success" unless specified
    message: Optional[str] = None  # Optional feedback or context

    class Config:
        arbitrary_types_allowed = True        

Specialized Models for Different Response Types

Extend BaseResponseModel to handle specific response formats.

# Text Response: For plain text outputs
class TextResponse(BaseResponseModel):
    content: str

# JSON Response: For structured JSON data
from typing import Union, Dict, List, Any

class JSONResponse(BaseResponseModel):
    data: Union[Dict[str, Any], List[Dict[str, Any]]]

# Tabular Response: For tabular data using pandas
import pandas as pd

class TabularResponse(BaseResponseModel):
    data: pd.DataFrame

    class Config:
        arbitrary_types_allowed = True

# Code Response: For responses containing code snippets
class CodeResponse(BaseResponseModel):
code: str        

These models:

  • Simplify validation by enforcing a consistent structure.
  • Reduce redundancy by reusing the base model.
  • Enable modularity for adding new response types (e.g., multimedia or other formats).
  • Allow intuitive access to attributes, improving compatibility with tools like pylint.

Special Notes:

  • These four models (Text, Tabular, Code, and JSON) are designed to be “somewhat loose.”
  • A key feature of Pydantic is its support for building hierarchical models. By using class inheritance, you can stack models to create a tree-like structure easily.
  • Status, message, and data/content are attributes of the models. They are easier to access than keys in a dictionary (e.g., TextResponse["content"]) and “Pylint friendly.”


Integration Layer: API Wrapping and Validation

The Integration Layer connects your pipeline to external APIs like OpenAI or Claude.

It ensures:

  • API Requests are formatted correctly.
  • Raw Responses are parsed and cleaned.
  • Validation is performed using Pydantic models.

Unified API Handler

The call_api_async function abstracts provider-specific calls to LLM APIs. It ensures a unified interface for interacting with different providers.

async def call_api_async(client, model_id: str, prompt: str, temperature: float = 0.4, max_tokens: int = 1056):
    """
    Conceptual function to demonstrate unified API handling and response validation.
    Replace 'generate_response' with provider-specific methods.

    Args:
        client: API client instance (e.g., OpenAI, Claude).
        model_id: The LLM model ID.
        prompt: The input prompt.
        temperature: Sampling temperature.
        max_tokens: Token limit.

    Returns:
        Validated structured response.
    """
    # Conceptual call - use provider-specific method here
response = await client.generate_response(prompt, model_id, temperature, max_tokens)

    # Parse and validate the response (example logic)
return validate_response_type(response.content, expected_res_type="json")         

Note: This code is for demonstration purposes only and focuses on illustrating key workflows (e.g., API calling, response validation). It is not fully functional or provider specific.

  • Steps like detailed response extraction (response.choices[0].message.content for OpenAI, or response.content[0].text / response.content for Claude) have been omitted for brevity.
  • Replace placeholder methods like generate_response with actual API calls: OpenAI: client.chat.completions.create(...); Claude: client.messages.create(...); LLaMA: Use local or synchronous APIs wrapped with an async helper.
  • Full, runnable implementations - including response handling - are provided in the appendix for reference.

Response Type Validation

The validate_response_type function converts raw responses into structured Pydantic models based on the expected type.

from typing import Union
from io import StringIO
import pandas as pd

def validate_response_type(response_content: str, expected_res_type: str):
    …
    if expected_res_type == "json":
        cleaned_content = clean_and_extract_json(response_content)
        return JSONResponse(data=cleaned_content)
    elif expected_res_type == "str":
        return TextResponse(content=response_content)
    elif expected_res_type == "tabular":
        df = pd.read_csv(StringIO(response_content))
        return TabularResponse(data=df)
    elif expected_res_type == "code":
        return CodeResponse(code=response_content)
    else:
        raise ValueError(f"Unsupported response type: {expected_res_type}")        

Note: This code is for demonstration purposes only and focuses on illustrating key workflows. See Appendix for full runnable code examples.

  • Plain Text: Wrapped in a TextResponse model.
  • JSON: Parsed into a JSONResponse model using the utility function clean_and_extract_json.
  • Tabular: Parsed into a pandas DataFrame and wrapped in TabularResponse.
  • Code: Wrapped in a CodeResponse model.

Data Cleaning: Handling Malformed JSON

Raw responses often include malformed JSON or extraneous text. Use clean_and_extract_json to safely extract valid JSON.

import json
import re

def clean_and_extract_json(response_content: str):
    try:
        return json.loads(response_content)
    except json.JSONDecodeError:
        match = re.search(r"({.*}|\\[.*\\])", response_content)
        if match:
            clean_content = re.sub(r",\s*([}\]])", r"\1", match.group(0))
            return json.loads(clean_content)
        return None        

Note: This code is for demonstration purposes only and focuses on illustrating key workflows. See Appendix for full runnable code examples.

Provider-Specific Handling

Different LLM providers may return responses in varying formats. The Integration Layer handles these nuances.

Examples

  1. OpenAI: Responses are usually plain text or JSON in a single block.
  2. Claude: Uses multi-block responses, where different types of data are split into separate blocks.
  3. Llama: Often synchronous, requiring asynchronous wrappers for compatibility.

(see Appendix for detailed explanation)

By encapsulating these differences in the Integration Layer, the rest of your pipeline remains clean and consistent.


Pipeline in Action

Here’s how the Data Layer and Integration Layer work together in a modular LLM pipeline:

  1. API Call: A higher-level function calls an API wrapper (call_llm_provider_api) -> the wrapper then calls LLM using call_api_async.
  2. Response Validation: The raw response is cleaned and validated against a Pydantic model.
  3. Validated Output: A pydantic model (e.g., JSONResponse, TextResponse) is returned for downstream processing.


What’s Next?

In the future parts, I will show examples in the Resource Layer (prompt templates) and Business Logic Layer (task-specific workflows), such as:

  • Using reusable templates to streamline LLM interactions.
  • Building workflows for tasks like reading webpages, resume editing, and topic generation.


Stay tuned!


Appendix: Runnable Code Examples & Detailed Explanation (Optional Read)

LLM_Response Models (Pydantic Models)

Each model is a class. Pydantic models can stack on top of each other to form a tree hierarchy structure, by inheriting each other.  This approach is consistent, intuitive, and efficient as it allows models to share fields and configurations and cut down error handling code.

BaseResponseModel

class BaseResponseModel(BaseModel):
    """
    Base model that provides common fields for various response models.

    Attributes:
        status (str): Indicates the success status of the response, defaults to "success".
        message (Optional[str]): Optional field to provide additional feedback or a message.

    Config:
        arbitrary_types_allowed (bool): Allows non-standard types like pandas DataFrame.

    *Allows validation functions to add status and message easily!
    """

    status: str = "success"
    message: Optional[str] = None

    class Config:
        arbitrary_types_allowed = True        

BaseResponseModel is the base class for response models here. It provides two common fields (status and message) for consistency for all LLM responses. It serves as a “holder” or “shell” with “status” and “message” fields attached to it already.

Fields:

  • status: A string indicating the response status, defaulting to "success".
  • message: An optional field for additional feedback or context.

Pydantic Configuration

Allows non-standard types (e.g., pandas DataFrame) through arbitrary_types_allowed = True (this means that we allow any new fields to be added to have custom or non-standard types such as a pandas.DataFrame, datetime, or any user-defined class.)

 

TextResponseModel

class TextResponse(BaseResponseModel):
    """
    Model for plain text responses.

    Attributes:
        content (str): Holds plain text content of the response.

    Config:
        json_schema_extra (dict): Provides an example structure for documentation.
    """

    content: str

    class Config:
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Text response processed.",
                "content": "This is the plain text content.",
            }
        }        

This class, TextResponse, is a Pydantic model that inherits from BaseResponseModel and is specifically designed to handle plain text responses. By extending BaseResponseModel, it automatically inherits the status field (e.g., "success" or "error") and the message field (optional feedback or context about the response).

TextResponse adds its own field, "content", which holds the plain text content as a string.

The Config class in Pydantic allows for customization of the model’s behavior and metadata. Here, the json_schema_extra dictionary provides additional details for schema generation, typically used for API documentation or OpenAPI specifications. For instance, the example key in json_schema_extra provides a sample structure for TextResponse that showcases how the model might appear in API responses.

 The model now has three key fields:

  • TextResponse.status
  • TextResponse.message
  • TextResponse.content

Status, message, and content are now attributes of the TextResponse model, not just keys in a dictionary (e.g., TextResponse["content"]). This makes accessing these fields more intuitive (e.g., TextResponse.content) and also improves compatibility with tools like pylint, which can better validate attribute access compared to dictionary key access.

It is a common convention to organize the core data or content of a model under a single attribute. For plain text responses, it is usually content, while more complex formats may use data (easy for your colleagues to know where to look.)

CodeResponseModel

class CodeResponse(BaseResponseModel):
    """
    Model for responses containing code snippets.

    Attributes:
        code (str): Holds the code as a string.

    Config:
        json_schema_extra (dict): Example structure for code response documentation.
    """

    code: str

    class Config:
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Code response processed.",
                "code": "print('Hello, world!')",
            }
        }        

The CodeResponse class is a Pydantic model designed for API responses containing code snippets. It inherits common fields from BaseResponseModel like “status” (e.g., "success"` or "error") and “message” (optional feedback). It adds a “code” field to store the code snippet as a string.

TabularResponseModel

class TabularResponse(BaseResponseModel):
    """
    Model for handling tabular data responses using pandas DataFrame.

    Attributes:
        data (pd.DataFrame): Contains the tabular data.

    Config:
        - arbitrary_types_allowed (bool): Allows DataFrame as a valid type.
        - json_schema_extra (dict): Example structure for tabular data documentation.
    """

    data: pd.DataFrame

    class Config:
        arbitrary_types_allowed = True
        json_schema_extra = {
            "example": {
                "status": "success",
                "message": "Tabular data processed.",
                "data": "Pandas DataFrame object",
            }
        }        

The TabularResponse class handles API responses containing tabular data stored in a Pandas DataFrame. It inherits common fields from BaseResponseModel, such as status (e.g., "success" or "error") and message (optional feedback or context). The data field is a required attribute that holds the tabular data in the form of a Pandas DataFrame.

JSONResponseModel

class JSONResponse(BaseModel):
    """
    General-purpose model for handling JSON-based responses.

    Attributes:
        data (Union[Dict[str, Any], List[Dict[str, Any]]]): Holds JSON data,
        which can be either a dictionary or a list of dictionaries.

    Config:
        arbitrary_types_allowed (bool): Allows non-standard types in
        JSON responses.
    """

    data: Union[Dict[str, Any], List[Dict[str, Any]]]

    class Config:
        arbitrary_types_allowed = True        

The JSONResponse class is a Pydantic model designed for handling general-purpose JSON-based responses. It includes a data field that can store either a dictionary or a list of dictionaries, making it versatile for various JSON response formats.

The Config class enables arbitrary_types_allowed, allowing the model to handle non-standard types in JSON responses.

These four models (Text, Tabular, Code, and JSON) are designed to be flexible to make the models easy to reuse and share across projects. (The domain specific models, which will be covered in future parts, are much "tighter.")

LLM API Calling Functions

These are utils functions to make LLM API calls (mostly async version).

  • Most built with native API libraries, without using high-level frameworks like Llama-Index or LangChain, except for ollama for Llama3.
  • Mostly Async.
  • Most of the code can be shared across all my projects.

Functions:

  • clean_and_extract_json: Extracts and sanitizes JSON data from raw API responses. 
  • get_openai_api_key: Retrieves the OpenAI API key from environment variables. 
  • get_claude_api_key: Retrieves the Claude API key from environment variables. 
  • validate_response_type: Validates and structures responses based on the expected type (json, tabular, str, or code). 
  • call_api: Unified function for making API calls and handling responses for OpenAI, Claude, and Llama. 
  • call_openai_api: Simplified wrapper for OpenAI-specific API interactions. 
  • call_claude_api: Simplified wrapper for Claude-specific API interactions. 
  • call_llama3: Simplified wrapper for Llama-specific API interactions. 

Dependencies (Core LLM API Calling Utils Functions)

# Built-in & External libraries
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Union, Optional, cast
import json
import logging
import logging_config
from pydantic import ValidationError

# LLM imports
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import ollama  # ollama remains synchronous as there’s no async client yet

# From own modules
from models.llm_response_models import (
    CodeResponse,
    JSONResponse,
    TabularResponse,
    TextResponse,
    EditingResponseModel,
    JobSiteResponseModel,
)
from utils.llm_api_utils import (
    validate_json_type,
    validate_response_type,
    get_claude_api_key,
    get_openai_api_key,
)
from project_config import (
    GPT_35_TURBO,
    GPT_4,
    GPT_4_TURBO,
    CLAUDE_HAIKU,
    CLAUDE_SONNET,
    CLAUDE_OPUS,
)        

  • External libraries like os, json, pandas, re, and dotenv for general utility tasks.
  • Logging_config is my custom logging function
  • Libraries for LLM API interaction: openai, anthropic, and ollama (import AsyncOpenai and AsyncAnthropic for async version)
  • Pydantic models for structured validation (ValidationError, BaseModel, and specific models from llm_response_models).
  • Async and concurrent are concurrency related libraries.
  • Custom utils functions to make API calls.
  • Import LLM model names from the internal project_config file (gpt-4-turbo, gpt-4o, claude-3-5-sonnet-20241022, etc.)

Helper for Asynchronous Operation

# Helper function to run synchronous API calls in an executor
async def run_in_executor_async(func, *args):
    """
    Runs a synchronous function in a ThreadPoolExecutor for async compatibility.

    Args:
        func (Callable): The synchronous function to execute.
        *args: Arguments to pass to the function.

    Returns:
        Any: The result of the synchronous function.
    """
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, func, *args)        

This function ensures that synchronous functions, such as call_llama3, can be executed seamlessly within an asynchronous module. Since LLaMA is local and does not use an API, asynchronous execution is not inherently needed.

API Wrapper Functions (Call GPT, Call Claude, Call Llama3)

OpenAI (GPT)

# Async wrapper for OpenAI
async def call_openai_api_async(
    prompt: str,
    model_id: str = GPT_4_TURBO,
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
    client: Optional[AsyncOpenAI] = None,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls OpenAI API and parses the response."""
    openai_client = client or AsyncOpenAI(api_key=get_openai_api_key())
    logger.info("OpenAI client ready for async API call.")
    return await call_api_async(
        openai_client,
        model_id,
        prompt,
        expected_res_type,
        json_type,
        temperature,
        max_tokens,
        "openai",
    )        

Anthropic (Claude)

# Async wrapper for Claude
async def call_claude_api_async(
    prompt: str,
    model_id: str = CLAUDE_SONNET,
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
    client: Optional[AsyncAnthropic] = None,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls the Claude API to generate responses based on a given prompt."""
    claude_client = client or AsyncAnthropic(api_key=get_claude_api_key())
    logger.info("Claude client ready for async API call.")
    return await call_api_async(
        claude_client,
        model_id,
        prompt,
        expected_res_type,
        json_type,
        temperature,
        max_tokens,
        "claude",
    )        

LLaMA

# Async wrapper for Llama 3
async def call_llama3_async(
    prompt: str,
    model_id: str = "llama3",
    expected_res_type: str = "str",
    json_type: str = "",
    temperature: float = 0.4,
    max_tokens: int = 1056,
) -> Union[
    JSONResponse,
    TabularResponse,
    TextResponse,
    CodeResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """Asynchronously calls the Llama 3 API and parses the response."""
    return await call_api_async(
        client=None,
        model_id=model_id,
        prompt=prompt,
        expected_res_type=expected_res_type,
        json_type=json_type,
        temperature=temperature,
        max_tokens=max_tokens,
        llm_provider="llama3",
    )        

These are the high-level async functions that wrap around specific LLM APIs, such as OpenAI, Claude, and Llama3. They are the entry points of the module: whenever you need to make a LLM query, you call one of them directly.

  • Standard LLM API parameters:
  • Llm_provider: opeanai, claude, or llama3
  • Custom prompt
  • Temperature: 0 to 1.0; determines how “tight” you want the LLM response to be (if you want the LLM to be more very exact and “literal”, i.e., extraction, classification, code generation, you need to tighten it to 0.3 or lower; for idea generation or ideation, you want to loosen it up to 0.8 or higher; tasks like summarization are somewhere in between.
  • Max_token: how many tokens are allowed; I usually like to set it to at least 1056.
  • Client: if an instantiated API instance is passed down by the higher-level function calling it (for OpenAI or Anthropic), then use that client (instance.) Otherwise create a new instance here in the function.
  • In OpenAI and Anthropic, you call OpenAI or Anthropic class to instantiate the client (here I called AsyncOpena/AsyncAnthropic instead). I used the utils functions get_openai_api_key/get_claude_apiKkey to retrieve the API keys from .env file using dotenv.


Note: dotenv/.env is the most common API key management method.

  • Simply create a .env file with api keys: “OPENAI_API_KEY=xx-xx…”
  • from dotenv import load_dotenv, import os
  • load_dotenv()
  • api_key = os.env(“OPENAI_API_KEY”)


expected_res_type refers to "labels" that classify and validate LLM responses. You can request responses in these four formats:

  • str (plain text): For conversational interactions, simple and validation-free.
  • json: Structured responses validated against custom schemas or models.
  • tabular (tables): For data in rows and columns, like CSV or Markdown tables.
  • code: Executable code (e.g., Python, JavaScript), validated for syntax and requirements.

These formats ensure LLM outputs are tailored, parsable, and ready for downstream processes. While multi-modal options (like video/audio) are possible, the current focus is on text.

 

Core Unified API Handler

The wrapper functions then call on call_api_async method to execute the call.

# Unified async API calling function
async def call_api_async(
    client: Optional[Union[AsyncOpenAI, AsyncAnthropic]],
    model_id: str,
    prompt: str,
    expected_res_type: str,
    json_type: str,
    temperature: float,
    max_tokens: int,
    llm_provider: str,
) -> Union[
    JSONResponse,
    TabularResponse,
    CodeResponse,
    TextResponse,
    EditingResponseModel,
    JobSiteResponseModel,
]:
    """
    Asynchronous function for handling API calls to OpenAI, Claude, and Llama.

    This method handles provider-specific nuances (e.g., multi-block responses for Claude)
    and validates responses against expected types and Pydantic models.

    Args:
        client (Optional[Union[AsyncOpenAI, AsyncAnthropic]]):
            The API client instance for the respective provider.
            If None, a new client is instantiated.
        model_id (str):
            The model ID to use for the API call (e.g., "gpt-4-turbo" for OpenAI).
        prompt (str):
            The input prompt for the LLM.
        expected_res_type (str):
            The expected type of response (e.g., "json", "tabular", "str", "code").
        json_type (str):
            Specifies the type of JSON model for validation (e.g., "job_site", "editing").
        temperature (float):
            Sampling temperature for the LLM.
        max_tokens (int):
            Maximum number of tokens for the response.
        llm_provider (str):
            The name of the LLM provider ("openai", "claude", or "llama3").

    Returns:
        Union[JSONResponse, TabularResponse, CodeResponse, TextResponse, EditingResponseModel,
        JobSiteResponseModel]:
            The validated and structured response.

    Raises:
        ValueError: If the response cannot be validated or parsed.
        TypeError: If the response type does not match the expected format.
        Exception: For other unexpected errors during API interaction.

    Notes:
    - OpenAI & Llama3 always returns single-block responses, while Claude may
    return multi-block responses, which needs special treatment.
    - Llama3 API is synchronous and is executed using an async executor.
    #* Therefore, the API calling for each LLM provider need to remain separate:
        #* Combining them into a single code block will have complications;
        #* keep them separate here for each provider is a more clean and modular.

    Examples:
    >>> await call_api_async(
            client=openai_client,
            model_id="gpt-4-turbo",
            prompt="Translate this text to French",
            expected_res_type="json",
            json_type="editing",
            temperature=0.5,
            max_tokens=100,
            llm_provider="openai"
        )
    """
    try:
        logger.info(f"Making API call with expected response type: {expected_res_type}")
        response_content = ""

        if llm_provider == "openai":
            openai_client = cast(AsyncOpenAI, client)
            response = await openai_client.chat.completions.create(
                model=model_id,
                messages=[
                    {"role": "system", "content": "You are a helpful assistant."},
                    {"role": "user", "content": prompt},
                ],
                temperature=temperature,
                max_tokens=max_tokens,
            )
            response_content = response.choices[0].message.content

        elif llm_provider == "claude":
            claude_client = cast(AsyncAnthropic, client)
            system_instruction = (
                "You are a helpful assistant who adheres to instructions."
            )
            response = await claude_client.messages.create(
                model=model_id,
                max_tokens=max_tokens,
                messages=[{"role": "user", "content": system_instruction + prompt}],
                temperature=temperature,
            )

            # *Add an extra step to extract content from response object's TextBlocks
            # *(Unlike GPT and LlaMA, Claude uses multi-blocks in its responses:
            # *The content attribute of Message is a list of TextBlock objects,
            # *whereas others wrap everything into a single block.)
            response_content = (
                response.content[0].text
                if hasattr(response.content[0], "text")
                else str(response.content[0])
            )
            # logger.info(f"claude api response raw output: {response_content}")

        elif llm_provider == "llama3":
            # Llama3 remains synchronous, so run it in an executor
            options = {
                "temperature": temperature,
                "max_tokens": max_tokens,
                "batch_size": 10,
                "retry_enabled": True,
            }
            response = await run_in_executor_async(
                ollama.generate, model_id, prompt, options
            )
            response_content = response["response"]

        logger.info(f"Raw {llm_provider} Response: {response_content}")

        # Validation 1: response content and return structured response
        validated_response_model = validate_response_type(
            response_content, expected_res_type
        )

        logger.info(
            f"validated response content after validate_response_type: \n{validated_response_model}"
        )  # TODO: debugging; delete afterwards

        # Validation 2: Further validate JSONResponse -> edit response or job site response models
        if expected_res_type == "json":
            if isinstance(validated_response_model, JSONResponse):
                # Pass directly to validate_json_type for further validation
                validated_response_model = validate_json_type(
                    response_model=validated_response_model, json_type=json_type
                )
            else:
                raise TypeError(
                    "Expected validated response content needs to be a JSONResponse model."
                )

        logger.info(
            f"validated response content after validate_json_type: \n{validated_response_model}"
        )  # TODO: debugging; delete afterwards

        return validated_response_model

    except (json.JSONDecodeError, ValidationError) as e:
        logger.error(f"Validation or parsing error: {e}")
        raise ValueError(f"Invalid format received from {llm_provider} API: {e}")
    except Exception as e:
        logger.error(f"{llm_provider} API call failed: {e}")
        raise        

Step 1: Input Parameters

The function accepts:

  • llm_provider: The LLM provider (e.g., OpenAI, Anthropic, Llama3).
  • client: API client (e.g., AsyncOpenAI, AsyncAnthropic).
  • prompt: The input prompt.
  • model_id: Identifier for the LLM model.
  • temperature: Sampling temperature for response variability.
  • expected_res_type: Expected response type (e.g., str, json, tabular, code).
  • json_type: Specific JSON model for further validation.

Step 2: Provider-Specific API Calls

OpenAI

  • Use the AsyncOpenAI client.
  • Create a chat completion with system and user messages:

client.chat.completions()        

  • Extract response content from the first choice's message.

Claude

  • Use the AsyncAnthropic client.
  • Add a system instruction to the prompt:

client.message.create()        

  • Extract content from Claude's multi-block response (see Step 3 for details).

Llama3

  • Use the executor to run a synchronous API call asynchronously.
  • Generate the response with specified options.
  • Extract response content from the returned dictionary.

Step 3: Handling Claude's Multi-Block Responses

Unlike other providers (e.g., OpenAI, Meta), which deliver single-block outputs, Claude splits responses into multiple blocks.

  • Example:

Mathematica

Block 1 (Metadata): {"blocks": ["text", "table", "image"]} 

Block 2 (Text): "Results indicate a trend." 

Block 3 (Table): 
Condition | Value 
A              | 12.5 
B              | 15.3 

Block 4 (Image): Base64-encoded image data.         

Challenges:

  • Different block types require dynamic handling.
  • Single-block assumptions (content[0].text) may fail.

Solution: Handle responses dynamically:

response_content = (

    response.content[0].text 

    if hasattr(response.content[0], "text") 

    else str(response.content[0])

)         

Future Improvement: A more robust solution iterates through all blocks to capture essential data.

Note on Claude’s multi-block response:

Most LLM providers use the single-block approach to deliver complex or multi-part outputs. Anthropic API is quite unique by using the multi-block format, which makes it easier to process distinct data types independently, reduces token usage, and is more flexible. However, it also adds complexity to API interactions, as developers must dynamically handle different block formats without relying on predefined block types.

Step 4: First Validation – validate_response_type()

  • Ensures the response matches expected_res_type.
  • Converts the raw LLM response into one of the following Pydantic models: JSONResponse, TabularResponse, CodeResponse, TextResponse

Step 5: Second Validation – validate_json_type() (For JSON Only)

If the first validation outputs a JSONResponse, further validate the structure against specific JSON models: EditingResponseModel JobSiteResponseModel

Step 6: Return Structured Response

Final Output Types:

  • JSONResponse
  • TabularResponse
  • CodeResponse
  • TextResponse
  • EditingResponseModel, JobSiteResponseModel, or other domain specific JSON models

Original Extracted Content from Raw LLM Responses

  • OpenAI: Extracted content → message.content (string).
  • Claude: Extracted content → content[0].text or str(content[0]).
  • Llama3: Extracted content → response["response"] (string in a dictionary).

 

Guardrails:

  • Strict typing ensures type safety.
  • Each validation step transforms and validates the response, ensuring consistency and reliability.


Response Type Validation: validate_response_type

# Response type validation
def validate_response_type(
    response_content: Union[str, Any], expected_res_type: str
) -> Union[
    CodeResponse,
    JSONResponse,
    TabularResponse,
    TextResponse,
]:
    """
    Validates and structures the response content based on
    the expected response type.

    Args:
        response_content (Any): The raw response content from the LLM API.
        expected_res_type (str): The expected type of the response
            (e.g., "json", "tabular", "str", "code").

    Returns:
        Union[CodeResponse, JSONResponse, TabularResponse, TextResponse]:
            The validated and structured response as a Pydantic model instance.
            - CodeResponse: Returned when expected_res_type is "code", wraps code content.
            - JSONResponse, JobSiteResponseModel, or EditingResponseModel:
              Returned when expected_res_type is "json", based on json_type.
            - TabularResponse: Returned when expected_res_type is "tabular", wraps a DataFrame.
            - TextResponse: Returned when expected_res_type is "str", wraps plain text content.
    """

    if expected_res_type == "json":
        # Check if response_content is a string that needs parsing
        if isinstance(response_content, str):
            # Only parse if it's a string
            cleaned_content = clean_and_extract_json(response_content)
            if cleaned_content is None:
                raise ValueError("Failed to extract valid JSON from the response.")
        else:
            # If it's already a dict or list, use it directly
            cleaned_content = response_content

        # Create a JSONResponse instance with the cleaned content
        if isinstance(cleaned_content, (dict, list)):
            return JSONResponse(data=cleaned_content)
        else:
            raise TypeError(
                f"Expected dict or list for JSON response, got {type(cleaned_content)}"
            )

    elif expected_res_type == "tabular":
        try:
            # Parse as DataFrame and wrap in TabularResponse model
            df = pd.read_csv(StringIO(response_content))
            return TabularResponse(data=df)
        except Exception as e:
            logger.error(f"Error parsing tabular data: {e}")
            raise ValueError("Response is not valid tabular data.")

    elif expected_res_type == "str":
        # Wrap text response in TextResponse model
        return TextResponse(content=response_content)

    elif expected_res_type == "code":
        # Wrap code response in CodeResponse model
        return CodeResponse(code=response_content)

    else:
        raise ValueError(f"Unsupported response type: {expected_res_type}")        

This function is fairly straight forward, except for the JSON cleaning part.

  • The function dynamically validates the raw API response content against the expected type (json, tabular, str, or code). It uses the corresponding Pydantic models to structure the response.
  • JSON Handling: Cleans and parses JSON responses using clean_and_extract_json, wrapping valid data in JSONResponse.
  • Tabular Handling: Parses CSV content into a DataFrame and wraps it in TabularResponse.
  • String and Code Handling: Wraps plain text or code content in TextResponse or CodeResponse, respectively.
  • Error Handling: Raises errors for invalid formats or unsupported response types.

 

Data Cleaning and Parsing: clean_and_extract_json

# Function to clean/extract JSON content
def clean_and_extract_json(
    response_content: str,
) -> Optional[Union[Dict[str, Any], List[Any]]]:
    """
    Extracts, cleans, and parses JSON content from the API response.
    Strips out any non-JSON content like extra text before the JSON block.
    Also removes JavaScript-style comments and trailing commas.

    Args:
        response_content (str): Raw response content.

    Returns:
        Optional[Union[Dict[str, Any], List[Any]]]: Parsed JSON data as a dictionary or list,
        or None if parsing fails.
    """
    try:
        # Attempt direct parsing
        return json.loads(response_content)
    except json.JSONDecodeError:
        logger.warning("Initial JSON parsing failed. Attempting fallback extraction.")

    # Extract JSON-like structure (object or array)
    match = re.search(r"({.*}|\\[.*\\])", response_content, re.DOTALL)
    if not match:
        logger.error("No JSON-like content found.")
        return None

    try:
        # Remove trailing commas
        clean_content = re.sub(r",\\s*([}\\]])", r"\\1", match.group(0))
        return json.loads(clean_content)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON in fallback: {e}")
        return None        

This function supports validate_response_type by parsing and cleaning JSON content. Despite precise prompting, LLMs often return imperfect JSON - a common challenge in the field. LLM responses frequently include extra content outside the JSON structure (e.g., explanations, preambles, or trailing text), making direct parsing unreliable. This function ensures such messy JSON data is transformed into valid JSON (dictionary or list).

Step 1. Direct Parsing

Attempts to parse the input string using json.loads.

Step 2. Fallback with Regex

If parsing fails, it uses the re library to extract JSON-like structures ({...} or [...]) from unstructured text.

Regex Pattern: r"({.*}|\[.*\])"

  • Searches for JSON objects ({...}) or list-like arrays ([...]) in the content.
  • Handles escaped versions (\[...]) and mixed text that might include periods or extraneous characters.
  • Why This Matters: LLMs often wrap or append extra information around the JSON block, such as explanations or disclaimers. This regex isolates the structured data while ignoring surrounding noise.
  • Caveat: The regex is "greedy" and may match too much if multiple structures are present.

Step 3. Cleaning

Removes JavaScript-style trailing commas from the extracted JSON to ensure validity.

Step 4. Final Parsing

  • Attempts to parse the cleaned JSON block.
  • Returns the result if successful.

To view or add a comment, sign in

Insights from the community

Others also viewed

Explore topics