Refactor datapackerdataloader to be modular #19

Open
foreverlms wants to merge 72 commits into main from maoshengl/dataset_enhancement

@foreverlms foreverlms commented Jun 4, 2026

Collaborator

Summary

Refactors the training data layer from the monolithic DataPackerDataLoader /
DataPacker / PackingIterableDataset into a modular, four-role abstraction
wired by a single loader. Behavior is preserved (golden-batch byte-identical to
the legacy loader; resume validated live), and all existing recipes are migrated.

DataDistributor  →  RawItemProcessor  →  SampleBatcher   →  BatchCollator
(shard/shuffle/     (raw item → one      (samples →         (group → one
 resume)             sample dict)         batch groups)      batch dict)

Each role is a small ABC with one required method; pick a built-in per slot or
write your own. CosmosDataLoader is a torch.utils.data.DataLoader subclass, so
it drops into the existing training loop.
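
The four-role split can be pictured with a minimal sketch. Only the role names and `MapDistributor.stream` appear in this PR; the other method names (`process`, `batch`, `collate`), the toy implementations, and the `run_pipeline` helper are assumptions made for illustration, not the package's actual API.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Iterator, List

Sample = Dict[str, Any]


class DataDistributor(ABC):
    @abstractmethod
    def stream(self) -> Iterator[Any]:
        """Yield the raw items assigned to this rank/worker."""


class RawItemProcessor(ABC):
    @abstractmethod
    def process(self, raw_item: Any) -> Sample:  # method name is assumed
        """Turn one raw item into one sample dict."""


class SampleBatcher(ABC):
    @abstractmethod
    def batch(self, samples: Iterable[Sample]) -> Iterator[List[Sample]]:  # assumed
        """Group a stream of samples into batch groups."""


class BatchCollator(ABC):
    @abstractmethod
    def collate(self, group: List[Sample]) -> Dict[str, Any]:  # assumed
        """Turn one group of samples into one batch dict."""


# Toy implementations, one per slot (all hypothetical).
class ListDistributor(DataDistributor):
    def __init__(self, items: Iterable[Any]) -> None:
        self.items = list(items)

    def stream(self) -> Iterator[Any]:
        return iter(self.items)


class LengthProcessor(RawItemProcessor):
    def process(self, raw_item: str) -> Sample:
        return {"text": raw_item, "length": len(raw_item)}


class FixedSizeBatcher(SampleBatcher):
    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size

    def batch(self, samples: Iterable[Sample]) -> Iterator[List[Sample]]:
        group: List[Sample] = []
        for sample in samples:
            group.append(sample)
            if len(group) == self.batch_size:
                yield group
                group = []
        if group:  # emit the trailing partial group
            yield group


class ListCollator(BatchCollator):
    def collate(self, group: List[Sample]) -> Dict[str, Any]:
        return {key: [sample[key] for sample in group] for key in group[0]}


def run_pipeline(distributor, processor, batcher, collator) -> Iterator[Dict[str, Any]]:
    """Wire the four roles in order: distribute, process, batch, collate."""
    samples = (processor.process(item) for item in distributor.stream())
    for group in batcher.batch(samples):
        yield collator.collate(group)


batches = list(
    run_pipeline(
        ListDistributor(["a", "bb", "ccc"]),
        LengthProcessor(),
        FixedSizeBatcher(2),
        ListCollator(),
    )
)
print(batches)
```

Swapping any single slot (for example a packing batcher in place of `FixedSizeBatcher`) leaves the other three roles untouched, which is the point of the decomposition.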

What changed

New dataflow package — cosmos_framework/data/vfm/dataflow/

  • Loaders: CosmosDataLoader (+ batch_size= sugar → SimpleBatcher +
    DefaultBatchCollator), JointCosmosDataLoader (ratio-weighted heterogeneous join).
  • Distributors: IterableDistributor, MapDistributor (resumable),
    RankPartitionedDistributor, MixtureDistributor.
  • Processors: IdentityProcessor (+ recipe-specific VLMProcessor,
    VideoPhy2Processor).
  • Batchers: SimpleBatcher, PoolPackingBatcher, SequentialPackingBatcher.
  • Collators: DefaultBatchCollator, VFMListCollator (+ recipe VLMCollator).
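
To make the difference between a fixed-size batcher and a packing batcher concrete, here is a minimal greedy sketch driven by the two knobs the PR names for PoolPackingBatcher (`max_batch_size`, `max_tokens`) and by the `len(sample["input_ids"])` size rule mentioned in the commits. It is a simplified sequential packer written for illustration; the function name `pack_sequential` is hypothetical and the logic is not the PR's PoolPackingBatcher or SequentialPackingBatcher.

```python
from typing import Any, Dict, Iterable, Iterator, List

Sample = Dict[str, Any]


def sample_size(sample: Sample) -> int:
    """Token count of one sample; works for lists and 1-D tensors alike."""
    return len(sample["input_ids"])


def pack_sequential(
    samples: Iterable[Sample], max_tokens: int, max_batch_size: int
) -> Iterator[List[Sample]]:
    """Greedily fill a group until the token budget or the sample cap is hit."""
    group: List[Sample] = []
    used = 0
    for sample in samples:
        size = sample_size(sample)
        # Close the current group if this sample would overflow either limit.
        if group and (used + size > max_tokens or len(group) >= max_batch_size):
            yield group
            group, used = [], 0
        group.append(sample)
        used += size
    if group:
        yield group


toy = [{"input_ids": [0] * n} for n in (3, 4, 5, 2)]
groups = list(pack_sequential(toy, max_tokens=8, max_batch_size=2))
print([[sample_size(s) for s in g] for g in groups])
```

In this sketch a sample larger than `max_tokens` still gets a group of its own rather than being dropped; whether the real batchers do the same is not stated in the PR.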

Legacy removal

  • Deleted data_packer.py, data_packer_dataloader.py,
    packing_iterable_dataset.py, test_dp_state_distributed.py (+ old tests).

Experiment migrations

  • VLM llava_ov (renamed from llava_ov_datapacker, streaming IterableDistributor).
  • VLM videophy2_sft_nano.
  • VFM: existing path unchanged; added vision_sft_nano_v2 (new-loader variant).
  • Added llava_ov_mapresume — map-style (load_dataset(streaming=False) +
    MapDistributor) resumable example.

Config / TOML

  • PATH_REMAPS["vlm"]: route dataloader_train.{max_samples_per_batch, max_sequence_length} → nested batcher.{max_batch_size, max_tokens}.
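
The remap can be sketched as a table from flat TOML paths to nested config paths. The two source and target paths are the ones named in this PR; the table name `PATH_REMAPS_SKETCH`, the helper functions, and the dict-based config are assumptions for illustration, not the framework's actual resolver.

```python
from typing import Any, Dict

# Flat TOML path -> nested path on the new loader (paths taken from the PR text).
PATH_REMAPS_SKETCH = {
    "dataloader_train.max_samples_per_batch": "dataloader_train.batcher.max_batch_size",
    "dataloader_train.max_sequence_length": "dataloader_train.batcher.max_tokens",
}


def set_by_path(tree: Dict[str, Any], dotted: str, value: Any) -> None:
    """Set tree[a][b][c] = value for dotted path 'a.b.c', creating dicts as needed."""
    keys = dotted.split(".")
    node = tree
    for key in keys[:-1]:
        node = node.setdefault(key, {})
    node[keys[-1]] = value


def apply_remaps(flat: Dict[str, Any], remaps: Dict[str, str]) -> Dict[str, Any]:
    """Route each flat override to its remapped path (or keep it unchanged)."""
    resolved: Dict[str, Any] = {}
    for path, value in flat.items():
        set_by_path(resolved, remaps.get(path, path), value)
    return resolved


resolved = apply_remaps(
    {
        "dataloader_train.max_samples_per_batch": 1,
        "dataloader_train.max_sequence_length": 16000,
    },
    PATH_REMAPS_SKETCH,
)
print(resolved)
```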

Checkpoint / resume

  • Renamed the resume-state selector value "data_packer" → "cosmos_dataloader"
    and the env prefix DP_STATE_ → COSMOS_DL_STATE_ (DataLoaderStateCallback,
    JointDataLoaderStateCallback, MapDistributor). On-disk format unchanged.

CI / tests / docs

  • Updated tests/launch_regression_test.py + launch scripts for the llava_ov
    rename (golden loss keyed by llava_ov; workflow -k llava_ov).
  • Added golden-batch, resume, and per-role unit tests.
  • Replaced docs/custom_dataset.md with the CosmosDataLoader tutorial; removed
    docs/dataflow.md.

Validation

  • Golden-batch equality: VLM / videophy2 / VFM batches byte-identical to the
    legacy loader.
  • Live save→stop→resume on pre_exp012_llava_ov_mapresume (8 dp ranks,
    save_iter=100): per-rank input_ids shapes identical across the resume
    boundary — 792 (iter, rank) keys, 0 mismatches — and loss curves match.
    No duplicated/skipped samples on any rank.
  • No CI risk: the llava_ov golden recipe and its streaming data path are
    unchanged; the remap only affects the 3 VLM TOMLs, all of which compose cleanly
    onto a real PoolPackingBatcher.

Hard invariant

Dataloader resume + checkpoint saving must not regress. Held: resume is preserved
through the existing DataLoaderStateCallback, with map-style fast-forward and the
multi-sample contiguity guard, and validated end-to-end above.

foreverlms and others added 30 commits June 4, 2026 08:40
Four-role decomposition (DataDistributor / RawItemProcessor /
SampleBatcher / BatchCollator) replacing the fused DataPacker +
PackingIterableDataset. Covers VLM + VFM behavior-preserving migration
and arbitrary user dataflows.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment

Validate the four-role abstraction against ALL dataloaders (not just
VLM/VFM): WebDataset base is vestigial, joint_dataloader fusions are
composition artifacts, videophy2 + DROID covered, legacy Iterative/Random
expressible. Record per-recipe processor placement and collator nuances.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…h debt

IdentityProcessor + still-processing dataset is a conscious break from the
four-role separation, chosen for safety/minimal diff; tracked as a follow-up
to extract real SFTVideoProcessor / DROIDProcessor later.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Keep old dataloaders as a living baseline; validate each live recipe via a
mirror *_datapacker experiment with golden-batch + deterministic loss-curve
equivalence (leveraging train.py --deterministic). Delete old code only in a
final cleanup PR after all mirrors are validated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Task-oriented bring-your-own-dataset tutorial (mental model, quickstart,
four roles, use-case recipes, wiring, resume, sharding, FAQ, worked
example). Wired into the implementation phases and docs index.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Align built-in-roles note with the per-recipe IdentityProcessor decision
(SFTVideoProcessor/DROIDProcessor are deferred follow-ups). Bump status.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Bite-sized TDD plan for the four role ABCs + IdentityProcessor /
DefaultBatchCollator / SimpleBatcher / IterableDistributor / MapDistributor
+ the DataPackerDataLoader orchestrator (batch_size sugar, DP-coord
resolution). New dataflow/ package, no existing recipe touched. Resume and
packing batchers deferred to follow-up plans.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resume + checkpoint saving must not break (blocking gate, dedicated resume
test). Regression = baseline launch scripts vs mirror *_datapacker experiments,
logging_iter=1/max_iter=500, wandb project cosmos_oss_alignment, fresh run
names, env via cosmos3-run-env.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ression

PoolPackingBatcher (port of the packing engine), VLMProcessor/VLMCollator
extraction, golden-batch equality test, mirror experiment + TOML + launch
wrapper, and the loss-curve regression run (baseline vs mirror, logging_iter=1,
max_iter=500, wandb cosmos_oss_alignment). Resume invariant honored (iterable
source, callbacks untouched).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…smosDataLoader)

DataPacker is gone and packing is just one batcher, so the orchestrator gets a
fitting name. New loader lives in dataflow/loader.py; legacy DataPackerDataLoader
/ JointDataPackerDataLoader stay as the regression baseline until cleanup. Legacy
references in golden tests / coverage audit kept intact.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Plan 3: MapDistributor env-var resume + CosmosDataLoader meta-threading,
        callback unchanged, resume integration test (HARD INVARIANT).
Plan 4: videophy2 migration (VideoPhy2Processor, reuse VLMCollator), golden +
        regression.
Plan 5: VFM migration (VFMListCollator, SequentialPackingBatcher,
        RankPartitionedDistributor) + MixtureDistributor + JointCosmosDataLoader.
Plan 6: docs/dataflow.md tutorial + cleanup (promote mirrors, delete legacy);
        execution gated LAST on all regressions passing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…SampleBatcher/BatchCollator)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Introduces loader.py with CosmosDataLoader (thin DataLoader subclass) and
_DataflowIterableDataset, wiring DataDistributor -> RawItemProcessor ->
SampleBatcher -> BatchCollator per worker. Adds batch_size= convenience sugar
(builds SimpleBatcher + DefaultBatchCollator) and rejects ambiguous combos.
Exports CosmosDataLoader from the dataflow package __init__.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Appends test_multiworker_disjoint_and_complete_one_epoch to loader_test.py:
spins up CosmosDataLoader with num_workers=2 over a 12-item MapDataset,
consumes exactly one epoch, and asserts all 12 indices are seen exactly once
with no duplicates or gaps.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…istent_workers coercion

MapDistributor.stream now returns immediately when n==0 or stream_id>=n,
preventing an infinite spin that would hang training when the dataset is
empty or smaller than dp_world_size*num_workers. Three regression tests
added (empty dataset, empty shard, non-empty shard stays infinite).
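
A minimal sketch of why the guard matters, assuming a strided sharding scheme (stream `k` of `m` visits indices `k, k+m, k+2m, ...`). The striding, the function name `strided_stream`, and its parameters are assumptions; only the `n == 0 or stream_id >= n` early return comes from the commit message.

```python
import itertools
from typing import Iterator


def strided_stream(n: int, stream_id: int, num_streams: int) -> Iterator[int]:
    """Infinite per-stream index generator with the empty-shard guard."""
    # Without this guard, the inner range() below is empty and the
    # `while True` loop spins forever without ever yielding.
    if n == 0 or stream_id >= n:
        return
    while True:
        for index in range(stream_id, n, num_streams):
            yield index


print(list(strided_stream(0, 0, 4)))                        # empty dataset
print(list(strided_stream(3, 5, 8)))                        # empty shard
print(list(itertools.islice(strided_stream(5, 1, 2), 5)))   # non-empty shard repeats
```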

CosmosDataLoader now logs an info message and explicitly sets
persistent_workers=False when num_workers==0, instead of silently
coercing True to False inside the dict literal.
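
The explicit coercion can be sketched as a small helper. `torch.utils.data.DataLoader` rejects `persistent_workers=True` when `num_workers=0`, so the flag has to be cleared before the kwargs reach it. The helper name and the log wording are hypothetical, and this version only logs when it actually changes the value.

```python
import logging

logger = logging.getLogger(__name__)


def resolve_persistent_workers(num_workers: int, persistent_workers: bool) -> bool:
    """Return the persistent_workers value that is valid for this num_workers."""
    if num_workers == 0 and persistent_workers:
        # Make the override visible instead of hiding it in a dict literal.
        logger.info("num_workers=0: setting persistent_workers=False")
        return False
    return persistent_workers


print(resolve_persistent_workers(0, True))
print(resolve_persistent_workers(2, True))
```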

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…t engine)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Copies _decode_image / _sharegpt_to_openai / sft_process_sample /
sft_collate_fn logic verbatim from VLMDataPacker into standalone
RawItemProcessor + BatchCollator roles, ready for use with CosmosDataLoader.
Legacy files are untouched.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Proves that CosmosDataLoader+VLMProcessor+VLMCollator+PoolPackingBatcher
produces identical batches to the legacy DataPackerDataLoader+VLMDataPacker
on the same fixed iterable source (40 items, drain 10 batches).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… dataflow loader

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…sion

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add _DataflowIterableDataset._cached_gen so single-process (num_workers=0)
MapDistributor iter() calls reuse the same generator, preserving resume state
across iter() calls (analogous to persistent_workers for multi-worker mode).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…le v2 dataloader_train

- loader.py: revert the _cached_gen generator caching (commit 1c4d017). __iter__
  returns a fresh generator per call, matching legacy/torch semantics. The resume
  test was the real bug, not the loader.
- resume_test.py: use a single iterator (it2 = iter(loader2)) so the env-var
  fast-forward is consumed once and iteration continues 5,6,7.
- llava_ov_datapacker_v2.toml: drop [dataloader_train].max_samples_per_batch — it
  remaps to dataloader_train.max_batch_size which does not exist on the new
  CosmosDataLoader node (PoolPackingBatcher owns max_batch_size), causing
  ConfigAttributeError at config resolution.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…guity guard

- PoolPackingBatcher.sample_size default uses len(sample["input_ids"]) (matches
  the documented contract; identical to .shape[0] for 1-D tensors, also handles
  list input_ids).
- CosmosDataLoader: keep the legacy bit-for-bit max(epoch)/max(pos) resume stamp
  (correct for all live recipes — VLM/videophy2 use max_batch_size=1; VFM uses a
  non-resumable iterable source). NOT switching to min-1 (Codex's suggestion),
  which would make max_batch_size=1 replay the last sample on every resume.
  Instead, add a contiguity/epoch guard: a multi-sample batch whose _dp_stream_pos
  are non-contiguous or span epochs (reordering batcher + batch_size>1) now raises
  loudly instead of silently skipping buffered lower positions on resume.
- Add regression test for the guard.
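
The guard described above can be sketched as follows, assuming each sample carries an `(epoch, stream_pos)` pair and that positions within one stream increase by 1. The function name `resume_stamp` and the tuple layout are hypothetical; only the max(epoch)/max(pos) stamp and the "raise on non-contiguous or cross-epoch batches" behavior come from the commit message.

```python
from typing import List, Tuple


def resume_stamp(positions: List[Tuple[int, int]]) -> Tuple[int, int]:
    """Return the (epoch, pos) stamp for one batch, or raise if it is unsafe.

    positions holds one (epoch, stream_pos) pair per sample in the batch.
    """
    if not positions:
        raise ValueError("cannot stamp an empty batch")
    epochs = {epoch for epoch, _ in positions}
    if len(epochs) > 1:
        raise ValueError(f"batch spans epochs {sorted(epochs)}")
    ordered = sorted(pos for _, pos in positions)
    for prev, cur in zip(ordered, ordered[1:]):
        if cur != prev + 1:
            # A gap means some lower position is still buffered in the batcher;
            # stamping max(pos) would silently skip it on resume.
            raise ValueError(f"non-contiguous stream positions {ordered}")
    return max(epochs), ordered[-1]


print(resume_stamp([(0, 5)]))
print(resume_stamp([(0, 6), (0, 5), (0, 7)]))
```

With `max_batch_size=1` every batch holds one position, so the guard never fires and the stamp equals the legacy one.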

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
foreverlms and others added 6 commits June 4, 2026 22:22
VLM/videophy2 v2 mirrors were folded into the canonical experiments, so the
tutorial now points at pre_exp012_llava_ov and videophy2_sft_nano (plus
vision_sft_nano_v2 for the VFM new-loader variant) instead of the deleted
_v2 mirror names.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
… batcher

The fixed [dataloader_train] TOML fields now drive the new loader via PATH_REMAPS["vlm"]:
  max_samples_per_batch -> dataloader_train.batcher.max_batch_size
  max_sequence_length   -> dataloader_train.batcher.max_tokens
(previously these targeted the flat dataloader_train.{max_batch_size,max_tokens}
that existed on the legacy DataPackerDataLoader but not on CosmosDataLoader, whose
PoolPackingBatcher owns those knobs). Re-add the [dataloader_train] block to
llava_ov.toml + videophy2_sft_nano.toml (max_samples_per_batch=1,
max_sequence_length=16000) so the loader is TOML-tunable again; update the
DataloaderTrainConfig docstrings.

Verified: TOML max_sequence_length=12345 -> resolved batcher.max_tokens=12345;
dryrun resolves for all 5 live recipes (llava_ov, videophy2, vision_sft_nano,
vision_sft_nano_v2, vision_sft_super); 57 dataflow/experiment tests pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ov_mapresume) for resume validation

Introduces get_llava_ov_map (materialises 800 streamed items into a map-style
datasets.Dataset) and pre_exp012_llava_ov_mapresume backed by MapDistributor
(shuffle=True, seed=42, num_workers=0).  The dataloader_state callback is
overridden to distributor_type="data_packer" so DP_STATE_WORKER_* env vars
are set on resume.  Dryrun verified: CosmosDataLoader + MapDistributor +
distributor_type=data_packer + save_iter=100 all resolve correctly.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…smos_dataloader"

The distributor_type value "data_packer" + env prefix DP_STATE_ were named after
the removed DataPacker. Rename the value -> "cosmos_dataloader" and the env prefix
-> COSMOS_DL_STATE_ (callback + MapDistributor reader + resume/distributor tests +
the map-resume recipe). Keep the field name `distributor_type` (the broader legacy
shardlist-strategy selector) and add a docstring clarifying it is NOT the dataflow
DataDistributor role.

Verified: 18 resume/distributor tests pass (callback<->MapDistributor env prefixes
stay matched); map-resume dryrun shows dataloader_state.distributor_type=cosmos_dataloader.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…map Dataset

Switch get_llava_ov_map from "stream + islice into Dataset.from_list" to a
genuine random-access map dataset via load_dataset(..., streaming=False), then
filter and cap to n rows with .select(). This exercises MapDistributor against
a true map-style Dataset (its intended case). Bump n 800 -> 4000 so that, at
dp_shard=8 (500 samples/rank/epoch), the iter-100 checkpoint lands mid-epoch
(100 samples/rank) — a clean no-epoch-wrap resume test.
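
The arithmetic behind the bump can be checked with a few lines, assuming an even split across ranks and one sample per iteration per rank (the recipes use max_samples_per_batch=1). The helper name is hypothetical.

```python
from typing import Tuple


def checkpoint_position(
    n_samples: int, dp_ranks: int, save_iter: int, samples_per_iter: int = 1
) -> Tuple[int, int, int]:
    """Return (samples per rank per epoch, epoch at checkpoint, offset in epoch)."""
    per_rank = n_samples // dp_ranks
    consumed = save_iter * samples_per_iter
    epoch, offset = divmod(consumed, per_rank)
    return per_rank, epoch, offset


print(checkpoint_position(4000, 8, 100))  # (500, 0, 100): mid-epoch
print(checkpoint_position(800, 8, 100))   # (100, 1, 0): exactly on the epoch boundary
```

Under these assumptions, n=800 would place the iter-100 checkpoint exactly on an epoch boundary, while n=4000 places it 100 samples into the first 500-sample epoch.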

Dryrun verified: dataloader_train -> MapDistributor(get_llava_ov_map, n=4000,
shuffle=true), dataloader_state.distributor_type=cosmos_dataloader, save_iter=100.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…emove dataflow.md

The old custom_dataset.md documented the deleted DataPackerDataLoader/DataPacker
API. Replace it with the bring-your-own-dataset tutorial for the four-role
CosmosDataLoader (DataDistributor → RawItemProcessor → SampleBatcher →
BatchCollator), folding in the content from docs/dataflow.md (now deleted, single
source of truth).

Updates vs the dataflow.md draft:
- resume uses distributor_type="cosmos_dataloader" + COSMOS_DL_STATE_* (the rename)
- JointCosmosDataLoader shown with dataloaders=/JointDataLoaderStateCallback
- notes the TOML->batcher PATH_REMAPS["vlm"] routing
- real-world examples point at the current recipes (llava_ov, llava_ov_mapresume,
  videophy2_sft_nano, vision_sft_nano_v2) + a new-API checklist

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@foreverlms foreverlms marked this pull request as ready for review June 5, 2026 09:37
Comment thread cosmos_framework/configs/base/experiment/sft/vision_sft_nano_v2.py Outdated
Comment thread examples/toml/sft_config/llava_ov_mapstyle_dataloader.toml
Comment thread cosmos_framework/data/vfm/dataflow/batchers_test.py Outdated
foreverlms and others added 7 commits June 5, 2026 03:47
… addlicense)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…loader

Rename the map-style resumable VLM example (experiment module, TOML, the
pre_exp012_llava_ov_mapresume -> pre_exp012_llava_ov_mapstyle_dataloader
registration name, and doc references). Auto-discovered by filename via
pkgutil.iter_modules, so no import to update.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…epcopy

vision_sft_nano_v2 differed from vision_sft_nano only in job.name and the
training dataloader (legacy PackingDataLoader+RankPartitionedDataLoader vs the
four-role CosmosDataLoader stack). Derive v2 by deep-copying vision_sft_nano and
overriding just those two fields; delete the standalone vision_sft_nano_v2.py
module + its config.py import (v2 now registers from vision_sft_nano.py).
Experiment name "vision_sft_nano_v2" is unchanged.
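
The deep-copy derivation pattern looks roughly like this. Plain dicts stand in for the real config objects, and the helper name, the `optimizer.lr` field, and the `loader` values are placeholders invented for illustration.

```python
import copy
from typing import Any, Dict


def derive_experiment(base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Deep-copy base, then override only the dotted paths listed in overrides."""
    derived = copy.deepcopy(base)
    for dotted, value in overrides.items():
        keys = dotted.split(".")
        node = derived
        for key in keys[:-1]:
            node = node[key]
        node[keys[-1]] = value
    return derived


# Placeholder base config (field values are not from the PR).
vision_sft_nano = {
    "job": {"name": "vision_sft_nano"},
    "dataloader_train": {"loader": "legacy"},
    "optimizer": {"lr": 1e-4},
}

vision_sft_nano_v2 = derive_experiment(
    vision_sft_nano,
    {
        "job.name": "vision_sft_nano_v2",
        "dataloader_train": {"loader": "CosmosDataLoader"},
    },
)
print(vision_sft_nano_v2["job"]["name"], vision_sft_nano["job"]["name"])
```

Because the copy is deep, later edits to the derived experiment cannot leak back into the base one, and every field not listed in the overrides stays in sync by construction.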

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ment.py via deepcopy

pre_exp012_llava_ov_mapstyle_dataloader differed from pre_exp012_llava_ov only in
job.name, the dataloader_state callback (distributor_type="cosmos_dataloader"),
checkpoint.save_iter, and the dataloader (streaming IterableDistributor vs
map-style MapDistributor + num_workers=0). Derive it by deep-copying
pre_exp012_llava_ov and overriding just those fields; move get_llava_ov_map
alongside get_llava_ov_streaming. Delete the standalone module (VLM experiments
auto-register via import_all_modules_from_package). Experiment name and TOML
unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the per-module unit tests that sat next to the dataflow source
(base/batchers/collators/distributors/loader/processors/joint_loader tests, the
VLM/VideoPhy2 dataflow_roles tests, and the vision_sft_nano_v2 registration
test). Keep the two behavior guarantees: golden_vfm_test.py (byte-identical to
the legacy loader) and resume_test.py (checkpoint resume = no dup/skip).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Commit 7bd72ee added the CosmosDataLoader imports + removed the standalone
module/config.py import, but the vision_sft_nano_v2 deepcopy block and the
registration-loop update were dropped in a messy git-reset sequence — leaving
config.py expecting an experiment that vision_sft_nano.py no longer defined
("Could not find experiment/vision_sft_nano_v2"). Re-add the deepcopy derivation
and register both experiments.

Verified: all 6 example recipes dry-run clean (vision_sft_nano_v2 resolves to
CosmosDataLoader -> RankPartitionedDistributor -> SequentialPackingBatcher ->
VFMListCollator).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Comment thread examples/toml/sft_config/vision_sft_nano_mapstyle_dataloader.toml
Comment thread cosmos_framework/configs/base/vlm/experiment/llava_ov_vlm.py
yy-code-nv previously approved these changes Jun 8, 2026
@lfengad lfengad enabled auto-merge (squash) June 9, 2026 04:58
lfengad previously approved these changes Jun 9, 2026
foreverlms and others added 2 commits June 8, 2026 23:32
…dataloader_state.py

dataloader_state.py was modified in commit 4855c86 to add "cosmos_dataloader"
as a distributor_type and change the env prefix to COSMOS_DL_STATE_. This
entangled CosmosDataLoader-specific resume logic with the legacy
DataLoaderStateCallback that serves WebDataset "no_replace" / "data_packer"
shardlist strategies.

Revert dataloader_state.py fully to main (data_packer, DP_STATE_ prefix) and
introduce cosmos_dataloader_state.py with two dedicated classes:

- CosmosDataLoaderStateCallback: purpose-built for CosmosDataLoader(MapDistributor).
  No distributor_type discriminator — always active. Writes COSMOS_DL_STATE_*
  env vars on load_state_dict so MapDistributor.stream fast-forwards on resume.
  Accepts distributor_type kwarg (ignored) for Hydra struct-merge compat with
  the base DataLoaderStateCallback entry in the basic_log callbacks group.

- JointCosmosDataLoaderStateCallback: for JointCosmosDataLoader. Persists
  outer global_id + inner per-dataset per-worker (epoch, index) via one
  CosmosDataLoaderStateCallback per inner loader.
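
The env-var handoff between the callback and the distributor can be sketched as a write side and a read side that share one prefix. Only the `COSMOS_DL_STATE_` prefix comes from this PR; the `WORKER_<id>` suffix, the `"epoch,index"` value encoding, and both function names are assumptions.

```python
import os
from typing import Dict, MutableMapping, Tuple

ENV_PREFIX = "COSMOS_DL_STATE_"  # prefix from the PR; the key layout below is assumed


def publish_state(
    state: Dict[int, Tuple[int, int]],
    environ: MutableMapping[str, str] = os.environ,
) -> None:
    """Callback side: write one (epoch, index) entry per worker on load_state_dict."""
    for worker_id, (epoch, index) in state.items():
        environ[f"{ENV_PREFIX}WORKER_{worker_id}"] = f"{epoch},{index}"


def read_state(
    worker_id: int,
    environ: MutableMapping[str, str] = os.environ,
) -> Tuple[int, int]:
    """Distributor side: read this worker's resume point, defaulting to a fresh start."""
    raw = environ.get(f"{ENV_PREFIX}WORKER_{worker_id}")
    if raw is None:
        return 0, 0
    epoch, index = raw.split(",")
    return int(epoch), int(index)


env: Dict[str, str] = {}  # stand-in for os.environ so the sketch has no side effects
publish_state({0: (1, 42), 1: (1, 40)}, env)
print(read_state(0, env), read_state(1, env), read_state(7, env))
```

Keeping the prefix in one shared constant is one way to keep the writer and reader matched, which is the property the resume/distributor tests in this PR check.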

Update llava_ov_vlm.py and resume_test.py to use the new callback.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…llback

DataPackerDataLoader and JointDataPackerDataLoader were removed earlier in
this branch. Clean up the remnants in dataloader_state.py:

- Drop "data_packer" from _ACTIVE_DISTRIBUTOR_TYPES (only "no_replace" remains)
- Remove the DP_STATE_ env-var branch in load_state_dict
- Delete JointDataLoaderStateCallback (existed solely for the deleted
  JointDataPackerDataLoader; JointCosmosDataLoaderStateCallback covers the
  CosmosDataLoader joint-loader case)
- Fix stale JointDataLoaderStateCallback docstring reference in loader.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
lfengad previously approved these changes Jun 9, 2026
…cipe

Mirrors launch_sft_llava_ov.sh for the MapDistributor-backed VLM SFT recipe.
Supports optional RESUME_FROM_CKPT env var for checkpoint resume via
CosmosDataLoaderStateCallback.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
3 participants