Source code for ava.segmenting.template_segmentation

"""
Segment song motifs by finding maxima in spectrogram cross correlations.

"""
__date__ = "April 2019 - November 2020"


from affinewarp import ShiftWarping
import h5py
from itertools import repeat
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
plt.switch_backend('agg')
try: # Numba >= 0.52
	from numba.core.errors import NumbaPerformanceWarning
except ModuleNotFoundError:
	try: # Numba <= 0.45
		from numba.errors import NumbaPerformanceWarning
	except (NameError, ModuleNotFoundError):
		pass
import numpy as np
from scipy.io import wavfile
from scipy.io.wavfile import WavFileWarning
from scipy.signal import stft
from scipy.ndimage.filters import gaussian_filter
import os
import umap
import warnings

from ava.plotting.tooltip_plot import tooltip_plot


EPSILON = 1e-9



def get_template(feature_dir, p, smoothing_kernel=(0.5, 0.5), verbose=True):
    """
    Create a linear feature template given exemplar spectrograms.

    Every ``.wav`` file in `feature_dir` is turned into a spectrogram,
    blurred, truncated to the shortest exemplar and averaged. The average is
    then mean-centered and rescaled.

    Parameters
    ----------
    feature_dir : str
        Directory containing multiple audio files to average together.
    p : dict
        Parameters. Must contain keys: ``'fs'``, ``'min_freq'``,
        ``'max_freq'``, ``'nperseg'``, ``'noverlap'``, ``'spec_min_val'``,
        ``'spec_max_val'``.
    smoothing_kernel : tuple of floats, optional
        Each spectrogram is blurred using a gaussian kernel with the following
        bandwidths, in bins. Defaults to ``(0.5, 0.5)``.
    verbose : bool, optional
        Defaults to ``True``.

    Returns
    -------
    template : np.ndarray
        Spectrogram template.

    Raises
    ------
    ValueError
        If `feature_dir` contains no ``.wav`` files.
    AssertionError
        If an audio file's samplerate differs from ``p['fs']``.
    """
    filenames = [os.path.join(feature_dir, name)
                 for name in os.listdir(feature_dir) if _is_wav_file(name)]
    if not filenames:
        # Without this guard the failure surfaces later as an opaque
        # "min() arg is an empty sequence" error.
        raise ValueError("Found no .wav files in " + str(feature_dir))
    specs = []
    for filename in filenames:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=WavFileWarning)
            fs, audio = wavfile.read(filename)
        if fs != p['fs']:
            # Raised explicitly (rather than via `assert`) so the check is
            # not stripped when Python runs with -O.
            raise AssertionError(
                "Found samplerate=" + str(fs) + ", expected " + str(p['fs']))
        spec, dt = _get_spec(fs, audio, p)
        specs.append(gaussian_filter(spec, smoothing_kernel))
    # Exemplars may differ in length: truncate all to the shortest one.
    min_time_bins = min(spec.shape[1] for spec in specs)
    specs = np.array([spec[:, :min_time_bins] for spec in specs])
    # Average over all the templates.
    template = np.mean(specs, axis=0)
    # Center, then scale by the sum of squares.
    # NOTE(review): this divides by the squared L2 norm, not the L2 norm, so
    # the result is not unit-norm. Left unchanged because `_segment_file`
    # applies the same scaling to each window -- confirm before changing.
    template -= np.mean(template)
    template /= np.sum(np.power(template, 2)) + EPSILON
    if verbose:
        duration = min_time_bins * dt
        print("Made template from", len(filenames), "files. Duration:",
              duration)
    return template
