From 0ef96288e19796ba28eb1f6138ca66bbb254dcf4 Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Mon, 6 Jun 2016 15:50:59 +0200 Subject: [PATCH] python/demos/demo_timestretch.py: add timescale algorithm --- python/demos/demo_timestretch.py | 110 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100755 python/demos/demo_timestretch.py diff --git a/python/demos/demo_timestretch.py b/python/demos/demo_timestretch.py new file mode 100755 index 00000000..8569a449 --- /dev/null +++ b/python/demos/demo_timestretch.py @@ -0,0 +1,110 @@ +#! /usr/bin/env python + +# Implementation of the timescale algorithm according to Dan Ellis, *A Phase +# Vocoder in Matlab*. http://www.ee.columbia.edu/~dpwe/resources/matlab/pvoc/ + +# This file follows the original implementation, with analysis in a first pass, +# and synthesis in a second pass. + +import sys +from aubio import source, sink, pvoc, mfcc, cvec +from aubio import unwrap2pi, float_type +import numpy as np + +win_s = 1024 +hop_s = win_s / 8 # 87.5 % overlap + +warmup = win_s // hop_s - 1 + +if len(sys.argv) < 3: + print("Usage: %s [samplerate]".format(sys.argv[0])) + print("""Examples: + # twice faster + {0} track_01.mp3 track_01_faster.wav 2.0 + # twice slower + {0} track_02.flac track_02_slower.wav 0.5 + # one and a half time faster, resampling first the input to 22050 + {0} track_02.flac track_02_slower.wav 1.5 22050""".format(sys.argv[0])) + sys.exit(1) + +source_filename = sys.argv[1] +output_filename = sys.argv[2] +rate = float(sys.argv[3]) + +samplerate = 0 if len(sys.argv) < 5 else int(sys.argv[4]) +source_in = source(source_filename, samplerate, hop_s) +samplerate = source_in.samplerate +p = pvoc(win_s, hop_s) + +# allocate memory to store norms and phases +n_blocks = source_in.duration // hop_s + 1 +# adding an empty frame at end of spectrogram +norms = np.zeros((n_blocks + 1, win_s // 2 + 1), dtype = float_type) +phases = np.zeros((n_blocks + 1, win_s // 2 + 1), dtype = float_type) + +block_read = 0 +while True: + # read from source + samples, read = source_in() + # compute fftgrain + spec = p(samples) + # store current grain + norms[block_read] = spec.norm + phases[block_read] = spec.phas + # until end of file + if read < hop_s: break + # increment block counter + block_read += 1 + +# just to make sure +#source_in.close() + +sink_out = sink(output_filename, samplerate) + +# interpolated time steps (j = alpha * i) +steps = np.arange(0, n_blocks, rate, dtype = float_type) +# initial phase +phas_acc = phases[0] +# excepted phase advance in each bin +phi_advance = np.linspace(0, np.pi * hop_s, win_s / 2 + 1).astype (float_type) + +new_grain = cvec(win_s) + +for (t, step) in enumerate(steps): + + frac = 1. - np.mod(step, 1.0) + # get pair of frames + t_norms = norms[int(step):int(step+2)] + t_phases = phases[int(step):int(step+2)] + + # compute interpolated frame + new_grain.norm = frac * t_norms[0] + (1. - frac) * t_norms[1] + new_grain.phas = phas_acc + #print t, step, new_grain.norm + #print t, step, phas_acc + + # psola + samples = p.rdo(new_grain) + if t > warmup: # skip the first few frames to warm up phase vocoder + # write to sink + sink_out(samples, hop_s) + + # calculate phase advance + dphas = t_phases[1] - t_phases[0] - phi_advance + # unwrap angle to [-pi; pi] + dphas = unwrap2pi(dphas) + # cumulate phase, to be used for next frame + phas_acc += phi_advance + dphas + +for t in range(warmup + 1): # purge the last frames from the phase vocoder + new_grain.norm[:] = 0 + new_grain.phas[:] = 0 + samples = p.rdo(new_grain) + sink_out(samples, read if t == warmup else hop_s) + +# just to make sure +#sink_out.close() + +format_out = "read {:d} blocks from {:s} at {:d}Hz and rate {:f}, wrote {:d} blocks to {:s}" +print (format_out.format(block_read, source_filename, samplerate, rate, + len(steps), output_filename)) -- 2.11.0