From 885679133f98b041dd358a7eb92550acaef5f325 Mon Sep 17 00:00:00 2001 From: Paul Brossier Date: Fri, 30 Sep 2016 01:54:50 +0200 Subject: [PATCH] src/effects/timestretch_rubberband.c: add initial pthread support --- src/effects/timestretch_rubberband.c | 138 ++++++++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 1 deletion(-) diff --git a/src/effects/timestretch_rubberband.c b/src/effects/timestretch_rubberband.c index d14e7269..de61bf42 100644 --- a/src/effects/timestretch_rubberband.c +++ b/src/effects/timestretch_rubberband.c @@ -33,6 +33,15 @@ #define MIN_STRETCH_RATIO 0.025 #define MAX_STRETCH_RATIO 40. +#define HAVE_THREADS 1 +#if 0 +#undef HAVE_THREADS +#endif + +#ifdef HAVE_THREADS +#include +#endif + /** generic time stretching structure */ struct _aubio_timestretch_t { @@ -48,12 +57,23 @@ struct _aubio_timestretch_t RubberBandState rb; RubberBandOptions rboptions; + +#ifdef HAVE_THREADS + pthread_t read_thread; + pthread_mutex_t read_mutex; + pthread_cond_t read_avail; + pthread_cond_t read_request; + sint_t available; +#endif }; extern RubberBandOptions aubio_get_rubberband_opts(const char_t *mode); static void aubio_timestretch_warmup (aubio_timestretch_t * p); static sint_t aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t fetch); +#ifdef HAVE_THREADS +static void *aubio_timestretch_readfn(void *p); +#endif aubio_timestretch_t * new_aubio_timestretch (const char_t * uri, const char_t * mode, @@ -62,6 +82,7 @@ new_aubio_timestretch (const char_t * uri, const char_t * mode, aubio_timestretch_t *p = AUBIO_NEW (aubio_timestretch_t); p->samplerate = samplerate; p->hopsize = hopsize; + //p->source_hopsize = 2048; p->source_hopsize = hopsize; p->pitchscale = 1.; p->eof = 0; @@ -90,7 +111,19 @@ new_aubio_timestretch (const char_t * uri, const char_t * mode, rubberband_set_max_process_size(p->rb, p->source_hopsize); //rubberband_set_debug_level(p->rb, 10); +#ifdef HAVE_THREADS + pthread_mutex_init(&p->read_mutex, 0); + pthread_cond_init (&p->read_avail, 0); + pthread_cond_init (&p->read_request, 0); + pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p); + //AUBIO_DBG("timestretch: new_ waiting for warmup, got %d available\n", p->available); + pthread_mutex_lock(&p->read_mutex); + pthread_cond_wait(&p->read_avail, &p->read_mutex); + pthread_mutex_unlock(&p->read_mutex); + //AUBIO_DBG("timestretch: new_ warm up success, got %d available\n", p->available); +#else aubio_timestretch_warmup(p); +#endif return p; @@ -99,17 +132,84 @@ beach: return NULL; } +#ifdef HAVE_THREADS +void * +aubio_timestretch_readfn(void *z) +{ + aubio_timestretch_t *p = z; + // signal main-thread when we are done + //AUBIO_WRN("timestretch: read_thread locking, got %d available\n", p->available); + pthread_mutex_lock(&p->read_mutex); + aubio_timestretch_warmup(p); + //AUBIO_WRN("timestretch: signaling warmup\n"); + pthread_cond_signal(&p->read_avail); + //AUBIO_WRN("timestretch: unlocking in readfn\n"); + pthread_mutex_unlock(&p->read_mutex); + AUBIO_WRN("timestretch: entering readfn loop\n"); + while(1) { //p->available < (int)p->hopsize && p->eof != 1) { + //AUBIO_WRN("timestretch: locking in readfn\n"); + pthread_mutex_lock(&p->read_mutex); + p->available = aubio_timestretch_fetch(p, p->hopsize); + //AUBIO_WRN("timestretch: read_thread read %d\n", p->available); + // signal main-thread when we are done + //AUBIO_WRN("timestretch: signaling new read\n"); + pthread_cond_signal(&p->read_avail); + if (p->eof != 1) { + pthread_cond_wait(&p->read_request, &p->read_mutex); + } + if (p->eof == 1) { + AUBIO_WRN("timestretch: read_thread eof reached %d, %d/%d\n", p->available, + p->hopsize, p->source_hopsize); + pthread_mutex_unlock(&p->read_mutex); + break; + } + //AUBIO_WRN("timestretch: unlocking in readfn\n"); + pthread_mutex_unlock(&p->read_mutex); + } +#if 1 + pthread_mutex_lock(&p->read_mutex); + //AUBIO_WRN("timestretch: signaling end\n"); + pthread_cond_signal(&p->read_avail); + pthread_mutex_unlock(&p->read_mutex); +#endif + //AUBIO_WRN("timestretch: exiting readfn\n"); + pthread_exit(NULL); +} +#endif + static void aubio_timestretch_warmup (aubio_timestretch_t * p) { // warm up rubber band unsigned int latency = MAX(p->hopsize, rubberband_get_latency(p->rb)); +#ifdef HAVE_THREADS + p->available = aubio_timestretch_fetch(p, latency); +#else aubio_timestretch_fetch(p, latency); +#endif } void del_aubio_timestretch (aubio_timestretch_t * p) { +#ifdef HAVE_THREADS + pthread_mutex_lock(&p->read_mutex); + pthread_cond_signal(&p->read_request); + //pthread_cond_wait(&p->read_avail, &p->read_mutex); + pthread_mutex_unlock(&p->read_mutex); +#if 1 + void *threadfn; + if ((p->eof == 0) && (pthread_cancel(p->read_thread))) { + AUBIO_WRN("timestretch: cancelling thread failed\n"); + } + if (pthread_join(p->read_thread, &threadfn)) { + AUBIO_WRN("timestretch: joining thread failed\n"); + } +#endif + pthread_mutex_destroy(&p->read_mutex); + pthread_cond_destroy(&p->read_avail); + pthread_cond_destroy(&p->read_request); +#endif if (p->in) del_fvec(p->in); if (p->source) del_aubio_source(p->source); if (p->rb) { @@ -204,7 +304,20 @@ aubio_timestretch_fetch(aubio_timestretch_t *p, uint_t length) void aubio_timestretch_do (aubio_timestretch_t * p, fvec_t * out, uint_t * read) { +#ifndef HAVE_THREADS int available = aubio_timestretch_fetch(p, p->hopsize); +#else /* HAVE_THREADS */ + int available; + pthread_mutex_lock(&p->read_mutex); + if (p->eof != 1) { + // signal a read request + pthread_cond_signal(&p->read_request); + // wait for an available signal + pthread_cond_wait(&p->read_avail, &p->read_mutex); + } else { + available = rubberband_available(p->rb); + } +#endif /* HAVE_THREADS */ // now retrieve the samples and write them into out->data if (available >= (int)p->hopsize) { rubberband_retrieve(p->rb, (float* const*)&(out->data), p->hopsize); @@ -216,14 +329,37 @@ aubio_timestretch_do (aubio_timestretch_t * p, fvec_t * out, uint_t * read) fvec_zeros(out); *read = 0; } +#ifdef HAVE_THREADS + pthread_mutex_unlock(&p->read_mutex); +#endif } uint_t aubio_timestretch_seek (aubio_timestretch_t *p, uint_t pos) { + uint_t err = AUBIO_OK; +#if HAVE_THREADS + AUBIO_WRN("timestretch: seek_ waiting for warmup, got %d available\n", p->available); + pthread_mutex_lock(&p->read_mutex); +#endif p->eof = 0; rubberband_reset(p->rb); - return aubio_source_seek(p->source, pos); + err = aubio_source_seek(p->source, pos); +#if HAVE_THREADS + p->available = 0; + void *threadfn; + if ((p->eof == 0) && (pthread_cancel(p->read_thread) == 0)) { + AUBIO_WRN("timestretch: cancelling thread failed\n"); + } + if (pthread_join(p->read_thread, &threadfn)) { + AUBIO_WRN("timestretch: joining thread failed\n"); + } + pthread_create(&p->read_thread, 0, aubio_timestretch_readfn, p); + pthread_cond_wait(&p->read_avail, &p->read_mutex); + pthread_mutex_unlock(&p->read_mutex); + //AUBIO_WRN("timestretch: seek_ warm up success, got %d available\n", p->available); +#endif + return err; } #endif -- 2.11.0