LangChain Core: Model Execution Patterns 🚀
The Restaurant Kitchen Analogy 🍳
Imagine you’re running a busy restaurant kitchen. Orders come in, dishes go out. But HOW you handle those orders makes all the difference between a smooth dinner service and total chaos!
LangChain’s execution patterns are like different ways to run your kitchen:
- Streaming = Sending appetizers out as they’re ready (not waiting for the full meal)
- Batch Processing = Cooking 20 identical orders at once
- Async Operations = Multiple chefs working on different dishes simultaneously
- Metadata = The ticket that tells you cooking time, ingredients used, and costs
Let’s dive into each pattern!
1. Streaming Model Responses 🌊
What Is It?
Instead of waiting for the ENTIRE response (like waiting 30 seconds for a full essay), streaming gives you words as they’re generated — one by one, like watching someone type!
Why Does This Matter?
Think about watching a chef prepare your food at a sushi bar. You see each piece being made. That’s streaming! You don’t wait 20 minutes wondering if anything is happening.
Without streaming:
User clicks submit… waits… waits… waits… BOOM! Wall of text appears
With streaming:
User clicks submit… words start flowing immediately… feels fast and alive!
Simple Example
```python
from langchain_openai import ChatOpenAI

# Create a streaming-enabled model
llm = ChatOpenAI(streaming=True)

# Watch words appear one by one!
for chunk in llm.stream("Tell me a joke"):
    print(chunk.content, end="", flush=True)
```
Output appears like: Why did the chicken... (word by word!)
The Magic Behind It
```mermaid
graph TD
    A[You Ask Question] --> B[Model Starts Thinking]
    B --> C[First Word Ready]
    C --> D[Send First Word]
    D --> E[More Words Ready]
    E --> F[Send Each Word]
    F --> G[Last Word Sent]
    G --> H[Done!]
```
Key Point: Your app feels FAST because users see progress immediately!
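Streaming is not limited to the bare model; it works end to end through a chain as well. Here's a minimal sketch (the prompt text and the choice of `StrOutputParser` are illustrative assumptions, not part of the example above):

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Prompt -> model -> string parser: .stream() yields plain text pieces
chain = (
    ChatPromptTemplate.from_template("Tell me a short story about {topic}")
    | ChatOpenAI()
    | StrOutputParser()
)

for piece in chain.stream({"topic": "a brave robot"}):
    print(piece, end="", flush=True)
```

Because the parser streams too, each piece arrives as a plain string instead of a message chunk.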
2. Streaming Events API 📡
What Is It?
Streaming events is like having a live scoreboard that shows you EVERYTHING happening inside LangChain — not just the final answer, but every step along the way!
The Dashboard Analogy
Imagine a delivery app that shows:
- ✅ Order received
- 🍳 Chef started cooking
- 📦 Food being packaged
- 🚗 Driver picked up
- 🏠 2 minutes away!
That’s what astream_events does for your AI!
Simple Example
```python
import asyncio
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# A simple chain to observe: prompt -> model
chain = ChatPromptTemplate.from_template("Tell me about {topic}") | ChatOpenAI()

async def watch():
    async for event in chain.astream_events({"topic": "cats"}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_start":
            print("🚀 Model starting...")
        elif kind == "on_chat_model_stream":
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_chat_model_end":
            print("\n✅ Model finished!")

asyncio.run(watch())
```
What Events Can You Catch?
| Event Type | When It Fires | What You Learn |
|---|---|---|
| `on_chain_start` | Chain begins | Which chain, what inputs |
| `on_chat_model_stream` | Each token | The actual content piece |
| `on_tool_start` | Tool called | Which tool, what arguments |
| `on_chain_end` | Chain complete | Final outputs |
Why Is This Powerful?
- Debug complex chains — see exactly where things slow down
- Build progress bars — show users what’s happening
- Log everything — track costs and performance (see the sketch below)
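As a sketch of the debugging and logging points above, here is one way to time every step with `astream_events`. It reuses the `chain` from the earlier example; the timing logic itself is an assumption for illustration, not a built-in LangChain feature:

```python
import asyncio
import time

async def log_run(chain, inputs):
    started = {}
    async for event in chain.astream_events(inputs, version="v2"):
        kind, name, run_id = event["event"], event["name"], event["run_id"]
        if kind.endswith("_start"):
            started[run_id] = time.perf_counter()
            print(f"▶️ {name} started")
        elif kind.endswith("_end"):
            elapsed = time.perf_counter() - started.pop(run_id, time.perf_counter())
            print(f"⏱️ {name} finished in {elapsed:.2f}s")

# Example: asyncio.run(log_run(chain, {"topic": "cats"}))
```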
3. Batch Processing ⚡
What Is It?
Batch processing means sending multiple requests at once instead of one-by-one. Like a teacher grading 30 essays simultaneously instead of reading each one, taking a break, then reading the next.
The Coffee Shop Analogy ☕
Without batching (slow):
- Make coffee #1… deliver… come back
- Make coffee #2… deliver… come back
- Make coffee #3… deliver… come back (Takes 15 minutes)
With batching (fast):
- Make coffees #1, #2, #3 at the same time
- Deliver all together (Takes 5 minutes)
Simple Example
```python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

# Process 3 questions AT ONCE!
questions = [
    "What is the capital of France?",
    "What is 2 + 2?",
    "Who wrote Romeo and Juliet?",
]

# This sends all 3 together (FAST!)
answers = llm.batch(questions)

for q, a in zip(questions, answers):
    print(f"Q: {q}\nA: {a.content}\n")
```
Batch Configuration
```python
# Control how many run in parallel
llm.batch(
    questions,
    config={"max_concurrency": 5},
)
```
```mermaid
graph TD
    A[10 Questions] --> B[Batch Processor]
    B --> C[Question 1]
    B --> D[Question 2]
    B --> E[Question 3]
    B --> F[...]
    C --> G[All Answers Ready]
    D --> G
    E --> G
    F --> G
```
Pro Tip: APIs have rate limits! Set max_concurrency to avoid getting blocked.
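When you are close to a rate limit, some calls in a batch can fail. `.batch()` accepts `return_exceptions=True` so one failure does not sink the rest; the retry strategy is up to you. A sketch reusing the `llm` and `questions` from above:

```python
results = llm.batch(
    questions,
    config={"max_concurrency": 2},   # stay under the rate limit
    return_exceptions=True,          # failed calls come back as exception objects
)

for q, r in zip(questions, results):
    if isinstance(r, Exception):
        print(f"❌ {q!r} failed: {r}")
    else:
        print(f"✅ {q!r} -> {r.content}")
```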
4. Async Operations 🔄
What Is It?
Async means your code doesn’t sit around waiting. While one AI call is processing, your program can do other things!
The Multitasking Chef 👨🍳
Synchronous (Blocking):
- Put pasta in water → STAND AND WAIT 10 MINUTES
- Then chop vegetables → WAIT UNTIL DONE
- Then make sauce → WAIT UNTIL DONE (Total: 30 minutes)
Asynchronous (Non-Blocking):
- Put pasta in water → Set timer, move on!
- While pasta cooks → Chop vegetables
- While chopping → Start sauce simmering (Total: 12 minutes)
The Key Methods
| Sync Method | Async Method | What It Does |
|---|---|---|
| `.invoke()` | `.ainvoke()` | Single call |
| `.batch()` | `.abatch()` | Multiple calls |
| `.stream()` | `.astream()` | Token-by-token |
Simple Example
```python
import asyncio
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()

async def ask_question(question):
    # 'await' means: start this, but don't freeze!
    response = await llm.ainvoke(question)
    return response.content

async def main():
    # Fire off 3 questions SIMULTANEOUSLY
    tasks = [
        ask_question("What is AI?"),
        ask_question("What is ML?"),
        ask_question("What is DL?"),
    ]
    # Wait for ALL to finish
    results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

asyncio.run(main())
```
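If "many questions, all at once" is all you need, `.abatch()` from the table above does the gathering for you. A minimal sketch, reusing the same `llm` (the question list is illustrative):

```python
async def main_with_abatch():
    questions = ["What is AI?", "What is ML?", "What is DL?"]
    # Runs the calls concurrently and returns answers in the same order
    responses = await llm.abatch(questions)
    for r in responses:
        print(r.content)

asyncio.run(main_with_abatch())
```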
When To Use Async?
✅ Use async when:
- Building web apps (FastAPI, etc.; see the sketch after this list)
- Making many API calls
- Building chatbots
- Need maximum speed
❌ Stick with sync when:
- Simple scripts
- Learning/prototyping
- Code clarity matters more than speed
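Here's what the web-app case can look like. This is a minimal sketch, assuming FastAPI is installed; the `/ask` route and the `Question` model are made up for illustration:

```python
from fastapi import FastAPI
from pydantic import BaseModel
from langchain_openai import ChatOpenAI

app = FastAPI()
llm = ChatOpenAI()

class Question(BaseModel):
    text: str

@app.post("/ask")
async def ask(question: Question):
    # await frees the server to handle other requests while the model thinks
    response = await llm.ainvoke(question.text)
    return {"answer": response.content}
```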
5. Response and Usage Metadata 📊
What Is It?
Every time an AI model responds, it includes extra information beyond just the answer. This metadata tells you:
- How many tokens were used (cost!)
- What model version answered
- Processing time
- Token breakdown (input vs output)
The Receipt Analogy 🧾
When you buy coffee, you get:
- ☕ The coffee (the response)
- 🧾 The receipt (the metadata)
The receipt shows: price, time, store location, payment method. Metadata is your AI’s receipt!
Simple Example
```python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI()
response = llm.invoke("What is LangChain?")

# The answer
print(response.content)

# The metadata (the receipt!)
print(response.response_metadata)
```
Output:
```python
{
    'token_usage': {
        'prompt_tokens': 12,
        'completion_tokens': 85,
        'total_tokens': 97
    },
    'model_name': 'gpt-3.5-turbo',
    'finish_reason': 'stop'
}
```
Key Metadata Fields
| Field | What It Tells You | Why You Care |
|---|---|---|
| `prompt_tokens` | Tokens in your question | Affects cost |
| `completion_tokens` | Tokens in the answer | Affects cost |
| `total_tokens` | Sum of both | Total cost |
| `model_name` | Which model replied | Debugging |
| `finish_reason` | Why it stopped | `stop` = normal, `length` = hit limit |
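The `finish_reason` field from the table is handy for catching truncated answers. A small sketch, reusing the `llm` from the example above (the prompt and warning message are illustrative):

```python
response = llm.invoke("Summarize the history of computing")
meta = response.response_metadata

if meta.get("finish_reason") == "length":
    print("⚠️ Answer was cut off: raise max_tokens or shorten the prompt.")
else:
    print(response.content)
```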
Usage Metadata in Streaming
Even when streaming, you can get token counts!
```python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    streaming=True,
    stream_usage=True,  # Enable usage tracking!
)

for chunk in llm.stream("Explain quantum"):
    # The token counts arrive on the final chunk of the stream
    if chunk.usage_metadata:
        print(f"Token usage: {chunk.usage_metadata}")
```
Cost Tracking Example
```python
# Example per-1K-token prices; check your provider's current pricing
def calculate_cost(metadata,
                   input_price=0.001,
                   output_price=0.002):
    usage = metadata.get('token_usage', {})
    input_cost = usage.get('prompt_tokens', 0) / 1000 * input_price
    output_cost = usage.get('completion_tokens', 0) / 1000 * output_price
    return input_cost + output_cost

cost = calculate_cost(response.response_metadata)
print(f"This call cost: ${cost:.6f}")
```
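Recent LangChain versions also expose a provider-agnostic `usage_metadata` directly on the message, with `input_tokens`, `output_tokens`, and `total_tokens` keys. A small sketch (whether it is populated depends on your installed version and provider integration):

```python
usage = response.usage_metadata  # e.g. {'input_tokens': 12, 'output_tokens': 85, 'total_tokens': 97}
if usage:
    print(f"Input: {usage['input_tokens']}, Output: {usage['output_tokens']}, Total: {usage['total_tokens']}")
```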
Putting It All Together 🎯
```mermaid
graph TD
    A[Your Question] --> B{How to Execute?}
    B -->|One at a time| C[invoke/ainvoke]
    B -->|Many at once| D[batch/abatch]
    B -->|See each word| E[stream/astream]
    B -->|Full visibility| F[astream_events]
    C --> G[Response + Metadata]
    D --> G
    E --> G
    F --> G
```
Quick Decision Guide
| Situation | Best Method |
|---|---|
| Simple single question | invoke() |
| Web app, need speed | ainvoke() |
| Processing 100 documents | batch() |
| User wants to see typing | stream() |
| Building complex chains | astream_events() |
| Need to track costs | Check response_metadata |
Summary: Your New Kitchen Skills! 🎓
You’ve learned the 5 core execution patterns of LangChain:
- Streaming 🌊 — Words flow in real-time (sushi bar experience)
- Events API 📡 — Full visibility into every step (delivery tracker)
- Batch Processing ⚡ — Handle many requests at once (coffee shop efficiency)
- Async Operations 🔄 — Don’t wait around (multitasking chef)
- Metadata 📊 — Know your costs and stats (the receipt)
You’re now ready to run the most efficient AI kitchen in town! 🍳✨