Source code for ava.models.window_vae_dataset

"""
Methods for feeding randomly sampled spectrogram data to the shotgun VAE.

Meant to be used with `ava.models.vae.VAE`.

TO DO
-----
- replace `affinewarp` with `ava.preprocessing.warping`

"""
__date__ = "August 2019 - November 2020"


from affinewarp import PiecewiseWarping
import h5py
import numpy as np
import os
from scipy.interpolate import interp1d
from scipy.io import wavfile
from scipy.io.wavfile import WavFileWarning
from torch.utils.data import Dataset, DataLoader
import warnings

from ava.models.utils import numpy_to_tensor, _get_wavs_from_dir, \
		_get_specs_and_amplitude_traces


DEFAULT_WARP_PARAMS = {
	'n_knots': 0, # number of pieces minus one in the piecwise linear warp
	'warp_reg_scale': 1e-2, # penalizes distance of warp to identity line
	'smoothness_reg_scale': 1e-1, # penalizes L2 norm of warp second derivatives
	'l2_reg_scale': 1e-7, # penalizes L2 norm of warping template
}
"""Default time-warping parameters sent to affinewarp"""

EPSILON = 1e-9



[docs]def get_window_partition(audio_dirs, roi_dirs, split=0.8, shuffle=True, \ exclude_empty_roi_files=True): """ Get a train/test split for fixed-duration shotgun VAE. Parameters ---------- audio_dirs : list of str Audio directories. roi_dirs : list of str ROI (segment) directories. split : float, optional Train/test split. Defaults to ``0.8``, indicating an 80/20 train/test split. shuffle : bool, optional Whether to shuffle at the audio file level. Defaults to ``True``. exclude_empty_roi_files : bool, optional Defaults to ``True``. Returns ------- partition : dict Defines the test/train split. The keys ``'test'`` and ``'train'`` each map to a dictionary with keys ``'audio'`` and ``'rois'``, which both map to numpy arrays containing filenames. """ assert(split > 0.0 and split <= 1.0) # Collect filenames. audio_filenames, roi_filenames = [], [] for audio_dir, roi_dir in zip(audio_dirs, roi_dirs): temp_wavs = _get_wavs_from_dir(audio_dir) temp_rois = [os.path.join(roi_dir, os.path.split(i)[-1][:-4]+'.txt') \ for i in temp_wavs] if exclude_empty_roi_files: for i in reversed(range(len(temp_wavs))): segs = np.loadtxt(temp_rois[i]) if len(segs) == 0: del temp_wavs[i] del temp_rois[i] audio_filenames += temp_wavs roi_filenames += temp_rois # Reproducibly shuffle. audio_filenames = np.array(audio_filenames) roi_filenames = np.array(roi_filenames) perm = np.argsort(audio_filenames) audio_filenames, roi_filenames = audio_filenames[perm], roi_filenames[perm] if shuffle: np.random.seed(42) perm = np.random.permutation(len(audio_filenames)) audio_filenames = audio_filenames[perm] roi_filenames = roi_filenames[perm] np.random.seed(None) # Split. i = int(round(split * len(audio_filenames))) return { \ 'train': { \ 'audio': audio_filenames[:i], 'rois': roi_filenames[:i]}, \ 'test': { \ 'audio': audio_filenames[i:], 'rois': roi_filenames[i:]} \ }
[docs]def get_fixed_window_data_loaders(partition, p, batch_size=64, \ shuffle=(True, False), num_workers=4, min_spec_val=None): """ Get DataLoaders for training and testing: fixed-duration shotgun VAE Parameters ---------- partition : dict Output of ``ava.models.window_vae_dataset.get_window_partition``. p : dict Preprocessing parameters. Must contain keys: ... batch_size : int, optional Defaults to ``64``. shuffle : tuple of bool, optional Whether to shuffle train and test sets, respectively. Defaults to ``(True, False)``. num_workers : int, optional Number of CPU workers to feed data to the network. Defaults to ``4``. Returns ------- loaders : dict Maps the keys ``'train'`` and ``'test'`` to their respective DataLoaders. """ train_dataset = FixedWindowDataset(partition['train']['audio'], \ partition['train']['rois'], p, transform=numpy_to_tensor, \ min_spec_val=min_spec_val) train_dataloader = DataLoader(train_dataset, batch_size=batch_size, \ shuffle=shuffle[0], num_workers=num_workers) if not partition['test']: return {'train':train_dataloader, 'test':None} test_dataset = FixedWindowDataset(partition['test']['audio'], \ partition['test']['rois'], p, transform=numpy_to_tensor, \ min_spec_val=min_spec_val) test_dataloader = DataLoader(test_dataset, batch_size=batch_size, \ shuffle=shuffle[1], num_workers=num_workers) return {'train':train_dataloader, 'test':test_dataloader}
[docs]class FixedWindowDataset(Dataset): def __init__(self, audio_filenames, roi_filenames, p, transform=None, dataset_length=2048, min_spec_val=None): """ Create a torch.utils.data.Dataset for chunks of animal vocalization. Parameters ---------- audio_filenames : list of str List of wav files. roi_filenames : list of str List of files containing animal vocalization times. transform : {``None``, function}, optional Transformation to apply to each item. Defaults to ``None`` (no transformation). dataset_length : int, optional Arbitrary number that determines batch size. Defaults to ``2048``. min_spec_val : {float, None}, optional Used to disregard silence. If not `None`, spectrogram with a maximum value less than `min_spec_val` will be disregarded. """ self.filenames = np.array(sorted(audio_filenames)) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=WavFileWarning) self.audio = [wavfile.read(fn)[1] for fn in self.filenames] self.fs = wavfile.read(audio_filenames[0])[0] self.roi_filenames = roi_filenames self.dataset_length = dataset_length self.min_spec_val = min_spec_val self.p = p self.rois = [np.loadtxt(i, ndmin=2) for i in roi_filenames] self.file_weights = np.array([np.sum(np.diff(i)) for i in self.rois]) self.file_weights /= np.sum(self.file_weights) self.roi_weights = [] for i in range(len(self.rois)): temp = np.diff(self.rois[i]).flatten() self.roi_weights.append(temp/np.sum(temp)) self.transform = transform def __len__(self): """NOTE: length is arbitrary""" return self.dataset_length def __getitem__(self, index, seed=None, shoulder=0.05, \ return_seg_info=False): """ Get spectrograms. Parameters ---------- index : seed : shoulder : return_seg_info : Returns ------- specs : file_indices : onsets : offsets : """ specs, file_indices, onsets, offsets = [], [], [], [] single_index = False try: iterator = iter(index) except TypeError: index = [index] single_index = True np.random.seed(seed) for i in index: while True: # First find the file, then the ROI. file_index = np.random.choice(np.arange(len(self.filenames)), \ p=self.file_weights) load_filename = self.filenames[file_index] roi_index = \ np.random.choice(np.arange(len(self.roi_weights[file_index])), p=self.roi_weights[file_index]) roi = self.rois[file_index][roi_index] # Then choose a chunk of audio uniformly at random. onset = roi[0] + (roi[1] - roi[0] - self.p['window_length']) \ * np.random.rand() offset = onset + self.p['window_length'] target_times = np.linspace(onset, offset, \ self.p['num_time_bins']) # Then make a spectrogram. spec, flag = self.p['get_spec'](max(0.0, onset-shoulder), \ offset+shoulder, self.audio[file_index], self.p, \ fs=self.fs, target_times=target_times) if not flag: continue # Remake the spectrogram if it's silent. if self.min_spec_val is not None and \ np.max(spec) < self.min_spec_val: continue if self.transform: spec = self.transform(spec) specs.append(spec) file_indices.append(file_index) onsets.append(onset) offsets.append(offset) break np.random.seed(None) if return_seg_info: if single_index: return specs[0], file_indices[0], onsets[0], offsets[0] return specs, file_indices, onsets, offsets if single_index: return specs[0] return specs
[docs] def write_hdf5_files(self, save_dir, num_files=500, sylls_per_file=100): """ Write hdf5 files containing spectrograms of random audio chunks. TO DO ----- * Write to multiple directories. Note ---- * This should be consistent with `ava.preprocessing.preprocess.process_sylls`. Parameters ---------- save_dir : str Directory to save hdf5s in. num_files : int, optional Number of files to save. Defaults to ``500``. sylls_per_file : int, optional Number of syllables in each file. Defaults to ``100``. """ if not os.path.exists(save_dir): os.mkdir(save_dir) for write_file_num in range(num_files): specs, file_indices, _, _ = \ self.__getitem__(np.arange(sylls_per_file), \ seed=write_file_num, return_seg_info=True) specs = np.array([spec.detach().numpy() for spec in specs]) filenames = np.array([self.filenames[i] for i in file_indices]) fn = "syllables_" + str(write_file_num).zfill(4) + '.hdf5' fn = os.path.join(save_dir, fn) with h5py.File(fn, "w") as f: f.create_dataset('specs', data=specs) f.create_dataset('audio_filenames', data=filenames.astype('S'))
[docs]def get_warped_window_data_loaders(audio_dirs, p, batch_size=64, num_workers=4,\ load_warp=False, warp_fn=None, warp_params={}, warp_type='spectrogram'): """ Get DataLoaders for training and testing: warped shotgun VAE Warning ------- * Audio files must all be the same duration! You can use `segmenting.utils.write_segments_to_audio` to extract audio from song segments, writing them as separate ``.wav`` files. TO DO ----- * Add a train/test split! Parameters ---------- audio_dirs : list of str Audio directories. p : dict Preprocessing parameters. Must contain keys: ``'window_length'``, ``'nperseg'``, ``'noverlap'``, ``'min_freq'``, ``'max_freq'``, ``'spec_min_val'``, and ``'spec_max_val'``. batch_size : int, optional DataLoader batch size. Defaults to ``64``. num_workers : int, optional Number of CPU workers to retrieve data for the model. Defaults to ``4``. load_warp : bool, optional Whether to load a previously saved time warping result. Defaults to ``False``. warp_fn : {str, None}, optional Where the x-knots and y-knots should be saved and loaded. Defaults to ``None``. warp_params : dict, optional Parameters passed to affinewarp. Defaults to ``{}``. warp_type : {``'amplitude'``, ``'spectrogram'``, ``'null'``}, optional Whether to time-warp using ampltidue traces, full spectrograms, or not warp at all. Defaults to ``'spectrogram'``. Returns ------- loaders : dict Maps the keys ``'train'`` and ``'test'`` to their respective DataLoaders. """ assert type(p) == type({}) assert warp_type in ['amplitude', 'spectrogram', 'null'] # Collect audio filenames. audio_fns = [] for audio_dir in audio_dirs: audio_fns += _get_wavs_from_dir(audio_dir) # Make the Dataset and DataLoader. dataset = WarpedWindowDataset(audio_fns, p, \ transform=numpy_to_tensor, load_warp=load_warp, warp_fn=warp_fn, \ warp_params=warp_params, warp_type=warp_type) dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, \ num_workers=num_workers) return {'train': dataloader, 'test': dataloader}
[docs]class WarpedWindowDataset(Dataset): def __init__(self, audio_filenames, p, transform=None, dataset_length=2048,\ load_warp=False, save_warp=True, start_q=-0.1, stop_q=1.1, \ warp_fn=None, warp_params={}, warp_type='spectrogram'): """ Dataset for time-warped chunks of animal vocalization TO DO ----- * Use `affinewarp` functions instead of direct references to knots. Parameters ---------- audio_filenames : list of strings List of .wav files. p : dict Preprocessing parameters. Must contain keys: ``'window_length'``, ``'nperseg'``, ``'noverlap'``, ``'min_freq'``, ``'max_freq'``, ``'spec_min_val'``, and ``'spec_max_val'``. transform : {None, function}, optional Transformation to apply to each item. Defaults to ``None`` (no transformation). dataset_length : int, optional Defaults to ``2048``. This is an arbitrary number that determines how many batches make up an epoch. load_warp : bool, optional Whether to load the results of a previous warp. Defaults to ``False``. save_warp : bool, optional Whether to save the results of the warp. Defaults to ``True``. start_q : float, optional Start quantile. Defaults to ``-0.1``. stop_q : float, optional Stop quantile. Defaults to ``1.1``. warp_fn : {None, str}, optional Where to save the x knots and y knots of the warp. If ``None``, then nothing will be saved or loaded. Defaults to ``None``. warp_params : dict, optional Parameters passed to affinewarp. Defaults to ``{}``. warp_type : {``'amplitude'``, ``'spectrogram'``, ``'null'``}, optional Whether to time-warp using ampltidue traces, full spectrograms, or not warp at all. Defaults to ``'spectrogram'``. """ assert type(p) == type({}) assert warp_type in ['amplitude', 'spectrogram', 'null'] self.audio_filenames = sorted(audio_filenames) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=WavFileWarning) self.audio = [wavfile.read(fn)[1] for fn in self.audio_filenames] self.fs = wavfile.read(self.audio_filenames[0])[0] self.dataset_length = dataset_length self.p = p self.transform = transform self.start_q = start_q self.stop_q = stop_q self.warp_fn = warp_fn self.warp_params = {**DEFAULT_WARP_PARAMS, **warp_params} self._compute_warp(load_warp=load_warp, save_warp=save_warp, \ warp_type=warp_type) self.window_frac = self.p['window_length'] / self.template_dur def __len__(self): """NOTE: length is arbitrary.""" return self.dataset_length
[docs] def write_hdf5_files(self, save_dir, num_files=400, sylls_per_file=100): """ Write hdf5 files containing spectrograms of random audio chunks. Note ---- This should be consistent with ``ava.preprocessing.preprocess.process_sylls``. TO DO ----- * Add the option to also write segments. This could be useful for noise removal. Parameters ---------- save_dir : str Where to write. num_files : int, optional Number of files to write. Defaults to `400`. sylls_per_file : int, optional Number of spectrograms to write per file. Defaults to `100`. """ if save_dir != '' and not os.path.exists(save_dir): os.mkdir(save_dir) for write_file_num in range(num_files): specs = self.__getitem__(np.arange(sylls_per_file), seed=write_file_num) specs = np.array([spec.detach().numpy() for spec in specs]) fn = "sylls_" + str(write_file_num).zfill(4) + '.hdf5' fn = os.path.join(save_dir, fn) with h5py.File(fn, "w") as f: f.create_dataset('specs', data=specs)
def _get_unwarped_times(self, y_vals, index): """ Convert warped quantile times in [0,1] to real quantile times. Assumes y_vals is sorted. In affinewarp, x-values are empirical times, stored as quantiles from 0 to 1, and y-values are template times. Here, we're given template times and converting to empirical times. In other words we're considering measured times as ``unwarped'' and aligned times as ``warped''. """ x_knots, y_knots = self.x_knots[index], self.y_knots[index] interp = interp1d(y_knots, x_knots, bounds_error=False, \ fill_value='extrapolate', assume_sorted=True) x_vals = interp(y_vals) return x_vals def _compute_warp(self, load_warp=False, save_warp=True, \ warp_type='spectrogram'): """ Jointly warp all the song motifs. Warping is performed on spectrograms if ``warp_type == 'spectrogram'``. Otherwise, if ``warp_type == 'amplitude'``, warping is performed on spectrograms summed over the frequency dimension. """ if save_warp: assert self.warp_fn is not None, "``warp_fn`` must be specified " +\ "to save warps!" # If it's a null warp, make it and return. if warp_type == 'null': knots = np.zeros((len(self.audio),2)) knots[:,1] = 1.0 self.x_knots = knots self.y_knots = np.copy(knots) _, _, template_dur = _get_specs_and_amplitude_traces(self.audio,\ self.fs, self.p) self.template_dur = template_dur print("Made null warp.") if save_warp: print("Saving warp to:", self.warp_fn) to_save = { 'x_knots' : self.x_knots, 'y_knots' : self.y_knots, 'template_dur' : self.template_dur, 'audio_filenames' : self.audio_filenames, 'warp_params': self.warp_params, } np.save(self.warp_fn, to_save) return # Load warps if we can. if load_warp: if self.warp_fn is None: warnings.warn( "Tried to load warps, but ``warp_fns`` is None.", UserWarning ) else: try: data = np.load(self.warp_fn, allow_pickle=True).item() self.x_knots = data['x_knots'] self.y_knots = data['y_knots'] self.template_dur = data['template_dur'] temp_fns = data['audio_filenames'] assert np.all(temp_fns[:-1] <= temp_fns[1:]), "Filenames "+\ "in " + self.warp_fn + " are not sorted!" assert len(temp_fns) >= len(self.audio_filenames) if len(temp_fns) == len(self.audio_filenames): # If the saved filenames and the passed filenames have # the same length, make sure they match. assert np.array_equal(temp_fns, self.audio_filenames), \ "Input filenames do not match saved filenames!" else: # Otherwise, make sure the passed filenames are a subset # of the saved filenames and keep track of the correct # indices. unique_fns = np.unique(self.audio_filenames) assert len(self.audio_filenames) == len(unique_fns) perm = np.zeros(len(self.audio_filenames), dtype='int') for i in range(len(self.audio_filenames)): assert self.audio_filenames[i] in temp_fns, \ "Could not find filename " + \ self.audio_filenames[i] + " in saved warps!" index = temp_fns.index(self.audio_filenames[i]) perm[i] = index self.x_knots = self.x_knots[perm] self.y_knots = self.y_knots[perm] if type(self.audio_filenames) == type(np.array([])): self.audio_filenames = self.audio_filenames.tolist() self.warp_params = data['warp_params'] return except IOError: warnings.warn( "Can't load warps from: "+str(self.warp_fn), UserWarning ) # Otherwise, first make the spectrograms. amps, specs, template_dur = _get_specs_and_amplitude_traces(self.audio,\ self.fs, self.p) self.template_dur = template_dur # Then warp. model = PiecewiseWarping(**self.warp_params) if warp_type == 'amplitude': print("Computing amplitude warp:", amps.shape) model.fit(amps, iterations=50, warp_iterations=200) elif warp_type == 'spectrogram': print("Computing spectrogram warp:", specs.shape) model.fit(specs, iterations=50, warp_iterations=200) else: raise NotImplementedError # Save the warps. self.x_knots = model.x_knots self.y_knots = model.y_knots if save_warp: print("Saving warp to:", self.warp_fn) to_save = { 'x_knots' : self.x_knots, 'y_knots' : self.y_knots, 'template_dur' : self.template_dur, 'audio_filenames' : self.audio_filenames, 'amplitude_traces': amps, 'warp_params': self.warp_params, } np.save(self.warp_fn, to_save) def __getitem__(self, index, seed=None): """ Return a random window of birdsong. Parameters ---------- index : {int, list of int} Determines the number of spectrograms to return. If an int is passed, a single spectrogram is returned. If a list is passed, ``len(index)`` spectrograms are returned. Elements (ints) themselves are ignored. seed : {None, int}, optional Random seed Returns ------- spec : {numpy.ndarray, list of numpy.ndarray} Spectrograms """ result = [] single_index = False try: iterator = iter(index) except TypeError: index = [index] single_index = True np.random.seed(seed) for i in index: while True: # First find the file, then the ROI. file_index = np.random.randint(len(self.audio)) # Then choose a chunk of audio uniformly at random. start_t = self.start_q + np.random.rand() * \ (self.stop_q - self.start_q - self.window_frac) stop_t = start_t + self.window_frac t_vals = np.linspace(start_t, stop_t, self.p['num_time_bins']) # Inverse warp. target_ts = self._get_unwarped_times(t_vals, file_index) target_ts *= self.template_dur # Then make a spectrogram. spec, flag = self.p['get_spec'](0.0, self.template_dur, \ self.audio[file_index], self.p, fs=self.fs, \ max_dur=None, target_times=target_ts) assert flag if self.transform: spec = self.transform(spec) result.append(spec) break np.random.seed(None) if single_index: return result[0] return result
[docs] def get_specific_item(self, query_filename, quantile): """ Return a specific window of birdsong as a Numpy array. Parameters ---------- query_filename : str Audio filename. quantile : float 0 <= ``quantile`` <= 1 Returns ------- spec : numpy.ndarray Spectrogram. """ file_index = self.audio_filenames.index(query_filename) start_t = self.start_q + quantile * \ (self.stop_q - self.start_q - self.window_frac) stop_t = start_t + self.window_frac t_vals = np.linspace(start_t, stop_t, self.p['num_time_bins']) # Inverse warp. target_ts = self._get_unwarped_times(t_vals, file_index) target_ts *= self.template_dur # Then make a spectrogram. spec, flag = self.p['get_spec'](0.0, self.template_dur, \ self.audio[file_index], self.p, fs=self.fs, \ max_dur=None, target_times=target_ts) assert flag return spec
[docs] def get_whole_warped_spectrogram(self, query_filename, time_bins=128): """ Get an entire warped song motif. Parameters ---------- query_filename : str Which audio file to use. time_bins : int, optional Number of time bins. Returns ------- spec : numpy.ndarray Spectrogram. """ file_index = self.audio_filenames.index(query_filename) t_vals = np.linspace(self.start_q, self.stop_q, time_bins) # Inverse warp. target_ts = self._get_unwarped_times(t_vals, file_index) target_ts *= self.template_dur # Then make a spectrogram. spec, flag = self.p['get_spec'](0.0, self.template_dur, \ self.audio[file_index], self.p, fs=self.fs, \ max_dur=None, target_times=target_ts) assert flag return spec
if __name__ == '__main__': pass ###