"""
Integration tracing plugins.

This module provides the plugin system for tracing integrations.

The plugin system uses a hierarchical structure:

    TracingPlugin (base)
    ├── OutboundPlugin (TO external services)
    │   ├── ClientPlugin (HTTP clients, etc.)
    │   │   └── StoragePlugin (storage systems)
    │   │       └── DatabasePlugin (databases)
    │   └── ProducerPlugin (message producers)
    └── InboundPlugin (FROM external sources)
        ├── ServerPlugin (HTTP servers, etc.)
        └── ConsumerPlugin (message consumers)

Usage:
    # During ddtrace initialization
    from ddtrace._trace.tracing_plugins import initialize

    initialize()  # Registers all plugins

Adding a new integration:
    1. Create a plugin class in contrib/ that extends the appropriate base
    2. Define `package` and `operation` properties
    3. Register it in this module's initialize() function
"""

from typing import List
from typing import Type

from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin
from ddtrace.internal.logger import get_logger


log = get_logger(__name__)

# Plugin instances whose register() call succeeded, in registration order.
_plugins: List[TracingPlugin] = []
# True once initialize() has started (and not failed on its contrib import).
_initialized: bool = False


def register(plugin_cls: Type[TracingPlugin]) -> None:
    """
    Register a plugin class.

    Instantiates ``plugin_cls``, calls its ``register()`` method and, on
    success, records the instance in the module-level plugin list.

    Registration is best-effort: any exception raised while creating or
    registering the plugin is logged as a warning (with traceback) and
    swallowed, so one broken plugin does not prevent the others from loading.

    Args:
        plugin_cls: Plugin class to register
    """
    try:
        plugin = plugin_cls()
        plugin.register()
        _plugins.append(plugin)
        log.debug("Registered tracing plugin: %s.%s", plugin.package, plugin.operation)
    except Exception:
        log.warning(
            "Failed to register tracing plugin: %s",
            plugin_cls.__name__,
            exc_info=True,
        )


def initialize() -> None:
    """
    Initialize all v2 integration plugins.

    This should be called during ddtrace initialization.
    Safe to call multiple times - only initializes once.

    Raises:
        Exception: whatever the contrib plugin import raises. In that case the
            "initialized" flag is rolled back so a later call can retry.
    """
    global _initialized

    if _initialized:
        return

    # The flag is set before the import so that a nested initialize() call made
    # while the import is running returns immediately.
    _initialized = True

    try:
        # Import contrib plugins
        from ddtrace._trace.tracing_plugins.contrib import asyncpg
    except Exception:
        # Without this rollback a failed import would leave the module marked as
        # initialized with zero plugins, and every later call would silently
        # do nothing.
        _initialized = False
        raise

    # Database plugins
    register(asyncpg.AsyncpgExecutePlugin)
    register(asyncpg.AsyncpgConnectPlugin)

    # TODO: Add more plugins as integrations are migrated:
    # from ddtrace._trace.tracing_plugins.contrib import psycopg
    # from ddtrace._trace.tracing_plugins.contrib import mysql
    # from ddtrace._trace.tracing_plugins.contrib import httpx
    # from ddtrace._trace.tracing_plugins.contrib import flask
    # from ddtrace._trace.tracing_plugins.contrib import kafka

    log.debug("Initialized %d tracing plugins", len(_plugins))


def get_plugins() -> List[TracingPlugin]:
    """
    Get all registered plugins.

    Returns:
        A shallow copy of the list of registered plugin instances; mutating the
        returned list does not affect the registry.
    """
    return _plugins.copy()


def reset() -> None:
    """
    Reset plugin registration.

    Clears the plugin list and the "initialized" flag so that initialize()
    runs again on its next call. This is primarily for testing purposes.

    NOTE(review): only this module's bookkeeping is reset. Nothing here undoes
    what each plugin's ``register()`` subscribed, so a reset followed by
    initialize() may leave handlers registered twice - confirm.
    """
    global _plugins, _initialized
    _plugins = []
    _initialized = False
+""" + +from ddtrace._trace.tracing_plugins.base.client import ClientPlugin +from ddtrace._trace.tracing_plugins.base.consumer import ConsumerPlugin +from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin +from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin +from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin +from ddtrace._trace.tracing_plugins.base.producer import ProducerPlugin +from ddtrace._trace.tracing_plugins.base.server import ServerPlugin +from ddtrace._trace.tracing_plugins.base.storage import StoragePlugin +from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin + + +__all__ = [ + "TracingPlugin", + "OutboundPlugin", + "ClientPlugin", + "StoragePlugin", + "DatabasePlugin", + "InboundPlugin", + "ServerPlugin", + "ProducerPlugin", + "ConsumerPlugin", +] diff --git a/ddtrace/_trace/tracing_plugins/base/client.py b/ddtrace/_trace/tracing_plugins/base/client.py new file mode 100644 index 00000000000..f29407eec17 --- /dev/null +++ b/ddtrace/_trace/tracing_plugins/base/client.py @@ -0,0 +1,27 @@ +""" +ClientPlugin - Base for client-side operations. + +Extends OutboundPlugin for client-type operations like HTTP requests. +This is the base for HTTP clients, gRPC clients, etc. +""" + +from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin +from ddtrace.ext import SpanKind + + +class ClientPlugin(OutboundPlugin): + """ + Base plugin for client-side operations. 
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional
from typing import Tuple

from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin
from ddtrace.ext import SpanKind


if TYPE_CHECKING:
    from ddtrace._trace.span import Span
    from ddtrace._trace.tracing_plugins.base.events import MessagingContext
    from ddtrace.internal.core import ExecutionContext


class ConsumerPlugin(InboundPlugin):
    """
    Base plugin for message consumers.

    Extends InboundPlugin - consuming is receiving data IN from a broker.

    Handles:
    - Trace context extraction from message headers
    - Span linking to the extracted (producer) context
    - Destination tagging (topic, queue, exchange)
    - Message metadata tagging (offset, partition, etc.)

    Subclasses define:
    - package: str (e.g., "kafka", "rabbitmq")
    - operation: str (e.g., "consume", "receive")
    """

    # SpanKind is imported at module level (as the client/server plugin modules
    # do) rather than inside the class body, where the import would also bind
    # ``SpanKind`` as an accidental class attribute.
    kind = SpanKind.CONSUMER
    type = "worker"

    def on_start(self, ctx: "ExecutionContext") -> None:
        """
        Create consumer span with context extraction.

        Does nothing when there is no pin, the pin is disabled, or no
        messaging context was provided.

        Reads from ctx:
        - pin: The Pin instance
        - messaging_context: MessagingContext with destination, headers, etc.
        - span_name: Optional override for span name
          (default: "<messaging_system>.consume")
        """
        from ddtrace.constants import _SPAN_MEASURED_KEY
        from ddtrace.ext import SpanTypes
        from ddtrace.propagation.http import HTTPPropagator

        pin = ctx.get_item("pin")
        msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")

        if not pin or not pin.enabled() or not msg_ctx:
            return

        # Determine span name
        span_name = ctx.get_item("span_name") or f"{msg_ctx.messaging_system}.consume"

        # Create span; the destination (topic/queue) is used as the resource
        span = self.start_span(
            ctx,
            span_name,
            resource=msg_ctx.destination,
            span_type=SpanTypes.WORKER,
        )

        if not span:
            return

        # Mark as measured
        span.set_metric(_SPAN_MEASURED_KEY, 1)

        # Set messaging tags
        self._set_messaging_tags(span, msg_ctx)

        # Extract trace context from message headers and attach it as a span
        # link (the consumer span is linked to it, not parented under it).
        if msg_ctx.headers:
            distributed_ctx = HTTPPropagator.extract(msg_ctx.headers)
            if distributed_ctx:
                span.link_span(distributed_ctx)

    def on_finish(
        self,
        ctx: "ExecutionContext",
        exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]],
    ) -> None:
        """
        Finish consumer span with message metadata.

        Reads from ctx:
        - messaging_context: For consume metadata (offset, partition, etc.)

        Args:
            ctx: The execution context whose span is being finished
            exc_info: Exception triple forwarded unchanged to the parent handler
        """
        span = ctx.span
        if not span:
            return

        msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")

        # Set consume-specific metadata
        # NOTE(review): partition/offset are tagged under kafka-specific names
        # whatever msg_ctx.messaging_system is - confirm this is intended for
        # non-Kafka consumers.
        if msg_ctx:
            if msg_ctx.message_id:
                span._set_tag_str("messaging.message.id", msg_ctx.message_id)
            if msg_ctx.partition is not None:
                span.set_metric("messaging.kafka.partition", msg_ctx.partition)
            if msg_ctx.offset is not None:
                span.set_metric("messaging.kafka.offset", msg_ctx.offset)

        # Call parent for error handling
        super().on_finish(ctx, exc_info)

    def _set_messaging_tags(self, span: "Span", msg_ctx: "MessagingContext") -> None:
        """
        Set standard messaging tags on a consumer span.

        Always sets messaging.system, messaging.destination.name and
        messaging.operation ("receive"). The message key is tagged when present,
        the batch count only for batches of more than one message, and any
        extra tags carried by the context are applied last.
        """
        span._set_tag_str("messaging.system", msg_ctx.messaging_system)
        span._set_tag_str("messaging.destination.name", msg_ctx.destination)
        span._set_tag_str("messaging.operation", "receive")

        if msg_ctx.key:
            span._set_tag_str("messaging.kafka.message.key", msg_ctx.key)

        if msg_ctx.batch_size is not None and msg_ctx.batch_size > 1:
            span.set_metric("messaging.batch.message_count", msg_ctx.batch_size)

        # Extra tags
        if msg_ctx.tags:
            span.set_tags(msg_ctx.tags)
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional
from typing import Tuple

from ddtrace._trace.tracing_plugins.base.storage import StoragePlugin


if TYPE_CHECKING:
    from ddtrace._trace.span import Span
    from ddtrace._trace.tracing_plugins.base.events import DatabaseContext
    from ddtrace.internal.core import ExecutionContext


