From 86083cf8ffcff642459c4f0db0701b39074a793a Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 12:49:26 +0530 Subject: [PATCH 01/13] fix: hottier downloads Make hottier downloads streaming --- src/hottier.rs | 7 ++-- src/metrics/mod.rs | 29 ++++++++++++++++ src/storage/azure_blob.rs | 61 +++++++++++++++++++++++++++++++-- src/storage/gcs.rs | 61 +++++++++++++++++++++++++++++++-- src/storage/localfs.rs | 13 +++++++ src/storage/object_storage.rs | 6 ++++ src/storage/s3.rs | 64 +++++++++++++++++++++++++++++++++-- 7 files changed, 229 insertions(+), 12 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index c472f91c5..c6172d966 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -42,7 +42,6 @@ use relative_path::RelativePathBuf; use std::time::Duration; use sysinfo::Disks; use tokio::fs::{self, DirEntry}; -use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; use tracing::{error, warn}; @@ -438,13 +437,11 @@ impl HotTierManager { } let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; - let mut file = fs::File::create(parquet_path.clone()).await?; - let parquet_data = PARSEABLE + PARSEABLE .storage .get_object_store() - .get_object(&parquet_file_path, tenant_id) + .buffered_write(&parquet_file_path, tenant_id, parquet_path) .await?; - file.write_all(&parquet_data).await?; *parquet_file_size += parquet_file.file_size; stream_hot_tier.used_size = *parquet_file_size; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 9c9ff86fa..c7689863a 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -306,6 +306,19 @@ pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = + Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "partial_file_scans_in_object_store_calls_by_date", + "Partial file scans in object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["method", "date", "tenant_id"], + ) + .expect("metric can be 
created") + }); + pub static TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = Lazy::new(|| { IntCounterVec::new( @@ -527,6 +540,11 @@ fn custom_metrics(registry: &Registry) { TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), )) .expect("metric can be registered"); + registry + .register(Box::new( + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), + )) + .expect("metric can be registered"); registry .register(Box::new( TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), @@ -695,6 +713,17 @@ pub fn increment_object_store_calls_by_date(method: &str, date: &str, tenant_id: .inc(); } +pub fn increment_partial_file_scans_in_object_store_calls_by_date( + method: &str, + count: u64, + date: &str, + tenant_id: &str, +) { + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[method, date, tenant_id]) + .inc_by(count); +} + pub fn increment_files_scanned_in_object_store_calls_by_date( method: &str, count: u64, diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 62abad0db..78271b29e 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -16,7 +16,12 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use async_trait::async_trait; use bytes::Bytes; @@ -38,7 +43,10 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{fs::OpenOptions, io::AsyncReadExt}; +use tokio::{ + fs::OpenOptions, + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, +}; use tracing::error; use url::Url; @@ -47,6 +55,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -204,6 +213,46 @@ pub struct BlobStore 
{ } impl BlobStore { + async fn _stream_to_file( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let resp = self.client.get(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("GET", &date, tenant_str); + let resp = resp?; + + if let Some(parent) = write_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&write_path).await?; + let mut writer = BufWriter::new(file); + let mut stream = resp.into_stream(); + let mut total: u64 = 0; + let mut chunk_count: u64 = 0; + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + total += chunk.len() as u64; + chunk_count += 1; + writer.write_all(&chunk).await?; + } + writer.flush().await?; + writer.into_inner().sync_all().await?; + + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn _get_object( &self, path: &RelativePath, @@ -466,6 +515,14 @@ impl BlobStore { #[async_trait] impl ObjectStorage for BlobStore { + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._stream_to_file(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, _path: &RelativePath, diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 019802f24..314649116 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -16,13 +16,19 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use 
crate::{ metrics::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -46,7 +52,10 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{fs::OpenOptions, io::AsyncReadExt}; +use tokio::{ + fs::OpenOptions, + io::{AsyncReadExt, AsyncWriteExt, BufWriter}, +}; use tracing::error; use super::{ @@ -169,6 +178,46 @@ pub struct Gcs { } impl Gcs { + async fn _stream_to_file( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let resp = self.client.get(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("GET", &date, tenant_str); + let resp = resp?; + + if let Some(parent) = write_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&write_path).await?; + let mut writer = BufWriter::new(file); + let mut stream = resp.into_stream(); + let mut total: u64 = 0; + let mut chunk_count: u64 = 0; + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + total += chunk.len() as u64; + chunk_count += 1; + writer.write_all(&chunk).await?; + } + writer.flush().await?; + writer.into_inner().sync_all().await?; + + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn _get_object( &self, path: &RelativePath, @@ -431,6 +480,14 @@ impl Gcs { #[async_trait] impl ObjectStorage for Gcs { + async fn 
buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._stream_to_file(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, path: &RelativePath, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 6f981981c..c9a58502c 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -106,6 +106,19 @@ impl LocalFS { #[async_trait] impl ObjectStorage for LocalFS { + async fn buffered_write( + &self, + path: &RelativePath, + _tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let src = self.path_in_root(path); + if let Some(parent) = write_path.parent() { + fs::create_dir_all(parent).await?; + } + fs::copy(&src, &write_path).await?; + Ok(()) + } async fn upload_multipart( &self, key: &RelativePath, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 2e58a0df5..2c6244733 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -302,6 +302,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { path: &RelativePath, tenant_id: &Option, ) -> Result; + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError>; async fn get_object( &self, path: &RelativePath, diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 169d3c508..ba38b532f 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -17,7 +17,12 @@ */ use std::{ - collections::HashSet, fmt::Display, path::Path, str::FromStr, sync::Arc, time::Duration, + collections::HashSet, + fmt::Display, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, }; use async_trait::async_trait; @@ -40,7 +45,10 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{fs::OpenOptions, io::AsyncReadExt}; +use tokio::{ + fs::OpenOptions, + io::{AsyncReadExt, 
AsyncWriteExt, BufWriter}, +}; use tracing::error; use crate::{ @@ -48,6 +56,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -331,6 +340,46 @@ pub struct S3 { } impl S3 { + async fn _stream_to_file( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let resp = self.client.get(&to_object_store_path(path)).await; + increment_object_store_calls_by_date("GET", &date, tenant_str); + let resp = resp?; + + if let Some(parent) = write_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&write_path).await?; + let mut writer = BufWriter::new(file); + let mut stream = resp.into_stream(); + let mut total: u64 = 0; + let mut chunk_count: u64 = 0; + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + total += chunk.len() as u64; + chunk_count += 1; + writer.write_all(&chunk).await?; + } + writer.flush().await?; + writer.into_inner().sync_all().await?; + + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn _get_object( &self, path: &RelativePath, @@ -686,12 +735,21 @@ impl ObjectStorage for S3 { Ok(result?) 
} + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._stream_to_file(path, tenant_id, write_path).await + } + async fn get_object( &self, path: &RelativePath, tenant_id: &Option, ) -> Result { - Ok(self._get_object(path, tenant_id).await?) + self._get_object(path, tenant_id).await } async fn get_objects( From b26b90bd235036c2c00c13254c899b74749fc576 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 14:36:54 +0530 Subject: [PATCH 02/13] replace streaming with parallel downloads --- src/storage/azure_blob.rs | 63 +++++++++++++++++++++++++++----------- src/storage/gcs.rs | 64 ++++++++++++++++++++++++++++----------- src/storage/s3.rs | 63 +++++++++++++++++++++++++++----------- 3 files changed, 136 insertions(+), 54 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 78271b29e..2cb96f974 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -18,6 +18,7 @@ use std::{ collections::HashSet, + ops::Range, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -45,11 +46,15 @@ use object_store::{ use relative_path::{RelativePath, RelativePathBuf}; use tokio::{ fs::OpenOptions, - io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + sync::Mutex as AsyncMutex, }; use tracing::error; use url::Url; +const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; + use crate::{ metrics::{ increment_bytes_scanned_in_object_store_calls_by_date, @@ -213,7 +218,7 @@ pub struct BlobStore { } impl BlobStore { - async fn _stream_to_file( + async fn _parallel_download( &self, path: &RelativePath, tenant_id: &Option, @@ -221,27 +226,49 @@ impl BlobStore { ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); - let resp = 
self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("GET", &date, tenant_str); - let resp = resp?; + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; if let Some(parent) = write_path.parent() { tokio::fs::create_dir_all(parent).await?; } let file = tokio::fs::File::create(&write_path).await?; - let mut writer = BufWriter::new(file); - let mut stream = resp.into_stream(); - let mut total: u64 = 0; - let mut chunk_count: u64 = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - total += chunk.len() as u64; - chunk_count += 1; - writer.write_all(&chunk).await?; - } - writer.flush().await?; - writer.into_inner().sync_all().await?; + file.set_len(total).await?; + let file = Arc::new(AsyncMutex::new(file)); + let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + + futures::stream::iter(ranges) + .map(|r| { + let src = src.clone(); + let file = file.clone(); + let client = &self.client; + async move { + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) + .try_collect::>() + .await?; + + let file = Arc::try_unwrap(file) + .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
+ .into_inner(); + file.sync_all().await?; + + increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); increment_partial_file_scans_in_object_store_calls_by_date( @@ -521,7 +548,7 @@ impl ObjectStorage for BlobStore { tenant_id: &Option, write_path: PathBuf, ) -> Result<(), ObjectStorageError> { - self._stream_to_file(path, tenant_id, write_path).await + self._parallel_download(path, tenant_id, write_path).await } async fn get_buffered_reader( &self, diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 314649116..f3ccfbd07 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -18,6 +18,7 @@ use std::{ collections::HashSet, + ops::Range, path::{Path, PathBuf}, sync::Arc, time::Duration, @@ -54,8 +55,12 @@ use object_store::{ use relative_path::{RelativePath, RelativePathBuf}; use tokio::{ fs::OpenOptions, - io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + sync::Mutex as AsyncMutex, }; + +const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; use tracing::error; use super::{ @@ -178,7 +183,7 @@ pub struct Gcs { } impl Gcs { - async fn _stream_to_file( + async fn _parallel_download( &self, path: &RelativePath, tenant_id: &Option, @@ -186,27 +191,50 @@ impl Gcs { ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); - let resp = self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("GET", &date, tenant_str); - let resp = resp?; + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; if let Some(parent) = write_path.parent() { 
tokio::fs::create_dir_all(parent).await?; } let file = tokio::fs::File::create(&write_path).await?; - let mut writer = BufWriter::new(file); - let mut stream = resp.into_stream(); - let mut total: u64 = 0; - let mut chunk_count: u64 = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - total += chunk.len() as u64; - chunk_count += 1; - writer.write_all(&chunk).await?; - } - writer.flush().await?; - writer.into_inner().sync_all().await?; + file.set_len(total).await?; + let file = Arc::new(AsyncMutex::new(file)); + let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + let client = self.client.clone(); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let file = file.clone(); + async move { + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) + .try_collect::>() + .await?; + + let file = Arc::try_unwrap(file) + .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
+ .into_inner(); + file.sync_all().await?; + + increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); increment_partial_file_scans_in_object_store_calls_by_date( @@ -486,7 +514,7 @@ impl ObjectStorage for Gcs { tenant_id: &Option, write_path: PathBuf, ) -> Result<(), ObjectStorageError> { - self._stream_to_file(path, tenant_id, write_path).await + self._parallel_download(path, tenant_id, write_path).await } async fn get_buffered_reader( &self, diff --git a/src/storage/s3.rs b/src/storage/s3.rs index ba38b532f..c89a81191 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -19,6 +19,7 @@ use std::{ collections::HashSet, fmt::Display, + ops::Range, path::{Path, PathBuf}, str::FromStr, sync::Arc, @@ -47,7 +48,8 @@ use object_store::{ use relative_path::{RelativePath, RelativePathBuf}; use tokio::{ fs::OpenOptions, - io::{AsyncReadExt, AsyncWriteExt, BufWriter}, + io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, + sync::Mutex as AsyncMutex, }; use tracing::error; @@ -70,6 +72,8 @@ use super::{ // in bytes // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"; +const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; #[derive(Debug, Clone, clap::Args)] #[command( @@ -340,7 +344,7 @@ pub struct S3 { } impl S3 { - async fn _stream_to_file( + async fn _parallel_download( &self, path: &RelativePath, tenant_id: &Option, @@ -348,27 +352,50 @@ impl S3 { ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); - let resp = self.client.get(&to_object_store_path(path)).await; - increment_object_store_calls_by_date("GET", &date, tenant_str); - let resp = 
resp?; + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; if let Some(parent) = write_path.parent() { tokio::fs::create_dir_all(parent).await?; } let file = tokio::fs::File::create(&write_path).await?; - let mut writer = BufWriter::new(file); - let mut stream = resp.into_stream(); - let mut total: u64 = 0; - let mut chunk_count: u64 = 0; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - total += chunk.len() as u64; - chunk_count += 1; - writer.write_all(&chunk).await?; - } - writer.flush().await?; - writer.into_inner().sync_all().await?; + file.set_len(total).await?; + let file = Arc::new(AsyncMutex::new(file)); + let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + let client = Arc::new(self.client.clone()); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let file = file.clone(); + async move { + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) + .try_collect::>() + .await?; + + let file = Arc::try_unwrap(file) + .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
+ .into_inner(); + file.sync_all().await?; + + increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); increment_partial_file_scans_in_object_store_calls_by_date( @@ -741,7 +768,7 @@ impl ObjectStorage for S3 { tenant_id: &Option, write_path: PathBuf, ) -> Result<(), ObjectStorageError> { - self._stream_to_file(path, tenant_id, write_path).await + self._parallel_download(path, tenant_id, write_path).await } async fn get_object( From 92866c05d53d168d161b84c87eda05116f49129a Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 15:25:04 +0530 Subject: [PATCH 03/13] new task per range --- src/storage/azure_blob.rs | 48 +++++++++++++++++++++++---------------- src/storage/gcs.rs | 43 ++++++++++++++++++++--------------- src/storage/s3.rs | 43 ++++++++++++++++++++--------------- 3 files changed, 78 insertions(+), 56 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 2cb96f974..9bc67fcd0 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -53,7 +53,7 @@ use tracing::error; use url::Url; const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; use crate::{ metrics::{ @@ -195,7 +195,7 @@ impl ObjectStorageProvider for AzureBlobConfig { // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); Arc::new(BlobStore { - client: azure, + client: Arc::new(azure), account: self.account.clone(), container: self.container.clone(), root: StorePath::from(""), @@ -211,7 +211,7 @@ impl ObjectStorageProvider for AzureBlobConfig { // object store such as S3 and Azure Blob #[derive(Debug)] pub struct BlobStore { - client: LimitStore, + client: Arc>, account: String, container: String, 
root: StorePath, @@ -245,23 +245,31 @@ impl BlobStore { .map(|s| s..(s + chunk).min(total)) .collect(); let chunk_count = ranges.len() as u64; - - futures::stream::iter(ranges) - .map(|r| { - let src = src.clone(); - let file = file.clone(); - let client = &self.client; - async move { - let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; - Ok::<_, ObjectStorageError>(()) - } - }) - .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) - .try_collect::>() - .await?; + let client = self.client.clone(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let client = client.clone(); + let src = src.clone(); + let file = file.clone(); + let semaphore = semaphore.clone(); + handles.push(tokio::spawn(async move { + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + })); + } + for h in handles { + h.await + .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; + } let file = Arc::try_unwrap(file) .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index f3ccfbd07..32b741737 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -60,7 +60,7 @@ use tokio::{ }; const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; use tracing::error; use super::{ @@ -211,23 +211,30 @@ impl Gcs { .collect(); let chunk_count = ranges.len() as u64; let client = self.client.clone(); - - futures::stream::iter(ranges) - .map(|r| { - let client = client.clone(); - let src = src.clone(); - let file = file.clone(); - async move { - let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; - Ok::<_, ObjectStorageError>(()) - } - }) - .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) - .try_collect::>() - .await?; + let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let client = client.clone(); + let src = src.clone(); + let file = file.clone(); + let semaphore = semaphore.clone(); + handles.push(tokio::spawn(async move { + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + })); + } + for h in handles { + h.await + .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; + } let file = Arc::try_unwrap(file) .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
diff --git a/src/storage/s3.rs b/src/storage/s3.rs index c89a81191..11abd3cd5 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -73,7 +73,7 @@ use super::{ // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"; const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 8; +const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; #[derive(Debug, Clone, clap::Args)] #[command( @@ -372,23 +372,30 @@ impl S3 { .collect(); let chunk_count = ranges.len() as u64; let client = Arc::new(self.client.clone()); - - futures::stream::iter(ranges) - .map(|r| { - let client = client.clone(); - let src = src.clone(); - let file = file.clone(); - async move { - let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; - Ok::<_, ObjectStorageError>(()) - } - }) - .buffer_unordered(PARALLEL_DOWNLOAD_CONCURRENCY) - .try_collect::>() - .await?; + let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let client = client.clone(); + let src = src.clone(); + let file = file.clone(); + let semaphore = semaphore.clone(); + handles.push(tokio::spawn(async move { + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; + let bytes = client.get_range(&src, r.clone()).await?; + let mut f = file.lock().await; + f.seek(std::io::SeekFrom::Start(r.start)).await?; + f.write_all(&bytes).await?; + Ok::<_, ObjectStorageError>(()) + })); + } + for h in handles { + h.await + .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; + } let file = Arc::try_unwrap(file) .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
From 881d6fe4baaba3b1cd092000fea5b9fde6729182 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 17:11:31 +0530 Subject: [PATCH 04/13] expose new env vars --- src/cli.rs | 16 ++++++++++++++++ src/storage/azure_blob.rs | 11 ++++++----- src/storage/gcs.rs | 10 ++++++---- src/storage/s3.rs | 10 ++++++---- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 969c3a619..49aea0be5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -314,6 +314,22 @@ pub struct Options { )] pub hot_tier_storage_path: Option, + #[arg( + long = "hot-tier-download-chunk-size", + env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE", + default_value = "8388608", + help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)" + )] + pub hot_tier_download_chunk_size: u64, + + #[arg( + long = "hot-tier-download-concurrency", + env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY", + default_value = "16", + help = "Number of concurrent range requests per hot tier download" + )] + pub hot_tier_download_concurrency: usize, + //TODO: remove this when smart cache is implemented #[arg( long = "index-storage-path", diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 9bc67fcd0..46d610573 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -52,9 +52,6 @@ use tokio::{ use tracing::error; use url::Url; -const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; - use crate::{ metrics::{ increment_bytes_scanned_in_object_store_calls_by_date, @@ -239,14 +236,18 @@ impl BlobStore { file.set_len(total).await?; let file = Arc::new(AsyncMutex::new(file)); - let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let chunk = PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(16); let ranges: Vec> = (0..total) .step_by(chunk as usize) .map(|s| s..(s + chunk).min(total)) .collect(); let chunk_count = 
ranges.len() as u64; let client = self.client.clone(); - let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); let mut handles = Vec::with_capacity(ranges.len()); for r in ranges { diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 32b741737..561c687f4 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -59,8 +59,6 @@ use tokio::{ sync::Mutex as AsyncMutex, }; -const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; use tracing::error; use super::{ @@ -204,14 +202,18 @@ impl Gcs { file.set_len(total).await?; let file = Arc::new(AsyncMutex::new(file)); - let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let chunk = PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(16); let ranges: Vec> = (0..total) .step_by(chunk as usize) .map(|s| s..(s + chunk).min(total)) .collect(); let chunk_count = ranges.len() as u64; let client = self.client.clone(); - let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); let mut handles = Vec::with_capacity(ranges.len()); for r in ranges { diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 11abd3cd5..f1bf5a3ba 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -72,8 +72,6 @@ use super::{ // in bytes // const MULTIPART_UPLOAD_SIZE: usize = 1024 * 1024 * 100; const AWS_CONTAINER_CREDENTIALS_RELATIVE_URI: &str = "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"; -const PARALLEL_DOWNLOAD_CHUNK_SIZE: u64 = 8 * 1024 * 1024; -const PARALLEL_DOWNLOAD_CONCURRENCY: usize = 16; #[derive(Debug, Clone, clap::Args)] #[command( @@ -365,14 +363,18 @@ impl S3 { file.set_len(total).await?; let file = Arc::new(AsyncMutex::new(file)); - let chunk = PARALLEL_DOWNLOAD_CHUNK_SIZE; + let chunk = 
PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(16); let ranges: Vec> = (0..total) .step_by(chunk as usize) .map(|s| s..(s + chunk).min(total)) .collect(); let chunk_count = ranges.len() as u64; let client = Arc::new(self.client.clone()); - let semaphore = Arc::new(tokio::sync::Semaphore::new(PARALLEL_DOWNLOAD_CONCURRENCY)); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); let mut handles = Vec::with_capacity(ranges.len()); for r in ranges { From aec06d3e7f6d8ba9215a952ac71ffc032917011d Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 17:35:54 +0530 Subject: [PATCH 05/13] parallel file download per stream --- src/cli.rs | 8 ++ src/hottier.rs | 268 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 186 insertions(+), 90 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 49aea0be5..1eaf617c7 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -330,6 +330,14 @@ pub struct Options { )] pub hot_tier_download_concurrency: usize, + #[arg( + long = "hot-tier-files-per-stream-concurrency", + env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY", + default_value = "4", + help = "Number of concurrent parquet file downloads per stream during hot tier sync" + )] + pub hot_tier_files_per_stream_concurrency: usize, + //TODO: remove this when smart cache is implemented #[arg( long = "index-storage-path", diff --git a/src/hottier.rs b/src/hottier.rs index c6172d966..887f0c4d7 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -17,10 +17,12 @@ */ use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, io, path::{Path, PathBuf}, + sync::Arc, }; +use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use crate::{ catalog::manifest::{File, Manifest}, @@ -64,9 +66,18 @@ pub struct StreamHotTier { pub oldest_date_time_entry: Option, } +/// Per-stream in-memory bookkeeping. 
Mutex protects concurrent reservation, +/// commit, and per-date manifest writes. Downloads run outside the lock. +struct StreamSyncState { + sht: AsyncMutex, +} + +type StreamKey = (Option, String); + pub struct HotTierManager { filesystem: LocalFileSystem, hot_tier_path: &'static Path, + state_cache: AsyncRwLock>>, } impl HotTierManager { @@ -75,9 +86,39 @@ impl HotTierManager { HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, + state_cache: AsyncRwLock::new(HashMap::new()), } } + /// Lazy-load and cache the `StreamHotTier` for a (tenant, stream) pair. + /// All sync-path mutations should acquire `state.sht.lock()`. + async fn get_or_load_state( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result, HotTierError> { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(state) = self.state_cache.read().await.get(&key).cloned() { + return Ok(state); + } + let mut cache = self.state_cache.write().await; + if let Some(state) = cache.get(&key).cloned() { + return Ok(state); + } + let sht = self.get_hot_tier(stream, tenant_id).await?; + let state = Arc::new(StreamSyncState { + sht: AsyncMutex::new(sht), + }); + cache.insert(key, state.clone()); + Ok(state) + } + + /// Drop cached state for a stream (used after delete). 
+ pub async fn invalidate_state(&self, stream: &str, tenant_id: &Option) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + self.state_cache.write().await.remove(&key); + } + /// Get a global pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); @@ -219,6 +260,7 @@ impl HotTierManager { self.hot_tier_path.join(stream) }; fs::remove_dir_all(path).await?; + self.invalidate_state(stream, tenant_id).await; Ok(()) } @@ -319,9 +361,6 @@ impl HotTierManager { stream: String, tenant_id: Option, ) -> Result<(), HotTierError> { - let stream_hot_tier = self.get_hot_tier(&stream, &tenant_id).await?; - let mut parquet_file_size = stream_hot_tier.used_size; - let mut s3_manifest_file_list = PARSEABLE .metastore .get_all_manifest_files(&stream, &tenant_id) @@ -332,13 +371,8 @@ impl HotTierManager { ))) })?; - self.process_manifest( - &stream, - &mut s3_manifest_file_list, - &mut parquet_file_size, - &tenant_id, - ) - .await?; + self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id) + .await?; Ok(()) } @@ -351,118 +385,172 @@ impl HotTierManager { &self, stream: &str, manifest_files_to_download: &mut BTreeMap>, - parquet_file_size: &mut u64, tenant_id: &Option, ) -> Result<(), HotTierError> { if manifest_files_to_download.is_empty() { return Ok(()); } + + // Build flat work list: (date, file, local_path) ordered latest-date-first, + // and within a date by descending file_path (matches old `pop()` order). 
+ let mut work: Vec<(NaiveDate, File, PathBuf)> = Vec::new(); for (str_date, manifest_files) in manifest_files_to_download.iter().rev() { - let mut storage_combined_manifest = Manifest::default(); + let date = + match NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") { + Ok(d) => d, + Err(_) => { + warn!("Invalid date format: {}", str_date); + continue; + } + }; + let mut combined = Manifest::default(); for storage_manifest in manifest_files { - storage_combined_manifest - .files - .extend(storage_manifest.files.clone()); + combined.files.extend(storage_manifest.files.clone()); } + combined.files.sort_by_key(|f| f.file_path.clone()); + while let Some(parquet_file) = combined.files.pop() { + let parquet_path = self.hot_tier_path.join(&parquet_file.file_path); + if !parquet_path.exists() { + work.push((date, parquet_file, parquet_path)); + } + } + } - storage_combined_manifest - .files - .sort_by_key(|file| file.file_path.clone()); - - while let Some(parquet_file) = storage_combined_manifest.files.pop() { - let parquet_file_path = &parquet_file.file_path; - let parquet_path = self.hot_tier_path.join(parquet_file_path); + if work.is_empty() { + return Ok(()); + } - if !parquet_path.exists() { - if let Ok(date) = - NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") - { - if !self - .process_parquet_file( - stream, - &parquet_file, - parquet_file_size, - parquet_path, - date, - tenant_id, - ) - .await? - { - break; - } - } else { - warn!("Invalid date format: {}", str_date); + let state = self.get_or_load_state(stream, tenant_id).await?; + let concurrency = PARSEABLE + .options + .hot_tier_files_per_stream_concurrency + .max(4); + + // Reservation failure (out of disk + nothing to evict) is sticky: + // once one file can't be placed, no subsequent file will fit either. 
+ let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + let stream_owned = stream.to_owned(); + let tenant_owned = tenant_id.clone(); + + let results: Vec> = futures::stream::iter(work) + .map(|(date, file, parquet_path)| { + let state = state.clone(); + let stream = stream_owned.clone(); + let tenant_id = tenant_owned.clone(); + let stop = stop.clone(); + async move { + if stop.load(std::sync::atomic::Ordering::Relaxed) { + return Ok(()); + } + let processed = self + .process_parquet_file_concurrent( + &stream, + &file, + parquet_path, + date, + &tenant_id, + &state, + ) + .await?; + if !processed { + stop.store(true, std::sync::atomic::Ordering::Relaxed); } + Ok(()) } - } + }) + .buffer_unordered(concurrency) + .collect() + .await; + + for r in results { + r?; } Ok(()) } - /// process the parquet file for the stream - /// check if the disk is available to download the parquet file - /// if not available, delete the oldest entry from the hot tier directory - /// download the parquet file from S3 to the hot tier directory - /// update the used and available size in the hot tier metadata - /// return true if the parquet file is processed successfully - async fn process_parquet_file( + /// Reserve disk budget under the per-stream lock, download outside the lock, + /// then commit usage + per-date manifest under the lock again. + /// Returns false when no budget is available (caller should stop scheduling + /// further work for this stream). + async fn process_parquet_file_concurrent( &self, stream: &str, parquet_file: &File, - parquet_file_size: &mut u64, parquet_path: PathBuf, date: NaiveDate, tenant_id: &Option, + state: &Arc, ) -> Result { - let mut file_processed = false; - let mut stream_hot_tier = self.get_hot_tier(stream, tenant_id).await?; - if !self.is_disk_available(parquet_file.file_size).await? 
- || stream_hot_tier.available_size <= parquet_file.file_size + // RESERVE { - if !self - .cleanup_hot_tier_old_data( - stream, - &mut stream_hot_tier, - &parquet_path, - parquet_file.file_size, - tenant_id, - ) - .await? - { - return Ok(file_processed); + let mut sht = state.sht.lock().await; + if (!self.is_disk_available(parquet_file.file_size).await? + || sht.available_size <= parquet_file.file_size) + && !self + .cleanup_hot_tier_old_data( + stream, + &mut sht, + &parquet_path, + parquet_file.file_size, + tenant_id, + ) + .await? + { + return Ok(false); + } + if sht.available_size <= parquet_file.file_size { + return Ok(false); } - *parquet_file_size = stream_hot_tier.used_size; + sht.available_size -= parquet_file.file_size; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; } + + // DOWNLOAD (no lock held) let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; - PARSEABLE + let download_result = PARSEABLE .storage .get_object_store() - .buffered_write(&parquet_file_path, tenant_id, parquet_path) - .await?; - *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = *parquet_file_size; + .buffered_write(&parquet_file_path, tenant_id, parquet_path.clone()) + .await; + + if let Err(e) = download_result { + // refund reservation + let mut sht = state.sht.lock().await; + sht.available_size += parquet_file.file_size; + if let Err(put_err) = self.put_hot_tier(stream, &mut sht, tenant_id).await { + error!("failed to persist refund after download failure: {put_err:?}"); + } + // best-effort cleanup of any partial file + let _ = fs::remove_file(&parquet_path).await; + return Err(e.into()); + } - stream_hot_tier.available_size -= parquet_file.file_size; - self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id) - .await?; - file_processed = true; - let path = self.get_stream_path_for_date(stream, &date, tenant_id); - let mut hot_tier_manifest = 
HotTierManager::get_hot_tier_manifest_from_path(path).await?; - hot_tier_manifest.files.push(parquet_file.clone()); - hot_tier_manifest - .files - .sort_by_key(|file| file.file_path.clone()); - // write the manifest file to the hot tier directory - let manifest_path = self - .get_stream_path_for_date(stream, &date, tenant_id) - .join("hottier.manifest.json"); - fs::create_dir_all(manifest_path.parent().unwrap()).await?; - fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; - - Ok(file_processed) + // COMMIT + { + let mut sht = state.sht.lock().await; + sht.used_size += parquet_file.file_size; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + + let path = self.get_stream_path_for_date(stream, &date, tenant_id); + let mut hot_tier_manifest = + HotTierManager::get_hot_tier_manifest_from_path(path).await?; + hot_tier_manifest.files.push(parquet_file.clone()); + hot_tier_manifest + .files + .sort_by_key(|file| file.file_path.clone()); + // write the manifest file to the hot tier directory + let manifest_path = self + .get_stream_path_for_date(stream, &date, tenant_id) + .join("hottier.manifest.json"); + fs::create_dir_all(manifest_path.parent().unwrap()).await?; + fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + } + + Ok(true) } ///fetch the list of dates available in the hot tier directory for the stream and sort them From 2b786c7582785d0badd414c3bdbdac8c16ca689e Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 5 May 2026 18:34:06 +0530 Subject: [PATCH 06/13] concurrent writes instead of mutex --- src/hottier.rs | 6 +++--- src/storage/azure_blob.rs | 28 ++++++++++++++-------------- src/storage/gcs.rs | 28 ++++++++++++++-------------- src/storage/s3.rs | 28 ++++++++++++++-------------- 4 files changed, 45 insertions(+), 45 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 887f0c4d7..d176a66ca 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -498,9 +498,9 @@ impl HotTierManager { tenant_id, 
) .await? - { - return Ok(false); - } + { + return Ok(false); + } if sht.available_size <= parquet_file.file_size { return Ok(false); } diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 46d610573..5f680e9ee 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -44,11 +44,7 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{ - fs::OpenOptions, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex as AsyncMutex, -}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::error; use url::Url; @@ -234,7 +230,7 @@ impl BlobStore { } let file = tokio::fs::File::create(&write_path).await?; file.set_len(total).await?; - let file = Arc::new(AsyncMutex::new(file)); + let std_file = Arc::new(file.into_std().await); let chunk = PARSEABLE .options @@ -253,7 +249,7 @@ impl BlobStore { for r in ranges { let client = client.clone(); let src = src.clone(); - let file = file.clone(); + let std_file = std_file.clone(); let semaphore = semaphore.clone(); handles.push(tokio::spawn(async move { let _permit = semaphore @@ -261,9 +257,13 @@ impl BlobStore { .await .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; Ok::<_, ObjectStorageError>(()) })); } @@ -272,10 +272,10 @@ impl BlobStore { .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; } - let file = Arc::try_unwrap(file) - .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? 
- .into_inner(); - file.sync_all().await?; + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 561c687f4..d40c80b4b 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -53,11 +53,7 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{ - fs::OpenOptions, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex as AsyncMutex, -}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::error; @@ -200,7 +196,7 @@ impl Gcs { } let file = tokio::fs::File::create(&write_path).await?; file.set_len(total).await?; - let file = Arc::new(AsyncMutex::new(file)); + let std_file = Arc::new(file.into_std().await); let chunk = PARSEABLE .options @@ -219,7 +215,7 @@ impl Gcs { for r in ranges { let client = client.clone(); let src = src.clone(); - let file = file.clone(); + let std_file = std_file.clone(); let semaphore = semaphore.clone(); handles.push(tokio::spawn(async move { let _permit = semaphore @@ -227,9 +223,13 @@ impl Gcs { .await .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; Ok::<_, ObjectStorageError>(()) })); } @@ -238,10 +238,10 @@ impl Gcs { .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; } - let 
file = Arc::try_unwrap(file) - .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? - .into_inner(); - file.sync_all().await?; + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); diff --git a/src/storage/s3.rs b/src/storage/s3.rs index f1bf5a3ba..4bc72d078 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -46,11 +46,7 @@ use object_store::{ path::Path as StorePath, }; use relative_path::{RelativePath, RelativePathBuf}; -use tokio::{ - fs::OpenOptions, - io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}, - sync::Mutex as AsyncMutex, -}; +use tokio::{fs::OpenOptions, io::AsyncReadExt}; use tracing::error; use crate::{ @@ -361,7 +357,7 @@ impl S3 { } let file = tokio::fs::File::create(&write_path).await?; file.set_len(total).await?; - let file = Arc::new(AsyncMutex::new(file)); + let std_file = Arc::new(file.into_std().await); let chunk = PARSEABLE .options @@ -380,7 +376,7 @@ impl S3 { for r in ranges { let client = client.clone(); let src = src.clone(); - let file = file.clone(); + let std_file = std_file.clone(); let semaphore = semaphore.clone(); handles.push(tokio::spawn(async move { let _permit = semaphore @@ -388,9 +384,13 @@ impl S3 { .await .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; let bytes = client.get_range(&src, r.clone()).await?; - let mut f = file.lock().await; - f.seek(std::io::SeekFrom::Start(r.start)).await?; - f.write_all(&bytes).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; Ok::<_, ObjectStorageError>(()) 
})); } @@ -399,10 +399,10 @@ impl S3 { .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; } - let file = Arc::try_unwrap(file) - .map_err(|_| ObjectStorageError::Custom("download file arc still shared".into()))? - .into_inner(); - file.sync_all().await?; + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; increment_object_store_calls_by_date("GET", &date, tenant_str); increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); From 24b893fff0aa2ac49c118311f773299b94a77a6a Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 6 May 2026 17:09:09 +0530 Subject: [PATCH 07/13] crash safety by using .partial file --- src/hottier.rs | 97 +++++++++++++++++++++++++++++++++++++-- src/storage/azure_blob.rs | 29 ++++++++++-- src/storage/gcs.rs | 29 ++++++++++-- src/storage/localfs.rs | 15 ++++-- src/storage/mod.rs | 15 ++++++ src/storage/s3.rs | 29 ++++++++++-- 6 files changed, 199 insertions(+), 15 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index d176a66ca..d10c7308c 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -105,7 +105,7 @@ impl HotTierManager { if let Some(state) = cache.get(&key).cloned() { return Ok(state); } - let sht = self.get_hot_tier(stream, tenant_id).await?; + let sht = self.reconcile_stream(stream, tenant_id).await?; let state = Arc::new(StreamSyncState { sht: AsyncMutex::new(sht), }); @@ -119,6 +119,98 @@ impl HotTierManager { self.state_cache.write().await.remove(&key); } + /// Walk the on-disk hot-tier directory for a stream and bring it into + /// agreement with `hottier.manifest.json` files. 
Removes `.partial` + /// orphans, drops manifest entries whose files are missing or wrong size, + /// deletes parquet files that exist but are not in their date manifest, + /// then recomputes `used_size` / `available_size` from the cleaned + /// manifests and persists the updated `StreamHotTier`. + async fn reconcile_stream( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result { + let mut sht = self.get_hot_tier(stream, tenant_id).await?; + let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; + let mut total_used: u64 = 0; + + for date in dates { + let date_dir = self.get_stream_path_for_date(stream, &date, tenant_id); + if !date_dir.exists() { + continue; + } + + // Pass 1: collect on-disk parquet files (drop .partial orphans). + let mut on_disk: std::collections::HashSet = std::collections::HashSet::new(); + let mut entries = fs::read_dir(&date_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let p = entry.path(); + let Some(name_os) = p.file_name() else { + continue; + }; + let name = name_os.to_string_lossy(); + if name.ends_with(".partial") { + let _ = fs::remove_file(&p).await; + continue; + } + if name.ends_with(".manifest.json") { + continue; + } + if !p.is_file() { + continue; + } + on_disk.insert(name.into_owned()); + } + + // Pass 2: clean manifest of stale entries. + let manifest_path = date_dir.join("hottier.manifest.json"); + let mut manifest: Manifest = if manifest_path.exists() { + let bytes = fs::read(&manifest_path).await?; + serde_json::from_slice(&bytes).unwrap_or_default() + } else { + Manifest::default() + }; + + let mut kept = Vec::with_capacity(manifest.files.len()); + let mut keep_names: std::collections::HashSet = + std::collections::HashSet::new(); + for f in manifest.files.drain(..) 
{ + let local = self.hot_tier_path.join(&f.file_path); + let ok = match fs::metadata(&local).await { + Ok(m) => m.len() == f.file_size, + Err(_) => false, + }; + if ok { + if let Some(name) = local.file_name().and_then(|s| s.to_str()) { + keep_names.insert(name.to_owned()); + } + total_used += f.file_size; + kept.push(f); + } else { + let _ = fs::remove_file(&local).await; + } + } + kept.sort_by_key(|f| f.file_path.clone()); + manifest.files = kept; + + if manifest_path.exists() || !manifest.files.is_empty() { + fs::create_dir_all(&date_dir).await?; + fs::write(&manifest_path, serde_json::to_vec(&manifest)?).await?; + } + + // Pass 3: delete on-disk parquet files not referenced by the cleaned manifest. + for name in on_disk.difference(&keep_names) { + let p = date_dir.join(name); + let _ = fs::remove_file(&p).await; + } + } + + sht.used_size = total_used; + sht.available_size = sht.size.saturating_sub(total_used); + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + Ok(sht) + } + /// Get a global pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); @@ -524,8 +616,7 @@ impl HotTierManager { if let Err(put_err) = self.put_hot_tier(stream, &mut sht, tenant_id).await { error!("failed to persist refund after download failure: {put_err:?}"); } - // best-effort cleanup of any partial file - let _ = fs::remove_file(&parquet_path).await; + // backend already cleaned up its `.partial` file; final path was never created. 
return Err(e.into()); } diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 5f680e9ee..52a4668aa 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -61,7 +61,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -216,6 +217,28 @@ impl BlobStore { path: &RelativePath, tenant_id: &Option, write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + tokio::fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); @@ -225,10 +248,10 @@ impl BlobStore { increment_object_store_calls_by_date("HEAD", &date, tenant_str); let total = meta.size; - if let Some(parent) = write_path.parent() { + if let Some(parent) = partial_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let file = tokio::fs::File::create(&write_path).await?; + let file = tokio::fs::File::create(&partial_path).await?; file.set_len(total).await?; let std_file = Arc::new(file.into_std().await); diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index d40c80b4b..75dd141b6 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -60,7 +60,8 @@ use tracing::error; use super::{ CONNECT_TIMEOUT_SECS, 
ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -182,6 +183,28 @@ impl Gcs { path: &RelativePath, tenant_id: &Option, write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + tokio::fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); @@ -191,10 +214,10 @@ impl Gcs { increment_object_store_calls_by_date("HEAD", &date, tenant_str); let total = meta.size; - if let Some(parent) = write_path.parent() { + if let Some(parent) = partial_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let file = tokio::fs::File::create(&write_path).await?; + let file = tokio::fs::File::create(&partial_path).await?; file.set_len(total).await?; let std_file = Arc::new(file.into_std().await); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index c9a58502c..8d6027413 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -113,11 +113,20 @@ impl ObjectStorage for LocalFS { write_path: PathBuf, ) -> Result<(), ObjectStorageError> { let src = self.path_in_root(path); - if let Some(parent) = write_path.parent() { + let partial = super::partial_path(&write_path)?; + if let Some(parent) = partial.parent() { fs::create_dir_all(parent).await?; } - 
fs::copy(&src, &write_path).await?; - Ok(()) + match fs::copy(&src, &partial).await { + Ok(_) => { + fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = fs::remove_file(&partial).await; + Err(e.into()) + } + } } async fn upload_multipart( &self, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2ca7a10de..15dd98975 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -344,3 +344,18 @@ pub enum ObjectStorageError { pub fn to_object_store_path(path: &RelativePath) -> Path { Path::from(path.as_str()) } + +/// Append `.partial` to the file name of a local path. Used by hot-tier +/// downloaders to write to a sibling path and atomically rename on success. +pub fn partial_path( + write_path: &std::path::Path, +) -> Result { + let name = write_path + .file_name() + .ok_or_else(|| ObjectStorageError::Custom("download write_path has no file name".into()))?; + let mut next = std::ffi::OsString::from(name); + next.push(".partial"); + let mut buf = write_path.to_path_buf(); + buf.set_file_name(next); + Ok(buf) +} diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 4bc72d078..1ba95ef76 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -62,7 +62,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; // in bytes @@ -343,6 +344,28 @@ impl S3 { path: &RelativePath, tenant_id: &Option, write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + tokio::fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = 
tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, ) -> Result<(), ObjectStorageError> { let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let date = Utc::now().date_naive().to_string(); @@ -352,10 +375,10 @@ impl S3 { increment_object_store_calls_by_date("HEAD", &date, tenant_str); let total = meta.size; - if let Some(parent) = write_path.parent() { + if let Some(parent) = partial_path.parent() { tokio::fs::create_dir_all(parent).await?; } - let file = tokio::fs::File::create(&write_path).await?; + let file = tokio::fs::File::create(&partial_path).await?; file.set_len(total).await?; let std_file = Arc::new(file.into_std().await); From e734cf455461eca2f13969f6e26a43d33833bf31 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 6 May 2026 18:11:50 +0530 Subject: [PATCH 08/13] separate out historic and latest hottier tasks --- src/cli.rs | 16 +++++++++ src/hottier.rs | 92 +++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 89 insertions(+), 19 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 1eaf617c7..6b5e9d7e4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -338,6 +338,22 @@ pub struct Options { )] pub hot_tier_files_per_stream_concurrency: usize, + #[arg( + long = "hot-tier-latest-minutes", + env = "P_HOT_TIER_LATEST_MINUTES", + default_value = "10", + help = "Files whose timestamp is within the last N minutes are 'latest'; rest are 'historic'." + )] + pub hot_tier_latest_minutes: i64, + + #[arg( + long = "hot-tier-historic-sync-minutes", + env = "P_HOT_TIER_HISTORIC_SYNC_MINUTES", + default_value = "5", + help = "Interval (minutes) at which the historic hot-tier sync runs." 
+ )] + pub hot_tier_historic_sync_minutes: u32, + //TODO: remove this when smart cache is implemented #[arg( long = "index-storage-path", diff --git a/src/hottier.rs b/src/hottier.rs index d10c7308c..72eacc750 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -72,6 +72,15 @@ struct StreamSyncState { sht: AsyncMutex, } +/// Hot-tier sync runs in two phases. Latest pulls files newer than +/// `hot_tier_latest_minutes` ago and may evict historic to make room. +/// Historic pulls older files, runs less often, never triggers eviction. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum SyncPhase { + Latest, + Historic, +} + type StreamKey = (Option, String); pub struct HotTierManager { @@ -392,7 +401,10 @@ impl HotTierManager { Ok(path) } - /// schedule the download of the hot tier files from S3 every minute + /// schedule two hot-tier sync jobs: a frequent "latest" pass for files + /// within the last `P_HOT_TIER_LATEST_MINUTES`, and a less-frequent + /// "historic" pass for older files. Both share the per-stream lock; only + /// the latest pass triggers eviction. 
pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, @@ -402,8 +414,19 @@ impl HotTierManager { .every(HOT_TIER_SYNC_DURATION) .plus(Interval::Seconds(5)) .run(move || async { - if let Err(err) = self.sync_hot_tier().await { - error!("Error in hot tier scheduler: {:?}", err); + if let Err(err) = self.sync_hot_tier(SyncPhase::Latest).await { + error!("Error in hot tier latest scheduler: {:?}", err); + } + }); + + let historic_interval = + Interval::Minutes(PARSEABLE.options.hot_tier_historic_sync_minutes.max(1)); + scheduler + .every(historic_interval) + .plus(Interval::Seconds(15)) + .run(move || async { + if let Err(err) = self.sync_hot_tier(SyncPhase::Historic).await { + error!("Error in hot tier historic scheduler: {:?}", err); } }); @@ -417,7 +440,7 @@ impl HotTierManager { } /// sync the hot tier files from S3 to the hot tier directory for all streams - async fn sync_hot_tier(&self) -> Result<(), HotTierError> { + async fn sync_hot_tier(&self, phase: SyncPhase) -> Result<(), HotTierError> { // Before syncing, check if pstats stream was created and needs hot tier if let Err(e) = self.create_pstats_hot_tier().await { tracing::trace!("Skipping pstats hot tier creation because of error: {e}"); @@ -432,7 +455,11 @@ impl HotTierManager { for tenant_id in tenants { for stream in PARSEABLE.streams.list(&tenant_id) { if self.check_stream_hot_tier_exists(&stream, &tenant_id) { - sync_hot_tier_tasks.push(self.process_stream(stream, tenant_id.to_owned())); + sync_hot_tier_tasks.push(self.process_stream( + stream, + tenant_id.to_owned(), + phase, + )); } } } @@ -452,6 +479,7 @@ impl HotTierManager { &self, stream: String, tenant_id: Option, + phase: SyncPhase, ) -> Result<(), HotTierError> { let mut s3_manifest_file_list = PARSEABLE .metastore @@ -463,7 +491,7 @@ impl HotTierManager { ))) })?; - self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id) + self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id, 
phase) .await?; Ok(()) @@ -478,11 +506,15 @@ impl HotTierManager { stream: &str, manifest_files_to_download: &mut BTreeMap>, tenant_id: &Option, + phase: SyncPhase, ) -> Result<(), HotTierError> { if manifest_files_to_download.is_empty() { return Ok(()); } + let latest_minutes = PARSEABLE.options.hot_tier_latest_minutes.max(0); + let cutoff = chrono::Utc::now().naive_utc() - chrono::Duration::minutes(latest_minutes); + // Build flat work list: (date, file, local_path) ordered latest-date-first, // and within a date by descending file_path (matches old `pop()` order). let mut work: Vec<(NaiveDate, File, PathBuf)> = Vec::new(); @@ -503,7 +535,16 @@ impl HotTierManager { combined.files.sort_by_key(|f| f.file_path.clone()); while let Some(parquet_file) = combined.files.pop() { let parquet_path = self.hot_tier_path.join(&parquet_file.file_path); - if !parquet_path.exists() { + if parquet_path.exists() { + continue; + } + let dt = extract_datetime(&parquet_file.file_path); + let is_latest = dt.map(|d| d >= cutoff).unwrap_or(false); + let keep = match phase { + SyncPhase::Latest => is_latest, + SyncPhase::Historic => !is_latest, + }; + if keep { work.push((date, parquet_file, parquet_path)); } } @@ -544,6 +585,7 @@ impl HotTierManager { date, &tenant_id, &state, + phase, ) .await?; if !processed { @@ -567,6 +609,7 @@ impl HotTierManager { /// then commit usage + per-date manifest under the lock again. /// Returns false when no budget is available (caller should stop scheduling /// further work for this stream). + #[allow(clippy::too_many_arguments)] async fn process_parquet_file_concurrent( &self, stream: &str, @@ -575,23 +618,34 @@ impl HotTierManager { date: NaiveDate, tenant_id: &Option, state: &Arc, + phase: SyncPhase, ) -> Result { // RESERVE { let mut sht = state.sht.lock().await; - if (!self.is_disk_available(parquet_file.file_size).await? 
- || sht.available_size <= parquet_file.file_size) - && !self - .cleanup_hot_tier_old_data( - stream, - &mut sht, - &parquet_path, - parquet_file.file_size, - tenant_id, - ) - .await? + if !self.is_disk_available(parquet_file.file_size).await? + || sht.available_size <= parquet_file.file_size { - return Ok(false); + match phase { + SyncPhase::Latest => { + if !self + .cleanup_hot_tier_old_data( + stream, + &mut sht, + &parquet_path, + parquet_file.file_size, + tenant_id, + ) + .await? + { + return Ok(false); + } + } + SyncPhase::Historic => { + // Historic never evicts; just skip when full. + return Ok(false); + } + } } if sht.available_size <= parquet_file.file_size { return Ok(false); From 60b889470a27f4fa0c50b6df460f42ea136f3537 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 6 May 2026 19:53:32 +0530 Subject: [PATCH 09/13] add logs --- src/hottier.rs | 212 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 205 insertions(+), 7 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index 72eacc750..ef69799d2 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -139,9 +139,13 @@ impl HotTierManager { stream: &str, tenant_id: &Option, ) -> Result { + warn!(stream = %stream, tenant = ?tenant_id, "reconcile starting"); let mut sht = self.get_hot_tier(stream, tenant_id).await?; let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; let mut total_used: u64 = 0; + let mut partials_removed = 0usize; + let mut entries_dropped = 0usize; + let mut orphans_removed = 0usize; for date in dates { let date_dir = self.get_stream_path_for_date(stream, &date, tenant_id); @@ -160,6 +164,13 @@ impl HotTierManager { let name = name_os.to_string_lossy(); if name.ends_with(".partial") { let _ = fs::remove_file(&p).await; + partials_removed += 1; + warn!( + stream = %stream, + tenant = ?tenant_id, + path = %p.display(), + "reconcile: deleted partial orphan" + ); continue; } if name.ends_with(".manifest.json") { @@ -197,6 +208,13 @@ impl HotTierManager { 
kept.push(f); } else { let _ = fs::remove_file(&local).await; + entries_dropped += 1; + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %f.file_path, + "reconcile: dropped manifest entry (file missing or wrong size)" + ); } } kept.sort_by_key(|f| f.file_path.clone()); @@ -211,12 +229,29 @@ impl HotTierManager { for name in on_disk.difference(&keep_names) { let p = date_dir.join(name); let _ = fs::remove_file(&p).await; + orphans_removed += 1; + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %p.display(), + "reconcile: deleted orphan parquet not in manifest" + ); } } sht.used_size = total_used; sht.available_size = sht.size.saturating_sub(total_used); self.put_hot_tier(stream, &mut sht, tenant_id).await?; + warn!( + stream = %stream, + tenant = ?tenant_id, + partials_removed, + entries_dropped, + orphans_removed, + used = sht.used_size, + available = sht.available_size, + "reconcile done" + ); Ok(sht) } @@ -409,22 +444,30 @@ impl HotTierManager { where 'a: 'static, { + let latest_min = PARSEABLE.options.hot_tier_latest_minutes; + let historic_min = PARSEABLE.options.hot_tier_historic_sync_minutes.max(1); + warn!( + latest_minutes = latest_min, + historic_sync_minutes = historic_min, + "hot tier scheduler starting" + ); + let mut scheduler = AsyncScheduler::new(); scheduler .every(HOT_TIER_SYNC_DURATION) .plus(Interval::Seconds(5)) .run(move || async { + warn!(phase = ?SyncPhase::Latest, "hot tier tick fired"); if let Err(err) = self.sync_hot_tier(SyncPhase::Latest).await { error!("Error in hot tier latest scheduler: {:?}", err); } }); - let historic_interval = - Interval::Minutes(PARSEABLE.options.hot_tier_historic_sync_minutes.max(1)); scheduler - .every(historic_interval) + .every(Interval::Minutes(historic_min)) .plus(Interval::Seconds(15)) .run(move || async { + warn!(phase = ?SyncPhase::Historic, "hot tier tick fired"); if let Err(err) = self.sync_hot_tier(SyncPhase::Historic).await { error!("Error in hot tier historic scheduler: {:?}", 
err); } @@ -452,6 +495,7 @@ impl HotTierManager { } else { vec![None] }; + let mut scheduled = 0usize; for tenant_id in tenants { for stream in PARSEABLE.streams.list(&tenant_id) { if self.check_stream_hot_tier_exists(&stream, &tenant_id) { @@ -460,9 +504,11 @@ impl HotTierManager { tenant_id.to_owned(), phase, )); + scheduled += 1; } } } + warn!(phase = ?phase, num_streams = scheduled, "hot tier tick: streams scheduled"); while let Some(res) = sync_hot_tier_tasks.next().await { if let Err(err) = res { @@ -470,6 +516,7 @@ impl HotTierManager { return Err(err); } } + warn!(phase = ?phase, "hot tier tick complete"); Ok(()) } @@ -481,6 +528,12 @@ impl HotTierManager { tenant_id: Option, phase: SyncPhase, ) -> Result<(), HotTierError> { + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "fetching manifest from metastore" + ); let mut s3_manifest_file_list = PARSEABLE .metastore .get_all_manifest_files(&stream, &tenant_id) @@ -490,10 +543,24 @@ impl HotTierManager { e.to_detail(), ))) })?; + let date_count = s3_manifest_file_list.len(); + let total_files: usize = s3_manifest_file_list + .values() + .map(|m| m.iter().map(|x| x.files.len()).sum::()) + .sum(); + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + dates = date_count, + files = total_files, + "manifest fetched" + ); self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id, phase) .await?; + warn!(stream = %stream, tenant = ?tenant_id, phase = ?phase, "stream sync done"); Ok(()) } @@ -550,9 +617,25 @@ impl HotTierManager { } } + let work_count = work.len(); + let total_bytes: u64 = work.iter().map(|(_, f, _)| f.file_size).sum(); if work.is_empty() { + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "no files to download this tick" + ); return Ok(()); } + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + work_count, + total_bytes, + "work list built" + ); let state = self.get_or_load_state(stream, tenant_id).await?; 
let concurrency = PARSEABLE @@ -588,9 +671,15 @@ impl HotTierManager { phase, ) .await?; - if !processed { - stop.store(true, std::sync::atomic::Ordering::Relaxed); - } + if !processed + && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "sticky stop: halting further reservations this tick" + ); + } Ok(()) } }) @@ -623,11 +712,29 @@ impl HotTierManager { // RESERVE { let mut sht = state.sht.lock().await; + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + used = sht.used_size, + "reserving" + ); if !self.is_disk_available(parquet_file.file_size).await? || sht.available_size <= parquet_file.file_size { match phase { SyncPhase::Latest => { + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "tight on space; triggering eviction" + ); if !self .cleanup_hot_tier_old_data( stream, @@ -638,25 +745,62 @@ impl HotTierManager { ) .await? { + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "eviction freed nothing, skipping file" + ); return Ok(false); } } SyncPhase::Historic => { - // Historic never evicts; just skip when full. 
+ warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "historic phase: full, skipping file" + ); return Ok(false); } } } if sht.available_size <= parquet_file.file_size { + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "still no space after eviction, skipping" + ); return Ok(false); } sht.available_size -= parquet_file.file_size; self.put_hot_tier(stream, &mut sht, tenant_id).await?; + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + deducted = parquet_file.file_size, + new_available = sht.available_size, + "reserved" + ); } // DOWNLOAD (no lock held) let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "download starting" + ); let download_result = PARSEABLE .storage .get_object_store() @@ -664,6 +808,13 @@ impl HotTierManager { .await; if let Err(e) = download_result { + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + err = %e, + "download failed, refunding reservation" + ); // refund reservation let mut sht = state.sht.lock().await; sht.available_size += parquet_file.file_size; @@ -673,6 +824,13 @@ impl HotTierManager { // backend already cleaned up its `.partial` file; final path was never created. 
return Err(e.into()); } + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "download finished, committing" + ); // COMMIT { @@ -693,6 +851,14 @@ impl HotTierManager { .join("hottier.manifest.json"); fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + warn!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + used = sht.used_size, + available = sht.available_size, + "committed" + ); } Ok(true) @@ -872,7 +1038,15 @@ impl HotTierManager { parquet_file_size: u64, tenant_id: &Option, ) -> Result { + warn!( + stream = %stream, + tenant = ?tenant_id, + target_size = parquet_file_size, + available = stream_hot_tier.available_size, + "eviction starting" + ); let mut delete_successful = false; + let mut freed_total: u64 = 0; let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; 'loop_dates: for date in dates { let path = self.get_stream_path_for_date(stream, &date, tenant_id); @@ -905,6 +1079,13 @@ impl HotTierManager { extract_datetime(path_to_delete.to_str().unwrap()), ) && download_date_time <= delete_date_time { + warn!( + stream = %stream, + tenant = ?tenant_id, + candidate = %file_to_delete.file_path, + target = %download_file_path.display(), + "skip evict: candidate newer than target" + ); delete_successful = false; break 'loop_files; } @@ -922,6 +1103,16 @@ impl HotTierManager { self.put_hot_tier(stream, stream_hot_tier, tenant_id) .await?; delete_successful = true; + freed_total += file_size; + warn!( + stream = %stream, + tenant = ?tenant_id, + evicted_file = %file_to_delete.file_path, + evicted_size = file_size, + freed_total, + new_available = stream_hot_tier.available_size, + "evicted" + ); if stream_hot_tier.available_size <= parquet_file_size { continue 'loop_files; @@ -935,6 +1126,13 @@ impl HotTierManager { } } + warn!( + stream = %stream, + tenant = ?tenant_id, + 
freed_total, + success = delete_successful, + "eviction complete" + ); Ok(delete_successful) } From b5a348eedd0815856c40f0e776d4e09595314ea6 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Wed, 6 May 2026 23:43:17 +0530 Subject: [PATCH 10/13] two loops instead of clockwerk --- src/hottier.rs | 88 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 35 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index ef69799d2..1a8385dc7 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -34,7 +34,6 @@ use crate::{ validator::error::HotTierValidationError, }; use chrono::NaiveDate; -use clokwerk::{AsyncScheduler, Interval, Job}; use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use futures_util::TryFutureExt; use object_store::{ObjectStoreExt, local::LocalFileSystem}; @@ -49,7 +48,6 @@ use tracing::{error, warn}; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB -const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB pub const CURRENT_HOT_TIER_VERSION: &str = "v2"; @@ -436,10 +434,11 @@ impl HotTierManager { Ok(path) } - /// schedule two hot-tier sync jobs: a frequent "latest" pass for files - /// within the last `P_HOT_TIER_LATEST_MINUTES`, and a less-frequent - /// "historic" pass for older files. Both share the per-stream lock; only - /// the latest pass triggers eviction. + /// Spawn two independent loops, one per phase, so Latest and Historic + /// tick clocks are independent. A long-running Historic backfill no + /// longer delays Latest ticks. Within each phase, the next iteration + /// starts only after the previous returns + a sleep, so a phase can + /// never overlap itself. 
pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, @@ -452,31 +451,28 @@ impl HotTierManager { "hot tier scheduler starting" ); - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(HOT_TIER_SYNC_DURATION) - .plus(Interval::Seconds(5)) - .run(move || async { + let latest_interval = Duration::from_secs(60); + let historic_interval = Duration::from_secs(historic_min as u64 * 60); + + let this = self; + tokio::spawn(async move { + loop { warn!(phase = ?SyncPhase::Latest, "hot tier tick fired"); - if let Err(err) = self.sync_hot_tier(SyncPhase::Latest).await { + if let Err(err) = this.sync_hot_tier(SyncPhase::Latest).await { error!("Error in hot tier latest scheduler: {:?}", err); } - }); + tokio::time::sleep(latest_interval).await; + } + }); - scheduler - .every(Interval::Minutes(historic_min)) - .plus(Interval::Seconds(15)) - .run(move || async { + let this = self; + tokio::spawn(async move { + loop { warn!(phase = ?SyncPhase::Historic, "hot tier tick fired"); - if let Err(err) = self.sync_hot_tier(SyncPhase::Historic).await { + if let Err(err) = this.sync_hot_tier(SyncPhase::Historic).await { error!("Error in hot tier historic scheduler: {:?}", err); } - }); - - tokio::spawn(async move { - loop { - scheduler.run_pending().await; - tokio::time::sleep(Duration::from_secs(10)).await; + tokio::time::sleep(historic_interval).await; } }); Ok(()) @@ -510,13 +506,18 @@ impl HotTierManager { } warn!(phase = ?phase, num_streams = scheduled, "hot tier tick: streams scheduled"); + let tick_start = std::time::Instant::now(); while let Some(res) = sync_hot_tier_tasks.next().await { if let Err(err) = res { error!("Failed to run hot tier sync task {err:?}"); return Err(err); } } - warn!(phase = ?phase, "hot tier tick complete"); + warn!( + phase = ?phase, + elapsed_ms = tick_start.elapsed().as_millis() as u64, + "hot tier tick complete" + ); Ok(()) } @@ -557,10 +558,17 @@ impl HotTierManager { "manifest fetched" ); + let stream_start = 
std::time::Instant::now(); self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id, phase) .await?; - warn!(stream = %stream, tenant = ?tenant_id, phase = ?phase, "stream sync done"); + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + elapsed_ms = stream_start.elapsed().as_millis() as u64, + "stream sync done" + ); Ok(()) } @@ -671,15 +679,14 @@ impl HotTierManager { phase, ) .await?; - if !processed - && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { - warn!( - stream = %stream, - tenant = ?tenant_id, - phase = ?phase, - "sticky stop: halting further reservations this tick" - ); - } + if !processed && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { + warn!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "sticky stop: halting further reservations this tick" + ); + } Ok(()) } }) @@ -801,17 +808,20 @@ impl HotTierManager { file_size = parquet_file.file_size, "download starting" ); + let dl_start = std::time::Instant::now(); let download_result = PARSEABLE .storage .get_object_store() .buffered_write(&parquet_file_path, tenant_id, parquet_path.clone()) .await; + let dl_elapsed = dl_start.elapsed(); if let Err(e) = download_result { warn!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, + elapsed_ms = dl_elapsed.as_millis() as u64, err = %e, "download failed, refunding reservation" ); @@ -824,11 +834,19 @@ impl HotTierManager { // backend already cleaned up its `.partial` file; final path was never created. 
return Err(e.into()); } + let elapsed_ms = dl_elapsed.as_millis() as u64; + let mbps = if dl_elapsed.as_secs_f64() > 0.0 { + (parquet_file.file_size as f64 * 8.0) / dl_elapsed.as_secs_f64() / 1_000_000.0 + } else { + 0.0 + }; warn!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, file_size = parquet_file.file_size, + elapsed_ms, + mbps = format!("{mbps:.1}"), "download finished, committing" ); From 2b2fe6196b5b713146b6668ccf83792d59dc4855 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Thu, 7 May 2026 15:34:04 +0530 Subject: [PATCH 11/13] per-stream hottier tasks --- src/handlers/http/logstream.rs | 3 + src/hottier.rs | 142 ++++++++++++++++++++------------- 2 files changed, 89 insertions(+), 56 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 86f83329c..1abe4e075 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -455,6 +455,9 @@ pub async fn put_stream_hot_tier( hot_tier_manager .put_hot_tier(&stream_name, &mut hottier, &tenant_id) .await?; + hot_tier_manager + .spawn_stream_tasks(stream_name.clone(), tenant_id.clone()) + .await; let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice( &PARSEABLE diff --git a/src/hottier.rs b/src/hottier.rs index 1a8385dc7..9c7e0f6ed 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -81,10 +81,16 @@ enum SyncPhase { type StreamKey = (Option, String); +struct StreamTasks { + latest: tokio::task::JoinHandle<()>, + historic: tokio::task::JoinHandle<()>, +} + pub struct HotTierManager { filesystem: LocalFileSystem, hot_tier_path: &'static Path, state_cache: AsyncRwLock>>, + tasks: AsyncRwLock>, } impl HotTierManager { @@ -94,6 +100,7 @@ impl HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, state_cache: AsyncRwLock::new(HashMap::new()), + tasks: AsyncRwLock::new(HashMap::new()), } } @@ -388,6 +395,9 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(stream, tenant_id) { return 
Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } + // Stop loops before tearing down the directory so no in-flight tick + // re-creates files mid-delete. + self.abort_stream_tasks(stream, tenant_id).await; let path = if let Some(tenant_id) = tenant_id.as_ref() { self.hot_tier_path.join(tenant_id).join(stream) } else { @@ -434,11 +444,9 @@ impl HotTierManager { Ok(path) } - /// Spawn two independent loops, one per phase, so Latest and Historic - /// tick clocks are independent. A long-running Historic backfill no - /// longer delays Latest ticks. Within each phase, the next iteration - /// starts only after the previous returns + a sleep, so a phase can - /// never overlap itself. + /// Discover hot-tier-enabled streams at boot and spawn a per-stream pair + /// of (Latest, Historic) loops for each. New streams added later acquire + /// their own loops via `spawn_stream_tasks` from the PUT hot-tier handler. pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, @@ -451,74 +459,96 @@ impl HotTierManager { "hot tier scheduler starting" ); + let this: &'static HotTierManager = self; + tokio::spawn(async move { + // pstats hot tier may need to be created on boot before any tasks + // can pick it up. + if let Err(e) = this.create_pstats_hot_tier().await { + tracing::error!("Skipping pstats hot tier creation because of error: {e}"); + } + let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { + tenants.into_iter().map(Some).collect::>() + } else { + vec![None] + }; + for tenant_id in tenants { + for stream in PARSEABLE.streams.list(&tenant_id) { + if this.check_stream_hot_tier_exists(&stream, &tenant_id) { + this.spawn_stream_tasks(stream, tenant_id.clone()).await; + } + } + } + }); + Ok(()) + } + + /// Spawn (Latest, Historic) loops for a single stream. Idempotent: + /// if tasks already exist for this (tenant, stream), no-op. 
+ pub async fn spawn_stream_tasks(&'static self, stream: String, tenant_id: Option) { + let key: StreamKey = (tenant_id.clone(), stream.clone()); + { + let tasks = self.tasks.read().await; + if let Some(existing) = tasks.get(&key) + && !existing.latest.is_finished() + && !existing.historic.is_finished() + { + return; + } + } + let latest_interval = Duration::from_secs(60); - let historic_interval = Duration::from_secs(historic_min as u64 * 60); + let historic_interval = Duration::from_secs( + PARSEABLE.options.hot_tier_historic_sync_minutes.max(1) as u64 * 60, + ); - let this = self; - tokio::spawn(async move { + warn!(stream = %stream, tenant = ?tenant_id, "spawning per-stream hot tier tasks"); + + let s = stream.clone(); + let t = tenant_id.clone(); + let latest = tokio::spawn(async move { loop { - warn!(phase = ?SyncPhase::Latest, "hot tier tick fired"); - if let Err(err) = this.sync_hot_tier(SyncPhase::Latest).await { - error!("Error in hot tier latest scheduler: {:?}", err); + warn!(stream = %s, tenant = ?t, phase = ?SyncPhase::Latest, "stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Latest) + .await + { + error!(stream = %s, "latest sync error: {err:?}"); } tokio::time::sleep(latest_interval).await; } }); - let this = self; - tokio::spawn(async move { + let s = stream.clone(); + let t = tenant_id.clone(); + let historic = tokio::spawn(async move { loop { - warn!(phase = ?SyncPhase::Historic, "hot tier tick fired"); - if let Err(err) = this.sync_hot_tier(SyncPhase::Historic).await { - error!("Error in hot tier historic scheduler: {:?}", err); + warn!(stream = %s, tenant = ?t, phase = ?SyncPhase::Historic, "stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Historic) + .await + { + error!(stream = %s, "historic sync error: {err:?}"); } tokio::time::sleep(historic_interval).await; } }); - Ok(()) - } - /// sync the hot tier files from S3 to the hot tier directory for 
all streams - async fn sync_hot_tier(&self, phase: SyncPhase) -> Result<(), HotTierError> { - // Before syncing, check if pstats stream was created and needs hot tier - if let Err(e) = self.create_pstats_hot_tier().await { - tracing::trace!("Skipping pstats hot tier creation because of error: {e}"); + let mut tasks = self.tasks.write().await; + if let Some(old) = tasks.insert(key, StreamTasks { latest, historic }) { + old.latest.abort(); + old.historic.abort(); } + } - let mut sync_hot_tier_tasks = FuturesUnordered::new(); - let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { - tenants.into_iter().map(Some).collect() - } else { - vec![None] - }; - let mut scheduled = 0usize; - for tenant_id in tenants { - for stream in PARSEABLE.streams.list(&tenant_id) { - if self.check_stream_hot_tier_exists(&stream, &tenant_id) { - sync_hot_tier_tasks.push(self.process_stream( - stream, - tenant_id.to_owned(), - phase, - )); - scheduled += 1; - } - } - } - warn!(phase = ?phase, num_streams = scheduled, "hot tier tick: streams scheduled"); - - let tick_start = std::time::Instant::now(); - while let Some(res) = sync_hot_tier_tasks.next().await { - if let Err(err) = res { - error!("Failed to run hot tier sync task {err:?}"); - return Err(err); - } + /// Abort and remove per-stream tasks. Caller must ensure no further work + /// will be enqueued for the stream after this returns. 
+ async fn abort_stream_tasks(&self, stream: &str, tenant_id: &Option) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(t) = self.tasks.write().await.remove(&key) { + t.latest.abort(); + t.historic.abort(); + warn!(stream = %stream, tenant = ?tenant_id, "aborted per-stream hot tier tasks"); } - warn!( - phase = ?phase, - elapsed_ms = tick_start.elapsed().as_millis() as u64, - "hot tier tick complete" - ); - Ok(()) } /// process the hot tier files for the stream From 43e437c704ad8599de6ebf4101197415fcb8a2c2 Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 8 May 2026 12:54:48 +0530 Subject: [PATCH 12/13] fix: deepsource, coderabbit suggestions --- src/handlers/http/logstream.rs | 7 +- src/hottier.rs | 241 +++++++++++++++++++-------------- src/storage/azure_blob.rs | 48 ++++--- src/storage/s3.rs | 5 +- 4 files changed, 171 insertions(+), 130 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 1abe4e075..74af54d98 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -455,9 +455,6 @@ pub async fn put_stream_hot_tier( hot_tier_manager .put_hot_tier(&stream_name, &mut hottier, &tenant_id) .await?; - hot_tier_manager - .spawn_stream_tasks(stream_name.clone(), tenant_id.clone()) - .await; let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice( &PARSEABLE @@ -472,7 +469,9 @@ pub async fn put_stream_hot_tier( .metastore .put_stream_json(&stream_metadata, &stream_name, &tenant_id) .await?; - + hot_tier_manager + .spawn_stream_tasks(stream_name.clone(), tenant_id.clone()) + .await; Ok(( format!("hot tier set for stream {stream_name}"), StatusCode::OK, diff --git a/src/hottier.rs b/src/hottier.rs index 9c7e0f6ed..f9dbafb43 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -16,6 +16,7 @@ * */ +use datafusion::common::HashSet; use std::{ collections::{BTreeMap, HashMap}, io, @@ -44,7 +45,7 @@ use std::time::Duration; use sysinfo::Disks; use 
tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, warn}; +use tracing::{error, info}; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB @@ -144,7 +145,7 @@ impl HotTierManager { stream: &str, tenant_id: &Option, ) -> Result { - warn!(stream = %stream, tenant = ?tenant_id, "reconcile starting"); + info!(stream = %stream, tenant = ?tenant_id, "reconcile starting"); let mut sht = self.get_hot_tier(stream, tenant_id).await?; let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; let mut total_used: u64 = 0; @@ -158,84 +159,36 @@ impl HotTierManager { continue; } + let mut on_disk: HashSet = HashSet::new(); + // Pass 1: collect on-disk parquet files (drop .partial orphans). - let mut on_disk: std::collections::HashSet = std::collections::HashSet::new(); - let mut entries = fs::read_dir(&date_dir).await?; - while let Some(entry) = entries.next_entry().await? { - let p = entry.path(); - let Some(name_os) = p.file_name() else { - continue; - }; - let name = name_os.to_string_lossy(); - if name.ends_with(".partial") { - let _ = fs::remove_file(&p).await; - partials_removed += 1; - warn!( - stream = %stream, - tenant = ?tenant_id, - path = %p.display(), - "reconcile: deleted partial orphan" - ); - continue; - } - if name.ends_with(".manifest.json") { - continue; - } - if !p.is_file() { - continue; - } - on_disk.insert(name.into_owned()); - } + self.drop_partials( + &mut on_disk, + &date_dir, + &mut partials_removed, + stream, + tenant_id, + ) + .await?; // Pass 2: clean manifest of stale entries. 
- let manifest_path = date_dir.join("hottier.manifest.json"); - let mut manifest: Manifest = if manifest_path.exists() { - let bytes = fs::read(&manifest_path).await?; - serde_json::from_slice(&bytes).unwrap_or_default() - } else { - Manifest::default() - }; - - let mut kept = Vec::with_capacity(manifest.files.len()); - let mut keep_names: std::collections::HashSet = - std::collections::HashSet::new(); - for f in manifest.files.drain(..) { - let local = self.hot_tier_path.join(&f.file_path); - let ok = match fs::metadata(&local).await { - Ok(m) => m.len() == f.file_size, - Err(_) => false, - }; - if ok { - if let Some(name) = local.file_name().and_then(|s| s.to_str()) { - keep_names.insert(name.to_owned()); - } - total_used += f.file_size; - kept.push(f); - } else { - let _ = fs::remove_file(&local).await; - entries_dropped += 1; - warn!( - stream = %stream, - tenant = ?tenant_id, - file = %f.file_path, - "reconcile: dropped manifest entry (file missing or wrong size)" - ); - } - } - kept.sort_by_key(|f| f.file_path.clone()); - manifest.files = kept; - - if manifest_path.exists() || !manifest.files.is_empty() { - fs::create_dir_all(&date_dir).await?; - fs::write(&manifest_path, serde_json::to_vec(&manifest)?).await?; - } + let mut keep_names: HashSet = HashSet::new(); + self.clean_manifest( + &mut keep_names, + &date_dir, + &mut total_used, + &mut entries_dropped, + stream, + tenant_id, + ) + .await?; // Pass 3: delete on-disk parquet files not referenced by the cleaned manifest. 
for name in on_disk.difference(&keep_names) { let p = date_dir.join(name); let _ = fs::remove_file(&p).await; orphans_removed += 1; - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %p.display(), @@ -247,7 +200,7 @@ impl HotTierManager { sht.used_size = total_used; sht.available_size = sht.size.saturating_sub(total_used); self.put_hot_tier(stream, &mut sht, tenant_id).await?; - warn!( + info!( stream = %stream, tenant = ?tenant_id, partials_removed, @@ -260,6 +213,94 @@ impl HotTierManager { Ok(sht) } + async fn drop_partials( + &self, + on_disk: &mut HashSet, + date_dir: &PathBuf, + partials_removed: &mut usize, + stream: &str, + tenant_id: &Option, + ) -> Result<(), HotTierError> { + let mut entries = fs::read_dir(date_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let p = entry.path(); + let Some(name_os) = p.file_name() else { + continue; + }; + let name = name_os.to_string_lossy(); + if name.ends_with(".partial") { + let _ = fs::remove_file(&p).await; + *partials_removed += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + path = %p.display(), + "reconcile: deleted partial orphan" + ); + continue; + } + if name.ends_with(".manifest.json") { + continue; + } + if !p.is_file() { + continue; + } + on_disk.insert(name.into_owned()); + } + Ok(()) + } + + async fn clean_manifest( + &self, + keep_names: &mut HashSet, + date_dir: &PathBuf, + total_used: &mut u64, + entries_dropped: &mut usize, + stream: &str, + tenant_id: &Option, + ) -> Result<(), HotTierError> { + let manifest_path = date_dir.join("hottier.manifest.json"); + let mut manifest: Manifest = if manifest_path.exists() { + let bytes = fs::read(&manifest_path).await?; + serde_json::from_slice(&bytes).unwrap_or_default() + } else { + Manifest::default() + }; + + let mut kept = Vec::with_capacity(manifest.files.len()); + for f in manifest.files.drain(..) 
{ + let local = self.hot_tier_path.join(&f.file_path); + let ok = match fs::metadata(&local).await { + Ok(m) => m.len() == f.file_size, + Err(_) => false, + }; + if ok { + if let Some(name) = local.file_name().and_then(|s| s.to_str()) { + keep_names.insert(name.to_owned()); + } + *total_used += f.file_size; + kept.push(f); + } else { + let _ = fs::remove_file(&local).await; + *entries_dropped += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %f.file_path, + "reconcile: dropped manifest entry (file missing or wrong size)" + ); + } + } + kept.sort_by_key(|f| f.file_path.clone()); + manifest.files = kept; + + if manifest_path.exists() || !manifest.files.is_empty() { + fs::create_dir_all(&date_dir).await?; + fs::write(&manifest_path, serde_json::to_vec(&manifest)?).await?; + } + Ok(()) + } + /// Get a global pub fn global() -> Option<&'static HotTierManager> { static INSTANCE: OnceCell = OnceCell::new(); @@ -453,7 +494,7 @@ impl HotTierManager { { let latest_min = PARSEABLE.options.hot_tier_latest_minutes; let historic_min = PARSEABLE.options.hot_tier_historic_sync_minutes.max(1); - warn!( + info!( latest_minutes = latest_min, historic_sync_minutes = historic_min, "hot tier scheduler starting" @@ -501,13 +542,13 @@ impl HotTierManager { PARSEABLE.options.hot_tier_historic_sync_minutes.max(1) as u64 * 60, ); - warn!(stream = %stream, tenant = ?tenant_id, "spawning per-stream hot tier tasks"); + info!(stream = %stream, tenant = ?tenant_id, "spawning per-stream hot tier tasks"); let s = stream.clone(); let t = tenant_id.clone(); let latest = tokio::spawn(async move { loop { - warn!(stream = %s, tenant = ?t, phase = ?SyncPhase::Latest, "stream tick fired"); + info!(stream = %s, tenant = ?t, phase = ?SyncPhase::Latest, "stream tick fired"); if let Err(err) = self .process_stream(s.clone(), t.clone(), SyncPhase::Latest) .await @@ -522,7 +563,7 @@ impl HotTierManager { let t = tenant_id.clone(); let historic = tokio::spawn(async move { loop { - warn!(stream = 
%s, tenant = ?t, phase = ?SyncPhase::Historic, "stream tick fired"); + info!(stream = %s, tenant = ?t, phase = ?SyncPhase::Historic, "stream tick fired"); if let Err(err) = self .process_stream(s.clone(), t.clone(), SyncPhase::Historic) .await @@ -547,7 +588,7 @@ impl HotTierManager { if let Some(t) = self.tasks.write().await.remove(&key) { t.latest.abort(); t.historic.abort(); - warn!(stream = %stream, tenant = ?tenant_id, "aborted per-stream hot tier tasks"); + info!(stream = %stream, tenant = ?tenant_id, "aborted per-stream hot tier tasks"); } } @@ -559,7 +600,7 @@ impl HotTierManager { tenant_id: Option, phase: SyncPhase, ) -> Result<(), HotTierError> { - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -579,7 +620,7 @@ impl HotTierManager { .values() .map(|m| m.iter().map(|x| x.files.len()).sum::()) .sum(); - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -592,7 +633,7 @@ impl HotTierManager { self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id, phase) .await?; - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -628,7 +669,7 @@ impl HotTierManager { match NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") { Ok(d) => d, Err(_) => { - warn!("Invalid date format: {}", str_date); + info!("Invalid date format: {}", str_date); continue; } }; @@ -658,7 +699,7 @@ impl HotTierManager { let work_count = work.len(); let total_bytes: u64 = work.iter().map(|(_, f, _)| f.file_size).sum(); if work.is_empty() { - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -666,7 +707,7 @@ impl HotTierManager { ); return Ok(()); } - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -710,7 +751,7 @@ impl HotTierManager { ) .await?; if !processed && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -749,7 +790,7 @@ impl HotTierManager { // 
RESERVE { let mut sht = state.sht.lock().await; - warn!( + info!( stream = %stream, tenant = ?tenant_id, phase = ?phase, @@ -760,11 +801,11 @@ impl HotTierManager { "reserving" ); if !self.is_disk_available(parquet_file.file_size).await? - || sht.available_size <= parquet_file.file_size + || sht.available_size < parquet_file.file_size { match phase { SyncPhase::Latest => { - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -782,7 +823,7 @@ impl HotTierManager { ) .await? { - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -793,7 +834,7 @@ impl HotTierManager { } } SyncPhase::Historic => { - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -805,8 +846,8 @@ impl HotTierManager { } } } - if sht.available_size <= parquet_file.file_size { - warn!( + if sht.available_size < parquet_file.file_size { + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -818,7 +859,7 @@ impl HotTierManager { } sht.available_size -= parquet_file.file_size; self.put_hot_tier(stream, &mut sht, tenant_id).await?; - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -831,7 +872,7 @@ impl HotTierManager { // DOWNLOAD (no lock held) let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -847,7 +888,7 @@ impl HotTierManager { let dl_elapsed = dl_start.elapsed(); if let Err(e) = download_result { - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -870,7 +911,7 @@ impl HotTierManager { } else { 0.0 }; - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -899,7 +940,7 @@ impl HotTierManager { .join("hottier.manifest.json"); 
fs::create_dir_all(manifest_path.parent().unwrap()).await?; fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; - warn!( + info!( stream = %stream, tenant = ?tenant_id, file = %parquet_file.file_path, @@ -1086,7 +1127,7 @@ impl HotTierManager { parquet_file_size: u64, tenant_id: &Option, ) -> Result { - warn!( + info!( stream = %stream, tenant = ?tenant_id, target_size = parquet_file_size, @@ -1127,7 +1168,7 @@ impl HotTierManager { extract_datetime(path_to_delete.to_str().unwrap()), ) && download_date_time <= delete_date_time { - warn!( + info!( stream = %stream, tenant = ?tenant_id, candidate = %file_to_delete.file_path, @@ -1152,7 +1193,7 @@ impl HotTierManager { .await?; delete_successful = true; freed_total += file_size; - warn!( + info!( stream = %stream, tenant = ?tenant_id, evicted_file = %file_to_delete.file_path, @@ -1174,7 +1215,7 @@ impl HotTierManager { } } - warn!( + info!( stream = %stream, tenant = ?tenant_id, freed_total, diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 52a4668aa..1f81a847c 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -268,32 +268,30 @@ impl BlobStore { let client = self.client.clone(); let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); - let mut handles = Vec::with_capacity(ranges.len()); - for r in ranges { - let client = client.clone(); - let src = src.clone(); - let std_file = std_file.clone(); - let semaphore = semaphore.clone(); - handles.push(tokio::spawn(async move { - let _permit = semaphore - .acquire_owned() + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + ObjectStorageError::Custom(format!("semaphore closed: {e}")) + })?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + 
tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) .await - .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; - let bytes = client.get_range(&src, r.clone()).await?; - let offset = r.start; - tokio::task::spawn_blocking(move || -> std::io::Result<()> { - use std::os::unix::fs::FileExt; - std_file.write_all_at(&bytes, offset) - }) - .await - .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; - Ok::<_, ObjectStorageError>(()) - })); - } - for h in handles { - h.await - .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; - } + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(concurrency) + .try_collect::>() + .await?; let std_file_sync = std_file.clone(); tokio::task::spawn_blocking(move || std_file_sync.sync_all()) diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 1ba95ef76..51f979ff8 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -351,7 +351,10 @@ impl S3 { .await { Ok(()) => { - tokio::fs::rename(&partial, &write_path).await?; + if let Err(e) = tokio::fs::rename(&partial, &write_path).await { + let _ = tokio::fs::remove_file(&partial).await; + return Err(e.into()); + } Ok(()) } Err(e) => { From b185b38c89e20c1d5a6145c5514827f8b347ebbe Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Fri, 8 May 2026 16:18:21 +0530 Subject: [PATCH 13/13] hottier deletion bug --- src/hottier.rs | 132 +++++++++++++++++++++++++++++++------------------ 1 file changed, 85 insertions(+), 47 deletions(-) diff --git a/src/hottier.rs b/src/hottier.rs index f9dbafb43..883f610a5 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -857,7 +857,20 @@ impl HotTierManager { ); return Ok(false); } - sht.available_size -= parquet_file.file_size; + sht.available_size = + if let Some(val) = sht.available_size.checked_sub(parquet_file.file_size) { + val + } 
else { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "file_size > sht.available_size, setting available_size to 0 and moving on" + ); + 0 + }; self.put_hot_tier(stream, &mut sht, tenant_id).await?; info!( stream = %stream, @@ -1113,7 +1126,7 @@ impl HotTierManager { path.exists() } - ///delete the parquet file from the hot tier directory for the stream + /// delete entire parquet file minute from the hot tier directory for the stream /// loop through all manifests in the hot tier directory for the stream /// loop through all parquet files in the manifest /// check for the oldest entry to delete if the path exists in hot tier @@ -1155,61 +1168,86 @@ impl HotTierManager { let file = fs::read(manifest_file.path()).await?; let mut manifest: Manifest = serde_json::from_slice(&file)?; - manifest.files.sort_by_key(|file| file.file_path.clone()); - manifest.files.reverse(); - - 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { - let file_size = file_to_delete.file_size; - let path_to_delete = self.hot_tier_path.join(&file_to_delete.file_path); - - if path_to_delete.exists() { - if let (Some(download_date_time), Some(delete_date_time)) = ( - extract_datetime(download_file_path.to_str().unwrap()), - extract_datetime(path_to_delete.to_str().unwrap()), - ) && download_date_time <= delete_date_time - { - info!( - stream = %stream, - tenant = ?tenant_id, - candidate = %file_to_delete.file_path, - target = %download_file_path.display(), - "skip evict: candidate newer than target" - ); - delete_successful = false; - break 'loop_files; - } - - fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + if manifest.files.is_empty() { + continue; + } - fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; - delete_empty_directory_hot_tier( - path_to_delete.parent().unwrap().to_path_buf(), - ) - .await?; + // sort in an 
ascending manner + // idx0: minute=00 + // idx59: minute=59 + manifest.files.sort_by_key(|file| file.file_path.clone()); - stream_hot_tier.used_size -= file_size; - stream_hot_tier.available_size += file_size; - self.put_hot_tier(stream, stream_hot_tier, tenant_id) - .await?; - delete_successful = true; - freed_total += file_size; + // get first file's parent (/hottier/stream/date=d/hour=h/minute=m) + let first_file = manifest.files.first().unwrap(); + let first_file_path = self.hot_tier_path.join(&first_file.file_path); + let minute_to_delete = first_file_path.parent().unwrap(); + + if minute_to_delete.exists() { + if let (Some(download_date_time), Some(delete_date_time)) = ( + extract_datetime(download_file_path.to_str().unwrap()), + extract_datetime(first_file_path.to_str().unwrap()), + ) && download_date_time <= delete_date_time + { info!( stream = %stream, tenant = ?tenant_id, - evicted_file = %file_to_delete.file_path, - evicted_size = file_size, - freed_total, - new_available = stream_hot_tier.available_size, - "evicted" + candidate = %minute_to_delete.display(), + target = %download_file_path.display(), + "skip evict: candidate newer than target" ); + continue; + } - if stream_hot_tier.available_size <= parquet_file_size { - continue 'loop_files; + let minute_to_delete_owned = minute_to_delete.to_path_buf(); + let mut minute_freed: u64 = 0; + manifest.files.retain(|file| { + let file_path = self.hot_tier_path.join(&file.file_path); + let file_minute = file_path.parent().unwrap(); + if file_minute == minute_to_delete_owned { + minute_freed = minute_freed.saturating_add(file.file_size); + false } else { - break 'loop_dates; + true } + }); + + stream_hot_tier.used_size = stream_hot_tier + .used_size + .checked_sub(minute_freed) + .unwrap_or_else(|| { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + minute = %minute_to_delete_owned.display(), + minute_freed, + used_size = stream_hot_tier.used_size, + "minute_freed > used_size, clamping used_size 
to 0" + ); + 0 + }); + stream_hot_tier.available_size = + stream_hot_tier.available_size.saturating_add(minute_freed); + freed_total = freed_total.saturating_add(minute_freed); + + fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + fs::remove_dir_all(&minute_to_delete_owned).await?; + delete_empty_directory_hot_tier(minute_to_delete_owned.clone()).await?; + self.put_hot_tier(stream, stream_hot_tier, tenant_id) + .await?; + delete_successful = true; + info!( + stream = %stream, + tenant = ?tenant_id, + evicted_minute = %minute_to_delete_owned.display(), + evicted_size = minute_freed, + freed_total, + new_available = stream_hot_tier.available_size, + "evicted" + ); + if stream_hot_tier.available_size <= parquet_file_size { + continue; } else { - fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + break 'loop_dates; } } }