
LLM pricing can feel opaque until you translate it into a few simple numbers: input tokens, output tokens, and price per million. Every request you send—system prompt, chat history, RAG context, tool-call JSON—counts as input; everything the model writes back counts as output. Once you know those two counts, the cost of a completion is just napkin math.
This article dives into that math and, more importantly, shows the real drivers of your bill: history length, retrieved context size, output length, and whether you can reuse cached input. We’ll walk through small, copy-paste examples, then scale up to monthly budgets for common apps (internal chatbot, customer assistant, long-context agent) under different pricing scenarios.
A token is a small chunk of text—on average 3–4 characters in English, roughly three-quarters of a word. Models count every token that crosses the wire, not just the user’s latest message. That means your system prompt, prior conversation history, any retrieved passages (RAG), and even tool-call/JSON payloads all contribute to the input token total. The model’s reply contributes output tokens. Most providers bill separately for each, at a rate expressed per one million tokens.
Here are a few example sentences with their approximate token counts, to give you a feel for the scale:
Example A — ~13 tokens
“How much do 2,000 input and 400 output tokens cost?”
Example B — ~68 tokens
“Pricing for LLMs comes down to simple math: count every input token (system prompt, chat history, retrieved context, tool JSON) and every output token the model writes. Then multiply by your provider’s price per million tokens and cap max_tokens to avoid run-on answers.”
Example C — ~128 tokens
“To control spending in production, keep a short, stable system prompt; maintain a rolling window of conversation history; and deduplicate retrieved passages before you send them. Prefer structured outputs (JSON) with concise keys, set sensible max_tokens and stop sequences on every request, and use cached input pricing when you can by keeping repeated blocks byte-identical. Track usage (prompt/completion tokens and estimated cost) for every call, and alert when a single request or a session crosses your budget.”
You might wonder why these counts are only estimates. The same text can have a different token length depending on the tokenizer an LLM uses; the variance is usually small, but the count is not fixed for a given piece of text.
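If you want a rough count before sending anything, you can run a tokenizer locally. Below is a minimal sketch using the tiktoken package; treat it as an approximation, since DeepInfra-hosted models use their own tokenizers and will report slightly different counts:
# pip install tiktoken
import tiktoken

# Approximation: cl100k_base is an OpenAI tokenizer, not the one a given
# DeepInfra-hosted model uses, so treat the counts as ballpark estimates.
enc = tiktoken.get_encoding("cl100k_base")

example_a = "How much do 2,000 input and 400 output tokens cost?"
print(len(enc.encode(example_a)))  # roughly 13, depending on the tokenizer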
Input pricing covers everything you send to the model in a request: the system prompt, prior conversation history, any retrieved context (RAG chunks), and function/tool arguments or JSON you include. If it’s in the prompt payload, it’s billable input.
Output pricing covers what the model returns—including tool-call JSON if the model emits structured outputs. Shorter responses cost less; long generations (summaries, reports, code) cost more.
Cached input (when available) can materially reduce spend. Many apps resend the same text—unchanged instructions, policy blocks, or repeated context windows. DeepInfra offers a discounted cached-input rate for byte-identical text: keep these repeated segments unchanged and reuse them so they're billed at the lower tier. This is one of the biggest levers for RAG and multi-turn chats.
Precision & throughput don’t change list prices, but they change total cost of ownership. Faster stacks (e.g., better quantization/precision choices, optimized KV cache) (–> internal link) let you serve more completions per GPU and shrink tail latencies, which lowers infrastructure and concurrency costs around the metered model price—especially at scale.
A single turn with 1,200 input tokens (of which 700 are cache-eligible) and 350 output tokens: you pay 500 tokens at the standard input rate, 700 at the cached-input rate, and 350 at the output rate. Same token math, less money if you reuse what you send.
Now that we've covered the basics of pricing, let's get into real numbers for API calls.
Let T_{\text{in}} be the input tokens you send and T_{\text{out}} the output tokens the model returns. Let P_{\text{in}} and P_{\text{out}} be DeepInfra's prices per one million tokens for input and output for the model you use, and (optionally) T_{\text{cache}} the subset of input tokens billed at a discounted cached-input rate P_{\text{cache}}.
Good to Know: DeepInfra offers discounted cached input tokens on various models such as Kimi K2 Instruct 0905 or Claude 3.7.
\text{Cost} = \frac{T_{\text{in}} \cdot P_{\text{in}} \;+\; T_{\text{out}} \cdot P_{\text{out}}}{10^{6}}
And using the cached input:
\text{Cost} = \frac{(T_{\text{in}} - T_{\text{cache}})\cdot P_{\text{in}} \;+\; T_{\text{cache}}\cdot P_{\text{cache}} \;+\; T_{\text{out}}\cdot P_{\text{out}}}{10^{6}}
A real-life example with and without cache, using example rates of $0.50/M input, $0.40/M cached input, and $2.00/M output:
\begin{aligned}
\text{Given: }& T_{\text{in}}=2{,}000,\quad T_{\text{out}}=400.\\[4pt]
\text{Cost (no cache)}\;&=\;\frac{2{,}000\cdot 0.50 \;+\; 400\cdot 2.00}{10^{6}}
\;=\; \$0.0018.\\[8pt]
\text{Cost (with }T_{\text{cache}}=1{,}000\text{)}\;&=\;
\frac{1{,}000\cdot 0.50 \;+\; 1{,}000\cdot 0.40 \;+\; 400\cdot 2.00}{10^{6}}
\;=\; \$0.0017.
\end{aligned}
Even though the cached-input advantage looks small on a single call, it adds up quickly for larger applications with heavy user interaction.
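If you'd rather script the napkin math, here is a minimal sketch of the two formulas above as a Python helper; the rates are the example values from this article, not a price list:
# Minimal cost calculator for the formulas above.
# Rates are illustrative values in USD per million tokens.
def completion_cost(t_in, t_out, p_in, p_out, t_cache=0, p_cache=None):
    """Return the cost in USD for one request."""
    if t_cache and p_cache is not None:
        uncached = t_in - t_cache
        return (uncached * p_in + t_cache * p_cache + t_out * p_out) / 1e6
    return (t_in * p_in + t_out * p_out) / 1e6

print(completion_cost(2_000, 400, 0.50, 2.00))                                # 0.0018
print(completion_cost(2_000, 400, 0.50, 2.00, t_cache=1_000, p_cache=0.40))  # 0.0017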
To make the token math concrete, here are three illustrative scenarios using the example rates \(P_{\text{in}}=\$0.50/\text{M}\), \(P_{\text{out}}=\$2.00/\text{M}\) (and \(P_{\text{cache}}=\$0.40/\text{M}\) where noted). Adjust the counts to your traffic.
A company chatbot used by 1,000 employees, about 10 exchanges per user/month. Each exchange averages 1,500 input and 250 output tokens.
Cost per exchange: \((1500\times 0.50 + 250\times 2.00)/10^{6} = (750 + 500)/10^{6} = \$0.00125\).
Monthly cost: \(1{,}000 \times 10 \times \$0.00125 = \$12.50\).
A public-facing assistant with 50,000 MAU, each making 8 exchanges/month, at 2,000 input and 400 output tokens per exchange.
Cost per exchange: \((2000\times 0.50 + 400\times 2.00)/10^{6} = (1000 + 800)/10^{6} = \$0.0018\).
Monthly cost: \(50{,}000 \times 8 \times \$0.0018 = \$720\).
An agent used by 5,000 users, 5 runs/user/month. Each run sends 10,000 input tokens where 60% are cached (6,000) and 40% uncached (4,000), plus 2,000 output tokens.
Cost per run: \((4000\times 0.50 + 6000\times 0.40 + 2000\times 2.00)/10^{6} = (2000 + 2400 + 4000)/10^{6} = \$0.0084\).
Monthly cost: \(5{,}000 \times 5 \times \$0.0084 = \$210\).
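The same three estimates, reproduced in a short sketch (again with the illustrative rates, not actual prices) so you can plug in your own traffic numbers:
# Monthly budget estimates for the three scenarios above.
# Rates are the illustrative values from this section (USD per million tokens).
P_IN, P_OUT, P_CACHE = 0.50, 2.00, 0.40

def monthly_cost(users, calls_per_user, t_in, t_out, t_cache=0):
    per_call = ((t_in - t_cache) * P_IN + t_cache * P_CACHE + t_out * P_OUT) / 1e6
    return users * calls_per_user * per_call

print(monthly_cost(1_000, 10, 1_500, 250))                   # internal chatbot   -> 12.50
print(monthly_cost(50_000, 8, 2_000, 400))                   # customer assistant -> 720.00
print(monthly_cost(5_000, 5, 10_000, 2_000, t_cache=6_000))  # long-context agent -> 210.00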
As you can see, the costs follow tokens. Long histories and wide retrieval windows inflate input spend; verbose answers inflate output spend; and reusing stable text via cache can materially lower both. Tune these three levers—context length, answer length, and cache hit rate—and you control the bill. Start by monitoring token breakdowns, setting sensible caps, chunking retrieval thoughtfully, and caching any instructions or docs that repeat.
The tactics below show how to prune and cache inputs, cap and gate outputs, slim tool JSON, and right-size the model—usually cutting costs by multiples without hurting UX.
The cheapest LLM call is the one that sends fewer tokens and never surprises you. This section gives you a compact, copy-paste playbook—plus runnable Python you can drop in to keep spend predictable on DeepInfra’s OpenAI-compatible API.
To follow along, open a Jupyter Notebook and store your individual DeepInfra API key in an environment variable. The examples use the OpenAI-compatible client, so make sure it is installed, or install it with:
!pip install openai
Then you can run the following setup and paste your API key into the input field that opens up:
# Setup: store your token securely and create a DeepInfra OpenAI-compatible client
import os, getpass
from openai import OpenAI
os.environ["DEEPINFRA_API_TOKEN"] = getpass.getpass("Paste your DeepInfra API token: ")
client = OpenAI(
api_key=os.environ["DEEPINFRA_API_TOKEN"],
base_url="https://api.deepinfra.com/v1/openai",
)
For the following examples, we will use the Kimi K2 Instruct 0905 (–> internal link) model, but these tips can of course be applied to other LLMs as well.
Output tokens are usually pricier than input. Always set a hard cap and add simple stop markers to prevent run-ons.
def ask_capped(messages, *, model="moonshotai/Kimi-K2-Instruct-0905", max_tokens=256):
return client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens, # hard cap on output cost
stop=["<END>", "\n\n\n"], # early stopping guard
response_format={"type": "json_object"} # terse, structured replies
)
Using the max_tokens key, we can set a hard limit on output tokens, which is an effective way to limit costs. However, we need to find a sweet spot that preserves answer quality for our application. In our example, capping at 120 tokens is way too early:
msgs = [{"role":"user","content":"Give three bullet tips to cut LLM spend."}]
resp = ask_capped(msgs, max_tokens=120)
print(resp.choices[0].message.content)
System output:
[-1][EOS]
[-2] **Smarter prompts cost less tokens.** Use strict in-context examples (“show, don’t tell”) and tell the model to “be concise” or “answer in 20 words.” Even a 30 % shorter prompt/request → 30 % cheaper.
[-3] **Right-size the model.** Route 80 % of traffic to the smallest model that still meets the task (e.g., 3.5-turbo for summarization) and reserve the largest (GPT-4 / Claude-3-Opus) only when a
If your workload reuses the same instruction block or context, keep it byte-identical so it qualifies for cached-input pricing where available.
MODEL = "moonshotai/Kimi-K2-Instruct-0905"
# Make the shared prefix very long and ensure it's the very first tokens
SYSTEM_BLOCK = (
"You are a concise JSON-only assistant. "
"Respond with a compact JSON object. "
"If additional detail exists, include \"expandable\": true. \n"
)
POLICY_LINE = (
"Rule: Do not add commentary. Prefer short keys. "
"Use stable field order. Avoid repeating unchanged values.\n"
)
SYSTEM_BLOCK = SYSTEM_BLOCK + (POLICY_LINE * 400)
# Put all reusable context into the *system* message (earliest position).
CONTEXT = (
"Policy A: Users may request summaries; keep responses under 80 tokens by default.\n\n"
"Policy B: For lists, cap items at 5 unless user asks to expand.\n\n"
"Policy C: Use ISO dates; currencies in USD.\n"
)
def make_messages(user_question: str):
return [
{"role": "system", "content": SYSTEM_BLOCK + "\n" + CONTEXT},
{"role": "user", "content": user_question}
]
def call_once(q):
r = client.chat.completions.create(
model=MODEL,
messages=make_messages(q),
temperature=0,
max_tokens=200,
)
u = r.usage
pt = getattr(u, "prompt_tokens", 0)
ct = getattr(u, "completion_tokens", 0)
est = getattr(u, "estimated_cost", None)
ptd = getattr(u, "prompt_tokens_details", None)
cached = 0
if ptd is not None:
cached = getattr(ptd, "cached_tokens", 0) or (ptd.get("cached_tokens") if isinstance(ptd, dict) else 0)
return {"pt": pt, "ct": ct, "cached": cached, "cost": float(est) if est is not None else None}
cold = call_once("Summarize the policies in 3 bullets.")
warm = call_once("List 3 risks if these rules are ignored.") # different tail; identical prefix
print("Cold:", cold) # expect low cache
print("Warm:", warm) # expect very high cache
if cold["cost"] and warm["cost"]:
print(f"Savings: {100*(1 - warm['cost']/cold['cost']):.1f}%")System Output:
Cold: {'pt': 8500, 'ct': 43, 'cached': 34, 'cost': 0.0043326}
Warm: {'pt': 8501, 'ct': 20, 'cached': 8487, 'cost': 0.0034418}
Savings: 20.6%
The saving of roughly 20% is exactly what we would expect: for Kimi K2, cached input tokens are 20% cheaper than regular input tokens, and the output here is so short that the total cost drops by roughly the same fraction.
Retrieval is usually your biggest input-token driver. Sending fewer, higher-quality chunks lowers cost and often improves answers. Do three simple things before every call:
De-duplicate near-identical passages so you don’t pay for repeats.
Cap the number of chunks (K) you include.
Prefer short, sentence-level spans over full pages.
(Your system prompt can still be cache-eligible—keep it byte-identical across calls—but RAG chunks change per query, so the win here is mainly fewer tokens.)
import uuid, os
# Unique per script run, identical across both calls in the run
RUN_SEED = os.getenv("RAG_RUN_SEED") or uuid.uuid4().hex
# Prepend the seed so this run's prefix doesn't reuse an older server cache
SYSTEM_BLOCK = f"RUN_SEED:{RUN_SEED}\n" + SYSTEM_BLOCK
import re, textwrap
from difflib import SequenceMatcher
# ──────────────────────────────────────────────────────────────────────────────
# RAG CHUNK PREP — de-duplicate near-identicals, cap K, prefer short spans
# ──────────────────────────────────────────────────────────────────────────────
SENTENCE_RE = re.compile(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!)\s+")
def split_sentences(text: str):
parts = SENTENCE_RE.split(text.strip())
return [p.strip() for p in parts if p.strip()]
def _norm(s: str) -> str:
return re.sub(r"\s+", " ", s).strip().lower()
def _near_dup(a: str, b: str, thresh: float = 0.92) -> bool:
return SequenceMatcher(None, _norm(a), _norm(b)).ratio() >= thresh
def dedupe_cap_shorten(chunks, k=6, sentence_span=1, sim_threshold=0.92):
"""
- Remove near-duplicates
- Prefer short, sentence-level spans
- Keep original order and cap to K
"""
keep, seen = [], []
for c in chunks:
sents = split_sentences(c) or [c.strip()]
short = " ".join(sents[:sentence_span])
if any(_near_dup(short, prev, sim_threshold) for prev in seen):
continue
seen.append(short)
keep.append(short)
if len(keep) == k:
break
return keep
# ──────────────────────────────────────────────────────────────────────────────
# Message building with stable (cacheable) prefix + dynamic RAG block
# ──────────────────────────────────────────────────────────────────────────────
def make_messages(user_question: str, rag_chunks=None):
rag_chunks = rag_chunks or []
system_content = SYSTEM_BLOCK + "\n" + CONTEXT
if rag_chunks:
system_content += "\nRAG context:\n" + "\n".join(f"- {c}" for c in rag_chunks)
return [
{"role": "system", "content": system_content},
{"role": "user", "content": user_question}
]
def call_once(q, rag_chunks=None):
r = client.chat.completions.create(
model=MODEL,
messages=make_messages(q, rag_chunks),
temperature=0,
max_tokens=200,
)
u = r.usage
pt = getattr(u, "prompt_tokens", 0)
ct = getattr(u, "completion_tokens", 0)
est = getattr(u, "estimated_cost", None)
ptd = getattr(u, "prompt_tokens_details", None)
cached = 0
if ptd is not None:
# Works for both object- and dict-like payloads
cached = getattr(ptd, "cached_tokens", 0) or (
ptd.get("cached_tokens") if isinstance(ptd, dict) else 0
)
return {
"pt": pt,
"ct": ct,
"cached": cached,
"cost": float(est) if est is not None else None,
"content": r.choices[0].message.content,
}
# ──────────────────────────────────────────────────────────────────────────────
# DEMO — RAG cleanup + cold/warm cache comparison with provider-estimated cost
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
# Raw retrieval (has a duplicate and some wordiness)
raw_chunks = [
"Pricing depends on input/output token counts.",
"Pricing depends on input/output token counts.", # duplicate
"Cached input reduces spend when reused exactly. Long passages are pricey; prefer short spans.",
"Shorter system prompts cut cost.",
"De-duplicate near-identical passages to avoid paying for repeats.",
"Cap how many chunks you include.",
]
# Clean: dedupe → cap → sentence-level
ctx = dedupe_cap_shorten(raw_chunks, k=3, sentence_span=1)
# Cold call (builds cache)
cold = call_once("Summarize the policies in 3 bullets.", rag_chunks=ctx)
# Warm call (identical long prefix, different user tail)
warm = call_once("List 3 risks if these rules are ignored.", rag_chunks=ctx)
print("\n--- Responses ---")
print("Cold:", textwrap.shorten(cold["content"], width=140))
print("Warm:", textwrap.shorten(warm["content"], width=140))
print("\n--- Token & Cost (provider estimated) ---")
print(f"Cold → pt={cold['pt']}, ct={cold['ct']}, cached={cold['cached']}, est_cost={cold['cost']!r}")
print(f"Warm → pt={warm['pt']}, ct={warm['ct']}, cached={warm['cached']}, est_cost={warm['cost']!r}")
if cold["cost"] and warm["cost"]:
print(f"Savings: {100*(1 - warm['cost']/cold['cost']):.1f}%")
print("\n--- RAG Context Sent ---")
for i, c in enumerate(ctx, 1):
print(f"{i}. {c}")System output:
--- Responses ---
Cold: {"summary":["Cap summaries at 80 tokens","Lists max 5 items unless expanded","ISO dates, USD currency"]}
Warm: {"risks":["overspend","rate-limit","account-suspension"]}
--- Token & Cost (provider estimated) ---
Cold → pt=8549, ct=26, cached=7, est_cost=0.0043258
Warm → pt=8550, ct=17, cached=8536, est_cost=0.0034554
Savings: 20.1%
Each turn re-sends prior messages. Keep a rolling window (and optionally summarize older turns). The following example keeps the context small by trimming to a rolling window and summarizing the older history:
import textwrap
# ── 1) Rolling window by characters ───────────────────────────────────────────
def trim_history(messages, max_chars=3500):
"""
Keep the most-recent messages whose *cumulative* content length ≤ max_chars.
Preserves order and always starts from the newest going backwards.
"""
total, keep = 0, []
for m in reversed(messages):
total += len(m.get("content", ""))
if total <= max_chars:
keep.append(m)
else:
break
return list(reversed(keep))
# ── 2) Optional: summarize older messages outside the window ──────────────────
def summarize_older_turns(older_msgs, max_tokens=120):
"""
Compact the overflow into a short memory note you can prepend next turn.
Returns a plain-text summary string (or None if nothing to summarize).
"""
if not older_msgs:
return None
# Build a lightweight transcript to summarize
transcript = []
for m in older_msgs:
role = m.get("role", "user")
content = m.get("content", "")
transcript.append(f"{role.upper()}: {content}")
transcript_text = "\n".join(transcript)
sys_prompt = (
"You are a concise meeting scribe.\n"
"Summarize the transcript into 4-6 bullets capturing facts, decisions, action items, and numbers.\n"
"Be neutral. Keep ≤120 words. No introductions or conclusions."
)
messages = [
{"role": "system", "content": sys_prompt},
{"role": "user", "content": transcript_text},
]
r = client.chat.completions.create(
model=MODEL, messages=messages, temperature=0, max_tokens=max_tokens
)
return r.choices[0].message.content.strip()
# ── 3) Helper: call the model and print tokens/costs ──────────────────────────
def ask(messages, max_tokens=200, temperature=0):
r = client.chat.completions.create(
model=MODEL,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
u = r.usage
pt = getattr(u, "prompt_tokens", 0)
ct = getattr(u, "completion_tokens", 0)
est = getattr(u, "estimated_cost", None)
ptd = getattr(u, "prompt_tokens_details", None)
cached = 0
if ptd is not None:
cached = getattr(ptd, "cached_tokens", 0) or (
ptd.get("cached_tokens") if isinstance(ptd, dict) else 0
)
print(f"→ pt={pt}, ct={ct}, cached={cached}, est_cost={float(est) if est is not None else None}")
return r
# ── 4) Demo: maintain history across turns with trimming + optional summary ───
INSTRUCTIONS = (
"You are a helpful assistant. Be brief but clear. "
"If asked for steps, use short numbered lists."
)
# Seed a long conversation to trigger trimming
history = [
{"role": "system", "content": INSTRUCTIONS},
{"role": "user", "content": "We’re planning a launch. Product: Atlas. Date: 2025-12-01. Goals: awareness, signups."},
{"role": "assistant", "content": "Got it. What’s the target audience and key channels?"},
{"role": "user", "content": "Audience: data engineers in EU. Channels: blog, webinar, LinkedIn. Budget 25k EUR."},
{"role": "assistant", "content": "Noted. Do we have a feature list or pricing tiers yet?"},
{"role": "user", "content": "Features: real-time sync, schema inference, alerting. Pricing TBD; freemium likely."},
]
# Add some filler to exceed char budget (simulate long chit-chat)
for i in range(8):
history.append({"role": "assistant", "content": f"Checkpoint {i}: recapping progress and minor notes..."})
history.append({"role": "user", "content": f"More detail on webinar plan {i}: speakers, agenda, length, CTA, and follow-up emails."})
# New user turn arrives
history.append({"role": "user", "content": "Draft a tight 5-bullet launch checklist for Atlas."})
# Split into (older overflow) + (kept tail)
MAX_CHARS = 3500
kept = trim_history(history, max_chars=MAX_CHARS)
# Determine what got dropped and summarize it (optional)
dropped_len = len(history) - len(kept)
older = history[: max(0, len(history) - len(kept))]
summary_note = summarize_older_turns(older) if dropped_len > 0 else None
# Build final messages for this turn:
final_messages = []
final_messages.append({"role": "system", "content": INSTRUCTIONS})
if summary_note:
final_messages.append({"role": "system", "content": "Conversation summary (memory):\n" + summary_note})
# Append the kept tail *excluding* the original system (we just re-added it)
for m in kept:
if m["role"] == "system":
continue
final_messages.append(m)
# Ask the model with the trimmed+summarized context
resp = ask(final_messages, max_tokens=180, temperature=0)
print(textwrap.shorten(resp.choices[0].message.content, width=200))
# ── 5) Next turn example: keep summary & window rolling ───────────────────────
# User follows up; we append and repeat the process.
history = kept # pretend we persisted; start from trimmed tail
if summary_note:
# store your summary somewhere persistent and reuse on next turn
pass
history.append({"role": "assistant", "content": resp.choices[0].message.content})
history.append({"role": "user", "content": "Great. Turn that into a 1-week timeline with owners."})
# Re-trim window
kept2 = trim_history(history, max_chars=MAX_CHARS)
final2 = [{"role": "system", "content": INSTRUCTIONS}]
if summary_note:
final2.append({"role": "system", "content": "Conversation summary (memory):\n" + summary_note})
for m in kept2:
if m["role"] == "system":
continue
final2.append(m)
resp2 = ask(final2, max_tokens=200, temperature=0)
print(textwrap.shorten(resp2.choices[0].message.content, width=200))
System output:
→ pt=519, ct=180, cached=13, est_cost=0.0006182000000000001
Atlas 5-bullet launch checklist (EU data engineers, 25 k€, 1 Dec) 1. Webinar locked: 45 min, 2 speakers (CTO + senior DE), agenda “Live schema drift demo + Q&A”, CTA = freemium signup, 3-email […]
→ pt=723, ct=200, cached=698, est_cost=0.0006917000000000001
Atlas launch – 1-week sprint (25 Nov → 1 Dec) Mon 25 – Webinar: finalize speakers, dry-run, push save-seat email – Owner: PMM – Paid ads: upload creatives, set CPL cap – Owner: Growth Tue 26 – […]
If we do the same without trimming the history, we see how the input tokens grow fast and stack up, increasing the overall cost:
def ask(messages, max_tokens=200, temperature=0):
r = client.chat.completions.create(
model=MODEL,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
u = r.usage
pt = getattr(u, "prompt_tokens", 0)
ct = getattr(u, "completion_tokens", 0)
est = getattr(u, "estimated_cost", None)
ptd = getattr(u, "prompt_tokens_details", None)
cached = 0
if ptd is not None:
cached = getattr(ptd, "cached_tokens", 0) or (
ptd.get("cached_tokens") if isinstance(ptd, dict) else 0
)
print(f"→ pt={pt}, ct={ct}, cached={cached}, est_cost={float(est) if est is not None else None}")
return r.choices[0].message.content
# seed full history (no trimming)
INSTRUCTIONS = (
"You are a helpful assistant. Be brief but clear. "
"If asked for steps, use short numbered lists."
)
history = [
{"role": "system", "content": SYSTEM_BLOCK + "\n" + CONTEXT + "\nConversation starts now."},
{"role": "user", "content": "We’re planning a launch. Product: Atlas. Date: 2025-12-01. Goals: awareness, signups."},
{"role": "assistant", "content": "Noted. Who’s the audience and channels?"},
{"role": "user", "content": "Audience: data engineers in EU. Channels: blog, webinar, LinkedIn. Budget 25k EUR."},
{"role": "assistant", "content": "Understood. Any features or pricing?"},
{"role": "user", "content": "Features: real-time sync, schema inference, alerting. Pricing TBD; freemium likely."},
]
# turn 1 (no rolling window — we send the entire history)
history.append({"role": "user", "content": "Draft a tight 5-bullet launch checklist for Atlas."})
resp1 = ask(history, max_tokens=200, temperature=0)
print(textwrap.shorten(resp1, width=160))
history.append({"role": "assistant", "content": resp1})
# turn 2 (history keeps growing; caching should be high for the long stable prefix)
history.append({"role": "user", "content": "Great. Turn that into a 1-week timeline with owners."})
resp2 = ask(history, max_tokens=200, temperature=0)
print(textwrap.shorten(resp2, width=160))
history.append({"role": "assistant", "content": resp2})System output:
→ pt=8634, ct=71, cached=7, est_cost=0.0044583
{"checklist":["Finalize freemium tier & EU pricing by Oct-15","Publish tech blog w/ real-time sync demo Oct-25","Announce LinkedIn ad campaign (20k€) […]
→ pt=8727, ct=63, cached=8691, est_cost=0.0036204
{"week":{"Mon":{"task":"Pricing","owner":"PM"},"Tue":{"task":"Blog draft","owner":"Content"},"Wed":{"task":"Webinar […]
The costs for this example are a lot higher, because every turn re-sends the entire growing history and those prompt tokens drive the bill.
| Scenario (2 calls) | Total prompt tokens (pt) | Cached pt (sum) | Uncached pt (sum) | Completion tokens (ct) | Total est. cost |
|---|---|---|---|---|---|
| Trim + summarize | 1,242 | 711 | 531 | 380 | $0.0013099 |
| No trim (full history) | 17,361 | 8,698 | 8,663 | 134 | $0.0080787 |
Cache hit ≠ free. Only input tokens benefit from caching; completions are always billed. That’s why a longer reply can offset some cache savings.
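A quick sanity check of that trade-off, using the illustrative rates from earlier ($0.50/M input, $0.40/M cached input, $2.00/M output) and made-up token counts:
\begin{aligned}
\text{Cache saving: }& 8{,}000 \cdot \frac{0.50 - 0.40}{10^{6}} = \$0.0008,\\[4pt]
\text{Extra output cost: }& 400 \cdot \frac{2.00}{10^{6}} = \$0.0008.
\end{aligned}
Caching 8,000 input tokens saves $0.0008 per call, but letting the reply grow by 400 output tokens costs exactly the same, so the net saving is zero; cap outputs even when your cache hit rate is high.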
If you log token usage on every request, sudden cost or latency spikes become easy to spot. A tiny CSV is enough: record each call’s prompt and completion tokens plus the estimated cost, so it is immediately clear where spending is rising too fast. This lets you iterate on your deployment over time and converge on the most cost-efficient setup.
The example snippet does exactly that. It calls the moonshotai/Kimi-K2-Instruct-0905 model, writes a deepinfra_usage.csv file with the columns ts, model, prompt_tokens, completion_tokens, and estimated_cost_usd, then prints the model’s response, logs the usage, and finally shows the raw usage payload returned by the provider. With this in place, you get immediate, low-friction visibility into how your prompts behave in production—and a fast path to roll back or fix anything that starts drifting.
# pip install openai
import os, csv, time, pathlib
# --- Config ---
MODEL = "moonshotai/Kimi-K2-Instruct-0905"
LOG_PATH = pathlib.Path("deepinfra_usage.csv")
def ask_capped(messages, max_tokens=40, temperature=0):
"""
Minimal wrapper that returns the full response object.
"""
return client.chat.completions.create(
model=MODEL,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
def log_usage(resp, model):
"""
Log usage.prompt_tokens, usage.completion_tokens, and estimated_cost (if present)
to a CSV file: deepinfra_usage.csv
"""
u = resp.usage
# Some providers return `estimated_cost`; if not, this will be None
est = getattr(u, "estimated_cost", None)
row = [time.time(), model, getattr(u, "prompt_tokens", 0), getattr(u, "completion_tokens", 0), est]
new = not LOG_PATH.exists()
with LOG_PATH.open("a", newline="") as f:
w = csv.writer(f)
if new:
w.writerow(["ts","model","prompt_tokens","completion_tokens","estimated_cost_usd"])
w.writerow(row)
# --- Example run ---
if __name__ == "__main__":
# 1) Simple one-liner request
msgs = [{"role":"user","content":"One sentence on token pricing."}]
r = ask_capped(msgs, max_tokens=40)
print(r.choices[0].message.content)
# 2) Log usage
log_usage(r, MODEL)
# 3) Inspect the provider's usage payload (may vary by SDK/provider)
# Not all SDK objects expose __dict__; fall back to a safe projection
try:
print(r.usage.__dict__) # works for many providers
except Exception:
u = r.usage
print({
"prompt_tokens": getattr(u, "prompt_tokens", None),
"completion_tokens": getattr(u, "completion_tokens", None),
"total_tokens": getattr(u, "total_tokens", None),
"estimated_cost": getattr(u, "estimated_cost", None),
"prompt_tokens_details": getattr(u, "prompt_tokens_details", None),
})
# 4) Optional: show the last log row we just wrote
try:
with LOG_PATH.open() as f:
*_, last = f.read().strip().splitlines()
print("Last log row:", last)
except Exception:
pass
System output:
{'completion_tokens': 28, 'prompt_tokens': 35, 'total_tokens': 63, 'completion_tokens_details': None, 'prompt_tokens_details': PromptTokensDetails(audio_tokens=None, cached_tokens=25, cache_write_tokens=None)}
Last log row: 1762754793.640675,moonshotai/Kimi-K2-Instruct-0905,35,28,7.099999999999999e-05
Before you ship a notebook or app to teammates, add simple budget guardrails so costs never creep up unnoticed. The wrapper below enforces two safety nets: a per-call cap (warns when a single request is pricier than expected) and a session cap (warns when your running total crosses a limit). It also trims history to keep inputs lean and logs every call’s usage to CSV for easy auditing. Drop it in, set your caps, and you’ll catch regressions the moment they happen instead of at the end of the month.
# pip install openai
import os, csv, time, pathlib, textwrap
# Minimal caller that returns the full response
def ask_capped(messages, *, model=MODEL, max_tokens=220, temperature=0):
return client.chat.completions.create(
model=model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
# Budget guardrails
SESSION_BUDGET = 5.00 # dollars for this notebook/app session
session_spend = 0.0
def ask_budget(messages, *, model=MODEL, max_tokens=220, cost_cap=0.02):
"""
Make a call with budget guardrails:
- warn if this call exceeds `cost_cap`
- warn if running `session_spend` exceeds `SESSION_BUDGET`
"""
global session_spend
# (Optional) keep inputs lean
trimmed = trim_history(messages)
# Make the request
resp = ask_capped(trimmed, model=model, max_tokens=max_tokens)
# Extract usage & cost
u = resp.usage
est = getattr(u, "estimated_cost", None)
# Per-call warning
if isinstance(est, (int, float)):
if est > cost_cap:
print(f"[WARN] Call ${est:.5f} exceeded cap ${cost_cap:.5f}. Consider lowering max_tokens or trimming context.")
# Session accumulation + warning
session_spend += est
if session_spend > SESSION_BUDGET:
print(f"[WARN] Session budget hit: ${session_spend:.2f} > ${SESSION_BUDGET:.2f}")
else:
# If the provider doesn’t return estimated_cost, still show tokens
print("[INFO] Provider did not return estimated_cost; relying on token counts.")
print(f"→ pt={getattr(u,'prompt_tokens',None)}, ct={getattr(u,'completion_tokens',None)}")
return resp
# Example
msgs = [{"role":"user","content":"Write exactly 2 sentences about cached input pricing."}]
r = ask_budget(msgs, max_tokens=60, cost_cap=0.005)
print(textwrap.shorten(r.choices[0].message.content, width=160))
print(r)
print(session_spend)
System output:
Cached input pricing is a discount applied when a model re-uses tokens it has recently seen, so you pay less for repeated content. The exact savings […]
ChatCompletion(id='chatcmpl-RUhhg8Cx2zBGjfh0UuO6i4ij', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content='Cached input pricing is a discount applied when a model re-uses tokens it has recently seen, so you pay less for repeated content. The exact savings depend on the provider and recency window, but reductions of 50–90% per cached token are common.', refusal=None, role='assistant', annotations=None, audio=None, function_call=None, tool_calls=None, reasoning_content=None, name=None))], created=1762755796, model='moonshotai/Kimi-K2-Instruct-0905', object='chat.completion', service_tier=None, system_fingerprint=None, usage=CompletionUsage(completion_tokens=54, prompt_tokens=39, total_tokens=93, completion_tokens_details=None, prompt_tokens_details=PromptTokensDetails(audio_tokens=None, cached_tokens=38, cache_write_tokens=None), estimated_cost=0.0001237))
0.0001237
Before you lock in a provider or SKU, cross-check your unit economics against benchmark views of speed vs. price—they reveal whether a “cheap” route is actually slow (and costly in wall-clock and concurrency). If you run long contexts or multi-step agents, dig into our guides on KV-cache precision and paged attention; together they dramatically expand usable context at the same spend. And for teams at scale, expose precision knobs in your own API (or choose SKUs that do) so you can programmatically pick the cheapest mode that still passes your evals—e.g., default to int8 + int8 KV, auto-promote to bf16 only when quality triggers demand it.
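As a closing illustration, here is a hypothetical sketch of that "cheapest mode that passes your evals" idea; the candidate names, costs, and eval stub are placeholders, not real DeepInfra SKUs or APIs:
# Hypothetical mode picker: choose the cheapest candidate that passes your evals.
# Candidates, cost figures, and the eval hook are placeholders to adapt.
CANDIDATES = [
    {"name": "small-int8", "cost_per_1k_requests": 1.2},
    {"name": "small-bf16", "cost_per_1k_requests": 2.0},
    {"name": "large-bf16", "cost_per_1k_requests": 9.5},
]

def passes_evals(candidate_name: str) -> bool:
    """Run your representative eval suite against this route; stubbed here."""
    return candidate_name != "small-int8"  # pretend the cheapest route fails quality

def pick_route(candidates):
    # Try candidates from cheapest to priciest, keep the first that passes.
    for c in sorted(candidates, key=lambda c: c["cost_per_1k_requests"]):
        if passes_evals(c["name"]):
            return c
    raise RuntimeError("No candidate passed the eval suite")

print(pick_route(CANDIDATES))  # -> {'name': 'small-bf16', 'cost_per_1k_requests': 2.0}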
Pricing for LLMs isn’t mysterious—it’s napkin math. Once you know your tokens in and out and your provider’s $/M rates, you can forecast confidently, tune prompts to hit a target budget, and choose the most economical route that still meets your UX bar. The biggest levers are simple: trim and cache system prompts, cap conversation history, send fewer/better RAG chunks, gate output length, keep tool payloads lean, and prefer the smallest/lowest-precision model that passes your evals. Pair that with a quick speed-vs-price check and a tiny, representative eval suite, and you’ll keep costs predictable while your app stays fast and delightful.