#include "tuberia.h" #include "itc.h" #include #include #define ITC_DEFAULT_TIMEOUT_MS 250 struct stage_fetch { struct itc *output; tube_fetch fetch; void *opaque; int run; }; struct stage_sink { struct itc *input; tube_sink sink; void *opaque; int run; }; struct stage { struct itc *input; struct itc *output; tube_process process; void *opaque; int run; }; struct tube_stage { struct tube_stage *next; int nslots; tube_process process; void *opaque; tube_free_element free_element; }; struct tube_source { struct tube_stage *next; int nslots; tube_fetch fetch; void *opaque; tube_free_element free_element; }; struct tube { int nitcs; itc **itcs; tube_free_element *free_element; int nstages; struct stage *stages; pthread_t *stages_thread; struct stage_fetch *fetch; pthread_t fetch_thread; struct stage_sink *sink; pthread_t sink_thread; }; tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque, tube_free_element free_element) { struct tube_source *source; if (nslots <= 0) { return NULL; } source = calloc(1, sizeof(struct tube_source)); source->nslots = nslots; source->fetch = fetch; source->opaque = opaque; source->free_element = free_element; return source; } tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque, tube_free_element free_element) { struct tube_stage *stage; if (process == NULL || nslots <= 0) { return NULL; } stage = calloc(1, sizeof(struct tube_stage)); stage->nslots = nslots; stage->process = process; stage->opaque = opaque; stage->free_element = free_element; return stage; } void tube_stage_free(tube_stage **stage) { if (stage != NULL) { free(*stage); *stage = NULL; } } void tube_source_and_stages_free(tube_source **source) { struct tube_stage *stage; struct tube_stage *next; if (source != NULL && *source != NULL) { stage = (*source)->next; while (stage != NULL) { next = stage->next; free(stage); stage = next; } free(*source); *source = NULL; } } int tube_stage_append(tube_source *source, tube_stage *stage) { struct tube_stage **next; if (source == NULL || stage == NULL) { return -1; } next = &(source->next); while (*next != NULL) { next = &((*next)->next); } *next = stage; return 0; } tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque) { tube *ctx; struct tube_stage *s; int i; if (source == NULL) { return NULL; } ctx = calloc(1, sizeof(*ctx)); for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) { s = s->next; } ctx->itcs = (struct itc **)calloc(ctx->nitcs, sizeof(itc *)); ctx->free_element = (tube_free_element *)calloc(ctx->nitcs, sizeof(tube_free_element)); ctx->itcs[0] = itc_alloc(source->nslots); ctx->free_element[0] = source->free_element; for (i = 1, s = source->next; s != NULL; s = s->next, i++) { ctx->itcs[i] = itc_alloc(s->nslots); ctx->free_element[i] = s->free_element; } ctx->nstages = ctx->nitcs - 1; ctx->stages = calloc(ctx->nstages, sizeof(struct stage)); for (i = 1, s = source->next; s != NULL; s = s->next, i++) { ctx->stages[i - 1].input = ctx->itcs[i - 1]; ctx->stages[i - 1].output= ctx->itcs[i]; ctx->stages[i - 1].process = s->process; ctx->stages[i - 1].opaque = s->opaque; } ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t)); ctx->fetch = calloc(1, sizeof(struct stage_fetch)); ctx->fetch->output = ctx->itcs[0]; ctx->fetch->fetch = source->fetch; ctx->fetch->opaque = source->opaque; ctx->sink = calloc(1, sizeof(struct stage_sink)); ctx->sink->input = ctx->itcs[ctx->nitcs - 1]; ctx->sink->sink = sink; ctx->sink->opaque = opaque; return ctx; } static void *tube_stage_thread(void *arg) { struct stage *stage = arg; void *element; if (stage == NULL) { return NULL; } while (stage->run) { element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS); if (element) { element = stage->process(element, stage->opaque); if (element) { while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run); } } } return NULL; } static void *tube_fetch_thread(void *arg) { struct stage_fetch *fetch = arg; void *element; if (fetch == NULL || fetch->fetch == NULL) { return NULL; } while (fetch->run) { element = fetch->fetch(fetch->opaque); if (element) { while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run); } } return NULL; } static void *tube_sink_thread(void *arg) { struct stage_sink *sink = arg; void *element; if (sink == NULL || sink->sink == NULL) { return NULL; } while (sink->run) { element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS); if (element) { sink->sink(element, sink->opaque); } } return NULL; } int tube_start(tube *ctx) { int i = 0; if (ctx == NULL) { return -1; } for (i = 0; i < ctx->nstages; i++) { if (!ctx->stages[i].run) { ctx->stages[i].run = 1; pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]); } } if (!ctx->fetch->run) { ctx->fetch->run = 1; pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch); } if (!ctx->sink->run) { ctx->sink->run = 1; pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink); } return 0; } int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element) { if (ctx == NULL || stage >= ctx->nitcs || ctx->itcs == NULL || ctx->itcs[stage] == NULL) { return -1; } return itc_inject(ctx->itcs[stage], timeout_ms, element); } int tube_inject(tube *ctx, int timeout_ms, void *element) { return tube_inject_at(ctx, 0, timeout_ms, element); } void *tube_retrive(tube *ctx, int timeout_ms) { if (ctx == NULL || !ctx->nitcs) { return NULL; } return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); } void tube_stop_and_wait_empty(tube *ctx) { int i; if (ctx == NULL) { return; } for (i = 0; i < ctx->nitcs; i++) { if (i == 0 && ctx->fetch->run) { ctx->fetch->run = 0; pthread_join(ctx->fetch_thread, NULL); } else if (i > 0 && ctx->stages[i - 1].run) { ctx->stages[i - 1].run = 0; pthread_join(ctx->stages_thread[i - 1], NULL); } itc_wait_empty(ctx->itcs[i]); } if (ctx->sink->run) { ctx->sink->run = 0; pthread_join(ctx->sink_thread, NULL); } } void tube_discard_all(tube *ctx) { int i; if (ctx == NULL) { return; } for (i = 0; i < ctx->nitcs; i++) { itc_discard_all(ctx->itcs[i], ctx->free_element[i]); } } int tube_stop(tube *ctx) { int i = 0; if (ctx == NULL) { return -1; } for (i = 0; i < ctx->nstages; i++) { if (ctx->stages[i].run) { ctx->stages[i].run = 0; pthread_join(ctx->stages_thread[i], NULL); } } if (ctx->fetch->run) { ctx->fetch->run = 0; pthread_join(ctx->fetch_thread, NULL); } if (ctx->sink->run) { ctx->sink->run = 0; pthread_join(ctx->sink_thread, NULL); } return 0; } void tube_free(tube **ctx) { int i; if (ctx == NULL || *ctx == NULL) { return; } tube_stop(*ctx); for (i = 0; i < (*ctx)->nitcs; i++) { itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]); } free((*ctx)->itcs); free((*ctx)->free_element); free((*ctx)->stages); free((*ctx)->stages_thread); free((*ctx)->fetch); free((*ctx)->sink); free(*ctx); *ctx = NULL; } int tube_get_queued(const tube *ctx, int stage) { if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { return -1; } return itc_get_queued(ctx->itcs[stage]); } int tube_get_slots(const tube *ctx, int stage) { if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { return -1; } return itc_get_slots(ctx->itcs[stage]); }