From ba427c2dda3fff526e2fb19b4c811b93e880e419 Mon Sep 17 00:00:00 2001 From: Pranav Rai Date: Tue, 21 Apr 2026 23:12:20 +0100 Subject: [PATCH] Fix the IBL Cbin extractor so that it works with 4 split shanks as well I am taking the latest way of getting the sample shifts during the merge conflit. --- src/spikeinterface/extractors/cbin_ibl.py | 79 ++++++++++++++++++++++- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/src/spikeinterface/extractors/cbin_ibl.py b/src/spikeinterface/extractors/cbin_ibl.py index 2a53b999e3..4603095e29 100644 --- a/src/spikeinterface/extractors/cbin_ibl.py +++ b/src/spikeinterface/extractors/cbin_ibl.py @@ -1,4 +1,5 @@ from pathlib import Path +import re import warnings import numpy as np @@ -9,6 +10,56 @@ from spikeinterface.core.core_tools import define_function_from_class +def _parse_spikeglx_meta_table(meta_value): + """Return parenthesized entries from a SpikeGLX meta value. + + Accepts either the raw string form ``"(a)(b)(c)"`` or an already-parsed list. + """ + if isinstance(meta_value, str): + return re.findall(r"\(([^()]*)\)", meta_value) + return list(meta_value) + + +def _parse_saved_channel_subset(subset_text): + if subset_text is None or subset_text == "all": + return None + channels = [] + for entry in subset_text.split(","): + if ":" in entry: + start, stop = entry.split(":") + channels.extend(np.arange(int(start), int(stop) + 1)) + else: + channels.append(int(entry)) + return np.asarray(channels, dtype="int64") + + +def _get_saved_channel_indices(meta): + subset_text = meta.get("snsSaveChanSubset_orig") + if subset_text is None: + subset_text = meta.get("snsSaveChanSubset") + return _parse_saved_channel_subset(subset_text) + + +def _read_spikeglx_probe(meta_file, meta): + """Build a Probe for a SpikeGLX meta, honoring IBL shank-split subsets.""" + if "snsSaveChanSubset_orig" not in meta: + return probeinterface.read_spikeglx(meta_file) + from probeinterface.neuropixels_tools import _read_imro_string + + probe = _read_imro_string(imro_str=meta["imroTbl"], imDatPrb_pn=meta.get("imDatPrb_pn")) + saved = _get_saved_channel_indices(meta) + if saved is not None: + saved = saved[saved < probe.get_contact_count()] + if saved.size != probe.get_contact_count(): + probe = probe.get_slice(saved) + probe.annotate(serial_number=meta.get("imDatPrb_sn", meta.get("imProbeSN", None))) + probe.annotate(part_number=meta.get("imDatPrb_pn", None)) + probe.annotate(port=meta.get("imDatPrb_port", None)) + probe.annotate(slot=meta.get("imDatPrb_slot", None)) + probe.set_device_channel_indices(np.arange(probe.get_contact_count())) + return probe + + class CompressedBinaryIblExtractor(BaseRecording): """Load IBL data as an extractor object. @@ -99,7 +150,7 @@ def __init__(self, folder_path=None, load_sync_channel=False, stream_name="ap", self.set_channel_offsets(offsets) if not load_sync_channel: - probe = probeinterface.read_spikeglx(meta_file) + probe = _read_spikeglx_probe(meta_file, meta) if probe.shank_ids is not None: self.set_probe(probe, in_place=True, group_mode="by_shank") @@ -204,9 +255,31 @@ def extract_stream_info(meta_file, meta): info["stream_kind"] = stream_kind info["stream_name"] = stream_name info["units"] = units - info["channel_names"] = [txt.split(";")[0] for txt in meta["snsChanMap"]] + + chan_map_entries = _parse_spikeglx_meta_table(meta["snsChanMap"]) + # First entry is the header tuple like "(384,0,1)"; drop it when present. + if chan_map_entries and "," in chan_map_entries[0] and ";" not in chan_map_entries[0]: + chan_map_entries = chan_map_entries[1:] + full_channel_names = [entry.split(";")[0] for entry in chan_map_entries] + + saved_indices = _get_saved_channel_indices(meta) + if saved_indices is not None and len(full_channel_names) >= saved_indices.max() + 1: + channel_names = [full_channel_names[i] for i in saved_indices] + else: + channel_names = full_channel_names + + channel_offsets = np.zeros(num_chan) + + if len(channel_names) != num_chan or channel_gains.shape[0] != num_chan: + raise ValueError( + "Parsed channel metadata does not match nSavedChans: " + f"channel_names={len(channel_names)}, channel_gains={channel_gains.shape[0]}, " + f"num_chan={num_chan}" + ) + + info["channel_names"] = channel_names info["channel_gains"] = channel_gains - info["channel_offsets"] = np.zeros(info["num_chan"]) + info["channel_offsets"] = channel_offsets info["has_sync_trace"] = has_sync_trace return info