From 44c616ecb96d5e5f67614a3cb60f977d962d598e Mon Sep 17 00:00:00 2001
From: Heeji Han
Date: Mon, 11 May 2026 04:53:24 +0900
Subject: [PATCH] [add] update the MAC measurement method for CTTC from
 ptflops to fvcore (m76707)

---
 compressai_vision/utils/measure_complexity.py | 589 +++++++++++++++---
 1 file changed, 517 insertions(+), 72 deletions(-)

diff --git a/compressai_vision/utils/measure_complexity.py b/compressai_vision/utils/measure_complexity.py
index b60f466..2af3ae0 100644
--- a/compressai_vision/utils/measure_complexity.py
+++ b/compressai_vision/utils/measure_complexity.py
@@ -5,7 +5,9 @@
 import torch
 import torch.nn as nn
 
-from ptflops import get_model_complexity_info
+from fvcore.nn import FlopCountAnalysis
+from fvcore.nn.jit_handles import conv_flop_jit
+from fvcore.nn.jit_handles import get_shape, prod
 
 
 def calc_complexity_nn_part1_dn53(vision_model, img):
@@ -15,12 +17,12 @@ def calc_complexity_nn_part1_dn53(vision_model, img):
     img = img[0]["image"].unsqueeze(0).to(device)
 
     # backbone
-    partial_model = vision_model.darknet
-    kmacs, _ = measure_mac(
-        partial_model=partial_model,
-        input_res=(img, vision_model.features_at_splits, True),
-        input_constructor=prepare_jde_darknet_input,
-    )
+    partial_model = DarknetBackboneOnlyFvcoreWrapper(
+        vision_model.darknet,
+        vision_model.features_at_splits,
+        is_nn_part1=True,
+    ).eval()
+    kmacs = measure_kmacs(partial_model, img)
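+
+    # Usage sketch (illustrative): callers typically reduce the returned pair
+    # to a per-pixel figure, e.g.
+    #     kmacs, pixels = calc_complexity_nn_part1_dn53(vision_model, img)
+    #     kmacs_per_pixel = kmacs / pixels
+    # where `pixels` is the element count of the (1, C, H, W) input tensor.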
 
     pixels = reduce(operator.mul, [p_size for p_size in img.shape])
 
@@ -29,25 +31,23 @@ def calc_complexity_nn_part2_dn53(vision_model, dec_features):
     assert "data" in dec_features
 
-    device = vision_model.device
-
     if isinstance(dec_features["data"][0], list):  # image task
         # x = {k: v[0] for k, v in x.items()}
-        pass
+        # The image-task mapping (commented out above) has never been ported;
+        # fail loudly instead of silently measuring nothing.
+        raise NotImplementedError("Image-task path is not implemented yet for DN53 complexity.")
    else:  # video task
         x = dec_features["data"]
-        x = {
-            k: v.unsqueeze(0).to(device=device)
-            for k, v in zip(vision_model.split_layer_list, x.values())
-        }
 
-    # nn part2 - part1 (darknet)
-    partial_model = vision_model.darknet
-    kmacs, _ = measure_mac(
-        partial_model=partial_model,
-        input_res=(None, x, False),
-        input_constructor=prepare_jde_darknet_input,
-    )
+    # NN-part2 (Darknet backbone only): the wrapper holds the split features
+    # internally, so fvcore only needs a tensor-shaped dummy input for tracing.
+    partial_model = DarknetNNPart2BackboneOnlyFvcoreWrapper(
+        vision_model.darknet, vision_model.features_at_splits
+    ).eval()
+
+    # fvcore input must be a Tensor; pick a deterministic dummy (the wrapper
+    # overwrites its internal `x` from the stored features before any compute).
+    x_dummy = next(iter(x.values()))
+    kmacs = measure_kmacs(partial_model, x_dummy)
 
     pixels = sum(
         [reduce(operator.mul, [p_size for p_size in d.shape]) for d in x.values()]
@@ -63,10 +63,7 @@ def calc_complexity_nn_part1_plyr(vision_model, img):
 
     # backbone
     partial_model = vision_model.backbone
-
-    kmacs, _ = measure_mac(
-        partial_model=partial_model, input_res=(C, H, W), input_constructor=None
-    )
+    kmacs = measure_kmacs(partial_model, imgs.tensor)
 
     pixels = reduce(operator.mul, [p_size for p_size in imgs.tensor.shape])
 
@@ -74,64 +71,512 @@ def calc_complexity_nn_part2_plyr(vision_model, dec_features, data):
-    if isinstance(data[0], list):  # image task
+    # Special handling for the image task (keep the existing per-frame logic)
+    if isinstance(next(iter(data.values())), list):
         data = {k: v[0] for k, v in data.items()}
 
     device = vision_model.device
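+
+    # Assumed layout (illustrative): `data` maps pyramid indices to per-level
+    # feature tensors of shape (C, H, W), e.g. {0: p2, 1: p3, 2: p4, 3: p5};
+    # the code below lifts it to a named, batched pyramid {"p2": ..., "p6": ...}.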
-    input_res_list, partial_model_lst, input_constructure_lst = [], [], []
-
-    # top block
-    C, H, W = data[len(data) - 1].shape  # p5 shape
-    input_res_list.append((C, H, W))
-    partial_model_lst.append(vision_model.top_block)
-    input_constructure_lst.append(None)
-
-    # for proposal generator
-    C, H, W = data[0].shape  # p2 shape
-    cdummy = dummy(dec_features["input_size"])
-    input_res_list.append((cdummy, (1, C, H, W), device))
-    partial_model_lst.append(vision_model.proposal_generator)
-    input_constructure_lst.append(prepare_proposal_input_fpn)
-
-    # for roi head
-    feature_pyramid = {f"p{k + 2}": v.to(device) for k, v in data.items()}
-    feature_pyramid.update({"p6": vision_model.top_block(feature_pyramid["p5"])[0]})
-    feature_pyramid = {k: v.unsqueeze(0) for k, v in feature_pyramid.items()}
-    proposals, _ = vision_model.proposal_generator(cdummy, feature_pyramid, None)
-    input_res_list.append((cdummy, (1, C, H, W), proposals, device))
-    partial_model_lst.append(vision_model.roi_heads)
-    input_constructure_lst.append(prepare_roi_head_input_fpn)
-
-    kmacs_sum = 0
-    for partial_model, input_res, input_constructure in zip(
-        partial_model_lst, input_res_list, input_constructure_lst
-    ):
-        kmacs, _ = measure_mac(
-            partial_model=partial_model,
-            input_res=input_res,
-            input_constructor=input_constructure,
-        )
-
-        kmacs_sum = kmacs_sum + kmacs
+    # 1) Build the feature pyramid from the actual decoded features,
+    #    converting each feature from (C, H, W) to (1, C, H, W)
+    feature_pyramid = {f"p{k + 2}": v.to(device).unsqueeze(0) for k, v in data.items()}  # p2–p5
+
+    # Generate p6 from p5 using top_block
+    top_block_out = vision_model.top_block(feature_pyramid["p5"])
+    p6 = top_block_out[0] if isinstance(top_block_out, (tuple, list)) else top_block_out
+    feature_pyramid["p6"] = p6  # (1, C, H, W)
+
+    # 2) Measure top_block FLOPs (p5 -> p6 only)
+    kmacs_sum = 0.0
+    kmacs_sum += measure_kmacs(vision_model.top_block, feature_pyramid["p5"])
+
+    # 3) Measure the RPN head only (proposal post-processing is excluded)
+    rpn_head_model = RPNHeadOnlyFvcoreWrapper(vision_model.proposal_generator).eval()
+    kmacs_sum += measure_kmacs(
+        rpn_head_model,
+        (
+            feature_pyramid["p2"],
+            feature_pyramid["p3"],
+            feature_pyramid["p4"],
+            feature_pyramid["p5"],
+            feature_pyramid["p6"],
+        ),
+    )
+
+    # 4) Measure sem_seg_head if available
+    #    Panoptic/semantic models use sem_seg_head(x, None)
+    is_semseg = hasattr(vision_model, "sem_seg_head") and vision_model.sem_seg_head is not None
+    if is_semseg:
+        semseg_model = SemSegHeadFvcoreWrapper(vision_model.sem_seg_head).eval()
+        # IMPORTANT: pass the dict as a single positional argument
+        kmacs_sum += measure_kmacs(semseg_model, (feature_pyramid,))
+
+    # 5) ROIHeads
+    # Run the proposal generator once to obtain actual proposals.
+    # Detectron2 only needs the image sizes here, so a minimal dummy object is used.
+    class _ImagesDummy:
+        def __init__(self, image_sizes):
+            self.image_sizes = image_sizes
+
+    # Assume dec_features["input_size"] follows the same structure as the original pipeline
+    images = _ImagesDummy(dec_features["input_size"])
+
+    with torch.no_grad():
+        proposals, _ = vision_model.proposal_generator(images, feature_pyramid, None)
+
+    # 5-1) Measure box_head + box_predictor
+    # The ROIAlign/pooler is excluded from the FLOP count due to ambiguity and
+    # potential CUDA/JIT issues; pooled features are obtained once and only the
+    # NN blocks are measured.
+    if hasattr(vision_model, "roi_heads") and vision_model.roi_heads is not None:
+        roi_heads = vision_model.roi_heads
+
+        # Follow roi_heads.in_features order if available
+        if hasattr(roi_heads, "in_features"):
+            in_feats = list(roi_heads.in_features)
+        else:
+            in_feats = ["p2", "p3", "p4", "p5"]
+
+        feat_list = [feature_pyramid[f] for f in in_feats if f in feature_pyramid]
+
+        # Convert proposals to the format required by the box_pooler;
+        # Detectron2 uses Instances.proposal_boxes by default
+        boxes = [p.proposal_boxes for p in proposals]
+
+        with torch.no_grad():
+            pooled = roi_heads.box_pooler(feat_list, boxes)  # (num_boxes, C, pool_h, pool_w)
+
+        box_head_model = BoxHeadPredictorFvcoreWrapper(roi_heads).eval()
+        kmacs_sum += measure_kmacs(box_head_model, pooled)
+
+        # 5-2) Measure the mask head if available
+        if (not is_semseg) and (
+            hasattr(roi_heads, "mask_head")
+            and roi_heads.mask_head is not None
+            and hasattr(roi_heads, "mask_pooler")
+        ):
+            # The mask branch can only be measured when the RPN produced proposals
+            if sum(len(p) for p in proposals) > 0:
+
+                # Run ROIHeads once to obtain pred_instances
+                with torch.no_grad():
+                    pred_instances, _ = roi_heads(images, feature_pyramid, proposals, None)
+
+                # Skip if no objects were detected
+                if sum(len(p) for p in pred_instances) > 0:
+
+                    # Mask pooling requires pred_boxes
+                    mask_boxes = [p.pred_boxes for p in pred_instances]
+
+                    with torch.no_grad():
+                        mask_pooled = roi_heads.mask_pooler(feat_list, mask_boxes)
+
+                    pred_classes = torch.cat([p.pred_classes for p in pred_instances])
+                    mask_head_model = MaskHeadFvcoreWrapper(roi_heads, pred_classes).eval()
+                    kmacs_sum += measure_kmacs(mask_head_model, mask_pooled)
 
     pixels = sum(
-        [reduce(operator.mul, [p_size for p_size in d.shape]) for d in data.values()]
+        [reduce(operator.mul, list(d.shape)) for d in data.values()]
     )
 
     return kmacs_sum, pixels
 
 
-def measure_mac(partial_model, input_res, input_constructor):
-    macs, params = get_model_complexity_info(
-        partial_model,
-        input_res=input_res,
-        input_constructor=input_constructor,
-        as_strings=False,
-        print_per_layer_stat=False,
-        verbose=False,
-    )
-    return macs / 1_000, params
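+
+
+# Sanity-check sketch (illustrative; not executed anywhere in this module):
+# with the conv handles registered in measure_kmacs() below, a 3x3 conv with
+# 16 -> 16 channels on a (1, 16, 32, 32) input performs
+# 16 * 16 * 3 * 3 * 32 * 32 = 2,359,296 MACs, so
+#
+#     conv = nn.Conv2d(16, 16, 3, padding=1, bias=False)
+#     measure_kmacs(conv, torch.randn(1, 16, 32, 32))  # ~= 2359.296 kMACs
+#
+# (fvcore's conv_flop_jit counts one FLOP per fused multiply-add.)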
+def _flops_to_kmacs(total_flops: float) -> float:
+    """
+    Convert the FLOP total reported by fvcore into KMACs.
+
+    Note:
+        - With the op handles registered in measure_kmacs(), fvcore counts one
+          FLOP per fused multiply-add (i.e. per MAC) for conv layers, so no
+          division by 2 is required.
+        - The total is therefore only scaled down to kilo-MACs (KMACs).
+    """
+    return float(total_flops) / 1e3
+
+
+def measure_kmacs(module: nn.Module, inputs, tag: str = None) -> float:
+    """
+    Measure KMACs for a given module using fvcore.
+
+    This function:
+        - Ensures the module is in evaluation mode.
+        - Recursively casts inputs to the module's device and dtype.
+        - Supports nested iterable inputs (tuple, list, dict).
+        - Handles modules without trainable parameters.
+
+    Args:
+        module: Target neural network module.
+        inputs: Input tensor or nested structure of tensors.
+        tag: Optional name for logging.
+
+    Returns:
+        KMACs value as a float.
+    """
+    module = module.eval()
+
+    try:
+        p = next(module.parameters())
+    except StopIteration:
+        # A parameter-less module gives no device/dtype to cast to; its
+        # contribution is treated as negligible and reported as 0.
+        name = tag or module.__class__.__name__
+        print(f"[INFO] No parameters found in {name}, MACs set to 0.")
+        return 0.0
+
+    # Safe casting (recursive)
+    def _cast(x):
+        if torch.is_tensor(x):
+            return x.to(device=p.device, dtype=p.dtype)
+        elif isinstance(x, (list, tuple)):
+            return type(x)(_cast(v) for v in x)
+        elif isinstance(x, dict):
+            return {k: _cast(v) for k, v in x.items()}
+        else:
+            return x
+
+    inputs = _cast(inputs)
+
+    if torch.is_tensor(inputs):
+        inputs = (inputs,)  # single input -> tuple
+    elif not isinstance(inputs, tuple):
+        inputs = (inputs,)  # safe fallback
+
+    with torch.no_grad():
+        flops = FlopCountAnalysis(module, inputs)
+        flops.set_op_handle(**{
+            # conv ops, counted by conv_flop_jit
+            "aten::conv2d": conv_flop_jit,
+            "aten::_convolution": conv_flop_jit,
+            "aten::cudnn_convolution": conv_flop_jit,
+            # element-wise ops (both in-place and out-of-place variants)
+            "aten::add": elemwise_flop_jit,
+            "aten::add_": elemwise_flop_jit,
+            "aten::mul": elemwise_flop_jit,
+            "aten::mul_": elemwise_flop_jit,
+            "aten::exp": elemwise_flop_jit,
+            "aten::clamp_min": elemwise_flop_jit,
+            "aten::div": elemwise_flop_jit,
+            "aten::abs": elemwise_flop_jit,
+            "aten::reciprocal": elemwise_flop_jit,
+            "aten::round": elemwise_flop_jit,
+            "aten::leaky_relu": elemwise_flop_jit,
+            # pooling
+            "aten::max_pool2d": max_pool2d_flop_jit,
+        })
+        total_flops = flops.total()
+
+    del flops
+
+    kmacs = _flops_to_kmacs(total_flops)
+    name = tag or module.__class__.__name__
+    # print(f"[INFO] {name}: KMACs = {kmacs}")
+    return kmacs
+
+
+class SemSegHeadFvcoreWrapper(nn.Module):
+    def __init__(self, sem_seg_head: nn.Module):
+        super().__init__()
+        self.sem_seg_head = sem_seg_head
+
+    def forward(self, x):
+        # Detectron2 style: returns (sem_seg_results, losses) or similar
+        out = self.sem_seg_head(x, None)
+        return out[0] if isinstance(out, (tuple, list)) else out
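+
+
+# Usage note (illustrative): the wrapper takes the whole feature dict as one
+# positional argument, so the call site wraps it in a 1-tuple before tracing:
+#
+#     wrapper = SemSegHeadFvcoreWrapper(model.sem_seg_head).eval()
+#     kmacs = measure_kmacs(wrapper, ({"p2": p2, "p3": p3, "p4": p4, "p5": p5},))
+#
+# measure_kmacs() would also wrap a bare dict by itself; the explicit tuple
+# just makes the single-argument intent obvious.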
+ """ + def __init__(self, proposal_generator): + super().__init__() + self.pg = proposal_generator # Detectron2 RPN module + + def forward(self, p2, p3, p4, p5, p6): + feats = [p2, p3, p4, p5, p6] + return self.pg.rpn_head(feats) # Returns objectness logits and box deltas + + +class BoxHeadPredictorFvcoreWrapper(nn.Module): + """ + Wrapper to measure FLOPs for ROI box head and predictor only. + + The full ROIHeads module is not executed to avoid non-NN components. + The input is expected to be pooled box features. + + Input shape: + (num_boxes, C, pool_h, pool_w) + """ + def __init__(self, roi_heads): + super().__init__() + self.box_head = roi_heads.box_head + self.box_predictor = roi_heads.box_predictor + + def forward(self, box_features): + x = self.box_head(box_features) + scores, deltas = self.box_predictor(x) + return scores, deltas + + +class MaskHeadFvcoreWrapper(nn.Module): + def __init__(self, roi_heads, pred_classes): + super().__init__() + self.mask_head = roi_heads.mask_head + self.pred_classes = pred_classes + + def forward(self, mask_features): + # simulate detectron2 mask inference + return self.mask_head.layers(mask_features) + +class DarknetBackboneOnlyFvcoreWrapper(nn.Module): + """ + fvcore-friendly wrapper for Darknet that always returns a Tensor. + + - Runs the same module_list loop as Darknet.forward() + - Skips 'yolo' heads (so you can measure backbone-only FLOPs) + - Returns the last feature tensor `x` instead of detection output + (prevents returning None when no yolo layers are executed) + """ + def __init__(self, darknet: nn.Module, splits: dict, is_nn_part1: bool): + super().__init__() + self.darknet = darknet + self.splits = splits + self.is_nn_part1 = is_nn_part1 + + def forward(self, x): + # local aliases + module_defs = self.darknet.module_defs + module_list = self.darknet.module_list + + layer_outputs = [] + had_yolo = False + + if self.is_nn_part1: + sidx = 0 + eidx = max(self.splits.keys()) + 1 + splits = self.splits + else: + features = self.splits.copy() + max_id = max(features.keys()) + + if max_id <= 74: + sidx = max_id + 1 + for idx in range(0, sidx): + if idx not in features: + layer_outputs.append(None) + else: + x = features[idx] + layer_outputs.append(x) + else: + sidx = min(features.keys()) + + eidx = len(module_list) + splits = features # reuse name for convenience + + for i, (module_def, module) in enumerate(zip(module_defs[sidx:eidx], module_list[sidx:eidx])): + nn_idx = i + sidx + + if not self.is_nn_part1: + if nn_idx in splits: + x = splits[nn_idx] + layer_outputs.append(x) + splits.pop(nn_idx) + had_yolo = False + continue + elif had_yolo is True and nn_idx < min(splits.keys()): + continue + + mtype = module_def["type"] + + if mtype in ["convolutional", "upsample", "maxpool"]: + x = module(x) + + elif mtype == "route": + layer_i = [int(v) for v in module_def["layers"].split(",")] + if len(layer_i) == 1: + x = layer_outputs[layer_i[0]] + else: + x = torch.cat([layer_outputs[j] for j in layer_i], 1) + + elif mtype == "shortcut": + layer_i = int(module_def["from"]) + x = layer_outputs[-1] + layer_outputs[layer_i] + + elif mtype == "yolo": + # IMPORTANT: skip yolo head so we only count backbone FLOPs + had_yolo = True + # keep x unchanged, just store it + + layer_outputs.append(x) + + if self.is_nn_part1: + if nn_idx in self.splits: + self.splits[nn_idx] = x + + # Always return a tensor to make tracing stable + return x + +class DarknetNNPart2BackboneOnlyFvcoreWrapper(nn.Module): + """ + fvcore-friendly wrapper for Darknet assuming 
+
+
+class DarknetNNPart2BackboneOnlyFvcoreWrapper(nn.Module):
+    """
+    fvcore-friendly wrapper for Darknet assuming is_nn_part1 == False only.
+
+    - Measures FLOPs for the backbone path; YOLO layers are executed so that
+      tracing and relative layer indexing stay intact, but their detection
+      outputs are never aggregated or post-processed.
+    - Preserves Darknet's original nn-part2 feature injection logic.
+    - Always returns a Tensor, so fvcore never reports 0 FLOPs due to a None
+      output.
+
+    Inputs:
+        forward(x_dummy): a placeholder Tensor for fvcore tracing.
+        NOTE: this tensor is NOT used for computation; `x` is initialized from
+        the injected features.
+    """
+
+    def __init__(self, darknet: nn.Module, features: dict):
+        super().__init__()
+        self.darknet = darknet
+        self.features = features  # dict[int, Tensor]
+
+        self.is_nn_part1 = False  # fixed for this wrapper
+
+    def forward(self, x_dummy: torch.Tensor) -> torch.Tensor:
+        # ---- Fixed assumption: is_nn_part1 is always False (nn-part2) ----
+        module_defs = self.darknet.module_defs
+        module_list = self.darknet.module_list
+
+        # Working copy (same as the original)
+        features = self.features.copy()
+
+        layer_outputs = []
+        output = []  # never filled (detections are dropped); kept for structural similarity
+        had_yolo = False
+
+        max_id = max(features.keys())
+
+        # Match the original nn-part2 logic for sidx/eidx and pre-fill layer_outputs
+        if max_id <= 74:
+            sidx = max_id + 1
+
+            # Pre-fill layer_outputs[0:sidx] with injected features or None.
+            # This also leaves `x` initialized from the last injected feature.
+            for idx in range(0, sidx):
+                if idx not in features:
+                    layer_outputs.append(None)
+                else:
+                    x = features[idx]
+                    layer_outputs.append(x)
+        else:
+            sidx = min(features.keys())
+
+        eidx = len(module_list)
+        # IMPORTANT: do NOT start from x_dummy (it can cause a channel mismatch
+        # before the first feature injection)
+
+        # Main loop (same structure as the original, but nn-part2 only)
+        for i, (module_def, module) in enumerate(
+            zip(module_defs[sidx:eidx], module_list[sidx:eidx])
+        ):
+            nn_idx = i + sidx
+
+            # --- Feature injection (same as the original) ---
+            if nn_idx in features.keys():
+                x = features[nn_idx]
+                layer_outputs.append(x)
+                features.pop(nn_idx)
+                had_yolo = False
+                continue
+            elif had_yolo is True and len(features) > 0 and nn_idx < min(features.keys()):
+                continue
+
+            mtype = module_def["type"]
+            if mtype in ["convolutional", "upsample", "maxpool"]:
+                x = module(x)
+
+            elif mtype == "route":
+                layer_i = [int(v) for v in module_def["layers"].split(",")]
+                if len(layer_i) == 1:
+                    x = layer_outputs[layer_i[0]]
+                else:
+                    x = torch.cat([layer_outputs[j] for j in layer_i], 1)
+
+            elif mtype == "shortcut":
+                layer_i = int(module_def["from"])
+                x = layer_outputs[-1] + layer_outputs[layer_i]
+
+            elif mtype == "yolo":
+                # Execute the YOLO layer so tracing proceeds; its output is
+                # stored in layer_outputs (keeping relative indexing correct)
+                # but never collected into `output`.
+                x = module[0](x, self.darknet.img_size)
+                had_yolo = True
+
+            layer_outputs.append(x)
+
+        # Always return a Tensor so fvcore can produce FLOP stats
+        return x
+
+
+def elemwise_flop_jit(inputs, outputs):
+    # outputs can be a Tensor or a tuple/list of Tensors
+    out = outputs[0] if isinstance(outputs, (tuple, list)) else outputs
+    return prod(get_shape(out))  # 1 FLOP per output element (approx.)
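+
+
+# Example (illustrative): for an aten::add producing a (1, 64, 80, 80) output,
+# elemwise_flop_jit reports 1 * 64 * 80 * 80 = 409,600 FLOPs, i.e. one FLOP
+# per output element, regardless of broadcasting on the inputs.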
+ """ + # aten::max_pool2d signature (typical): + # inputs = [x, kernel_size, stride, padding, dilation, ceil_mode] + x = inputs[0] + y = outputs[0] + + out_numel = _value_numel(y) + if out_numel == 0: + return 0 + + k = _to_ivalue(inputs[1], default=None) # could be int or (kH,kW) or list + if isinstance(k, int): + kH, kW = k, k + elif isinstance(k, (list, tuple)) and len(k) == 2: + kH, kW = int(k[0]), int(k[1]) + else: + # Fallback: if kernel size is not statically available, assume 1x1 + kH, kW = 1, 1 + + # comparisons per output = kH*kW - 1 + return int(out_numel) * max(int(kH) * int(kW) - 1, 0) + +def _value_sizes(v): + """ + Get static tensor sizes from torch._C.Value (JIT IR value). + Returns a list like [N, C, H, W] or None if unknown. + """ + try: + t = v.type() + if hasattr(t, "sizes") and t.sizes() is not None: + return list(t.sizes()) + except Exception: + pass + return None + +def _value_numel(v): + sizes = _value_sizes(v) + if not sizes or any(s is None for s in sizes): + return 0 + n = 1 + for s in sizes: + n *= int(s) + return n + +def _to_ivalue(v, default=None): + """ + Try to materialize constant from torch._C.Value if it is a constant. + Works for many prim::Constant-derived Values. + """ + try: + return v.toIValue() + except Exception: + return default class dummy: