Merge branch 'feature/mfccparams'
[aubio.git] / python / tests / test_source.py
1 #! /usr/bin/env python
2
3 from nose2 import main
4 from nose2.tools import params
5 from numpy.testing import TestCase, assert_equal
6 from aubio import source
7 from .utils import list_all_sounds
8
import warnings
# keep UserWarning noise out of the test output
warnings.filterwarnings('ignore', category=UserWarning, append=True)

# sound files returned by the test helper for the 'sounds' directory
list_of_sounds = list_all_sounds('sounds')
# samplerates to open each file with
samplerates = [0, 44100, 8000, 32000]
# hop sizes to open each file with
hop_sizes = [512, 1024, 64]

path = None

# every (hop_size, samplerate, soundfile) combination, grouped by sound file,
# then by hop size, then by samplerate
all_params = []
for soundfile in list_of_sounds:
    for hop_size in hop_sizes:
        all_params.extend(
            (hop_size, samplerate, soundfile) for samplerate in samplerates
        )
23
24
class aubio_source_test_case_base(TestCase):
    """Shared fixture for tests that need at least one sound file."""

    def setUp(self):
        # without any sound file there is nothing to test: skip, do not fail
        if len(list_of_sounds) == 0:
            self.skipTest("add some sound files in 'python/tests/sounds'")
        # first listed sound, for tests that only need a single valid file
        self.default_test_sound = list_of_sounds[0]
31
class aubio_source_test_case(aubio_source_test_case_base):
    """Open and close a source on every listed sound file."""

    @params(*list_of_sounds)
    def test_close_file(self, filename):
        # samplerate 0: use native samplerate; hop size is 256
        opened = source(filename, 0, 256)
        opened.close()

    @params(*list_of_sounds)
    def test_close_file_twice(self, filename):
        # same as above, but close() is called a second time on the source
        opened = source(filename, 0, 256)
        opened.close()
        opened.close()
48
class aubio_source_read_test_case(aubio_source_test_case_base):
    """Read sound files block by block and check the number of frames read."""

    def read_from_source(self, f):
        """Call `f` until it returns a short block; return the frames read.

        The block that ends the loop (fewer than `f.hop_size` frames read)
        must contain only zeros after the frames that were read.
        """
        total_frames = 0
        while True:
            samples, read = f()
            total_frames += read
            if read < f.hop_size:
                assert_equal(samples[read:], 0)
                break
        return total_frames

    @params(*all_params)
    def test_samplerate_hopsize(self, hop_size, samplerate, soundfile):
        try:
            f = source(soundfile, samplerate, hop_size)
        except RuntimeError as e:
            self.skipTest('failed opening with hop_s = {:d}, samplerate = {:d} ({:s})'.format(hop_size, samplerate, str(e)))
        self.assertNotEqual(f.samplerate, 0)
        read_frames = self.read_from_source(f)
        if 'f_' in soundfile and samplerate == 0:
            import re
            # A run of digits directly followed by `f_` in the file name is
            # taken as the expected frame count. At least one digit is
            # required, so a name that merely contains `f_` (e.g. `half_`)
            # is ignored instead of producing an empty match for int().
            match_f = re.findall('([0-9]+)f_', soundfile)
            if len(match_f) == 1:
                expected_frames = int(match_f[0])
                self.assertEqual(expected_frames, read_frames)

    @params(*list_of_sounds)
    def test_samplerate_none(self, p):
        # no samplerate argument given at all
        f = source(p)
        self.assertNotEqual(f.samplerate, 0)
        self.read_from_source(f)

    @params(*list_of_sounds)
    def test_samplerate_0(self, p):
        # samplerate 0 requested: the opened source must report a non-zero one
        f = source(p, 0)
        self.assertNotEqual(f.samplerate, 0)
        self.read_from_source(f)

    @params(*list_of_sounds)
    def test_zero_hop_size(self, p):
        # hop size 0 requested: the opened source must report a non-zero one
        f = source(p, 0, 0)
        self.assertNotEqual(f.samplerate, 0)
        self.assertNotEqual(f.hop_size, 0)
        self.read_from_source(f)

    @params(*list_of_sounds)
    def test_seek_to_half(self, p):
        from random import randint
        f = source(p, 0, 0)
        self.assertNotEqual(f.samplerate, 0)
        self.assertNotEqual(f.hop_size, 0)
        # a: frames in the whole file; c: random seek position within it;
        # b: frames read from position c to the end
        a = self.read_from_source(f)
        c = randint(0, a)
        f.seek(c)
        b = self.read_from_source(f)
        self.assertEqual(a, b + c)

    @params(*list_of_sounds)
    def test_duration(self, p):
        # the reported duration must equal the number of frames actually read
        total_frames = 0
        f = source(p)
        duration = f.duration
        while True:
            _, read = f()
            total_frames += read
            if read < f.hop_size:
                break
        self.assertEqual(duration, total_frames)
121
122
class aubio_source_test_wrong_params(TestCase):
    """Invalid arguments that can be checked without any sound file."""

    def test_wrong_file(self):
        # opening this path is expected to raise RuntimeError
        missing_path = 'path_to/unexisting file.mp3'
        self.assertRaises(RuntimeError, source, missing_path)
128
class aubio_source_test_wrong_params_with_file(aubio_source_test_case_base):
    """Invalid arguments checked against the default test sound."""

    def test_wrong_samplerate(self):
        # negative samplerate
        self.assertRaises(ValueError, source, self.default_test_sound, -1)

    def test_wrong_hop_size(self):
        # negative hop size
        self.assertRaises(ValueError, source, self.default_test_sound, 0, -1)

    def test_wrong_channels(self):
        # negative fourth argument (channels, going by the test name)
        self.assertRaises(ValueError, source, self.default_test_sound, 0, 0, -1)

    def test_wrong_seek(self):
        # negative seek position
        opened = source(self.default_test_sound)
        self.assertRaises(ValueError, opened.seek, -1)

    def test_wrong_seek_too_large(self):
        opened = source(self.default_test_sound)
        try:
            with self.assertRaises(ValueError):
                # position well past the reported duration
                beyond_end = opened.duration + opened.samplerate * 10
                opened.seek(beyond_end)
        except AssertionError:
            # seek() did not raise ValueError: report a skip, not a failure
            self.skipTest('seeking after end of stream failed raising ValueError')
155
class aubio_source_readmulti_test_case(aubio_source_read_test_case):
    """Re-run the inherited read tests through `do_multi()`."""

    def read_from_source(self, f):
        """Call `f.do_multi()` until a short block; return the frames read."""
        frame_count = 0
        finished = False
        while not finished:
            block, n_read = f.do_multi()
            frame_count += n_read
            if n_read < f.hop_size:
                # along the second axis, everything past the frames read
                # in this last block must be zero
                assert_equal(block[:, n_read:], 0)
                finished = True
        return frame_count
170
class aubio_source_with(aubio_source_test_case_base):
    """Use a source in a `with` statement and iterate over it."""

    @params(*list_of_sounds)
    def test_read_from_mono(self, filename):
        hop_size = 2048
        with source(filename, 0, hop_size) as input_source:
            assert_equal(input_source.hop_size, hop_size)
            # add up the last-axis length of every block the iterator yields
            total_frames = sum(frames.shape[-1] for frames in input_source)
            # check we read as many samples as we expected
            assert_equal(total_frames, input_source.duration)
186
# run the imported nose2 `main` when this file is executed as a script
if __name__ == "__main__":
    main()