class DatabasePlugin(StoragePlugin):
    """
    Shared tracing behaviour for database integrations.

    Responsibilities implemented here:
    - span creation with the query as the default resource
    - database/connection tags taken from a DatabaseContext
    - marking the span as measured
    - dispatching the DBM (Database Monitoring) propagation hook
    - rowcount reporting when the span finishes

    Subclasses are expected to provide:
    - package: str (e.g., "asyncpg")
    - operation: str (e.g., "execute")
    - db_system: str (e.g., "postgresql")
    """

    type = "sql"

    # Database flavour, set by subclasses ("postgresql", "mysql", "sqlite", ...).
    db_system: Optional[str] = None

    def on_start(self, ctx: "ExecutionContext") -> None:
        """
        Open the database span and apply the standard tags.

        Nothing happens when the context has no pin or the pin is disabled.

        Items read from ctx:
        - pin: The Pin instance
        - db_context: DatabaseContext (query, connection info, ...), optional
        - span_name: overrides the schema-derived span name when set
        - resource: overrides the resource; otherwise the context's query is used
        """
        from ddtrace.constants import _SPAN_MEASURED_KEY
        from ddtrace.ext import SpanTypes

        pin = ctx.get_item("pin")
        database_ctx: Optional["DatabaseContext"] = ctx.get_item("db_context")

        if not (pin and pin.enabled()):
            return

        name = ctx.get_item("span_name") or self._get_span_name()

        # An explicit resource wins; the query text is only the fallback.
        resource = ctx.get_item("resource")
        if resource is None:
            if database_ctx and database_ctx.query:
                resource = database_ctx.query

        span = self.start_span(ctx, name, resource=resource, span_type=SpanTypes.SQL)
        if not span:
            return

        # Measured flag first, then context tags, then pin tags.
        span.set_metric(_SPAN_MEASURED_KEY, 1)

        if database_ctx:
            self._set_database_tags(span, database_ctx)

        if pin.tags:
            span.set_tags(pin.tags)

        # DBM propagation is only attempted when the context carries a propagator.
        if database_ctx and database_ctx.dbm_propagator:
            self._apply_dbm_propagation(ctx, span, database_ctx)

    def on_finish(
        self,
        ctx: "ExecutionContext",
        exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]],
    ) -> None:
        """
        Record the rowcount (if any) and delegate finishing to the parent.

        Items read from ctx:
        - rowcount: optional row count of the query result

        Args:
            ctx: The execution context whose span is being finished
            exc_info: Exception triple passed through to the parent handler
        """
        from ddtrace.ext import db

        span = ctx.span
        if not span:
            return

        row_total = ctx.get_item("rowcount")
        if row_total is not None:
            span.set_metric(db.ROWCOUNT, row_total)
            # Non-negative integer counts are additionally written through
            # set_tag for backward compatibility.
            is_valid_count = isinstance(row_total, int) and row_total >= 0
            if is_valid_count:
                span.set_tag(db.ROWCOUNT, row_total)

        # The parent takes care of peer service and error handling.
        super().on_finish(ctx, exc_info)

    def _get_span_name(self) -> str:
        """
        Build the default span name via the span-name schema.

        The provider is ``db_system``, else ``system``, else the literal "db".
        """
        from ddtrace.internal.schema import schematize_database_operation

        provider = self.db_system or self.system or "db"
        return schematize_database_operation(f"{provider}.query", database_provider=provider)

    def _set_database_tags(self, span: "Span", db_ctx: "DatabaseContext") -> None:
        """
        Copy database details from a DatabaseContext onto the span.

        Tags written (each only when a value is available):
        - db.system: from the context, falling back to the plugin's db_system
        - host/port tags: through add_host() when the context has a host
        - db.user
        - db.name
        - any extra tags carried by the context
        """
        from ddtrace.ext import db

        system_name = db_ctx.db_system or self.db_system
        if system_name:
            span._set_tag_str(db.SYSTEM, system_name)

        if db_ctx.host:
            self.add_host(span, db_ctx.host, db_ctx.port)

        # User and database name follow the same "tag only when set" rule.
        for tag_key, tag_value in ((db.USER, db_ctx.user), (db.NAME, db_ctx.database)):
            if tag_value:
                span._set_tag_str(tag_key, tag_value)

        if db_ctx.tags:
            span.set_tags(db_ctx.tags)

    def _apply_dbm_propagation(
        self,
        ctx: "ExecutionContext",
        span: "Span",
        db_ctx: "DatabaseContext",
    ) -> None:
        """
        Run the DBM (Database Monitoring) hook for this query.

        Dispatches the "<package>.execute" event with the integration config,
        the span and the call's args/kwargs (read from ctx). When a listener
        returns a value, its second and third elements are stored back on ctx
        as "modified_args" and "modified_kwargs".
        """
        from ddtrace.internal import core

        if not db_ctx.dbm_propagator:
            return

        payload = (
            self.integration_config,
            span,
            ctx.get_item("args", ()),
            ctx.get_item("kwargs", {}),
        )
        outcome = core.dispatch_with_results(f"{self.package}.execute", payload).result

        if not (outcome and outcome.value):
            return

        _, new_args, new_kwargs = outcome.value
        ctx.set_item("modified_args", new_args)
        ctx.set_item("modified_kwargs", new_kwargs)

    def _get_peer_service(self, ctx: "ExecutionContext", span: "Span") -> Optional[str]:
        """
        Resolve the peer service for a database span.

        The span's db.name tag is preferred; without it the parent
        implementation decides.
        """
        from ddtrace.ext import db

        database_name = span.get_tag(db.NAME)
        return str(database_name) if database_name else super()._get_peer_service(ctx, span)
@dataclass
class HTTPClientContext:
    """
    Request description handed over by HTTP client integrations.

    ``method`` and ``url`` are mandatory; everything else has a default.
    """

    # --- required request line ---
    method: str  # GET, POST, PUT, DELETE, etc.
    url: str

    # --- target, used for service naming and peer service ---
    target_host: Optional[str] = None
    target_port: Optional[int] = None

    # Headers of the outgoing request (for tagging, not modification).
    headers: Dict[str, str] = field(default_factory=dict)

    # Optional callback used to add distributed tracing headers to the request;
    # it is called with a dict of the headers to add.
    inject_headers: Optional[Callable[[Dict[str, str]], None]] = None

    # Integration-specific extra tags.
    tags: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HTTPClientResponseContext:
    """
    Response description for HTTP clients.

    Placed on the execution context once the response has been received.
    """

    status_code: int
    headers: Dict[str, str] = field(default_factory=dict)


@dataclass
class WebContext:
    """
    Incoming-request description handed over by web framework integrations.

    ``method``, ``url`` and ``path`` are mandatory; everything else has a default.
    """

    # --- required request line ---
    method: str  # GET, POST, PUT, DELETE, etc.
    url: str
    path: str

    # Matched route; may only become known after routing.
    route: Optional[str] = None

    # Incoming headers, the source for distributed tracing extraction.
    headers: Dict[str, str] = field(default_factory=dict)

    # Parsed query-string and path parameters.
    query_params: Dict[str, str] = field(default_factory=dict)
    path_params: Dict[str, str] = field(default_factory=dict)

    # Address of the calling client.
    remote_addr: Optional[str] = None

    # Integration-specific extra tags.
    tags: Dict[str, Any] = field(default_factory=dict)


@dataclass
class WebResponseContext:
    """
    Response description for web frameworks.

    Placed on the execution context when the response is ready.
    """

    status_code: int
    headers: Dict[str, str] = field(default_factory=dict)
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional

from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin


if TYPE_CHECKING:
    from ddtrace.internal.core import ExecutionContext


class InboundPlugin(TracingPlugin):
    """
    Base plugin for work that arrives FROM an external source.

    Intended as the parent for:
    - HTTP server requests
    - gRPC server calls
    - Message queue consumers
    - WebSocket connections

    It contributes two helpers for distributed tracing: one that extracts a
    context from headers and one that activates it on the pin's tracer.
    """

    type = "inbound"

    def extract_distributed_context(
        self,
        ctx: "ExecutionContext",
        headers: dict,
    ) -> Optional[Any]:
        """
        Extract distributed tracing context from headers.

        Args:
            ctx: The execution context (not consulted by this implementation)
            headers: Dict-like object containing trace headers

        Returns:
            Whatever HTTPPropagator.extract() produces for ``headers``
        """
        from ddtrace.propagation.http import HTTPPropagator

        extracted = HTTPPropagator.extract(headers)
        return extracted

    def activate_distributed_context(
        self,
        ctx: "ExecutionContext",
        headers: dict,
    ) -> None:
        """
        Activate the distributed tracing context carried by ``headers``.

        The work is delegated to ``activate_distributed_headers`` using the
        tracer of the pin stored on ``ctx`` and this plugin's integration
        config. Without a pin nothing happens.

        Args:
            ctx: The execution context; its "pin" item supplies the tracer
            headers: Dict-like object containing trace headers
        """
        from ddtrace.contrib.internal.trace_utils import activate_distributed_headers

        pin = ctx.get_item("pin")
        if pin:
            activate_distributed_headers(
                pin.tracer,
                int_config=self.integration_config,
                request_headers=headers,
            )
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional
from typing import Tuple

from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin


if TYPE_CHECKING:
    from ddtrace._trace.span import Span
    from ddtrace.internal.core import ExecutionContext


class OutboundPlugin(TracingPlugin):
    """
    Base plugin for outbound connections.

    Connections TO external services:
    - Database queries
    - HTTP client requests
    - Message queue producers
    - External API calls

    Provides:
    - Peer service resolution
    - Host/port tagging utilities
    - Automatic peer service tagging on span finish
    """

    type = "outbound"

    def on_finish(
        self,
        ctx: "ExecutionContext",
        exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]],
    ) -> None:
        """
        Finish span with peer service tagging.

        Sets peer.service tag if not already set, then calls parent finish.
        Does nothing when the context has no span.

        Args:
            ctx: The execution context whose span is being finished
            exc_info: Exception triple forwarded unchanged to the parent handler
        """
        span = ctx.span
        if not span:
            return

        # Set peer service if not already set
        self._set_peer_service(ctx, span)

        # Call parent to handle error and finish
        super().on_finish(ctx, exc_info)

    def _set_peer_service(self, ctx: "ExecutionContext", span: "Span") -> None:
        """
        Determine and set peer.service tag.

        Only sets if peer.service is not already present and peer service
        computation is not disabled in the global config. When a configured
        mapping renames the derived value, the renamed value is written and the
        pre-mapping value is preserved in ``_dd.peer.service.remapped_from``.
        """
        from ddtrace import config

        # Skip if peer service computation is disabled
        if not getattr(config, "_peer_service_computation_enabled", True):
            return

        # Skip if already set
        if span.get_tag("peer.service"):
            return

        # Try to derive from standard tags
        peer_service = self._get_peer_service(ctx, span)
        if not peer_service:
            return

        span._set_tag_str("peer.service", peer_service)

        # Check for peer service remapping
        remapped = self._get_peer_service_remap(peer_service)
        if remapped and remapped != peer_service:
            span._set_tag_str("peer.service", remapped)
            span._set_tag_str("_dd.peer.service.source", "peer.service")
            # Keep the value that was replaced; previously the original name
            # was overwritten and could not be recovered from the span.
            # NOTE(review): the "_dd.peer.service.source" value above is left
            # as it was; confirm it should not name the precursor tag instead.
            span._set_tag_str("_dd.peer.service.remapped_from", peer_service)

    def _get_peer_service(self, ctx: "ExecutionContext", span: "Span") -> Optional[str]:
        """
        Get peer service from available tags.

        The first tag with a truthy value wins, in this order:
        1. net.peer.name
        2. out.host
        3. net.target.host
        4. server.address
        5. db.name (for databases)
        6. messaging.destination.name (for messaging)

        Override in subclasses for custom logic.

        Returns:
            The tag value as a string, or None when none of the tags is set
        """
        from ddtrace.ext import net

        # Check standard network tags in priority order
        precursor_tags = [
            "net.peer.name",
            "out.host",
            net.TARGET_HOST,
            net.SERVER_ADDRESS,
            "db.name",
            "messaging.destination.name",
        ]

        for tag in precursor_tags:
            value = span.get_tag(tag)
            if value:
                return str(value)

        return None

    def _get_peer_service_remap(self, peer_service: str) -> Optional[str]:
        """
        Apply peer service remapping if configured.

        Looks ``peer_service`` up in the ``_peer_service_mapping`` attribute of
        the global config (presumably fed by DD_TRACE_PEER_SERVICE_MAPPING).

        Returns:
            The mapped name when the mapping has an entry for ``peer_service``,
            otherwise ``peer_service`` unchanged
        """
        from ddtrace import config

        mapping = getattr(config, "_peer_service_mapping", None)
        if mapping and peer_service in mapping:
            return mapping[peer_service]

        return peer_service

    def add_host(
        self,
        span: "Span",
        host: Optional[str],
        port: Optional[int] = None,
    ) -> None:
        """
        Tag span with host and port information.

        Sets standard network tags:
        - net.target.host
        - server.address
        - net.target.port

        Host and port are handled independently: a port is recorded even when
        no host is given.

        Args:
            span: The span to tag
            host: Hostname or IP address
            port: Port number (optional)
        """
        from ddtrace.ext import net

        if host:
            span._set_tag_str(net.TARGET_HOST, host)
            span._set_tag_str(net.SERVER_ADDRESS, host)

        if port is not None:
            span.set_metric(net.TARGET_PORT, port)
