Refactor datapackerdataloader to be modular #19
Open
foreverlms wants to merge 72 commits into
Four-role decomposition (DataDistributor / RawItemProcessor / SampleBatcher / BatchCollator) replacing the fused DataPacker + PackingIterableDataset. Covers VLM + VFM behavior-preserving migration and arbitrary user dataflows. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment Validate the four-role abstraction against ALL dataloaders (not just VLM/VFM): WebDataset base is vestigial, joint_dataloader fusions are composition artifacts, videophy2 + DROID covered, legacy Iterative/Random expressible. Record per-recipe processor placement and collator nuances. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…h debt IdentityProcessor + still-processing dataset is a conscious break from the four-role separation, chosen for safety/minimal diff; tracked as a follow-up to extract real SFTVideoProcessor / DROIDProcessor later. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Keep old dataloaders as a living baseline; validate each live recipe via a mirror *_datapacker experiment with golden-batch + deterministic loss-curve equivalence (leveraging train.py --deterministic). Delete old code only in a final cleanup PR after all mirrors are validated. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-oriented bring-your-own-dataset tutorial (mental model, quickstart, four roles, use-case recipes, wiring, resume, sharding, FAQ, worked example). Wired into the implementation phases and docs index. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align built-in-roles note with the per-recipe IdentityProcessor decision (SFTVideoProcessor/DROIDProcessor are deferred follow-ups). Bump status. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Bite-sized TDD plan for the four role ABCs + IdentityProcessor / DefaultBatchCollator / SimpleBatcher / IterableDistributor / MapDistributor + the DataPackerDataLoader orchestrator (batch_size sugar, DP-coord resolution). New dataflow/ package, no existing recipe touched. Resume and packing batchers deferred to follow-up plans. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resume + checkpoint saving must not break (blocking gate, dedicated resume test). Regression = baseline launch scripts vs mirror *_datapacker experiments, logging_iter=1/max_iter=500, wandb project cosmos_oss_alignment, fresh run names, env via cosmos3-run-env. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ression PoolPackingBatcher (port of the packing engine), VLMProcessor/VLMCollator extraction, golden-batch equality test, mirror experiment + TOML + launch wrapper, and the loss-curve regression run (baseline vs mirror, logging_iter=1, max_iter=500, wandb cosmos_oss_alignment). Resume invariant honored (iterable source, callbacks untouched). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…smosDataLoader) DataPacker is gone and packing is just one batcher, so the orchestrator gets a fitting name. New loader lives in dataflow/loader.py; legacy DataPackerDataLoader / JointDataPackerDataLoader stay as the regression baseline until cleanup. Legacy references in golden tests / coverage audit kept intact. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Plan 3: MapDistributor env-var resume + CosmosDataLoader meta-threading,
callback unchanged, resume integration test (HARD INVARIANT).
Plan 4: videophy2 migration (VideoPhy2Processor, reuse VLMCollator), golden +
regression.
Plan 5: VFM migration (VFMListCollator, SequentialPackingBatcher,
RankPartitionedDistributor) + MixtureDistributor + JointCosmosDataLoader.
Plan 6: docs/dataflow.md tutorial + cleanup (promote mirrors, delete legacy);
execution gated LAST on all regressions passing.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…SampleBatcher/BatchCollator) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Introduces loader.py with CosmosDataLoader (thin DataLoader subclass) and _DataflowIterableDataset, wiring DataDistributor -> RawItemProcessor -> SampleBatcher -> BatchCollator per worker. Adds batch_size= convenience sugar (builds SimpleBatcher + DefaultBatchCollator) and rejects ambiguous combos. Exports CosmosDataLoader from the dataflow package __init__. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
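The batch_size= shortcut and its ambiguity check can be sketched roughly as follows. This is a minimal illustration, not the code in loader.py: `resolve_batching` is a hypothetical helper, the two classes are stand-ins, and the rule that batcher= and collator= must both be given when batch_size= is absent is an assumption.

```python
from typing import Any, Optional, Tuple


class SimpleBatcher:
    """Stand-in: groups every `batch_size` samples."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size


class DefaultBatchCollator:
    """Stand-in for the default collator."""


def resolve_batching(
    batch_size: Optional[int] = None,
    batcher: Optional[Any] = None,
    collator: Optional[Any] = None,
) -> Tuple[Any, Any]:
    """Return (batcher, collator), expanding the batch_size shortcut."""
    if batch_size is not None:
        if batcher is not None or collator is not None:
            # Ambiguous: the shortcut and an explicit role were both requested.
            raise ValueError("pass either batch_size= or batcher=/collator=, not both")
        if batch_size <= 0:
            raise ValueError("batch_size must be positive")
        return SimpleBatcher(batch_size), DefaultBatchCollator()
    if batcher is None or collator is None:
        # Assumption: without the shortcut, both roles must be explicit.
        raise ValueError("batcher= and collator= are required without batch_size=")
    return batcher, collator
```

Rejecting the mixed case up front keeps a misconfigured recipe from silently picking one of two conflicting batching strategies.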
Appends test_multiworker_disjoint_and_complete_one_epoch to loader_test.py: spins up CosmosDataLoader with num_workers=2 over a 12-item MapDataset, consumes exactly one epoch, and asserts all 12 indices are seen exactly once with no duplicates or gaps. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…istent_workers coercion MapDistributor.stream now returns immediately when n==0 or stream_id>=n, preventing an infinite spin that would hang training when the dataset is empty or smaller than dp_world_size*num_workers. Three regression tests added (empty dataset, empty shard, non-empty shard stays infinite). CosmosDataLoader now logs an info message and explicitly sets persistent_workers=False when num_workers==0, instead of silently coercing True to False inside the dict literal. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
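The empty-shard guard described in this commit can be illustrated with a small generator. This is a sketch under assumptions: `stream_indices` is a hypothetical free function, and strided sharding (stream k takes indices k, k + num_streams, ...) is assumed rather than taken from the real MapDistributor.

```python
from itertools import islice
from typing import Iterator, Tuple


def stream_indices(n: int, stream_id: int, num_streams: int) -> Iterator[Tuple[int, int]]:
    """Yield (epoch, index) forever for one stream, striding over n items."""
    if n == 0 or stream_id >= n:
        # Empty dataset or empty shard: return now. Without this guard the
        # `while True` loop below would spin forever without yielding.
        return
    epoch = 0
    while True:
        for index in range(stream_id, n, num_streams):
            yield epoch, index
        epoch += 1


# A non-empty shard stays infinite, so consume it with islice.
first_four = list(islice(stream_indices(5, 1, 2), 4))
```

With 5 items and 2 streams, stream 1 sees indices 1 and 3 in every epoch, while a stream whose id is at or beyond n ends immediately.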
…t engine) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copies _decode_image / _sharegpt_to_openai / sft_process_sample / sft_collate_fn logic verbatim from VLMDataPacker into standalone RawItemProcessor + BatchCollator roles, ready for use with CosmosDataLoader. Legacy files are untouched. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Proves that CosmosDataLoader+VLMProcessor+VLMCollator+PoolPackingBatcher produces identical batches to the legacy DataPackerDataLoader+VLMDataPacker on the same fixed iterable source (40 items, drain 10 batches). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… dataflow loader Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sion Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add _DataflowIterableDataset._cached_gen so single-process (num_workers=0) MapDistributor iter() calls reuse the same generator, preserving resume state across iter() calls (analogous to persistent_workers for multi-worker mode). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…le v2 dataloader_train
- loader.py: revert the _cached_gen generator caching (commit 1c4d017). __iter__ returns a fresh generator per call, matching legacy/torch semantics. The resume test was the real bug, not the loader.
- resume_test.py: use a single iterator (it2 = iter(loader2)) so the env-var fast-forward is consumed once and iteration continues 5,6,7.
- llava_ov_datapacker_v2.toml: drop [dataloader_train].max_samples_per_batch; it remaps to dataloader_train.max_batch_size, which does not exist on the new CosmosDataLoader node (PoolPackingBatcher owns max_batch_size), causing ConfigAttributeError at config resolution.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…guity guard
- PoolPackingBatcher.sample_size default uses len(sample["input_ids"]) (matches the documented contract; identical to .shape[0] for 1-D tensors, also handles list input_ids).
- CosmosDataLoader: keep the legacy bit-for-bit max(epoch)/max(pos) resume stamp (correct for all live recipes: VLM/videophy2 use max_batch_size=1; VFM uses a non-resumable iterable source). NOT switching to min-1 (Codex's suggestion), which would make max_batch_size=1 replay the last sample on every resume. Instead, add a contiguity/epoch guard: a multi-sample batch whose _dp_stream_pos are non-contiguous or span epochs (reordering batcher + batch_size>1) now raises loudly instead of silently skipping buffered lower positions on resume.
- Add regression test for the guard.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
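A contiguity/epoch guard of the kind this commit describes might look like the sketch below. The `_dp_stream_pos` key comes from the commit message; the `_dp_epoch` key, the `resume_stamp` name, and the reading of "contiguous" as consecutive integer positions are assumptions made for illustration.

```python
from typing import Dict, List, Tuple


def resume_stamp(batch: List[Dict[str, int]]) -> Tuple[int, int]:
    """Return the max (epoch, position) stamp, or raise if resuming would be unsafe."""
    if not batch:
        raise ValueError("cannot stamp an empty batch")
    epochs = {sample["_dp_epoch"] for sample in batch}  # hypothetical key
    if len(epochs) != 1:
        raise ValueError(f"batch spans epochs {sorted(epochs)}")
    positions = sorted(sample["_dp_stream_pos"] for sample in batch)
    expected = list(range(positions[0], positions[0] + len(positions)))
    if positions != expected:
        # A reordering batcher buffered lower positions; stamping the max
        # would skip them on resume, so fail loudly instead.
        raise ValueError(f"non-contiguous stream positions {positions}")
    return epochs.pop(), positions[-1]
```

A single-sample batch always passes, which is consistent with the note that recipes using max_batch_size=1 are unaffected.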
VLM/videophy2 v2 mirrors were folded into the canonical experiments, so the tutorial now points at pre_exp012_llava_ov and videophy2_sft_nano (plus vision_sft_nano_v2 for the VFM new-loader variant) instead of the deleted _v2 mirror names. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… batcher
The fixed [dataloader_train] TOML fields now drive the new loader via PATH_REMAPS["vlm"]:
max_samples_per_batch -> dataloader_train.batcher.max_batch_size
max_sequence_length -> dataloader_train.batcher.max_tokens
(previously these targeted the flat dataloader_train.{max_batch_size,max_tokens}
that existed on the legacy DataPackerDataLoader but not on CosmosDataLoader, whose
PoolPackingBatcher owns those knobs). Re-add the [dataloader_train] block to
llava_ov.toml + videophy2_sft_nano.toml (max_samples_per_batch=1,
max_sequence_length=16000) so the loader is TOML-tunable again; update the
DataloaderTrainConfig docstrings.
Verified: TOML max_sequence_length=12345 -> resolved batcher.max_tokens=12345;
dryrun resolves for all 5 live recipes (llava_ov, videophy2, vision_sft_nano,
vision_sft_nano_v2, vision_sft_super); 57 dataflow/experiment tests pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
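The routing in this commit can be pictured as a dotted-path rewrite. The sketch below is illustrative only: the two routes are the ones listed in the commit message, but the table layout, `_flatten`, and `apply_remaps` are hypothetical and say nothing about how the real config system resolves overrides.

```python
from typing import Any, Dict

# Hypothetical table keyed by recipe family, holding the two routes named above.
PATH_REMAPS: Dict[str, Dict[str, str]] = {
    "vlm": {
        "dataloader_train.max_samples_per_batch": "dataloader_train.batcher.max_batch_size",
        "dataloader_train.max_sequence_length": "dataloader_train.batcher.max_tokens",
    }
}


def _flatten(cfg: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Turn a nested dict into {dotted.path: value}."""
    flat: Dict[str, Any] = {}
    for key, value in cfg.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(_flatten(value, path))
        else:
            flat[path] = value
    return flat


def apply_remaps(toml_cfg: Dict[str, Any], family: str) -> Dict[str, Any]:
    """Rewrite remapped TOML paths; unknown paths pass through unchanged."""
    remaps = PATH_REMAPS.get(family, {})
    return {remaps.get(path, path): value for path, value in _flatten(toml_cfg).items()}


overrides = apply_remaps(
    {"dataloader_train": {"max_samples_per_batch": 1, "max_sequence_length": 12345}},
    "vlm",
)
```

Here `overrides` maps `dataloader_train.batcher.max_tokens` to 12345, mirroring the verification step quoted in the commit message.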
…ov_mapresume) for resume validation Introduces get_llava_ov_map (materialises 800 streamed items into a map-style datasets.Dataset) and pre_exp012_llava_ov_mapresume backed by MapDistributor (shuffle=True, seed=42, num_workers=0). The dataloader_state callback is overridden to distributor_type="data_packer" so DP_STATE_WORKER_* env vars are set on resume. Dryrun verified: CosmosDataLoader + MapDistributor + distributor_type=data_packer + save_iter=100 all resolve correctly. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…smos_dataloader" The distributor_type value "data_packer" + env prefix DP_STATE_ were named after the removed DataPacker. Rename the value -> "cosmos_dataloader" and the env prefix -> COSMOS_DL_STATE_ (callback + MapDistributor reader + resume/distributor tests + the map-resume recipe). Keep the field name `distributor_type` (the broader legacy shardlist-strategy selector) and add a docstring clarifying it is NOT the dataflow DataDistributor role. Verified: 18 resume/distributor tests pass (callback<->MapDistributor env prefixes stay matched); map-resume dryrun shows dataloader_state.distributor_type=cosmos_dataloader. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…map Dataset Switch get_llava_ov_map from "stream + islice into Dataset.from_list" to a genuine random-access map dataset via load_dataset(..., streaming=False), then filter and cap to n rows with .select(). This exercises MapDistributor against a true map-style Dataset (its intended case). Bump n 800 -> 4000 so that, at dp_shard=8 (500 samples/rank/epoch), the iter-100 checkpoint lands mid-epoch (100 samples/rank) — a clean no-epoch-wrap resume test. Dryrun verified: dataloader_train -> MapDistributor(get_llava_ov_map, n=4000, shuffle=true), dataloader_state.distributor_type=cosmos_dataloader, save_iter=100. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
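The arithmetic behind the 800 → 4000 bump can be checked with a few lines. This sketch assumes each rank consumes one sample per iteration (the recipes use max_samples_per_batch=1) and that the dataset divides evenly across ranks; `checkpoint_position` is a hypothetical helper.

```python
from typing import Tuple


def checkpoint_position(
    n: int, dp_ranks: int, save_iter: int, samples_per_iter: int = 1
) -> Tuple[int, int]:
    """Return (epochs completed, offset into the current epoch) per rank at save_iter."""
    per_rank_per_epoch = n // dp_ranks
    consumed = save_iter * samples_per_iter
    return divmod(consumed, per_rank_per_epoch)


new_setup = checkpoint_position(n=4000, dp_ranks=8, save_iter=100)  # (0, 100)
old_setup = checkpoint_position(n=800, dp_ranks=8, save_iter=100)   # (1, 0)
```

With n=4000 the iter-100 checkpoint sits 100 samples into a 500-sample epoch, whereas with n=800 it would land exactly on an epoch boundary, which is the wrap case the commit wanted to avoid.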
…emove dataflow.md
The old custom_dataset.md documented the deleted DataPackerDataLoader/DataPacker API. Replace it with the bring-your-own-dataset tutorial for the four-role CosmosDataLoader (DataDistributor → RawItemProcessor → SampleBatcher → BatchCollator), folding in the content from docs/dataflow.md (now deleted, single source of truth).
Updates vs the dataflow.md draft:
- resume uses distributor_type="cosmos_dataloader" + COSMOS_DL_STATE_* (the rename)
- JointCosmosDataLoader shown with dataloaders=/JointDataLoaderStateCallback
- notes the TOML->batcher PATH_REMAPS["vlm"] routing
- real-world examples point at the current recipes (llava_ov, llava_ov_mapresume, videophy2_sft_nano, vision_sft_nano_v2) + a new-API checklist
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
lfengad
reviewed
Jun 5, 2026
… addlicense) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…loader Rename the map-style resumable VLM example (experiment module, TOML, the pre_exp012_llava_ov_mapresume -> pre_exp012_llava_ov_mapstyle_dataloader registration name, and doc references). Auto-discovered by filename via pkgutil.iter_modules, so no import to update. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…epcopy vision_sft_nano_v2 differed from vision_sft_nano only in job.name and the training dataloader (legacy PackingDataLoader+RankPartitionedDataLoader vs the four-role CosmosDataLoader stack). Derive v2 by deep-copying vision_sft_nano and overriding just those two fields; delete the standalone vision_sft_nano_v2.py module + its config.py import (v2 now registers from vision_sft_nano.py). Experiment name "vision_sft_nano_v2" is unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment.py via deepcopy pre_exp012_llava_ov_mapstyle_dataloader differed from pre_exp012_llava_ov only in job.name, the dataloader_state callback (distributor_type="cosmos_dataloader"), checkpoint.save_iter, and the dataloader (streaming IterableDistributor vs map-style MapDistributor + num_workers=0). Derive it by deep-copying pre_exp012_llava_ov and overriding just those fields; move get_llava_ov_map alongside get_llava_ov_streaming. Delete the standalone module (VLM experiments auto-register via import_all_modules_from_package). Experiment name and TOML unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
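The derive-by-deepcopy pattern used in this and the previous commit can be sketched with plain dictionaries. The real experiments are config objects, not dicts; `derive`, the dotted override syntax, and the base values for save_iter and num_workers are placeholders chosen for illustration.

```python
import copy
from typing import Any, Dict

# Placeholder base config; only the experiment names and save_iter=100 come from the PR.
base: Dict[str, Any] = {
    "job": {"name": "pre_exp012_llava_ov"},
    "checkpoint": {"save_iter": 1000},
    "dataloader_train": {"distributor": "IterableDistributor", "num_workers": 4},
}


def derive(parent: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Deep-copy `parent`, then set each dotted path in `overrides` on the copy."""
    cfg = copy.deepcopy(parent)
    for dotted, value in overrides.items():
        *parents, leaf = dotted.split(".")
        node = cfg
        for part in parents:
            node = node[part]
        node[leaf] = value
    return cfg


variant = derive(
    base,
    {
        "job.name": "pre_exp012_llava_ov_mapstyle_dataloader",
        "checkpoint.save_iter": 100,
        "dataloader_train.distributor": "MapDistributor",
        "dataloader_train.num_workers": 0,
    },
)
```

Because the copy is deep, the variant can override nested fields without mutating the base experiment, so both can be registered side by side.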
Drop the per-module unit tests that sat next to the dataflow source (base/batchers/collators/distributors/loader/processors/joint_loader tests, the VLM/VideoPhy2 dataflow_roles tests, and the vision_sft_nano_v2 registration test). Keep the two behavior guarantees: golden_vfm_test.py (byte-identical to the legacy loader) and resume_test.py (checkpoint resume = no dup/skip). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Commit 7bd72ee added the CosmosDataLoader imports + removed the standalone module/config.py import, but the vision_sft_nano_v2 deepcopy block and the registration-loop update were dropped in a messy git-reset sequence — leaving config.py expecting an experiment that vision_sft_nano.py no longer defined ("Could not find experiment/vision_sft_nano_v2"). Re-add the deepcopy derivation and register both experiments. Verified: all 6 example recipes dry-run clean (vision_sft_nano_v2 resolves to CosmosDataLoader -> RankPartitionedDistributor -> SequentialPackingBatcher -> VFMListCollator). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
lfengad
reviewed
Jun 8, 2026
yy-code-nv
previously approved these changes
Jun 8, 2026
lfengad
previously approved these changes
Jun 9, 2026
…dataloader_state.py
dataloader_state.py was modified in commit 4855c86 to add "cosmos_dataloader" as a distributor_type and change the env prefix to COSMOS_DL_STATE_. This entangled CosmosDataLoader-specific resume logic with the legacy DataLoaderStateCallback that serves WebDataset "no_replace" / "data_packer" shardlist strategies.
Revert dataloader_state.py fully to main (data_packer, DP_STATE_ prefix) and introduce cosmos_dataloader_state.py with two dedicated classes:
- CosmosDataLoaderStateCallback: purpose-built for CosmosDataLoader(MapDistributor). No distributor_type discriminator; always active. Writes COSMOS_DL_STATE_* env vars on load_state_dict so MapDistributor.stream fast-forwards on resume. Accepts distributor_type kwarg (ignored) for Hydra struct-merge compat with the base DataLoaderStateCallback entry in the basic_log callbacks group.
- JointCosmosDataLoaderStateCallback: for JointCosmosDataLoader. Persists outer global_id + inner per-dataset per-worker (epoch, index) via one CosmosDataLoaderStateCallback per inner loader.
Update llava_ov_vlm.py and resume_test.py to use the new callback.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
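The env-var handoff between the callback and MapDistributor can be pictured as a write/read round trip over a shared prefix. Only the COSMOS_DL_STATE_ prefix is taken from the commit message; the `WORKER_<id>` key suffix, the `epoch,index` value encoding, and both function names are assumptions.

```python
import os
from typing import Dict, MutableMapping, Optional, Tuple

ENV_PREFIX = "COSMOS_DL_STATE_"  # prefix named in the commit message


def write_state_env(
    state: Dict[int, Tuple[int, int]],
    environ: MutableMapping[str, str] = os.environ,
) -> None:
    """Callback side: publish each worker's (epoch, index) before workers start."""
    for worker_id, (epoch, index) in state.items():
        # Hypothetical key and value format.
        environ[f"{ENV_PREFIX}WORKER_{worker_id}"] = f"{epoch},{index}"


def read_state_env(
    worker_id: int,
    environ: MutableMapping[str, str] = os.environ,
) -> Optional[Tuple[int, int]]:
    """Distributor side: return the saved (epoch, index), or None on a fresh run."""
    raw = environ.get(f"{ENV_PREFIX}WORKER_{worker_id}")
    if raw is None:
        return None
    epoch, index = raw.split(",")
    return int(epoch), int(index)
```

Keeping the prefix in one constant used by both sides is one way to ensure the writer and reader cannot drift apart, which the earlier rename commit called out as something the tests check.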
…llback
DataPackerDataLoader and JointDataPackerDataLoader were removed earlier in this branch. Clean up the remnants in dataloader_state.py:
- Drop "data_packer" from _ACTIVE_DISTRIBUTOR_TYPES (only "no_replace" remains)
- Remove the DP_STATE_ env-var branch in load_state_dict
- Delete JointDataLoaderStateCallback (existed solely for the deleted JointDataPackerDataLoader; JointCosmosDataLoaderStateCallback covers the CosmosDataLoader joint-loader case)
- Fix stale JointDataLoaderStateCallback docstring reference in loader.py
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
lfengad
previously approved these changes
Jun 9, 2026
…cipe Mirrors launch_sft_llava_ov.sh for the MapDistributor-backed VLM SFT recipe. Supports optional RESUME_FROM_CKPT env var for checkpoint resume via CosmosDataLoaderStateCallback. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
186116a to
fe95d63
Compare
Summary
Refactors the training data layer from the monolithic
DataPackerDataLoader / DataPacker / PackingIterableDataset into a modular, four-role abstraction wired by a single loader. Behavior is preserved (golden-batch byte-identical to
the legacy loader; resume validated live), and all existing recipes are migrated.
DataDistributor  →  RawItemProcessor  →  SampleBatcher  →  BatchCollator
(shard/shuffle/     (raw item → one      (samples →        (group → one
 resume)             sample dict)         batch groups)     batch dict)
Each role is a small ABC with one required method; pick a built-in per slot or
write your own.
CosmosDataLoader is a torch.utils.data.DataLoader subclass, so it drops into the existing training loop.
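The four-role flow can be sketched in plain Python to show how the pieces compose. The role names match the PR, but the method names (`stream`, `process`, `batch`, `collate`), their signatures, the toy implementations, and `run_pipeline` are assumptions for illustration; the real loader wraps this per worker inside a DataLoader subclass.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Iterator, List


class DataDistributor(ABC):
    @abstractmethod
    def stream(self) -> Iterator[Any]:
        """Yield raw items for this rank/worker."""


class RawItemProcessor(ABC):
    @abstractmethod
    def process(self, item: Any) -> Dict[str, Any]:
        """Turn one raw item into one sample dict."""


class SampleBatcher(ABC):
    @abstractmethod
    def batch(self, samples: Iterable[Dict[str, Any]]) -> Iterator[List[Dict[str, Any]]]:
        """Group samples into batch groups."""


class BatchCollator(ABC):
    @abstractmethod
    def collate(self, group: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge one group into one batch dict."""


class ListDistributor(DataDistributor):
    def __init__(self, items: List[Any]) -> None:
        self.items = items

    def stream(self) -> Iterator[Any]:
        yield from self.items


class LengthProcessor(RawItemProcessor):
    def process(self, item: str) -> Dict[str, Any]:
        return {"text": item, "length": len(item)}


class FixedSizeBatcher(SampleBatcher):
    def __init__(self, size: int) -> None:
        self.size = size

    def batch(self, samples: Iterable[Dict[str, Any]]) -> Iterator[List[Dict[str, Any]]]:
        group: List[Dict[str, Any]] = []
        for sample in samples:
            group.append(sample)
            if len(group) == self.size:
                yield group
                group = []
        if group:  # emit the final partial group
            yield group


class ListCollator(BatchCollator):
    def collate(self, group: List[Dict[str, Any]]) -> Dict[str, Any]:
        return {key: [sample[key] for sample in group] for key in group[0]}


def run_pipeline(distributor, processor, batcher, collator) -> Iterator[Dict[str, Any]]:
    """Chain the four roles lazily: distribute, process, batch, collate."""
    samples = (processor.process(item) for item in distributor.stream())
    for group in batcher.batch(samples):
        yield collator.collate(group)


batches = list(
    run_pipeline(ListDistributor(["a", "bb", "ccc"]), LengthProcessor(), FixedSizeBatcher(2), ListCollator())
)
```

Swapping any one role (for example a packing batcher in place of `FixedSizeBatcher`) leaves the other three untouched, which is the point of the decomposition.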
What changed
New dataflow package: cosmos_framework/data/vfm/dataflow/
- CosmosDataLoader (+ batch_size= sugar → SimpleBatcher + DefaultBatchCollator), JointCosmosDataLoader (ratio-weighted heterogeneous join).
- IterableDistributor, MapDistributor (resumable), RankPartitionedDistributor, MixtureDistributor.
- IdentityProcessor (+ recipe-specific VLMProcessor, VideoPhy2Processor).
- SimpleBatcher, PoolPackingBatcher, SequentialPackingBatcher.
- DefaultBatchCollator, VFMListCollator (+ recipe VLMCollator).
Legacy removal
- data_packer.py, data_packer_dataloader.py, packing_iterable_dataset.py, test_dp_state_distributed.py (+ old tests).
Experiment migrations
- llava_ov (renamed from llava_ov_datapacker, streaming IterableDistributor).
- videophy2_sft_nano.
- vision_sft_nano_v2 (new-loader variant).
- llava_ov_mapresume: map-style (load_dataset(streaming=False) + MapDistributor) resumable example.
Config / TOML
- PATH_REMAPS["vlm"]: route dataloader_train.{max_samples_per_batch, max_sequence_length} → nested batcher.{max_batch_size, max_tokens}.
Checkpoint / resume
- "data_packer" → "cosmos_dataloader" and env prefix DP_STATE_ → COSMOS_DL_STATE_ (DataLoaderStateCallback, JointDataLoaderStateCallback, MapDistributor). On-disk format unchanged.
CI / tests / docs
- tests/launch_regression_test.py + launch scripts for the llava_ov rename (golden loss keyed by llava_ov; workflow -k llava_ov).
- docs/custom_dataset.md with the CosmosDataLoader tutorial; removed docs/dataflow.md.
Validation
- … legacy loader.
- pre_exp012_llava_ov_mapresume (8 dp ranks, save_iter=100): per-rank input_ids shapes identical across the resume boundary (792 (iter, rank) keys, 0 mismatches), and loss curves match. No duplicated/skipped samples on any rank.
- llava_ov golden recipe and its streaming data path are unchanged; the remap only affects the 3 VLM TOMLs, all of which compose cleanly onto a real PoolPackingBatcher.
Hard invariant
Dataloader resume + checkpoint saving must not regress. Held: resume is preserved
through the existing DataLoaderStateCallback, with map-style fast-forward and the multi-sample contiguity guard, and validated end-to-end above.