diff --git a/src/cli.rs b/src/cli.rs index 969c3a619..6b5e9d7e4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -314,6 +314,46 @@ pub struct Options { )] pub hot_tier_storage_path: Option, + #[arg( + long = "hot-tier-download-chunk-size", + env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE", + default_value = "8388608", + help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)" + )] + pub hot_tier_download_chunk_size: u64, + + #[arg( + long = "hot-tier-download-concurrency", + env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY", + default_value = "16", + help = "Number of concurrent range requests per hot tier download" + )] + pub hot_tier_download_concurrency: usize, + + #[arg( + long = "hot-tier-files-per-stream-concurrency", + env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY", + default_value = "4", + help = "Number of concurrent parquet file downloads per stream during hot tier sync" + )] + pub hot_tier_files_per_stream_concurrency: usize, + + #[arg( + long = "hot-tier-latest-minutes", + env = "P_HOT_TIER_LATEST_MINUTES", + default_value = "10", + help = "Files whose timestamp is within the last N minutes are 'latest'; rest are 'historic'." + )] + pub hot_tier_latest_minutes: i64, + + #[arg( + long = "hot-tier-historic-sync-minutes", + env = "P_HOT_TIER_HISTORIC_SYNC_MINUTES", + default_value = "5", + help = "Interval (minutes) at which the historic hot-tier sync runs." + )] + pub hot_tier_historic_sync_minutes: u32, + //TODO: remove this when smart cache is implemented #[arg( long = "index-storage-path", diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 86f83329c..74af54d98 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -469,7 +469,9 @@ pub async fn put_stream_hot_tier( .metastore .put_stream_json(&stream_metadata, &stream_name, &tenant_id) .await?; - + hot_tier_manager + .spawn_stream_tasks(stream_name.clone(), tenant_id.clone()) + .await; Ok(( format!("hot tier set for stream {stream_name}"), StatusCode::OK, diff --git a/src/hottier.rs b/src/hottier.rs index c472f91c5..883f610a5 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -16,11 +16,14 @@ * */ +use datafusion::common::HashSet; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashMap}, io, path::{Path, PathBuf}, + sync::Arc, }; +use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use crate::{ catalog::manifest::{File, Manifest}, @@ -32,7 +35,6 @@ use crate::{ validator::error::HotTierValidationError, }; use chrono::NaiveDate; -use clokwerk::{AsyncScheduler, Interval, Job}; use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use futures_util::TryFutureExt; use object_store::{ObjectStoreExt, local::LocalFileSystem}; @@ -42,13 +44,11 @@ use relative_path::RelativePathBuf; use std::time::Duration; use sysinfo::Disks; use tokio::fs::{self, DirEntry}; -use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, warn}; +use tracing::{error, info}; pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB -const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB pub const CURRENT_HOT_TIER_VERSION: &str = "v2"; @@ -65,9 +65,33 @@ pub struct StreamHotTier { pub oldest_date_time_entry: Option, } +/// Per-stream in-memory bookkeeping. Mutex protects concurrent reservation, +/// commit, and per-date manifest writes. Downloads run outside the lock. 
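+///
+/// A minimal sketch of the intended lock discipline (illustrative only; the
+/// real flow lives in `process_parquet_file_concurrent` below):
+///
+/// ```ignore
+/// let state = manager.get_or_load_state(stream, &tenant_id).await?;
+/// {
+///     let mut sht = state.sht.lock().await; // RESERVE under the lock
+///     sht.available_size -= file.file_size;
+/// }
+/// // download runs here with no lock held
+/// {
+///     let mut sht = state.sht.lock().await; // COMMIT under the lock
+///     sht.used_size += file.file_size;
+/// }
+/// ```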
+struct StreamSyncState { + sht: AsyncMutex, +} + +/// Hot-tier sync runs in two phases. Latest pulls files newer than +/// `hot_tier_latest_minutes` ago and may evict historic to make room. +/// Historic pulls older files, runs less often, never triggers eviction. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum SyncPhase { + Latest, + Historic, +} + +type StreamKey = (Option, String); + +struct StreamTasks { + latest: tokio::task::JoinHandle<()>, + historic: tokio::task::JoinHandle<()>, +} + pub struct HotTierManager { filesystem: LocalFileSystem, hot_tier_path: &'static Path, + state_cache: AsyncRwLock>>, + tasks: AsyncRwLock>, } impl HotTierManager { @@ -76,7 +100,205 @@ impl HotTierManager { HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, + state_cache: AsyncRwLock::new(HashMap::new()), + tasks: AsyncRwLock::new(HashMap::new()), + } + } + + /// Lazy-load and cache the `StreamHotTier` for a (tenant, stream) pair. + /// All sync-path mutations should acquire `state.sht.lock()`. + async fn get_or_load_state( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result, HotTierError> { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(state) = self.state_cache.read().await.get(&key).cloned() { + return Ok(state); + } + let mut cache = self.state_cache.write().await; + if let Some(state) = cache.get(&key).cloned() { + return Ok(state); + } + let sht = self.reconcile_stream(stream, tenant_id).await?; + let state = Arc::new(StreamSyncState { + sht: AsyncMutex::new(sht), + }); + cache.insert(key, state.clone()); + Ok(state) + } + + /// Drop cached state for a stream (used after delete). + pub async fn invalidate_state(&self, stream: &str, tenant_id: &Option) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + self.state_cache.write().await.remove(&key); + } + + /// Walk the on-disk hot-tier directory for a stream and bring it into + /// agreement with `hottier.manifest.json` files. Removes `.partial` + /// orphans, drops manifest entries whose files are missing or wrong size, + /// deletes parquet files that exist but are not in their date manifest, + /// then recomputes `used_size` / `available_size` from the cleaned + /// manifests and persists the updated `StreamHotTier`. + async fn reconcile_stream( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result { + info!(stream = %stream, tenant = ?tenant_id, "reconcile starting"); + let mut sht = self.get_hot_tier(stream, tenant_id).await?; + let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; + let mut total_used: u64 = 0; + let mut partials_removed = 0usize; + let mut entries_dropped = 0usize; + let mut orphans_removed = 0usize; + + for date in dates { + let date_dir = self.get_stream_path_for_date(stream, &date, tenant_id); + if !date_dir.exists() { + continue; + } + + let mut on_disk: HashSet = HashSet::new(); + + // Pass 1: collect on-disk parquet files (drop .partial orphans). + self.drop_partials( + &mut on_disk, + &date_dir, + &mut partials_removed, + stream, + tenant_id, + ) + .await?; + + // Pass 2: clean manifest of stale entries. + let mut keep_names: HashSet = HashSet::new(); + self.clean_manifest( + &mut keep_names, + &date_dir, + &mut total_used, + &mut entries_dropped, + stream, + tenant_id, + ) + .await?; + + // Pass 3: delete on-disk parquet files not referenced by the cleaned manifest. 
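+            // Worked example: on_disk = {a.parquet, b.parquet} and
+            // keep_names = {a.parquet} => difference() yields {b.parquet},
+            // which is deleted below.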
+ for name in on_disk.difference(&keep_names) { + let p = date_dir.join(name); + let _ = fs::remove_file(&p).await; + orphans_removed += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %p.display(), + "reconcile: deleted orphan parquet not in manifest" + ); + } + } + + sht.used_size = total_used; + sht.available_size = sht.size.saturating_sub(total_used); + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + partials_removed, + entries_dropped, + orphans_removed, + used = sht.used_size, + available = sht.available_size, + "reconcile done" + ); + Ok(sht) + } + + async fn drop_partials( + &self, + on_disk: &mut HashSet, + date_dir: &PathBuf, + partials_removed: &mut usize, + stream: &str, + tenant_id: &Option, + ) -> Result<(), HotTierError> { + let mut entries = fs::read_dir(date_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let p = entry.path(); + let Some(name_os) = p.file_name() else { + continue; + }; + let name = name_os.to_string_lossy(); + if name.ends_with(".partial") { + let _ = fs::remove_file(&p).await; + *partials_removed += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + path = %p.display(), + "reconcile: deleted partial orphan" + ); + continue; + } + if name.ends_with(".manifest.json") { + continue; + } + if !p.is_file() { + continue; + } + on_disk.insert(name.into_owned()); } + Ok(()) + } + + async fn clean_manifest( + &self, + keep_names: &mut HashSet, + date_dir: &PathBuf, + total_used: &mut u64, + entries_dropped: &mut usize, + stream: &str, + tenant_id: &Option, + ) -> Result<(), HotTierError> { + let manifest_path = date_dir.join("hottier.manifest.json"); + let mut manifest: Manifest = if manifest_path.exists() { + let bytes = fs::read(&manifest_path).await?; + serde_json::from_slice(&bytes).unwrap_or_default() + } else { + Manifest::default() + }; + + let mut kept = Vec::with_capacity(manifest.files.len()); + for f in manifest.files.drain(..) { + let local = self.hot_tier_path.join(&f.file_path); + let ok = match fs::metadata(&local).await { + Ok(m) => m.len() == f.file_size, + Err(_) => false, + }; + if ok { + if let Some(name) = local.file_name().and_then(|s| s.to_str()) { + keep_names.insert(name.to_owned()); + } + *total_used += f.file_size; + kept.push(f); + } else { + let _ = fs::remove_file(&local).await; + *entries_dropped += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %f.file_path, + "reconcile: dropped manifest entry (file missing or wrong size)" + ); + } + } + kept.sort_by_key(|f| f.file_path.clone()); + manifest.files = kept; + + if manifest_path.exists() || !manifest.files.is_empty() { + fs::create_dir_all(&date_dir).await?; + fs::write(&manifest_path, serde_json::to_vec(&manifest)?).await?; + } + Ok(()) } /// Get a global @@ -214,12 +436,16 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(stream, tenant_id) { return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } + // Stop loops before tearing down the directory so no in-flight tick + // re-creates files mid-delete. 
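+        // After the directory is gone, `invalidate_state` below drops the
+        // cached StreamSyncState so a later PUT hot-tier call rebuilds it
+        // from scratch via `reconcile_stream`.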
+ self.abort_stream_tasks(stream, tenant_id).await; let path = if let Some(tenant_id) = tenant_id.as_ref() { self.hot_tier_path.join(tenant_id).join(stream) } else { self.hot_tier_path.join(stream) }; fs::remove_dir_all(path).await?; + self.invalidate_state(stream, tenant_id).await; Ok(()) } @@ -259,58 +485,111 @@ impl HotTierManager { Ok(path) } - /// schedule the download of the hot tier files from S3 every minute + /// Discover hot-tier-enabled streams at boot and spawn a per-stream pair + /// of (Latest, Historic) loops for each. New streams added later acquire + /// their own loops via `spawn_stream_tasks` from the PUT hot-tier handler. pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(HOT_TIER_SYNC_DURATION) - .plus(Interval::Seconds(5)) - .run(move || async { - if let Err(err) = self.sync_hot_tier().await { - error!("Error in hot tier scheduler: {:?}", err); - } - }); - + let latest_min = PARSEABLE.options.hot_tier_latest_minutes; + let historic_min = PARSEABLE.options.hot_tier_historic_sync_minutes.max(1); + info!( + latest_minutes = latest_min, + historic_sync_minutes = historic_min, + "hot tier scheduler starting" + ); + + let this: &'static HotTierManager = self; tokio::spawn(async move { - loop { - scheduler.run_pending().await; - tokio::time::sleep(Duration::from_secs(10)).await; + // pstats hot tier may need to be created on boot before any tasks + // can pick it up. + if let Err(e) = this.create_pstats_hot_tier().await { + tracing::error!("Skipping pstats hot tier creation because of error: {e}"); + } + let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { + tenants.into_iter().map(Some).collect::>() + } else { + vec![None] + }; + for tenant_id in tenants { + for stream in PARSEABLE.streams.list(&tenant_id) { + if this.check_stream_hot_tier_exists(&stream, &tenant_id) { + this.spawn_stream_tasks(stream, tenant_id.clone()).await; + } + } } }); Ok(()) } - /// sync the hot tier files from S3 to the hot tier directory for all streams - async fn sync_hot_tier(&self) -> Result<(), HotTierError> { - // Before syncing, check if pstats stream was created and needs hot tier - if let Err(e) = self.create_pstats_hot_tier().await { - tracing::trace!("Skipping pstats hot tier creation because of error: {e}"); + /// Spawn (Latest, Historic) loops for a single stream. Idempotent: + /// if tasks already exist for this (tenant, stream), no-op. 
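+    ///
+    /// Usage sketch (hypothetical stream name; assumes a `&'static` manager,
+    /// as in the PUT hot-tier handler and the boot-time scheduler):
+    ///
+    /// ```ignore
+    /// manager.spawn_stream_tasks("app_logs".to_string(), None).await;
+    /// // a second call while both loops are alive returns without spawning
+    /// manager.spawn_stream_tasks("app_logs".to_string(), None).await;
+    /// ```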
+ pub async fn spawn_stream_tasks(&'static self, stream: String, tenant_id: Option) { + let key: StreamKey = (tenant_id.clone(), stream.clone()); + { + let tasks = self.tasks.read().await; + if let Some(existing) = tasks.get(&key) + && !existing.latest.is_finished() + && !existing.historic.is_finished() + { + return; + } } - let mut sync_hot_tier_tasks = FuturesUnordered::new(); - let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { - tenants.into_iter().map(Some).collect() - } else { - vec![None] - }; - for tenant_id in tenants { - for stream in PARSEABLE.streams.list(&tenant_id) { - if self.check_stream_hot_tier_exists(&stream, &tenant_id) { - sync_hot_tier_tasks.push(self.process_stream(stream, tenant_id.to_owned())); + let latest_interval = Duration::from_secs(60); + let historic_interval = Duration::from_secs( + PARSEABLE.options.hot_tier_historic_sync_minutes.max(1) as u64 * 60, + ); + + info!(stream = %stream, tenant = ?tenant_id, "spawning per-stream hot tier tasks"); + + let s = stream.clone(); + let t = tenant_id.clone(); + let latest = tokio::spawn(async move { + loop { + info!(stream = %s, tenant = ?t, phase = ?SyncPhase::Latest, "stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Latest) + .await + { + error!(stream = %s, "latest sync error: {err:?}"); } + tokio::time::sleep(latest_interval).await; } - } + }); - while let Some(res) = sync_hot_tier_tasks.next().await { - if let Err(err) = res { - error!("Failed to run hot tier sync task {err:?}"); - return Err(err); + let s = stream.clone(); + let t = tenant_id.clone(); + let historic = tokio::spawn(async move { + loop { + info!(stream = %s, tenant = ?t, phase = ?SyncPhase::Historic, "stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Historic) + .await + { + error!(stream = %s, "historic sync error: {err:?}"); + } + tokio::time::sleep(historic_interval).await; } + }); + + let mut tasks = self.tasks.write().await; + if let Some(old) = tasks.insert(key, StreamTasks { latest, historic }) { + old.latest.abort(); + old.historic.abort(); + } + } + + /// Abort and remove per-stream tasks. Caller must ensure no further work + /// will be enqueued for the stream after this returns. 
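+    ///
+    /// `abort()` cancels each loop at its next await point, so a download that
+    /// is mid-flight may leave a `.partial` file behind; `reconcile_stream`
+    /// sweeps those up the next time the stream's state is loaded.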
+ async fn abort_stream_tasks(&self, stream: &str, tenant_id: &Option) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(t) = self.tasks.write().await.remove(&key) { + t.latest.abort(); + t.historic.abort(); + info!(stream = %stream, tenant = ?tenant_id, "aborted per-stream hot tier tasks"); } - Ok(()) } /// process the hot tier files for the stream @@ -319,10 +598,14 @@ impl HotTierManager { &self, stream: String, tenant_id: Option, + phase: SyncPhase, ) -> Result<(), HotTierError> { - let stream_hot_tier = self.get_hot_tier(&stream, &tenant_id).await?; - let mut parquet_file_size = stream_hot_tier.used_size; - + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "fetching manifest from metastore" + ); let mut s3_manifest_file_list = PARSEABLE .metastore .get_all_manifest_files(&stream, &tenant_id) @@ -332,15 +615,31 @@ impl HotTierManager { e.to_detail(), ))) })?; + let date_count = s3_manifest_file_list.len(); + let total_files: usize = s3_manifest_file_list + .values() + .map(|m| m.iter().map(|x| x.files.len()).sum::()) + .sum(); + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + dates = date_count, + files = total_files, + "manifest fetched" + ); + + let stream_start = std::time::Instant::now(); + self.process_manifest(&stream, &mut s3_manifest_file_list, &tenant_id, phase) + .await?; - self.process_manifest( - &stream, - &mut s3_manifest_file_list, - &mut parquet_file_size, - &tenant_id, - ) - .await?; - + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + elapsed_ms = stream_start.elapsed().as_millis() as u64, + "stream sync done" + ); Ok(()) } @@ -352,120 +651,319 @@ impl HotTierManager { &self, stream: &str, manifest_files_to_download: &mut BTreeMap>, - parquet_file_size: &mut u64, tenant_id: &Option, + phase: SyncPhase, ) -> Result<(), HotTierError> { if manifest_files_to_download.is_empty() { return Ok(()); } + + let latest_minutes = PARSEABLE.options.hot_tier_latest_minutes.max(0); + let cutoff = chrono::Utc::now().naive_utc() - chrono::Duration::minutes(latest_minutes); + + // Build flat work list: (date, file, local_path) ordered latest-date-first, + // and within a date by descending file_path (matches old `pop()` order). 
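+        // Illustration: within one date, a path containing hour=23/minute=59
+        // sorts lexicographically after hour=00/minute=00, so it is pop()'d
+        // first and therefore scheduled for download first.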
+ let mut work: Vec<(NaiveDate, File, PathBuf)> = Vec::new(); for (str_date, manifest_files) in manifest_files_to_download.iter().rev() { - let mut storage_combined_manifest = Manifest::default(); + let date = + match NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") { + Ok(d) => d, + Err(_) => { + info!("Invalid date format: {}", str_date); + continue; + } + }; + let mut combined = Manifest::default(); for storage_manifest in manifest_files { - storage_combined_manifest - .files - .extend(storage_manifest.files.clone()); + combined.files.extend(storage_manifest.files.clone()); } + combined.files.sort_by_key(|f| f.file_path.clone()); + while let Some(parquet_file) = combined.files.pop() { + let parquet_path = self.hot_tier_path.join(&parquet_file.file_path); + if parquet_path.exists() { + continue; + } + let dt = extract_datetime(&parquet_file.file_path); + let is_latest = dt.map(|d| d >= cutoff).unwrap_or(false); + let keep = match phase { + SyncPhase::Latest => is_latest, + SyncPhase::Historic => !is_latest, + }; + if keep { + work.push((date, parquet_file, parquet_path)); + } + } + } - storage_combined_manifest - .files - .sort_by_key(|file| file.file_path.clone()); - - while let Some(parquet_file) = storage_combined_manifest.files.pop() { - let parquet_file_path = &parquet_file.file_path; - let parquet_path = self.hot_tier_path.join(parquet_file_path); - - if !parquet_path.exists() { - if let Ok(date) = - NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") - { - if !self - .process_parquet_file( - stream, - &parquet_file, - parquet_file_size, - parquet_path, - date, - tenant_id, - ) - .await? - { - break; - } - } else { - warn!("Invalid date format: {}", str_date); + let work_count = work.len(); + let total_bytes: u64 = work.iter().map(|(_, f, _)| f.file_size).sum(); + if work.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "no files to download this tick" + ); + return Ok(()); + } + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + work_count, + total_bytes, + "work list built" + ); + + let state = self.get_or_load_state(stream, tenant_id).await?; + let concurrency = PARSEABLE + .options + .hot_tier_files_per_stream_concurrency + .max(4); + + // Reservation failure (out of disk + nothing to evict) is sticky: + // once one file can't be placed, no subsequent file will fit either. 
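+        // The flag is shared across the buffered futures below: `swap` ensures
+        // only the first failing file logs the "sticky stop" message, while
+        // later futures return early without reserving anything.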
+ let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + let stream_owned = stream.to_owned(); + let tenant_owned = tenant_id.clone(); + + let results: Vec> = futures::stream::iter(work) + .map(|(date, file, parquet_path)| { + let state = state.clone(); + let stream = stream_owned.clone(); + let tenant_id = tenant_owned.clone(); + let stop = stop.clone(); + async move { + if stop.load(std::sync::atomic::Ordering::Relaxed) { + return Ok(()); } + let processed = self + .process_parquet_file_concurrent( + &stream, + &file, + parquet_path, + date, + &tenant_id, + &state, + phase, + ) + .await?; + if !processed && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "sticky stop: halting further reservations this tick" + ); + } + Ok(()) } - } + }) + .buffer_unordered(concurrency) + .collect() + .await; + + for r in results { + r?; } Ok(()) } - /// process the parquet file for the stream - /// check if the disk is available to download the parquet file - /// if not available, delete the oldest entry from the hot tier directory - /// download the parquet file from S3 to the hot tier directory - /// update the used and available size in the hot tier metadata - /// return true if the parquet file is processed successfully - async fn process_parquet_file( + /// Reserve disk budget under the per-stream lock, download outside the lock, + /// then commit usage + per-date manifest under the lock again. + /// Returns false when no budget is available (caller should stop scheduling + /// further work for this stream). + #[allow(clippy::too_many_arguments)] + async fn process_parquet_file_concurrent( &self, stream: &str, parquet_file: &File, - parquet_file_size: &mut u64, parquet_path: PathBuf, date: NaiveDate, tenant_id: &Option, + state: &Arc, + phase: SyncPhase, ) -> Result { - let mut file_processed = false; - let mut stream_hot_tier = self.get_hot_tier(stream, tenant_id).await?; - if !self.is_disk_available(parquet_file.file_size).await? - || stream_hot_tier.available_size <= parquet_file.file_size + // RESERVE { - if !self - .cleanup_hot_tier_old_data( - stream, - &mut stream_hot_tier, - &parquet_path, - parquet_file.file_size, - tenant_id, - ) - .await? + let mut sht = state.sht.lock().await; + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + used = sht.used_size, + "reserving" + ); + if !self.is_disk_available(parquet_file.file_size).await? + || sht.available_size < parquet_file.file_size { - return Ok(file_processed); + match phase { + SyncPhase::Latest => { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "tight on space; triggering eviction" + ); + if !self + .cleanup_hot_tier_old_data( + stream, + &mut sht, + &parquet_path, + parquet_file.file_size, + tenant_id, + ) + .await? 
+ { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "eviction freed nothing, skipping file" + ); + return Ok(false); + } + } + SyncPhase::Historic => { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "historic phase: full, skipping file" + ); + return Ok(false); + } + } + } + if sht.available_size < parquet_file.file_size { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "still no space after eviction, skipping" + ); + return Ok(false); } - *parquet_file_size = stream_hot_tier.used_size; + sht.available_size = + if let Some(val) = sht.available_size.checked_sub(parquet_file.file_size) { + val + } else { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "file_size > sht.available_size, setting available_size to 0 and moving on" + ); + 0 + }; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + deducted = parquet_file.file_size, + new_available = sht.available_size, + "reserved" + ); } + + // DOWNLOAD (no lock held) let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; - let mut file = fs::File::create(parquet_path.clone()).await?; - let parquet_data = PARSEABLE + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "download starting" + ); + let dl_start = std::time::Instant::now(); + let download_result = PARSEABLE .storage .get_object_store() - .get_object(&parquet_file_path, tenant_id) - .await?; - file.write_all(&parquet_data).await?; - *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = *parquet_file_size; + .buffered_write(&parquet_file_path, tenant_id, parquet_path.clone()) + .await; + let dl_elapsed = dl_start.elapsed(); + + if let Err(e) = download_result { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + elapsed_ms = dl_elapsed.as_millis() as u64, + err = %e, + "download failed, refunding reservation" + ); + // refund reservation + let mut sht = state.sht.lock().await; + sht.available_size += parquet_file.file_size; + if let Err(put_err) = self.put_hot_tier(stream, &mut sht, tenant_id).await { + error!("failed to persist refund after download failure: {put_err:?}"); + } + // backend already cleaned up its `.partial` file; final path was never created. 
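+                // Net effect of the refund: available_size is back where it was
+                // before RESERVE and used_size was never touched, so the next
+                // tick will pick this file up again from scratch.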
+ return Err(e.into()); + } + let elapsed_ms = dl_elapsed.as_millis() as u64; + let mbps = if dl_elapsed.as_secs_f64() > 0.0 { + (parquet_file.file_size as f64 * 8.0) / dl_elapsed.as_secs_f64() / 1_000_000.0 + } else { + 0.0 + }; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + elapsed_ms, + mbps = format!("{mbps:.1}"), + "download finished, committing" + ); + + // COMMIT + { + let mut sht = state.sht.lock().await; + sht.used_size += parquet_file.file_size; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; - stream_hot_tier.available_size -= parquet_file.file_size; - self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id) - .await?; - file_processed = true; - let path = self.get_stream_path_for_date(stream, &date, tenant_id); - let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?; - hot_tier_manifest.files.push(parquet_file.clone()); - hot_tier_manifest - .files - .sort_by_key(|file| file.file_path.clone()); - // write the manifest file to the hot tier directory - let manifest_path = self - .get_stream_path_for_date(stream, &date, tenant_id) - .join("hottier.manifest.json"); - fs::create_dir_all(manifest_path.parent().unwrap()).await?; - fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; - - Ok(file_processed) + let path = self.get_stream_path_for_date(stream, &date, tenant_id); + let mut hot_tier_manifest = + HotTierManager::get_hot_tier_manifest_from_path(path).await?; + hot_tier_manifest.files.push(parquet_file.clone()); + hot_tier_manifest + .files + .sort_by_key(|file| file.file_path.clone()); + // write the manifest file to the hot tier directory + let manifest_path = self + .get_stream_path_for_date(stream, &date, tenant_id) + .join("hottier.manifest.json"); + fs::create_dir_all(manifest_path.parent().unwrap()).await?; + fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + used = sht.used_size, + available = sht.available_size, + "committed" + ); + } + + Ok(true) } ///fetch the list of dates available in the hot tier directory for the stream and sort them @@ -628,7 +1126,7 @@ impl HotTierManager { path.exists() } - ///delete the parquet file from the hot tier directory for the stream + /// delete entire parquet file minute from the hot tier directory for the stream /// loop through all manifests in the hot tier directory for the stream /// loop through all parquet files in the manifest /// check for the oldest entry to delete if the path exists in hot tier @@ -642,7 +1140,15 @@ impl HotTierManager { parquet_file_size: u64, tenant_id: &Option, ) -> Result { + info!( + stream = %stream, + tenant = ?tenant_id, + target_size = parquet_file_size, + available = stream_hot_tier.available_size, + "eviction starting" + ); let mut delete_successful = false; + let mut freed_total: u64 = 0; let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; 'loop_dates: for date in dates { let path = self.get_stream_path_for_date(stream, &date, tenant_id); @@ -662,49 +1168,98 @@ impl HotTierManager { let file = fs::read(manifest_file.path()).await?; let mut manifest: Manifest = serde_json::from_slice(&file)?; - manifest.files.sort_by_key(|file| file.file_path.clone()); - manifest.files.reverse(); - - 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { - let file_size = file_to_delete.file_size; - let path_to_delete = 
self.hot_tier_path.join(&file_to_delete.file_path); + if manifest.files.is_empty() { + continue; + } - if path_to_delete.exists() { - if let (Some(download_date_time), Some(delete_date_time)) = ( - extract_datetime(download_file_path.to_str().unwrap()), - extract_datetime(path_to_delete.to_str().unwrap()), - ) && download_date_time <= delete_date_time - { - delete_successful = false; - break 'loop_files; - } + // sort in an ascending manner + // idx0: minute=00 + // idx59: minute=59 + manifest.files.sort_by_key(|file| file.file_path.clone()); - fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + // get first file's parent (/hottier/stream/date=d/hour=h/minute=m) + let first_file = manifest.files.first().unwrap(); + let first_file_path = self.hot_tier_path.join(&first_file.file_path); + let minute_to_delete = first_file_path.parent().unwrap(); - fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; - delete_empty_directory_hot_tier( - path_to_delete.parent().unwrap().to_path_buf(), - ) - .await?; - - stream_hot_tier.used_size -= file_size; - stream_hot_tier.available_size += file_size; - self.put_hot_tier(stream, stream_hot_tier, tenant_id) - .await?; - delete_successful = true; + if minute_to_delete.exists() { + if let (Some(download_date_time), Some(delete_date_time)) = ( + extract_datetime(download_file_path.to_str().unwrap()), + extract_datetime(first_file_path.to_str().unwrap()), + ) && download_date_time <= delete_date_time + { + info!( + stream = %stream, + tenant = ?tenant_id, + candidate = %minute_to_delete.display(), + target = %download_file_path.display(), + "skip evict: candidate newer than target" + ); + continue; + } - if stream_hot_tier.available_size <= parquet_file_size { - continue 'loop_files; + let minute_to_delete_owned = minute_to_delete.to_path_buf(); + let mut minute_freed: u64 = 0; + manifest.files.retain(|file| { + let file_path = self.hot_tier_path.join(&file.file_path); + let file_minute = file_path.parent().unwrap(); + if file_minute == minute_to_delete_owned { + minute_freed = minute_freed.saturating_add(file.file_size); + false } else { - break 'loop_dates; + true } + }); + + stream_hot_tier.used_size = stream_hot_tier + .used_size + .checked_sub(minute_freed) + .unwrap_or_else(|| { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + minute = %minute_to_delete_owned.display(), + minute_freed, + used_size = stream_hot_tier.used_size, + "minute_freed > used_size, clamping used_size to 0" + ); + 0 + }); + stream_hot_tier.available_size = + stream_hot_tier.available_size.saturating_add(minute_freed); + freed_total = freed_total.saturating_add(minute_freed); + + fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + fs::remove_dir_all(&minute_to_delete_owned).await?; + delete_empty_directory_hot_tier(minute_to_delete_owned.clone()).await?; + self.put_hot_tier(stream, stream_hot_tier, tenant_id) + .await?; + delete_successful = true; + info!( + stream = %stream, + tenant = ?tenant_id, + evicted_minute = %minute_to_delete_owned.display(), + evicted_size = minute_freed, + freed_total, + new_available = stream_hot_tier.available_size, + "evicted" + ); + if stream_hot_tier.available_size <= parquet_file_size { + continue; } else { - fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + break 'loop_dates; } } } } + info!( + stream = %stream, + tenant = ?tenant_id, + freed_total, + success = delete_successful, + "eviction complete" + ); Ok(delete_successful) } diff --git a/src/metrics/mod.rs 
b/src/metrics/mod.rs index 9c9ff86fa..c7689863a 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -306,6 +306,19 @@ pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = + Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "partial_file_scans_in_object_store_calls_by_date", + "Partial file scans in object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["method", "date", "tenant_id"], + ) + .expect("metric can be created") + }); + pub static TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy = Lazy::new(|| { IntCounterVec::new( @@ -527,6 +540,11 @@ fn custom_metrics(registry: &Registry) { TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), )) .expect("metric can be registered"); + registry + .register(Box::new( + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), + )) + .expect("metric can be registered"); registry .register(Box::new( TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), @@ -695,6 +713,17 @@ pub fn increment_object_store_calls_by_date(method: &str, date: &str, tenant_id: .inc(); } +pub fn increment_partial_file_scans_in_object_store_calls_by_date( + method: &str, + count: u64, + date: &str, + tenant_id: &str, +) { + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[method, date, tenant_id]) + .inc_by(count); +} + pub fn increment_files_scanned_in_object_store_calls_by_date( method: &str, count: u64, diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 62abad0db..1f81a847c 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -16,7 +16,13 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + ops::Range, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use async_trait::async_trait; use bytes::Bytes; @@ -47,6 +53,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -54,7 +61,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -181,7 +189,7 @@ impl ObjectStorageProvider for AzureBlobConfig { // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); Arc::new(BlobStore { - client: azure, + client: Arc::new(azure), account: self.account.clone(), container: self.container.clone(), root: StorePath::from(""), @@ -197,13 +205,111 @@ impl ObjectStorageProvider for AzureBlobConfig { // object store such as S3 and Azure Blob #[derive(Debug)] pub struct BlobStore { - client: LimitStore, + client: Arc>, account: String, container: String, root: StorePath, } impl BlobStore { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + tokio::fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) 
=> { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(6); + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + let client = self.client.clone(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + ObjectStorageError::Custom(format!("semaphore closed: {e}")) + })?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(concurrency) + .try_collect::>() + .await?; + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn _get_object( &self, path: &RelativePath, @@ -466,6 +572,14 @@ impl BlobStore { #[async_trait] impl ObjectStorage for BlobStore { + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._parallel_download(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, _path: &RelativePath, diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 019802f24..75dd141b6 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -16,13 +16,20 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + ops::Range, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use crate::{ metrics::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -47,12 +54,14 @@ use 
object_store::{ }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; + use tracing::error; use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -169,6 +178,106 @@ pub struct Gcs { } impl Gcs { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + tokio::fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(16); + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + let client = self.client.clone(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + handles.push(tokio::spawn(async move { + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + })); + } + for h in handles { + h.await + .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; + } + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn 
_get_object( &self, path: &RelativePath, @@ -431,6 +540,14 @@ impl Gcs { #[async_trait] impl ObjectStorage for Gcs { + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._parallel_download(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, path: &RelativePath, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 6f981981c..8d6027413 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -106,6 +106,28 @@ impl LocalFS { #[async_trait] impl ObjectStorage for LocalFS { + async fn buffered_write( + &self, + path: &RelativePath, + _tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let src = self.path_in_root(path); + let partial = super::partial_path(&write_path)?; + if let Some(parent) = partial.parent() { + fs::create_dir_all(parent).await?; + } + match fs::copy(&src, &partial).await { + Ok(_) => { + fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = fs::remove_file(&partial).await; + Err(e.into()) + } + } + } async fn upload_multipart( &self, key: &RelativePath, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2ca7a10de..15dd98975 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -344,3 +344,18 @@ pub enum ObjectStorageError { pub fn to_object_store_path(path: &RelativePath) -> Path { Path::from(path.as_str()) } + +/// Append `.partial` to the file name of a local path. Used by hot-tier +/// downloaders to write to a sibling path and atomically rename on success. +pub fn partial_path( + write_path: &std::path::Path, +) -> Result { + let name = write_path + .file_name() + .ok_or_else(|| ObjectStorageError::Custom("download write_path has no file name".into()))?; + let mut next = std::ffi::OsString::from(name); + next.push(".partial"); + let mut buf = write_path.to_path_buf(); + buf.set_file_name(next); + Ok(buf) +} diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 2e58a0df5..2c6244733 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -302,6 +302,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { path: &RelativePath, tenant_id: &Option, ) -> Result; + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError>; async fn get_object( &self, path: &RelativePath, diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 169d3c508..51f979ff8 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -17,7 +17,13 @@ */ use std::{ - collections::HashSet, fmt::Display, path::Path, str::FromStr, sync::Arc, time::Duration, + collections::HashSet, + fmt::Display, + ops::Range, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, }; use async_trait::async_trait; @@ -48,6 +54,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -55,7 +62,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, 
partial_path, + to_object_store_path, }; // in bytes @@ -331,6 +339,109 @@ pub struct S3 { } impl S3 { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + if let Err(e) = tokio::fs::rename(&partial, &write_path).await { + let _ = tokio::fs::remove_file(&partial).await; + return Err(e.into()); + } + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE + .options + .hot_tier_download_chunk_size + .max(8 * 1024 * 1024); + let concurrency = PARSEABLE.options.hot_tier_download_concurrency.max(16); + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + let client = Arc::new(self.client.clone()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency)); + + let mut handles = Vec::with_capacity(ranges.len()); + for r in ranges { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + handles.push(tokio::spawn(async move { + let _permit = semaphore + .acquire_owned() + .await + .map_err(|e| ObjectStorageError::Custom(format!("semaphore closed: {e}")))?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + use std::os::unix::fs::FileExt; + std_file.write_all_at(&bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + })); + } + for h in handles { + h.await + .map_err(|e| ObjectStorageError::Custom(format!("join error: {e}")))??; + } + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + async fn _get_object( &self, path: &RelativePath, @@ -686,12 +797,21 @@ impl ObjectStorage for S3 { Ok(result?) 
} + async fn buffered_write( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._parallel_download(path, tenant_id, write_path).await + } + async fn get_object( &self, path: &RelativePath, tenant_id: &Option, ) -> Result { - Ok(self._get_object(path, tenant_id).await?) + self._get_object(path, tenant_id).await } async fn get_objects(