Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
650 changes: 643 additions & 7 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fcore"
version = "0.5.6-dev"
version = "0.5.8-dev"
edition = "2021"
build = "build.rs"

Expand Down Expand Up @@ -36,12 +36,19 @@ serde_json = "1.0"
serde_urlencoded = "0.7"
serde_yaml = "0.9"
sysinfo = { version = "0.33"}
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"postgres",
"uuid",
"chrono"
] }
thiserror = "2.0"
tonic = { version = "0.12", optional = true }
toml = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-postgres = { version="0.7", features=["with-uuid-1", "with-chrono-0_4", "with-serde_json-1"]}
tracing = "0.1"
tracing-appender = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
postgres-types = { version = "0.2", features = ["derive"]}
url = "2"
Expand Down
41 changes: 4 additions & 37 deletions dev/api.requests
Original file line number Diff line number Diff line change
@@ -1,43 +1,10 @@


requests.get('http://localhost:3000/healthcheck')
requests.get('http://localhost:3000/nodes')
requests.get('http://localhost:3000/node/ab514c21-aaaa-bbbb-91f7-32f8cb1ada40')

requests.post('http://localhost:3000/subscription', headers={'Authorization':'Bearer supetsecrettoken','Content-Type':'application/json'},json={'days':10})

requests.post(
'http://localhost:5005/subscription',
headers={
'Authorization': 'Bearer supetsecrettoken',
'Content-Type': 'application/json'
},
json={
"env": "experimental",
"days": 10
}
)


requests.post(
'http://localhost:3000/connection',
headers={
'Authorization': 'Bearer supetsecrettoken',
'Content-Type': 'application/json'
},
json={
"env": "dev",
"proto": "Hysteria2",
"subscription_id": "ac01dba1-1190-4cc6-905b-f4a35f82cfee"
}
)
requests.post('http://localhost:3000/connection', headers={'Authorization': 'Bearer supetsecrettoken', 'Content-Type': 'application/json' },json={"env": "experimental","proto": "VlessTcpReality","subscription_id": "525832b0-9886-4df3-8f10-0c1abb2cf99b"})

requests.post(
'http://localhost:3000/connection',
headers={
'Authorization': 'Bearer supetsecrettoken',
'Content-Type': 'application/json'
},
json={
"env": "premium12345",
"proto": "VlessTcpReality",
"subscription_id": "ac01dba1-1190-4cc6-905b-f4a35f82cfee"
}
)
70 changes: 67 additions & 3 deletions dev/dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ ALTER TABLE nodes
ADD COLUMN node_type node_type NOT NULL DEFAULT 'common';


ALTER TYPE node_type ADD VALUE 'service';
ALTER TYPE node_type ADD VALUE 'agent';


alter table connections drop column "node_id";
alter table connections drop column "wg_pubkey";
Expand All @@ -129,3 +126,70 @@ alter table subscriptions add column limit_bytes bigint;
alter table subscriptions add column downlink_bytes bigint;


alter table inbounds drop column uplink;
alter table inbounds drop column downlink ;
alter table inbounds drop column conn_count;

alter table inbounds drop column wg_pubkey;
alter table inbounds drop column wg_network;



-- TIMESCALE METRICS (SINGLE SOURCE OF TRUTH)

CREATE EXTENSION IF NOT EXISTS timescaledb;

CREATE TABLE node_metrics (
time TIMESTAMPTZ NOT NULL,
node_id UUID NOT NULL,
metric TEXT NOT NULL,
value DOUBLE PRECISION NOT NULL,
labels JSONB NOT NULL
);

SELECT create_hypertable('node_metrics', 'time');

-- индексы
CREATE INDEX idx_node_metrics_time ON node_metrics (time DESC);
CREATE INDEX idx_node_metrics_node ON node_metrics (node_id);
CREATE INDEX idx_node_metrics_metric ON node_metrics (metric);

-- GIN индекс для фильтрации по labels
CREATE INDEX idx_node_metrics_labels ON node_metrics USING GIN (labels);

-- compression (очень важно для прод)
ALTER TABLE node_metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'node_id, metric'
);

SELECT add_compression_policy('node_metrics', INTERVAL '7 days');

-- retention (опционально)
SELECT add_retention_policy('node_metrics', INTERVAL '90 days');

ALTER TABLE node_metrics ADD PRIMARY KEY (time, node_id, metric);
CREATE INDEX ON node_metrics (node_id, time DESC);

SELECT set_chunk_time_interval('node_metrics', INTERVAL '1 day');


CREATE INDEX idx_node_metrics_grafana
ON node_metrics (metric, node_id, time DESC);

ALTER TABLE node_metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'node_id, metric',
timescaledb.compress_orderby = 'time DESC'
);


CREATE MATERIALIZED VIEW node_metrics_1m
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 minute', time) AS bucket,
node_id,
metric,
avg(value) AS value
FROM node_metrics
GROUP BY bucket, node_id, metric;
140 changes: 140 additions & 0 deletions src/bin/api/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing_subscriber::Layer;

use fcore::{
utils::level_from_settings, utils::measure_time, Connection, ConnectionApiOperations,
ConnectionBaseOperations, MetricStorage, NodeStorageOperations, Publisher, Result,
Subscription, SubscriptionOperations,
};
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{
filter::{filter_fn, Targets},
fmt,
layer::SubscriberExt,
util::SubscriberInitExt,
};

use super::{
config::ServiceSettings,
email::EmailStore,
postgres::{
metrics::{MetricDbBuffer, PostgresMetricWriter},
pg::PgContext,
},
service::{Cache, Service},
sync::MemSync,
tasks::Tasks,
};

