diff options
Diffstat (limited to 'src/tuberia.c')
-rw-r--r-- | src/tuberia.c | 27 |
1 files changed, 22 insertions, 5 deletions
diff --git a/src/tuberia.c b/src/tuberia.c index d9af3c8..46bff85 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -265,13 +265,20 @@ int tube_start(tube *ctx) return 0; } -int tube_inject(tube *ctx, int timeout_ms, void *element) +int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element) { - if (ctx == NULL || !ctx->nitcs) { + if (ctx == NULL || stage >= ctx->nitcs || ctx->itcs == NULL + || ctx->itcs[stage] == NULL) { return -1; } - return itc_inject(ctx->itcs[0], timeout_ms, element); + return itc_inject(ctx->itcs[stage], timeout_ms, element); + +} + +int tube_inject(tube *ctx, int timeout_ms, void *element) +{ + return tube_inject_at(ctx, 0, timeout_ms, element); } void *tube_retrive(tube *ctx, int timeout_ms) @@ -283,7 +290,7 @@ void *tube_retrive(tube *ctx, int timeout_ms) return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); } -void tube_wait_empty(tube *ctx) +void tube_stop_and_wait_empty(tube *ctx) { int i; @@ -292,8 +299,19 @@ void tube_wait_empty(tube *ctx) } for (i = 0; i < ctx->nitcs; i++) { + if (i == 0 && ctx->fetch->run) { + ctx->fetch->run = 0; + pthread_join(ctx->fetch_thread, NULL); + } else if (i > 0 && ctx->stages[i - 1].run) { + ctx->stages[i - 1].run = 0; + pthread_join(ctx->stages_thread[i - 1], NULL); + } itc_wait_empty(ctx->itcs[i]); } + if (ctx->sink->run) { + ctx->sink->run = 0; + pthread_join(ctx->sink_thread, NULL); + } } void tube_discard_all(tube *ctx) @@ -334,7 +352,6 @@ int tube_stop(tube *ctx) return 0; } - void tube_free(tube **ctx) { int i; |