diff options
Diffstat (limited to 'src/tuberia.c')
-rw-r--r-- | src/tuberia.c | 309 |
1 files changed, 296 insertions, 13 deletions
diff --git a/src/tuberia.c b/src/tuberia.c index 3dc5c10..7e3a013 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -1,43 +1,107 @@ #include "tuberia.h" #include "itc.h" +#include <pthread.h> #include <stdlib.h> +#define ITC_DEFAULT_TIMEOUT_MS 250 + +struct stage_fetch { + struct itc *output; + tube_fetch fetch; + void *opaque; + int run; +}; + +struct stage_sink { + struct itc *input; + tube_sink sink; + void *opaque; + int run; +}; + +struct stage { + struct itc *input; + struct itc *output; + tube_process process; + void *opaque; + int run; +}; + +struct tube_stage { + struct tube_stage *next; + int nslots; + tube_process process; + void *opaque; + tube_free_element free_element; +}; + +struct tube_source { + struct tube_stage *next; + int nslots; + tube_fetch fetch; + void *opaque; + tube_free_element free_element; +}; + struct tube { - struct itc *itc; + int nitcs; + itc **itcs; + tube_free_element *free_element; + int nstages; + struct stage *stages; + pthread_t *stages_thread; + struct stage_fetch *fetch; + pthread_t fetch_thread; + struct stage_sink *sink; + pthread_t sink_thread; }; -struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque) +tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque, + tube_free_element free_element) { - struct tube_source *source = calloc(1, sizeof(struct tube_source)); + struct tube_source *source; + + if (nslots <= 0) { + return NULL; + } - source->nqueue = nqueue; + source = calloc(1, sizeof(struct tube_source)); + source->nslots = nslots; source->fetch = fetch; source->opaque = opaque; + source->free_element = free_element; return source; } -struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque) +tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque, + tube_free_element free_element) { - struct tube_stage *stage = calloc(1, sizeof(struct tube_stage)); + struct tube_stage *stage; + + if (process == NULL || nslots <= 0) { + return NULL; + } - stage->nqueue = nqueue; + stage = calloc(1, sizeof(struct tube_stage)); + stage->nslots = nslots; stage->process = process; stage->opaque = opaque; + stage->free_element = free_element; return stage; } -void tube_stage_free(struct tube_stage **stage) +void tube_stage_free(tube_stage **stage) { if (stage != NULL) { - free(stage); + free(*stage); *stage = NULL; } } -void tube_source_and_stages_free(struct tube_source **source) +void tube_source_and_stages_free(tube_source **source) { struct tube_stage *stage; struct tube_stage *next; @@ -54,11 +118,11 @@ void tube_source_and_stages_free(struct tube_source **source) } } -int tube_stage_append(struct tube_source *source, struct tube_stage *stage) +int tube_stage_append(tube_source *source, tube_stage *stage) { struct tube_stage **next; - if (source != NULL || stage != NULL) { + if (source == NULL || stage == NULL) { return -1; } next = &(source->next); @@ -70,7 +134,226 @@ int tube_stage_append(struct tube_source *source, struct tube_stage *stage) return 0; } -tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque) +tube *tube_create(tube_source *source, tube_sink sink, void *opaque) +{ + tube *ctx; + struct tube_stage *s; + int i; + + if (source == NULL) { + return NULL; + } + ctx = calloc(1, sizeof(*ctx)); + + for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) { + s = s->next; + } + ctx->itcs = calloc(ctx->nitcs, sizeof(itc *)); + ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element)); + ctx->itcs[0] = itc_alloc(source->nslots); + ctx->free_element[0] = source->free_element; + for (i = 1, s = source->next; s != NULL; s = s->next, i++) { + ctx->itcs[i] = itc_alloc(s->nslots); + ctx->free_element[i] = s->free_element; + } + + ctx->nstages = ctx->nitcs - 1; + ctx->stages = calloc(ctx->nstages, sizeof(struct stage)); + for (i = 1, s = source->next; s != NULL; s = s->next, i++) { + ctx->stages[i - 1].input = ctx->itcs[i - 1]; + ctx->stages[i - 1].output= ctx->itcs[i]; + ctx->stages[i - 1].process = s->process; + ctx->stages[i - 1].opaque = s->opaque; + } + ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t)); + + ctx->fetch = calloc(1, sizeof(struct stage_fetch)); + ctx->fetch->output = ctx->itcs[0]; + ctx->fetch->fetch = source->fetch; + ctx->fetch->opaque = source->opaque; + + ctx->sink = calloc(1, sizeof(struct stage_sink)); + ctx->sink->input = ctx->itcs[ctx->nitcs - 1]; + ctx->sink->sink = sink; + ctx->sink->opaque = opaque; + + return ctx; +} + +static void *tube_stage_thread(void *arg) +{ + struct stage *stage = arg; + void *element; + + if (stage == NULL) { + return NULL; + } + + while (stage->run) { + element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS); + if (element) { + element = stage->process(element, stage->opaque); + if (element) { + while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run); + } + } + } + + return NULL; +} + +static void *tube_fetch_thread(void *arg) +{ + struct stage_fetch *fetch = arg; + void *element; + + if (fetch == NULL || fetch->fetch == NULL) { + return NULL; + } + + while (fetch->run) { + element = fetch->fetch(fetch->opaque); + if (element) { + while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run); + } + } + + return NULL; +} + +static void *tube_sink_thread(void *arg) +{ + struct stage_sink *sink = arg; + void *element; + + if (sink == NULL || sink->sink == NULL) { + return NULL; + } + + while (sink->run) { + element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS); + if (element) { + sink->sink(element, sink->opaque); + } + } + + return NULL; +} + +int tube_start(tube *ctx) +{ + int i = 0; + if (ctx == NULL) { + return -1; + } + + for (i = 0; i < ctx->nstages; i++) { + if (!ctx->stages[i].run) { + ctx->stages[i].run = 1; + pthread_create(&ctx->stages_thread[0], NULL, tube_stage_thread, &ctx->stages[i]); + } + } + if (!ctx->fetch->run) { + ctx->fetch->run = 1; + pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch); + } + if (!ctx->sink->run) { + ctx->sink->run = 1; + pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink); + } + + return 0; +} + +int tube_inject(tube *ctx, int timeout_ms, void *element) { + if (ctx == NULL || !ctx->nitcs) { + return -1; + } + + return itc_inject(ctx->itcs[0], timeout_ms, element); +} + +void *tube_retrive(tube *ctx, int timeout_ms) +{ + if (ctx == NULL || !ctx->nitcs) { + return NULL; + } + + return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); +} + +void tube_flush(tube *ctx) +{ + int i; + + if (ctx == NULL) { + return; + } + + for (i = 0; i < ctx->nitcs; i++) { + itc_flush(ctx->itcs[i]); + } +} + +void tube_drop(tube *ctx) +{ + int i; + + if (ctx == NULL) { + return; + } + + for (i = 0; i < ctx->nitcs; i++) { + itc_drop(ctx->itcs[i], ctx->free_element[i]); + } +} + +int tube_stop(tube *ctx) +{ + int i = 0; + if (ctx == NULL) { + return -1; + } + + for (i = 0; i < ctx->nstages; i++) { + if (ctx->stages[i].run) { + ctx->stages[i].run = 0; + pthread_join(ctx->stages_thread[0], NULL); + } + } + if (ctx->fetch->run) { + ctx->fetch->run = 0; + pthread_join(ctx->fetch_thread, NULL); + } + if (ctx->sink->run) { + ctx->sink->run = 0; + pthread_join(ctx->sink_thread, NULL); + } + + return 0; +} + + +void tube_free(tube **ctx) +{ + int i; + + if (ctx == NULL || *ctx == NULL) { + return; + } + + tube_stop((*ctx)); + + for (i = 0; i < (*ctx)->nitcs; i++) { + itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]); + } + free((*ctx)->itcs); + free((*ctx)->stages); + free((*ctx)->fetch); + free((*ctx)->sink); + + free(*ctx); + *ctx = NULL; } |