python/lib/aubio/slicing.py: use start and end stamps, make sure read > 0, improve...
authorPaul Brossier <piem@piem.org>
Sun, 12 Jan 2014 04:54:42 +0000 (00:54 -0400)
committerPaul Brossier <piem@piem.org>
Sun, 12 Jan 2014 04:54:42 +0000 (00:54 -0400)
python/lib/aubio/slicing.py
python/tests/test_slicing.py
python/tests/utils.py

index 9e0af53..78d4198 100644 (file)
@@ -1,6 +1,8 @@
 from aubio import source, sink
 import os
 
+max_timestamp = 1e120
+
 def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
         output_dir = None,
         samplerate = 0,
@@ -9,8 +11,13 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
     if timestamps == None or len(timestamps) == 0:
         raise ValueError ("no timestamps given")
 
+    if timestamps[0] != 0:
+        timestamps = [0] + timestamps
+
     if timestamps_end != None and len(timestamps_end) != len(timestamps):
         raise ValueError ("len(timestamps_end) != len(timestamps)")
+    else:
+        timestamps_end = [t - 1 for t in timestamps[1:] ] + [ max_timestamp ]
 
     source_base_name, source_ext = os.path.splitext(os.path.basename(source_file))
     if output_dir != None:
@@ -18,22 +25,29 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
             os.makedirs(output_dir)
         source_base_name = os.path.join(output_dir, source_base_name)
 
-    def new_sink_name(source_base_name, timestamp):
-        return source_base_name + '_%02.3f' % (timestamp) + '.wav'
+    def new_sink_name(source_base_name, timestamp, samplerate):
+        timestamp_seconds = timestamp / float(samplerate)
+        #print source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav'
+        return source_base_name + '_%02.3f' % (timestamp_seconds) + '.wav'
 
     # reopen source file
     s = source(source_file, samplerate, hopsize)
     if samplerate == 0: samplerate = s.get_samplerate()
-    # create first sink at 0
-    g = sink(new_sink_name(source_base_name, 0.), samplerate)
     total_frames = 0
     # get next region
-    next_stamp = int(timestamps.pop(0))
+    start_stamp = int(timestamps.pop(0))
+    end_stamp = int(timestamps_end.pop(0))
+
+    # create first sink
+    new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
+    #print "new slice", total_frames, "+", remaining, "=", end_stamp
+    g = sink(new_sink_path, samplerate)
 
     while True:
         # get hopsize new samples from source
         vec, read = s()
-        remaining = next_stamp - total_frames
+        # number of samples until end of region
+        remaining = end_stamp - total_frames
         # not enough frames remaining, time to split
         if remaining < read:
             if remaining != 0:
@@ -41,17 +55,17 @@ def slice_source_at_stamps(source_file, timestamps, timestamps_end = None,
                 g(vec[0:remaining], remaining)
             # close this file
             del g
+            # get the next region
+            start_stamp = int(timestamps.pop(0))
+            end_stamp = int(timestamps_end.pop(0))
             # create a new file for the new region
-            new_sink_path = new_sink_name(source_base_name, next_stamp / float(samplerate))
-            #print "new slice", total_frames, "+", remaining, "=", next_stamp
+            new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
+            #print "new slice", total_frames, "+", remaining, "=", end_stamp
             g = sink(new_sink_path, samplerate)
             # write the remaining samples in the new file
             g(vec[remaining:read], read - remaining)
-            if len(timestamps):
-                next_stamp = int(timestamps.pop(0))
-            else:
-                next_stamp = 1e120
-        else:
+        elif read > 0:
+            # write all the samples
             g(vec[0:read], read)
         total_frames += read
         if read < hopsize: break
index 3316708..986918f 100755 (executable)
@@ -4,8 +4,7 @@ from numpy.testing import TestCase, run_module_suite
 from numpy.testing import assert_equal, assert_almost_equal
 
 from aubio import slice_source_at_stamps
-from utils import count_samples_in_file, count_samples_in_directory
-from utils import get_default_test_sound
+from utils import *
 
 import tempfile
 import shutil
@@ -28,13 +27,22 @@ class aubio_slicing_test_case(TestCase):
 
     def test_slice_start_beyond_end(self):
         regions_start = [i*1000 for i in range(1, n_slices)]
-        regions_start += [count_samples_in_file(self.source_file)]
         regions_start += [count_samples_in_file(self.source_file) + 1000]
         slice_source_at_stamps(self.source_file, regions_start, output_dir = self.output_dir)
 
+    def test_slice_start_every_blocksize(self):
+        hopsize = 200  # region starts below are multiples of the hop size
+        regions_start = [hopsize * i for i in range(1, n_slices)]
+        regions_start.append(count_samples_in_file(self.source_file) + 1000)
+        slice_source_at_stamps(self.source_file, regions_start,
+                output_dir = self.output_dir, hopsize = hopsize)
+
     def tearDown(self):
         original_samples = count_samples_in_file(self.source_file)
         written_samples = count_samples_in_directory(self.output_dir)
+        total_files = count_files_in_directory(self.output_dir)
+        assert_equal(n_slices, total_files,
+            "number of slices created different from expected")
         assert_equal(written_samples, original_samples,
             "number of samples written different from number of original samples")
         shutil.rmtree(self.output_dir)
@@ -67,14 +75,14 @@ class aubio_slicing_wrong_ends_test_case(TestCase):
         self.output_dir = tempfile.mkdtemp(suffix = 'aubio_slicing_test_case')
 
     def test_slice_wrong_ends(self):
-        regions_start = [i*1000 for i in range(1, 100)]
+        regions_start = [i*1000 for i in range(1, n_slices)]
         regions_end = []
         self.assertRaises (ValueError,
             slice_source_at_stamps, self.source_file, regions_start, regions_end,
                 output_dir = self.output_dir)
 
     def test_slice_no_ends(self):
-        regions_start = [i*1000 for i in range(1, 100)]
+        regions_start = [i*1000 for i in range(1, n_slices)]
         regions_end = None
         slice_source_at_stamps (self.source_file, regions_start, regions_end,
                 output_dir = self.output_dir)
index c42c608..79a0952 100644 (file)
@@ -47,3 +47,14 @@ def count_samples_in_directory(samples_dir):
                 if file_path:
                     total_frames += count_samples_in_file(file_path)
     return total_frames
+
+def count_files_in_directory(samples_dir):
+    """Return the number of files found under samples_dir, recursively."""
+    import os
+    # os.walk yields (dirpath, dirnames, filenames) for every directory
+    n_files = 0
+    for _root, _dirs, filenames in os.walk(samples_dir):
+        # every listed name joins to a non-empty path, so each one counts
+        n_files += len(filenames)
+    # stays 0 when os.walk yields no files at all
+    return n_files