Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion langfuse/_client/propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from opentelemetry import (
trace as otel_trace_api,
)
from opentelemetry.context import _RUNTIME_CONTEXT
from opentelemetry.util._decorator import (
_AgnosticContextManager,
_agnosticcontextmanager,
Expand Down Expand Up @@ -272,7 +273,13 @@ def _propagate_attributes(
yield

finally:
otel_context_api.detach(token)
try:
# Bypass the public detach() which logs an ERROR when the token was
# created in a different async task/thread (common in async frameworks).
# The span data is already captured; the failed detach is harmless.
_RUNTIME_CONTEXT.detach(token)
except Exception:
pass


def _get_propagated_attributes_from_context(
Expand Down
236 changes: 225 additions & 11 deletions langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from contextvars import Token
from typing import (
Any,
Expand Down Expand Up @@ -35,6 +36,166 @@
from langfuse.logger import langfuse_logger
from langfuse.types import TraceContext


def _to_langfuse_tool(tool: Any) -> List[Any]:
"""Normalize a tool definition into a list of OpenAI-format tool dicts.

Returns a list because Google / Vertex AI's ``function_declarations`` format
bundles multiple tools into a single object, so one input can expand to many.

LangChain providers serialize tools differently depending on the backend:
- OpenAI / LiteLLM / Ollama: {type: "function", function: {name, description, parameters}}
- Anthropic (ChatAnthropic): {name, description, input_schema}
- Google / Vertex AI: {function_declarations: [{name, description, parameters}, ...]}
- BaseTool / StructuredTool objects: LangChain objects not yet converted to dict

All formats are normalized to the canonical OpenAI shape so Langfuse's
``extractToolsFromObservation`` (which uses ``OpenAIToolSchema``) can parse them.
"""
if not isinstance(tool, dict):
# BaseTool / StructuredTool objects — passed without dict conversion (langfuse#11850).
# Extract via duck typing to avoid a hard langchain-core dependency.
if hasattr(tool, "name") and hasattr(tool, "description"):
try:
parameters: Dict[str, Any] = {}
args_schema = getattr(tool, "args_schema", None)
if args_schema is not None:
if hasattr(args_schema, "model_json_schema"): # Pydantic v2
schema = args_schema.model_json_schema()
elif hasattr(args_schema, "schema"): # Pydantic v1
schema = args_schema.schema()
else:
schema = {}
parameters = {k: v for k, v in schema.items() if k != "title"}
return [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": parameters,
},
}
]
except Exception:
langfuse_logger.debug(
"Failed to convert BaseTool object to dict: %s", tool
)
return [tool]

# Already in OpenAI format: {type: "function", function: {name, description, parameters}}
if tool.get("type") == "function" and "function" in tool:
return [tool]

# Anthropic format: {name, description, input_schema} -> OpenAI format
if "name" in tool and "input_schema" in tool:
return [
{
"type": "function",
"function": {
"name": tool["name"],
"description": tool.get("description", ""),
"parameters": tool["input_schema"],
},
}
]

# Google / Vertex AI format: {function_declarations: [{name, description, parameters}, ...]}
# One object bundles multiple tool definitions — expand to individual OpenAI-format tools.
if "function_declarations" in tool:
result = []
for decl in tool.get("function_declarations", []):
if not isinstance(decl, dict):
continue
result.append(
{
"type": "function",
"function": {
"name": decl.get("name", ""),
"description": decl.get("description", ""),
"parameters": decl.get("parameters", {}),
},
}
)
return result if result else [tool]

return [tool]


def _normalize_anthropic_content_blocks(
content: List[Any], tool_calls: List[Dict[str, Any]]
) -> List[Any]:
"""Remove streaming artifacts from Anthropic content blocks.

Anthropic streaming leaves tool_use blocks with ``input: {}`` and
streaming-specific fields (``index``, ``partial_json``). The actual
arguments are already reconstructed in ``message.tool_calls``. This
helper fills the empty ``input`` from the normalized tool_calls and
strips the streaming-only keys so the block looks like a proper
Anthropic content block.
"""
if not tool_calls:
return content
tc_by_id: Dict[str, Any] = {
tc["id"]: tc.get("args", {})
for tc in tool_calls
if isinstance(tc, dict) and "id" in tc
}
tc_by_name: Dict[str, Any] = {
tc["name"]: tc.get("args", {})
for tc in tool_calls
if isinstance(tc, dict) and "name" in tc
}
result = []
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_use":
block_input = block.get("input") or {}
if not block_input:
block_input = (
tc_by_id.get(block.get("id", ""))
or tc_by_name.get(block.get("name", ""))
or {}
)
normalized = {
k: v for k, v in block.items() if k not in ("index", "partial_json")
}
normalized["input"] = block_input
result.append(normalized)
else:
result.append(block)
return result


def _convert_tool_call(tc: Any, include_error: bool = False) -> Optional[Dict[str, Any]]:
"""Convert a single tool call dict to Langfuse's canonical format.

Handles both LangChain format {name, args, id} and Anthropic streaming
format {type: "tool_use", name, input, id}.

Returns None (and logs a debug message) if tc is not a dict.
Set include_error=True for invalid_tool_calls entries.
"""
if not isinstance(tc, dict):
langfuse_logger.debug("Skipping tool_call entry that is not a dict: %s", tc)
return None
# Anthropic streaming uses "input" instead of "args"
args = tc.get("args") or tc.get("input") or {}
try:
arguments = json.dumps(args)
except (TypeError, ValueError) as e:
langfuse_logger.debug("Failed to serialize tool call args to JSON: %s", e)
arguments = "{}"
result: Dict[str, Any] = {
"id": tc.get("id", ""),
"type": "function",
"name": tc.get("name", ""),
"arguments": arguments,
}
if include_error:
result["error"] = tc.get("error", "")
return result


try:
import langchain

Expand Down Expand Up @@ -841,9 +1002,27 @@ def __on_llm_action(
self._child_to_parent_run_id_map[run_id] = parent_run_id

try:
tools = kwargs.get("invocation_params", {}).get("tools", None)
observation_input: Any = prompts
invocation_params = kwargs.get("invocation_params", {})
langfuse_logger.debug(
"LLM action invocation_params keys: %s", list(invocation_params.keys())
)
tools = invocation_params.get("tools", None)
langfuse_logger.debug(
"LLM action tools from invocation_params: %s", tools
)
if tools and isinstance(tools, list):
prompts.extend([{"role": "tool", "content": tool} for tool in tools])
# Structure input as {messages, tools} so extractToolsFromObservation
# can find tool definitions at the top-level tools key — the canonical
# format expected by the backend's LLMToolDefinitionSchema.
normalized_tools = [n for t in tools for n in _to_langfuse_tool(t)]
langfuse_logger.debug(
"LLM action normalized tools: %s", normalized_tools
)
observation_input = {
"messages": prompts,
"tools": normalized_tools,
}

model_name = self._parse_model_and_log_errors(
serialized=serialized, metadata=metadata, kwargs=kwargs
Expand All @@ -868,7 +1047,7 @@ def __on_llm_action(

content = {
"name": self.get_langchain_run_name(serialized, **kwargs),
"input": prompts,
"input": observation_input,
"metadata": self.__join_tags_and_metadata(
tags,
metadata,
Expand Down Expand Up @@ -1049,21 +1228,56 @@ def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
if isinstance(message, HumanMessage):
message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
elif isinstance(message, AIMessage):
message_dict = {"role": "assistant", "content": message.content}

if (
# Normalize Anthropic content blocks: fill empty tool_use input from
# message.tool_calls and strip streaming artifacts (index, partial_json).
content = message.content
langfuse_logger.debug(
"AIMessage content type=%s value=%s", type(content).__name__, content
)
lc_tool_calls = (
list(message.tool_calls)
if hasattr(message, "tool_calls") and message.tool_calls
else []
)
langfuse_logger.debug(
"AIMessage tool_calls=%s additional_kwargs=%s",
lc_tool_calls,
message.additional_kwargs,
)
if isinstance(content, list) and lc_tool_calls:
content = _normalize_anthropic_content_blocks(content, lc_tool_calls)
langfuse_logger.debug("AIMessage normalized content=%s", content)
message_dict = {"role": "assistant", "content": content}

# Resolve tool_calls: prefer LangChain's normalized {name, args, id}
# format; fall back to additional_kwargs["tool_calls"] which contains
# Anthropic's raw {type: "tool_use", name, input, id} format when
# streaming is used and message.tool_calls is not populated.
raw_tool_calls = message.tool_calls if (
hasattr(message, "tool_calls")
and message.tool_calls is not None
and len(message.tool_calls) > 0
):
message_dict["tool_calls"] = message.tool_calls
) else message.additional_kwargs.get("tool_calls") or []

if raw_tool_calls:
converted_tool_calls = [
r for tc in raw_tool_calls if (r := _convert_tool_call(tc)) is not None
]
if converted_tool_calls:
message_dict["tool_calls"] = converted_tool_calls

if (
hasattr(message, "invalid_tool_calls")
and message.invalid_tool_calls is not None
hasattr(message, "invalid_tool_calls")
and message.invalid_tool_calls is not None
and len(message.invalid_tool_calls) > 0
):
message_dict["invalid_tool_calls"] = message.invalid_tool_calls
converted_invalid_tool_calls = [
r
for tc in message.invalid_tool_calls
if (r := _convert_tool_call(tc, include_error=True)) is not None
]
if converted_invalid_tool_calls:
message_dict["invalid_tool_calls"] = converted_invalid_tool_calls

elif isinstance(message, SystemMessage):
message_dict = {"role": "system", "content": message.content}
Expand Down
Loading