[tests] also capture expected source warnings in test_sink
[aubio.git] / python / tests / test_sink.py
index 54749d8..9516fe4 100755 (executable)
@@ -1,16 +1,46 @@
 #! /usr/bin/env python
 
-from numpy.testing import TestCase, assert_equal, assert_almost_equal
+from numpy.testing import TestCase
 from aubio import fvec, source, sink
-from numpy import array
 from utils import list_all_sounds, get_tmp_sink_path, del_tmp_sink_path
+from utils import parse_file_samplerate
+from _tools import parametrize, skipTest, assert_raises, assert_warns
 
 list_of_sounds = list_all_sounds('sounds')
+samplerates = [0, 44100, 8000, 32000]
+hop_sizes = [512, 1024, 64]
+
 path = None
 
 many_files = 300 # 256 opened files is too much
 
-class aubio_sink_test_case(TestCase):
+all_params = []
+for soundfile in list_of_sounds:
+    for hop_size in hop_sizes:
+        for samplerate in samplerates:
+            all_params.append((hop_size, samplerate, soundfile))
+
+class Test_aubio_sink(object):
+
+    def test_wrong_filename(self):
+        with assert_raises(RuntimeError):
+            sink('')
+
+    def test_wrong_samplerate(self):
+        with assert_raises(RuntimeError):
+            sink(get_tmp_sink_path(), -1)
+
+    def test_wrong_samplerate_too_large(self):
+        with assert_raises(RuntimeError):
+            sink(get_tmp_sink_path(), 1536001, 2)
+
+    def test_wrong_channels(self):
+        with assert_raises(RuntimeError):
+            sink(get_tmp_sink_path(), 44100, -1)
+
+    def test_wrong_channels_too_large(self):
+        with assert_raises(RuntimeError):
+            sink(get_tmp_sink_path(), 44100, 202020)
 
     def test_many_sinks(self):
         from tempfile import mkdtemp
@@ -23,86 +53,59 @@ class aubio_sink_test_case(TestCase):
             g = sink(path, 0)
             sink_list.append(g)
             write = 32
-            for n in range(200):
+            for _ in range(200):
                 vec = fvec(write)
                 g(vec, write)
             g.close()
         shutil.rmtree(tmpdir)
 
-    def test_many_sinks_not_closed(self):
-        from tempfile import mkdtemp
-        import os.path
-        import shutil
-        tmpdir = mkdtemp()
-        sink_list = []
+    @parametrize('hop_size, samplerate, path', all_params)
+    def test_read_and_write(self, hop_size, samplerate, path):
+        orig_samplerate = parse_file_samplerate(soundfile)
         try:
-            for i in range(many_files):
-                path = os.path.join(tmpdir, 'f-' + str(i) + '.wav')
-                g = sink(path, 0)
-                sink_list.append(g)
-                write = 256
-                for n in range(200):
-                    vec = fvec(write)
-                    g(vec, write)
-        except StandardError:
-            pass
-        else:
-            self.fail("does not fail on too many files open")
-        for g in sink_list:
-            g.close()
-        shutil.rmtree(tmpdir)
+            if orig_samplerate is not None and orig_samplerate < samplerate:
+                # upsampling should emit a warning
+                with assert_warns(UserWarning):
+                    f = source(soundfile, samplerate, hop_size)
+            else:
+                f = source(soundfile, samplerate, hop_size)
+        except RuntimeError as e:
+            err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
+            skipTest(err_msg.format(str(e), hop_size, samplerate))
+        if samplerate == 0: samplerate = f.samplerate
+        sink_path = get_tmp_sink_path()
+        g = sink(sink_path, samplerate)
+        total_frames = 0
+        while True:
+            vec, read = f()
+            g(vec, read)
+            total_frames += read
+            if read < f.hop_size: break
+        del_tmp_sink_path(sink_path)
 
-    def test_read_and_write(self):
-
-        if not len(list_of_sounds):
-            self.skipTest('add some sound files in \'python/tests/sounds\'')
-
-        for path in list_of_sounds:
-            for samplerate, hop_size in zip([0, 44100, 8000, 32000], [512, 1024, 64, 256]):
-                f = source(path, samplerate, hop_size)
-                if samplerate == 0: samplerate = f.samplerate
-                sink_path = get_tmp_sink_path()
-                g = sink(sink_path, samplerate)
-                total_frames = 0
-                while True:
-                    vec, read = f()
-                    g(vec, read)
-                    total_frames += read
-                    if read < f.hop_size: break
-                if 0:
-                    print "read", "%.2fs" % (total_frames / float(f.samplerate) ),
-                    print "(", total_frames, "frames", "in",
-                    print total_frames / f.hop_size, "blocks", "at", "%dHz" % f.samplerate, ")",
-                    print "from", f.uri,
-                    print "to", g.uri
-                del_tmp_sink_path(sink_path)
-
-    def test_read_and_write_multi(self):
-
-        if not len(list_of_sounds):
-            self.skipTest('add some sound files in \'python/tests/sounds\'')
-
-        for path in list_of_sounds:
-            for samplerate, hop_size in zip([0, 44100, 8000, 32000], [512, 1024, 64, 256]):
-                f = source(path, samplerate, hop_size)
-                if samplerate == 0: samplerate = f.samplerate
-                sink_path = get_tmp_sink_path()
-                g = sink(sink_path, samplerate, channels = f.channels)
-                total_frames = 0
-                while True:
-                    vec, read = f.do_multi()
-                    g.do_multi(vec, read)
-                    total_frames += read
-                    if read < f.hop_size: break
-                if 0:
-                    print "read", "%.2fs" % (total_frames / float(f.samplerate) ),
-                    print "(", total_frames, "frames", "in",
-                    print f.channels, "channels", "in",
-                    print total_frames / f.hop_size, "blocks", "at", "%dHz" % f.samplerate, ")",
-                    print "from", f.uri,
-                    print "to", g.uri,
-                    print "in", g.channels, "channels"
-                del_tmp_sink_path(sink_path)
+    @parametrize('hop_size, samplerate, path', all_params)
+    def test_read_and_write_multi(self, hop_size, samplerate, path):
+        orig_samplerate = parse_file_samplerate(soundfile)
+        try:
+            if orig_samplerate is not None and orig_samplerate < samplerate:
+                # upsampling should emit a warning
+                with assert_warns(UserWarning):
+                    f = source(soundfile, samplerate, hop_size)
+            else:
+                f = source(soundfile, samplerate, hop_size)
+        except RuntimeError as e:
+            err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
+            skipTest(err_msg.format(str(e), hop_size, samplerate))
+        if samplerate == 0: samplerate = f.samplerate
+        sink_path = get_tmp_sink_path()
+        g = sink(sink_path, samplerate, channels = f.channels)
+        total_frames = 0
+        while True:
+            vec, read = f.do_multi()
+            g.do_multi(vec, read)
+            total_frames += read
+            if read < f.hop_size: break
+        del_tmp_sink_path(sink_path)
 
     def test_close_file(self):
         samplerate = 44100
@@ -119,6 +122,14 @@ class aubio_sink_test_case(TestCase):
         g.close()
         del_tmp_sink_path(sink_path)
 
+    def test_read_with(self):
+        samplerate = 44100
+        sink_path = get_tmp_sink_path()
+        vec = fvec(128)
+        with sink(sink_path, samplerate) as g:
+            for _ in range(10):
+                g(vec, 128)
+
 if __name__ == '__main__':
-    from unittest import main
-    main()
+    from _tools import run_module_suite
+    run_module_suite()