lib/aubio/slicing.py: allow any regions, overlapping or not, add more tests
author Paul Brossier <piem@piem.org>
Sun, 12 Jan 2014 06:58:06 +0000 (02:58 -0400)
committer Paul Brossier <piem@piem.org>
Sun, 12 Jan 2014 06:58:06 +0000 (02:58 -0400)
python/lib/aubio/slicing.py
python/tests/test_slicing.py

index 78d4198..9107fb1 100644 (file)
@@ -14,11 +14,15 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
     if timestamps[0] != 0:
         timestamps = [0] + timestamps
 
-    if timestamps_end != None and len(timestamps_end) != len(timestamps):
-        raise ValueError ("len(timestamps_end) != len(timestamps)")
+    if timestamps_end != None:
+        if len(timestamps_end) != len(timestamps):
+            raise ValueError ("len(timestamps_end) != len(timestamps)")
     else:
         timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ]
 
+    regions = zip(timestamps, timestamps_end)
+    #print regions
+
     source_base_name, source_ext = os.path.splitext(os.path.basename(source_file))
     if output_dir != None:
         if not os.path.isdir(output_dir):
@@ -32,41 +36,48 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
 
     # reopen source file
     s = source(source_file, samplerate, hopsize)
-    if samplerate == 0: samplerate = s.get_samplerate()
-    total_frames = 0
-    # get next region
-    start_stamp = int(timestamps.pop(0))
-    end_stamp = int(timestamps_end.pop(0))
+    samplerate = s.get_samplerate()
 
-    # create first sink
-    new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
-    #print "new slice", total_frames, "+", remaining, "=", end_stamp
-    g = sink(new_sink_path, samplerate)
+    total_frames = 0
+    slices = []
 
     while True:
         # get hopsize new samples from source
         vec, read = s()
-        # number of samples until end of region
-        remaining = end_stamp - total_frames
-        # not enough frames remaining, time to split
-        if remaining < read:
-            if remaining != 0:
-                # write remaining samples from current region
-                g(vec[0:remaining], remaining)
-            # close this file
-            del g
-            # get the next region
-            start_stamp = int(timestamps.pop(0))
-            end_stamp = int(timestamps_end.pop(0))
-            # create a new file for the new region
+        # if the total number of frames read will exceed the next region start
+        if len(regions) and total_frames + read >= regions[0][0]:
+            #print "getting", regions[0], "at", total_frames
+            # get next region
+            start_stamp, end_stamp = regions.pop(0)
+            # create a name for the sink
             new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
-            #print "new slice", total_frames, "+", remaining, "=", end_stamp
+            # create its sink
             g = sink(new_sink_path, samplerate)
-            # write the remaining samples in the new file
-            g(vec[remaining:read], read - remaining)
-        elif read > 0:
-            # write all the samples
-            g(vec[0:read], read)
+            # create a dictionary containing all this
+            new_slice = {'start_stamp': start_stamp, 'end_stamp': end_stamp, 'sink': g}
+            # append the dictionary to the current list of slices
+            slices.append(new_slice)
+
+        for current_slice in slices:
+            start_stamp = current_slice['start_stamp']
+            end_stamp = current_slice['end_stamp']
+            g = current_slice['sink']
+            # sample index to start writing from new source vector
+            start = max(start_stamp - total_frames, 0)
+            # number of samples yet to be written until end of region
+            remaining = end_stamp - total_frames + 1
+            #print current_slice, remaining, start
+            # not enough frames remaining, time to split
+            if remaining < read:
+                if remaining > start:
+                    # write remaining samples from current region
+                    g(vec[start:remaining], remaining - start)
+                    #print "closing region", "remaining", remaining
+                    # close this file
+                    del g
+            elif read > start:
+                # write all the samples
+                g(vec[start:read], read - start)
         total_frames += read
         if read < hopsize: break
 
index 986918f..8151d61 100755 (executable)
@@ -9,7 +9,7 @@ from utils import *
 import tempfile
 import shutil
 
-n_slices = 8
+n_slices = 4
 
 class aubio_slicing_test_case(TestCase):
 
@@ -33,7 +33,6 @@ class aubio_slicing_test_case(TestCase):
     def test_slice_start_every_blocksize(self):
         hopsize = 200
         regions_start = [i*hopsize for i in range(1, n_slices)]
-        regions_start += [count_samples_in_file(self.source_file) + 1000]
         slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir,
                 hopsize = 200)
 
@@ -47,6 +46,55 @@ class aubio_slicing_test_case(TestCase):
             "number of samples written different from number of original samples")
         shutil.rmtree(self.output_dir)
 
+class aubio_slicing_with_ends_test_case(TestCase):
+
+    def setUp(self):
+        self.source_file = get_default_test_sound(self)
+        self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case')
+
+    def test_slice_start_and_ends_no_gap(self):
+        regions_start = [i*1000 for i in range(n_slices)]
+        regions_ends = [start - 1 for start in regions_start[1:]] + [1e120]
+        slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+                output_dir = self.output_dir)
+        original_samples = count_samples_in_file(self.source_file)
+        written_samples = count_samples_in_directory(self.output_dir)
+        total_files = count_files_in_directory(self.output_dir)
+        assert_equal(n_slices, total_files,
+            "number of slices created different from expected")
+        assert_equal(written_samples, original_samples,
+            "number of samples written different from number of original samples")
+
+    def test_slice_start_and_ends_200_gap(self):
+        regions_start = [i*1000 for i in range(n_slices)]
+        regions_ends = [start + 199 for start in regions_start]
+        slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+                output_dir = self.output_dir)
+        expected_samples = 200 * n_slices
+        written_samples = count_samples_in_directory(self.output_dir)
+        total_files = count_files_in_directory(self.output_dir)
+        assert_equal(n_slices, total_files,
+            "number of slices created different from expected")
+        assert_equal(written_samples, expected_samples,
+            "number of samples written different from number of original samples")
+
+    def test_slice_start_and_ends_overlaping(self):
+        regions_start = [i*1000 for i in range(n_slices)]
+        regions_ends = [start + 1199 for start in regions_start]
+        slice_source_at_stamps(self.source_file, regions_start, regions_ends,
+                output_dir = self.output_dir)
+        expected_samples = 1200 * n_slices
+        written_samples = count_samples_in_directory(self.output_dir)
+        total_files = count_files_in_directory(self.output_dir)
+        assert_equal(n_slices, total_files,
+            "number of slices created different from expected")
+        assert_equal(written_samples, expected_samples,
+            "number of samples written different from number of original samples")
+
+    def tearDown(self):
+        shutil.rmtree(self.output_dir)
+
+
 class aubio_slicing_wrong_starts_test_case(TestCase):
 
     def setUp(self):
@@ -86,6 +134,9 @@ class aubio_slicing_wrong_ends_test_case(TestCase):
         regions_end = None
         slice_source_at_stamps (self.source_file, regions_start, regions_end,
                 output_dir = self.output_dir)
+        total_files = count_files_in_directory(self.output_dir)
+        assert_equal(n_slices, total_files,
+            "number of slices created different from expected")
         original_samples = count_samples_in_file(self.source_file)
         written_samples = count_samples_in_directory(self.output_dir)
         assert_equal(written_samples, original_samples,