Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 76 additions & 3 deletions src/spikeinterface/extractors/cbin_ibl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
import re
import warnings
import numpy as np

Expand All @@ -9,6 +10,56 @@
from spikeinterface.core.core_tools import define_function_from_class


def _parse_spikeglx_meta_table(meta_value):
"""Return parenthesized entries from a SpikeGLX meta value.

Accepts either the raw string form ``"(a)(b)(c)"`` or an already-parsed list.
"""
if isinstance(meta_value, str):
return re.findall(r"\(([^()]*)\)", meta_value)
return list(meta_value)


def _parse_saved_channel_subset(subset_text):
if subset_text is None or subset_text == "all":
return None
channels = []
for entry in subset_text.split(","):
if ":" in entry:
start, stop = entry.split(":")
channels.extend(np.arange(int(start), int(stop) + 1))
else:
channels.append(int(entry))
return np.asarray(channels, dtype="int64")


def _get_saved_channel_indices(meta):
    """Return the saved-channel indices recorded in *meta*, or ``None``.

    IBL shank-split files keep the original (pre-split) subset under
    ``snsSaveChanSubset_orig``; when that key is absent or ``None`` we fall
    back to the standard ``snsSaveChanSubset`` field.
    """
    subset_text = next(
        (meta[key] for key in ("snsSaveChanSubset_orig", "snsSaveChanSubset") if meta.get(key) is not None),
        None,
    )
    return _parse_saved_channel_subset(subset_text)


def _read_spikeglx_probe(meta_file, meta):
    """Build a Probe for a SpikeGLX meta, honoring IBL shank-split subsets.

    Parameters
    ----------
    meta_file : path-like
        Path to the SpikeGLX ``.meta`` file, passed through to
        ``probeinterface.read_spikeglx`` for the standard (non-split) case.
    meta : dict
        Parsed meta key/value pairs; keys read here include ``imroTbl``,
        ``imDatPrb_pn`` and the ``snsSaveChanSubset*`` / probe-identity fields.

    Returns
    -------
    probeinterface.Probe
        Probe restricted to the saved channels, annotated with serial/part
        number, port and slot, with device channel indices set 0..n-1.
    """
    # Standard SpikeGLX file (no IBL shank-split marker): let probeinterface
    # handle everything from the meta file directly.
    if "snsSaveChanSubset_orig" not in meta:
        return probeinterface.read_spikeglx(meta_file)
    # NOTE(review): private probeinterface API — keep an eye on upstream changes.
    from probeinterface.neuropixels_tools import _read_imro_string

    # Reconstruct the full probe geometry from the imro table string.
    probe = _read_imro_string(imro_str=meta["imroTbl"], imDatPrb_pn=meta.get("imDatPrb_pn"))
    saved = _get_saved_channel_indices(meta)
    if saved is not None:
        # Drop out-of-range indices (e.g. the sync channel) before comparing
        # against the probe's contact count.
        saved = saved[saved < probe.get_contact_count()]
        # Only slice when the subset is a strict restriction of the contacts.
        if saved.size != probe.get_contact_count():
            probe = probe.get_slice(saved)
    probe.annotate(serial_number=meta.get("imDatPrb_sn", meta.get("imProbeSN", None)))
    probe.annotate(part_number=meta.get("imDatPrb_pn", None))
    probe.annotate(port=meta.get("imDatPrb_port", None))
    probe.annotate(slot=meta.get("imDatPrb_slot", None))
    # Must run after any slicing so the indices match the remaining contacts.
    probe.set_device_channel_indices(np.arange(probe.get_contact_count()))
    return probe


class CompressedBinaryIblExtractor(BaseRecording):
"""Load IBL data as an extractor object.

Expand Down Expand Up @@ -99,7 +150,7 @@ def __init__(self, folder_path=None, load_sync_channel=False, stream_name="ap",
self.set_channel_offsets(offsets)

if not load_sync_channel:
probe = probeinterface.read_spikeglx(meta_file)
probe = _read_spikeglx_probe(meta_file, meta)

if probe.shank_ids is not None:
self.set_probe(probe, in_place=True, group_mode="by_shank")
Expand Down Expand Up @@ -204,9 +255,31 @@ def extract_stream_info(meta_file, meta):
info["stream_kind"] = stream_kind
info["stream_name"] = stream_name
info["units"] = units
info["channel_names"] = [txt.split(";")[0] for txt in meta["snsChanMap"]]

chan_map_entries = _parse_spikeglx_meta_table(meta["snsChanMap"])
# First entry is the header tuple like "(384,0,1)"; drop it when present.
if chan_map_entries and "," in chan_map_entries[0] and ";" not in chan_map_entries[0]:
chan_map_entries = chan_map_entries[1:]
full_channel_names = [entry.split(";")[0] for entry in chan_map_entries]

saved_indices = _get_saved_channel_indices(meta)
if saved_indices is not None and len(full_channel_names) >= saved_indices.max() + 1:
channel_names = [full_channel_names[i] for i in saved_indices]
else:
channel_names = full_channel_names

channel_offsets = np.zeros(num_chan)

if len(channel_names) != num_chan or channel_gains.shape[0] != num_chan:
raise ValueError(
"Parsed channel metadata does not match nSavedChans: "
f"channel_names={len(channel_names)}, channel_gains={channel_gains.shape[0]}, "
f"num_chan={num_chan}"
)

info["channel_names"] = channel_names
info["channel_gains"] = channel_gains
info["channel_offsets"] = np.zeros(info["num_chan"])
info["channel_offsets"] = channel_offsets
info["has_sync_trace"] = has_sync_trace

return info
Loading