diff options
Diffstat (limited to 'src/tuberia.c')
-rw-r--r-- | src/tuberia.c | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/src/tuberia.c b/src/tuberia.c index 7e3a013..e6cd398 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -134,7 +134,7 @@ int tube_stage_append(tube_source *source, tube_stage *stage) return 0; } -tube *tube_create(tube_source *source, tube_sink sink, void *opaque) +tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque) { tube *ctx; struct tube_stage *s; @@ -250,7 +250,7 @@ int tube_start(tube *ctx) for (i = 0; i < ctx->nstages; i++) { if (!ctx->stages[i].run) { ctx->stages[i].run = 1; - pthread_create(&ctx->stages_thread[0], NULL, tube_stage_thread, &ctx->stages[i]); + pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]); } } if (!ctx->fetch->run) { @@ -283,7 +283,7 @@ void *tube_retrive(tube *ctx, int timeout_ms) return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); } -void tube_flush(tube *ctx) +void tube_wait_empty(tube *ctx) { int i; @@ -292,11 +292,11 @@ void tube_flush(tube *ctx) } for (i = 0; i < ctx->nitcs; i++) { - itc_flush(ctx->itcs[i]); + itc_wait_empty(ctx->itcs[i]); } } -void tube_drop(tube *ctx) +void tube_discard_all(tube *ctx) { int i; @@ -305,7 +305,7 @@ void tube_drop(tube *ctx) } for (i = 0; i < ctx->nitcs; i++) { - itc_drop(ctx->itcs[i], ctx->free_element[i]); + itc_discard_all(ctx->itcs[i], ctx->free_element[i]); } } @@ -319,7 +319,7 @@ int tube_stop(tube *ctx) for (i = 0; i < ctx->nstages; i++) { if (ctx->stages[i].run) { ctx->stages[i].run = 0; - pthread_join(ctx->stages_thread[0], NULL); + pthread_join(ctx->stages_thread[i], NULL); } } if (ctx->fetch->run) { @@ -343,13 +343,15 @@ void tube_free(tube **ctx) return; } - tube_stop((*ctx)); + tube_stop(*ctx); for (i = 0; i < (*ctx)->nitcs; i++) { itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]); } free((*ctx)->itcs); + free((*ctx)->free_element); free((*ctx)->stages); + free((*ctx)->stages_thread); free((*ctx)->fetch); free((*ctx)->sink); @@ -357,3 +359,19 @@ void tube_free(tube **ctx) *ctx = NULL; } +int tube_get_queued(const tube *ctx, int stage) +{ + if (ctx == NULL || stage < 0 || stage >= ctx->nstages) { + return -1; + } + return itc_get_queued(ctx->itcs[stage]); +} + +int tube_get_slots(const tube *ctx, int stage) +{ + if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { + return -1; + } + return itc_get_slots(ctx->itcs[stage]); +} + |