From: Paul Brossier Date: Sun, 12 Jan 2014 01:59:49 +0000 (-0400) Subject: python/lib/aubio/slicing.py: rewrite slicing loop from aubiocut, add some tests X-Git-Tag: 0.4.1~87 X-Git-Url: https://git.aubio.org/?a=commitdiff_plain;h=88432a9429e344b282b59f96a8de7d8aaf7a5548;p=aubio.git python/lib/aubio/slicing.py: rewrite slicing loop from aubiocut, add some tests --- diff --git a/python/lib/aubio/slicing.py b/python/lib/aubio/slicing.py new file mode 100644 index 00000000..c60e674d --- /dev/null +++ b/python/lib/aubio/slicing.py @@ -0,0 +1,56 @@ +from aubio import source, sink +import os + +def slice_source_at_stamps(source_file, timestamps, timestamps_end = None, + output_dir = None, + samplerate = 0, + hopsize = 256): + + source_base_name, source_ext = os.path.splitext(os.path.basename(source_file)) + if output_dir != None: + if not os.path.isdir(output_dir): + os.makedirs(output_dir) + source_base_name = os.path.join(output_dir, source_base_name) + + def new_sink_name(source_base_name, timestamp): + return source_base_name + '_%02.3f' % (timestamp) + '.wav' + + # reopen source file + s = source(source_file, samplerate, hopsize) + if samplerate == 0: samplerate = s.get_samplerate() + # create first sink at 0 + g = sink(new_sink_name(source_base_name, 0.), samplerate) + total_frames = 0 + # get next region + next_stamp = int(timestamps.pop(0)) + if not next_stamp: + next_stamp = int(timestamps.pop(0)) + + while True: + # get hopsize new samples from source + vec, read = s() + remaining = next_stamp - total_frames + # not enough frames remaining, time to split + if remaining <= read: + if remaining != 0: + # write remaining samples from current region + g(vec[0:remaining], remaining) + # close this file + del g + # create a new file for the new region + new_sink_path = new_sink_name(source_base_name, next_stamp / float(samplerate)) + #print "new slice", total_frames, "+", remaining, "=", next_stamp + g = sink(new_sink_path, samplerate) + if remaining != read: + # write the remaining samples in the new file + g(vec[remaining:read], read - remaining) + if len(timestamps): + next_stamp = int(timestamps.pop(0)) + else: + next_stamp = 1e120 + else: + g(vec[0:read], read) + total_frames += read + if read < hopsize: break + + del g diff --git a/python/tests/test_slicing.py b/python/tests/test_slicing.py new file mode 100755 index 00000000..3c0750e9 --- /dev/null +++ b/python/tests/test_slicing.py @@ -0,0 +1,41 @@ +#! /usr/bin/env python + +from numpy.testing import TestCase, run_module_suite +from numpy.testing import assert_equal, assert_almost_equal + +from aubio import slice_source_at_stamps +from utils import count_samples_in_file, count_samples_in_directory + +import tempfile +import shutil + +class aubio_slicing_test_case(TestCase): + + def setUp(self): + self.source_file = 'chocolate_1min.wav' + self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case') + + def test_slice_start_only(self): + regions_start = [i*1000 for i in range(100)] + slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir) + + def test_slice_start_only_no_zero(self): + regions_start = [i*1000 for i in range(1, 100)] + slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir) + + def test_slice_start_beyond_end(self): + regions_start = [i*1000 for i in range(1, 100)] + regions_start += [count_samples_in_file(self.source_file)] + regions_start += [count_samples_in_file(self.source_file) + 1000] + slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir) + + def tearDown(self): + original_samples = count_samples_in_file(self.source_file) + written_samples = count_samples_in_directory(self.output_dir) + assert_equal(original_samples, written_samples, + "number samples written different from number of original samples") + shutil.rmtree(self.output_dir) + +if __name__ == '__main__': + from unittest import main + main() diff --git a/python/tests/utils.py b/python/tests/utils.py index fda8cb79..9ada420b 100644 --- a/python/tests/utils.py +++ b/python/tests/utils.py @@ -18,3 +18,25 @@ def array_from_yaml_file(filename): yaml_data = yaml.safe_load(f) f.close() return yaml_data + +def count_samples_in_file(file_path): + from aubio import source + hopsize = 256 + s = source(file_path, 0, hopsize) + total_frames = 0 + while True: + samples, read = s() + total_frames += read + if read < hopsize: break + return total_frames + +def count_samples_in_directory(samples_dir): + import os + total_frames = 0 + for f in os.walk(samples_dir): + if len(f[2]): + for each in f[2]: + file_path = os.path.join(f[0], each) + if file_path: + total_frames += count_samples_in_file(file_path) + return total_frames