b1e2ba7092f85214d86e85b5e2c86ee7ab356c5e
[aubio.git] / python / scripts / aubio
1 #! /usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 """aubio command line tool
5
6 This file was written by Paul Brossier <piem@aubio.org> and is released under
7 the GNU/GPL v3.
8
9 Note: this script is mostly about parsing command line arguments. For more
10 readable code examples, check out the `python/demos` folder."""
11
12 import sys
13 import argparse
14 import aubio
15
16 def aubio_parser():
17     epilog = 'use "%(prog)s <command> --help" for more info about each command'
18     parser = argparse.ArgumentParser(epilog=epilog)
19     parser.add_argument('-V', '--version', help="show version",
20             action="store_true", dest="show_version")
21
22     subparsers = parser.add_subparsers(dest='command',
23             description="", metavar="<command>")
24
25     # onset subcommand
26     subparser = subparsers.add_parser('onset',
27             help='get onset times',
28             formatter_class = argparse.ArgumentDefaultsHelpFormatter)
29     parser_add_input(subparser)
30     parser_add_buf_hop_size(subparser)
31     helpstr = "onset novelty function"
32     helpstr += " <default|energy|hfc|complex|phase|specdiff|kl|mkl|specflux>"
33     parser_add_method(subparser, helpstr=helpstr)
34     parser_add_threshold(subparser)
35     parser_add_silence(subparser)
36     parser_add_minioi(subparser)
37     parser_add_time_format(subparser)
38     parser_add_verbose_help(subparser)
39     subparser.set_defaults(process=process_onset)
40
41     # pitch subcommand
42     subparser = subparsers.add_parser('pitch',
43             help='extract fundamental frequency')
44     parser_add_input(subparser)
45     parser_add_buf_hop_size(subparser, buf_size=2048)
46     helpstr = "pitch detection method <default|yinfft|yin|mcomb|fcomb|schmitt>"
47     parser_add_method(subparser, helpstr=helpstr)
48     parser_add_threshold(subparser)
49     parser_add_silence(subparser)
50     parser_add_time_format(subparser)
51     parser_add_verbose_help(subparser)
52     subparser.set_defaults(process=process_pitch)
53
54     # tempo subcommand
55     subparser = subparsers.add_parser('beat',
56             help='get locations of beats')
57     parser_add_input(subparser)
58     parser_add_buf_hop_size(subparser, buf_size=1024, hop_size=512)
59     parser_add_time_format(subparser)
60     parser_add_verbose_help(subparser)
61     subparser.set_defaults(process=process_tempo)
62
63     # notes subcommand
64     subparser = subparsers.add_parser('notes',
65             help='get midi-like notes')
66     parser_add_input(subparser)
67     parser_add_buf_hop_size(subparser)
68     parser_add_time_format(subparser)
69     parser_add_verbose_help(subparser)
70     subparser.set_defaults(process=process_notes)
71
72     # mfcc subcommand
73     subparser = subparsers.add_parser('mfcc',
74             help='extract mel-frequency cepstrum coefficients')
75     parser_add_input(subparser)
76     parser_add_buf_hop_size(subparser)
77     parser_add_time_format(subparser)
78     parser_add_verbose_help(subparser)
79     subparser.set_defaults(process=process_mfcc)
80
81     # melbands subcommand
82     subparser = subparsers.add_parser('melbands',
83             help='extract mel-frequency energies per band')
84     parser_add_input(subparser)
85     parser_add_buf_hop_size(subparser)
86     parser_add_time_format(subparser)
87     parser_add_verbose_help(subparser)
88     subparser.set_defaults(process=process_melbands)
89
90     return parser
91
92 def parser_add_input(parser):
93     parser.add_argument("source_uri", default=None, nargs='?',
94             help="input sound file to analyse", metavar = "<source_uri>")
95     parser.add_argument("-i", "--input", dest = "source_uri2",
96             help="input sound file to analyse", metavar = "<source_uri>")
97     parser.add_argument("-r", "--samplerate",
98             metavar = "<freq>", type=int,
99             action="store", dest="samplerate", default=0,
100             help="samplerate at which the file should be represented")
101
102 def parser_add_verbose_help(parser):
103     parser.add_argument("-v","--verbose",
104             action="count", dest="verbose", default=1,
105             help="make lots of noise [default]")
106     parser.add_argument("-q","--quiet",
107             action="store_const", dest="verbose", const=0,
108             help="be quiet")
109
110 def parser_add_buf_hop_size(parser, buf_size=512, hop_size=256):
111     parser.add_argument("-B","--bufsize",
112             action="store", dest="buf_size", default=buf_size,
113             metavar = "<size>", type=int,
114             help="buffer size [default=%d]" % buf_size)
115     parser.add_argument("-H","--hopsize",
116             metavar = "<size>", type=int,
117             action="store", dest="hop_size", default=hop_size,
118             help="overlap size [default=%d]" % hop_size)
119
120 def parser_add_method(parser, method='default', helpstr='method'):
121     parser.add_argument("-m","--method",
122             metavar = "<method>", type=str,
123             action="store", dest="method", default=method,
124             help="%s [default=%s]" % (helpstr, method))
125
126 def parser_add_threshold(parser, default=None):
127     parser.add_argument("-t","--threshold",
128             metavar = "<threshold>", type=float,
129             action="store", dest="threshold", default=default,
130             help="threshold [default=%s]" % default)
131
132 def parser_add_silence(parser):
133     parser.add_argument("-s", "--silence",
134             metavar = "<value>", type=float,
135             action="store", dest="silence", default=-70,
136             help="silence threshold")
137
138 def parser_add_minioi(parser):
139     parser.add_argument("-M", "--minioi",
140             metavar = "<value>", type=str,
141             action="store", dest="minioi", default="12ms",
142             help="minimum Inter-Onset Interval")
143
144 def parser_add_time_format(parser):
145     helpstr = "select time values output format (samples, ms, seconds)"
146     helpstr += " [default=seconds]"
147     parser.add_argument("-T", "--time-format",
148              metavar='format',
149              dest="time_format",
150              default=None,
151              help=helpstr)
152
153 # some utilities
154
155 def parse_options(args, valid_opts):
156     options = {k :v for k,v in vars(args).items() if k in valid_opts}
157     return options
158
159 def remap_pvoc_options(options):
160     # remap buf_size to win_s, hop_size to hop_s
161     # FIXME: adjust python/ext/py-phasevoc.c to understand buf_size/hop_size
162     options['win_s'] = options['buf_size']
163     del options['buf_size']
164     options['hop_s'] = options['hop_size']
165     del options['hop_size']
166     return options
167
168 def samples2seconds(n_frames, samplerate):
169     return "%f\t" % (n_frames / float(a_source.samplerate))
170
171 def samples2milliseconds(n_frames, samplerate):
172     return "%f\t" % (1000. * n_frames / float(a_source.samplerate))
173
174 def samples2samples(n_frames, samplerate):
175     return "%d\t" % n_frames
176
177 def timefunc(mode):
178     if mode is None or mode == 'seconds' or mode == 's':
179         return samples2seconds
180     elif mode == 'ms' or mode == 'milliseconds':
181         return samples2milliseconds
182     elif mode == 'samples':
183         return samples2samples
184     else:
185         raise ValueError('invalid time format %s' % mode)
186
187 # definition of processing classes
188
189 class default_process(object):
190     def __init__(self, args):
191         if 'time_format' in args:
192             self.time2string = timefunc(args.time_format)
193         if args.verbose > 2 and hasattr(self, 'options'):
194             name = type(self).__name__.split('_')[1]
195             optstr = ' '.join(['running', name, 'with options', repr(self.options), '\n'])
196             sys.stderr.write(optstr)
197
198 class process_onset(default_process):
199     valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
200     def __init__(self, args):
201         self.options = parse_options(args, self.valid_opts)
202         self.onset = aubio.onset(**self.options)
203         if args.threshold is not None:
204             self.onset.set_threshold(args.threshold)
205         if args.minioi:
206             if args.minioi.endswith('ms'):
207                 self.onset.set_minioi_ms(float(args.minioi[:-2]))
208             elif args.minioi.endswith('s'):
209                 self.onset.set_minioi_s(float(args.minioi[:-1]))
210             else:
211                 self.onset.set_minioi(int(args.minioi))
212         if args.silence:
213             self.onset.set_silence(args.silence)
214         super(process_onset, self).__init__(args)
215     def __call__(self, block):
216         return self.onset(block)
217     def repr_res(self, res, frames_read, a_source):
218         if res[0] != 0:
219             outstr = self.time2string(self.onset.get_last(), a_source.samplerate)
220             sys.stdout.write(outstr + '\n')
221
222 class process_pitch(default_process):
223     valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
224     def __init__(self, args):
225         self.options = parse_options(args, self.valid_opts)
226         self.pitch = aubio.pitch(**self.options)
227         if args.threshold is not None:
228             self.pitch.set_tolerance(args.threshold)
229         if args.silence is not None:
230             self.pitch.set_silence(args.silence)
231         super(process_pitch, self).__init__(args)
232     def __call__(self, block):
233         return self.pitch(block)
234     def repr_res(self, res, frames_read, a_source):
235         fmt_out = self.time2string(frames_read, a_source.samplerate)
236         sys.stdout.write(fmt_out + "%.6f\n" % res[0])
237
238 class process_tempo(default_process):
239     valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
240     def __init__(self, args):
241         self.options = parse_options(args, self.valid_opts)
242         self.tempo = aubio.tempo(**self.options)
243         super(process_tempo, self).__init__(args)
244     def __call__(self, block):
245         return self.tempo(block)
246     def repr_res(self, res, frames_read, a_source):
247         if res[0] != 0:
248             outstr = self.time2string(self.tempo.get_last(), a_source.samplerate)
249             sys.stdout.write(outstr + '\n')
250
251 class process_notes(default_process):
252     valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
253     def __init__(self, args):
254         self.options = parse_options(args, self.valid_opts)
255         self.notes = aubio.notes(**self.options)
256         super(process_notes, self).__init__(args)
257     def __call__(self, block):
258         return self.notes(block)
259     def repr_res(self, res, frames_read, a_source):
260         if res[2] != 0: # note off
261             fmt_out = self.time2string(frames_read, a_source.samplerate)
262             sys.stdout.write(fmt_out + '\n')
263         if res[0] != 0: # note on
264             lastmidi = res[0]
265             fmt_out = "%f\t" % lastmidi
266             fmt_out += self.time2string(frames_read, a_source.samplerate)
267             sys.stdout.write(fmt_out) # + '\t')
268
269 class process_mfcc(default_process):
270     def __init__(self, args):
271         valid_opts = ['hop_size', 'buf_size']
272         options = parse_options(args, valid_opts)
273         self.options = remap_pvoc_options(options)
274         self.pv = aubio.pvoc(**options)
275
276         valid_opts = ['buf_size', 'n_filters', 'n_coeffs', 'samplerate']
277         options = parse_options(args, valid_opts)
278         self.mfcc = aubio.mfcc(**options)
279         self.options.update(options)
280
281         super(process_mfcc, self).__init__(args)
282
283     def __call__(self, block):
284         fftgrain = self.pv(block)
285         return self.mfcc(fftgrain)
286     def repr_res(self, res, frames_read, a_source):
287         fmt_out = self.time2string(frames_read, a_source.samplerate)
288         fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
289         sys.stdout.write(fmt_out + '\n')
290
291 class process_melbands(default_process):
292     def __init__(self, args):
293         self.args = args
294         valid_opts = ['hop_size', 'buf_size']
295         options = parse_options(args, valid_opts)
296         options = remap_pvoc_options(options)
297         self.pv = aubio.pvoc(**options)
298
299         valid_opts = ['buf_size', 'n_filters']
300         options = {k :v for k,v in vars(args).items() if k in valid_opts}
301         # FIXME
302         options['win_s'] = options['buf_size']
303         del options['buf_size']
304         self.filterbank = aubio.filterbank(**options)
305         self.filterbank.set_mel_coeffs_slaney(args.samplerate)
306
307         super(process_melbands, self).__init__(args)
308     def __call__(self, block):
309         fftgrain = self.pv(block)
310         return self.filterbank(fftgrain)
311     def repr_res(self, res, frames_read, a_source):
312         fmt_out = self.time2string(frames_read, a_source.samplerate)
313         fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
314         sys.stdout.write(fmt_out + '\n')
315
316 if __name__ == '__main__':
317     parser = aubio_parser()
318     args = parser.parse_args()
319     if args.show_version or ('verbose' in args and args.verbose > 3):
320         sys.stdout.write('aubio version ' + aubio.version + '\n')
321     if args.show_version and args.command is None:
322         sys.exit(0)
323     if args.command is None:
324         sys.stderr.write("Error: a command is required\n")
325         parser.print_help()
326         sys.exit(1)
327     elif not args.source_uri and not args.source_uri2:
328         sys.stderr.write("Error: a source is required\n")
329         parser.print_help()
330         sys.exit(1)
331     elif args.source_uri2 is not None:
332         args.source_uri = args.source_uri2
333     try:
334         # open source_uri
335         with aubio.source(args.source_uri, hop_size=args.hop_size,
336                 samplerate=args.samplerate) as a_source:
337             args.samplerate = a_source.samplerate
338             # create the processor for this subcommand
339             processor = args.process(args)
340             frames_read = 0
341             while True:
342                 # read new block from source
343                 block, read = a_source()
344                 # execute processor on this block
345                 res = processor(block)
346                 # print results for this block
347                 if args.verbose > 0:
348                     processor.repr_res(res, frames_read, a_source)
349                 # increment total number of frames read
350                 frames_read += read
351                 # exit loop at end of file
352                 if read < a_source.hop_size: break
353             # special case for notes
354             if args.command == 'notes':
355                 eof = processor.time2string(frames_read, a_source.samplerate)
356                 sys.stdout.write(eof + '\n')
357                 sys.stdout.flush()
358             if args.verbose > 1:
359                 fmt_string = "read {:.2f}s"
360                 fmt_string += " ({:d} samples in {:d} blocks of {:d})"
361                 fmt_string += " from {:s} at {:d}Hz\n"
362                 sys.stderr.write(fmt_string.format(
363                         frames_read/float(a_source.samplerate),
364                         frames_read,
365                         frames_read // a_source.hop_size + 1,
366                         a_source.hop_size,
367                         a_source.uri,
368                         a_source.samplerate))
369     except KeyboardInterrupt as e:
370         sys.exit(1)