python/scripts/aubio: simple script to replace examples/ in the python world
authorPaul Brossier <piem@piem.org>
Thu, 23 Mar 2017 13:28:50 +0000 (14:28 +0100)
committerPaul Brossier <piem@piem.org>
Thu, 23 Mar 2017 13:28:50 +0000 (14:28 +0100)
python/scripts/aubio [new file with mode: 0755]
setup.py

diff --git a/python/scripts/aubio b/python/scripts/aubio
new file mode 100755 (executable)
index 0000000..b1e2ba7
--- /dev/null
@@ -0,0 +1,370 @@
+#! /usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""aubio command line tool
+
+This file was written by Paul Brossier <piem@aubio.org> and is released under
+the GNU/GPL v3.
+
+Note: this script is mostly about parsing command line arguments. For more
+readable code examples, check out the `python/demos` folder."""
+
+import sys
+import argparse
+import aubio
+
+def aubio_parser():
+    epilog = 'use "%(prog)s <command> --help" for more info about each command'
+    parser = argparse.ArgumentParser(epilog=epilog)
+    parser.add_argument('-V', '--version', help="show version",
+            action="store_true", dest="show_version")
+
+    subparsers = parser.add_subparsers(dest='command',
+            description="", metavar="<command>")
+
+    # onset subcommand
+    subparser = subparsers.add_parser('onset',
+            help='get onset times',
+            formatter_class = argparse.ArgumentDefaultsHelpFormatter)
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser)
+    helpstr = "onset novelty function"
+    helpstr += " <default|energy|hfc|complex|phase|specdiff|kl|mkl|specflux>"
+    parser_add_method(subparser, helpstr=helpstr)
+    parser_add_threshold(subparser)
+    parser_add_silence(subparser)
+    parser_add_minioi(subparser)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_onset)
+
+    # pitch subcommand
+    subparser = subparsers.add_parser('pitch',
+            help='extract fundamental frequency')
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser, buf_size=2048)
+    helpstr = "pitch detection method <default|yinfft|yin|mcomb|fcomb|schmitt>"
+    parser_add_method(subparser, helpstr=helpstr)
+    parser_add_threshold(subparser)
+    parser_add_silence(subparser)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_pitch)
+
+    # tempo subcommand
+    subparser = subparsers.add_parser('beat',
+            help='get locations of beats')
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser, buf_size=1024, hop_size=512)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_tempo)
+
+    # notes subcommand
+    subparser = subparsers.add_parser('notes',
+            help='get midi-like notes')
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_notes)
+
+    # mfcc subcommand
+    subparser = subparsers.add_parser('mfcc',
+            help='extract mel-frequency cepstrum coefficients')
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_mfcc)
+
+    # melbands subcommand
+    subparser = subparsers.add_parser('melbands',
+            help='extract mel-frequency energies per band')
+    parser_add_input(subparser)
+    parser_add_buf_hop_size(subparser)
+    parser_add_time_format(subparser)
+    parser_add_verbose_help(subparser)
+    subparser.set_defaults(process=process_melbands)
+
+    return parser
+
+def parser_add_input(parser):
+    parser.add_argument("source_uri", default=None, nargs='?',
+            help="input sound file to analyse", metavar = "<source_uri>")
+    parser.add_argument("-i", "--input", dest = "source_uri2",
+            help="input sound file to analyse", metavar = "<source_uri>")
+    parser.add_argument("-r", "--samplerate",
+            metavar = "<freq>", type=int,
+            action="store", dest="samplerate", default=0,
+            help="samplerate at which the file should be represented")
+
+def parser_add_verbose_help(parser):
+    parser.add_argument("-v","--verbose",
+            action="count", dest="verbose", default=1,
+            help="make lots of noise [default]")
+    parser.add_argument("-q","--quiet",
+            action="store_const", dest="verbose", const=0,
+            help="be quiet")
+
+def parser_add_buf_hop_size(parser, buf_size=512, hop_size=256):
+    parser.add_argument("-B","--bufsize",
+            action="store", dest="buf_size", default=buf_size,
+            metavar = "<size>", type=int,
+            help="buffer size [default=%d]" % buf_size)
+    parser.add_argument("-H","--hopsize",
+            metavar = "<size>", type=int,
+            action="store", dest="hop_size", default=hop_size,
+            help="overlap size [default=%d]" % hop_size)
+
+def parser_add_method(parser, method='default', helpstr='method'):
+    parser.add_argument("-m","--method",
+            metavar = "<method>", type=str,
+            action="store", dest="method", default=method,
+            help="%s [default=%s]" % (helpstr, method))
+
+def parser_add_threshold(parser, default=None):
+    parser.add_argument("-t","--threshold",
+            metavar = "<threshold>", type=float,
+            action="store", dest="threshold", default=default,
+            help="threshold [default=%s]" % default)
+
+def parser_add_silence(parser):
+    parser.add_argument("-s", "--silence",
+            metavar = "<value>", type=float,
+            action="store", dest="silence", default=-70,
+            help="silence threshold")
+
+def parser_add_minioi(parser):
+    parser.add_argument("-M", "--minioi",
+            metavar = "<value>", type=str,
+            action="store", dest="minioi", default="12ms",
+            help="minimum Inter-Onset Interval")
+
+def parser_add_time_format(parser):
+    helpstr = "select time values output format (samples, ms, seconds)"
+    helpstr += " [default=seconds]"
+    parser.add_argument("-T", "--time-format",
+             metavar='format',
+             dest="time_format",
+             default=None,
+             help=helpstr)
+
+# some utilities
+
+def parse_options(args, valid_opts):
+    options = {k :v for k,v in vars(args).items() if k in valid_opts}
+    return options
+
+def remap_pvoc_options(options):
+    # remap buf_size to win_s, hop_size to hop_s
+    # FIXME: adjust python/ext/py-phasevoc.c to understand buf_size/hop_size
+    options['win_s'] = options['buf_size']
+    del options['buf_size']
+    options['hop_s'] = options['hop_size']
+    del options['hop_size']
+    return options
+
+def samples2seconds(n_frames, samplerate):
+    return "%f\t" % (n_frames / float(a_source.samplerate))
+
+def samples2milliseconds(n_frames, samplerate):
+    return "%f\t" % (1000. * n_frames / float(a_source.samplerate))
+
+def samples2samples(n_frames, samplerate):
+    return "%d\t" % n_frames
+
+def timefunc(mode):
+    if mode is None or mode == 'seconds' or mode == 's':
+        return samples2seconds
+    elif mode == 'ms' or mode == 'milliseconds':
+        return samples2milliseconds
+    elif mode == 'samples':
+        return samples2samples
+    else:
+        raise ValueError('invalid time format %s' % mode)
+
+# definition of processing classes
+
+class default_process(object):
+    def __init__(self, args):
+        if 'time_format' in args:
+            self.time2string = timefunc(args.time_format)
+        if args.verbose > 2 and hasattr(self, 'options'):
+            name = type(self).__name__.split('_')[1]
+            optstr = ' '.join(['running', name, 'with options', repr(self.options), '\n'])
+            sys.stderr.write(optstr)
+
+class process_onset(default_process):
+    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
+    def __init__(self, args):
+        self.options = parse_options(args, self.valid_opts)
+        self.onset = aubio.onset(**self.options)
+        if args.threshold is not None:
+            self.onset.set_threshold(args.threshold)
+        if args.minioi:
+            if args.minioi.endswith('ms'):
+                self.onset.set_minioi_ms(float(args.minioi[:-2]))
+            elif args.minioi.endswith('s'):
+                self.onset.set_minioi_s(float(args.minioi[:-1]))
+            else:
+                self.onset.set_minioi(int(args.minioi))
+        if args.silence:
+            self.onset.set_silence(args.silence)
+        super(process_onset, self).__init__(args)
+    def __call__(self, block):
+        return self.onset(block)
+    def repr_res(self, res, frames_read, a_source):
+        if res[0] != 0:
+            outstr = self.time2string(self.onset.get_last(), a_source.samplerate)
+            sys.stdout.write(outstr + '\n')
+
+class process_pitch(default_process):
+    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
+    def __init__(self, args):
+        self.options = parse_options(args, self.valid_opts)
+        self.pitch = aubio.pitch(**self.options)
+        if args.threshold is not None:
+            self.pitch.set_tolerance(args.threshold)
+        if args.silence is not None:
+            self.pitch.set_silence(args.silence)
+        super(process_pitch, self).__init__(args)
+    def __call__(self, block):
+        return self.pitch(block)
+    def repr_res(self, res, frames_read, a_source):
+        fmt_out = self.time2string(frames_read, a_source.samplerate)
+        sys.stdout.write(fmt_out + "%.6f\n" % res[0])
+
+class process_tempo(default_process):
+    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
+    def __init__(self, args):
+        self.options = parse_options(args, self.valid_opts)
+        self.tempo = aubio.tempo(**self.options)
+        super(process_tempo, self).__init__(args)
+    def __call__(self, block):
+        return self.tempo(block)
+    def repr_res(self, res, frames_read, a_source):
+        if res[0] != 0:
+            outstr = self.time2string(self.tempo.get_last(), a_source.samplerate)
+            sys.stdout.write(outstr + '\n')
+
+class process_notes(default_process):
+    valid_opts = ['method', 'hop_size', 'buf_size', 'samplerate']
+    def __init__(self, args):
+        self.options = parse_options(args, self.valid_opts)
+        self.notes = aubio.notes(**self.options)
+        super(process_notes, self).__init__(args)
+    def __call__(self, block):
+        return self.notes(block)
+    def repr_res(self, res, frames_read, a_source):
+        if res[2] != 0: # note off
+            fmt_out = self.time2string(frames_read, a_source.samplerate)
+            sys.stdout.write(fmt_out + '\n')
+        if res[0] != 0: # note on
+            lastmidi = res[0]
+            fmt_out = "%f\t" % lastmidi
+            fmt_out += self.time2string(frames_read, a_source.samplerate)
+            sys.stdout.write(fmt_out) # + '\t')
+
+class process_mfcc(default_process):
+    def __init__(self, args):
+        valid_opts = ['hop_size', 'buf_size']
+        options = parse_options(args, valid_opts)
+        self.options = remap_pvoc_options(options)
+        self.pv = aubio.pvoc(**options)
+
+        valid_opts = ['buf_size', 'n_filters', 'n_coeffs', 'samplerate']
+        options = parse_options(args, valid_opts)
+        self.mfcc = aubio.mfcc(**options)
+        self.options.update(options)
+
+        super(process_mfcc, self).__init__(args)
+
+    def __call__(self, block):
+        fftgrain = self.pv(block)
+        return self.mfcc(fftgrain)
+    def repr_res(self, res, frames_read, a_source):
+        fmt_out = self.time2string(frames_read, a_source.samplerate)
+        fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
+        sys.stdout.write(fmt_out + '\n')
+
+class process_melbands(default_process):
+    def __init__(self, args):
+        self.args = args
+        valid_opts = ['hop_size', 'buf_size']
+        options = parse_options(args, valid_opts)
+        options = remap_pvoc_options(options)
+        self.pv = aubio.pvoc(**options)
+
+        valid_opts = ['buf_size', 'n_filters']
+        options = {k :v for k,v in vars(args).items() if k in valid_opts}
+        # FIXME
+        options['win_s'] = options['buf_size']
+        del options['buf_size']
+        self.filterbank = aubio.filterbank(**options)
+        self.filterbank.set_mel_coeffs_slaney(args.samplerate)
+
+        super(process_melbands, self).__init__(args)
+    def __call__(self, block):
+        fftgrain = self.pv(block)
+        return self.filterbank(fftgrain)
+    def repr_res(self, res, frames_read, a_source):
+        fmt_out = self.time2string(frames_read, a_source.samplerate)
+        fmt_out += ' '.join(["% 9.7f" % f for f in res.tolist()])
+        sys.stdout.write(fmt_out + '\n')
+
+if __name__ == '__main__':
+    parser = aubio_parser()
+    args = parser.parse_args()
+    if args.show_version or ('verbose' in args and args.verbose > 3):
+        sys.stdout.write('aubio version ' + aubio.version + '\n')
+    if args.show_version and args.command is None:
+        sys.exit(0)
+    if args.command is None:
+        sys.stderr.write("Error: a command is required\n")
+        parser.print_help()
+        sys.exit(1)
+    elif not args.source_uri and not args.source_uri2:
+        sys.stderr.write("Error: a source is required\n")
+        parser.print_help()
+        sys.exit(1)
+    elif args.source_uri2 is not None:
+        args.source_uri = args.source_uri2
+    try:
+        # open source_uri
+        with aubio.source(args.source_uri, hop_size=args.hop_size,
+                samplerate=args.samplerate) as a_source:
+            args.samplerate = a_source.samplerate
+            # create the processor for this subcommand
+            processor = args.process(args)
+            frames_read = 0
+            while True:
+                # read new block from source
+                block, read = a_source()
+                # execute processor on this block
+                res = processor(block)
+                # print results for this block
+                if args.verbose > 0:
+                    processor.repr_res(res, frames_read, a_source)
+                # increment total number of frames read
+                frames_read += read
+                # exit loop at end of file
+                if read < a_source.hop_size: break
+            # special case for notes
+            if args.command == 'notes':
+                eof = processor.time2string(frames_read, a_source.samplerate)
+                sys.stdout.write(eof + '\n')
+                sys.stdout.flush()
+            if args.verbose > 1:
+                fmt_string = "read {:.2f}s"
+                fmt_string += " ({:d} samples in {:d} blocks of {:d})"
+                fmt_string += " from {:s} at {:d}Hz\n"
+                sys.stderr.write(fmt_string.format(
+                        frames_read/float(a_source.samplerate),
+                        frames_read,
+                        frames_read // a_source.hop_size + 1,
+                        a_source.hop_size,
+                        a_source.uri,
+                        a_source.samplerate))
+    except KeyboardInterrupt as e:
+        sys.exit(1)
index 0248a1a..0c93bb9 100755 (executable)
--- a/setup.py
+++ b/setup.py
@@ -57,7 +57,7 @@ distrib = setup(name='aubio',
     version = __version__,
     packages = ['aubio'],
     package_dir = {'aubio':'python/lib/aubio'},
     version = __version__,
     packages = ['aubio'],
     package_dir = {'aubio':'python/lib/aubio'},
-    scripts = ['python/scripts/aubiocut'],
+    scripts = ['python/scripts/aubiocut', 'python/scripts/aubio'],
     ext_modules = [aubio_extension],
     description = 'a collection of tools for music analysis',
     long_description = 'a collection of tools for music analysis',
     ext_modules = [aubio_extension],
     description = 'a collection of tools for music analysis',
     long_description = 'a collection of tools for music analysis',