aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tuberia.c')
-rw-r--r--src/tuberia.c27
1 files changed, 22 insertions, 5 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index d9af3c8..46bff85 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -265,13 +265,20 @@ int tube_start(tube *ctx)
return 0;
}
-int tube_inject(tube *ctx, int timeout_ms, void *element)
+int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element)
{
- if (ctx == NULL || !ctx->nitcs) {
+ if (ctx == NULL || stage >= ctx->nitcs || ctx->itcs == NULL
+ || ctx->itcs[stage] == NULL) {
return -1;
}
- return itc_inject(ctx->itcs[0], timeout_ms, element);
+ return itc_inject(ctx->itcs[stage], timeout_ms, element);
+
+}
+
+int tube_inject(tube *ctx, int timeout_ms, void *element)
+{
+ return tube_inject_at(ctx, 0, timeout_ms, element);
}
void *tube_retrive(tube *ctx, int timeout_ms)
@@ -283,7 +290,7 @@ void *tube_retrive(tube *ctx, int timeout_ms)
return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
}
-void tube_wait_empty(tube *ctx)
+void tube_stop_and_wait_empty(tube *ctx)
{
int i;
@@ -292,8 +299,19 @@ void tube_wait_empty(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
+ if (i == 0 && ctx->fetch->run) {
+ ctx->fetch->run = 0;
+ pthread_join(ctx->fetch_thread, NULL);
+ } else if (i > 0 && ctx->stages[i - 1].run) {
+ ctx->stages[i - 1].run = 0;
+ pthread_join(ctx->stages_thread[i - 1], NULL);
+ }
itc_wait_empty(ctx->itcs[i]);
}
+ if (ctx->sink->run) {
+ ctx->sink->run = 0;
+ pthread_join(ctx->sink_thread, NULL);
+ }
}
void tube_discard_all(tube *ctx)
@@ -334,7 +352,6 @@ int tube_stop(tube *ctx)
return 0;
}
-
void tube_free(tube **ctx)
{
int i;