@schema
class DLCCentroid(SpyglassMixin, dj.Computed):
    """
    Table to calculate the centroid of a group of bodyparts

    For each upstream ``DLCCentroidSelection`` entry, combines the smoothed
    per-bodypart position series into a single centroid trajectory, optionally
    interpolates over NaN spans and smooths the result, derives velocity and
    speed, and stores both as NWB objects in an analysis file.
    """

    definition = """
    -> DLCCentroidSelection
    ---
    -> AnalysisNwbfile
    dlc_position_object_id : varchar(80)
    dlc_velocity_object_id : varchar(80)
    """

    # Set per-key in make(); consumed by the @file_log decorator on
    # _logged_make to redirect log output next to the DLC results.
    log_path = None

    def make(self, key):
        """Populate one key: set up per-key logging, then run the real work."""
        output_dir = infer_output_dir(key=key, makedir=False)
        self.log_path = Path(output_dir, "log.log")
        self._logged_make(key)
        logger.info("inserted entry into DLCCentroid")

    def _fetch_pos_df(self, key, bodyparts_to_use):
        """Fetch per-bodypart position DataFrames and concat column-wise.

        Returns a DataFrame with a MultiIndex on columns whose first level is
        the bodypart name.
        """
        return pd.concat(
            {
                bodypart: (
                    DLCSmoothInterpCohort.BodyPart
                    & {**key, **{"bodypart": bodypart}}
                ).fetch1_dataframe()
                for bodypart in bodyparts_to_use
            },
            axis=1,
        )

    def _available_bodyparts(self, key):
        """Return the bodyparts present in the upstream cohort for this key."""
        return (DLCSmoothInterpCohort.BodyPart & key).fetch("bodypart")

    @file_log(logger)
    def _logged_make(self, key):
        METERS_PER_CM = 0.01
        idx = pd.IndexSlice

        logger.info("Centroid Calculation")

        # Get labels to smooth from Parameters table
        params = (DLCCentroidParams() & key).fetch1("params")
        points = params.get("points")
        centroid_method = params.get("centroid_method")

        required_points = _key_to_points.get(centroid_method)
        if required_points is None:
            # FIX: previously an unknown method fell through to an opaque
            # "'NoneType' object is not iterable" TypeError below.
            raise ValueError(
                f"Unknown centroid_method: {centroid_method}. "
                f"Available methods: {list(_key_to_points)}"
            )

        # Hoisted: one fetch instead of one (or two) per required point.
        available_bodyparts = self._available_bodyparts(key)
        for point in required_points:
            if points[point] not in available_bodyparts:
                raise ValueError(
                    "Bodypart in points not in model."
                    f"\tBodypart {points[point]}"
                    f"\tIn Model {available_bodyparts}"
                )
        bodyparts_to_use = [points[point] for point in required_points]

        pos_df = self._fetch_pos_df(key=key, bodyparts_to_use=bodyparts_to_use)

        logger.info("Calculating centroid")  # now done using number of points
        centroid = Centroid(
            pos_df=pos_df,
            points=params.get("points"),
            max_LED_separation=params.get("max_LED_separation"),
        ).centroid
        centroid_df = pd.DataFrame(
            centroid,
            columns=["x", "y"],
            index=pos_df.index.to_numpy(),
        )

        # Optionally fill NaN gaps; otherwise keep the raw centroid. The two
        # fall-through branches of the original collapsed to one default.
        interp_df = centroid_df.copy()
        if params.get("interpolate") and np.any(np.isnan(centroid)):
            logger.info("interpolating over NaNs")
            nan_inds = (
                pd.isnull(centroid_df.loc[:, idx[("x", "y")]])
                .any(axis=1)
                .to_numpy()
                .nonzero()[0]
            )
            nan_spans = get_span_start_stop(nan_inds)
            interp_df = interp_pos(
                centroid_df.copy(), nan_spans, **params["interp_params"]
            )

        # Median inter-sample interval defines the sampling rate; computed
        # once here (the original recomputed it inside the smooth branch).
        sampling_rate = 1 / np.median(np.diff(pos_df.index.to_numpy()))

        if params.get("smooth"):
            smooth_params = params["smoothing_params"]
            smooth_func = _key_to_smooth_func_dict[
                smooth_params["smooth_method"]
            ]
            logger.info(
                f"Smoothing using method: {smooth_func.__name__}",
            )
            final_df = smooth_func(
                interp_df, sampling_rate=sampling_rate, **smooth_params
            )
        else:
            final_df = interp_df.copy()

        logger.info("getting velocity")
        velocity = get_velocity(
            final_df.loc[:, idx[("x", "y")]].to_numpy(),
            time=pos_df.index.to_numpy(),
            # .get instead of .pop: params is not read again, no need to mutate
            sigma=params.get("speed_smoothing_std_dev"),
            sampling_frequency=sampling_rate,
        )
        speed = np.sqrt(np.sum(velocity**2, axis=1))  # cm/s
        velocity_df = pd.DataFrame(
            np.concatenate((velocity, speed[:, np.newaxis]), axis=1),
            columns=["velocity_x", "velocity_y", "speed"],
            index=pos_df.index.to_numpy(),
        )
        total_nan = np.sum(final_df.loc[:, idx[("x", "y")]].isna().any(axis=1))
        logger.info(f"total NaNs in centroid dataset: {total_nan}")

        # Package position and velocity as NWB objects, reusing the raw
        # position's reference frame and comments.
        spatial_series = (RawPosition() & key).fetch_nwb()[0]["raw_position"]
        position = pynwb.behavior.Position()
        # Renamed from `velocity` in the original to avoid shadowing the
        # velocity array computed above.
        velocity_ts = pynwb.behavior.BehavioralTimeSeries()
        common_attrs = {
            "conversion": METERS_PER_CM,
            "comments": spatial_series.comments,
        }
        position.create_spatial_series(
            name="position",
            timestamps=final_df.index.to_numpy(),
            data=final_df.loc[:, idx[("x", "y")]].to_numpy(),
            reference_frame=spatial_series.reference_frame,
            description="x_position, y_position",
            **common_attrs,
        )
        velocity_ts.create_timeseries(
            name="velocity",
            timestamps=velocity_df.index.to_numpy(),
            unit="m/s",
            data=velocity_df.loc[
                :, idx[("velocity_x", "velocity_y", "speed")]
            ].to_numpy(),
            description="x_velocity, y_velocity, speed",
            **common_attrs,
        )
        velocity_ts.create_timeseries(
            name="video_frame_ind",
            unit="index",
            timestamps=final_df.index.to_numpy(),
            data=pos_df[pos_df.columns.levels[0][0]].video_frame_ind.to_numpy(),
            description="video_frame_ind",
            comments="no comments",
        )

        # Add to Analysis NWB file
        analysis_file_name = AnalysisNwbfile().create(key["nwb_file_name"])
        nwb_analysis_file = AnalysisNwbfile()
        nwb_analysis_file.add(
            nwb_file_name=key["nwb_file_name"],
            analysis_file_name=analysis_file_name,
        )
        self.insert1(
            {
                **key,
                "analysis_file_name": analysis_file_name,
                "dlc_position_object_id": nwb_analysis_file.add_nwb_object(
                    analysis_file_name, position
                ),
                "dlc_velocity_object_id": nwb_analysis_file.add_nwb_object(
                    analysis_file_name, velocity_ts
                ),
            }
        )

    def fetch1_dataframe(self):
        """Return this entry's centroid position/velocity as one DataFrame.

        Columns: video_frame_ind, position_x, position_y, velocity_x,
        velocity_y, speed; indexed by time (s) from the stored spatial series.
        """
        nwb_data = self.fetch_nwb()[0]
        index = pd.Index(
            np.asarray(
                nwb_data["dlc_position"].get_spatial_series().timestamps
            ),
            name="time",
        )
        COLUMNS = [
            "video_frame_ind",
            "position_x",
            "position_y",
            "velocity_x",
            "velocity_y",
            "speed",
        ]
        return pd.DataFrame(
            np.concatenate(
                (
                    np.asarray(
                        nwb_data["dlc_velocity"]
                        .time_series["video_frame_ind"]
                        .data,
                        dtype=int,
                    )[:, np.newaxis],
                    np.asarray(
                        nwb_data["dlc_position"].get_spatial_series().data
                    ),
                    np.asarray(
                        nwb_data["dlc_velocity"].time_series["velocity"].data
                    ),
                ),
                axis=1,
            ),
            columns=COLUMNS,
            index=index,
        )