You have optimized your Claude Code costs. You have tuned your CLAUDE.md, pruned your context, and chosen the right model for the task. Now here is the uncomfortable follow-up question: how do you actually know any of that is working?
This is where observability comes in — and it is a completely different problem from optimization. Optimization is about changing what Claude Code does. Observability is about being able to see what it is doing, in enough detail to diagnose problems, measure outcomes, and alert on anomalies.
In 2026, most Claude Code teams are flying blind. They know their monthly bill. They do not know which sessions drove 40% of that bill, which tool calls are failing silently, how often context is getting compacted (and what is being lost), or whether their CLAUDE.md changes last week actually improved anything. That is the gap this guide addresses.
What “Monitoring Claude Code” Actually Means
Before jumping to implementation, it helps to be precise about what you want to observe. Claude Code generates telemetry at several distinct layers:
Session-level: Total tokens in/out, model used, session duration, number of turns, compaction count.
Turn-level: Per-message token counts, tool calls made, tool success/failure, latency.
Tool-level: Which tools ran, arguments passed, output size, exit codes, error messages.
Cost-level: Derived from token counts and the current price sheet. Needs to be computed client-side since Anthropic does not expose per-session costs via the API.
Error-level: Tool failures, API errors, permission denials, context overflow events.
You probably do not need all of this immediately. A useful starting point: track session cost, tool failure rate, and compaction frequency. These three metrics alone will surface 80% of the problems worth fixing.
Implementation Layer 1: Hooks-Based Telemetry
Claude Code’s hooks system is the cleanest interception point for telemetry. Hooks fire at specific lifecycle events, receive structured JSON on stdin, and can write to any destination — a local SQLite database, a Postgres instance, a remote API, or a flat file you tail later.
The hooks you want for monitoring:
| Hook Event | What to Capture |
|---|---|
| SessionStart | session_id, timestamp, model, working directory |
| PreToolUse | tool_name, tool_input (sanitized), timestamp |
| PostToolUse | tool_name, output_size, duration, exit_code |
| PostToolUseFailure | tool_name, error_message, retry_count |
| Stop | session token totals, turn count, final cost estimate |
| PreCompact | context_size_before, compaction_reason |
| PostCompact | context_size_after, tokens_dropped |
Minimal Telemetry Hook Setup
Here is a working Python hook that captures session events to a local SQLite database. Drop this into your project’s .claude/hooks/ directory:
#!/usr/bin/env python3
# .claude/hooks/telemetry.py
"""
Minimal Claude Code telemetry hook.
Captures session events to ~/.claude/telemetry.db
Usage: configure in settings.json under the relevant hook events.
"""
from __future__ import annotations
import json
import sqlite3
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
DB_PATH = Path.home() / ".claude" / "telemetry.db"
SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
session_id TEXT,
timestamp TEXT NOT NULL,
tool_name TEXT,
tool_input_size INTEGER,
tool_output_size INTEGER,
duration_ms INTEGER,
exit_code INTEGER,
error_message TEXT,
tokens_input INTEGER,
tokens_output INTEGER,
model TEXT,
raw_json TEXT
);
"""
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.execute(SCHEMA)
conn.commit()
return conn
def main() -> None:
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
event_type = data.get("event", "unknown")
session_id = data.get("session_id")
    timestamp = datetime.now(timezone.utc).isoformat()
record: dict = {
"event_type": event_type,
"session_id": session_id,
"timestamp": timestamp,
"raw_json": raw,
}
# Extract tool-specific fields
if event_type in ("PreToolUse", "PostToolUse", "PostToolUseFailure"):
tool = data.get("tool_use", {})
record["tool_name"] = tool.get("name")
inp = tool.get("input", {})
record["tool_input_size"] = len(json.dumps(inp))
if event_type == "PostToolUse":
result = data.get("tool_result", {})
output = result.get("output", "")
record["tool_output_size"] = len(str(output))
record["exit_code"] = result.get("exit_code", 0)
if event_type == "PostToolUseFailure":
record["error_message"] = data.get("error", "")
record["exit_code"] = data.get("exit_code", 1)
# Extract Stop-event token totals (if available in your Claude version)
if event_type == "Stop":
usage = data.get("usage", {})
record["tokens_input"] = usage.get("input_tokens")
record["tokens_output"] = usage.get("output_tokens")
record["model"] = data.get("model")
    # Never block the session for telemetry: swallow DB errors and always exit 0
    try:
        DB_PATH.parent.mkdir(parents=True, exist_ok=True)
        conn = get_db()
        cols = ", ".join(record.keys())
        placeholders = ", ".join("?" for _ in record)
        conn.execute(
            f"INSERT INTO events ({cols}) VALUES ({placeholders})",
            list(record.values()),
        )
        conn.commit()
        conn.close()
    except (sqlite3.Error, OSError):
        pass
    sys.exit(0)
if __name__ == "__main__":
main()
Register this hook in settings.json:
{
"hooks": {
"SessionStart": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
],
"PostToolUse": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
],
"PostToolUseFailure": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
],
"Stop": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
],
"PreCompact": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
],
"PostCompact": [
{
"type": "command",
"command": "python3 .claude/hooks/telemetry.py"
}
]
}
}
One important detail: hooks must never block. If your telemetry database is unavailable, your hook should fail silently and exit 0. The Claude Code session should never hang waiting for a monitoring system.
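One pattern that enforces this for any of the hooks in this guide is a catch-all entry point that logs telemetry failures to a separate file and still exits 0. A sketch (the error-log path is an arbitrary choice):
# Shared guard for any telemetry hook: call safe_run(main) instead of main().
import sys
from datetime import datetime, timezone
from pathlib import Path
ERROR_LOG = Path.home() / ".claude" / "telemetry_errors.log"  # arbitrary location
def safe_run(hook_main) -> None:
    """Run a hook's main(), log any failure locally, and always exit 0."""
    try:
        hook_main()
    except Exception as exc:  # noqa: BLE001 - telemetry must never block a session
        try:
            ERROR_LOG.parent.mkdir(parents=True, exist_ok=True)
            with ERROR_LOG.open("a") as f:
                f.write(f"{datetime.now(timezone.utc).isoformat()} {exc!r}\n")
        except OSError:
            pass
    sys.exit(0)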
Cost Estimation in the Stop Hook
Claude Code does not return a dollar cost in the Stop event — you compute it. Here is a helper function to add to your telemetry hook:
# Claude API pricing as of May 2026 — update when Anthropic changes rates
PRICING = {
"claude-opus-4-6": {"input": 15.0, "output": 75.0}, # per 1M tokens
"claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
"claude-haiku-4": {"input": 0.25, "output": 1.25},
# Add new models as they release
}
def estimate_cost_usd(
    model: str,
    input_tokens: int,
    output_tokens: int,
    cache_read_tokens: int = 0,
    cache_write_tokens: int = 0,
) -> float:
    """
    Estimate session cost in USD.
    Cache reads are billed at 10% of the standard input rate and cache
    writes at 25%, matching the multipliers used throughout this guide.
    """
    rates = PRICING.get(model, PRICING["claude-sonnet-4-6"])
    fresh_input = max(0, input_tokens - cache_read_tokens)
    cache_read_cost = (cache_read_tokens / 1_000_000) * rates["input"] * 0.10
    cache_write_cost = (cache_write_tokens / 1_000_000) * rates["input"] * 0.25
    fresh_input_cost = (fresh_input / 1_000_000) * rates["input"]
    output_cost = (output_tokens / 1_000_000) * rates["output"]
    return cache_read_cost + cache_write_cost + fresh_input_cost + output_cost
Note the cache read discount: tokens served from prompt cache are billed at roughly 10% of the standard input rate. If you are using persistent CLAUDE.md and system prompts (which you should be), your effective input cost is meaningfully lower than the headline rate.
Implementation Layer 2: JSONL Transcript Parsing
Claude Code writes every session to a JSONL transcript file. The default location is:
~/.claude/projects/<project-hash>/sessions/<session-id>.jsonl
Each line is a complete JSON object representing one event in the session. This is a goldmine for retrospective analysis — you can replay what happened in any session, count tool calls, extract error patterns, and compute token totals.
Reading the JSONL Format
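Exact field names vary across Claude Code versions, so treat the shapes below as illustrative of what the parser assumes rather than a verbatim dump. Three representative lines:
{"type": "assistant", "model": "claude-sonnet-4-6", "usage": {"input_tokens": 1200, "output_tokens": 340, "cache_read_input_tokens": 900, "cache_creation_input_tokens": 0}}
{"type": "tool_use", "name": "Bash", "input": {"command": "npm test"}}
{"type": "tool_result", "is_error": true, "content": [{"type": "text", "text": "Command failed with exit code 1"}]}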
#!/usr/bin/env python3
# parse_transcripts.py
"""
Parse Claude Code JSONL transcripts for analysis.
Usage: python3 parse_transcripts.py [session_id]
Omit session_id to analyze the most recent session.
"""
from __future__ import annotations
import json
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
SESSIONS_BASE = Path.home() / ".claude" / "projects"
@dataclass
class SessionStats:
session_id: str
model: str = ""
turns: int = 0
tool_calls: int = 0
tool_failures: int = 0
compactions: int = 0
tokens_input: int = 0
tokens_output: int = 0
tokens_cache_read: int = 0
tokens_cache_write: int = 0
tool_call_counts: dict = field(default_factory=dict)
tool_failure_messages: list = field(default_factory=list)
@property
def tool_failure_rate(self) -> float:
if self.tool_calls == 0:
return 0.0
return self.tool_failures / self.tool_calls
@property
def estimated_cost_usd(self) -> float:
rates = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
inp_rate, out_rate = rates.get(self.model, (3.0, 15.0))
fresh_input = self.tokens_input - self.tokens_cache_read
cost = (
(fresh_input / 1_000_000) * inp_rate
+ (self.tokens_cache_read / 1_000_000) * inp_rate * 0.10
+ (self.tokens_cache_write / 1_000_000) * inp_rate * 0.25
+ (self.tokens_output / 1_000_000) * out_rate
)
return cost
def find_latest_session() -> Optional[Path]:
"""Find the most recently modified session JSONL file."""
all_sessions = list(SESSIONS_BASE.rglob("*.jsonl"))
if not all_sessions:
return None
return max(all_sessions, key=lambda p: p.stat().st_mtime)
def parse_session(path: Path) -> SessionStats:
session_id = path.stem
stats = SessionStats(session_id=session_id)
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
event_type = event.get("type", "")
if event_type == "assistant":
stats.turns += 1
# Extract token usage if present
usage = event.get("usage", {})
stats.tokens_input += usage.get("input_tokens", 0)
stats.tokens_output += usage.get("output_tokens", 0)
stats.tokens_cache_read += usage.get("cache_read_input_tokens", 0)
stats.tokens_cache_write += usage.get("cache_creation_input_tokens", 0)
if not stats.model and event.get("model"):
stats.model = event["model"]
elif event_type == "tool_use":
tool_name = event.get("name", "unknown")
stats.tool_calls += 1
stats.tool_call_counts[tool_name] = (
stats.tool_call_counts.get(tool_name, 0) + 1
)
elif event_type == "tool_result":
if event.get("is_error"):
stats.tool_failures += 1
content = event.get("content", "")
if isinstance(content, list):
msg = " ".join(
c.get("text", "") for c in content if isinstance(c, dict)
)
else:
msg = str(content)
stats.tool_failure_messages.append(msg[:200])
elif event_type == "compaction":
stats.compactions += 1
return stats
def print_report(stats: SessionStats) -> None:
print(f"\n=== Session: {stats.session_id} ===")
print(f"Model: {stats.model or 'unknown'}")
print(f"Turns: {stats.turns}")
print(f"Tool calls: {stats.tool_calls}")
print(f"Tool failures: {stats.tool_failures} ({stats.tool_failure_rate:.1%})")
print(f"Compactions: {stats.compactions}")
print(f"\nTokens — input: {stats.tokens_input:,}")
print(f"Tokens — output: {stats.tokens_output:,}")
print(f"Tokens — cache r: {stats.tokens_cache_read:,}")
print(f"Tokens — cache w: {stats.tokens_cache_write:,}")
print(f"\nEstimated cost: ${stats.estimated_cost_usd:.4f}")
if stats.tool_call_counts:
print("\nTool call breakdown:")
for name, count in sorted(
stats.tool_call_counts.items(), key=lambda x: -x[1]
):
print(f" {name:<30} {count}")
if stats.tool_failure_messages:
print(f"\nFirst {min(5, len(stats.tool_failure_messages))} tool failures:")
for msg in stats.tool_failure_messages[:5]:
print(f" - {msg}")
def main() -> None:
if len(sys.argv) > 1:
session_id = sys.argv[1]
matches = list(SESSIONS_BASE.rglob(f"{session_id}.jsonl"))
if not matches:
print(f"Session {session_id} not found", file=sys.stderr)
sys.exit(1)
path = matches[0]
else:
path = find_latest_session()
if not path:
print("No sessions found", file=sys.stderr)
sys.exit(1)
stats = parse_session(path)
print_report(stats)
if __name__ == "__main__":
main()
Running this against a real session gives you output like:
=== Session: 550e8400-e29b-41d4-a716-446655440000 ===
Model: claude-sonnet-4-6
Turns: 34
Tool calls: 89
Tool failures: 7 (7.9%)
Compactions: 2
Tokens — input: 284,193
Tokens — output: 18,442
Tokens — cache r: 241,764
Tokens — cache w: 42,429
Estimated cost: $0.1847
Tool call breakdown:
Bash 52
Read 23
Edit 11
Glob 3
First 5 tool failures:
- Command 'npm test' failed with exit code 1: ...
- Permission denied: cannot write to /etc/hosts
- File not found: src/components/Header.tsx
The 7.9% tool failure rate and 2 compactions in a single session are signals worth investigating. A healthy session should have under 3% tool failures; repeated compactions often indicate a context management problem.
Batch Analysis Across Sessions
For trend analysis, run the parser across all sessions in a project:
#!/bin/bash
# analyze_project_sessions.sh
# Usage: ./analyze_project_sessions.sh <project-directory>
PROJECT_DIR="${1:-.}"
# NOTE: adjust this to match how your Claude Code version names project
# directories under ~/.claude/projects/; an md5 hash of the path is assumed here.
PROJECT_HASH=$(echo -n "$PROJECT_DIR" | md5sum | cut -d' ' -f1)
SESSIONS_DIR="$HOME/.claude/projects/$PROJECT_HASH"
if [ ! -d "$SESSIONS_DIR" ]; then
echo "No sessions found for project: $PROJECT_DIR"
exit 1
fi
echo "Analyzing sessions in: $SESSIONS_DIR"
echo ""
total_cost=0
total_sessions=0
total_failures=0
total_tools=0
shopt -s globstar nullglob
for session_file in "$SESSIONS_DIR"/**/*.jsonl; do
[ -f "$session_file" ] || continue
result=$(python3 parse_transcripts.py "$(basename "$session_file" .jsonl)" 2>/dev/null)
if [ $? -eq 0 ]; then
cost=$(echo "$result" | grep "Estimated cost" | awk '{print $3}' | tr -d '$')
failures=$(echo "$result" | grep "Tool failures" | awk '{print $3}')
tools=$(echo "$result" | grep "Tool calls" | awk '{print $3}')
total_cost=$(echo "$total_cost + $cost" | bc)
total_sessions=$((total_sessions + 1))
total_failures=$((total_failures + failures))
total_tools=$((total_tools + tools))
fi
done
echo "Project summary:"
echo " Sessions analyzed: $total_sessions"
echo " Total estimated cost: \$$(printf '%.4f' $total_cost)"
echo " Total tool calls: $total_tools"
echo " Total tool failures: $total_failures"
if [ $total_tools -gt 0 ]; then
failure_pct=$(echo "scale=1; $total_failures * 100 / $total_tools" | bc)
echo " Failure rate: ${failure_pct}%"
fi
Implementation Layer 3: LangSmith Integration
If your team already uses LangSmith for LLM observability, you can route Claude Code telemetry directly into it. This gives you a unified trace view across all your LLM applications.
The integration point is the Stop hook. After each session, post the session stats to LangSmith’s Runs API:
#!/usr/bin/env python3
# .claude/hooks/langsmith_export.py
"""
Export Claude Code session data to LangSmith.
Requires: LANGSMITH_API_KEY environment variable.
Hook event: Stop
"""
from __future__ import annotations
import json
import os
import sys
import uuid
from datetime import datetime, timezone
import urllib.request
import urllib.error
LANGSMITH_API_KEY = os.environ.get("LANGSMITH_API_KEY", "")
LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT", "claude-code-monitoring")
LANGSMITH_BASE_URL = "https://api.smith.langchain.com"
def post_run(run_data: dict) -> None:
url = f"{LANGSMITH_BASE_URL}/runs"
payload = json.dumps(run_data).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"x-api-key": LANGSMITH_API_KEY,
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
_ = resp.read()
except urllib.error.URLError:
# Never block the session for telemetry failures
pass
def main() -> None:
if not LANGSMITH_API_KEY:
sys.exit(0)
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
model = data.get("model", "claude-sonnet-4-6")
session_id = data.get("session_id", str(uuid.uuid4()))
run_data = {
"id": session_id,
"name": f"claude-code-session",
"run_type": "llm",
"start_time": data.get("session_start_time", datetime.now(timezone.utc).isoformat()),
"end_time": datetime.now(timezone.utc).isoformat(),
"inputs": {"session_id": session_id},
"outputs": {
"turns": data.get("turns", 0),
"tool_calls": data.get("tool_calls", 0),
"tool_failures": data.get("tool_failures", 0),
"compactions": data.get("compactions", 0),
},
"extra": {
"metadata": {
"model": model,
"project": LANGSMITH_PROJECT,
},
"runtime": {
"library": "claude-code",
},
},
"token_usage": {
"prompt_tokens": usage.get("input_tokens", 0),
"completion_tokens": usage.get("output_tokens", 0),
"total_tokens": (
usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
),
},
"tags": ["claude-code", LANGSMITH_PROJECT],
}
post_run(run_data)
sys.exit(0)
if __name__ == "__main__":
main()
Once this is running, you can see token usage per session in the LangSmith dashboard, set up threshold alerts, and compare across projects.
Implementation Layer 4: Langfuse Integration
Langfuse is a strong alternative to LangSmith, especially if you prefer self-hosting. The integration is similar but uses Langfuse’s tracing API:
#!/usr/bin/env python3
# .claude/hooks/langfuse_export.py
"""
Export Claude Code session data to Langfuse.
Requires: LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY environment variables.
Optional: LANGFUSE_HOST (defaults to cloud.langfuse.com)
Hook event: Stop
"""
from __future__ import annotations
import base64
import json
import os
import sys
import uuid
from datetime import datetime, timezone
import urllib.request
import urllib.error
LANGFUSE_SECRET_KEY = os.environ.get("LANGFUSE_SECRET_KEY", "")
LANGFUSE_PUBLIC_KEY = os.environ.get("LANGFUSE_PUBLIC_KEY", "")
LANGFUSE_HOST = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
def get_auth_header() -> str:
credentials = f"{LANGFUSE_PUBLIC_KEY}:{LANGFUSE_SECRET_KEY}"
encoded = base64.b64encode(credentials.encode()).decode()
return f"Basic {encoded}"
def ingest_trace(trace_data: dict) -> None:
url = f"{LANGFUSE_HOST}/api/public/ingestion"
payload = json.dumps({
"batch": [
{
"id": str(uuid.uuid4()),
"type": "trace-create",
"timestamp": datetime.now(timezone.utc).isoformat(),
"body": trace_data,
}
]
}).encode("utf-8")
req = urllib.request.Request(
url,
data=payload,
headers={
"Content-Type": "application/json",
"Authorization": get_auth_header(),
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
_ = resp.read()
except urllib.error.URLError:
pass
def main() -> None:
if not LANGFUSE_SECRET_KEY or not LANGFUSE_PUBLIC_KEY:
sys.exit(0)
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
session_id = data.get("session_id", str(uuid.uuid4()))
trace_data = {
"id": session_id,
"name": "claude-code-session",
"userId": os.environ.get("USER", "unknown"),
"metadata": {
"model": data.get("model"),
"working_directory": data.get("cwd"),
},
"tags": ["claude-code"],
"usage": {
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
"total": (
usage.get("input_tokens", 0) + usage.get("output_tokens", 0)
),
"unit": "TOKENS",
},
"output": {
"turns": data.get("turns", 0),
"compactions": data.get("compactions", 0),
"tool_failure_rate": data.get("tool_failure_rate", 0.0),
},
}
ingest_trace(trace_data)
sys.exit(0)
if __name__ == "__main__":
main()
Both LangSmith and Langfuse support cost tracking, session comparison, and alerting. The choice between them mostly comes down to whether you want managed SaaS (LangSmith) or the ability to self-host with full data ownership (Langfuse).
Cost Alerts
Passive monitoring is fine; active alerting is better. Here is a complete cost alert system that sends a Slack notification (or writes a local alert log) when a session exceeds a cost threshold.
Session Cost Alert Hook
#!/usr/bin/env python3
# .claude/hooks/cost_alert.py
"""
Alert when a session exceeds the cost threshold.
Sends to Slack webhook or logs a warning depending on configuration.
Environment variables:
CLAUDE_COST_ALERT_THRESHOLD — float, USD (default: 0.50)
SLACK_WEBHOOK_URL — optional Slack webhook for alerts
CLAUDE_ALERT_LOG — path to write alert log (default: ~/.claude/cost_alerts.log)
"""
from __future__ import annotations
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
THRESHOLD = float(os.environ.get("CLAUDE_COST_ALERT_THRESHOLD", "0.50"))
SLACK_WEBHOOK = os.environ.get("SLACK_WEBHOOK_URL", "")
ALERT_LOG = Path(os.environ.get("CLAUDE_ALERT_LOG", Path.home() / ".claude" / "cost_alerts.log"))
PRICING = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
def compute_cost(model: str, usage: dict) -> float:
inp_rate, out_rate = PRICING.get(model, (3.0, 15.0))
fresh_input = usage.get("input_tokens", 0) - usage.get("cache_read_input_tokens", 0)
cost = (
(max(0, fresh_input) / 1_000_000) * inp_rate
+ (usage.get("cache_read_input_tokens", 0) / 1_000_000) * inp_rate * 0.10
+ (usage.get("cache_creation_input_tokens", 0) / 1_000_000) * inp_rate * 0.25
+ (usage.get("output_tokens", 0) / 1_000_000) * out_rate
)
return cost
def send_slack_alert(message: str) -> None:
if not SLACK_WEBHOOK:
return
payload = json.dumps({"text": message}).encode("utf-8")
req = urllib.request.Request(
SLACK_WEBHOOK,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
_ = resp.read()
except urllib.error.URLError:
pass
def log_alert(message: str) -> None:
ALERT_LOG.parent.mkdir(parents=True, exist_ok=True)
with ALERT_LOG.open("a") as f:
f.write(f"{datetime.now(timezone.utc).isoformat()} {message}\n")
def main() -> None:
raw = sys.stdin.read()
try:
data = json.loads(raw)
except json.JSONDecodeError:
sys.exit(0)
usage = data.get("usage", {})
model = data.get("model", "claude-sonnet-4-6")
session_id = data.get("session_id", "unknown")
cost = compute_cost(model, usage)
if cost < THRESHOLD:
sys.exit(0)
message = (
f":rotating_light: Claude Code cost alert: session {session_id[:8]}... "
f"cost ${cost:.4f} (threshold: ${THRESHOLD:.2f}). "
f"Model: {model}, "
f"Input: {usage.get('input_tokens', 0):,} tokens, "
f"Output: {usage.get('output_tokens', 0):,} tokens."
)
log_alert(message)
send_slack_alert(message)
# Print to stderr so Claude sees the warning (exit 0 to not block)
print(message, file=sys.stderr)
sys.exit(0)
if __name__ == "__main__":
main()
Set the threshold environment variable in your shell profile:
# ~/.zshrc or ~/.bashrc
export CLAUDE_COST_ALERT_THRESHOLD="0.50"
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
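Then register the alert on the Stop event alongside the telemetry hook, using the same settings.json layout shown earlier:
{
  "hooks": {
    "Stop": [
      {
        "type": "command",
        "command": "python3 .claude/hooks/telemetry.py"
      },
      {
        "type": "command",
        "command": "python3 .claude/hooks/cost_alert.py"
      }
    ]
  }
}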
Daily Cost Summary Script
Beyond per-session alerts, a daily summary helps you spot trends:
#!/usr/bin/env python3
# daily_cost_summary.py
"""
Generate a daily cost summary from Claude Code JSONL transcripts.
Run via cron: 0 9 * * * python3 /path/to/daily_cost_summary.py
"""
from __future__ import annotations
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
SESSIONS_BASE = Path.home() / ".claude" / "projects"
PRICING = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
def compute_cost(model: str, usage: dict) -> float:
inp_rate, out_rate = PRICING.get(model, (3.0, 15.0))
fresh = max(0, usage.get("input_tokens", 0) - usage.get("cache_read_input_tokens", 0))
return (
(fresh / 1_000_000) * inp_rate
+ (usage.get("cache_read_input_tokens", 0) / 1_000_000) * inp_rate * 0.10
+ (usage.get("cache_creation_input_tokens", 0) / 1_000_000) * inp_rate * 0.25
+ (usage.get("output_tokens", 0) / 1_000_000) * out_rate
)
def analyze_session_file(path: Path, since: datetime, until: datetime) -> Optional[dict]:
    """Parse a session JSONL and return a summary if last modified within [since, until)."""
    mtime = datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)
    if mtime < since or mtime >= until:
        return None
total_input = 0
total_output = 0
total_cache_read = 0
total_cache_write = 0
model = "claude-sonnet-4-6"
turns = 0
tool_calls = 0
tool_failures = 0
try:
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
event = json.loads(line)
etype = event.get("type", "")
if etype == "assistant":
turns += 1
usage = event.get("usage", {})
total_input += usage.get("input_tokens", 0)
total_output += usage.get("output_tokens", 0)
total_cache_read += usage.get("cache_read_input_tokens", 0)
total_cache_write += usage.get("cache_creation_input_tokens", 0)
if event.get("model"):
model = event["model"]
elif etype == "tool_use":
tool_calls += 1
elif etype == "tool_result" and event.get("is_error"):
tool_failures += 1
except (json.JSONDecodeError, OSError):
return None
usage_dict = {
"input_tokens": total_input,
"output_tokens": total_output,
"cache_read_input_tokens": total_cache_read,
"cache_creation_input_tokens": total_cache_write,
}
return {
"session_id": path.stem,
"model": model,
"turns": turns,
"tool_calls": tool_calls,
"tool_failures": tool_failures,
"cost": compute_cost(model, usage_dict),
}
def main() -> None:
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
    yesterday_end = yesterday_start + timedelta(days=1)
    sessions = []
    for jsonl_file in SESSIONS_BASE.rglob("*.jsonl"):
        result = analyze_session_file(jsonl_file, yesterday_start, yesterday_end)
if result:
sessions.append(result)
if not sessions:
print(f"No sessions found for {yesterday_start.date()}")
return
total_cost = sum(s["cost"] for s in sessions)
total_tools = sum(s["tool_calls"] for s in sessions)
total_failures = sum(s["tool_failures"] for s in sessions)
failure_rate = total_failures / total_tools if total_tools > 0 else 0
print(f"\n=== Claude Code Daily Summary: {yesterday_start.date()} ===")
print(f"Sessions: {len(sessions)}")
print(f"Total cost: ${total_cost:.4f}")
print(f"Tool calls: {total_tools}")
print(f"Tool failures: {total_failures} ({failure_rate:.1%})")
print(f"\nTop sessions by cost:")
for s in sorted(sessions, key=lambda x: -x["cost"])[:5]:
print(f" {s['session_id'][:12]}... ${s['cost']:.4f} {s['turns']} turns {s['model']}")
if __name__ == "__main__":
main()
Dashboard Examples
Option 1: Terminal Dashboard (No Dependencies)
For developers who spend most of their time in the terminal, a simple plain-text dashboard is often more practical than a Grafana setup:
#!/usr/bin/env python3
# claude_dashboard.py
"""
Terminal dashboard for Claude Code metrics.
Reads from ~/.claude/telemetry.db (created by telemetry.py hook).
"""
from __future__ import annotations
import sqlite3
from datetime import datetime, timezone, timedelta
from pathlib import Path
DB_PATH = Path.home() / ".claude" / "telemetry.db"
def get_stats(conn: sqlite3.Connection, days: int = 7) -> dict:
cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
stats: dict = {}
# Session count
row = conn.execute(
"SELECT COUNT(DISTINCT session_id) FROM events WHERE timestamp > ? AND event_type = 'SessionStart'",
(cutoff,),
).fetchone()
stats["sessions"] = row[0] if row else 0
# Total cost (from Stop events)
rows = conn.execute(
"SELECT tokens_input, tokens_output, model FROM events WHERE timestamp > ? AND event_type = 'Stop'",
(cutoff,),
).fetchall()
pricing = {
"claude-opus-4-6": (15.0, 75.0),
"claude-sonnet-4-6": (3.0, 15.0),
"claude-haiku-4": (0.25, 1.25),
}
total_cost = 0.0
for inp, out, model in rows:
if inp and out and model:
r_in, r_out = pricing.get(model, (3.0, 15.0))
total_cost += (inp / 1_000_000) * r_in + (out / 1_000_000) * r_out
stats["total_cost"] = total_cost
# Tool failure rate
tool_rows = conn.execute(
"SELECT event_type FROM events WHERE timestamp > ? AND event_type IN ('PostToolUse', 'PostToolUseFailure')",
(cutoff,),
).fetchall()
total_tool_events = len(tool_rows)
failures = sum(1 for r in tool_rows if r[0] == "PostToolUseFailure")
stats["tool_failure_rate"] = failures / total_tool_events if total_tool_events > 0 else 0
# Top failing tools
fail_rows = conn.execute(
"""SELECT tool_name, COUNT(*) as cnt
FROM events
WHERE timestamp > ? AND event_type = 'PostToolUseFailure'
GROUP BY tool_name
ORDER BY cnt DESC
LIMIT 5""",
(cutoff,),
).fetchall()
stats["top_failing_tools"] = fail_rows
return stats
def render(stats: dict, days: int) -> None:
line = "-" * 50
print(f"\n{line}")
print(f" Claude Code Metrics — Last {days} days")
print(f"{line}")
print(f" Sessions: {stats['sessions']}")
print(f" Estimated cost: ${stats['total_cost']:.4f}")
print(f" Tool failure rate: {stats['tool_failure_rate']:.1%}")
if stats["top_failing_tools"]:
print(f"\n Top failing tools:")
for name, count in stats["top_failing_tools"]:
print(f" {(name or 'unknown'):<30} {count}")
print(f"{line}\n")
def main() -> None:
if not DB_PATH.exists():
print("No telemetry database found. Install the telemetry hook first.")
return
conn = sqlite3.connect(DB_PATH)
stats = get_stats(conn)
render(stats, days=7)
conn.close()
if __name__ == "__main__":
main()
Option 2: Grafana via Prometheus Push Gateway
For teams that already have Grafana, push metrics to Prometheus Push Gateway from the Stop hook:
# In your Stop hook, add this function:
def push_to_prometheus(metrics: dict) -> None:
"""
Push metrics to Prometheus Push Gateway.
Requires PUSHGATEWAY_URL environment variable.
"""
pushgateway = os.environ.get("PUSHGATEWAY_URL")
if not pushgateway:
return
job_name = "claude_code"
lines = [
f'# HELP claude_code_session_cost_usd Estimated cost in USD for the session',
f'# TYPE claude_code_session_cost_usd gauge',
f'claude_code_session_cost_usd{{model="{metrics["model"]}"}} {metrics["cost"]:.6f}',
f'',
f'# HELP claude_code_tokens_total Total tokens used in session',
f'# TYPE claude_code_tokens_total gauge',
f'claude_code_tokens_total{{type="input",model="{metrics["model"]}"}} {metrics["input_tokens"]}',
f'claude_code_tokens_total{{type="output",model="{metrics["model"]}"}} {metrics["output_tokens"]}',
f'',
f'# HELP claude_code_tool_failure_rate Tool failure rate for the session',
f'# TYPE claude_code_tool_failure_rate gauge',
f'claude_code_tool_failure_rate {metrics["tool_failure_rate"]:.4f}',
f'',
f'# HELP claude_code_compactions_total Compaction events in session',
f'# TYPE claude_code_compactions_total gauge',
f'claude_code_compactions_total {metrics["compactions"]}',
]
    payload = ("\n".join(lines) + "\n").encode("utf-8")  # the exposition format requires a trailing newline
url = f"{pushgateway}/metrics/job/{job_name}/instance/{metrics['session_id'][:8]}"
req = urllib.request.Request(
url,
data=payload,
headers={"Content-Type": "text/plain"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=5) as resp:
_ = resp.read()
except urllib.error.URLError:
pass
With this in place, you can build Grafana panels for the following (example queries after the list):
- Cost trend — line chart of daily estimated cost, per model
- Tool failure rate — stat panel with threshold coloring (green < 3%, yellow 3-7%, red > 7%)
- Compaction frequency — bar chart showing how often context is being reset
- Session duration vs cost — scatter plot to identify expensive outlier sessions
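Example queries for those panels, assuming the metric names from the push function above and a Prometheus that scrapes the Push Gateway with honor_labels enabled. With the Push Gateway each session appears under its own instance label, so treat these as starting points rather than finished dashboards:
# Estimated cost currently exposed on the gateway, broken down by model
sum by (model) (claude_code_session_cost_usd)
# Fleet-wide tool failure rate, for the stat panel thresholds above
avg(claude_code_tool_failure_rate)
# Total compaction events across pushed sessions
sum(claude_code_compactions_total)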
Error Tracing Patterns
Beyond aggregated metrics, there are cases where you need to trace a specific error back through the session. The JSONL transcript is your source of truth here.
Tool Error Correlation
The most common pattern: a tool fails, Claude retries in a different way, and you want to understand the failure chain:
#!/usr/bin/env python3
# trace_errors.py
"""
Trace tool errors in a Claude Code session.
Shows the full context around each failure: the preceding tool call
and the assistant message that triggered it.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
SESSIONS_BASE = Path.home() / ".claude" / "projects"
def find_session(session_id: str) -> Path:
matches = list(SESSIONS_BASE.rglob(f"{session_id}.jsonl"))
if not matches:
raise FileNotFoundError(f"Session not found: {session_id}")
return matches[0]
def trace_errors(path: Path) -> None:
events = []
with path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
error_count = 0
for i, event in enumerate(events):
if event.get("type") != "tool_result" or not event.get("is_error"):
continue
error_count += 1
print(f"\n{'='*60}")
print(f"Error #{error_count} at event index {i}")
print(f"{'='*60}")
# Find the tool_use that preceded this result
tool_use = None
for j in range(i - 1, max(0, i - 5), -1):
if events[j].get("type") == "tool_use":
tool_use = events[j]
break
if tool_use:
print(f"\nTool call: {tool_use.get('name', 'unknown')}")
inp = tool_use.get("input", {})
if "command" in inp:
print(f"Command: {inp['command']}")
elif "path" in inp:
print(f"Path: {inp['path']}")
else:
print(f"Input: {json.dumps(inp)[:200]}")
# Print error content
content = event.get("content", "")
if isinstance(content, list):
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
print(f"\nError: {item.get('text', '')[:400]}")
else:
print(f"\nError: {str(content)[:400]}")
if error_count == 0:
print("No tool errors found in this session.")
else:
print(f"\n\nTotal errors: {error_count}")
def main() -> None:
if len(sys.argv) < 2:
print("Usage: python3 trace_errors.py <session_id>")
sys.exit(1)
path = find_session(sys.argv[1])
trace_errors(path)
if __name__ == "__main__":
main()
Compaction Loss Detection
Context compaction is a lossy operation — Claude summarizes what it knows and discards the raw context. If you are seeing degraded behavior after a compaction (Claude forgetting something it knew earlier), you can detect compaction events and log what context was present just before them:
# Add to your PreCompact hook handler:
def log_precompact_context(data: dict) -> None:
"""Log context summary before compaction for later comparison."""
session_id = data.get("session_id", "unknown")
context_size = data.get("tokens_in_context", 0)
log_path = Path.home() / ".claude" / "compaction_log.jsonl"
record = {
"timestamp": datetime.utcnow().isoformat(),
"session_id": session_id,
"context_tokens_before": context_size,
"compaction_reason": data.get("reason", "unknown"),
}
with log_path.open("a") as f:
f.write(json.dumps(record) + "\n")
If you suspect a specific compaction is causing a regression, check the compaction log and identify which session and approximate time the compaction occurred — then pull the transcript and review the conversation state at that point.
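Looking those entries up later is straightforward; a sketch that assumes the log format written by the handler above:
import json
from pathlib import Path
def compactions_for_session(session_id: str) -> list[dict]:
    """Return the compaction records logged for one session."""
    log_path = Path.home() / ".claude" / "compaction_log.jsonl"
    if not log_path.exists():
        return []
    records = []
    with log_path.open("r", encoding="utf-8") as f:
        for line in f:
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue
            if rec.get("session_id") == session_id:
                records.append(rec)
    return records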
Key Metrics Reference
A quick reference for the metrics worth tracking and what to do when they spike:
| Metric | Healthy Range | Action When Outside |
|---|---|---|
| Tool failure rate | < 3% | Investigate top failing tools; check permissions, paths |
| Compactions per session | 0–1 | Reduce CLAUDE.md size; be more surgical with file reads |
| Cache hit ratio | > 60% | Review CLAUDE.md placement; ensure system prompt is stable |
| Session cost (Sonnet) | < $0.20 | Check for excessive file reads, large tool outputs |
| Session cost (Opus) | < $1.00 | Verify Opus is appropriate for the task |
| Output token ratio | < 15% of input | If higher, check for redundant or overly verbose output |
The cache hit ratio deserves special attention. When it drops below 60%, it usually means something in your CLAUDE.md or system prompt is changing between turns, which breaks cache continuity. Common causes: timestamps in the prompt, dynamic content that changes per session, or adding context at the beginning rather than the end of a conversation.
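A small helper makes the ratio concrete, following the accounting used by the parsers in this guide (tokens_input is assumed to already include cache reads, while cache writes are counted separately):
def cache_hit_ratio(tokens_input: int, cache_read: int, cache_write: int) -> float:
    """Fraction of input-side tokens that were served from the prompt cache."""
    total_input_side = tokens_input + cache_write
    if total_input_side == 0:
        return 0.0
    return cache_read / total_input_side
For the example session shown earlier (284,193 input, 241,764 cache-read, 42,429 cache-write tokens) this works out to roughly 74%, comfortably above the 60% line.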
Common Pitfalls
Mistake 1: Blocking the session on telemetry failures. Your monitoring infrastructure will go down at some point. If your hook blocks (exits non-zero by accident, hangs on a network call), it will disrupt Claude Code sessions. Always exit 0 from telemetry hooks, regardless of what happened. Use a separate error log for telemetry system failures.
Mistake 2: Logging sensitive tool inputs. Tool inputs include file contents, command arguments, and potentially secrets. Before logging raw_json, strip or hash sensitive fields. At minimum, truncate long strings and exclude file content fields.
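A minimal sanitizer along these lines can run in the telemetry hook before the tool input is logged; the set of sensitive key names here is an assumption to extend for your own tools:
import hashlib
import json
SENSITIVE_KEYS = {"content", "new_string", "old_string", "api_key", "token", "password"}
MAX_VALUE_LEN = 200
def sanitize_tool_input(tool_input: dict) -> dict:
    """Hash likely-sensitive values and truncate everything else before logging."""
    clean: dict = {}
    for key, value in tool_input.items():
        text = value if isinstance(value, str) else json.dumps(value)
        if key.lower() in SENSITIVE_KEYS:
            clean[key] = "sha256:" + hashlib.sha256(text.encode()).hexdigest()[:16]
        else:
            clean[key] = text[:MAX_VALUE_LEN]
    return clean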
Mistake 3: Treating estimated costs as exact. The cost estimates in this guide are derived from token counts and published pricing. They exclude taxes, any negotiated enterprise discounts, rounding, and price changes Anthropic may have made after this was written. Use them for relative comparisons and trend analysis, not accounting.
Mistake 4: Ignoring compaction in token counts. If you are summing token counts from assistant events, compacted context does not show up as a separate token cost — it is reflected in the elevated input token count of the first turn after compaction. Your total will be accurate; just do not expect to see a “compaction cost” line item.
Mistake 5: Setting alert thresholds too low. A $0.10 threshold for a Sonnet session will produce noise. Use the baseline from your first week of monitoring to calibrate thresholds. A reasonable starting point: alert at 3x your median session cost.
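One way to derive that baseline from the telemetry database populated by the Layer 1 hook (a sketch that, like the terminal dashboard, prices plain input and output tokens and ignores cache discounts):
import sqlite3
import statistics
from pathlib import Path
PRICING = {
    "claude-opus-4-6": (15.0, 75.0),
    "claude-sonnet-4-6": (3.0, 15.0),
    "claude-haiku-4": (0.25, 1.25),
}
def suggested_threshold(db_path: Path = Path.home() / ".claude" / "telemetry.db") -> float:
    """Return 3x the median per-session cost recorded in Stop events."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        "SELECT tokens_input, tokens_output, model FROM events WHERE event_type = 'Stop'"
    ).fetchall()
    conn.close()
    costs = []
    for inp, out, model in rows:
        if not inp or not out:
            continue
        r_in, r_out = PRICING.get(model, (3.0, 15.0))
        costs.append((inp / 1_000_000) * r_in + (out / 1_000_000) * r_out)
    return 3 * statistics.median(costs) if costs else 0.50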
FAQ
Can I get token counts during a session, not just at the end?
Yes. Each assistant event in the JSONL transcript includes a usage object with the token counts for that turn. Parse the transcript in real time (tail the file) to keep a running total, as sketched below. The Stop hook is cleaner for session-end totals.
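A rough way to do that, polling the transcript file rather than waiting for the Stop hook (a sketch that assumes the per-turn usage shape described earlier):
import json
import time
from pathlib import Path
def follow_session(path: Path) -> None:
    """Print a running token total as the transcript file grows."""
    totals = {"input": 0, "output": 0}
    with path.open("r", encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                time.sleep(1.0)
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            usage = event.get("usage", {})
            totals["input"] += usage.get("input_tokens", 0)
            totals["output"] += usage.get("output_tokens", 0)
            print(f"input: {totals['input']:,}  output: {totals['output']:,}")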
Does monitoring add meaningful latency to Claude Code?
Not if implemented correctly. Hooks run asynchronously in most configurations. The only latency-critical hook is PreToolUse if you are using it to block — for telemetry-only hooks, processing time is irrelevant since Claude does not wait for the hook response.
Can I track cost per project, not just per session?
Yes. Session JSONL files are organized by project hash under ~/.claude/projects/<hash>/. Run the batch analysis script against a specific project directory to get per-project totals.
What if I am using Claude Code via the API rather than the CLI?
The hooks system is specific to the Claude Code CLI. For API usage, instrument your API calls directly using the usage field in the Anthropic API response, which returns input and output token counts for every request.
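With the official Python SDK that looks roughly like this (the model name is simply the one used elsewhere in this guide):
import anthropic
client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
response = client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize the build failure below..."}],
)
# The usage object carries per-request token counts you can log directly.
print(response.usage.input_tokens, response.usage.output_tokens)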
Will these scripts break when Claude Code updates?
The JSONL format is stable across minor versions; the event types and field names have not changed significantly since hooks were introduced. The main breaking risk is if Anthropic changes the transcript location or format in a major release. Pin to a specific version in production environments and test upgrades before deploying.
The scripts in this guide are starting points, not finished products. Your actual monitoring setup will depend on your team’s existing infrastructure, how much Claude Code you are running, and whether you need real-time alerting or are comfortable with daily summaries. Start with the minimal telemetry hook and the JSONL parser — those two alone will surface the majority of the issues worth fixing.