Source code for motif.contour_decoders.maximum

"""Maximum contour decoder.
"""
from motif.core import ContourDecoder


[docs]class MaxDecoder(ContourDecoder): ''' Maximum contour decoder. ''' def decode(self, ctr, Y): raise NotImplementedError @classmethod def get_id(cls): return 'maximum'
# import numpy as np # from sklearn import metrics #TODO: figure out how this should be designed... # def melody_from_clf(scores, times, freqs, index, index_mapping, prob_thresh=0.5, # penalty=0, method='viterbi'): # """ Compute output melody using classifier output. # Parameters # ---------- # contour_data : DataFrame or dict of DataFrames # DataFrame containing labeled features. # prob_thresh : float # Threshold that determines positive class # Returns # ------- # mel_output : Series # Pandas Series with time stamp as index and f0 as values # """ # contour_threshed = contour_data[contour_data['mel prob'] >= prob_thresh] # index_pos = {k: v for k, v in scores.items() if v >= prob_thresh} # index_neg = {k: v for k, v in scores.items() if v < prob_thresh} # if len(index_pos.keys) == 0: # return np.zeros(times.shape) # avg_freq = np.mean(freqs) # # # create DataFrame with all unwrapped [time, frequency, probability] values. # # mel_dat = pd.DataFrame(columns=['time', 'f0', 'probability', 'c_num']) # # mel_dat['time'] = contour_times.values.ravel() # # mel_dat['f0'] = contour_freqs.values.ravel() # # mel_dat['probability'] = contour_probs.values.ravel() # # mel_dat['c_num'] = contour_nums.values.ravel() # # sort by probability then by time # # duplicate times with have maximum probability value at the end # mel_dat.sort(columns='probability', inplace=True) # mel_dat.sort(columns='time', inplace=True) # # compute evenly spaced time grid for output # step_size = 128.0/44100.0 # contour time stamp step size # mel_time_idx = np.arange(0, np.max(times) + 1, step_size) # # find index in evenly spaced grid of estimated time values # reidx = np.searchsorted(mel_time_idx, times) # shift_idx = (np.abs(times - mel_time_idx[reidx - 1]) < \ # np.abs(times - mel_time_idx[reidx])) # reidx[shift_idx] = reidx[shift_idx] - 1 # # find duplicate time values # mel_dat['reidx'] = reidx # if method == 'max': # print "using max decoding" # mel_dat.drop_duplicates(subset='reidx', take_last=True, inplace=True) # mel_output = pd.Series(np.zeros(mel_time_idx.shape), index=mel_time_idx) # mel_output.iloc[mel_dat['reidx']] = mel_dat['f0'].values # else: # print "using viterbi decoding" # duplicates = mel_dat.duplicated(subset='reidx') | \ # mel_dat.duplicated(subset='reidx', take_last=True) # not_duplicates = mel_dat[~duplicates] # # initialize output melody # mel_output = pd.Series(np.zeros(mel_time_idx.shape), index=mel_time_idx) # # fill non-duplicate values # mel_output.iloc[not_duplicates['reidx']] = not_duplicates['f0'].values # dups = mel_dat[duplicates] # dups['groupnum'] = (dups.loc[:, 'reidx'].diff() > 1).cumsum().copy() # groups = dups.groupby('groupnum') # for _, group in groups: # states = np.unique(group['c_num']) # center_freqs = avg_freq.loc[states] # times = np.unique(group['reidx']) # posterior = group[['probability', 'c_num', 'reidx']].pivot_table( # 'probability', index='reidx', # columns='c_num', # fill_value=0.0).as_matrix() # f0_vals = group[['f0', 'c_num', 'reidx']].pivot_table( # 'f0', index='reidx', # columns='c_num', # fill_value=0.0).as_matrix() # #posterior[np.where(f0_vals < prob_thresh)] = 0 #1e-10 # # build transition matrix from log distance between center frequency # transition_matrix = np.log2(center_freqs.values)[np.newaxis, :] - \ # np.log2(center_freqs.values)[:, np.newaxis] # transition_matrix = 1 - normalize(np.abs(transition_matrix), axis=1) # transition_matrix = normalize(transition_matrix, axis=1) # path = viterbi(posterior, transition_matrix=transition_matrix, # prior=None, penalty=penalty) # mel_output.iloc[times] = f0_vals[np.arange(len(path)), path] # return mel_output # def viterbi(posterior, transition_matrix=None, prior=None, penalty=0, # scaled=True): # """Find the optimal Viterbi path through a posteriorgram. # Ported closely from Tae Min Cho's MATLAB implementation. # Parameters # ---------- # posterior: np.ndarray, shape=(num_obs, num_states) # Matrix of observations (events, time steps, etc) by the number of # states (classes, categories, etc), e.g. # posterior[t, i] = Pr(y(t) | Q(t) = i) # transition_matrix: np.ndarray, shape=(num_states, num_states) # Transition matrix for the viterbi algorithm. For clarity, each row # corresponds to the probability of transitioning to the next state, e.g. # transition_matrix[i, j] = Pr(Q(t + 1) = j | Q(t) = i) # prior: np.ndarray, default=None (uniform) # Probability distribution over the states, e.g. # prior[i] = Pr(Q(0) = i) # penalty: scalar, default=0 # Scalar penalty to down-weight off-diagonal states. # scaled : bool, default=True # Scale transition probabilities between steps in the algorithm. # Note: Hard-coded to True in TMC's implementation; it's probably a bad # idea to change this. # Returns # ------- # path: np.ndarray, shape=(num_obs,) # Optimal state indices through the posterior. # """ # # Infer dimensions. # num_obs, num_states = posterior.shape # # Define the scaling function # scaler = normalize if scaled else lambda x: x # # Normalize the posterior. # posterior = normalize(posterior, axis=1) # if transition_matrix is None: # transition_matrix = np.ones([num_states]*2) # transition_matrix = normalize(transition_matrix, axis=1) # # Apply the off-axis penalty. # offset = np.ones([num_states]*2, dtype=float) # offset -= np.eye(num_states, dtype=np.float) # penalty = offset * np.exp(penalty) + np.eye(num_states, dtype=np.float) # transition_matrix = penalty * transition_matrix # # Create a uniform prior if one isn't provided. # prior = np.ones(num_states) / float(num_states) if prior is None else prior # # Algorithm initialization # delta = np.zeros_like(posterior) # psi = np.zeros_like(posterior) # path = np.zeros(num_obs, dtype=int) # idx = 0 # delta[idx, :] = scaler(prior * posterior[idx, :]) # for idx in range(1, num_obs): # res = delta[idx - 1, :].reshape(1, num_states) * transition_matrix # delta[idx, :] = scaler(np.max(res, axis=1) * posterior[idx, :]) # psi[idx, :] = np.argmax(res, axis=1) # path[-1] = np.argmax(delta[-1, :]) # for idx in range(num_obs - 2, -1, -1): # path[idx] = psi[idx + 1, path[idx + 1]] # return path # def normalize(x, axis=None): # """Normalize the values of an ndarray to sum to 1 along the given axis. # Parameters # ---------- # x : np.ndarray # Input multidimensional array to normalize. # axis : int, default=None # Axis to normalize along, otherwise performed over the full array. # Returns # ------- # z : np.ndarray, shape=x.shape # Normalized array. # """ # if not axis is None: # shape = list(x.shape) # shape[axis] = 1 # scalar = x.astype(float).sum(axis=axis).reshape(shape) # scalar[scalar == 0] = 1.0 # else: # scalar = x.sum() # scalar = 1 if scalar == 0 else scalar # return x / scalar # def get_best_threshold(y_ref, y_pred_score, plot=True): # """ Get threshold on scores that maximizes f1 score. # Parameters # ---------- # y_ref : array # Reference labels (binary). # y_pred_score : array # Predicted scores. # plot : bool # If true, plot ROC curve # Returns # ------- # best_threshold : float # threshold on score that maximized f1 score # max_fscore : float # f1 score achieved at best_threshold # """ # pos_weight = 1.0 - float(len(y_ref[y_ref == 1]))/float(len(y_ref)) # neg_weight = 1.0 - float(len(y_ref[y_ref == 0]))/float(len(y_ref)) # sample_weight = np.zeros(y_ref.shape) # sample_weight[y_ref == 1] = pos_weight # sample_weight[y_ref == 0] = neg_weight # print "max prediction value = %s" % np.max(y_pred_score) # print "min prediction value = %s" % np.min(y_pred_score) # precision, recall, thresholds = \ # metrics.precision_recall_curve(y_ref, y_pred_score, pos_label=1, # sample_weight=sample_weight) # beta = 1.0 # btasq = beta**2.0 # fbeta_scores = (1.0 + btasq)*(precision*recall)/((btasq*precision)+recall) # max_fscore = fbeta_scores[np.nanargmax(fbeta_scores)] # best_threshold = thresholds[np.nanargmax(fbeta_scores)]