aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tuberia.c')
-rw-r--r--src/tuberia.c520
1 files changed, 260 insertions, 260 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index 368a06d..d8b5688 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -7,371 +7,371 @@
#define ITC_DEFAULT_TIMEOUT_MS 250
struct stage_fetch {
- struct itc *output;
- tube_fetch fetch;
- void *opaque;
- int run;
+ struct itc *output;
+ tube_fetch fetch;
+ void *opaque;
+ int run;
};
struct stage_sink {
- struct itc *input;
- tube_sink sink;
- void *opaque;
- int run;
+ struct itc *input;
+ tube_sink sink;
+ void *opaque;
+ int run;
};
struct stage {
- struct itc *input;
- struct itc *output;
- tube_process process;
- void *opaque;
- int run;
+ struct itc *input;
+ struct itc *output;
+ tube_process process;
+ void *opaque;
+ int run;
};
struct tube_stage {
- struct tube_stage *next;
- int nslots;
- tube_process process;
- void *opaque;
- tube_free_element free_element;
+ struct tube_stage *next;
+ int nslots;
+ tube_process process;
+ void *opaque;
+ tube_free_element free_element;
};
struct tube_source {
- struct tube_stage *next;
- int nslots;
- tube_fetch fetch;
- void *opaque;
- tube_free_element free_element;
+ struct tube_stage *next;
+ int nslots;
+ tube_fetch fetch;
+ void *opaque;
+ tube_free_element free_element;
};
struct tube {
- int nitcs;
- itc **itcs;
- tube_free_element *free_element;
- int nstages;
- struct stage *stages;
- pthread_t *stages_thread;
- struct stage_fetch *fetch;
- pthread_t fetch_thread;
- struct stage_sink *sink;
- pthread_t sink_thread;
+ int nitcs;
+ itc **itcs;
+ tube_free_element *free_element;
+ int nstages;
+ struct stage *stages;
+ pthread_t *stages_thread;
+ struct stage_fetch *fetch;
+ pthread_t fetch_thread;
+ struct stage_sink *sink;
+ pthread_t sink_thread;
};
tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque,
- tube_free_element free_element)
+ tube_free_element free_element)
{
- struct tube_source *source;
+ struct tube_source *source;
- if (nslots <= 0) {
- return NULL;
- }
+ if (nslots <= 0) {
+ return NULL;
+ }
- source = calloc(1, sizeof(struct tube_source));
- source->nslots = nslots;
- source->fetch = fetch;
- source->opaque = opaque;
- source->free_element = free_element;
+ source = calloc(1, sizeof(struct tube_source));
+ source->nslots = nslots;
+ source->fetch = fetch;
+ source->opaque = opaque;
+ source->free_element = free_element;
- return source;
+ return source;
}
tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque,
- tube_free_element free_element)
+ tube_free_element free_element)
{
- struct tube_stage *stage;
+ struct tube_stage *stage;
- if (process == NULL || nslots <= 0) {
- return NULL;
- }
+ if (process == NULL || nslots <= 0) {
+ return NULL;
+ }
- stage = calloc(1, sizeof(struct tube_stage));
- stage->nslots = nslots;
- stage->process = process;
- stage->opaque = opaque;
- stage->free_element = free_element;
+ stage = calloc(1, sizeof(struct tube_stage));
+ stage->nslots = nslots;
+ stage->process = process;
+ stage->opaque = opaque;
+ stage->free_element = free_element;
- return stage;
+ return stage;
}
void tube_stage_free(tube_stage **stage)
{
- if (stage != NULL) {
- free(*stage);
- *stage = NULL;
- }
+ if (stage != NULL) {
+ free(*stage);
+ *stage = NULL;
+ }
}
void tube_source_and_stages_free(tube_source **source)
{
- struct tube_stage *stage;
- struct tube_stage *next;
-
- if (source != NULL && *source != NULL) {
- stage = (*source)->next;
- while (stage != NULL) {
- next = stage->next;
- free(stage);
- stage = next;
- }
- free(*source);
- *source = NULL;
- }
+ struct tube_stage *stage;
+ struct tube_stage *next;
+
+ if (source != NULL && *source != NULL) {
+ stage = (*source)->next;
+ while (stage != NULL) {
+ next = stage->next;
+ free(stage);
+ stage = next;
+ }
+ free(*source);
+ *source = NULL;
+ }
}
int tube_stage_append(tube_source *source, tube_stage *stage)
{
- struct tube_stage **next;
-
- if (source == NULL || stage == NULL) {
- return -1;
- }
- next = &(source->next);
- while (*next != NULL) {
- next = &((*next)->next);
- }
- *next = stage;
-
- return 0;
+ struct tube_stage **next;
+
+ if (source == NULL || stage == NULL) {
+ return -1;
+ }
+ next = &(source->next);
+ while (*next != NULL) {
+ next = &((*next)->next);
+ }
+ *next = stage;
+
+ return 0;
}
tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque)
{
- tube *ctx;
- struct tube_stage *s;
- int i;
-
- if (source == NULL) {
- return NULL;
- }
- ctx = calloc(1, sizeof(*ctx));
-
- for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) {
- s = s->next;
- }
- ctx->itcs = calloc(ctx->nitcs, sizeof(itc *));
- ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element));
- ctx->itcs[0] = itc_alloc(source->nslots);
- ctx->free_element[0] = source->free_element;
- for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
- ctx->itcs[i] = itc_alloc(s->nslots);
- ctx->free_element[i] = s->free_element;
- }
-
- ctx->nstages = ctx->nitcs - 1;
- ctx->stages = calloc(ctx->nstages, sizeof(struct stage));
- for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
- ctx->stages[i - 1].input = ctx->itcs[i - 1];
- ctx->stages[i - 1].output= ctx->itcs[i];
- ctx->stages[i - 1].process = s->process;
- ctx->stages[i - 1].opaque = s->opaque;
- }
- ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t));
-
- ctx->fetch = calloc(1, sizeof(struct stage_fetch));
- ctx->fetch->output = ctx->itcs[0];
- ctx->fetch->fetch = source->fetch;
- ctx->fetch->opaque = source->opaque;
-
- ctx->sink = calloc(1, sizeof(struct stage_sink));
- ctx->sink->input = ctx->itcs[ctx->nitcs - 1];
- ctx->sink->sink = sink;
- ctx->sink->opaque = opaque;
-
- return ctx;
+ tube *ctx;
+ struct tube_stage *s;
+ int i;
+
+ if (source == NULL) {
+ return NULL;
+ }
+ ctx = calloc(1, sizeof(*ctx));
+
+ for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) {
+ s = s->next;
+ }
+ ctx->itcs = calloc(ctx->nitcs, sizeof(itc *));
+ ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element));
+ ctx->itcs[0] = itc_alloc(source->nslots);
+ ctx->free_element[0] = source->free_element;
+ for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
+ ctx->itcs[i] = itc_alloc(s->nslots);
+ ctx->free_element[i] = s->free_element;
+ }
+
+ ctx->nstages = ctx->nitcs - 1;
+ ctx->stages = calloc(ctx->nstages, sizeof(struct stage));
+ for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
+ ctx->stages[i - 1].input = ctx->itcs[i - 1];
+ ctx->stages[i - 1].output= ctx->itcs[i];
+ ctx->stages[i - 1].process = s->process;
+ ctx->stages[i - 1].opaque = s->opaque;
+ }
+ ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t));
+
+ ctx->fetch = calloc(1, sizeof(struct stage_fetch));
+ ctx->fetch->output = ctx->itcs[0];
+ ctx->fetch->fetch = source->fetch;
+ ctx->fetch->opaque = source->opaque;
+
+ ctx->sink = calloc(1, sizeof(struct stage_sink));
+ ctx->sink->input = ctx->itcs[ctx->nitcs - 1];
+ ctx->sink->sink = sink;
+ ctx->sink->opaque = opaque;
+
+ return ctx;
}
static void *tube_stage_thread(void *arg)
{
- struct stage *stage = arg;
- void *element;
-
- if (stage == NULL) {
- return NULL;
- }
-
- while (stage->run) {
- element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS);
- if (element) {
- element = stage->process(element, stage->opaque);
- if (element) {
- while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run);
- }
- }
- }
-
- return NULL;
+ struct stage *stage = arg;
+ void *element;
+
+ if (stage == NULL) {
+ return NULL;
+ }
+
+ while (stage->run) {
+ element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS);
+ if (element) {
+ element = stage->process(element, stage->opaque);
+ if (element) {
+ while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run);
+ }
+ }
+ }
+
+ return NULL;
}
static void *tube_fetch_thread(void *arg)
{
- struct stage_fetch *fetch = arg;
- void *element;
+ struct stage_fetch *fetch = arg;
+ void *element;
- if (fetch == NULL || fetch->fetch == NULL) {
- return NULL;
- }
+ if (fetch == NULL || fetch->fetch == NULL) {
+ return NULL;
+ }
- while (fetch->run) {
- element = fetch->fetch(fetch->opaque);
- if (element) {
- while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run);
- }
- }
+ while (fetch->run) {
+ element = fetch->fetch(fetch->opaque);
+ if (element) {
+ while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run);
+ }
+ }
- return NULL;
+ return NULL;
}
static void *tube_sink_thread(void *arg)
{
- struct stage_sink *sink = arg;
- void *element;
+ struct stage_sink *sink = arg;
+ void *element;
- if (sink == NULL || sink->sink == NULL) {
- return NULL;
- }
+ if (sink == NULL || sink->sink == NULL) {
+ return NULL;
+ }
- while (sink->run) {
- element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS);
- if (element) {
- sink->sink(element, sink->opaque);
- }
- }
+ while (sink->run) {
+ element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS);
+ if (element) {
+ sink->sink(element, sink->opaque);
+ }
+ }
- return NULL;
+ return NULL;
}
int tube_start(tube *ctx)
{
- int i = 0;
- if (ctx == NULL) {
- return -1;
- }
-
- for (i = 0; i < ctx->nstages; i++) {
- if (!ctx->stages[i].run) {
- ctx->stages[i].run = 1;
- pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]);
- }
- }
- if (!ctx->fetch->run) {
- ctx->fetch->run = 1;
- pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch);
- }
- if (!ctx->sink->run) {
- ctx->sink->run = 1;
- pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink);
- }
-
- return 0;
+ int i = 0;
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ for (i = 0; i < ctx->nstages; i++) {
+ if (!ctx->stages[i].run) {
+ ctx->stages[i].run = 1;
+ pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]);
+ }
+ }
+ if (!ctx->fetch->run) {
+ ctx->fetch->run = 1;
+ pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch);
+ }
+ if (!ctx->sink->run) {
+ ctx->sink->run = 1;
+ pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink);
+ }
+
+ return 0;
}
int tube_inject(tube *ctx, int timeout_ms, void *element)
{
- if (ctx == NULL || !ctx->nitcs) {
- return -1;
- }
+ if (ctx == NULL || !ctx->nitcs) {
+ return -1;
+ }
- return itc_inject(ctx->itcs[0], timeout_ms, element);
+ return itc_inject(ctx->itcs[0], timeout_ms, element);
}
void *tube_retrive(tube *ctx, int timeout_ms)
{
- if (ctx == NULL || !ctx->nitcs) {
- return NULL;
- }
+ if (ctx == NULL || !ctx->nitcs) {
+ return NULL;
+ }
- return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
+ return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
}
void tube_wait_empty(tube *ctx)
{
- int i;
+ int i;
- if (ctx == NULL) {
- return;
- }
+ if (ctx == NULL) {
+ return;
+ }
- for (i = 0; i < ctx->nitcs; i++) {
- itc_wait_empty(ctx->itcs[i]);
- }
+ for (i = 0; i < ctx->nitcs; i++) {
+ itc_wait_empty(ctx->itcs[i]);
+ }
}
void tube_discard_all(tube *ctx)
{
- int i;
+ int i;
- if (ctx == NULL) {
- return;
- }
+ if (ctx == NULL) {
+ return;
+ }
- for (i = 0; i < ctx->nitcs; i++) {
- itc_discard_all(ctx->itcs[i], ctx->free_element[i]);
- }
+ for (i = 0; i < ctx->nitcs; i++) {
+ itc_discard_all(ctx->itcs[i], ctx->free_element[i]);
+ }
}
int tube_stop(tube *ctx)
{
- int i = 0;
- if (ctx == NULL) {
- return -1;
- }
-
- for (i = 0; i < ctx->nstages; i++) {
- if (ctx->stages[i].run) {
- ctx->stages[i].run = 0;
- pthread_join(ctx->stages_thread[i], NULL);
- }
- }
- if (ctx->fetch->run) {
- ctx->fetch->run = 0;
- pthread_join(ctx->fetch_thread, NULL);
- }
- if (ctx->sink->run) {
- ctx->sink->run = 0;
- pthread_join(ctx->sink_thread, NULL);
- }
-
- return 0;
+ int i = 0;
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ for (i = 0; i < ctx->nstages; i++) {
+ if (ctx->stages[i].run) {
+ ctx->stages[i].run = 0;
+ pthread_join(ctx->stages_thread[i], NULL);
+ }
+ }
+ if (ctx->fetch->run) {
+ ctx->fetch->run = 0;
+ pthread_join(ctx->fetch_thread, NULL);
+ }
+ if (ctx->sink->run) {
+ ctx->sink->run = 0;
+ pthread_join(ctx->sink_thread, NULL);
+ }
+
+ return 0;
}
void tube_free(tube **ctx)
{
- int i;
-
- if (ctx == NULL || *ctx == NULL) {
- return;
- }
-
- tube_stop(*ctx);
-
- for (i = 0; i < (*ctx)->nitcs; i++) {
- itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]);
- }
- free((*ctx)->itcs);
- free((*ctx)->free_element);
- free((*ctx)->stages);
- free((*ctx)->stages_thread);
- free((*ctx)->fetch);
- free((*ctx)->sink);
-
- free(*ctx);
- *ctx = NULL;
+ int i;
+
+ if (ctx == NULL || *ctx == NULL) {
+ return;
+ }
+
+ tube_stop(*ctx);
+
+ for (i = 0; i < (*ctx)->nitcs; i++) {
+ itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]);
+ }
+ free((*ctx)->itcs);
+ free((*ctx)->free_element);
+ free((*ctx)->stages);
+ free((*ctx)->stages_thread);
+ free((*ctx)->fetch);
+ free((*ctx)->sink);
+
+ free(*ctx);
+ *ctx = NULL;
}
int tube_get_queued(const tube *ctx, int stage)
{
- if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
- return -1;
- }
- return itc_get_queued(ctx->itcs[stage]);
+ if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
+ return -1;
+ }
+ return itc_get_queued(ctx->itcs[stage]);
}
int tube_get_slots(const tube *ctx, int stage)
{
- if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
- return -1;
- }
- return itc_get_slots(ctx->itcs[stage]);
+ if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
+ return -1;
+ }
+ return itc_get_slots(ctx->itcs[stage]);
}