Skip to content

utils.py

generate_nwb_uuid(nwb_file_name, initial, len_uuid=6)

Generates a unique identifier related to an NWB file.

Parameters:

Name Type Description Default
nwb_file_name str

description

required
initial str

R if recording; A if artifact; S if sorting etc

required
len_uuid int

how many digits of uuid4 to keep

6
Source code in src/spyglass/spikesorting/v1/utils.py
def generate_nwb_uuid(nwb_file_name: str, initial: str, len_uuid: int = 6):
    """Generates a unique identifier related to an NWB file.

    Parameters
    ----------
    nwb_file_name : str
        _description_
    initial : str
        R if recording; A if artifact; S if sorting etc
    len_uuid : int
        how many digits of uuid4 to keep
    """
    uuid4 = str(uuid.uuid4())
    nwb_uuid = nwb_file_name + "_" + initial + "_" + uuid4[:len_uuid]
    return nwb_uuid

get_spiking_sorting_v1_merge_ids(restriction)

Parses the SpikingSorting V1 pipeline to get a list of merge ids for a given restriction.

Parameters:

Name Type Description Default
restriction dict

A dictionary containing some or all of the following key-value pairs: nwb_file_name : str name of the nwb file interval_list_name : str name of the interval list sort_group_name : str name of the sort group artifact_param_name : str name of the artifact parameter curation_id : int, optional id of the curation (if not specified, uses the latest curation)

required

Returns:

Name Type Description
merge_id_list list

list of merge ids for the given restriction

Source code in src/spyglass/spikesorting/v1/utils.py
def get_spiking_sorting_v1_merge_ids(restriction: dict):
    """
    Parses the SpikingSorting V1 pipeline to get a list of merge ids for a given restriction.

    Parameters
    ----------
    restriction : dict
        A dictionary containing some or all of the following key-value pairs:
        nwb_file_name : str
            name of the nwb file
        interval_list_name : str
            name of the interval list
        sort_group_name : str
            name of the sort group
        artifact_param_name : str
            name of the artifact parameter
        curation_id : int, optional
            id of the curation (if not specified, uses the latest curation)
    Returns
    -------
    merge_id_list : list
        list of merge ids for the given restriction
    """
    # list of recording ids
    recording_id_list = (SpikeSortingRecordingSelection() & restriction).fetch(
        "recording_id"
    )
    # list of artifact ids for each recording
    artifact_id_list = [
        (
            ArtifactDetectionSelection() & restriction & {"recording_id": id}
        ).fetch1("artifact_id")
        for id in recording_id_list
    ]
    # list of sorting ids for each recording
    sorting_restriction = restriction.copy()
    del sorting_restriction["interval_list_name"]
    sorting_id_list = []
    for r_id, a_id in zip(recording_id_list, artifact_id_list):
        # if sorted with artifact detection
        if (
            SpikeSortingSelection()
            & sorting_restriction
            & {"recording_id": r_id, "interval_list_name": a_id}
        ):
            sorting_id_list.append(
                (
                    SpikeSortingSelection()
                    & sorting_restriction
                    & {"recording_id": r_id, "interval_list_name": a_id}
                ).fetch1("sorting_id")
            )
        # if sorted without artifact detection
        else:
            sorting_id_list.append(
                (
                    SpikeSortingSelection()
                    & sorting_restriction
                    & {"recording_id": r_id, "interval_list_name": r_id}
                ).fetch1("sorting_id")
            )
    # if curation_id is specified, use that id for each sorting_id
    if "curation_id" in restriction:
        curation_id = [restriction["curation_id"] for _ in sorting_id_list]
    # if curation_id is not specified, use the latest curation_id for each sorting_id
    else:
        curation_id = [
            np.max((CurationV1 & {"sorting_id": id}).fetch("curation_id"))
            for id in sorting_id_list
        ]
    # list of merge ids for the desired curation(s)
    merge_id_list = [
        (
            SpikeSortingOutput.CurationV1()
            & {"sorting_id": id, "curation_id": c_id}
        ).fetch1("merge_id")
        for id, c_id in zip(sorting_id_list, curation_id)
    ]
    return merge_id_list