def segment_files(audio_dirs, segment_dirs, template, p, num_mad=2.0,
                  min_dt=0.05, n_jobs=1, verbose=True):
    """
    Write segments to text files.

    Each ``.wav`` file found in `audio_dirs` is segmented with
    ``_segment_file`` and its onsets/offsets are written to a ``.txt`` file
    of the same base name in the corresponding segment directory.

    Parameters
    ----------
    audio_dirs : list of str
        Audio directories.
    segment_dirs : list of str
        Corresponding directories containing segmenting decisions. Created
        if they do not exist.
    template : numpy.ndarray
        Spectrogram template.
    p : dict
        Parameters. Must contain keys: ``'fs'``, ``'min_freq'``,
        ``'max_freq'``, ``'nperseg'``, ``'noverlap'``, ``'spec_min_val'``,
        ``'spec_max_val'``.
    num_mad : float, optional
        Number of median absolute deviations for cross-correlation threshold.
        Defaults to ``2.0``.
    min_dt : float, optional
        Minimum duration between cross correlation maxima. Defaults to
        ``0.05``.
    n_jobs : int, optional
        Number of jobs for parallelization. Defaults to ``1``.
    verbose : bool, optional
        Defaults to ``True``.

    Returns
    -------
    result : dict
        Maps audio filenames to segments (numpy.ndarrays).
    """
    # Gather one (segment directory, audio path) pair per file to process.
    jobs = []
    for audio_dir, segment_dir in zip(audio_dirs, segment_dirs):
        if not os.path.exists(segment_dir):
            os.makedirs(segment_dir)
        for name in os.listdir(audio_dir):
            if _is_wav_file(name):
                jobs.append((segment_dir, os.path.join(audio_dir, name)))
    # Segment every file, possibly in parallel.
    if verbose:
        print("Segmenting files. n =", len(jobs))
    worker = delayed(_segment_file)
    outputs = Parallel(n_jobs=n_jobs)(
        worker(seg_dir, audio_path, template, p, num_mad, min_dt)
        for seg_dir, audio_path in jobs
    )
    # Write one text file per audio file and tally the segments.
    result = {}
    segment_count = 0
    for seg_dir, audio_path, segs in outputs:
        result[audio_path] = segs
        txt_name = os.path.split(audio_path)[-1][:-4] + '.txt'
        np.savetxt(os.path.join(seg_dir, txt_name), segs, fmt='%.5f')
        segment_count += len(segs)
    if verbose:
        print("\tFound", segment_count, "segments.")
        print("\tDone.")
    return result
def read_segment_decisions(audio_dirs, segment_dirs, verbose=True):
    """
    Read previously written segments back from disk.

    Returns the same data as ``segment_files``: for every ``.wav`` file in
    `audio_dirs`, the ``.txt`` file with the same base name in the matching
    segment directory is loaded as an array with two columns.

    Parameters
    ----------
    audio_dirs : list of str
        Audio directories.
    segment_dirs : list of str
        Segment directories.
    verbose : bool, optional
        Defaults to ``True``.

    Returns
    -------
    result : dict
        Maps audio filenames to segments.
    """
    if verbose:
        print("Reading segments...")
    decisions = {}
    total = 0
    for audio_dir, segment_dir in zip(audio_dirs, segment_dirs):
        wav_paths = [os.path.join(audio_dir, name)
                     for name in os.listdir(audio_dir) if _is_wav_file(name)]
        for wav_path in wav_paths:
            # Swap the 4-character '.wav' suffix for '.txt'.
            txt_name = os.path.split(wav_path)[-1][:-4] + '.txt'
            loaded = np.loadtxt(os.path.join(segment_dir, txt_name))
            # Force two columns so empty and single-row files are 2-D too.
            loaded = loaded.reshape(-1, 2)
            decisions[wav_path] = loaded
            total += len(loaded)
    if verbose:
        print("\tFound", total, "segments.")
        print("\tDone.")
    return decisions
