Skip to content

Commit 62e28bb

Browse files
authored
fix(extractor/solana): properly handle partially downloaded OF1 files (#1511)
* fix(extractor/solana): properly handle partially downloaded OF1 files
  - If an error occurs while downloading an OF1 CAR file, the next extractor run should resume the file download. This commit adds that logic.
* log first error at info level in `BlockStreamerWithRetry`
* add error source to `BlockStreamerWithRetry`
* extractors/solana: use `fs_err`
1 parent e094a40 commit 62e28bb

File tree

6 files changed

+197
-79
lines changed

6 files changed

+197
-79
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/common/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ indoc.workspace = true
2626
itertools.workspace = true
2727
js-runtime = { path = "../js-runtime" }
2828
metadata-db = { path = "../metadata-db" }
29+
monitoring = { path = "../monitoring" }
2930
object_store.workspace = true
3031
rand.workspace = true
3132
regex.workspace = true

crates/core/common/src/lib.rs

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -216,18 +216,48 @@ impl<T: BlockStreamer + Send + Sync> BlockStreamer for BlockStreamerWithRetry<T>
216216
yield block;
217217
}
218218
Err(e) => {
219+
let error_source = monitoring::logging::error_source(e.as_ref());
219220
// Progressively more severe logging and longer retry interval.
220-
if num_retries < DEBUG_RETRY_LIMIT {
221-
num_retries += 1;
222-
tracing::debug!(block = %current_block, error = %e, "Block streaming failed, retrying");
223-
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
224-
} else if num_retries < WARN_RETRY_LIMIT {
225-
num_retries += 1;
226-
tracing::warn!(block = %current_block, error = %e, "Block streaming failed, retrying");
227-
tokio::time::sleep(WARN_RETRY_DELAY).await;
228-
} else {
229-
tracing::error!(block = %current_block, error = %e, "Block streaming failed, retrying");
230-
tokio::time::sleep(ERROR_RETRY_DELAY).await;
221+
match num_retries {
222+
0 => {
223+
// First error, make sure it is visible in info (default) logs.
224+
num_retries += 1;
225+
tracing::info!(
226+
block = %current_block,
227+
error = %e,
228+
error_source,
229+
"Block streaming failed, retrying"
230+
);
231+
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
232+
}
233+
1..DEBUG_RETRY_LIMIT => {
234+
num_retries += 1;
235+
tracing::debug!(
236+
block = %current_block,
237+
error = %e,
238+
error_source,
239+
"Block streaming failed, retrying");
240+
tokio::time::sleep(DEBUG_RETRY_DELAY).await;
241+
}
242+
DEBUG_RETRY_LIMIT..WARN_RETRY_LIMIT => {
243+
num_retries += 1;
244+
tracing::warn!(
245+
block = %current_block,
246+
error = %e,
247+
error_source,
248+
"Block streaming failed, retrying"
249+
);
250+
tokio::time::sleep(WARN_RETRY_DELAY).await;
251+
}
252+
_ => {
253+
tracing::error!(
254+
block = %current_block,
255+
error = %e,
256+
error_source,
257+
"Block streaming failed, retrying"
258+
);
259+
tokio::time::sleep(ERROR_RETRY_DELAY).await;
260+
}
231261
}
232262
current_block = start + blocks_sent;
233263
continue 'retry;

crates/core/monitoring/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ version.workspace = true
55
license-file.workspace = true
66

77
[dependencies]
8-
common = { path = "../common" }
98
opentelemetry.workspace = true
109
opentelemetry-otlp.workspace = true
1110
opentelemetry_sdk.workspace = true

crates/extractors/solana/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ bs58 = "0.5.1"
1515
common = { path = "../../core/common" }
1616
datasets-common = { path = "../../core/datasets-common" }
1717
futures.workspace = true
18+
fs-err.workspace = true
1819
governor.workspace = true
1920
monitoring = { path = "../../core/monitoring" }
2021
reqwest.workspace = true

crates/extractors/solana/src/of1_client.rs

Lines changed: 153 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,24 @@ use crate::{metrics, rpc_client};
1313

1414
const OLD_FAITHFUL_ARCHIVE_URL: &str = "https://files.old-faithful.net";
1515

16+
#[derive(Debug, Default)]
17+
pub(crate) struct DecodedBlock {
18+
pub(crate) slot: Slot,
19+
pub(crate) parent_slot: Slot,
20+
21+
pub(crate) blockhash: [u8; 32],
22+
pub(crate) prev_blockhash: [u8; 32],
23+
24+
pub(crate) block_height: Option<u64>,
25+
pub(crate) blocktime: u64,
26+
27+
pub(crate) transactions: Vec<solana_sdk::transaction::VersionedTransaction>,
28+
pub(crate) transaction_metas: Vec<solana_storage_proto::confirmed_block::TransactionStatusMeta>,
29+
30+
#[allow(dead_code)]
31+
pub(crate) block_rewards: Vec<solana_storage_proto::confirmed_block::Rewards>,
32+
}
33+
1634
#[allow(clippy::too_many_arguments)]
1735
pub(crate) fn stream(
1836
start: solana_clock::Slot,
@@ -64,24 +82,31 @@ pub(crate) fn stream(
6482
}
6583
};
6684

85+
let reqwest_client = reqwest::Client::new();
86+
6787
// Download historical data via Old Faithful archive CAR files.
6888
'epochs: loop {
6989
tracing::debug!(epoch, "processing Old Faithful CAR file");
70-
let epoch_car_file_path = of1_car_directory.join(of1_car_filename(epoch));
71-
72-
if !std::fs::exists(&epoch_car_file_path)?
73-
&& let Err(e) = download_of1_car_file(
74-
epoch,
75-
&of1_car_directory,
76-
metrics.clone(),
77-
&provider,
78-
&network,
79-
).await
80-
{
90+
let local_filename = format!("epoch-{}.car", epoch);
91+
let epoch_car_file_path = of1_car_directory.join(local_filename);
92+
if let Err(e) = download_of1_car_file(
93+
epoch,
94+
&reqwest_client,
95+
&epoch_car_file_path,
96+
metrics.clone(),
97+
&provider,
98+
&network,
99+
).await {
81100
if let FileDownloadError::Http(404) = e {
82101
// No more epoch CAR files available.
83102
break 'epochs;
84103
} else {
104+
tracing::debug!("failed to download Old Faithful CAR file");
105+
106+
if let Some(metrics) = &metrics {
107+
metrics.record_of1_car_download_error(epoch, &provider, &network);
108+
}
109+
85110
yield Err(e.into());
86111
return;
87112
}
@@ -119,65 +144,139 @@ pub(crate) fn stream(
119144
}
120145

121146
/// Downloads the Old Faithful CAR file for the given epoch into the specified output directory.
147+
///
148+
/// If the file was partially downloaded before, the download will resume from where it left off.
122149
async fn download_of1_car_file(
123150
epoch: solana_clock::Epoch,
124-
output_dir: &Path,
151+
reqwest_client: &reqwest::Client,
152+
dest: &Path,
125153
metrics: Option<Arc<metrics::MetricsRegistry>>,
126154
provider: &str,
127155
network: &str,
128156
) -> Result<(), FileDownloadError> {
129-
let filename = of1_car_filename(epoch);
130-
let car_file_url = format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/{filename}");
131-
tracing::info!(%car_file_url, "downloading Old Faithful CAR file");
157+
enum DownloadAction {
158+
Download,
159+
Resume(u64),
160+
Restart,
161+
Skip,
162+
}
132163

133-
let car_file_path = output_dir.join(filename);
134-
let mut file = tokio::fs::File::create(&car_file_path).await?;
164+
let download_url = of1_car_download_url(epoch);
135165

136-
let start = std::time::Instant::now();
137-
let response = reqwest::get(&car_file_url).await?;
138-
let status = response.status();
166+
let action = match fs_err::metadata(dest).map(|meta| meta.len()) {
167+
Ok(0) => DownloadAction::Download,
168+
Ok(local_file_size) => {
169+
// Get the actual file size from the server to determine if we need to resume.
170+
let head_response = reqwest_client.head(&download_url).send().await?;
139171

140-
if !status.is_success() {
141-
tracing::debug!(
142-
%status,
143-
"failed to download Old Faithful CAR file"
144-
);
172+
if head_response.status() != reqwest::StatusCode::OK {
173+
return Err(FileDownloadError::Http(head_response.status().as_u16()));
174+
}
145175

146-
if let Some(metrics) = metrics {
147-
metrics.record_of1_car_download_error(epoch, provider, network);
176+
let Some(content_length) = head_response.headers().get(reqwest::header::CONTENT_LENGTH)
177+
else {
178+
return Err(FileDownloadError::MissingContentLengthHeader);
179+
};
180+
let remote_file_size = content_length
181+
.to_str()
182+
.map_err(|_| FileDownloadError::ContentLengthParsing)?
183+
.parse()
184+
.map_err(|_| FileDownloadError::ContentLengthParsing)?;
185+
186+
match local_file_size.cmp(&remote_file_size) {
187+
// Local file is partially downloaded, need to resume.
188+
std::cmp::Ordering::Less => DownloadAction::Resume(local_file_size),
189+
// Local file is larger than remote file, need to restart download.
190+
std::cmp::Ordering::Greater => DownloadAction::Restart,
191+
// File already fully downloaded.
192+
std::cmp::Ordering::Equal => DownloadAction::Skip,
193+
}
194+
}
195+
Err(e) if e.kind() == std::io::ErrorKind::NotFound => DownloadAction::Download,
196+
Err(e) => return Err(FileDownloadError::Io(e)),
197+
};
198+
199+
// Set up HTTP headers for range requests if the file already exists.
200+
let mut headers = reqwest::header::HeaderMap::new();
201+
202+
match action {
203+
DownloadAction::Download => {
204+
tracing::debug!(%download_url, "downloading Old Faithful CAR file");
205+
}
206+
DownloadAction::Resume(download_offset) => {
207+
tracing::debug!(
208+
%download_url,
209+
%download_offset,
210+
"resuming Old Faithful CAR file download"
211+
);
212+
let range_header = format!("bytes={download_offset}-");
213+
let range_header_value =
214+
reqwest::header::HeaderValue::from_str(&range_header).expect("valid range header");
215+
headers.insert(reqwest::header::RANGE, range_header_value);
216+
}
217+
DownloadAction::Restart => {
218+
tracing::debug!(
219+
%download_url,
220+
"local Old Faithful CAR file is larger than remote file, restarting download"
221+
);
222+
tokio::fs::remove_file(&dest).await?;
148223
}
224+
DownloadAction::Skip => {
225+
tracing::debug!(
226+
%download_url,
227+
"local Old Faithful CAR file already fully downloaded, skipping"
228+
);
229+
return Ok(());
230+
}
231+
}
232+
233+
let start = std::time::Instant::now();
149234

235+
let response = reqwest_client
236+
.get(download_url)
237+
.headers(headers)
238+
.send()
239+
.await?;
240+
241+
let status = response.status();
242+
if !status.is_success() {
150243
return Err(FileDownloadError::Http(status.as_u16()));
151244
}
152245

246+
if let DownloadAction::Resume(_) = action {
247+
// Expecting a 206 Partial Content response when resuming.
248+
if status != reqwest::StatusCode::PARTIAL_CONTENT {
249+
return Err(FileDownloadError::PartialDownloadNotSupported);
250+
}
251+
}
252+
253+
let mut file = tokio::fs::File::options()
254+
.create(true) // Create the file if it doesn't exist.
255+
.append(true) // Append to the file to support resuming.
256+
.open(&dest)
257+
.await?;
258+
153259
// Stream the file content since these files can be extremely large.
154-
let mut total_bytes = 0u64;
155260
let mut stream = response.bytes_stream();
261+
let mut bytes_downloaded = 0u64;
156262
while let Some(chunk) = stream.next().await {
157-
let chunk = match chunk {
158-
Ok(c) => c,
159-
Err(err) => {
160-
if let Some(metrics) = metrics {
161-
metrics.record_of1_car_download_error(epoch, provider, network);
162-
}
163-
return Err(FileDownloadError::Reqwest(err));
164-
}
165-
};
263+
let chunk = chunk?;
264+
265+
file.write_all(&chunk).await?;
266+
267+
bytes_downloaded += chunk.len() as u64;
166268

167269
if let Some(ref metrics) = metrics {
168-
metrics.record_of1_car_download_bytes(total_bytes, epoch, provider, network);
270+
metrics.record_of1_car_download_bytes(chunk.len() as u64, epoch, provider, network);
169271
}
170-
171-
total_bytes += chunk.len() as u64;
172-
file.write_all(&chunk).await?;
173272
}
174273

175274
let duration = start.elapsed().as_secs_f64();
176275

177-
tracing::info!(
178-
epoch,
179-
bytes = total_bytes,
180-
duration_secs = duration,
276+
tracing::debug!(
277+
%epoch,
278+
%bytes_downloaded,
279+
duration_secs = %duration,
181280
"downloaded Old Faithful CAR file"
182281
);
183282

@@ -196,6 +295,12 @@ enum FileDownloadError {
196295
Http(u16),
197296
#[error("Reqwest error: {0}")]
198297
Reqwest(#[from] reqwest::Error),
298+
#[error("missing Content-Length header in HTTP response")]
299+
MissingContentLengthHeader,
300+
#[error("error parsing Content-Length header")]
301+
ContentLengthParsing,
302+
#[error("partial downloads are not supported by the server")]
303+
PartialDownloadNotSupported,
199304
}
200305

201306
/// Read an entire block worth of nodes from the given node reader and decode them into
@@ -282,27 +387,9 @@ async fn read_entire_block<R: tokio::io::AsyncRead + Unpin>(
282387
Ok(Some(block))
283388
}
284389

285-
#[derive(Debug, Default)]
286-
pub(crate) struct DecodedBlock {
287-
pub(crate) slot: Slot,
288-
pub(crate) parent_slot: Slot,
289-
290-
pub(crate) blockhash: [u8; 32],
291-
pub(crate) prev_blockhash: [u8; 32],
292-
293-
pub(crate) block_height: Option<u64>,
294-
pub(crate) blocktime: u64,
295-
296-
pub(crate) transactions: Vec<solana_sdk::transaction::VersionedTransaction>,
297-
pub(crate) transaction_metas: Vec<solana_storage_proto::confirmed_block::TransactionStatusMeta>,
298-
299-
#[allow(dead_code)]
300-
pub(crate) block_rewards: Vec<solana_storage_proto::confirmed_block::Rewards>,
301-
}
302-
303-
/// Generates the Old Faithful epoch CAR filename for the given epoch.
390+
/// Generates the Old Faithful CAR download URL for the given epoch.
304391
///
305392
/// Reference: <https://docs.old-faithful.net/references/of1-files>.
306-
fn of1_car_filename(epoch: solana_clock::Epoch) -> String {
307-
format!("epoch-{}.car", epoch)
393+
fn of1_car_download_url(epoch: solana_clock::Epoch) -> String {
394+
format!("{OLD_FAITHFUL_ARCHIVE_URL}/{epoch}/epoch-{epoch}.car")
308395
}

0 commit comments

Comments
 (0)