#! /usr/bin/env python
-from numpy.testing import TestCase, assert_equal, assert_almost_equal
+from nose2 import main
+from nose2.tools import params
+from numpy.testing import TestCase
from aubio import fvec, source, sink
-from numpy import array
-from utils import list_all_sounds
+from .utils import list_all_sounds, get_tmp_sink_path, del_tmp_sink_path
+
+import warnings
+warnings.filterwarnings('ignore', category=UserWarning, append=True)
list_of_sounds = list_all_sounds('sounds')
+samplerates = [0, 44100, 8000, 32000]
+hop_sizes = [512, 1024, 64]
+
path = None
+many_files = 300 # 256 opened files is too much
+
+all_params = []
+for soundfile in list_of_sounds:
+ for hop_size in hop_sizes:
+ for samplerate in samplerates:
+ all_params.append((hop_size, samplerate, soundfile))
+
class aubio_sink_test_case(TestCase):
def setUp(self):
- if not len(list_of_sounds): self.skipTest('add some sound files in \'python/tests/sounds\'')
+ if not len(list_of_sounds):
+ self.skipTest('add some sound files in \'python/tests/sounds\'')
+
+ def test_wrong_filename(self):
+ with self.assertRaises(RuntimeError):
+ sink('')
+
+ def test_wrong_samplerate(self):
+ with self.assertRaises(RuntimeError):
+ sink(get_tmp_sink_path(), -1)
+
+ def test_wrong_samplerate_too_large(self):
+ with self.assertRaises(RuntimeError):
+ sink(get_tmp_sink_path(), 1536001, 2)
+
+ def test_wrong_channels(self):
+ with self.assertRaises(RuntimeError):
+ sink(get_tmp_sink_path(), 44100, -1)
+
+ def test_wrong_channels_too_large(self):
+ with self.assertRaises(RuntimeError):
+ sink(get_tmp_sink_path(), 44100, 202020)
def test_many_sinks(self):
- for i in range(100):
- g = sink('/tmp/f.wav', 0)
- write = 256
- for n in range(200):
+ from tempfile import mkdtemp
+ import os.path
+ import shutil
+ tmpdir = mkdtemp()
+ sink_list = []
+ for i in range(many_files):
+ path = os.path.join(tmpdir, 'f-' + str(i) + '.wav')
+ g = sink(path, 0)
+ sink_list.append(g)
+ write = 32
+ for _ in range(200):
vec = fvec(write)
g(vec, write)
- del g
-
- def test_read(self):
- for path in list_of_sounds:
- for samplerate, hop_size in zip([0, 44100, 8000, 32000], [512, 1024, 64, 256]):
- f = source(path, samplerate, hop_size)
- if samplerate == 0: samplerate = f.samplerate
- g = sink('/tmp/f.wav', samplerate)
- total_frames = 0
- while True:
- vec, read = f()
- g(vec, read)
- total_frames += read
- if read < f.hop_size: break
- print "read", "%.2fs" % (total_frames / float(f.samplerate) ),
- print "(", total_frames, "frames", "in",
- print total_frames / f.hop_size, "blocks", "at", "%dHz" % f.samplerate, ")",
- print "from", f.uri,
- print "to", g.uri
- #del f, g
+ g.close()
+ shutil.rmtree(tmpdir)
+
+ @params(*all_params)
+ def test_read_and_write(self, hop_size, samplerate, path):
+
+ try:
+ f = source(path, samplerate, hop_size)
+ except RuntimeError as e:
+ self.skipTest('failed opening with hop_s = {:d}, samplerate = {:d} ({:s})'.format(hop_size, samplerate, str(e)))
+ if samplerate == 0: samplerate = f.samplerate
+ sink_path = get_tmp_sink_path()
+ g = sink(sink_path, samplerate)
+ total_frames = 0
+ while True:
+ vec, read = f()
+ g(vec, read)
+ total_frames += read
+ if read < f.hop_size: break
+ del_tmp_sink_path(sink_path)
+
+ @params(*all_params)
+ def test_read_and_write_multi(self, hop_size, samplerate, path):
+ try:
+ f = source(path, samplerate, hop_size)
+ except RuntimeError as e:
+ self.skipTest('failed opening with hop_s = {:d}, samplerate = {:d} ({:s})'.format(hop_size, samplerate, str(e)))
+ if samplerate == 0: samplerate = f.samplerate
+ sink_path = get_tmp_sink_path()
+ g = sink(sink_path, samplerate, channels = f.channels)
+ total_frames = 0
+ while True:
+ vec, read = f.do_multi()
+ g.do_multi(vec, read)
+ total_frames += read
+ if read < f.hop_size: break
+ del_tmp_sink_path(sink_path)
+
+ def test_close_file(self):
+ samplerate = 44100
+ sink_path = get_tmp_sink_path()
+ g = sink(sink_path, samplerate)
+ g.close()
+ del_tmp_sink_path(sink_path)
+
+ def test_close_file_twice(self):
+ samplerate = 44100
+ sink_path = get_tmp_sink_path()
+ g = sink(sink_path, samplerate)
+ g.close()
+ g.close()
+ del_tmp_sink_path(sink_path)
+
+ def test_read_with(self):
+ sink_path =get_tmp_sink_path()
+ vec = fvec(128)
+ with sink(sink_path, samplerate) as g:
+ for i in range(10):
+ g(vec, 128)
if __name__ == '__main__':
- from unittest import main
main()