Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions ddtrace/_trace/tracing_plugins/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
Integration tracing plugins.

This module provides the plugin system for tracing integrations.

The plugin system uses a hierarchical structure:

TracingPlugin (base)
├── OutboundPlugin (TO external services)
│ ├── ClientPlugin (HTTP clients, etc.)
│ │ └── StoragePlugin (storage systems)
│ │ └── DatabasePlugin (databases)
│ └── ProducerPlugin (message producers)
└── InboundPlugin (FROM external sources)
├── ServerPlugin (HTTP servers, etc.)
└── ConsumerPlugin (message consumers)

Usage:
# During ddtrace initialization
from ddtrace._trace.tracing_plugins import initialize
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the plugins going to be tracer specific? Would it make sense to put these modules in ddtrace/internal/plugins/..... I can see other products (asm, llmobs, serverless, ...) extending or using these plugins. Making them tracing specific puts us in a corner.


initialize() # Registers all plugins

Adding a new integration:
1. Create a plugin class in contrib/ that extends the appropriate base
2. Define `package` and `operation` properties
3. Register it in this module's initialize() function
"""

from typing import List
from typing import Type

from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)

# All registered plugins
_plugins: List[TracingPlugin] = []
_initialized: bool = False


def register(plugin_cls: Type[TracingPlugin]) -> None:
"""
Register a plugin class.

Creates an instance and registers its event handlers.

Args:
plugin_cls: Plugin class to register
"""
try:
plugin = plugin_cls()
plugin.register()
_plugins.append(plugin)
log.debug("Registered tracing plugin: %s.%s", plugin.package, plugin.operation)
except Exception:
log.warning(
"Failed to register tracing plugin: %s",
plugin_cls.__name__,
exc_info=True,
)


def initialize() -> None:
"""
Initialize all v2 integration plugins.

This should be called during ddtrace initialization.
Safe to call multiple times - only initializes once.
"""
global _initialized

if _initialized:
return

_initialized = True

# Import contrib plugins
from ddtrace._trace.tracing_plugins.contrib import asyncpg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid inlining imports at all costs. This is a code smell and a sign of circular imports. If importing modules have unintended side effects we should remove them (ex: importing from ddtrace.contrib should not register trace handler listeners, we should get rid of this import side effect)


# Database plugins
register(asyncpg.AsyncpgExecutePlugin)
register(asyncpg.AsyncpgConnectPlugin)

# TODO: Add more plugins as integrations are migrated:
# from ddtrace._trace.tracing_plugins.contrib import psycopg
# from ddtrace._trace.tracing_plugins.contrib import mysql
# from ddtrace._trace.tracing_plugins.contrib import httpx
# from ddtrace._trace.tracing_plugins.contrib import flask
# from ddtrace._trace.tracing_plugins.contrib import kafka

log.debug("Initialized %d tracing plugins", len(_plugins))


def get_plugins() -> List[TracingPlugin]:
"""
Get all registered plugins.

Returns:
List of registered plugin instances
"""
return _plugins.copy()


def reset() -> None:
"""
Reset plugin registration.

This is primarily for testing purposes.
"""
global _plugins, _initialized
_plugins = []
_initialized = False
28 changes: 28 additions & 0 deletions ddtrace/_trace/tracing_plugins/base/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Base tracing plugin classes.

This module provides the hierarchical plugin system for tracing integrations.
"""

from ddtrace._trace.tracing_plugins.base.client import ClientPlugin
from ddtrace._trace.tracing_plugins.base.consumer import ConsumerPlugin
from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin
from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin
from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin
from ddtrace._trace.tracing_plugins.base.producer import ProducerPlugin
from ddtrace._trace.tracing_plugins.base.server import ServerPlugin
from ddtrace._trace.tracing_plugins.base.storage import StoragePlugin
from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin


__all__ = [
"TracingPlugin",
"OutboundPlugin",
"ClientPlugin",
"StoragePlugin",
"DatabasePlugin",
"InboundPlugin",
"ServerPlugin",
"ProducerPlugin",
"ConsumerPlugin",
]
27 changes: 27 additions & 0 deletions ddtrace/_trace/tracing_plugins/base/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
ClientPlugin - Base for client-side operations.

Extends OutboundPlugin for client-type operations like HTTP requests.
This is the base for HTTP clients, gRPC clients, etc.
"""

from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin
from ddtrace.ext import SpanKind


class ClientPlugin(OutboundPlugin):
"""
Base plugin for client-side operations.

