Skip to content

Commit dcd2093

Browse files
committed
overhaul integrations to be cleaner
1 parent ddecdc2 commit dcd2093

File tree

19 files changed

+3457
-0
lines changed

19 files changed

+3457
-0
lines changed
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""
2+
Integration tracing plugins.
3+
4+
This module provides the plugin system for tracing integrations.
5+
6+
The plugin system uses a hierarchical structure:
7+
8+
TracingPlugin (base)
9+
├── OutboundPlugin (TO external services)
10+
│ ├── ClientPlugin (HTTP clients, etc.)
11+
│ │ └── StoragePlugin (storage systems)
12+
│ │ └── DatabasePlugin (databases)
13+
│ └── ProducerPlugin (message producers)
14+
└── InboundPlugin (FROM external sources)
15+
├── ServerPlugin (HTTP servers, etc.)
16+
└── ConsumerPlugin (message consumers)
17+
18+
Usage:
19+
# During ddtrace initialization
20+
from ddtrace._trace.tracing_plugins import initialize
21+
22+
initialize() # Registers all plugins
23+
24+
Adding a new integration:
25+
1. Create a plugin class in contrib/ that extends the appropriate base
26+
2. Define `package` and `operation` properties
27+
3. Register it in this module's initialize() function
28+
"""
29+
30+
from typing import List
31+
from typing import Type
32+
33+
from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin
34+
from ddtrace.internal.logger import get_logger
35+
36+
37+
log = get_logger(__name__)
38+
39+
# All registered plugins
40+
_plugins: List[TracingPlugin] = []
41+
_initialized: bool = False
42+
43+
44+
def register(plugin_cls: Type[TracingPlugin]) -> None:
45+
"""
46+
Register a plugin class.
47+
48+
Creates an instance and registers its event handlers.
49+
50+
Args:
51+
plugin_cls: Plugin class to register
52+
"""
53+
try:
54+
plugin = plugin_cls()
55+
plugin.register()
56+
_plugins.append(plugin)
57+
log.debug("Registered tracing plugin: %s.%s", plugin.package, plugin.operation)
58+
except Exception:
59+
log.warning(
60+
"Failed to register tracing plugin: %s",
61+
plugin_cls.__name__,
62+
exc_info=True,
63+
)
64+
65+
66+
def initialize() -> None:
67+
"""
68+
Initialize all v2 integration plugins.
69+
70+
This should be called during ddtrace initialization.
71+
Safe to call multiple times - only initializes once.
72+
"""
73+
global _initialized
74+
75+
if _initialized:
76+
return
77+
78+
_initialized = True
79+
80+
# Import contrib plugins
81+
from ddtrace._trace.tracing_plugins.contrib import asyncpg
82+
83+
# Database plugins
84+
register(asyncpg.AsyncpgExecutePlugin)
85+
register(asyncpg.AsyncpgConnectPlugin)
86+
87+
# TODO: Add more plugins as integrations are migrated:
88+
# from ddtrace._trace.tracing_plugins.contrib import psycopg
89+
# from ddtrace._trace.tracing_plugins.contrib import mysql
90+
# from ddtrace._trace.tracing_plugins.contrib import httpx
91+
# from ddtrace._trace.tracing_plugins.contrib import flask
92+
# from ddtrace._trace.tracing_plugins.contrib import kafka
93+
94+
log.debug("Initialized %d tracing plugins", len(_plugins))
95+
96+
97+
def get_plugins() -> List[TracingPlugin]:
98+
"""
99+
Get all registered plugins.
100+
101+
Returns:
102+
List of registered plugin instances
103+
"""
104+
return _plugins.copy()
105+
106+
107+
def reset() -> None:
108+
"""
109+
Reset plugin registration.
110+
111+
This is primarily for testing purposes.
112+
"""
113+
global _plugins, _initialized
114+
_plugins = []
115+
_initialized = False
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""
2+
Base tracing plugin classes.
3+
4+
This module provides the hierarchical plugin system for tracing integrations.
5+
"""
6+
7+
from ddtrace._trace.tracing_plugins.base.client import ClientPlugin
8+
from ddtrace._trace.tracing_plugins.base.consumer import ConsumerPlugin
9+
from ddtrace._trace.tracing_plugins.base.database import DatabasePlugin
10+
from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin
11+
from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin
12+
from ddtrace._trace.tracing_plugins.base.producer import ProducerPlugin
13+
from ddtrace._trace.tracing_plugins.base.server import ServerPlugin
14+
from ddtrace._trace.tracing_plugins.base.storage import StoragePlugin
15+
from ddtrace._trace.tracing_plugins.base.tracing import TracingPlugin
16+
17+
18+
__all__ = [
19+
"TracingPlugin",
20+
"OutboundPlugin",
21+
"ClientPlugin",
22+
"StoragePlugin",
23+
"DatabasePlugin",
24+
"InboundPlugin",
25+
"ServerPlugin",
26+
"ProducerPlugin",
27+
"ConsumerPlugin",
28+
]
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
"""
2+
ClientPlugin - Base for client-side operations.
3+
4+
Extends OutboundPlugin for client-type operations like HTTP requests.
5+
This is the base for HTTP clients, gRPC clients, etc.
6+
"""
7+
8+
from ddtrace._trace.tracing_plugins.base.outbound import OutboundPlugin
9+
from ddtrace.ext import SpanKind
10+
11+
12+
class ClientPlugin(OutboundPlugin):
13+
"""
14+
Base plugin for client-side operations.
15+
16+
Extends OutboundPlugin with:
17+
- kind: CLIENT (span.kind = "client")
18+
- type: "web" by default
19+
20+
Use this as the base for:
21+
- HTTP clients (httpx, requests, aiohttp client)
22+
- gRPC clients
23+
- Other RPC clients
24+
"""
25+
26+
kind = SpanKind.CLIENT
27+
type = "web"
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""
2+
ConsumerPlugin - Base for message consumers.
3+
4+
Extends InboundPlugin since consuming messages is an inbound operation
5+
(receiving data FROM a message broker/queue).
6+
"""
7+
8+
from typing import TYPE_CHECKING
9+
from typing import Any
10+
from typing import Optional
11+
from typing import Tuple
12+
13+
from ddtrace._trace.tracing_plugins.base.inbound import InboundPlugin
14+
15+
16+
if TYPE_CHECKING:
17+
from ddtrace._trace.span import Span
18+
from ddtrace._trace.tracing_plugins.base.events import MessagingContext
19+
from ddtrace.internal.core import ExecutionContext
20+
21+
22+
class ConsumerPlugin(InboundPlugin):
23+
"""
24+
Base plugin for message consumers.
25+
26+
Extends InboundPlugin - consuming is receiving data IN from a broker.
27+
28+
Handles:
29+
- Trace context extraction from message headers
30+
- Span linking to producer span
31+
- Destination tagging (topic, queue, exchange)
32+
- Message metadata tagging (offset, partition, etc.)
33+
34+
Subclasses define:
35+
- package: str (e.g., "kafka", "rabbitmq")
36+
- operation: str (e.g., "consume", "receive")
37+
"""
38+
39+
from ddtrace.ext import SpanKind
40+
41+
kind = SpanKind.CONSUMER
42+
type = "worker"
43+
44+
def on_start(self, ctx: "ExecutionContext") -> None:
45+
"""
46+
Create consumer span with context extraction.
47+
48+
Reads from ctx:
49+
- pin: The Pin instance
50+
- messaging_context: MessagingContext with destination, headers, etc.
51+
- span_name: Optional override for span name
52+
"""
53+
from ddtrace.constants import _SPAN_MEASURED_KEY
54+
from ddtrace.ext import SpanTypes
55+
from ddtrace.propagation.http import HTTPPropagator
56+
57+
pin = ctx.get_item("pin")
58+
msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")
59+
60+
if not pin or not pin.enabled() or not msg_ctx:
61+
return
62+
63+
# Determine span name
64+
span_name = ctx.get_item("span_name") or f"{msg_ctx.messaging_system}.consume"
65+
66+
# Create span
67+
span = self.start_span(
68+
ctx,
69+
span_name,
70+
resource=msg_ctx.destination,
71+
span_type=SpanTypes.WORKER,
72+
)
73+
74+
if not span:
75+
return
76+
77+
# Mark as measured
78+
span.set_metric(_SPAN_MEASURED_KEY, 1)
79+
80+
# Set messaging tags
81+
self._set_messaging_tags(span, msg_ctx)
82+
83+
# Extract and link trace context from message headers
84+
if msg_ctx.headers:
85+
distributed_ctx = HTTPPropagator.extract(msg_ctx.headers)
86+
if distributed_ctx:
87+
span.link_span(distributed_ctx)
88+
89+
def on_finish(
90+
self,
91+
ctx: "ExecutionContext",
92+
exc_info: Tuple[Optional[type], Optional[BaseException], Optional[Any]],
93+
) -> None:
94+
"""
95+
Finish consumer span with message metadata.
96+
97+
Reads from ctx:
98+
- messaging_context: For consume metadata (offset, partition, etc.)
99+
"""
100+
span = ctx.span
101+
if not span:
102+
return
103+
104+
msg_ctx: Optional["MessagingContext"] = ctx.get_item("messaging_context")
105+
106+
# Set consume-specific metadata
107+
if msg_ctx:
108+
if msg_ctx.message_id:
109+
span._set_tag_str("messaging.message.id", msg_ctx.message_id)
110+
if msg_ctx.partition is not None:
111+
span.set_metric("messaging.kafka.partition", msg_ctx.partition)
112+
if msg_ctx.offset is not None:
113+
span.set_metric("messaging.kafka.offset", msg_ctx.offset)
114+
115+
# Call parent for error handling
116+
super().on_finish(ctx, exc_info)
117+
118+
def _set_messaging_tags(self, span: "Span", msg_ctx: "MessagingContext") -> None:
119+
"""Set standard messaging tags."""
120+
span._set_tag_str("messaging.system", msg_ctx.messaging_system)
121+
span._set_tag_str("messaging.destination.name", msg_ctx.destination)
122+
span._set_tag_str("messaging.operation", "receive")
123+
124+
if msg_ctx.key:
125+
span._set_tag_str("messaging.kafka.message.key", msg_ctx.key)
126+
127+
if msg_ctx.batch_size is not None and msg_ctx.batch_size > 1:
128+
span.set_metric("messaging.batch.message_count", msg_ctx.batch_size)
129+
130+
# Extra tags
131+
if msg_ctx.tags:
132+
span.set_tags(msg_ctx.tags)

0 commit comments

Comments
 (0)