+ - span_name: Optional override for span name + """ + from ddtrace.constants import _SPAN_MEASURED_KEY + from ddtrace.ext import SpanTypes + from ddtrace.propagation.http import HTTPPropagator + + pin = ctx.get_item("pin") + msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context") + + if not pin or not pin.enabled() or not msg_ctx: + return + + # Determine span name + span_name = ctx.get_item("span_name") or f"{msg_ctx.messaging_system}.produce" + + # Create span + span = self.start_span( + ctx, + span_name, + resource=msg_ctx.destination, + span_type=SpanTypes.WORKER, + ) + + if not span: + return + + # Mark as measured + span.set_metric(_SPAN_MEASURED_KEY, 1) + + # Set messaging tags + self._set_messaging_tags(span, msg_ctx) + + # Add broker host for peer service + if msg_ctx.host: + self.add_host(span, msg_ctx.host, msg_ctx.port) + + # Inject trace context into message headers + if msg_ctx.inject_headers: + headers: dict = {} + HTTPPropagator.inject(span.context, headers) + msg_ctx.inject_headers(headers) + + def on_finish( + self, + ctx: "ExecutionContext", + exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]], + ) -> None: + """ + Finish producer span with message metadata. 
+ + Reads from ctx: + - messaging_context: For any post-produce metadata (message_id, partition) + """ + span = ctx.span + if not span: + return + + msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context") + + # Set any metadata that was determined after produce + if msg_ctx: + if msg_ctx.message_id: + span._set_tag_str("messaging.message.id", msg_ctx.message_id) + if msg_ctx.partition is not None: + span.set_metric("messaging.kafka.partition", msg_ctx.partition) + + # Call parent for peer service and error handling + super().on_finish(ctx, exc_info) + + def _set_messaging_tags(self, span: "Span", msg_ctx: "MessagingContext") -> None: + """Set standard messaging tags.""" + span._set_tag_str("messaging.system", msg_ctx.messaging_system) + span._set_tag_str("messaging.destination.name", msg_ctx.destination) + span._set_tag_str("messaging.operation", "publish") + + if msg_ctx.key: + span._set_tag_str("messaging.kafka.message.key", msg_ctx.key) + + if msg_ctx.batch_size is not None and msg_ctx.batch_size > 1: + span.set_metric("messaging.batch.message_count", msg_ctx.batch_size) + + # Extra tags + if msg_ctx.tags: + span.set_tags(msg_ctx.tags) + + def _get_peer_service(self, ctx: "ExecutionContext", span: "Span") -> Optional[str]: + """ + Get peer service for producer spans. + + For messaging, use the destination (topic/queue) as peer service. + """ + destination = span.get_tag("messaging.destination.name") + if destination: + return str(destination) + + return super()._get_peer_service(ctx, span) diff --git a/ddtrace/_trace/tracing_plugins/base/server.py b/ddtrace/_trace/tracing_plugins/base/server.py new file mode 100644 index 00000000000..fa0e44e8739 --- /dev/null +++ b/ddtrace/_trace/tracing_plugins/base/server.py @@ -0,0 +1,26 @@ +""" +ServerPlugin - Base for server-side request handling. + +Extends InboundPlugin for server operations (HTTP servers, gRPC servers, etc.) 
from typing import TYPE_CHECKING
from typing import Any
from typing import Optional

from ddtrace._trace.tracing_plugins.base.client import ClientPlugin


if TYPE_CHECKING:
    from ddtrace._trace.span import Span
    from ddtrace.internal.core import ExecutionContext


class StoragePlugin(ClientPlugin):
    """
    Base plugin for storage system clients.

    Builds on ClientPlugin by:
    - setting the span type category to "storage"
    - filling in a default service when a span is started without one

    Intended as the parent for:
    - Databases (through DatabasePlugin)
    - Caches (Redis, Memcached)
    - Object stores
    """

    type = "storage"

    # Name of the storage system, set by subclasses
    # (e.g., "postgresql", "redis", "memcached").
    system: Optional[str] = None

    def start_span(
        self,
        ctx: "ExecutionContext",
        name: str,
        **options: Any,
    ) -> Optional["Span"]:
        """
        Start a span, supplying a default service when the caller gave none.

        If ``options`` has no "service" key, ``self.get_service(ctx)`` is
        consulted and a truthy result is passed on as the service. An explicit
        "service" option is never replaced.

        Args:
            ctx: The execution context the span belongs to
            name: Span name
            **options: Extra keyword arguments forwarded to the parent start_span

        Returns:
            Whatever the parent ``start_span`` returns
        """
        caller_chose_service = "service" in options
        if not caller_chose_service:
            resolved = self.get_service(ctx)
            if resolved:
                options["service"] = resolved

        return super().start_span(ctx, name, **options)
