From: Paul Brossier Date: Thu, 23 Mar 2017 13:28:50 +0000 (+0100) Subject: python/scripts/aubio: simple script to replace examples/ in the python world X-Git-Tag: 0.4.5~44^2~13 X-Git-Url: https://git.aubio.org/?p=aubio.git;a=commitdiff_plain;h=1d2cc5ef9a54e168c405ba61f0445179383132de;hp=91fa88d3c702a0a38a3a75d842f3cecf823b25e1 python/scripts/aubio: simple script to replace examples/ in the python world --- diff --git a/python/scripts/aubio b/python/scripts/aubio new file mode 100755 index 00000000..b1e2ba70 --- /dev/null +++ b/python/scripts/aubio @@ -0,0 +1,370 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- + +"""aubio command line tool + +This file was written by Paul Brossier and is released under +the GNU/GPL v3. + +Note: this script is mostly about parsing command line arguments. For more +readable code examples, check out the `python/demos` folder.""" + +import sys +import argparse +import aubio + +def aubio_parser(): + epilog = 'use "%(prog)s --help" for more info about each command' + parser = argparse.ArgumentParser(epilog=epilog) + parser.add_argument('-V', '--version', help="show version", + action="store_true", dest="show_version") + + subparsers = parser.add_subparsers(dest='command', + description="", metavar="") + + # onset subcommand + subparser = subparsers.add_parser('onset', + help='get onset times', + formatter_class = argparse.ArgumentDefaultsHelpFormatter) + parser_add_input(subparser) + parser_add_buf_hop_size(subparser) + helpstr = "onset novelty function" + helpstr += " " + parser_add_method(subparser, helpstr=helpstr) + parser_add_threshold(subparser) + parser_add_silence(subparser) + parser_add_minioi(subparser) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_onset) + + # pitch subcommand + subparser = subparsers.add_parser('pitch', + help='extract fundamental frequency') + parser_add_input(subparser) + parser_add_buf_hop_size(subparser, buf_size=2048) + helpstr = "pitch detection method " + parser_add_method(subparser, helpstr=helpstr) + parser_add_threshold(subparser) + parser_add_silence(subparser) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_pitch) + + # tempo subcommand + subparser = subparsers.add_parser('beat', + help='get locations of beats') + parser_add_input(subparser) + parser_add_buf_hop_size(subparser, buf_size=1024, hop_size=512) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_tempo) + + # notes subcommand + subparser = subparsers.add_parser('notes', + help='get midi-like notes') + parser_add_input(subparser) + parser_add_buf_hop_size(subparser) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_notes) + + # mfcc subcommand + subparser = subparsers.add_parser('mfcc', + help='extract mel-frequency cepstrum coefficients') + parser_add_input(subparser) + parser_add_buf_hop_size(subparser) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_mfcc) + + # melbands subcommand + subparser = subparsers.add_parser('melbands', + help='extract mel-frequency energies per band') + parser_add_input(subparser) + parser_add_buf_hop_size(subparser) + parser_add_time_format(subparser) + parser_add_verbose_help(subparser) + subparser.set_defaults(process=process_melbands) + + return parser + +def parser_add_input(parser): + parser.add_argument("source_uri", default=None, nargs='?', + help="input sound file to analyse", metavar = "") + parser.add_argument("-i", "--input", dest = "source_uri2", + help="input sound file to analyse", metavar = "") + parser.add_argument("-r", "--samplerate", + metavar = "", type=int, + action="store", dest="samplerate", default=0, + help="samplerate at which the file should be represented") + +def parser_add_verbose_help(parser): + parser.add_argument("-v","--verbose", + action="count", dest="verbose", default=1, + help="make lots of noise [default]") + parser.add_argument("-q","--quiet", + action="store_const", dest="verbose", const=0, + help="be quiet") + +def parser_add_buf_hop_size(parser, buf_size=512, hop_size=256): + parser.add_argument("-B","--bufsize", + action="store", dest="buf_size", default=buf_size, + metavar = "", type=int, + help="buffer size [default=%d]" % buf_size) + parser.add_argument("-H","--hopsize", + metavar = "", type=int, + action="store", dest="hop_size", default=hop_size, + help="overlap size [default=%d]" % hop_size) + +def parser_add_method(parser, method='default', helpstr='method'): + parser.add_argument("-m","--method", + metavar = "", type=str, + action="store", dest="method", default=method, + help="%s [default=%s]" % (helpstr, method)) + +def parser_add_threshold(parser, default=None): + parser.add_argument("-t","--threshold", + metavar = "", type=float, + action="store", dest="threshold", default=default, + help="threshold [default=%s]" % default) + +def parser_add_silence(parser): + parser.add_argument("-s", "--silence", + metavar = "", type=float, + action="store", dest="silence", default=-70, + help="silence threshold") + +def parser_add_minioi(parser): + parser.add_argument("-M", "--minioi", + metavar = "", type=str, + action="store", dest="minioi", default="12ms", + help="minimum Inter-Onset Interval") + +def parser_add_time_format(parser): + helpstr = "select time values output format (samples, ms, seconds)" + helpstr += " [default=seconds]" + parser.add_argument("-T", "--time-format", + metavar='format', + dest="time_format", + default=None, + help=helpstr) + +# some utilities + +def parse_options(args, valid_opts): + options = {k :v for k,v in vars(args).items() if k in valid_opts} + return options + +def remap_pvoc_options(options): + # remap buf_size to win_s, hop_size to hop_s + # FIXME: adjust python/ext/py-phasevoc.c to understand buf_size/hop_size + options['win_s'] = options['buf_size'] + del options['buf_size'] + options['hop_s'] = options['hop_size'] + del options['hop_size'] + return options + +def samples2seconds(n_frames, samplerate): + return "%f\t" % (n_frames / float(a_source.samplerate)) + +def samples2milliseconds(n_frames, samplerate): + return "%f\t" % (1000. * n_frames / float(a_source.samplerate)) + +def samples2samples(n_frames, samplerate): + return "%d\t" % n_frames + +def timefunc(mode): + if mode is None or mode == 'seconds' or mode == 's': + return samples2seconds + elif mode == 'ms' or mode == 'milliseconds': + return samples2milliseconds + elif mode == 'samples': + return samples2samples + else: + raise ValueError('invalid time format %s' % mode) + +# definition of processing classes + +class default_process(object): + def __init__(self, args): + if 'time_format' in args: + self.time2string = timefunc(args.time_format) + if args.verbose > 2 and hasattr(self, 'options'): + name = type(self).__name__.split('_')[1] + optstr = ' '.join(['running', name, 'with options', repr(self.options), '\n']) + sys.stderr.write(optstr) + +class process_onset(default_process): + valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate'] + def __init__(self, args): + self.options = parse_options(args, self.valid_opts) + self.onset = aubio.onset(**self.options) + if args.threshold is not None: + self.onset.set_threshold(args.threshold) + if args.minioi: + if args.minioi.endswith('ms'): + self.onset.set_minioi_ms(float(args.minioi[:-2])) + elif args.minioi.endswith('s'): + self.onset.set_minioi_s(float(args.minioi[:-1])) + else: + self.onset.set_minioi(int(args.minioi)) + if args.silence: + self.onset.set_silence(args.silence) + super(process_onset, self).__init__(args) + def __call__(self, block): + return self.onset(block) + def repr_res(self, res, frames_read, a_source): + if res[0] != 0: + outstr = self.time2string(self.onset.get_last(), a_source.samplerate) + sys.stdout.write(outstr + '\n') + +class process_pitch(default_process): + valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate'] + def __init__(self, args): + self.options = parse_options(args, self.valid_opts) + self.pitch = aubio.pitch(**self.options) + if args.threshold is not None: + self.pitch.set_tolerance(args.threshold) + if args.silence is not None: + self.pitch.set_silence(args.silence) + super(process_pitch, self).__init__(args) + def __call__(self, block): + return self.pitch(block) + def repr_res(self, res, frames_read, a_source): + fmt_out = self.time2string(frames_read, a_source.samplerate) + sys.stdout.write(fmt_out + "%.6f\n" % res[0]) + +class process_tempo(default_process): + valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate'] + def __init__(self, args): + self.options = parse_options(args, self.valid_opts) + self.tempo = aubio.tempo(**self.options) + super(process_tempo, self).__init__(args) + def __call__(self, block): + return self.tempo(block) + def repr_res(self, res, frames_read, a_source): + if res[0] != 0: + outstr = self.time2string(self.tempo.get_last(), a_source.samplerate) + sys.stdout.write(outstr + '\n') + +class process_notes(default_process): + valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate'] + def __init__(self, args): + self.options = parse_options(args, self.valid_opts) + self.notes = aubio.notes(**self.options) + super(process_notes, self).__init__(args) + def __call__(self, block): + return self.notes(block) + def repr_res(self, res, frames_read, a_source): + if res[2] != 0: # note off + fmt_out = self.time2string(frames_read, a_source.samplerate) + sys.stdout.write(fmt_out + '\n') + if res[0] != 0: # note on + lastmidi = res[0] + fmt_out = "%f\t" % lastmidi + fmt_out += self.time2string(frames_read, a_source.samplerate) + sys.stdout.write(fmt_out) # + '\t') + +class process_mfcc(default_process): + def __init__(self, args): + valid_opts = ['hop_size', 'buf_size'] + options = parse_options(args, valid_opts) + self.options = remap_pvoc_options(options) + self.pv = aubio.pvoc(**options) + + valid_opts = ['buf_size', 'n_filters', 'n_coeffs', 'samplerate'] + options = parse_options(args, valid_opts) + self.mfcc = aubio.mfcc(**options) + self.options.update(options) + + super(process_mfcc, self).__init__(args) + + def __call__(self, block): + fftgrain = self.pv(block) + return self.mfcc(fftgrain) + def repr_res(self, res, frames_read, a_source): + fmt_out = self.time2string(frames_read, a_source.samplerate) + fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()]) + sys.stdout.write(fmt_out + '\n') + +class process_melbands(default_process): + def __init__(self, args): + self.args = args + valid_opts = ['hop_size', 'buf_size'] + options = parse_options(args, valid_opts) + options = remap_pvoc_options(options) + self.pv = aubio.pvoc(**options) + + valid_opts = ['buf_size', 'n_filters'] + options = {k :v for k,v in vars(args).items() if k in valid_opts} + # FIXME + options['win_s'] = options['buf_size'] + del options['buf_size'] + self.filterbank = aubio.filterbank(**options) + self.filterbank.set_mel_coeffs_slaney(args.samplerate) + + super(process_melbands, self).__init__(args) + def __call__(self, block): + fftgrain = self.pv(block) + return self.filterbank(fftgrain) + def repr_res(self, res, frames_read, a_source): + fmt_out = self.time2string(frames_read, a_source.samplerate) + fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()]) + sys.stdout.write(fmt_out + '\n') + +if __name__ == '__main__': + parser = aubio_parser() + args = parser.parse_args() + if args.show_version or ('verbose' in args and args.verbose > 3): + sys.stdout.write('aubio version ' + aubio.version + '\n') + if args.show_version and args.command is None: + sys.exit(0) + if args.command is None: + sys.stderr.write("Error: a command is required\n") + parser.print_help() + sys.exit(1) + elif not args.source_uri and not args.source_uri2: + sys.stderr.write("Error: a source is required\n") + parser.print_help() + sys.exit(1) + elif args.source_uri2 is not None: + args.source_uri = args.source_uri2 + try: + # open source_uri + with aubio.source(args.source_uri, hop_size=args.hop_size, + samplerate=args.samplerate) as a_source: + args.samplerate = a_source.samplerate + # create the processor for this subcommand + processor = args.process(args) + frames_read = 0 + while True: + # read new block from source + block, read = a_source() + # execute processor on this block + res = processor(block) + # print results for this block + if args.verbose > 0: + processor.repr_res(res, frames_read, a_source) + # increment total number of frames read + frames_read += read + # exit loop at end of file + if read < a_source.hop_size: break + # special case for notes + if args.command == 'notes': + eof = processor.time2string(frames_read, a_source.samplerate) + sys.stdout.write(eof + '\n') + sys.stdout.flush() + if args.verbose > 1: + fmt_string = "read {:.2f}s" + fmt_string += " ({:d} samples in {:d} blocks of {:d})" + fmt_string += " from {:s} at {:d}Hz\n" + sys.stderr.write(fmt_string.format( + frames_read/float(a_source.samplerate), + frames_read, + frames_read // a_source.hop_size + 1, + a_source.hop_size, + a_source.uri, + a_source.samplerate)) + except KeyboardInterrupt as e: + sys.exit(1) diff --git a/setup.py b/setup.py index 0248a1a1..0c93bb97 100755 --- a/setup.py +++ b/setup.py @@ -57,7 +57,7 @@ distrib = setup(name='aubio', version = __version__, packages = ['aubio'], package_dir = {'aubio':'python/lib/aubio'}, - scripts = ['python/scripts/aubiocut'], + scripts = ['python/scripts/aubiocut', 'python/scripts/aubio'], ext_modules = [aubio_extension], description = 'a collection of tools for music analysis', long_description = 'a collection of tools for music analysis',