aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNicolas Dato <nicolas.dato@gmail.com>2025-07-05 19:53:02 -0300
committerNicolas Dato <nicolas.dato@gmail.com>2025-07-05 19:53:02 -0300
commit37feaa48d5b7c7e1773acf364d7e28bfa46bbd78 (patch)
treef105eb52670246987b932200afa6fd2d992e5d00
parent4c15b4e3459311f8d458e0e9c031a473efe5f0b8 (diff)
downloadlibtuberia-37feaa48d5b7c7e1773acf364d7e28bfa46bbd78.tar.gz
adding tube_inject_at() to inject elements at any stage, and changing tube_wait_empty() for tube_stop_and_wait_empty()
-rw-r--r--src/tuberia.c27
-rw-r--r--src/tuberia.h3
2 files changed, 24 insertions, 6 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index d9af3c8..46bff85 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -265,13 +265,20 @@ int tube_start(tube *ctx)
return 0;
}
-int tube_inject(tube *ctx, int timeout_ms, void *element)
+int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element)
{
- if (ctx == NULL || !ctx->nitcs) {
+ if (ctx == NULL || stage >= ctx->nitcs || ctx->itcs == NULL
+ || ctx->itcs[stage] == NULL) {
return -1;
}
- return itc_inject(ctx->itcs[0], timeout_ms, element);
+ return itc_inject(ctx->itcs[stage], timeout_ms, element);
+
+}
+
+int tube_inject(tube *ctx, int timeout_ms, void *element)
+{
+ return tube_inject_at(ctx, 0, timeout_ms, element);
}
void *tube_retrive(tube *ctx, int timeout_ms)
@@ -283,7 +290,7 @@ void *tube_retrive(tube *ctx, int timeout_ms)
return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
}
-void tube_wait_empty(tube *ctx)
+void tube_stop_and_wait_empty(tube *ctx)
{
int i;
@@ -292,8 +299,19 @@ void tube_wait_empty(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
+ if (i == 0 && ctx->fetch->run) {
+ ctx->fetch->run = 0;
+ pthread_join(ctx->fetch_thread, NULL);
+ } else if (i > 0 && ctx->stages[i - 1].run) {
+ ctx->stages[i - 1].run = 0;
+ pthread_join(ctx->stages_thread[i - 1], NULL);
+ }
itc_wait_empty(ctx->itcs[i]);
}
+ if (ctx->sink->run) {
+ ctx->sink->run = 0;
+ pthread_join(ctx->sink_thread, NULL);
+ }
}
void tube_discard_all(tube *ctx)
@@ -334,7 +352,6 @@ int tube_stop(tube *ctx)
return 0;
}
-
void tube_free(tube **ctx)
{
int i;
diff --git a/src/tuberia.h b/src/tuberia.h
index 9b3c56c..f83ea6f 100644
--- a/src/tuberia.h
+++ b/src/tuberia.h
@@ -23,8 +23,9 @@ int tube_stop(tube *ctx);
void tube_free(tube **ctx);
int tube_inject(tube *ctx, int timeout_ms, void *element);
+int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element);
void *tube_retrive(tube *ctx, int timeout_ms);
-void tube_wait_empty(tube *ctx);
+void tube_stop_and_wait_empty(tube *ctx);
void tube_discard_all(tube *ctx);
int tube_get_queued(const tube *ctx, int stage);