LangChain Core: Model Execution Patterns 🚀

The Restaurant Kitchen Analogy 🍳

Imagine you’re running a busy restaurant kitchen. Orders come in, dishes go out. But HOW you handle those orders makes all the difference between a smooth dinner service and total chaos!

LangChain’s execution patterns are like different ways to run your kitchen:

  • Streaming = Sending appetizers out as they’re ready (not waiting for the full meal)
  • Batch Processing = Cooking 20 identical orders at once
  • Async Operations = Multiple chefs working on different dishes simultaneously
  • Metadata = The ticket that tells you cooking time, ingredients used, and costs

Let’s dive into each pattern!


1. Streaming Model Responses 🌊

What Is It?

Instead of waiting for the ENTIRE response (like waiting 30 seconds for a full essay), streaming gives you words as they’re generated — one by one, like watching someone type!

Why Does This Matter?

Think about watching a chef prepare your food at a sushi bar. You see each piece being made. That’s streaming! You don’t wait 20 minutes wondering if anything is happening.

Without streaming:

User clicks submit… waits… waits… waits… BOOM! Wall of text appears

With streaming:

User clicks submit… words start flowing immediately… feels fast and alive!

Simple Example

from langchain_openai import ChatOpenAI

# Create a streaming-enabled model
llm = ChatOpenAI(streaming=True)

# Watch words appear one by one!
for chunk in llm.stream("Tell me a joke"):
    print(chunk.content, end="", flush=True)

Output appears like: Why did the chicken... (word by word!)

The Magic Behind It

graph TD
    A[You Ask Question] --> B[Model Starts Thinking]
    B --> C[First Word Ready]
    C --> D[Send First Word]
    D --> E[More Words Ready]
    E --> F[Send Each Word]
    F --> G[Last Word Sent]
    G --> H[Done!]

Key Point: Your app feels FAST because users see progress immediately!
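
Collecting the Full Answer While Streaming

Often you want both: show tokens as they arrive and keep the complete answer for later (saving it, passing it to another step). A minimal sketch, reusing the same ChatOpenAI model as above:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# Stream to the user AND keep the full text for later use
full_text = ""
for chunk in llm.stream("Tell me a joke"):
    print(chunk.content, end="", flush=True)  # show progress immediately
    full_text += chunk.content                # accumulate the complete answer

print(f"\n\nFull answer length: {len(full_text)} characters")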


2. Streaming Events API 📡

What Is It?

Streaming events is like having a live scoreboard that shows you EVERYTHING happening inside LangChain — not just the final answer, but every step along the way!

The Dashboard Analogy

Imagine a delivery app that shows:

  • ✅ Order received
  • 🍳 Chef started cooking
  • 📦 Food being packaged
  • 🚗 Driver picked up
  • 🏠 2 minutes away!

That’s what astream_events does for your AI!

Simple Example

import asyncio
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# A tiny chain: prompt -> model
chain = ChatPromptTemplate.from_template("Tell me about {topic}") | ChatOpenAI()

async def main():
    async for event in chain.astream_events(
        {"topic": "cats"},
        version="v2"
    ):
        kind = event["event"]

        if kind == "on_chat_model_start":
            print("🚀 Model starting...")
        elif kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_chat_model_end":
            print("\n✅ Model finished!")

asyncio.run(main())

What Events Can You Catch?

Event Type           | When It Fires  | What You Learn
---------------------|----------------|---------------------------
on_chain_start       | Chain begins   | Which chain, what inputs
on_chat_model_stream | Each token     | The actual content piece
on_tool_start        | Tool called    | Which tool, what arguments
on_chain_end         | Chain complete | Final outputs

Why Is This Powerful?

  • Debug complex chains — see exactly where things slow down (see the timing sketch below)
  • Build progress bars — show users what’s happening
  • Log everything — track costs and performance
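
As an example of the first point, here is a minimal timing sketch built on astream_events. The chain and the log format are illustrative additions (not part of the LangChain API); the event dictionaries themselves do carry an "event" type and a component "name":

import asyncio
import time
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

chain = ChatPromptTemplate.from_template("Tell me about {topic}") | ChatOpenAI()

async def log_events():
    start = time.monotonic()
    async for event in chain.astream_events({"topic": "cats"}, version="v2"):
        elapsed = time.monotonic() - start
        # Print when each step fires, to spot where the time is spent
        print(f"[{elapsed:6.2f}s] {event['event']:<25} {event.get('name', '')}")

asyncio.run(log_events())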

3. Batch Processing ⚡

What Is It?

Batch processing means sending multiple requests at once instead of one-by-one. Like a teacher grading 30 essays simultaneously instead of reading each one, taking a break, then reading the next.

The Coffee Shop Analogy ☕

Without batching (slow):

  1. Make coffee #1… deliver… come back
  2. Make coffee #2… deliver… come back
  3. Make coffee #3… deliver… come back (Takes 15 minutes)

With batching (fast):

  1. Make coffees #1, #2, #3 at the same time
  2. Deliver all together (Takes 5 minutes)

Simple Example

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# Process 3 questions AT ONCE!
questions = [
    "What is the capital of France?",
    "What is 2 + 2?",
    "Who wrote Romeo and Juliet?"
]

# This sends all 3 together (FAST!)
answers = llm.batch(questions)

for q, a in zip(questions, answers):
    print(f"Q: {q}\nA: {a.content}\n")

Batch Configuration

# Control how many run in parallel
llm.batch(
    questions,
    config={"max_concurrency": 5}
)

graph TD
    A[10 Questions] --> B[Batch Processor]
    B --> C[Question 1]
    B --> D[Question 2]
    B --> E[Question 3]
    B --> F[...]
    C --> G[All Answers Ready]
    D --> G
    E --> G
    F --> G

Pro Tip: APIs have rate limits! Set max_concurrency to avoid getting blocked.
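
Real batches also fail in real ways (rate limits, malformed inputs). Rather than letting one bad request raise and abandon the rest, you can ask batch() to hand failures back as exception objects. A minimal sketch, using the same model and questions as above:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
questions = [
    "What is the capital of France?",
    "What is 2 + 2?",
    "Who wrote Romeo and Juliet?"
]

# Failed items come back as Exception objects instead of crashing the batch
results = llm.batch(
    questions,
    config={"max_concurrency": 5},
    return_exceptions=True,
)

for q, r in zip(questions, results):
    if isinstance(r, Exception):
        print(f"Q: {q}\n❌ Failed: {r}\n")
    else:
        print(f"Q: {q}\nA: {r.content}\n")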


4. Async Operations 🔄

What Is It?

Async means your code doesn’t sit around waiting. While one AI call is processing, your program can do other things!

The Multitasking Chef 👨‍🍳

Synchronous (Blocking):

  1. Put pasta in water → STAND AND WAIT 10 MINUTES
  2. Then chop vegetables → WAIT UNTIL DONE
  3. Then make sauce → WAIT UNTIL DONE (Total: 30 minutes)

Asynchronous (Non-Blocking):

  1. Put pasta in water → Set timer, move on!
  2. While pasta cooks → Chop vegetables
  3. While chopping → Start sauce simmering (Total: 12 minutes)

The Key Methods

Sync Method | Async Method | What It Does
------------|--------------|---------------
.invoke()   | .ainvoke()   | Single call
.batch()    | .abatch()    | Multiple calls
.stream()   | .astream()   | Token-by-token

Simple Example

import asyncio
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

async def ask_question(question):
    # 'await' means: start this, but don't freeze!
    response = await llm.ainvoke(question)
    return response.content

async def main():
    # Fire off 3 questions SIMULTANEOUSLY
    tasks = [
        ask_question("What is AI?"),
        ask_question("What is ML?"),
        ask_question("What is DL?")
    ]

    # Wait for ALL to finish
    results = await asyncio.gather(*tasks)

    for r in results:
        print(r)

asyncio.run(main())
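
The table above also lists .astream(), which combines both ideas: tokens stream in without blocking the event loop, so a web server can keep handling other requests while an answer types itself out. A minimal sketch:

import asyncio
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

async def stream_answer(question):
    # Async streaming: each chunk arrives without blocking the event loop
    async for chunk in llm.astream(question):
        print(chunk.content, end="", flush=True)
    print()  # newline once the answer is complete

asyncio.run(stream_answer("What is AI?"))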

When To Use Async?

Use async when:

  • Building web apps (FastAPI, etc.)
  • Making many API calls
  • Building chatbots
  • Need maximum speed

Stick with sync when:

  • Simple scripts
  • Learning/prototyping
  • Code clarity matters more than speed

5. Response and Usage Metadata 📊

What Is It?

Every time an AI model responds, it includes extra information beyond just the answer. This metadata tells you:

  • How many tokens were used (cost!)
  • What model version answered
  • Processing time
  • Token breakdown (input vs output)

The Receipt Analogy 🧾

When you buy coffee, you get:

  • ☕ The coffee (the response)
  • 🧾 The receipt (the metadata)

The receipt shows: price, time, store location, payment method. Metadata is your AI’s receipt!

Simple Example

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
response = llm.invoke("What is LangChain?")

# The answer
print(response.content)

# The metadata (the receipt!)
print(response.response_metadata)

Output:

{
    'token_usage': {
        'prompt_tokens': 12,
        'completion_tokens': 85,
        'total_tokens': 97
    },
    'model_name': 'gpt-3.5-turbo',
    'finish_reason': 'stop'
}

Key Metadata Fields

Field             | What It Tells You       | Why You Care
------------------|-------------------------|----------------------------------
prompt_tokens     | Tokens in your question | Affects cost
completion_tokens | Tokens in the answer    | Affects cost
total_tokens      | Sum of both             | Total cost
model_name        | Which model replied     | Debugging
finish_reason     | Why it stopped          | stop = normal, length = hit limit
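
Recent LangChain versions also attach a provider-agnostic usage_metadata attribute directly to the message, alongside response_metadata. If your version exposes it, a quick sketch:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
response = llm.invoke("What is LangChain?")

# Standardized token counts (field names differ from response_metadata)
print(response.usage_metadata)
# e.g. {'input_tokens': 12, 'output_tokens': 85, 'total_tokens': 97}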

Usage Metadata in Streaming

Even when streaming, you can get token counts!

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    streaming=True,
    stream_usage=True  # Enable usage tracking!
)

for chunk in llm.stream("Explain quantum"):
    if chunk.usage_metadata:
        # The token totals typically arrive on the final chunk
        print(f"Token usage: {chunk.usage_metadata}")

Cost Tracking Example

def calculate_cost(metadata,
                   input_price=0.001,
                   output_price=0.002):
    usage = metadata.get('token_usage', {})
    input_cost = usage.get('prompt_tokens', 0) / 1000 * input_price
    output_cost = usage.get('completion_tokens', 0) / 1000 * output_price
    return input_cost + output_cost

cost = calculate_cost(response.response_metadata)
print(f"This call cost: ${cost:.6f}")

Putting It All Together 🎯

graph TD
    A[Your Question] --> B{How to Execute?}
    B -->|One at a time| C[invoke/ainvoke]
    B -->|Many at once| D[batch/abatch]
    B -->|See each word| E[stream/astream]
    B -->|Full visibility| F[astream_events]
    C --> G[Response + Metadata]
    D --> G
    E --> G
    F --> G

Quick Decision Guide

Situation                | Best Method
-------------------------|--------------------------
Simple single question   | invoke()
Web app, need speed      | ainvoke()
Processing 100 documents | batch()
User wants to see typing | stream()
Building complex chains  | astream_events()
Need to track costs      | Check response_metadata

Summary: Your New Kitchen Skills! 🎓

You’ve learned the 5 core execution patterns of LangChain:

  1. Streaming 🌊 — Words flow in real-time (sushi bar experience)
  2. Events API 📡 — Full visibility into every step (delivery tracker)
  3. Batch Processing ⚡ — Handle many requests at once (coffee shop efficiency)
  4. Async Operations 🔄 — Don’t wait around (multitasking chef)
  5. Metadata 📊 — Know your costs and stats (the receipt)

You’re now ready to run the most efficient AI kitchen in town! 🍳✨
