From 43206799c120845686eb1a6f4154199146dc1644 Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Sun, 12 Jan 2014 02:58:06 -0400 Subject: [PATCH] lib/aubio/slicing.py: allow any regions, overlaping or not, add more tests --- python/lib/aubio/slicing.py | 71 +++++++++++++++++++++++++------------------- python/tests/test_slicing.py | 55 ++++++++++++++++++++++++++++++++-- 2 files changed, 94 insertions(+), 32 deletions(-) diff --git a/python/lib/aubio/slicing.py b/python/lib/aubio/slicing.py index 78d41988..9107fb14 100644 --- a/python/lib/aubio/slicing.py +++ b/python/lib/aubio/slicing.py @@ -14,11 +14,15 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, if timestamps[0] != 0: timestamps = [0] + timestamps - if timestamps_end != None and len(timestamps_end) != len(timestamps): - raise ValueError ("len(timestamps_end) != len(timestamps)") + if timestamps_end != None: + if len(timestamps_end) != len(timestamps): + raise ValueError ("len(timestamps_end) != len(timestamps)") else: timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ] + regions = zip(timestamps, timestamps_end) + #print regions + source_base_name, source_ext = os.path.splitext(os.path.basename(source_file)) if output_dir != None: if not os.path.isdir(output_dir): @@ -32,41 +36,48 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, # reopen source file s = source(source_file, samplerate, hopsize) - if samplerate == 0: samplerate = s.get_samplerate() - total_frames = 0 - # get next region - start_stamp = int(timestamps.pop(0)) - end_stamp = int(timestamps_end.pop(0)) + samplerate = s.get_samplerate() - # create first sink - new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate) - #print "new slice", total_frames, "+", remaining, "=", end_stamp - g = sink(new_sink_path, samplerate) + total_frames = 0 + slices = [] while True: # get hopsize new samples from source vec, read = s() - # number of samples until end of region - remaining = end_stamp - total_frames - # not enough frames remaining, time to split - if remaining < read: - if remaining != 0: - # write remaining samples from current region - g(vec[0:remaining], remaining) - # close this file - del g - # get the next region - start_stamp = int(timestamps.pop(0)) - end_stamp = int(timestamps_end.pop(0)) - # create a new file for the new region + # if the total number of frames read will exceed the next region start + if len(regions) and total_frames + read >= regions[0][0]: + #print "getting", regions[0], "at", total_frames + # get next region + start_stamp, end_stamp = regions.pop(0) + # create a name for the sink new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate) - #print "new slice", total_frames, "+", remaining, "=", end_stamp + # create its sink g = sink(new_sink_path, samplerate) - # write the remaining samples in the new file - g(vec[remaining:read], read - remaining) - elif read > 0: - # write all the samples - g(vec[0:read], read) + # create a dictionary containing all this + new_slice = {'start_stamp': start_stamp, 'end_stamp': end_stamp, 'sink': g} + # append the dictionary to the current list of slices + slices.append(new_slice) + + for current_slice in slices: + start_stamp = current_slice['start_stamp'] + end_stamp = current_slice['end_stamp'] + g = current_slice['sink'] + # sample index to start writing from new source vector + start = max(start_stamp - total_frames, 0) + # number of samples yet to written be until end of region + remaining = end_stamp - total_frames + 1 + #print current_slice, remaining, start + # not enough frames remaining, time to split + if remaining < read: + if remaining > start: + # write remaining samples from current region + g(vec[start:remaining], remaining - start) + #print "closing region", "remaining", remaining + # close this file + del g + elif read > start: + # write all the samples + g(vec[start:read], read - start) total_frames += read if read < hopsize: break diff --git a/python/tests/test_slicing.py b/python/tests/test_slicing.py index 986918f1..8151d617 100755 --- a/python/tests/test_slicing.py +++ b/python/tests/test_slicing.py @@ -9,7 +9,7 @@ from utils import * import tempfile import shutil -n_slices = 8 +n_slices = 4 class aubio_slicing_test_case(TestCase): @@ -33,7 +33,6 @@ class aubio_slicing_test_case(TestCase): def test_slice_start_every_blocksize(self): hopsize = 200 regions_start = [i*hopsize for i in range(1, n_slices)] - regions_start += [count_samples_in_file(self.source_file) + 1000] slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir, hopsize = 200) @@ -47,6 +46,55 @@ class aubio_slicing_test_case(TestCase): "number of samples written different from number of original samples") shutil.rmtree(self.output_dir) +class aubio_slicing_with_ends_test_case(TestCase): + + def setUp(self): + self.source_file = get_default_test_sound(self) + self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case') + + def test_slice_start_and_ends_no_gap(self): + regions_start = [i*1000 for i in range(n_slices)] + regions_ends = [start - 1 for start in regions_start[1:]] + [1e120] + slice_source_at_stamps(self.source_file, regions_start, regions_ends, + output_dir = self.output_dir) + original_samples = count_samples_in_file(self.source_file) + written_samples = count_samples_in_directory(self.output_dir) + total_files = count_files_in_directory(self.output_dir) + assert_equal(n_slices, total_files, + "number of slices created different from expected") + assert_equal(written_samples, original_samples, + "number of samples written different from number of original samples") + + def test_slice_start_and_ends_200_gap(self): + regions_start = [i*1000 for i in range(n_slices)] + regions_ends = [start + 199 for start in regions_start] + slice_source_at_stamps(self.source_file, regions_start, regions_ends, + output_dir = self.output_dir) + expected_samples = 200 * n_slices + written_samples = count_samples_in_directory(self.output_dir) + total_files = count_files_in_directory(self.output_dir) + assert_equal(n_slices, total_files, + "number of slices created different from expected") + assert_equal(written_samples, expected_samples, + "number of samples written different from number of original samples") + + def test_slice_start_and_ends_overlaping(self): + regions_start = [i*1000 for i in range(n_slices)] + regions_ends = [start + 1199 for start in regions_start] + slice_source_at_stamps(self.source_file, regions_start, regions_ends, + output_dir = self.output_dir) + expected_samples = 1200 * n_slices + written_samples = count_samples_in_directory(self.output_dir) + total_files = count_files_in_directory(self.output_dir) + assert_equal(n_slices, total_files, + "number of slices created different from expected") + assert_equal(written_samples, expected_samples, + "number of samples written different from number of original samples") + + def tearDown(self): + shutil.rmtree(self.output_dir) + + class aubio_slicing_wrong_starts_test_case(TestCase): def setUp(self): @@ -86,6 +134,9 @@ class aubio_slicing_wrong_ends_test_case(TestCase): regions_end = None slice_source_at_stamps (self.source_file, regions_start, regions_end, output_dir = self.output_dir) + total_files = count_files_in_directory(self.output_dir) + assert_equal(n_slices, total_files, + "number of slices created different from expected") original_samples = count_samples_in_file(self.source_file) written_samples = count_samples_in_directory(self.output_dir) assert_equal(written_samples, original_samples, -- 2.11.0