Source code for pylleo.lleocal

[docs]def get_cal_data(data_df, cal_dict, param): """Get data along specified axis during calibration intervals Args ---- data_df: pandas.DataFrame Pandas dataframe with lleo data cal_dict: dict Calibration dictionary Returns ------- lower: pandas dataframe slice of lleo datafram containing points at -1g calibration position upper: pandas dataframe slice of lleo datafram containing points at -1g calibration position See also -------- lleoio.read_data: creates pandas dataframe `data_df` read_cal: creates `cal_dict` and describes fields """ param = param.lower().replace(" ", "_").replace("-", "_") idx_lower_start = cal_dict["parameters"][param]["lower"]["start"] idx_lower_end = cal_dict["parameters"][param]["lower"]["end"] idx_upper_start = cal_dict["parameters"][param]["upper"]["start"] idx_upper_end = cal_dict["parameters"][param]["upper"]["end"] idx_lower = (data_df.index >= idx_lower_start) & (data_df.index <= idx_lower_end) idx_upper = (data_df.index >= idx_upper_start) & (data_df.index <= idx_upper_end) return data_df[param][idx_lower], data_df[param][idx_upper]
[docs]def read_cal(cal_yaml_path): """Load calibration file if exists, else create Args ---- cal_yaml_path: str Path to calibration YAML file Returns ------- cal_dict: dict Key value pairs of calibration meta data """ from collections import OrderedDict import datetime import os import yamlord from . import utils def __create_cal(cal_yaml_path): cal_dict = OrderedDict() # Add experiment name for calibration reference base_path, _ = os.path.split(cal_yaml_path) _, exp_name = os.path.split(base_path) cal_dict["experiment"] = exp_name return cal_dict # Try reading cal file, else create if os.path.isfile(cal_yaml_path): cal_dict = yamlord.read_yaml(cal_yaml_path) else: cal_dict = __create_cal(cal_yaml_path) cal_dict["parameters"] = OrderedDict() for key, val in utils.parse_experiment_params(cal_dict["experiment"]).items(): cal_dict[key] = val fmt = "%Y-%m-%d %H:%M:%S" cal_dict["date_modified"] = datetime.datetime.now().strftime(fmt) return cal_dict
[docs]def update(data_df, cal_dict, param, bound, start, end): """Update calibration times for give parameter and boundary""" from collections import OrderedDict if param not in cal_dict["parameters"]: cal_dict["parameters"][param] = OrderedDict() if bound not in cal_dict["parameters"][param]: cal_dict["parameters"][param][bound] = OrderedDict() cal_dict["parameters"][param][bound]["start"] = start cal_dict["parameters"][param][bound]["end"] = end return cal_dict
[docs]def fit1d(lower, upper): """Fit acceleration data at lower and upper boundaries of gravity Args ---- lower: pandas dataframe slice of lleo datafram containing points at -1g calibration position upper: pandas dataframe slice of lleo datafram containing points at -1g calibration position Returns ------- p: ndarray Polynomial coefficients, highest power first. If y was 2-D, the coefficients for k-th data set are in p[:,k]. From `numpy.polyfit()`. NOTE ---- This method should be compared agaist alternate linalg method, which allows for 2d for 2d poly, see - http://stackoverflow.com/a/33966967/943773 A = numpy.vstack(lower, upper).transpose() y = A[:,1] m, c = numpy.linalg.lstsq(A, y)[0] """ import numpy # Get smallest size as index position for slicing idx = min(len(lower), len(upper)) # Stack accelerometer count values for upper and lower bounds of curve x = numpy.hstack((lower[:idx].values, upper[:idx].values)) x = x.astype(float) # Make corresponding y array where all lower bound points equal -g # and all upper bound points equal +g y = numpy.zeros(len(x), dtype=float) y[:idx] = -1.0 # negative gravity y[idx:] = 1.0 # positive gravity return numpy.polyfit(x, y, deg=1)
def calibrate_acc(data_df, cal_dict): def apply_poly(data_df, cal_dict, param): """Apply poly fit to data array""" import numpy poly = cal_dict["parameters"][param]["poly"] a = numpy.polyval(poly, data_df[param]) return a.astype(float) # Apply calibration and add as a new column to the dataframe for ax in ["x", "y", "z"]: col = "A{}_g".format(ax) col_cal = "acceleration_{}".format(ax) data_df[col] = apply_poly(data_df, cal_dict, col_cal) return data_df def create_speed_csv(cal_fname, data): import numpy import pandas from pylleo import utils # Get a mask of values which contain a sample, assuming the propeller was # not sampled at as high of a frequency as the accelerometer notnan = ~numpy.isnan(data["propeller"]) # Read speed, start, and end times from csv cal = pandas.read_csv(cal_fname) # For each calibration in `speed_calibrations.csv` for i in range(len(cal)): start = cal.loc[i, "start"] end = cal.loc[i, "end"] dt0 = utils.nearest(data["datetimes"][notnan], start) dt1 = utils.nearest(data["datetimes"][notnan], end) cal_mask = (data["datetimes"] >= dt0) & (data["datetimes"] <= dt1) count_avg = data["propeller"][cal_mask].mean() cal.loc[i, "count_average"] = count_avg cal.to_csv(cal_fname) return cal def calibrate_propeller(data_df, cal_fname, plot=False): def speed_calibration_average(cal_fname, plot): """Cacluate the coefficients for the mean fit of calibrations Notes ----- `cal_fname` should contain three columns: date,est_speed,count_average 2014-04-18,2.012,30 """ import matplotlib.pyplot as plt import numpy import pandas # Read calibration data calibs = pandas.read_csv(cal_fname) calibs["date"] = pandas.to_datetime(calibs["date"]) # Get unique dates to process fits for udates = numpy.unique(calibs["date"]) # Create x data for samples and output array for y n_samples = 1000 x = numpy.arange(n_samples) fits = numpy.zeros((len(udates), n_samples), dtype=float) # Calculate fit coefficients then store `n_samples number of samples # Force intercept through zero (i.e. zero counts = zero speed) # http://stackoverflow.com/a/9994484/943773 for i in range(len(udates)): cal = calibs[calibs["date"] == udates[i]] xi = cal["count_average"].values[:, numpy.newaxis] yi = cal["est_speed"].values m, _, _, _ = numpy.linalg.lstsq(xi, yi) fits[i, :] = m * x # Add fit to plot if switch on if plot: plt.plot(x, fits[i, :], label="cal{}".format(i)) # Calculate average of calibration samples y_avg = numpy.mean(fits, axis=0) # Add average fit to plot and show if switch on if plot: plt.plot(x, y_avg, label="avg") plt.legend() plt.show() # Calculate fit coefficients for average samples x_avg = x[:, numpy.newaxis] m_avg, _, _, _ = numpy.linalg.lstsq(x_avg, y_avg) return m_avg m_avg = speed_calibration_average(cal_fname, plot=plot) data_df["speed"] = m_avg * data_df["propeller"] return data_df