9516fe4f139101854a059331783daed0c8a3cbab
[aubio.git] / python / tests / test_sink.py
1 #! /usr/bin/env python
2
3 from numpy.testing import TestCase
4 from aubio import fvec, source, sink
5 from utils import list_all_sounds, get_tmp_sink_path, del_tmp_sink_path
6 from utils import parse_file_samplerate
7 from _tools import parametrize, skipTest, assert_raises, assert_warns
8
9 list_of_sounds = list_all_sounds('sounds')
10 samplerates = [0, 44100, 8000, 32000]
11 hop_sizes = [512, 1024, 64]
12
13 path = None
14
15 many_files = 300 # 256 opened files is too much
16
17 all_params = []
18 for soundfile in list_of_sounds:
19     for hop_size in hop_sizes:
20         for samplerate in samplerates:
21             all_params.append((hop_size, samplerate, soundfile))
22
23 class Test_aubio_sink(object):
24
25     def test_wrong_filename(self):
26         with assert_raises(RuntimeError):
27             sink('')
28
29     def test_wrong_samplerate(self):
30         with assert_raises(RuntimeError):
31             sink(get_tmp_sink_path(), -1)
32
33     def test_wrong_samplerate_too_large(self):
34         with assert_raises(RuntimeError):
35             sink(get_tmp_sink_path(), 1536001, 2)
36
37     def test_wrong_channels(self):
38         with assert_raises(RuntimeError):
39             sink(get_tmp_sink_path(), 44100, -1)
40
41     def test_wrong_channels_too_large(self):
42         with assert_raises(RuntimeError):
43             sink(get_tmp_sink_path(), 44100, 202020)
44
45     def test_many_sinks(self):
46         from tempfile import mkdtemp
47         import os.path
48         import shutil
49         tmpdir = mkdtemp()
50         sink_list = []
51         for i in range(many_files):
52             path = os.path.join(tmpdir, 'f-' + str(i) + '.wav')
53             g = sink(path, 0)
54             sink_list.append(g)
55             write = 32
56             for _ in range(200):
57                 vec = fvec(write)
58                 g(vec, write)
59             g.close()
60         shutil.rmtree(tmpdir)
61
62     @parametrize('hop_size, samplerate, path', all_params)
63     def test_read_and_write(self, hop_size, samplerate, path):
64         orig_samplerate = parse_file_samplerate(soundfile)
65         try:
66             if orig_samplerate is not None and orig_samplerate < samplerate:
67                 # upsampling should emit a warning
68                 with assert_warns(UserWarning):
69                     f = source(soundfile, samplerate, hop_size)
70             else:
71                 f = source(soundfile, samplerate, hop_size)
72         except RuntimeError as e:
73             err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
74             skipTest(err_msg.format(str(e), hop_size, samplerate))
75         if samplerate == 0: samplerate = f.samplerate
76         sink_path = get_tmp_sink_path()
77         g = sink(sink_path, samplerate)
78         total_frames = 0
79         while True:
80             vec, read = f()
81             g(vec, read)
82             total_frames += read
83             if read < f.hop_size: break
84         del_tmp_sink_path(sink_path)
85
86     @parametrize('hop_size, samplerate, path', all_params)
87     def test_read_and_write_multi(self, hop_size, samplerate, path):
88         orig_samplerate = parse_file_samplerate(soundfile)
89         try:
90             if orig_samplerate is not None and orig_samplerate < samplerate:
91                 # upsampling should emit a warning
92                 with assert_warns(UserWarning):
93                     f = source(soundfile, samplerate, hop_size)
94             else:
95                 f = source(soundfile, samplerate, hop_size)
96         except RuntimeError as e:
97             err_msg = '{:s} (hop_s = {:d}, samplerate = {:d})'
98             skipTest(err_msg.format(str(e), hop_size, samplerate))
99         if samplerate == 0: samplerate = f.samplerate
100         sink_path = get_tmp_sink_path()
101         g = sink(sink_path, samplerate, channels = f.channels)
102         total_frames = 0
103         while True:
104             vec, read = f.do_multi()
105             g.do_multi(vec, read)
106             total_frames += read
107             if read < f.hop_size: break
108         del_tmp_sink_path(sink_path)
109
110     def test_close_file(self):
111         samplerate = 44100
112         sink_path = get_tmp_sink_path()
113         g = sink(sink_path, samplerate)
114         g.close()
115         del_tmp_sink_path(sink_path)
116
117     def test_close_file_twice(self):
118         samplerate = 44100
119         sink_path = get_tmp_sink_path()
120         g = sink(sink_path, samplerate)
121         g.close()
122         g.close()
123         del_tmp_sink_path(sink_path)
124
125     def test_read_with(self):
126         samplerate = 44100
127         sink_path = get_tmp_sink_path()
128         vec = fvec(128)
129         with sink(sink_path, samplerate) as g:
130             for _ in range(10):
131                 g(vec, 128)
132
133 if __name__ == '__main__':
134     from _tools import run_module_suite
135     run_module_suite()