An async Rust client for InfluxDB 3 Core and Enterprise.
InfluxDB 3 is the latest generation of the InfluxDB time series engine, built on Apache Arrow and DataFusion. Core is the free, single-node build for recent data and edge workloads; Enterprise adds clustering, high availability, and historical query performance on top of the same engine. Both speak the same HTTP write API and serve queries over Arrow Flight, so this client works against either.
This client is part of the InfluxDB 3 client family and mirrors the feature set of the official Go and Python clients with an idiomatic Rust API.
Requires Rust 1.86 or later. The optional polars feature requires Rust 1.88 or later.
cargo add influxdb3-clientOr add it to Cargo.toml:
[dependencies]
influxdb3-client = "0.1"
tokio = { version = "1", features = ["full"] }The optional polars feature adds DataFrame writes and query-to-DataFrame
conversion:
influxdb3-client = { version = "0.1", features = ["polars"] }A client needs a host, a database, and (usually) an API token. Build the configuration explicitly:
use influxdb3_client::{Client, ClientConfig};
#[tokio::main]
async fn main() -> influxdb3_client::Result<()> {
let client = Client::new(
ClientConfig::builder()
.host("http://localhost:8181")
.token("my-api-token")
.database("sensors")
.build()?,
)
.await?;
Ok(())
}Or read INFLUX_HOST, INFLUX_TOKEN, and INFLUX_DATABASE from the
environment:
let client = influxdb3_client::Client::from_env().await?;Or parse a connection string:
let client = influxdb3_client::Client::from_connection_string(
"https://cluster.example.io/?token=TOKEN&database=mydb",
).await?;The Arrow Flight channel used for queries is opened lazily on the first query, so constructing a client never blocks on query connectivity.
client.write(data) returns a builder. Chain options, then .await. data can
be a line-protocol string, a Vec<Point>, or a polars DataFrame (see below).
use influxdb3_client::{Point, Precision};
let points = vec![
Point::new("temperature")
.tag("location", "office")
.tag("floor", "2")
.field("celsius", 22.5_f64)
.field("humidity", 48_i64)
.field("occupied", true),
];
client.write(points).precision(Precision::Millisecond).await?;client
.write("cpu,host=server01 usage_user=42.3,usage_system=1.2")
.await?;client.write(points)
.precision(Precision::Nanosecond)
.batch_size(10_000) // points per HTTP request
.max_inflight(8) // concurrent in-flight requests
.default_tag("region", "us-east")
.no_sync() // acknowledge before WAL sync
.await?;Large inputs are split into batches and sent as multiple pipelined requests; one batch buffer is held in memory at a time.
For sustained, high-volume writes the throughput levers are batch_size (points
per request), max_inflight (concurrent requests per call), and no_sync()
(acknowledge before the WAL is synced, trading durability for speed).
A single write call serialises its batches on one task. To use more CPU cores
and connections, run several write calls concurrently. Client is cheap to
share, and its HTTP connection pool is reused, so wrap it in an Arc, spread
chunks across tasks, and cap concurrency with a semaphore to keep in-flight
buffers bounded:
use std::sync::Arc;
use tokio::sync::Semaphore;
let client = Arc::new(client);
let gate = Arc::new(Semaphore::new(8)); // cap concurrent writes
for chunk in chunks { // each chunk is a Vec<Point>
let permit = gate.clone().acquire_owned().await.unwrap();
let client = Arc::clone(&client);
tokio::spawn(async move {
let _permit = permit; // released when the write completes
client
.write(chunk)
.batch_size(10_000)
.max_inflight(8)
.no_sync()
.await
});
}To spread load across multiple ingest nodes, put a load balancer in front of the
cluster, or construct one Client per node and distribute chunks across them.
Set max_idle_connections to at least the total number of concurrent requests
you expect.
Writes are idempotent at the (series, timestamp, field) level: writing a point
with the same measurement, tag set, and timestamp overwrites the previous field
values (last write wins). Data deletion and retention are managed at the database
level and are not exposed by this client.
InfluxDB 3 supports both SQL and InfluxQL. Use client.sql(q) or
client.influxql(q); both return a query builder.
let result = client
.sql("SELECT * FROM temperature ORDER BY time DESC LIMIT 10")
.await?;
for row in result {
let row = row?;
let loc = row["location"].as_str().unwrap_or("");
let c = row["celsius"].as_f64().unwrap_or(0.0);
println!("{loc}: {c}");
}InfluxQL is called the same way:
let result = client
.influxql("SELECT MEAN(celsius) FROM temperature WHERE time > now() - 1h")
.await?;let rows = client
.sql("SELECT COUNT(*) AS n FROM cpu WHERE host = $host")
.param("host", "server01")
.await?
.rows()?;
if let Some(r) = rows.first() {
println!("count: {}", r["n"]);
}A QueryResult can be iterated row by row, collected with .rows(), or accessed
as raw Arrow RecordBatches with .record_batches(). A Row is indexed by
column name (row["col"]) or position (row[0]), and yields a Value with
typed accessors (as_f64, as_i64, as_str, as_bool, is_null).
For results too large to hold in memory, stream the Arrow batches:
use futures_util::TryStreamExt;
let mut stream = client.sql("SELECT * FROM temperature").stream().await?;
while let Some(batch) = stream.try_next().await? {
println!("got {} rows", batch.num_rows());
}Transient failures are retried automatically with exponential backoff and full
jitter. Connection errors, timeouts, 429, and 5xx responses are retried;
Retry-After is honoured when present. Deterministic failures (other 4xx, and
partial writes) are never retried. Retrying writes is safe because line-protocol
writes are idempotent.
use influxdb3_client::RetryConfig;
use std::time::Duration;
// Per-request override.
client.write(points)
.retry(RetryConfig { max_retries: 5, base_delay: Duration::from_millis(100), ..RetryConfig::default() })
.await?;
// Disable retries for a single call.
client.write(points).no_retry().await?;Set a default policy for all requests with ClientConfig::builder().retry(...).
When a batch contains invalid lines, the server accepts the valid ones and
reports the rest. This surfaces as Error::PartialWrite, which lists the
rejected lines:
use influxdb3_client::Error;
if let Err(Error::PartialWrite(e)) = client.write(line_protocol).await {
for line_error in &e.line_errors {
eprintln!("line {}: {}", line_error.line, line_error.message);
}
}With the polars feature, write a DataFrame directly and read query results back
as a DataFrame.
use influxdb3_client::write_dataframe::DataFrameWrite;
use polars::prelude::*;
let df = df![
"host" => ["srv1", "srv2"],
"region" => ["us-east", "us-west"],
"cpu_pct" => [42.5_f64, 71.0_f64],
"time" => [1_700_000_000_000_000_000_i64, 1_700_000_001_000_000_000_i64],
]?;
client
.write(
DataFrameWrite::new(&df, "server_metrics")
.tags(&["host", "region"])
.timestamp_column("time"),
)
.await?;
let df_back = client
.sql("SELECT * FROM server_metrics")
.await?
.to_polars()?;File IO lives in your code, not the client: read the file with polars, then hand
the frame to DataFrameWrite. Enable the reader you need on polars in your own
Cargo.toml:
polars = { version = "0.53", features = ["parquet"] } # or "csv"use std::fs::File;
use polars::prelude::*;
let df = ParquetReader::new(File::open("sensors.parquet")?).finish()?;
client
.write(
DataFrameWrite::new(&df, "sensor_data")
.tags(&["host", "region"])
.timestamp_column("time"),
)
.await?;CSV works the same via CsvReadOptions, but its columns infer as strings unless
you supply dtypes; cast the numeric and bool columns before writing or they will
land as string fields.
Runnable examples are in examples/:
quickstart.rs: end-to-end write and query.cloud_dedicated.rs: connecting to InfluxDB Cloud Dedicated.write_dataframe.rs: polars DataFrame write and read-back (requires--features polars).
INFLUX_HOST=http://localhost:8181 INFLUX_TOKEN=token INFLUX_DATABASE=mydb \
cargo run --example quickstartContributions are welcome. To build and check locally:
cargo build
cargo test
cargo clippy --all-targetsThe polars feature is gated behind a flag, so test it separately:
cargo test --features polarsA few conventions to keep in mind:
- Keep
cargo clippy --all-targetsfree of errors. - The
config_testsenv-var tests mutate process environment, so run that file single-threaded if you see collisions:cargo test --test config_tests -- --test-threads=1. - Comments and docs are ASCII only.
Please open an issue to discuss substantial changes before sending a pull request.
MIT