1 """utility routines to slice sound files at given timestamps"""
4 from aubio import source, sink
8 def slice_source_at_stamps(source_file, timestamps, timestamps_end=None,
9 output_dir=None, samplerate=0, hopsize=256):
10 """ slice a sound file at given timestamps """
12 if timestamps is None or len(timestamps) == 0:
13 raise ValueError("no timestamps given")
15 if timestamps[0] != 0:
16 timestamps = [0] + timestamps
17 if timestamps_end != None:
18 timestamps_end = [timestamps[1] - 1] + timestamps_end
20 if timestamps_end != None:
21 if len(timestamps_end) != len(timestamps):
22 raise ValueError("len(timestamps_end) != len(timestamps)")
24 timestamps_end = [t - 1 for t in timestamps[1:]] + [_max_timestamp]
26 regions = list(zip(timestamps, timestamps_end))
29 source_base_name, _ = os.path.splitext(os.path.basename(source_file))
30 if output_dir != None:
31 if not os.path.isdir(output_dir):
32 os.makedirs(output_dir)
33 source_base_name = os.path.join(output_dir, source_base_name)
def new_sink_name(source_base_name, timestamp, samplerate):
    """ build the output file name of the slice starting at `timestamp`

    `timestamp` is expressed in samples; it is divided by `samplerate` to
    obtain seconds, formatted as a zero-padded value with six decimals,
    and inserted between `source_base_name` and the '.wav' extension.
    """
    # position of the slice start, in seconds
    seconds = timestamp / float(samplerate)
    # zero-padded so that names sort in chronological order
    stamp_suffix = "_%011.6f" % seconds
    return source_base_name + stamp_suffix + '.wav'
41 _source = source(source_file, samplerate, hopsize)
42 samplerate = _source.samplerate
48 # get hopsize new samples from source
49 vec, read = _source.do_multi()
50 # if the total number of frames read will exceed the next region start
51 if len(regions) and total_frames + read >= regions[0][0]:
52 #print "getting", regions[0], "at", total_frames
54 start_stamp, end_stamp = regions.pop(0)
55 # create a name for the sink
56 new_sink_path = new_sink_name(source_base_name, start_stamp, samplerate)
58 _sink = sink(new_sink_path, samplerate, _source.channels)
59 # create a dictionary containing all this
60 new_slice = {'start_stamp': start_stamp, 'end_stamp': end_stamp, 'sink': _sink}
61 # append the dictionary to the current list of slices
62 slices.append(new_slice)
64 for current_slice in slices:
65 start_stamp = current_slice['start_stamp']
66 end_stamp = current_slice['end_stamp']
67 _sink = current_slice['sink']
68 # sample index to start writing from new source vector
69 start = max(start_stamp - total_frames, 0)
70 # number of samples yet to be written until end of region
71 remaining = end_stamp - total_frames + 1
72 #print current_slice, remaining, start
73 # not enough frames remaining, time to split
76 # write remaining samples from current region
77 _sink.do_multi(vec[:, start:remaining], remaining - start)
78 #print "closing region", "remaining", remaining
82 # write all the samples
83 _sink.do_multi(vec[:, start:read], read - start)