Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions datafusion/datasource-parquet/src/schema_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,37 +51,39 @@ pub fn apply_file_schema_type_coercions(
table_schema: &Schema,
file_schema: &Schema,
) -> Option<Schema> {
// First pass: check whether any transformation is needed without building
// the full lookup map. For wide schemas this saves constructing a
// HashMap of every column (which is then discarded) once per file.
let mut needs_view_transform = false;
let mut needs_string_transform = false;

// Create a mapping of table field names to their data types for fast lookup
// and simultaneously check if we need any transformations
let table_fields: HashMap<_, _> = table_schema
.fields()
.iter()
.map(|f| {
let dt = f.data_type();
// Check if we need view type transformation
if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
needs_view_transform = true;
}
// Check if we need string type transformation
if matches!(
dt,
&DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
) {
needs_string_transform = true;
}

(f.name(), dt)
})
.collect();
for f in table_schema.fields() {
let dt = f.data_type();
if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
needs_view_transform = true;
}
if matches!(
dt,
&DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
) {
needs_string_transform = true;
}
if needs_view_transform && needs_string_transform {
break;
}
}

// Early return if no transformation needed
if !needs_view_transform && !needs_string_transform {
return None;
}

// Build a name -> data type lookup only when we actually need it.
let table_fields: HashMap<_, _> = table_schema
.fields()
.iter()
.map(|f| (f.name(), f.data_type()))
.collect();

let transformed_fields: Vec<Arc<Field>> = file_schema
.fields()
.iter()
Expand Down
55 changes: 39 additions & 16 deletions datafusion/execution/src/cache/file_metadata_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,20 @@ use crate::cache::{
lru_queue::LruQueue,
};

/// Internal cache value: the entry plus its precomputed memory size, so we
/// don't have to call [`FileMetadata::memory_size`] (which walks the entire
/// metadata structure) on every put / remove / eviction.
Comment thread
martin-g marked this conversation as resolved.
///
/// [`FileMetadata::memory_size`]: https://docs.rs/datafusion/latest/datafusion/execution/cache/cache_manager/trait.FileMetadata.html#tymethod.memory_size
#[derive(Clone)]
struct SizedCacheEntry {
/// The cached metadata entry exactly as it was passed to `put`; this is
/// what `get`, `put` and `remove` hand back to callers.
entry: CachedFileMetadataEntry,
/// Value of `entry.file_metadata.memory_size()` captured once at insertion
/// time. Used to adjust `memory_used` on replace / remove / eviction and
/// reported as `size_bytes` when listing entries.
size: usize,
}

/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
struct DefaultFilesMetadataCacheState {
lru_queue: LruQueue<Path, CachedFileMetadataEntry>,
lru_queue: LruQueue<Path, SizedCacheEntry>,
memory_limit: usize,
memory_used: usize,
cache_hits: HashMap<Path, usize>,
Expand All @@ -46,9 +57,14 @@ impl DefaultFilesMetadataCacheState {
/// Returns the respective entry from the cache, if it exists.
/// If the entry exists, it becomes the most recently used.
fn get(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
self.lru_queue.get(k).cloned().inspect(|_| {
*self.cache_hits.entry(k.clone()).or_insert(0) += 1;
})
self.lru_queue
.get(k)
.map(|sized| sized.entry.clone())
.inspect(|_| {
if let Some(hits) = self.cache_hits.get_mut(k) {
*hits += 1;
}
})
}

/// Checks if the metadata is currently cached.
Expand All @@ -65,6 +81,8 @@ impl DefaultFilesMetadataCacheState {
key: Path,
value: CachedFileMetadataEntry,
) -> Option<CachedFileMetadataEntry> {
// Compute the size once and store it on the entry so the eviction
// path can reuse it without re-walking the metadata.
let value_size = value.file_metadata.memory_size();

// no point in trying to add this value to the cache if it cannot fit entirely
Expand All @@ -73,23 +91,28 @@ impl DefaultFilesMetadataCacheState {
}

self.cache_hits.insert(key.clone(), 0);
let sized = SizedCacheEntry {
entry: value,
size: value_size,
};
// if the key is already in the cache, the old value is removed
let old_value = self.lru_queue.put(key, value);
let old_value = self.lru_queue.put(key, sized);
self.memory_used += value_size;
if let Some(ref old_entry) = old_value {
self.memory_used -= old_entry.file_metadata.memory_size();
if let Some(ref old) = old_value {
self.memory_used -= old.size;
}

self.evict_entries();

old_value
old_value.map(|s| s.entry)
}

/// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
fn evict_entries(&mut self) {
while self.memory_used > self.memory_limit {
if let Some(removed) = self.lru_queue.pop() {
self.memory_used -= removed.1.file_metadata.memory_size();
self.memory_used -= removed.1.size;
Comment thread
adriangb marked this conversation as resolved.
self.cache_hits.remove(&removed.0);
} else {
// cache is empty while memory_used > memory_limit, cannot happen
debug_assert!(
Expand All @@ -103,10 +126,10 @@ impl DefaultFilesMetadataCacheState {

/// Removes an entry from the cache and returns it, if it exists.
fn remove(&mut self, k: &Path) -> Option<CachedFileMetadataEntry> {
if let Some(old_entry) = self.lru_queue.remove(k) {
self.memory_used -= old_entry.file_metadata.memory_size();
if let Some(old) = self.lru_queue.remove(k) {
self.memory_used -= old.size;
self.cache_hits.remove(k);
Some(old_entry)
Some(old.entry)
} else {
None
}
Expand Down Expand Up @@ -220,14 +243,14 @@ impl FileMetadataCache for DefaultFilesMetadataCache {
let state = self.state.lock().unwrap();
let mut entries = HashMap::<Path, FileMetadataCacheEntry>::new();

for (path, entry) in state.lru_queue.list_entries() {
for (path, sized) in state.lru_queue.list_entries() {
entries.insert(
path.clone(),
FileMetadataCacheEntry {
object_meta: entry.meta.clone(),
size_bytes: entry.file_metadata.memory_size(),
object_meta: sized.entry.meta.clone(),
size_bytes: sized.size,
hits: *state.cache_hits.get(path).expect("entry must exist"),
extra: entry.file_metadata.extra_info(),
extra: sized.entry.file_metadata.extra_info(),
},
);
}
Expand Down
Loading