impl<N, C, S> Service<N, C, S>
where
N: NodeStorageOperations + Send + Sync + Clone + 'static + std::default::Default,
C: ConnectionBaseOperations
+ ConnectionApiOperations
+ Send
+ Sync
+ Clone
+ 'static
+ PartialEq
+ From<Connection>
+ Into<Connection>
+ serde::Serialize
+ 'static,
S: SubscriptionOperations
+ Send
+ Sync
+ Clone
+ 'static
+ PartialEq
+ Default
+ From<Subscription>
+ 'static,
Connection: From<C>,
Vec<(uuid::Uuid, Connection)>: FromIterator<(uuid::Uuid, C)>,
{
pub async fn bootstrap(settings: ServiceSettings) -> Result<Arc<Service<N, C, S>>> {
let db = PgContext::init(&settings.pg).await?;

let mem = Arc::new(RwLock::new(Cache::new()));
let publisher = Publisher::new(&settings.service.updates_endpoint_zmq).await?;
let mem_sync = MemSync::new(mem.clone(), db.clone(), publisher);
let metric_storage = match MetricStorage::load_snapshot(
&settings.metrics.snapshot_path,
settings.metrics.max_points,
settings.metrics.retention_seconds,
)
.await
{
Ok(storage) => {
tracing::info!("Metrics snapshot restored");
storage
}
Err(err) => {
tracing::warn!("Snapshot restore failed: {}", err);

MetricStorage::new(
settings.metrics.max_points,
settings.metrics.retention_seconds,
)
}
};

let email_store = EmailStore::new(settings.smtp.clone());
email_store.load_trials().await?;

let pg_writer = Arc::new(PostgresMetricWriter {
pool: db.pool().clone(),
});

let db_buffer = Arc::new(MetricDbBuffer {
batch: parking_lot::Mutex::new(vec![]),
pg: pg_writer.clone(),
});

let service = Service::new(
mem_sync,
settings.clone(),
Arc::new(metric_storage),
email_store,
db_buffer,
);

measure_time(service.get_state_from_db(), "Init PostgreSQL DB").await?;

Ok(Arc::new(service))
}
}

pub fn init_tracing(settings: ServiceSettings) {
let level = level_from_settings(&settings.service.log_level);

let stdout_layer = fmt::layer()
.with_target(true)
.with_filter(level)
.with_filter(filter_fn(|metadata| {
!metadata.target().starts_with("metrics") && !metadata.target().starts_with("sqlx")
}));

let log_directory = settings.metrics.log.directory;
let log_file = settings.metrics.log.file;

let metrics_file = RollingFileAppender::new(Rotation::DAILY, log_directory, log_file);

let metrics_layer = fmt::layer()
.with_ansi(false)
.with_target(true)
.with_writer(metrics_file)
.with_filter(
Targets::new()
.with_target("metrics", tracing::Level::DEBUG)
.with_target("metrics.ingest", tracing::Level::DEBUG)
.with_target("metrics.gc", tracing::Level::DEBUG)
.with_target("metrics.heartbeat", tracing::Level::DEBUG),
);

tracing_subscriber::registry()
.with(stdout_layer)
.with(metrics_layer)
.init();
}
22 changes: 18 additions & 4 deletions src/bin/api/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ impl Settings for ServiceSettings {
}

fn default_cors_origins() -> Vec<String> {
vec![
"http://localhost:3000".to_string(),
"http://localhost:3001".to_string(),
]
vec!["http://localhost:8080".to_string()]
}

fn default_wg_network() -> IpAddrMask {
Expand Down Expand Up @@ -58,6 +55,8 @@ pub struct ServiceConfig {
pub trial_limit_days: i64,
pub trial_limit_bytes: i64,
pub subscription_title: String,
pub support_contact: String,
pub base_url: String,
}

#[derive(Clone, Debug, Deserialize, Default)]
Expand All @@ -74,13 +73,28 @@ pub struct TasksConfig {
pub db_sync_interval_sec: u64,
pub subscription_restore_interval: u64,
pub subscription_expire_interval: u64,
pub connection_expire_interval: u64,
pub monitor_nodes_interval: u64,
pub heartbeat_node_offline_threshold_sec: u64,
}

#[derive(Clone, Default, Debug, Deserialize)]
pub struct MetricsLogConfig {
pub enabled: bool,
pub directory: String,
pub file: String,
pub rotation: String,
pub level: String,
}

#[derive(Clone, Default, Debug, Deserialize)]
pub struct MetricsRxConfig {
pub reciever: String,
pub max_points: usize,
pub retention_seconds: i64,
pub log: MetricsLogConfig,
pub snapshot_path: String,
pub pg_flush_interval: u64,
}

fn default_company_website() -> String {
Expand Down
3 changes: 0 additions & 3 deletions src/bin/api/http/handlers/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ where
+ Clone
+ 'static
+ From<Connection>
+ std::fmt::Debug
+ PartialEq
+ serde::ser::Serialize,
S: SubscriptionOperations + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -374,7 +373,6 @@ where
+ Clone
+ 'static
+ From<Connection>
+ std::fmt::Debug
+ PartialEq,
S: SubscriptionOperations + Send + Sync + Clone + 'static + PartialEq,
Connection: From<C>,
Expand Down Expand Up @@ -457,7 +455,6 @@ where
+ Clone
+ 'static
+ From<Connection>
+ std::fmt::Debug
+ PartialEq,
S: SubscriptionOperations + Send + Sync + Clone + 'static + PartialEq,
Connection: From<C>,
Expand Down
Loading
Loading