def _segment_file(segment_dir, filename, template, p, num_mad=2.0,
                  min_dt=0.05, min_extra_time_bins=5):
    """
    Match linear spectrogram features and extract times where features align.

    The template is slid along the file's spectrogram. Local maxima of the
    resulting correlation trace that exceed ``median + num_mad * MAD`` are
    taken as segment onsets; each segment has the template's duration.

    Parameters
    ----------
    segment_dir : str
        Segment directory. Only passed through to the return value.
    filename : str
        Audio filename.
    template : numpy.ndarray
        Spectrogram template.
    p : dict
        Parameters. Must contain keys: ``'fs'``, ``'min_freq'``,
        ``'max_freq'``, ``'nperseg'``, ``'noverlap'``, ``'spec_min_val'``,
        ``'spec_max_val'``.
    num_mad : float, optional
        Number of median absolute deviations for cross-correlation threshold.
        Defaults to ``2.0``.
    min_dt : float, optional
        Minimum time between retained maxima, forwarded to
        ``_clean_max_indices``. Defaults to ``0.05``.
    min_extra_time_bins : int, optional
        The file's spectrogram must be at least this many time bins longer
        than the template, otherwise no segments are returned. Defaults to
        ``5``.

    Returns
    -------
    segment_dir : str
        Copied from input parameters.
    filename : str
        Copied from input parameters.
    segments : numpy.ndarray
        Onsets and offsets, one row per segment.

    Raises
    ------
    AssertionError
        If the file's samplerate differs from ``p['fs']``.
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=WavFileWarning)
        fs, audio = wavfile.read(filename)
    if fs != p['fs']:
        # Explicit raise so the check is not stripped under -O.
        raise AssertionError(
            "Found samplerate=" + str(fs) + ", expected " + str(p['fs']))
    if len(audio) < p['nperseg']:
        warnings.warn(
            "Found an audio file that is too short to make a spectrogram: "
            + filename + "\nSamples: " + str(len(audio))
            + "\np['nperseg']: " + str(p['nperseg']),
            UserWarning
        )
        return segment_dir, filename, np.zeros((0, 2))
    big_spec, dt = _get_spec(fs, audio, p)
    spec_len = template.shape[1]
    template = template.flatten()
    if big_spec.shape[1] - spec_len < min_extra_time_bins:
        d1, d2 = dt * spec_len, dt * big_spec.shape[1]
        warnings.warn(
            "Found an audio file that is too short to extract segments from: "
            + filename + "\nTemplate duration: " + str(d1)
            + "\nFile duration: " + str(d2)
            + "\nConsider reducing the template duration.",
            UserWarning
        )
        return segment_dir, filename, np.zeros((0, 2))
    # Correlate the template with every window of the spectrogram.
    # NOTE(review): each window is divided by its sum of squares rather than
    # its L2 norm, mirroring `get_template`. Left unchanged because existing
    # `num_mad` settings depend on it -- confirm before changing.
    result = np.zeros(big_spec.shape[1] - spec_len)
    for i in range(len(result)):
        window = big_spec[:, i:i + spec_len].flatten()
        window -= np.mean(window)
        window /= np.sum(np.power(window, 2)) + EPSILON
        result[i] = np.dot(template, window)
    median = np.median(result)
    mad = np.median(np.abs(result - median)) + EPSILON
    threshold = median + num_mad * mad
    # Get maxima: interior samples above threshold that strictly exceed both
    # neighbours. The previous code trimmed the list of above-threshold
    # samples by position, which silently dropped the first three and last
    # two candidates even when they were valid interior peaks.
    center = result[1:-1]
    is_peak = (center > threshold) & (center > result[:-2]) \
        & (center > result[2:])
    max_indices = np.flatnonzero(is_peak) + 1
    times = dt * np.arange(len(result))
    max_indices = _clean_max_indices(max_indices, times, result,
                                     min_dt=min_dt)
    # Define onsets/offsets.
    segments = np.zeros((len(max_indices), 2))
    segments[:, 0] = dt * max_indices  # onsets
    segments[:, 1] = segments[:, 0] + spec_len * dt  # offsets
    return segment_dir, filename, segments
def clean_collected_data(result, audio_dirs, segment_dirs, p,
                         max_num_specs=10000, verbose=True, img_fn='temp.pdf',
                         tooltip_plot_dir='html'):
    """
    Deprecated alias. See ``clean_collected_segments``.

    Emits a ``UserWarning`` about the rename and then forwards every argument
    unchanged to ``clean_collected_segments``.
    """
    message = (
        "ava.segmenting.template_segmentation.clean_collected_data has been"
        " renamed to clean_collected_segments in v0.3.0."
    )
    warnings.warn(message, UserWarning)
    options = {
        'max_num_specs': max_num_specs,
        'verbose': verbose,
        'img_fn': img_fn,
        'tooltip_plot_dir': tooltip_plot_dir,
    }
    clean_collected_segments(result, audio_dirs, segment_dirs, p, **options)
def clean_collected_segments(result, audio_dirs, segment_dirs, p,
                             max_num_specs=10000, verbose=True,
                             img_fn='temp.pdf', tooltip_plot_dir='html'):
    """
    Take a look at the collected segments and discard false positives.

    Spectrograms of the collected segments are embedded with UMAP and
    plotted. The user is then prompted for rectangles in the embedding.
    Finally, the segment files in `segment_dirs` are rewritten so that only
    segments whose embedding falls inside one of the rectangles remain.

    Parameters
    ----------
    result : dict
        Output of ``segment_files`` or ``read_segment_decisions``.
    audio_dirs : list of str
        Directories containing audio.
    segment_dirs : list of str
        Directories containing segmenting decisions.
    p : dict
        Parameters. Must contain keys: ``'fs'``, ``'min_freq'``,
        ``'max_freq'``, ``'nperseg'``, ``'noverlap'``, ``'spec_min_val'``,
        ``'spec_max_val'``.
    max_num_specs : int, optional
        Maximum number of spectrograms to feed to UMAP. Defaults to
        ``10000``.
    verbose : bool, optional
        Defaults to ``True``.
    img_fn : str, optional
        Image filename. Defaults to ``'temp.pdf'``.
    tooltip_plot_dir : str, optional
        Directory to save tooltip plot to. Defaults to ``'html'``.

    Raises
    ------
    AssertionError
        If an audio file's samplerate differs from ``p['fs']``.
    """
    # Collect spectrograms.
    if verbose:
        print("Collecting spectrograms...")
    specs = []
    for filename in result:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=WavFileWarning)
            fs, audio = wavfile.read(filename)
        if fs != p['fs']:
            # Explicit raise so the check is not stripped under -O.
            raise AssertionError(
                "Found samplerate=" + str(fs) + ", expected " + str(p['fs']))
        for segment in result[filename]:
            i1 = int(round(segment[0] * fs))
            i2 = int(round(segment[1] * fs))
            spec, dt = _get_spec(fs, audio[i1:i2], p)
            specs.append(spec)
    if len(specs) == 0:
        warnings.warn(
            "Found no spectrograms in "
            "ava.segmenting.template_segmentation.clean_collected_segments.\n"
            "Consider reducing the `num_mad` parameter in `segment_files`.",
            UserWarning
        )
        return
    # Zero-pad every spectrogram to the longest one so they stack.
    max_t = max(spec.shape[1] for spec in specs)
    padded = np.zeros((len(specs), specs[0].shape[0], max_t))
    for k, spec in enumerate(specs):
        padded[k, :, :spec.shape[1]] = spec
    specs = padded
    if len(specs) > max_num_specs:
        warnings.warn(
            "Found more spectrograms than `max_num_specs` ("
            + str(max_num_specs) + "). Consider increasing `max_num_specs` or"
            + " `num_mad`.",
            UserWarning
        )
    if verbose:
        print("\tCollected", len(specs), "spectrograms.")
        print("\tSpectrogram shape:", specs.shape[1:])
        if len(specs) > max_num_specs:
            print("\tRandomly sampling", max_num_specs, "spectrograms.")
        print("\tDone.")
    # Shuffle and subsample reproducibly. A private RandomState yields the
    # same permutation as seeding the global generator with 42 did, without
    # clobbering the caller's global NumPy random state.
    rng = np.random.RandomState(42)
    specs = specs[rng.permutation(len(specs))[:max_num_specs]]
    # UMAP the spectrograms.
    if verbose:
        print("Running UMAP. n =", len(specs))
    transform = umap.UMAP(random_state=42, metric='correlation')
    # https://github.com/lmcinnes/umap/issues/252
    with warnings.catch_warnings():
        try:
            warnings.filterwarnings("ignore",
                                    category=NumbaPerformanceWarning)
        except NameError:
            # The warning class could not be imported at module load time.
            pass
        embedding = transform.fit_transform(specs.reshape(len(specs), -1))
    if verbose:
        print("\tDone.")
    # Plot and ask for user input.
    bounds = {
        'x1s': [],
        'x2s': [],
        'y1s': [],
        'y2s': [],
    }
    bounds_keys = ['x1s', 'x2s', 'y1s', 'y2s']
    queries = ['x1: ', 'x2: ', 'y1: ', 'y2: ']
    X, Y = embedding[:, 0], embedding[:, 1]
    first_round = True
    while True:
        colors = ['b' if _in_region(embed, bounds) else 'r'
                  for embed in embedding]
        print("Selected", colors.count('b'), "out of", len(colors))
        plt.scatter(X, Y, c=colors, s=0.9, alpha=0.5)
        # Faint integer grid lines make it easier to read off coordinates.
        for x_tick in np.arange(np.floor(np.min(X)), np.ceil(np.max(X))):
            plt.axvline(x=x_tick, c='k', alpha=0.1, lw=0.5)
        for y_tick in np.arange(np.floor(np.min(Y)), np.ceil(np.max(Y))):
            plt.axhline(y=y_tick, c='k', alpha=0.1, lw=0.5)
        title = "Find relevant song"
        plt.title(title)
        plt.savefig(img_fn)
        plt.close('all')
        # The tooltip plot is only written once, on the first pass.
        if first_round:
            if verbose:
                print("Writing tooltip plot...")
            tooltip_plot(embedding, specs, output_dir=tooltip_plot_dir,
                         num_imgs=1000, title=title, grid=True)
            if verbose:
                print("\tDone.")
            first_round = False
        # Get one rectangle (x1, x2, y1, y2) from the user.
        for key, query in zip(bounds_keys, queries):
            answer = 'initial input'
            while not _is_number(answer):
                answer = input(query)
            bounds[key].append(float(answer))
        # Continue?
        if input('[Enter] to select more regions, [c] to continue: ') == 'c':
            break
    # Save only the good segments.
    if verbose:
        print("Saving segments...")
    num_deleted, num_total = 0, 0
    for audio_dir, seg_dir in zip(audio_dirs, segment_dirs):
        audio_fns = [os.path.join(audio_dir, name)
                     for name in os.listdir(audio_dir) if _is_wav_file(name)]
        for audio_fn in audio_fns:
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=WavFileWarning)
                fs, audio = wavfile.read(audio_fn)
            if fs != p['fs']:
                raise AssertionError(
                    "Found samplerate=" + str(fs) + ", expected "
                    + str(p['fs']))
            segment_fn = os.path.split(audio_fn)[-1][:-4] + '.txt'
            segment_fn = os.path.join(seg_dir, segment_fn)
            segments = np.loadtxt(segment_fn).reshape(-1, 2)
            if len(segments) == 0:
                continue
            file_specs = []
            for segment in segments:
                i1 = int(round(segment[0] * fs))
                i2 = int(round(segment[1] * fs))
                spec, dt = _get_spec(fs, audio[i1:i2], p)
                # Pad (or, for a segment longer than any seen while fitting,
                # truncate) to the width the UMAP model was fitted on.
                width = min(spec.shape[1], max_t)
                fitted = np.zeros((spec.shape[0], max_t))
                fitted[:, :width] = spec[:, :width]
                file_specs.append(fitted)
            file_specs = np.stack(file_specs)
            embed = transform.transform(
                file_specs.reshape(file_specs.shape[0], -1))
            keep = np.array([_in_region(point, bounds) for point in embed],
                            dtype=bool)
            new_segments = segments[keep]
            num_total += len(new_segments)
            num_deleted += len(segments) - len(new_segments)
            np.savetxt(segment_fn, new_segments, fmt='%.5f')
    if verbose:
        print("\tdeleted:", num_deleted, "remaining:", num_total)
        print("\tDone.")
def segment_sylls_from_songs(audio_dirs, song_seg_dirs, syll_seg_dirs, p,
                             shoulder=0.05, img_fn='temp.pdf', verbose=True):
    """
    Split song renditions into syllables, write segments.

    Enter quantiles to determine where to split the song motif. Entering the
    same quantile twice will remove it.

    Note
    ----
    * All the song segments must be the same duration!

    Parameters
    ----------
    audio_dirs : list of str
        Audio directories.
    song_seg_dirs : list of str
        Directories containing song segments.
    syll_seg_dirs : list of str
        Directories where syllable segments are written.
    p : dict
        Segmenting parameters.
    shoulder : float, optional
        Duration of padding on either side of song segments, in seconds.
        Defaults to ``0.05``.
    img_fn : str, optional
        Image filename. Defaults to ``'temp.pdf'``.
    verbose : bool, optional
        Defaults to `True`.

    Raises
    ------
    AssertionError
        If no song segments are found.
    """
    # Read segments.
    song_segs = read_segment_decisions(audio_dirs, song_seg_dirs,
                                       verbose=verbose)
    # Collect spectrograms.
    empty_audio_files = []
    specs, fns, song_onsets = [], [], []
    for audio_fn in song_segs:
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=WavFileWarning)
            fs, audio = wavfile.read(audio_fn)
        for seg in song_segs[audio_fn].reshape(-1, 2):
            # Make spectrogram.
            onset, offset = seg[0] - shoulder, seg[1] + shoulder
            i1, i2 = int(fs * onset), int(fs * offset)
            spec, dt = _get_spec(fs, audio[max(i1, 0):i2], p)
            # Pad spectrogram if it's near the edge of the file.
            if i1 < 0 or i2 > len(audio):
                pre_bins = max(0, int(np.round(-i1 / fs / dt)))
                post_bins = max(0, int(np.round((i2 - len(audio)) / fs / dt)))
                new_spec = np.mean(spec) * np.ones(
                    (spec.shape[0], spec.shape[1] + pre_bins + post_bins))
                # Copy the real data between the mean-valued pads. The
                # previous code only evaluated a slice here and never
                # assigned, so padded spectrograms held nothing but the mean.
                new_spec[:, pre_bins:pre_bins + spec.shape[1]] = spec
                spec = new_spec
            specs.append(spec)
            fns.append(audio_fn)
            song_onsets.append(onset)
        if len(song_segs[audio_fn]) == 0:
            empty_audio_files.append(audio_fn)
    if len(specs) == 0:
        raise AssertionError("Found no spectrograms!")
    # Calculate and z-score amplitude traces.
    amp_traces = []
    for spec in specs:
        amps = np.sum(spec, axis=0)
        amps -= np.mean(amps)
        amps /= np.std(amps) + EPSILON
        amp_traces.append(amps)
    # Truncate the amplitude traces if they aren't exactly the same length.
    min_time_bins = min(len(amp_trace) for amp_trace in amp_traces)
    max_time_bins = max(len(amp_trace) for amp_trace in amp_traces)
    if min_time_bins != max_time_bins:
        if verbose:
            print("Found different numbers of time bins in segments!")
            print("\tmin:" + str(min_time_bins) + ", max:", max_time_bins)
            print("\tTruncating to minimum number of time bins.")
        amp_traces = [amp_trace[:min_time_bins] for amp_trace in amp_traces]
    amp_traces = np.array(amp_traces)
    # Warp the amplitude traces.
    max_t = amp_traces.shape[1] * dt * 1e3
    num_time_bins = amp_traces.shape[1]
    model = ShiftWarping(maxlag=0.2, smoothness_reg_scale=10.0)
    model.fit(amp_traces[:, :, np.newaxis], iterations=50)
    # Reshape rather than squeeze so a single rendition stays 2-D.
    aligned = model.predict().reshape(amp_traces.shape)
    max_raw_val = np.max(amp_traces)
    max_aligned_val = np.max(aligned)
    shifts = model.shifts
    quantiles = []
    done = False
    while not done:
        # Plot.
        _, axarr = plt.subplots(3, 1, sharex=True)
        axarr[0].imshow(
            specs[np.random.randint(len(specs))], origin='lower',
            aspect='auto',
            extent=[0, max_t, p['min_freq'] / 1e3, p['max_freq'] / 1e3])
        raw_marked = np.copy(amp_traces)
        aligned_marked = np.copy(aligned)
        for q in quantiles:
            q_bin = int(round(q * num_time_bins))
            # Raw traces: the mark moves with each rendition's shift. Marks
            # that fall outside the trace are skipped instead of wrapping
            # around via negative indexing.
            for i in range(len(raw_marked)):
                col = q_bin + shifts[i]
                if 0 <= col < num_time_bins:
                    raw_marked[i, col] = max_raw_val
            # Aligned traces: the same column for every rendition, clamped
            # because rounding can land exactly on `num_time_bins`.
            aligned_marked[:, min(q_bin, num_time_bins - 1)] = max_aligned_val
        axarr[1].imshow(raw_marked, origin='lower', aspect='auto',
                        extent=[0, max_t, 0, len(amp_traces)])
        axarr[2].imshow(aligned_marked, origin='lower', aspect='auto',
                        extent=[0, max_t, 0, len(amp_traces)])
        axarr[0].set_ylabel("Frequency (kHz)")
        axarr[1].set_ylabel('Amplitude')
        axarr[2].set_ylabel('Shifted')
        axarr[0].set_title('Enter segmenting quantiles:')
        axarr[2].set_xlabel('Time (ms)')
        plt.savefig(img_fn)
        plt.close('all')
        # Ask for segmenting decisions until one valid answer is given.
        while True:
            answer = input("Add or delete quantile or [s]top: ")
            if answer == 's':
                done = True
                break
            # Only a failed float conversion is caught, so Ctrl-C still
            # interrupts the prompt.
            try:
                value = float(answer)
            except ValueError:
                value = None
            if value is None or not (0.0 < value < 1.0):
                print("Invalid input!")
                print("Must be 's' or a float between 0 and 1.")
                continue
            # Entering an existing quantile removes it.
            if value in quantiles:
                quantiles.remove(value)
            else:
                quantiles.append(value)
            break
    # Write syllable segments.
    if verbose:
        print("Writing syllable segments...")
    duration = num_time_bins * dt
    quantiles = np.array(quantiles)
    quantiles.sort()
    files_encountered = set()
    for i, (fn, song_onset) in enumerate(zip(fns, song_onsets)):
        # Unshifted onsets and offsets: consecutive quantile pairs.
        onsets = song_onset + duration * quantiles[:-1]
        offsets = song_onset + duration * quantiles[1:]
        # Apply shifts.
        onsets += shifts[i] * dt
        offsets += shifts[i] * dt
        # Save.
        index = audio_dirs.index(os.path.split(fn)[0])
        write_fn = os.path.join(syll_seg_dirs[index], os.path.split(fn)[-1])
        write_fn = write_fn[:-4] + '.txt'
        os.makedirs(os.path.split(write_fn)[0], exist_ok=True)
        segs = np.stack([onsets, offsets]).reshape(2, -1).T
        # The first song of each file truncates the file and writes a
        # header naming the file; later songs are appended.
        header, mode = "", 'ab'
        if fn not in files_encountered:
            files_encountered.add(fn)
            mode = 'wb'
            header += "Syllables from song: " + fn + "\n"
        header += "Song onset: " + str(song_onset)
        with open(write_fn, mode) as f:
            np.savetxt(f, segs, fmt='%.5f', header=header)
    # Write empty files corresponding to audio files without song.
    for fn in empty_audio_files:
        index = audio_dirs.index(os.path.split(fn)[0])
        write_fn = os.path.join(syll_seg_dirs[index], os.path.split(fn)[-1])
        write_fn = write_fn[:-4] + '.txt'
        os.makedirs(os.path.split(write_fn)[0], exist_ok=True)
        header = "Syllables from song: " + fn
        np.savetxt(write_fn, np.array([]), header=header)
def segment_sylls_from_warped_songs(warped_window_dset, audio_dirs, spec_dirs,
                                    time_bins=512, num_specs=3,
                                    img_fn='temp.pdf', verbose=True):
    """
    Split time-warped song renditions into time-warped syllables, save specs.

    Enter quantiles to determine where to split the song motif. Entering the
    same quantile twice will remove it. Pressing [Enter] on an empty prompt
    redraws the plot with freshly sampled spectrograms.

    Parameters
    ----------
    warped_window_dset : ava.models.window_vae_dataset.WarpedWindowDataset
        Dataset defining a warping.
    audio_dirs : list of str
        Audio directories.
    spec_dirs : list of str
        Spectrogram directories, paired with `audio_dirs`.
    time_bins : int, optional
        Number of spectrogram time bins to plot. Defaults to ``512``.
    num_specs : int, optional
        Number of spectrograms to plot. Defaults to ``3``.
    img_fn : str, optional
        Image filename. Defaults to ``'temp.pdf'``.
    verbose : bool, optional
        Defaults to `True`.

    Raises
    ------
    AssertionError
        If an audio file's directory is not in `audio_dirs`, if fewer than
        two quantiles were entered, or if spectrogram creation reports
        failure.
    """
    # Check the input.
    audio_dir_to_spec_dir = dict(zip(audio_dirs, spec_dirs))
    audio_filenames = warped_window_dset.audio_filenames
    for audio_fn in audio_filenames:
        parent = os.path.split(audio_fn)[0]
        if parent not in audio_dirs:
            raise AssertionError("Cannot find " + parent + " in audio_dirs!")
    # Collect segmenting quantiles.
    start_q, stop_q = warped_window_dset.start_q, warped_window_dset.stop_q
    error_msg = "Invalid input!\nMust be 's' or a float between " + \
        "{0:.2f}".format(start_q) + " and " + "{0:.2f}".format(stop_q) + "."
    p, fs = warped_window_dset.p, warped_window_dset.fs
    quantiles = []
    done = False
    while not done:
        # Plot `num_specs` randomly chosen warped spectrograms.
        _, axarr = plt.subplots(nrows=num_specs, sharex=True)
        if num_specs == 1:
            # subplots returns a bare Axes rather than an array in this case.
            axarr = [axarr]
        axarr[0].set_title('Enter segmenting quantiles:')
        for i in range(num_specs):
            plt.sca(axarr[i])
            index = np.random.randint(len(audio_filenames))
            audio_fn = audio_filenames[index]
            warped_spec = warped_window_dset.get_whole_warped_spectrogram(
                audio_fn, time_bins=time_bins)
            plt.imshow(
                warped_spec, origin='lower', aspect='auto',
                extent=[start_q, stop_q, p['min_freq'] / 1e3,
                        p['max_freq'] / 1e3])
            for q in quantiles:
                plt.axvline(x=q, color='red')
            plt.ylabel("Frequency (kHz)")
        plt.xlabel('Warped Time Quantile')
        plt.savefig(img_fn)
        plt.close('all')
        # Ask for segmenting decisions.
        while True:
            answer = input("Add or delete quantile or [s]top: ")
            if answer == '':
                # Redraw without changing the quantiles.
                break
            if answer == 's':
                done = True
                break
            # Only a failed float conversion is caught, so Ctrl-C still
            # interrupts the prompt.
            try:
                value = float(answer)
            except ValueError:
                value = None
            if value is None or not (start_q < value < stop_q):
                print(error_msg)
                continue
            # Entering an existing quantile removes it.
            if value in quantiles:
                quantiles.remove(value)
            else:
                quantiles.append(value)
            break
    if len(quantiles) <= 1:
        raise AssertionError("Not enough quantiles to segment!")
    # Write syllable spectrograms.
    if verbose:
        print("Making and saving syllable spectrograms...")
    quantiles = sorted(quantiles)
    segs = [[q1, q2] for q1, q2 in zip(quantiles[:-1], quantiles[1:])]
    num_saved = 0
    template_dur = warped_window_dset.template_dur
    for audio_fn in audio_filenames:
        syll_data = {
            'specs': [],
            'onsets': [],
            'offsets': [],
            'audio_filenames': [],
        }
        # Looked up once per file instead of once per syllable.
        index = audio_filenames.index(audio_fn)
        for q1, q2 in segs:
            syll_data['onsets'].append(q1)  # Quantiles are saved, not times.
            syll_data['offsets'].append(q2)
            syll_data['audio_filenames'].append(audio_fn)
            # Make spectrogram: sample warped time, then inverse warp.
            t_vals = np.linspace(q1, q2, p['num_time_bins'])
            target_ts = warped_window_dset._get_unwarped_times(t_vals, index)
            target_ts *= template_dur
            # Then make a spectrogram at those target times.
            spec, flag = p['get_spec'](
                0.0, template_dur, warped_window_dset.audio[index], p, fs=fs,
                max_dur=None, target_times=target_ts)
            if not flag:
                raise AssertionError(
                    "Spectrogram creation failed for " + audio_fn)
            syll_data['specs'].append(spec)
            num_saved += 1
        audio_dir = os.path.split(audio_fn)[0]
        write_fn = os.path.split(audio_fn)[-1][:-4] + '.hdf5'
        spec_dir = audio_dir_to_spec_dir[audio_dir]
        os.makedirs(spec_dir, exist_ok=True)
        write_fn = os.path.join(spec_dir, write_fn)
        with h5py.File(write_fn, "w") as f:
            # Add all the fields.
            for key in ['onsets', 'offsets']:
                f.create_dataset(key, data=np.array(syll_data[key]))
            f.create_dataset('specs', data=np.stack(syll_data['specs']))
            f.create_dataset(
                'audio_filenames',
                data=np.array(syll_data['audio_filenames']).astype('S'))
    if verbose:
        print("\tSaved " + str(num_saved) + " spectrograms.")
        print("\tDone.")
def _get_spec(fs, audio, p):
    """
    Get a spectrogram. Not many options here.

    Computes an STFT, keeps the rows between ``p['min_freq']`` and
    ``p['max_freq']``, takes the log magnitude and rescales it so that
    ``p['spec_min_val']`` maps to 0 and ``p['spec_max_val']`` maps to 1,
    clipping values outside that range.

    Parameters
    ----------
    fs : float
        Samplerate.
    audio : numpy.ndarray
        Raw audio.
    p : dict
        Parameters. Must contain keys: ``'nperseg'``, ``'noverlap'``,
        ``'min_freq'``, ``'max_freq'``, ``'spec_min_val'``, and
        ``'spec_max_val'``.

    Returns
    -------
    spec : numpy.ndarray
        Spectrogram.
    dt : float
        Timestep, the spacing of the first two STFT time points.
    """
    f, t, spec = stft(audio, fs=fs, nperseg=p['nperseg'],
                      noverlap=p['noverlap'])
    f1 = np.searchsorted(f, p['min_freq'])
    f2 = np.searchsorted(f, p['max_freq'])
    spec = spec[f1:f2]
    # EPSILON keeps the log finite where the magnitude is zero.
    spec = np.log(np.abs(spec) + EPSILON)
    spec -= p['spec_min_val']
    spec /= p['spec_max_val'] - p['spec_min_val']
    spec[spec < 0.0] = 0.0
    spec[spec > 1.0] = 1.0
    return spec, t[1] - t[0]


def _clean_max_indices(old_indices, old_times, values, min_dt=0.05):
    """
    Remove maxima that are too close together.

    Maxima are visited from the largest value to the smallest. One is kept
    only if it lies at least `min_dt` away from every maximum already kept,
    so the strongest of any conflicting group survives. The previous
    implementation visited them in ascending order and kept the weakest.

    Parameters
    ----------
    old_indices : numpy.ndarray
        Integer indices of candidate maxima into `old_times` and `values`.
    old_times : numpy.ndarray
        Time of each sample.
    values : numpy.ndarray
        Value of each sample, used to rank the maxima.
    min_dt : float, optional
        Minimum allowed time between two kept maxima. Defaults to ``0.05``.

    Returns
    -------
    indices : numpy.ndarray
        Kept indices in increasing order. Returned unchanged when there are
        fewer than two candidates.
    """
    if len(old_indices) <= 1:
        return old_indices
    # Strongest maxima first.
    ranked = old_indices[np.argsort(values[old_indices])[::-1]]
    kept = [ranked[0]]
    for candidate in ranked[1:]:
        time = old_times[candidate]
        if all(abs(old_times[k] - time) >= min_dt for k in kept):
            kept.append(candidate)
    kept = np.array(kept)
    kept.sort()
    return kept


def _in_region(point, bounds):
    """
    Is the point in the union of the given rectangles?

    `bounds` maps ``'x1s'``, ``'x2s'``, ``'y1s'`` and ``'y2s'`` to
    equal-length lists. Entry ``i`` of each list gives two opposite corners
    of rectangle ``i``, in either order. Points on an edge are outside.
    """
    corners = zip(bounds['x1s'], bounds['x2s'], bounds['y1s'], bounds['y2s'])
    for x1, x2, y1, y2 in corners:
        if min(x1, x2) < point[0] < max(x1, x2) and \
                min(y1, y2) < point[1] < max(y1, y2):
            return True
    return False


def _is_wav_file(filename):
    """Is this a name with a non-empty stem and a lowercase ``.wav`` suffix?"""
    return len(filename) > 4 and filename[-4:] == '.wav'


def _is_number(answer):
    """
    Can `answer` be converted to a float?

    Prints ``Invalid input!`` on failure, except for the sentinel string
    ``'initial input'`` that callers use before prompting the user.
    """
    try:
        float(answer)
    except (TypeError, ValueError):
        # Only conversion failures are caught; Ctrl-C still propagates.
        if answer != 'initial input':
            print("Invalid input!")
        return False
    return True


if __name__ == '__main__':
    pass


###