aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tuberia.c')
-rw-r--r--src/tuberia.c34
1 files changed, 26 insertions, 8 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index 7e3a013..e6cd398 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -134,7 +134,7 @@ int tube_stage_append(tube_source *source, tube_stage *stage)
return 0;
}
-tube *tube_create(tube_source *source, tube_sink sink, void *opaque)
+tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque)
{
tube *ctx;
struct tube_stage *s;
@@ -250,7 +250,7 @@ int tube_start(tube *ctx)
for (i = 0; i < ctx->nstages; i++) {
if (!ctx->stages[i].run) {
ctx->stages[i].run = 1;
- pthread_create(&ctx->stages_thread[0], NULL, tube_stage_thread, &ctx->stages[i]);
+ pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]);
}
}
if (!ctx->fetch->run) {
@@ -283,7 +283,7 @@ void *tube_retrive(tube *ctx, int timeout_ms)
return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
}
-void tube_flush(tube *ctx)
+void tube_wait_empty(tube *ctx)
{
int i;
@@ -292,11 +292,11 @@ void tube_flush(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
- itc_flush(ctx->itcs[i]);
+ itc_wait_empty(ctx->itcs[i]);
}
}
-void tube_drop(tube *ctx)
+void tube_discard_all(tube *ctx)
{
int i;
@@ -305,7 +305,7 @@ void tube_drop(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
- itc_drop(ctx->itcs[i], ctx->free_element[i]);
+ itc_discard_all(ctx->itcs[i], ctx->free_element[i]);
}
}
@@ -319,7 +319,7 @@ int tube_stop(tube *ctx)
for (i = 0; i < ctx->nstages; i++) {
if (ctx->stages[i].run) {
ctx->stages[i].run = 0;
- pthread_join(ctx->stages_thread[0], NULL);
+ pthread_join(ctx->stages_thread[i], NULL);
}
}
if (ctx->fetch->run) {
@@ -343,13 +343,15 @@ void tube_free(tube **ctx)
return;
}
- tube_stop((*ctx));
+ tube_stop(*ctx);
for (i = 0; i < (*ctx)->nitcs; i++) {
itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]);
}
free((*ctx)->itcs);
+ free((*ctx)->free_element);
free((*ctx)->stages);
+ free((*ctx)->stages_thread);
free((*ctx)->fetch);
free((*ctx)->sink);
@@ -357,3 +359,19 @@ void tube_free(tube **ctx)
*ctx = NULL;
}
+int tube_get_queued(const tube *ctx, int stage)
+{
+ if (ctx == NULL || stage < 0 || stage >= ctx->nstages) {
+ return -1;
+ }
+ return itc_get_queued(ctx->itcs[stage]);
+}
+
+int tube_get_slots(const tube *ctx, int stage)
+{
+ if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
+ return -1;
+ }
+ return itc_get_slots(ctx->itcs[stage]);
+}
+