Skip to content

feat(sinks): add dlq logic on sink and support for elasticsearch #24930

Open
tanganellilore wants to merge 1 commit intovectordotdev:masterfrom
tanganellilore:add_dlq
Open

feat(sinks): add dlq logic on sink and support for elasticsearch #24930
tanganellilore wants to merge 1 commit intovectordotdev:masterfrom
tanganellilore:add_dlq

Conversation

@tanganellilore
Copy link

@tanganellilore tanganellilore commented Mar 15, 2026

Summary

This PR introduces a Dead Letter Queue (DLQ) output port for sinks, allowing failed/rejected events to be routed to other components instead of being silently dropped.

The implementation includes:

  • A generic SinkDlq utility (src/sinks/util/dlq.rs) that can be adopted by any sink
  • Full integration with the Elasticsearch sink, which routes non-retryable bulk failures (4xx) and orphaned events to the <sink_id>.dlq output port
  • Each DLQ event is annotated with metadata explaining the failure (reason, status code, error type/reason from Elasticsearch)

The request_retry_partial flag controls behavior: retryable failures (429, 5xx) are retried normally; only truly non-retryable failures are sent to DLQ.

Vector configuration

api:
  enabled: true
sources:
  my_source:
    type: demo_logs
    format: shuffle
    lines:
      - '{"value":1,"tag":"ok"}'
      - '{"value":2,"tag":"ok"}'
      - '{"value":"bad","tag":"bad"}'
      - '{"value":"bad2","tag":"bad"}'
      - '{"value":3,"tag":"bad"}'

transforms:
  my_transform:
    type: remap
    inputs:
      - my_source
    source: |
      . = parse_json!(.message)
  my_dlq_transform:
    type: remap
    inputs:
      - "es_out.dlq"
    source: |
      .enter_dlq = true
sinks:
  es_out:
    type: elasticsearch
    inputs:
      - my_transform
    endpoints:
      - "http://localhost:9200"
    bulk:
      index: "test-index"
  es_dlq_out:
    type: elasticsearch
    inputs:
      - "my_dlq_transform"
    endpoints:
      - "http://localhost:9200"
    bulk:
      index: "test-index-no-mapping"

How did you test this PR?

Unit tests in src/sinks/elasticsearch/service.rs covering:

  • Non-retryable failures (4xx) routed to DLQ
  • Retryable failures (429, 5xx) handled by retry logic
  • Orphaned events (missing from Elasticsearch bulk response) routed to DLQ
  • DLQ behavior with request_retry_partial enabled/disabled

I have tested in two way:

  • Manual end-to-end testing with a local Elasticsearch instance using the configuration above
  • Dev&Test env with vector that process normally 5k e/s and at peak 20k e/s.

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

References

Notes

N/A

@tanganellilore tanganellilore requested a review from a team as a code owner March 15, 2026 10:31
@github-actions github-actions bot added domain: topology Anything related to Vector's topology code domain: sinks Anything related to the Vector's sinks domain: core Anything related to core crates i.e. vector-core, core-common, etc labels Mar 15, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Mar 15, 2026

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

@tanganellilore
Copy link
Author

I have read the CLA Document and I hereby sign the CLA

@tanganellilore tanganellilore changed the title feat(add dlq): add dla logic on sink and support for elasticsearch feat(add dlq): add dlq logic on sink and support for elasticsearch Mar 15, 2026
@tanganellilore tanganellilore changed the title feat(add dlq): add dlq logic on sink and support for elasticsearch feat(sinks): add dlq logic on sink and support for elasticsearch Mar 15, 2026
@github-actions github-actions bot added the domain: ci Anything related to Vector's CI environment label Mar 15, 2026
@tanganellilore
Copy link
Author

For the reviewer:

I am not a Rust expert, but I am familiar with the logic behind Elasticsearch and DLQs.

I have maintained the original behavior and logic for sinks when the DLQ is disabled. However, I have some concerns regarding the error logs: when an error occurs in the Elastic sink without a DLQ, a log line is emitted multiple times—once when retries are exhausted and again when events are discarded. I am concerned this might be highly verbose for the user.

Generally speaking, I believe the metrics implementation is slightly misleading for Elasticsearch. Currently, if a bulk request fails without a DLQ, vector top behaves as follows:

  • Error Column: +1 (even if only 1 out of 100 events caused the error).
  • Event Column: Does not increment (as the entire bulk is considered failed).

The metrics with DLQ follow this same logic. For example, if mapping errors occur while the DLQ is enabled, events are correctly routed to the DLQ; the Elastic sink does not show an error because the failure is handled by the DLQ forwarder.

I haven't changed this metrics logic in this PR as it would require a significant code refactoring, but I wanted to highlight it: if you test this, you might notice discrepancies that could look like bugs, but they are actually inherited from the current implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

domain: ci Anything related to Vector's CI environment domain: core Anything related to core crates i.e. vector-core, core-common, etc domain: sinks Anything related to the Vector's sinks domain: topology Anything related to Vector's topology code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support dead letter queue on sinks

1 participant