diff options
Diffstat (limited to 'src/tuberia.c')
-rw-r--r-- | src/tuberia.c | 24 |
1 files changed, 21 insertions, 3 deletions
diff --git a/src/tuberia.c b/src/tuberia.c index 46bff85..0184289 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -11,6 +11,7 @@ struct stage_fetch { tube_fetch fetch; void *opaque; int run; + tube_free_element free_element; }; struct stage_sink { @@ -26,6 +27,7 @@ struct stage { tube_process process; void *opaque; int run; + tube_free_element free_element; }; struct tube_stage { @@ -161,9 +163,10 @@ tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque) ctx->stages = calloc(ctx->nstages, sizeof(struct stage)); for (i = 1, s = source->next; s != NULL; s = s->next, i++) { ctx->stages[i - 1].input = ctx->itcs[i - 1]; - ctx->stages[i - 1].output= ctx->itcs[i]; + ctx->stages[i - 1].output = ctx->itcs[i]; ctx->stages[i - 1].process = s->process; ctx->stages[i - 1].opaque = s->opaque; + ctx->stages[i - 1].free_element = s->free_element; } ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t)); @@ -171,6 +174,7 @@ tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque) ctx->fetch->output = ctx->itcs[0]; ctx->fetch->fetch = source->fetch; ctx->fetch->opaque = source->opaque; + ctx->fetch->free_element = source->free_element; ctx->sink = calloc(1, sizeof(struct stage_sink)); ctx->sink->input = ctx->itcs[ctx->nitcs - 1]; @@ -184,6 +188,7 @@ static void *tube_stage_thread(void *arg) { struct stage *stage = arg; void *element; + int ret; if (stage == NULL) { return NULL; @@ -194,7 +199,13 @@ static void *tube_stage_thread(void *arg) if (element) { element = stage->process(element, stage->opaque); if (element) { - while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run); + ret = -1; + while (ret && stage->run) { + ret = itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element); + } + if (ret) { + stage->free_element(element); + } } } } @@ -206,6 +217,7 @@ static void *tube_fetch_thread(void *arg) { struct stage_fetch *fetch = arg; void *element; + int ret; if (fetch == NULL || fetch->fetch == NULL) { return NULL; @@ -214,7 +226,13 @@ static void *tube_fetch_thread(void *arg) while (fetch->run) { element = fetch->fetch(fetch->opaque); if (element) { - while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run); + ret = -1; + while (ret && fetch->run) { + ret = itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element); + } + if (ret) { + fetch->free_element(element); + } } } |