From f36eceab2fa563ccd8158c47113206994840f77d Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Sun, 12 Jan 2014 00:54:42 -0400 Subject: [PATCH] python/lib/aubio/slicing.py: use start and end stamps, make sure read > 0, improve tests --- python/lib/aubio/slicing.py | 40 +++++++++++++++++++++++++++------------- python/tests/test_slicing.py | 18 +++++++++++++----- python/tests/utils.py | 11 +++++++++++ 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/python/lib/aubio/slicing.py b/python/lib/aubio/slicing.py index 9e0af535..78d41988 100644 --- a/python/lib/aubio/slicing.py +++ b/python/lib/aubio/slicing.py @@ -1,6 +1,8 @@ from aubio import source, sink import os +max_timestamp = 1e120 + def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, output_dir = None, samplerate = 0, @@ -9,8 +11,13 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, if timestamps == None or len(timestamps) == 0: raise ValueError ("no timestamps given") + if timestamps[0] != 0: + timestamps = [0] + timestamps + if timestamps_end != None and len(timestamps_end) != len(timestamps): raise ValueError ("len(timestamps_end) != len(timestamps)") + else: + timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ] source_base_name, source_ext = os.path.splitext(os.path.basename(source_file)) if output_dir != None: @@ -18,22 +25,29 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, os.makedirs(output_dir) source_base_name = os.path.join(output_dir, source_base_name) - def new_sink_name(source_base_name, timestamp): - return source_base_name + '_%02.3f' % (timestamp) + '.wav' + def new_sink_name(source_base_name, timestamp, samplerate): + timestamp_seconds = timestamp / float(samplerate) + #print source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav' + return source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav' # reopen source file s = source(source_file, samplerate, hopsize) if samplerate == 0: samplerate = s.get_samplerate() - # create first sink at 0 - g = sink(new_sink_name(source_base_name, 0.), samplerate) total_frames = 0 # get next region - next_stamp = int(timestamps.pop(0)) + start_stamp = int(timestamps.pop(0)) + end_stamp = int(timestamps_end.pop(0)) + + # create first sink + new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate) + #print "new slice", total_frames, "+", remaining, "=", end_stamp + g = sink(new_sink_path, samplerate) while True: # get hopsize new samples from source vec, read = s() - remaining = next_stamp - total_frames + # number of samples until end of region + remaining = end_stamp - total_frames # not enough frames remaining, time to split if remaining < read: if remaining != 0: @@ -41,17 +55,17 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, g(vec[0:remaining], remaining) # close this file del g + # get the next region + start_stamp = int(timestamps.pop(0)) + end_stamp = int(timestamps_end.pop(0)) # create a new file for the new region - new_sink_path = new_sink_name(source_base_name, next_stamp / float(samplerate)) - #print "new slice", total_frames, "+", remaining, "=", next_stamp + new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate) + #print "new slice", total_frames, "+", remaining, "=", end_stamp g = sink(new_sink_path, samplerate) # write the remaining samples in the new file g(vec[remaining:read], read - remaining) - if len(timestamps): - next_stamp = int(timestamps.pop(0)) - else: - next_stamp = 1e120 - else: + elif read > 0: + # write all the samples g(vec[0:read], read) total_frames += read if read < hopsize: break diff --git a/python/tests/test_slicing.py b/python/tests/test_slicing.py index 3316708e..986918f1 100755 --- a/python/tests/test_slicing.py +++ b/python/tests/test_slicing.py @@ -4,8 +4,7 @@ from numpy.testing import TestCase, run_module_suite from numpy.testing import assert_equal, assert_almost_equal from aubio import slice_source_at_stamps -from utils import count_samples_in_file, count_samples_in_directory -from utils import get_default_test_sound +from utils import * import tempfile import shutil @@ -28,13 +27,22 @@ class aubio_slicing_test_case(TestCase): def test_slice_start_beyond_end(self): regions_start = [i*1000 for i in range(1, n_slices)] - regions_start += [count_samples_in_file(self.source_file)] regions_start += [count_samples_in_file(self.source_file) + 1000] slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir) + def test_slice_start_every_blocksize(self): + hopsize = 200 + regions_start = [i*hopsize for i in range(1, n_slices)] + regions_start += [count_samples_in_file(self.source_file) + 1000] + slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir, + hopsize = 200) + def tearDown(self): original_samples = count_samples_in_file(self.source_file) written_samples = count_samples_in_directory(self.output_dir) + total_files = count_files_in_directory(self.output_dir) + assert_equal(n_slices, total_files, + "number of slices created different from expected") assert_equal(written_samples, original_samples, "number of samples written different from number of original samples") shutil.rmtree(self.output_dir) @@ -67,14 +75,14 @@ class aubio_slicing_wrong_ends_test_case(TestCase): self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case') def test_slice_wrong_ends(self): - regions_start = [i*1000 for i in range(1, 100)] + regions_start = [i*1000 for i in range(1, n_slices)] regions_end = [] self.assertRaises (ValueError, slice_source_at_stamps, self.source_file, regions_start, regions_end, output_dir = self.output_dir) def test_slice_no_ends(self): - regions_start = [i*1000 for i in range(1, 100)] + regions_start = [i*1000 for i in range(1, n_slices)] regions_end = None slice_source_at_stamps (self.source_file, regions_start, regions_end, output_dir = self.output_dir) diff --git a/python/tests/utils.py b/python/tests/utils.py index c42c6080..79a0952a 100644 --- a/python/tests/utils.py +++ b/python/tests/utils.py @@ -47,3 +47,14 @@ def count_samples_in_directory(samples_dir): if file_path: total_frames += count_samples_in_file(file_path) return total_frames + +def count_files_in_directory(samples_dir): + import os + total_files = 0 + for f in os.walk(samples_dir): + if len(f[2]): + for each in f[2]: + file_path = os.path.join(f[0], each) + if file_path: + total_files += 1 + return total_files -- 2.11.0