diff options
author | Nicolas Dato <nicolas.dato@gmail.com> | 2025-07-05 19:53:02 -0300 |
---|---|---|
committer | Nicolas Dato <nicolas.dato@gmail.com> | 2025-07-05 19:53:02 -0300 |
commit | 37feaa48d5b7c7e1773acf364d7e28bfa46bbd78 (patch) | |
tree | f105eb52670246987b932200afa6fd2d992e5d00 | |
parent | 4c15b4e3459311f8d458e0e9c031a473efe5f0b8 (diff) | |
download | libtuberia-37feaa48d5b7c7e1773acf364d7e28bfa46bbd78.tar.gz |
adding tube_inject_at() to inject elements at any stage, and changing tube_wait_empty() for tube_stop_and_wait_empty()
-rw-r--r-- | src/tuberia.c | 27 | ||||
-rw-r--r-- | src/tuberia.h | 3 |
2 files changed, 24 insertions, 6 deletions
diff --git a/src/tuberia.c b/src/tuberia.c index d9af3c8..46bff85 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -265,13 +265,20 @@ int tube_start(tube *ctx) return 0; } -int tube_inject(tube *ctx, int timeout_ms, void *element) +int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element) { - if (ctx == NULL || !ctx->nitcs) { + if (ctx == NULL || stage >= ctx->nitcs || ctx->itcs == NULL + || ctx->itcs[stage] == NULL) { return -1; } - return itc_inject(ctx->itcs[0], timeout_ms, element); + return itc_inject(ctx->itcs[stage], timeout_ms, element); + +} + +int tube_inject(tube *ctx, int timeout_ms, void *element) +{ + return tube_inject_at(ctx, 0, timeout_ms, element); } void *tube_retrive(tube *ctx, int timeout_ms) @@ -283,7 +290,7 @@ void *tube_retrive(tube *ctx, int timeout_ms) return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); } -void tube_wait_empty(tube *ctx) +void tube_stop_and_wait_empty(tube *ctx) { int i; @@ -292,8 +299,19 @@ void tube_wait_empty(tube *ctx) } for (i = 0; i < ctx->nitcs; i++) { + if (i == 0 && ctx->fetch->run) { + ctx->fetch->run = 0; + pthread_join(ctx->fetch_thread, NULL); + } else if (i > 0 && ctx->stages[i - 1].run) { + ctx->stages[i - 1].run = 0; + pthread_join(ctx->stages_thread[i - 1], NULL); + } itc_wait_empty(ctx->itcs[i]); } + if (ctx->sink->run) { + ctx->sink->run = 0; + pthread_join(ctx->sink_thread, NULL); + } } void tube_discard_all(tube *ctx) @@ -334,7 +352,6 @@ int tube_stop(tube *ctx) return 0; } - void tube_free(tube **ctx) { int i; diff --git a/src/tuberia.h b/src/tuberia.h index 9b3c56c..f83ea6f 100644 --- a/src/tuberia.h +++ b/src/tuberia.h @@ -23,8 +23,9 @@ int tube_stop(tube *ctx); void tube_free(tube **ctx); int tube_inject(tube *ctx, int timeout_ms, void *element); +int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element); void *tube_retrive(tube *ctx, int timeout_ms); -void tube_wait_empty(tube *ctx); +void tube_stop_and_wait_empty(tube *ctx); void tube_discard_all(tube *ctx); int tube_get_queued(const tube *ctx, int stage); |