aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/tuberia.c24
1 files changed, 21 insertions, 3 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index 46bff85..0184289 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -11,6 +11,7 @@ struct stage_fetch {
tube_fetch fetch;
void *opaque;
int run;
+ tube_free_element free_element;
};
struct stage_sink {
@@ -26,6 +27,7 @@ struct stage {
tube_process process;
void *opaque;
int run;
+ tube_free_element free_element;
};
struct tube_stage {
@@ -161,9 +163,10 @@ tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque)
ctx->stages = calloc(ctx->nstages, sizeof(struct stage));
for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
ctx->stages[i - 1].input = ctx->itcs[i - 1];
- ctx->stages[i - 1].output= ctx->itcs[i];
+ ctx->stages[i - 1].output = ctx->itcs[i];
ctx->stages[i - 1].process = s->process;
ctx->stages[i - 1].opaque = s->opaque;
+ ctx->stages[i - 1].free_element = s->free_element;
}
ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t));
@@ -171,6 +174,7 @@ tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque)
ctx->fetch->output = ctx->itcs[0];
ctx->fetch->fetch = source->fetch;
ctx->fetch->opaque = source->opaque;
+ ctx->fetch->free_element = source->free_element;
ctx->sink = calloc(1, sizeof(struct stage_sink));
ctx->sink->input = ctx->itcs[ctx->nitcs - 1];
@@ -184,6 +188,7 @@ static void *tube_stage_thread(void *arg)
{
struct stage *stage = arg;
void *element;
+ int ret;
if (stage == NULL) {
return NULL;
@@ -194,7 +199,13 @@ static void *tube_stage_thread(void *arg)
if (element) {
element = stage->process(element, stage->opaque);
if (element) {
- while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run);
+ ret = -1;
+ while (ret && stage->run) {
+ ret = itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element);
+ }
+ if (ret) {
+ stage->free_element(element);
+ }
}
}
}
@@ -206,6 +217,7 @@ static void *tube_fetch_thread(void *arg)
{
struct stage_fetch *fetch = arg;
void *element;
+ int ret;
if (fetch == NULL || fetch->fetch == NULL) {
return NULL;
@@ -214,7 +226,13 @@ static void *tube_fetch_thread(void *arg)
while (fetch->run) {
element = fetch->fetch(fetch->opaque);
if (element) {
- while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run);
+ ret = -1;
+ while (ret && fetch->run) {
+ ret = itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element);
+ }
+ if (ret) {
+ fetch->free_element(element);
+ }
}
}