Source code for ava.preprocessing.preprocess

"""
Make and save syllable spectrograms.

"""
__date__ = "December 2018 - July 2020"


import h5py
import matplotlib.pyplot as plt
plt.switch_backend('agg')
import numpy as np
import os
from scipy.io import wavfile
from scipy.io.wavfile import WavFileWarning
import warnings

from ava.preprocessing.utils import _mel, _inv_mel

EPSILON = 1e-12



[docs]def process_sylls(audio_dir, segment_dir, save_dir, p, shuffle=True, \ verbose=True): """ Extract syllables from `audio_dir` and save to `save_dir`. Parameters ---------- audio_dir : str Directory containing audio files. segment_dir : str Directory containing segmenting decisions. save_dir : str Directory to save processed syllables in. p : dict Preprocessing parameters. TO DO: add reference. shuffle : bool, optional Shuffle by filename. Defaults to ``True``. verbose : bool, optional Defaults to ``True``. """ if verbose: print("Processing audio files in", audio_dir) if not os.path.exists(save_dir): os.makedirs(save_dir) audio_filenames, seg_filenames = \ get_audio_seg_filenames(audio_dir, segment_dir, p) if shuffle: np.random.seed(42) perm = np.random.permutation(len(audio_filenames)) np.random.seed(None) audio_filenames = np.array(audio_filenames)[perm] seg_filenames = np.array(seg_filenames)[perm] write_file_num = 0 syll_data = { 'specs':[], 'onsets':[], 'offsets':[], 'audio_filenames':[], } sylls_per_file = p['sylls_per_file'] # For each pair of files... for audio_filename, seg_filename in zip(audio_filenames, seg_filenames): # Get onsets and offsets. onsets, offsets = read_onsets_offsets_from_file(seg_filename, p) # Retrieve a spectrogram for each detected syllable. specs, good_sylls = get_syll_specs(onsets, offsets, audio_filename, p) onsets = [onsets[i] for i in good_sylls] offsets = [offsets[i] for i in good_sylls] # Add the syllables to <syll_data>. syll_data['specs'] += specs syll_data['onsets'] += onsets syll_data['offsets'] += offsets syll_data['audio_filenames'] += \ len(onsets)*[os.path.split(audio_filename)[-1]] # Write files until we don't have enough syllables. while len(syll_data['onsets']) >= sylls_per_file: save_filename = \ "syllables_" + str(write_file_num).zfill(4) + '.hdf5' save_filename = os.path.join(save_dir, save_filename) with h5py.File(save_filename, "w") as f: # Add all the fields. for key in ['onsets', 'offsets']: f.create_dataset(key, \ data=np.array(syll_data[key][:sylls_per_file])) f.create_dataset('specs', \ data=np.stack(syll_data['specs'][:sylls_per_file])) temp = [os.path.join(audio_dir, i) for i in \ syll_data['audio_filenames'][:sylls_per_file]] f.create_dataset('audio_filenames', \ data=np.array(temp).astype('S')) write_file_num += 1 # Remove the written data from temporary storage. for key in syll_data: syll_data[key] = syll_data[key][sylls_per_file:] # Stop if we've written `max_num_syllables`. if p['max_num_syllables'] is not None and \ write_file_num*sylls_per_file >= p['max_num_syllables']: if verbose: print("\tSaved max_num_syllables (" + \ str(p['max_num_syllables'])+"). Returning.") return if verbose: print("\tDone.")
[docs]def get_syll_specs(onsets, offsets, audio_filename, p): """ Return the spectrograms corresponding to `onsets` and `offsets`. Parameters ---------- onsets : list of floats Syllable onsets. offsets : list of floats Syllable offsets. audio_filename : str Audio filename. p : dict A dictionary mapping preprocessing parameters to their values. NOTE: ADD REFERENCE HERE! Returns ------- specs : list of {numpy.ndarray, None} Spectrograms. valid_syllables : list of int Indices of `specs` containing valid syllables. """ with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=WavFileWarning) fs, audio = wavfile.read(audio_filename) assert p['nperseg'] % 2 == 0 and p['nperseg'] > 2 if p['mel']: target_freqs = np.linspace( \ _mel(p['min_freq']), _mel(p['max_freq']), p['num_freq_bins']) target_freqs = _inv_mel(target_freqs) else: target_freqs = np.linspace( \ p['min_freq'], p['max_freq'], p['num_freq_bins']) specs, valid_syllables = [], [] # For each syllable... for i, t1, t2 in zip(range(len(onsets)), onsets, offsets): spec, valid = p['get_spec'](t1, t2, audio, p, fs, \ target_freqs=target_freqs) if valid: valid_syllables.append(i) specs.append(spec) return specs, valid_syllables
[docs]def tune_syll_preprocessing_params(audio_dirs, seg_dirs, p, img_fn='temp.pdf'): """ Flip through spectrograms and tune preprocessing parameters. Parameters ---------- audio_dirs : list of str Audio directories seg_dirs : list of str Segment directories p : dict Preprocessing parameters: Add a reference! Returns ------- p : dict Adjusted preprocessing parameters. """ print("Tune preprocessing parameters:") # Collect all the relevant filenames. audio_filenames, seg_filenames = [], [] for audio_dir, seg_dir in zip(audio_dirs, seg_dirs): temp_audio, temp_seg = get_audio_seg_filenames(audio_dir, seg_dir, p) audio_filenames += temp_audio seg_filenames += temp_seg audio_filenames = np.array(audio_filenames) seg_filenames = np.array(seg_filenames) assert len(audio_filenames) > 0, "Didn't find any audio files!" # Main loop: keep tuning parameters ... while True: # Tune parameters. p = _tune_input_helper(p) # Keep plotting example spectrograms. temp = 'not (s or r)' while temp != 's' and temp != 'r': # Grab a random file. file_index = np.random.randint(len(audio_filenames)) audio_filename = audio_filenames[file_index] seg_filename = seg_filenames[file_index] # Grab a random syllable from within the file. onsets, offsets = read_onsets_offsets_from_file(seg_filename, p) if len(onsets) == 0: continue syll_index = np.random.randint(len(onsets)) onsets, offsets = [onsets[syll_index]], [offsets[syll_index]] # Get the preprocessed spectrogram. specs, good_sylls = get_syll_specs(onsets, offsets, \ audio_filename, p) specs = [specs[i] for i in good_sylls] if len(specs) == 0: continue spec = specs[np.random.randint(len(specs))] # Plot. plt.imshow(spec, aspect='equal', origin='lower', vmin=0, vmax=1) plt.axis('off') plt.savefig(img_fn) plt.close('all') temp = input('Continue? [y] or [s]top tuning or [r]etune params: ') if temp == 's': return p
[docs]def tune_window_preprocessing_params(audio_dirs, p, img_fn='temp.pdf'): """ Flip through spectrograms and tune preprocessing parameters. Parameters ---------- audio_dirs : list of str Audio directories p : dict Preprocessing parameters ADD REFERENCE img_fn : str, optional Where to save images. Defaults to ``'temp.pdf'``. Returns ------- p : dict Adjusted preprocessing parameters. """ print("Tune preprocessing parameters:") # Collect all the relevant filenames. audio_filenames = [] for audio_dir in audio_dirs: audio_filenames += get_audio_filenames(audio_dir) audio_filenames = np.array(audio_filenames) # Main loop: keep tuning parameters ... while True: # Tune parameters. p = _tune_input_helper(p) # Keep plotting example spectrograms. temp = 'not (s or r)' while temp != 's' and temp != 'r': # Grab a random file. file_index = np.random.randint(len(audio_filenames)) audio_filename = audio_filenames[file_index] with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=WavFileWarning) fs, audio = wavfile.read(audio_filename) assert fs == p['fs'], "Found fs="+str(fs)+", expected "+str(p['fs']) # Get a random onset & offset. duration = len(audio) / fs assert duration > p['window_length'] onset = np.random.rand() * (duration - p['window_length']) offset = onset + p['window_length'] target_times = np.linspace(onset, offset, p['num_time_bins']) # Get the preprocessed spectrogram. spec, flag = p['get_spec'](0.0, duration, audio, p, fs=p['fs'], \ max_dur=None, target_times=target_times) assert flag # Plot. plt.imshow(spec, aspect='equal', origin='lower', vmin=0, vmax=1) plt.axis('off') plt.savefig(img_fn) plt.close('all') temp = input('Continue? [y] or [s]top tuning or [r]etune params: ') if temp == 's': return p
def _tune_input_helper(p): """Get parameter adjustments from the user.""" for key in p['real_preprocess_params']: temp = 'not (number or empty)' while not _is_number_or_empty(temp): temp = input('Set value for '+key+': ['+str(p[key])+ '] ') if temp != '': p[key] = float(temp) for key in p['int_preprocess_params']: temp = 'not (number or empty)' while not _is_number_or_empty(temp): temp = input('Set value for '+key+': ['+str(p[key])+ '] ') if temp != '': p[key] = int(temp) for key in p['binary_preprocess_params']: temp = 'not (t or f)' while temp not in ['t', 'T', 'f', 'F', '']: current_value = 'T' if p[key] else 'F' temp = input('Set value for '+key+': ['+current_value+'] ') if temp != '': p[key] = temp in ['t', 'T'] return p
[docs]def get_audio_seg_filenames(audio_dir, segment_dir, p): """Return lists of sorted filenames.""" # Collect all the audio filenames. temp_filenames = [i for i in sorted(os.listdir(audio_dir)) if \ is_audio_file(i)] audio_filenames = [os.path.join(audio_dir, i) for i in temp_filenames] temp_filenames = [i[:-4] + '.txt' for i in temp_filenames] seg_filenames = [os.path.join(segment_dir, i) for i in temp_filenames] # Remove filenames with segments that don't exist. for i in range(len(seg_filenames)-1,-1,-1): if not os.path.exists(seg_filenames[i]): del seg_filenames[i] del audio_filenames[i] return audio_filenames, seg_filenames
[docs]def get_audio_filenames(audio_dir): """Return a list of sorted audio files.""" fns = [os.path.join(audio_dir, i) for i in sorted(os.listdir(audio_dir)) \ if is_audio_file(i)] return fns
[docs]def read_onsets_offsets_from_file(txt_filename, p): """ Read a text file to collect onsets and offsets. Note ---- * The text file must have two coulumns separated by whitespace and ``#`` prepended to header and footer lines. """ segs = np.loadtxt(txt_filename) assert segs.size % 2 == 0, "Incorrect formatting: " + txt_filename segs = segs.reshape(-1,2) return segs[:,0], segs[:,1]
def _is_number_or_empty(s): if s == '': return True try: float(s) return True except: return False def _is_number(s): return type(s) == type(4) or type(s) == type(4.0)
[docs]def is_audio_file(fn): """Return whether the given filename is an audio filename.""" return len(fn) >= 4 and fn[-4:] == '.wav'
if __name__ == '__main__': pass ###