+ + Combined with package, forms the event name: {package}.{operation} + """ + pass + + # --- Optional attributes (override as needed) --- + + kind: Optional[str] = None # SpanKind: "client", "server", "producer", "consumer" + type: str = "custom" # Span type category: "web", "sql", "storage", "worker" + + # --- Computed properties --- + + @property + def event_name(self) -> str: + """Full event name: {package}.{operation}""" + return f"{self.package}.{self.operation}" + + @property + def integration_config(self) -> Any: + """ + Get the integration's config from ddtrace.config. + + Returns the config object or empty dict if not found. + """ + from ddtrace import config + + return getattr(config, self.package, {}) + + # --- Registration --- + + def register(self) -> None: + """ + Register this plugin's event handlers with the core system. + + Subscribes to: + - context.started.{event_name} -> on_start + - context.ended.{event_name} -> on_finish + """ + from ddtrace.internal import core + + core.on(f"context.started.{self.event_name}", self._on_started) + core.on(f"context.ended.{self.event_name}", self._on_ended) + + # --- Internal event handlers --- + + def _on_started(self, ctx: "ExecutionContext") -> None: + """Internal handler for context.started event.""" + self.on_start(ctx) + + def _on_ended( + self, + ctx: "ExecutionContext", + exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]], + ) -> None: + """Internal handler for context.ended event.""" + self.on_finish(ctx, exc_info) + + # --- Override these in subclasses --- + + def on_start(self, ctx: "ExecutionContext") -> None: + """ + Called when the traced context starts. + + Override this to create spans and set initial tags. + The span should be stored on ctx.span. + + Args: + ctx: The execution context containing pin, event-specific context, etc. 
+ """ + pass + + def on_finish( + self, + ctx: "ExecutionContext", + exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]], + ) -> None: + """ + Called when the traced context ends. + + Override this to set final tags and finish the span. + Default implementation handles error tagging and span.finish(). + + Args: + ctx: The execution context + exc_info: Tuple of (exc_type, exc_value, traceback) or (None, None, None) + """ + span = ctx.span + if not span: + return + + # Set error info if an exception occurred + if exc_info[0] is not None: + span.set_exc_info(*exc_info) + + span.finish() + + # --- Utility methods --- + + def start_span( + self, + ctx: "ExecutionContext", + name: str, + **options: Any, + ) -> Optional["Span"]: + """ + Create and configure a span with common tags. + + This utility method: + - Gets the tracer from the pin + - Creates a span with the given name and options + - Sets component and span.kind tags + - Stores the span on ctx.span + + Args: + ctx: The execution context (must contain 'pin') + name: Span operation name + **options: Additional options passed to tracer.trace() + (service, resource, span_type, etc.) + + Returns: + The created span, or None if tracing is disabled + """ + from ddtrace.constants import SPAN_KIND + from ddtrace.internal.constants import COMPONENT + + pin = ctx.get_item("pin") + if not pin or not pin.enabled(): + return None + + # Create the span + span = pin.tracer.trace(name, **options) + + # Set common tags + integration_name = self.package + if hasattr(self.integration_config, "integration_name"): + integration_name = self.integration_config.integration_name + + span._set_tag_str(COMPONENT, integration_name) + + if self.kind: + span._set_tag_str(SPAN_KIND, self.kind) + + # Store on context + ctx.span = span + + return span + + def get_service(self, ctx: "ExecutionContext") -> Optional[str]: + """ + Get the service name for this span. + + Default implementation uses ext_service helper. 
+ Override in subclasses for custom service resolution. + + Args: + ctx: The execution context + + Returns: + Service name or None to use default + """ + from ddtrace.contrib.internal.trace_utils import ext_service + + pin = ctx.get_item("pin") + if not pin: + return None + + return ext_service(pin, self.integration_config) diff --git a/ddtrace/_trace/tracing_plugins/contrib/__init__.py b/ddtrace/_trace/tracing_plugins/contrib/__init__.py new file mode 100644 index 00000000000..53c2f80e45c --- /dev/null +++ b/ddtrace/_trace/tracing_plugins/contrib/__init__.py @@ -0,0 +1,6 @@ +""" +Integration-specific tracing plugins. + +Each module in this package provides plugin classes for specific integrations. +Plugins extend base classes and define package/operation to subscribe to events. +""" diff --git a/ddtrace/_trace/tracing_plugins/contrib/asyncpg.py b/ddtrace/_trace/tracing_plugins/contrib/asyncpg.py new file mode 100644 index 00000000000..0199f9d337d --- /dev/null +++ b/ddtrace/_trace/tracing_plugins/contrib/asyncpg.py @@ -0,0 +1,179 @@ +""" +asyncpg integration plugin. + +This module provides tracing plugins for asyncpg (async PostgreSQL driver). + +Plugins: +- AsyncpgExecutePlugin: Handles asyncpg.execute events (queries) +- AsyncpgConnectPlugin: Handles asyncpg.connect events (connections) + +These plugins extend DatabasePlugin and extract all necessary data from +the raw (instance, args, kwargs) passed by the instrumentor. +""" + +from typing import TYPE_CHECKING +from typing import Any +from typing import Dict +from typing import Optional + +from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin + + +if TYPE_CHECKING: + from ddtrace.internal.core import ExecutionContext + + +class AsyncpgExecutePlugin(DatabasePlugin): + """ + Handles asyncpg.execute events. 
+ + Subscribes to: context.started.asyncpg.execute + context.ended.asyncpg.execute + + Extracts: + - Query from args (first positional argument or 'state' kwarg) + - Connection info from protocol instance._connection + + The DatabasePlugin base class handles: + - Span creation with proper naming + - Database tags (db.system, db.name, etc.) + - DBM propagation + - Rowcount tracking + - Error handling + """ + + @property + def package(self) -> str: + return "asyncpg" + + @property + def operation(self) -> str: + return "execute" + + # PostgreSQL system identifiers + system: Optional[str] = "postgresql" + db_system: Optional[str] = "postgresql" + + def on_start(self, ctx: "ExecutionContext") -> None: + """Extract data from raw objects and create span.""" + from ddtrace._trace.tracing_plugins.base.events import DatabaseContext + + pin = ctx.get_item("pin") + if not pin or not pin.enabled(): + return + + # Extract from raw objects passed by instrumentor + instance = ctx.get_item("instance") + args = ctx.get_item("args", ()) + kwargs = ctx.get_item("kwargs", {}) + + # Extract query + query = self._extract_query(args, kwargs) + + # Extract connection info from protocol instance + conn_info = self._extract_connection_info(instance) + + # Build context and set on execution context + db_ctx = DatabaseContext( + db_system=self.db_system or "postgresql", + query=query, + host=conn_info.get("host"), + port=conn_info.get("port"), + user=conn_info.get("user"), + database=conn_info.get("database"), + ) + + ctx.set_item("db_context", db_ctx) + ctx.set_item("resource", query) + + # Let parent create span + super().on_start(ctx) + + def _extract_query(self, args: tuple, kwargs: dict) -> Optional[str]: + """Extract query from execute arguments.""" + state = args[0] if args else kwargs.get("state") + if isinstance(state, (str, bytes)): + return state if isinstance(state, str) else state.decode("utf-8", errors="replace") + # PreparedStatement - get the query attribute + return 
getattr(state, "query", None) + + def _extract_connection_info(self, instance: Any) -> Dict[str, Any]: + """Extract connection info from Protocol instance.""" + conn = getattr(instance, "_connection", None) + if not conn: + return {} + + addr = getattr(conn, "_addr", None) + params = getattr(conn, "_params", None) + + result: Dict[str, Any] = {} + if addr and isinstance(addr, tuple) and len(addr) >= 2: + result["host"] = addr[0] + result["port"] = addr[1] + if params: + result["user"] = getattr(params, "user", None) + result["database"] = getattr(params, "database", None) + + return result + + +class AsyncpgConnectPlugin(DatabasePlugin): + """ + Handles asyncpg.connect events. + + Subscribes to: context.started.asyncpg.connect + context.ended.asyncpg.connect + + Extracts connection parameters from kwargs passed to connect(). + """ + + @property + def package(self) -> str: + return "asyncpg" + + @property + def operation(self) -> str: + return "connect" + + # PostgreSQL system identifiers + system: Optional[str] = "postgresql" + db_system: Optional[str] = "postgresql" + + def on_start(self, ctx: "ExecutionContext") -> None: + """Extract connection info and create span.""" + from ddtrace._trace.tracing_plugins.base.events import DatabaseContext + + pin = ctx.get_item("pin") + if not pin or not pin.enabled(): + return + + # Extract from raw objects passed by instrumentor + args = ctx.get_item("args", ()) + kwargs = ctx.get_item("kwargs", {}) + + # Extract connection params from connect() arguments + # asyncpg.connect(dsn=None, host=None, port=None, user=None, database=None, ...) 
+ host = kwargs.get("host") or (args[1] if len(args) > 1 else None) + port = kwargs.get("port") or (args[2] if len(args) > 2 else None) + user = kwargs.get("user") or (args[3] if len(args) > 3 else None) + database = kwargs.get("database") or (args[4] if len(args) > 4 else None) + + # Build context + db_ctx = DatabaseContext( + db_system=self.db_system or "postgresql", + query=None, # connect has no query + host=host, + port=port, + user=user, + database=database, + ) + + ctx.set_item("db_context", db_ctx) + ctx.set_item("resource", "connect") + + # Let parent create span + super().on_start(ctx) + + def _get_span_name(self) -> str: + """Override to use connect-specific span name.""" + return "postgres.connect" diff --git a/ddtrace/contrib/auto/__init__.py b/ddtrace/contrib/auto/__init__.py new file mode 100644 index 00000000000..a25ad2f8ffa --- /dev/null +++ b/ddtrace/contrib/auto/__init__.py @@ -0,0 +1,14 @@ +""" +Auto instrumentation package. + +This package contains the new instrumentation API v2 - ultra-minimal +instrumentors that only wrap methods and emit events with raw objects. + +All extraction and tracing logic is handled by plugins in +ddtrace._trace.tracing_plugins. +""" + +from ddtrace.contrib.auto._base import Instrumentor + + +__all__ = ["Instrumentor"] diff --git a/ddtrace/contrib/auto/_base.py b/ddtrace/contrib/auto/_base.py new file mode 100644 index 00000000000..9267831f785 --- /dev/null +++ b/ddtrace/contrib/auto/_base.py @@ -0,0 +1,211 @@ +""" +Instrumentor base class for the new instrumentation API v2. + +Instrumentors are DUMB - they only: +1. Wrap library functions +2. Emit events with raw (instance, args, kwargs) +3. Know supported versions + +All extraction and tracing logic lives in plugins. 
+""" + +from abc import ABC +from abc import abstractmethod +from typing import Dict +from typing import List +from typing import Tuple + +from ddtrace import config +from ddtrace._trace.pin import Pin +from ddtrace.internal import core + + +class Instrumentor(ABC): + """ + Base class for instrumentors. + + Instrumentors are DUMB - they only: + 1. Wrap library functions + 2. Emit events with raw (instance, args, kwargs) + 3. Know supported versions + + All extraction and tracing logic lives in plugins. + """ + + # --- Required --- + + @property + @abstractmethod + def package(self) -> str: + """Package name (e.g., 'asyncpg').""" + pass + + @property + @abstractmethod + def supported_versions(self) -> Dict[str, str]: + """ + Supported package versions. + + Returns: + Dict mapping package name to version spec. + Example: {"asyncpg": ">=0.23.0"} + """ + pass + + @property + @abstractmethod + def methods_to_wrap(self) -> List[Tuple[str, str]]: + """ + Methods to wrap. + + Returns: + List of (target, operation) tuples. 
+ target: Dot-path to method (e.g., "protocol.Protocol.execute") + operation: Event operation name (e.g., "execute") + + Example: + [ + ("protocol.Protocol.execute", "execute"), + ("protocol.Protocol.query", "query"), + ("connect", "connect"), + ] + """ + pass + + # --- Optional config --- + + @property + def default_config(self) -> Dict: + """Default config values.""" + return {} + + # --- State --- + + _is_instrumented: bool = False + + # --- Public API --- + + def instrument(self) -> None: + """Instrument the library.""" + if self._is_instrumented: + return + + module = self._import_module() + if module is None: + return + + self._register_config() + Pin(_config=config._get_config(self.package)).onto(module) + + for target, operation in self.methods_to_wrap: + self._wrap_method(module, target, operation) + + module._datadog_patch = True + self._is_instrumented = True + + def uninstrument(self) -> None: + """Remove instrumentation.""" + if not self._is_instrumented: + return + + module = self._import_module() + if module is None: + return + + for target, _ in self.methods_to_wrap: + self._unwrap_method(module, target) + + module._datadog_patch = False + self._is_instrumented = False + + # --- Internal --- + + def _import_module(self): + try: + from importlib import import_module + + return import_module(self.package) + except ImportError: + return None + + def _register_config(self): + if not hasattr(config, self.package): + config._add(self.package, self.default_config) + + def _wrap_method(self, module, target: str, operation: str): + """Wrap a method to emit events.""" + from wrapt import wrap_function_wrapper + + parts = target.split(".") + if len(parts) == 1: + # Top-level function + wrap_function_wrapper(module, target, self._make_wrapper(operation)) + else: + # Nested: module.Class.method or module.submodule.Class.method + obj = module + for part in parts[:-1]: + obj = getattr(obj, part) + wrap_function_wrapper(obj, parts[-1], 
self._make_wrapper(operation)) + + def _unwrap_method(self, module, target: str): + from ddtrace.contrib.internal.trace_utils import unwrap + + parts = target.split(".") + if len(parts) == 1: + unwrap(module, target) + else: + obj = module + for part in parts[:-1]: + obj = getattr(obj, part) + unwrap(obj, parts[-1]) + + def _make_wrapper(self, operation: str): + """Create wrapper that emits event with raw objects.""" + pkg = self.package + + def wrapper(wrapped, instance, args, kwargs): + pin = Pin.get_from(instance) or Pin.get_from(self._import_module()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + + # Emit event with raw objects - plugin extracts what it needs + event_name = f"{pkg}.{operation}" + with ( + core.context_with_data( + event_name, + pin=pin, + instance=instance, + args=args, + kwargs=kwargs, + call=wrapped, + ) as ctx, + ctx.span, + ): + return wrapped(*args, **kwargs) + + return wrapper + + def _make_async_wrapper(self, operation: str): + """Create async wrapper.""" + pkg = self.package + + async def wrapper(wrapped, instance, args, kwargs): + pin = Pin.get_from(instance) or Pin.get_from(self._import_module()) + if not pin or not pin.enabled(): + return await wrapped(*args, **kwargs) + + event_name = f"{pkg}.{operation}" + with ( + core.context_with_data( + event_name, + pin=pin, + instance=instance, + args=args, + kwargs=kwargs, + call=wrapped, + ) as ctx, + ctx.span, + ): + return await wrapped(*args, **kwargs) + + return wrapper diff --git a/ddtrace/contrib/auto/asyncpg/__init__.py b/ddtrace/contrib/auto/asyncpg/__init__.py new file mode 100644 index 00000000000..3c1c43a036a --- /dev/null +++ b/ddtrace/contrib/auto/asyncpg/__init__.py @@ -0,0 +1,48 @@ +""" +asyncpg instrumentation. + +Ultra-minimal instrumentor that wraps asyncpg methods and emits events +with raw (instance, args, kwargs). All extraction and tracing is handled +by the asyncpg plugin in ddtrace._trace.tracing_plugins.contrib.asyncpg. 
+""" + +from ddtrace.contrib.auto._base import Instrumentor + + +class AsyncpgInstrumentor(Instrumentor): + """asyncpg instrumentation.""" + + package = "asyncpg" + + supported_versions = {"asyncpg": ">=0.23.0"} + + methods_to_wrap = [ + ("protocol.Protocol.execute", "execute"), + ("protocol.Protocol.bind_execute", "execute"), + ("protocol.Protocol.query", "execute"), + ("protocol.Protocol.bind_execute_many", "execute"), + ] + + default_config = { + "_default_service": "postgres", + } + + def _make_wrapper(self, operation): + # Override to use async wrapper for asyncpg + return self._make_async_wrapper(operation) + + +# Module-level API +_instrumentor = AsyncpgInstrumentor() + + +def patch(): + _instrumentor.instrument() + + +def unpatch(): + _instrumentor.uninstrument() + + +def get_versions(): + return _instrumentor.supported_versions diff --git a/docs/design/instrumentation-api-v2.md b/docs/design/instrumentation-api-v2.md new file mode 100644 index 00000000000..d5e1eb01e7c --- /dev/null +++ b/docs/design/instrumentation-api-v2.md @@ -0,0 +1,445 @@ +# Instrumentation API v2 - Design Document + +## Overview + +**Ultra-minimal instrumentation side** - just wrap and emit raw objects. The plugin handles all extraction and tracing. + +``` +┌─────────────────────────────────────┐ ┌─────────────────────────────────────┐ +│ INSTRUMENTOR │ │ PLUGIN │ +│ (dumb wrapper) │ │ (all the smarts) │ +├─────────────────────────────────────┤ ├─────────────────────────────────────┤ +│ • Wraps library functions │ │ • Extracts connection info │ +│ • Passes (instance, args, kwargs) │────▶│ • Extracts query/request data │ +│ • Knows supported versions │ │ • Creates spans │ +│ • That's it! 
│ │ • Sets tags, handles errors │ +└─────────────────────────────────────┘ └─────────────────────────────────────┘ +``` + +--- + +## Instrumentor API + +### Base Class + +```python +# ddtrace/contrib/auto/_base.py + +from abc import ABC, abstractmethod +from typing import Dict, List, Tuple +from ddtrace import config +from ddtrace._trace.pin import Pin +from ddtrace.internal import core + + +class Instrumentor(ABC): + """ + Base class for instrumentors. + + Instrumentors are DUMB - they only: + 1. Wrap library functions + 2. Emit events with raw (instance, args, kwargs) + 3. Know supported versions + + All extraction and tracing logic lives in plugins. + """ + + # --- Required --- + + @property + @abstractmethod + def package(self) -> str: + """Package name (e.g., 'asyncpg').""" + pass + + @property + @abstractmethod + def supported_versions(self) -> Dict[str, str]: + """ + Supported package versions. + + Returns: + Dict mapping package name to version spec. + Example: {"asyncpg": ">=0.23.0"} + """ + pass + + @property + @abstractmethod + def methods_to_wrap(self) -> List[Tuple[str, str]]: + """ + Methods to wrap. + + Returns: + List of (target, operation) tuples. 
+ target: Dot-path to method (e.g., "protocol.Protocol.execute") + operation: Event operation name (e.g., "execute") + + Example: + [ + ("protocol.Protocol.execute", "execute"), + ("protocol.Protocol.query", "query"), + ("connect", "connect"), + ] + """ + pass + + # --- Optional config --- + + @property + def default_config(self) -> Dict: + """Default config values.""" + return {} + + # --- State --- + + _is_instrumented: bool = False + + # --- Public API --- + + def instrument(self) -> None: + """Instrument the library.""" + if self._is_instrumented: + return + + module = self._import_module() + if module is None: + return + + self._register_config() + Pin(_config=config[self.package]).onto(module) + + for target, operation in self.methods_to_wrap: + self._wrap_method(module, target, operation) + + module._datadog_patch = True + self._is_instrumented = True + + def uninstrument(self) -> None: + """Remove instrumentation.""" + if not self._is_instrumented: + return + + module = self._import_module() + if module is None: + return + + for target, _ in self.methods_to_wrap: + self._unwrap_method(module, target) + + module._datadog_patch = False + self._is_instrumented = False + + # --- Internal --- + + def _import_module(self): + try: + from importlib import import_module + return import_module(self.package) + except ImportError: + return None + + def _register_config(self): + if not hasattr(config, self.package): + config._add(self.package, self.default_config) + + def _wrap_method(self, module, target: str, operation: str): + """Wrap a method to emit events.""" + from wrapt import wrap_function_wrapper + + parts = target.split(".") + if len(parts) == 1: + # Top-level function + wrap_function_wrapper(module, target, self._make_wrapper(operation)) + else: + # Nested: module.Class.method or module.submodule.Class.method + obj = module + for part in parts[:-1]: + obj = getattr(obj, part) + wrap_function_wrapper(obj, parts[-1], self._make_wrapper(operation)) + + def 
_unwrap_method(self, module, target: str): + from ddtrace.contrib.internal.trace_utils import unwrap + + parts = target.split(".") + if len(parts) == 1: + unwrap(module, target) + else: + obj = module + for part in parts[:-1]: + obj = getattr(obj, part) + unwrap(obj, parts[-1]) + + def _make_wrapper(self, operation: str): + """Create wrapper that emits event with raw objects.""" + pkg = self.package + + def wrapper(wrapped, instance, args, kwargs): + pin = Pin.get_from(instance) or Pin.get_from(self._import_module()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + + # Emit event with raw objects - plugin extracts what it needs + event_name = f"{pkg}.{operation}" + with core.context_with_data( + event_name, + pin=pin, + instance=instance, + args=args, + kwargs=kwargs, + call=wrapped, + ) as ctx, ctx.span: + return wrapped(*args, **kwargs) + + return wrapper + + def _make_async_wrapper(self, operation: str): + """Create async wrapper.""" + pkg = self.package + + async def wrapper(wrapped, instance, args, kwargs): + pin = Pin.get_from(instance) or Pin.get_from(self._import_module()) + if not pin or not pin.enabled(): + return await wrapped(*args, **kwargs) + + event_name = f"{pkg}.{operation}" + with core.context_with_data( + event_name, + pin=pin, + instance=instance, + args=args, + kwargs=kwargs, + call=wrapped, + ) as ctx, ctx.span: + return await wrapped(*args, **kwargs) + + return wrapper +``` + +--- + +## Example: asyncpg Instrumentor + +The instrumentor is now trivially simple: + +```python +# ddtrace/contrib/auto/asyncpg/__init__.py + +from ddtrace.contrib.auto._base import Instrumentor + + +class AsyncpgInstrumentor(Instrumentor): + """asyncpg instrumentation.""" + + package = "asyncpg" + + supported_versions = {"asyncpg": ">=0.23.0"} + + methods_to_wrap = [ + ("protocol.Protocol.execute", "execute"), + ("protocol.Protocol.bind_execute", "execute"), + ("protocol.Protocol.query", "execute"), + ("protocol.Protocol.bind_execute_many", 
"execute"), + ] + + default_config = { + "_default_service": "postgres", + } + + def _make_wrapper(self, operation): + # Override to use async wrapper + return self._make_async_wrapper(operation) + + +# Module-level API +_instrumentor = AsyncpgInstrumentor() + + +def patch(): + _instrumentor.instrument() + + +def unpatch(): + _instrumentor.uninstrument() + + +def get_versions(): + return _instrumentor.supported_versions +``` + +**That's ~25 lines for the entire instrumentor!** + +--- + +## Updated Plugin (extracts from raw objects) + +The plugin now does all the extraction: + +```python +# ddtrace/_trace/tracing_plugins/contrib/asyncpg.py + +from typing import Any, Dict, Optional, Tuple +from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin + + +class AsyncpgExecutePlugin(DatabasePlugin): + """ + Handles asyncpg.execute events. + + Extracts connection info and query from raw instance/args. + """ + + @property + def package(self) -> str: + return "asyncpg" + + @property + def operation(self) -> str: + return "execute" + + system = "postgresql" + db_system = "postgresql" + + def on_start(self, ctx) -> None: + """Extract data from raw objects and create span.""" + from ddtrace._trace.tracing_plugins.base.events import DatabaseContext + + pin = ctx.get_item("pin") + if not pin or not pin.enabled(): + return + + # Extract from raw objects passed by instrumentor + instance = ctx.get_item("instance") + args = ctx.get_item("args", ()) + kwargs = ctx.get_item("kwargs", {}) + + # Extract query + query = self._extract_query(args, kwargs) + + # Extract connection info from protocol instance + conn_info = self._extract_connection_info(instance) + + # Build context and call parent + db_ctx = DatabaseContext( + db_system=self.db_system, + query=query, + host=conn_info.get("host"), + port=conn_info.get("port"), + user=conn_info.get("user"), + database=conn_info.get("database"), + ) + + ctx.set_item("db_context", db_ctx) + ctx.set_item("resource", query) + + # Let 
parent create span + super().on_start(ctx) + + def _extract_query(self, args: tuple, kwargs: dict) -> Optional[str]: + """Extract query from execute arguments.""" + state = args[0] if args else kwargs.get("state") + if isinstance(state, (str, bytes)): + return state if isinstance(state, str) else state.decode("utf-8", errors="replace") + # PreparedStatement + return getattr(state, "query", None) + + def _extract_connection_info(self, instance) -> Dict[str, Any]: + """Extract connection info from Protocol instance.""" + conn = getattr(instance, "_connection", None) + if not conn: + return {} + + addr = getattr(conn, "_addr", None) + params = getattr(conn, "_params", None) + + result = {} + if addr and isinstance(addr, tuple) and len(addr) >= 2: + result["host"] = addr[0] + result["port"] = addr[1] + if params: + result["user"] = getattr(params, "user", None) + result["database"] = getattr(params, "database", None) + + return result +``` + +--- + +## Benefits of This Approach + +| Aspect | Benefit | +|--------|---------| +| **Instrumentor simplicity** | Just a list of methods to wrap | +| **No duplication** | Extraction logic only in plugin | +| **Testable** | Plugin can be tested with mock objects | +| **Flexible** | Plugin can extract different data for different needs | +| **Library-agnostic instrumentor** | Almost boilerplate | + +--- + +## Directory Structure + +``` +ddtrace/ +├── contrib/ +│ └── auto/ +│ ├── __init__.py +│ ├── _base.py # Instrumentor base class (~80 lines) +│ │ +│ ├── asyncpg/ +│ │ └── __init__.py # ~25 lines +│ ├── psycopg/ +│ │ └── __init__.py # ~25 lines +│ ├── httpx/ +│ │ └── __init__.py # ~25 lines +│ └── kafka/ +│ └── __init__.py # ~30 lines +│ +└── _trace/ + └── tracing_plugins/ + ├── base/ # Base plugins (already implemented) + └── contrib/ + ├── asyncpg.py # Extraction + tracing logic + ├── psycopg.py + ├── httpx.py + └── kafka.py +``` + +--- + +## Flow Summary + +``` +1. User calls: await conn.execute("SELECT * FROM users") + +2. 
Instrumentor wrapper: + - Gets pin + - Emits: core.context_with_data("asyncpg.execute", + instance=protocol, args=("SELECT...",), kwargs={}) + +3. Plugin.on_start(): + - Extracts query from args[0] + - Extracts host/port/user/db from instance._connection + - Creates DatabaseContext + - Calls super().on_start() → creates span with tags + +4. Original method runs + +5. Plugin.on_finish(): + - Sets rowcount if available + - Handles errors + - Finishes span +``` + +--- + +## Comparison + +| | Old Pattern | New Instrumentor | New Plugin | +|---|-------------|------------------|------------| +| Lines of code | ~200 | ~25 | ~50 | +| Extracts data | Yes | No | Yes | +| Creates spans | Yes | No | Yes | +| Handles errors | Yes | No | Yes | +| Library knowledge | Yes | Minimal (just method names) | Yes | +| Tracing knowledge | Yes | No | Yes | diff --git a/docs/design/integration-system-v2.md b/docs/design/integration-system-v2.md new file mode 100644 index 00000000000..7b09d699645 --- /dev/null +++ b/docs/design/integration-system-v2.md @@ -0,0 +1,1165 @@ +# Integration System v2 - Design Document + +## Overview + +A clean integration architecture for dd-trace-py modeled after dd-trace-js: +1. **Publisher Side** (Patch): Integration code emits events as `{pkg_name}.{func_name}` +2. 
**Subscriber Side** (Trace): Hierarchical plugin system with category-based base classes + +**Directory Structure:** +- `ddtrace/contrib/auto/` - New clean integrations (publisher side) +- `ddtrace/_trace/tracing_plugins/base/` - Base plugin classes +- `ddtrace/_trace/tracing_plugins/contrib/` - Integration-specific plugins + +--- + +## Plugin Hierarchy + +``` +TracingPlugin (base) +│ +├── OutboundPlugin (connections TO external services) +│ │ - peer service resolution +│ │ - host/port tagging +│ │ - span finishing with peer service tags +│ │ +│ ├── ClientPlugin (kind: CLIENT) +│ │ │ - type: "web" (default) +│ │ │ +│ │ └── StoragePlugin (type: "storage") +│ │ │ - system-based service naming +│ │ │ +│ │ └── DatabasePlugin +│ │ - DBM propagation +│ │ - query tagging +│ │ - rowcount tracking +│ │ +│ └── ProducerPlugin (kind: PRODUCER) +│ - type: "worker" +│ - trace context injection +│ - destination tagging +│ +└── InboundPlugin (connections FROM external sources) + │ - distributed context extraction + │ - parent store binding + │ + ├── ServerPlugin (kind: SERVER) + │ │ - type: "web" + │ │ + │ └── RouterPlugin + │ - route tagging + │ - path parameter extraction + │ + └── ConsumerPlugin (kind: CONSUMER) + - type: "worker" + - trace context extraction + - message metadata tagging +``` + +--- + +## Part 1: Directory Structure + +``` +ddtrace/ +├── contrib/ +│ ├── internal/ # Existing integrations (legacy) +│ │ └── ... 
+│ │ +│ └── auto/ # NEW: Clean integrations (publisher side) +│ ├── __init__.py +│ ├── asyncpg/ +│ │ ├── __init__.py +│ │ └── patch.py # Emits asyncpg.execute, asyncpg.connect +│ ├── psycopg/ +│ │ └── patch.py # Emits psycopg.execute +│ ├── httpx/ +│ │ └── patch.py # Emits httpx.send +│ ├── flask/ +│ │ └── patch.py # Emits flask.request +│ └── kafka/ +│ └── patch.py # Emits kafka.produce, kafka.consume +│ +└── _trace/ + └── tracing_plugins/ + ├── __init__.py # Plugin registration + │ + ├── base/ # Base plugin classes + │ ├── __init__.py + │ ├── tracing.py # TracingPlugin (root base class) + │ ├── outbound.py # OutboundPlugin + │ ├── client.py # ClientPlugin + │ ├── storage.py # StoragePlugin + │ ├── database.py # DatabasePlugin + │ ├── producer.py # ProducerPlugin (extends Outbound) + │ ├── inbound.py # InboundPlugin + │ ├── server.py # ServerPlugin + │ ├── router.py # RouterPlugin + │ ├── consumer.py # ConsumerPlugin (extends Inbound) + │ └── events.py # Event context types + │ + └── contrib/ # Integration-specific plugins + ├── __init__.py + ├── asyncpg.py # AsyncpgPlugin(DatabasePlugin) + ├── psycopg.py # PsycopgPlugin(DatabasePlugin) + ├── mysql.py # MySQLPlugin(DatabasePlugin) + ├── httpx.py # HttpxPlugin(ClientPlugin) + ├── aiohttp.py # AiohttpPlugin(ClientPlugin) + ├── flask.py # FlaskPlugin(RouterPlugin) + ├── django.py # DjangoPlugin(RouterPlugin) + └── kafka.py # KafkaProducerPlugin, KafkaConsumerPlugin +``` + +--- + +## Part 2: Base Plugin Classes + +### Location: `ddtrace/_trace/tracing_plugins/base/` + +### `tracing.py` - Root Base Class + +```python +""" +Base tracing plugin class. + +All plugins inherit from this. Provides core subscription and span lifecycle. +""" +from abc import ABC, abstractmethod +from typing import Any, Tuple, Optional +from ddtrace.internal import core + + +class TracingPlugin(ABC): + """ + Root base class for all tracing plugins. + + Subclasses define `package` and `operation` to auto-subscribe to events. 
+ The event pattern is: context.started.{package}.{operation} + """ + + # --- Required attributes (override in subclasses) --- + + @property + @abstractmethod + def package(self) -> str: + """Package name (e.g., 'asyncpg', 'flask').""" + pass + + @property + @abstractmethod + def operation(self) -> str: + """Operation name (e.g., 'execute', 'request').""" + pass + + # --- Optional attributes (override as needed) --- + + kind: Optional[str] = None # SpanKind: "client", "server", "producer", "consumer" + type: str = "custom" # Span type category: "web", "sql", "storage", etc. + + # --- Computed properties --- + + @property + def event_name(self) -> str: + """Full event name: {package}.{operation}""" + return f"{self.package}.{self.operation}" + + @property + def integration_config(self): + """Get the integration's config.""" + from ddtrace import config + return config.get(self.package, {}) + + # --- Registration --- + + def register(self) -> None: + """Register this plugin's event handlers.""" + core.on(f"context.started.{self.event_name}", self._on_started) + core.on(f"context.ended.{self.event_name}", self._on_ended) + + # --- Event handlers --- + + def _on_started(self, ctx: core.ExecutionContext) -> None: + """Internal: called when context starts.""" + self.on_start(ctx) + + def _on_ended(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Internal: called when context ends.""" + self.on_finish(ctx, exc_info) + + # --- Override these in subclasses --- + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Called when the traced context starts. Creates span.""" + pass + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Called when the traced context ends. 
Finishes span.""" + span = ctx.span + if not span: + return + + if exc_info[0]: + span.set_exc_info(*exc_info) + + span.finish() + + # --- Utility methods --- + + def start_span(self, ctx: core.ExecutionContext, name: str, **options) -> Any: + """Create and configure a span.""" + pin = ctx.get_item("pin") + if not pin or not pin.enabled(): + return None + + from ddtrace.constants import SPAN_KIND + from ddtrace.internal.constants import COMPONENT + + span = pin.tracer.trace(name, **options) + + # Set common tags + span._set_tag_str(COMPONENT, self.integration_config.get("integration_name", self.package)) + if self.kind: + span._set_tag_str(SPAN_KIND, self.kind) + + ctx.span = span + return span +``` + +### `outbound.py` - Outbound Connections + +```python +""" +OutboundPlugin - Base for all outgoing connections to external services. + +Handles: +- Peer service resolution +- Host/port tagging +- Span finishing with peer service tags +""" +from typing import Optional, Tuple +from ddtrace.internal import core +from ddtrace.ext import net +from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin + + +class OutboundPlugin(TracingPlugin): + """ + Base plugin for outbound connections (DB, HTTP client, messaging produce). + + Provides peer service resolution and host tagging. 
+ """ + + type = "outbound" + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Finish span with peer service tagging.""" + span = ctx.span + if not span: + return + + # Set peer service if not already set + self._set_peer_service(ctx, span) + + # Call parent to handle error and finish + super().on_finish(ctx, exc_info) + + def _set_peer_service(self, ctx: core.ExecutionContext, span) -> None: + """Determine and set peer service tag.""" + from ddtrace import config + + # Skip if already set + if span.get_tag("peer.service"): + return + + # Try to derive from standard tags + peer_service = self._get_peer_service(ctx, span) + if peer_service: + span._set_tag_str("peer.service", peer_service) + + def _get_peer_service(self, ctx: core.ExecutionContext, span) -> Optional[str]: + """ + Get peer service from available tags. + + Priority: + 1. Explicit peer.service tag + 2. net.peer.name + 3. out.host / net.target.host + 4. Service-specific derivation + """ + # Check standard network tags + for tag in ("net.peer.name", "out.host", net.TARGET_HOST): + value = span.get_tag(tag) + if value: + return value + + # Let subclasses override + return None + + def add_host(self, span, host: Optional[str], port: Optional[int] = None) -> None: + """Tag span with host and port info.""" + if host: + span._set_tag_str(net.TARGET_HOST, host) + span._set_tag_str(net.SERVER_ADDRESS, host) + if port: + span.set_metric(net.TARGET_PORT, port) +``` + +### `client.py` - Client Operations + +```python +""" +ClientPlugin - Base for client-side operations (HTTP clients, etc.) + +Extends OutboundPlugin with client-specific behavior. +""" +from ddtrace.ext import SpanKind +from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin + + +class ClientPlugin(OutboundPlugin): + """ + Base plugin for client-side operations. 
+ + - kind: CLIENT + - type: "web" by default (override for storage) + """ + + kind = SpanKind.CLIENT + type = "web" +``` + +### `storage.py` - Storage Systems + +```python +""" +StoragePlugin - Base for storage system clients (databases, caches, etc.) + +Extends ClientPlugin with storage-specific service naming. +""" +from typing import Any, Optional +from ddtrace.internal import core +from ddtrace._trace.tracing_plugins.base.client import ClientPlugin + + +class StoragePlugin(ClientPlugin): + """ + Base plugin for storage systems (databases, caches). + + Provides system-based service naming: {tracer_service}-{system} + """ + + type = "storage" + + # Override in subclasses + system: Optional[str] = None # e.g., "postgresql", "redis", "mysql" + + def start_span(self, ctx: core.ExecutionContext, name: str, **options) -> Any: + """Create span with system-based service naming.""" + from ddtrace.contrib.internal.trace_utils import ext_service + + pin = ctx.get_item("pin") + + # Default service: {base_service}-{system} + if "service" not in options and self.system: + options["service"] = ext_service(pin, self.integration_config) + + return super().start_span(ctx, name, **options) +``` + +### `database.py` - Database Operations + +```python +""" +DatabasePlugin - Base for all database integrations. + +Extends StoragePlugin with: +- DBM (Database Monitoring) propagation +- Query tagging and truncation +- Rowcount tracking +- Standard DB tags +""" +from typing import Optional, Tuple, Any +from ddtrace.internal import core +from ddtrace.constants import _SPAN_MEASURED_KEY +from ddtrace.ext import SpanTypes, db, net +from ddtrace._trace.tracing_plugins.base.storage import StoragePlugin + + +class DatabasePlugin(StoragePlugin): + """ + Base plugin for database integrations. + + Handles all common database tracing: + - Standard tags (db.system, db.name, db.user, etc.) 
+ - DBM comment propagation + - Query resource tagging + - Rowcount tracking + """ + + type = "sql" + + # Override in subclasses + db_system: Optional[str] = None # "postgresql", "mysql", "sqlite" + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Create database span with standard tags.""" + pin = ctx.get_item("pin") + db_ctx = ctx.get_item("db_context") + + if not pin or not pin.enabled(): + return + + # Get span name + span_name = ctx.get_item("span_name") or self._get_span_name() + + # Create span + span = self.start_span( + ctx, + span_name, + resource=ctx.get_item("resource") or (db_ctx.query if db_ctx else None), + span_type=SpanTypes.SQL, + ) + + if not span: + return + + # Mark as measured + span.set_metric(_SPAN_MEASURED_KEY, 1) + + # Set database tags + if db_ctx: + self._set_database_tags(span, db_ctx) + + # Pin tags + if pin.tags: + span.set_tags(pin.tags) + + # DBM propagation + if db_ctx and db_ctx.dbm_propagator: + self._apply_dbm_propagation(ctx, span, db_ctx) + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Finish span with rowcount.""" + span = ctx.span + if not span: + return + + # Set rowcount if available + rowcount = ctx.get_item("rowcount") + if rowcount is not None: + span.set_metric(db.ROWCOUNT, rowcount) + if isinstance(rowcount, int) and rowcount >= 0: + span.set_tag(db.ROWCOUNT, rowcount) + + super().on_finish(ctx, exc_info) + + def _get_span_name(self) -> str: + """Get default span name for this database.""" + from ddtrace.internal.schema import schematize_database_operation + + system = self.db_system or self.system or "db" + return schematize_database_operation( + f"{system}.query", + database_provider=system + ) + + def _set_database_tags(self, span, db_ctx) -> None: + """Set standard database tags.""" + if db_ctx.db_system: + span._set_tag_str(db.SYSTEM, db_ctx.db_system) + + if db_ctx.host: + self.add_host(span, db_ctx.host, db_ctx.port) + + if db_ctx.user: + 
span._set_tag_str(db.USER, db_ctx.user) + + if db_ctx.database: + span._set_tag_str(db.NAME, db_ctx.database) + + # Extra tags from context + if db_ctx.tags: + span.set_tags(db_ctx.tags) + + def _apply_dbm_propagation(self, ctx, span, db_ctx) -> None: + """Apply DBM comment injection if configured.""" + if not db_ctx.dbm_propagator: + return + + # Dispatch to DBM handler + result = core.dispatch_with_results( + f"{self.package}.execute", + (self.integration_config, span, ctx.get_item("args", ()), ctx.get_item("kwargs", {})) + ).result + + if result: + _, args, kwargs = result.value + ctx.set_item("modified_args", args) + ctx.set_item("modified_kwargs", kwargs) +``` + +### `producer.py` - Message Producers (extends Outbound) + +```python +""" +ProducerPlugin - Base for message producers. + +Extends OutboundPlugin since producing messages is an outbound operation. +""" +from typing import Tuple +from ddtrace.internal import core +from ddtrace.constants import _SPAN_MEASURED_KEY +from ddtrace.ext import SpanKind, SpanTypes +from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin + + +class ProducerPlugin(OutboundPlugin): + """ + Base plugin for message producers. + + Extends OutboundPlugin - producing is sending data OUT. 
+ + Handles: + - Trace context injection into message headers + - Destination tagging + - Message metadata + """ + + kind = SpanKind.PRODUCER + type = "worker" + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Create producer span with context injection.""" + from ddtrace.propagation.http import HTTPPropagator + + pin = ctx.get_item("pin") + msg_ctx = ctx.get_item("messaging_context") + + if not pin or not pin.enabled() or not msg_ctx: + return + + span_name = f"{msg_ctx.messaging_system}.produce" + + span = self.start_span( + ctx, + span_name, + resource=msg_ctx.destination, + span_type=SpanTypes.WORKER, + ) + + if not span: + return + + span.set_metric(_SPAN_MEASURED_KEY, 1) + span._set_tag_str("messaging.system", msg_ctx.messaging_system) + span._set_tag_str("messaging.destination.name", msg_ctx.destination) + + # Add host if available (broker address) + if hasattr(msg_ctx, 'host') and msg_ctx.host: + self.add_host(span, msg_ctx.host, getattr(msg_ctx, 'port', None)) + + # Inject trace context into message headers + if msg_ctx.inject_headers: + headers = {} + HTTPPropagator.inject(span.context, headers) + msg_ctx.inject_headers(headers) + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Finish producer span with message metadata.""" + span = ctx.span + if not span: + return + + msg_ctx = ctx.get_item("messaging_context") + + if msg_ctx: + if msg_ctx.message_id: + span._set_tag_str("messaging.message.id", msg_ctx.message_id) + if msg_ctx.partition is not None: + span.set_metric("messaging.kafka.partition", msg_ctx.partition) + + super().on_finish(ctx, exc_info) +``` + +### `inbound.py` - Inbound Connections + +```python +""" +InboundPlugin - Base for incoming requests. + +Handles distributed context extraction. 
+""" +from typing import Tuple +from ddtrace.internal import core +from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin + + +class InboundPlugin(TracingPlugin): + """ + Base plugin for inbound connections (web frameworks, message consumers). + + Handles distributed tracing context extraction. + """ + + type = "inbound" + + def bind_finish(self, ctx: core.ExecutionContext): + """Return parent store for context binding.""" + return ctx.get_item("parent_store") +``` + +### `server.py` - Server Operations + +```python +""" +ServerPlugin - Base for server-side request handling. +""" +from ddtrace.ext import SpanKind +from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin + + +class ServerPlugin(InboundPlugin): + """ + Base plugin for server-side request handling. + + - kind: SERVER + - type: "web" + """ + + kind = SpanKind.SERVER + type = "web" +``` + +### `router.py` - Router Operations + +```python +""" +RouterPlugin - Base for web framework routing. + +Extends ServerPlugin with route tagging. +""" +from typing import Tuple +from ddtrace.internal import core +from ddtrace.ext import http +from ddtrace._trace.tracing_plugins.base.server import ServerPlugin + + +class RouterPlugin(ServerPlugin): + """ + Base plugin for web framework routing. 
+ + Handles: + - Route template tagging + - Path parameter extraction + - Request/response meta tagging + """ + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Create request span with HTTP tags.""" + from ddtrace.constants import _SPAN_MEASURED_KEY + from ddtrace.contrib.internal.trace_utils import set_http_meta + + pin = ctx.get_item("pin") + web_ctx = ctx.get_item("web_context") + + if not pin or not pin.enabled(): + return + + span_name = ctx.get_item("span_name") or f"{self.package}.request" + + span = self.start_span(ctx, span_name) + if not span: + return + + span.set_metric(_SPAN_MEASURED_KEY, 1) + + if web_ctx: + # Set resource as method + route + route = web_ctx.route or web_ctx.path + span.resource = f"{web_ctx.method} {route}" + + # Set HTTP meta + set_http_meta( + span, + self.integration_config, + method=web_ctx.method, + url=web_ctx.url, + route=web_ctx.route, + request_headers=web_ctx.headers, + ) + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Finish with response tags.""" + from ddtrace.contrib.internal.trace_utils import set_http_meta + + span = ctx.span + if not span: + return + + web_response = ctx.get_item("web_response") + if web_response: + set_http_meta( + span, + self.integration_config, + status_code=web_response.status_code, + response_headers=web_response.headers, + ) + + super().on_finish(ctx, exc_info) +``` + +### `consumer.py` - Message Consumers (extends Inbound) + +```python +""" +ConsumerPlugin - Base for message consumers. + +Extends InboundPlugin since consuming messages is an inbound operation. +""" +from typing import Tuple +from ddtrace.internal import core +from ddtrace.constants import _SPAN_MEASURED_KEY +from ddtrace.ext import SpanKind, SpanTypes +from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin + + +class ConsumerPlugin(InboundPlugin): + """ + Base plugin for message consumers. + + Extends InboundPlugin - consuming is receiving data IN. 
+ + Handles: + - Trace context extraction from message headers + - Span linking to producer + - Message metadata tagging + """ + + kind = SpanKind.CONSUMER + type = "worker" + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Create consumer span with context extraction.""" + from ddtrace.propagation.http import HTTPPropagator + + pin = ctx.get_item("pin") + msg_ctx = ctx.get_item("messaging_context") + + if not pin or not pin.enabled() or not msg_ctx: + return + + span_name = f"{msg_ctx.messaging_system}.consume" + + span = self.start_span( + ctx, + span_name, + resource=msg_ctx.destination, + span_type=SpanTypes.WORKER, + ) + + if not span: + return + + span.set_metric(_SPAN_MEASURED_KEY, 1) + span._set_tag_str("messaging.system", msg_ctx.messaging_system) + span._set_tag_str("messaging.destination.name", msg_ctx.destination) + + # Extract and link trace context from message headers + if msg_ctx.headers: + distributed_ctx = HTTPPropagator.extract(msg_ctx.headers) + if distributed_ctx: + span.link_span(distributed_ctx) + + def on_finish(self, ctx: core.ExecutionContext, exc_info: Tuple) -> None: + """Finish consumer span with message metadata.""" + span = ctx.span + if not span: + return + + msg_ctx = ctx.get_item("messaging_context") + + if msg_ctx: + if msg_ctx.message_id: + span._set_tag_str("messaging.message.id", msg_ctx.message_id) + if msg_ctx.partition is not None: + span.set_metric("messaging.kafka.partition", msg_ctx.partition) + if msg_ctx.offset is not None: + span.set_metric("messaging.kafka.offset", msg_ctx.offset) + + super().on_finish(ctx, exc_info) +``` + +--- + +## Part 3: Event Context Types + +### Location: `ddtrace/_trace/tracing_plugins/base/events.py` + +```python +""" +Event context types - the contract between patch and subscriber sides. 
+""" +from dataclasses import dataclass, field +from typing import Any, Dict, Optional, Callable + + +@dataclass +class DatabaseContext: + """Context passed by database integrations.""" + db_system: str # "postgresql", "mysql", "sqlite" + query: Optional[str] = None + host: Optional[str] = None + port: Optional[int] = None + user: Optional[str] = None + database: Optional[str] = None + dbm_propagator: Optional[Any] = None + tags: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class HTTPClientContext: + """Context passed by HTTP client integrations.""" + method: str + url: str + target_host: Optional[str] = None + target_port: Optional[int] = None + headers: Dict[str, str] = field(default_factory=dict) + inject_headers: Optional[Callable[[Dict[str, str]], None]] = None + tags: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class HTTPClientResponseContext: + """Response context for HTTP clients.""" + status_code: int + headers: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class WebContext: + """Context passed by web framework integrations.""" + method: str + url: str + path: str + route: Optional[str] = None + headers: Dict[str, str] = field(default_factory=dict) + query_params: Dict[str, str] = field(default_factory=dict) + path_params: Dict[str, str] = field(default_factory=dict) + tags: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class WebResponseContext: + """Response context for web frameworks.""" + status_code: int + headers: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class MessagingContext: + """Context passed by messaging integrations.""" + messaging_system: str # "kafka", "rabbitmq", "sqs" + destination: str # topic, queue name + operation: str # "produce", "consume" + + # Optional connection info (for peer service) + host: Optional[str] = None + port: Optional[int] = None + + # Message metadata + message_id: Optional[str] = None + partition: Optional[int] = None + offset: 
Optional[int] = None + + # For distributed tracing + headers: Dict[str, str] = field(default_factory=dict) + inject_headers: Optional[Callable[[Dict[str, str]], None]] = None + + tags: Dict[str, Any] = field(default_factory=dict) +``` + +--- + +## Part 4: Integration-Specific Plugins + +### Location: `ddtrace/_trace/tracing_plugins/contrib/` + +### `asyncpg.py` + +```python +"""asyncpg integration plugin.""" +from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin + + +class AsyncpgExecutePlugin(DatabasePlugin): + """Handles asyncpg.execute events.""" + + package = "asyncpg" + operation = "execute" + system = "postgresql" + db_system = "postgresql" + + +class AsyncpgConnectPlugin(DatabasePlugin): + """Handles asyncpg.connect events.""" + + package = "asyncpg" + operation = "connect" + system = "postgresql" + db_system = "postgresql" + + def _get_span_name(self) -> str: + return "postgres.connect" +``` + +### `psycopg.py` + +```python +"""psycopg integration plugin.""" +from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin + + +class PsycopgPlugin(DatabasePlugin): + """Handles psycopg.execute events.""" + + package = "psycopg" + operation = "execute" + system = "postgresql" + db_system = "postgresql" + + def on_start(self, ctx) -> None: + """Handle psycopg-specific SQL.Composable types.""" + super().on_start(ctx) + + # Convert Composable to string for resource + db_ctx = ctx.get_item("db_context") + if db_ctx and db_ctx.query and hasattr(db_ctx.query, "as_string"): + if ctx.span: + ctx.span.resource = str(db_ctx.query) +``` + +### `httpx.py` + +```python +"""httpx integration plugin.""" +from ddtrace._trace.tracing_plugins.base.client import ClientPlugin +from ddtrace.internal import core +from ddtrace.constants import _SPAN_MEASURED_KEY +from ddtrace.ext import SpanTypes +from ddtrace.propagation.http import HTTPPropagator +from ddtrace.contrib.internal.trace_utils import set_http_meta + + +class HttpxPlugin(ClientPlugin): + 
"""Handles httpx.send events.""" + + package = "httpx" + operation = "send" + + def on_start(self, ctx: core.ExecutionContext) -> None: + """Create HTTP client span.""" + from ddtrace.internal.schema import schematize_url_operation + from ddtrace.internal.schema.span_attribute_schema import SpanDirection + + pin = ctx.get_item("pin") + http_ctx = ctx.get_item("http_context") + + if not pin or not pin.enabled(): + return + + span_name = schematize_url_operation( + "http.request", + protocol="http", + direction=SpanDirection.OUTBOUND + ) + + service = self._get_service(pin, http_ctx) + + span = self.start_span(ctx, span_name, service=service, span_type=SpanTypes.HTTP) + if not span: + return + + span.set_metric(_SPAN_MEASURED_KEY, 1) + + # Add host tags for peer service + if http_ctx: + self.add_host(span, http_ctx.target_host, http_ctx.target_port) + + # Inject distributed tracing headers + if self.integration_config.get("distributed_tracing", True) and http_ctx and http_ctx.inject_headers: + headers = {} + HTTPPropagator.inject(span.context, headers) + http_ctx.inject_headers(headers) + + def on_finish(self, ctx, exc_info) -> None: + """Finish with HTTP meta tags.""" + span = ctx.span + if not span: + return + + http_ctx = ctx.get_item("http_context") + http_response = ctx.get_item("http_response") + + if http_ctx: + set_http_meta( + span, + self.integration_config, + method=http_ctx.method, + url=http_ctx.url, + target_host=http_ctx.target_host, + status_code=http_response.status_code if http_response else None, + request_headers=http_ctx.headers, + response_headers=http_response.headers if http_response else None, + ) + + super().on_finish(ctx, exc_info) + + def _get_service(self, pin, http_ctx): + """Determine service name.""" + from ddtrace.contrib.internal.trace_utils import ext_service + + if self.integration_config.get("split_by_domain") and http_ctx: + service = http_ctx.target_host or "" + if http_ctx.target_port: + service += f":{http_ctx.target_port}" + 
return service + return ext_service(pin, self.integration_config) +``` + +### `flask.py` + +```python +"""flask integration plugin.""" +from ddtrace._trace.tracing_plugins.base.router import RouterPlugin + + +class FlaskPlugin(RouterPlugin): + """Handles flask.request events.""" + + package = "flask" + operation = "request" +``` + +### `kafka.py` + +```python +"""kafka integration plugins.""" +from ddtrace._trace.tracing_plugins.base.producer import ProducerPlugin +from ddtrace._trace.tracing_plugins.base.consumer import ConsumerPlugin + + +class KafkaProducePlugin(ProducerPlugin): + """Handles kafka.produce events.""" + + package = "kafka" + operation = "produce" + + +class KafkaConsumePlugin(ConsumerPlugin): + """Handles kafka.consume events.""" + + package = "kafka" + operation = "consume" +``` + +--- + +## Part 5: Plugin Registration + +### Location: `ddtrace/_trace/tracing_plugins/__init__.py` + +```python +""" +Integration tracing plugins. + +This module provides the plugin system for tracing integrations. 
+""" +from typing import List, Type + +# Base classes +from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin + + +# All registered plugins +_plugins: List[TracingPlugin] = [] + + +def register(plugin_cls: Type[TracingPlugin]) -> None: + """Register a plugin class.""" + plugin = plugin_cls() + plugin.register() + _plugins.append(plugin) + + +def initialize() -> None: + """Initialize all v2 integration plugins.""" + # Import contrib plugins + from ddtrace._trace.tracing_plugins.contrib import asyncpg + from ddtrace._trace.tracing_plugins.contrib import psycopg + from ddtrace._trace.tracing_plugins.contrib import httpx + from ddtrace._trace.tracing_plugins.contrib import flask + from ddtrace._trace.tracing_plugins.contrib import kafka + + # Database plugins + register(asyncpg.AsyncpgExecutePlugin) + register(asyncpg.AsyncpgConnectPlugin) + register(psycopg.PsycopgPlugin) + + # HTTP client plugins + register(httpx.HttpxPlugin) + + # Web framework plugins + register(flask.FlaskPlugin) + + # Messaging plugins + register(kafka.KafkaProducePlugin) + register(kafka.KafkaConsumePlugin) +``` + +--- + +## Summary + +### Plugin Hierarchy (Final) + +``` +TracingPlugin +│ +├── OutboundPlugin (TO external services) +│ │ +│ ├── ClientPlugin (kind: CLIENT, type: web) +│ │ └── StoragePlugin (type: storage) +│ │ └── DatabasePlugin (type: sql) +│ │ +│ └── ProducerPlugin (kind: PRODUCER, type: worker) +│ +└── InboundPlugin (FROM external sources) + │ + ├── ServerPlugin (kind: SERVER, type: web) + │ └── RouterPlugin + │ + └── ConsumerPlugin (kind: CONSUMER, type: worker) +``` + +### Adding a New Integration + +1. **Patch side** (`contrib/auto/{pkg}/patch.py`): + - Emit `{pkg}.{operation}` event with appropriate context + - ~30 lines of code + +2. 
**Plugin side** (`_trace/tracing_plugins/contrib/{pkg}.py`): + - Extend the appropriate base class + - Define `package`, `operation` + - Override only what's different + - ~10-20 lines of code, plus a `register(...)` call for each plugin class in `initialize()` (`_trace/tracing_plugins/__init__.py`) + +### Benefits + +| Aspect | Impact | +|--------|--------| +| Consistency | All DBs get the same tags; all HTTP clients behave the same | +| Clear hierarchy | Outbound vs Inbound split mirrors data flow | +| Code reduction | New integration: ~50 lines vs ~200+ lines | +| Maintainability | Fix DBM once, all DBs get it | +| Testability | Test base classes once, test overrides in isolation | +| AI-friendly | Simple patterns for agentic code generation |