Extends OutboundPlugin with:
- kind: CLIENT (span.kind = "client")
- type: "web" by default

Use this as the base for:
- HTTP clients (httpx, requests, aiohttp client)
- gRPC clients
- Other RPC clients
"""

kind = SpanKind.CLIENT
type = "web"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use SpanType here.

132 changes: 132 additions & 0 deletions ddtrace/_trace/tracing_plugins/base/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""
ConsumerPlugin - Base for message consumers.

Extends InboundPlugin since consuming messages is an inbound operation
(receiving data FROM a message broker/queue).
"""

from typing import TYPE_CHECKING
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't use TYPE_CHECKING constant any more. This is used to support python2 style typing (types in comments). We should enforce python3 typing and if we do we don;t need to hid imports.

Using this is actually a code smell because it can hide circular imports

from typing import Any
from typing import Optional
from typing import Tuple

from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin


if TYPE_CHECKING:
from ddtrace._trace.span import Span
from ddtrace._trace.tracing_plugins.base.events import MessagingContext
from ddtrace.internal.core import ExecutionContext


class ConsumerPlugin(InboundPlugin):
"""
Base plugin for message consumers.

Extends InboundPlugin - consuming is receiving data IN from a broker.

Handles:
- Trace context extraction from message headers
- Span linking to producer span
- Destination tagging (topic, queue, exchange)
- Message metadata tagging (offset, partition, etc.)

Subclasses define:
- package: str (e.g., "kafka", "rabbitmq")
- operation: str (e.g., "consume", "receive")
"""

from ddtrace.ext import SpanKind

kind = SpanKind.CONSUMER
type = "worker"

def on_start(self, ctx: "ExecutionContext") -> None:
"""
Create consumer span with context extraction.

Reads from ctx:
- pin: The Pin instance
- messaging_context: MessagingContext with destination, headers, etc.
- span_name: Optional override for span name
"""
from ddtrace.constants import _SPAN_MEASURED_KEY
from ddtrace.ext import SpanTypes
from ddtrace.propagation.http import HTTPPropagator

pin = ctx.get_item("pin")
msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")

if not pin or not pin.enabled() or not msg_ctx:
return

# Determine span name
span_name = ctx.get_item("span_name") or f"{msg_ctx.messaging_system}.consume"

# Create span
span = self.start_span(
ctx,
span_name,
resource=msg_ctx.destination,
span_type=SpanTypes.WORKER,
)

if not span:
return

# Mark as measured
span.set_metric(_SPAN_MEASURED_KEY, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very important tag for trace metrics. Now that we are getting rid of integration specific service names we will ONLY compute trace metrics for spans with this tag OR tags that have a span.kind of producer, server, client, and producer.

To be safe we should move this to be a core component for all plugins. This should be set on the first span for each integration


# Set messaging tags
self._set_messaging_tags(span, msg_ctx)

# Extract and link trace context from message headers
if msg_ctx.headers:
distributed_ctx = HTTPPropagator.extract(msg_ctx.headers)
if distributed_ctx:
span.link_span(distributed_ctx)

def on_finish(
self,
ctx: "ExecutionContext",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we avoiding circular imports here? Can we avoid this?

exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]],
) -> None:
"""
Finish consumer span with message metadata.

Reads from ctx:
- messaging_context: For consume metadata (offset, partition, etc.)
"""
span = ctx.span
if not span:
return

msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")

# Set consume-specific metadata
if msg_ctx:
if msg_ctx.message_id:
span._set_tag_str("messaging.message.id", msg_ctx.message_id)
if msg_ctx.partition is not None:
span.set_metric("messaging.kafka.partition", msg_ctx.partition)
if msg_ctx.offset is not None:
span.set_metric("messaging.kafka.offset", msg_ctx.offset)

# Call parent for error handling
super().on_finish(ctx, exc_info)

def _set_messaging_tags(self, span: "Span", msg_ctx: "MessagingContext") -> None:
"""Set standard messaging tags."""
span._set_tag_str("messaging.system", msg_ctx.messaging_system)
span._set_tag_str("messaging.destination.name", msg_ctx.destination)
span._set_tag_str("messaging.operation", "receive")

if msg_ctx.key:
span._set_tag_str("messaging.kafka.message.key", msg_ctx.key)

if msg_ctx.batch_size is not None and msg_ctx.batch_size > 1:
span.set_metric("messaging.batch.message_count", msg_ctx.batch_size)

# Extra tags
if msg_ctx.tags:
span.set_tags(msg_ctx.tags)
Loading
Loading