aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/tuberia.c')
-rw-r--r--src/tuberia.c309
1 files changed, 296 insertions, 13 deletions
diff --git a/src/tuberia.c b/src/tuberia.c
index 3dc5c10..7e3a013 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -1,43 +1,107 @@
#include "tuberia.h"
#include "itc.h"
+#include <pthread.h>
#include <stdlib.h>
+#define ITC_DEFAULT_TIMEOUT_MS 250
+
+struct stage_fetch {
+ struct itc *output;
+ tube_fetch fetch;
+ void *opaque;
+ int run;
+};
+
+struct stage_sink {
+ struct itc *input;
+ tube_sink sink;
+ void *opaque;
+ int run;
+};
+
+struct stage {
+ struct itc *input;
+ struct itc *output;
+ tube_process process;
+ void *opaque;
+ int run;
+};
+
+struct tube_stage {
+ struct tube_stage *next;
+ int nslots;
+ tube_process process;
+ void *opaque;
+ tube_free_element free_element;
+};
+
+struct tube_source {
+ struct tube_stage *next;
+ int nslots;
+ tube_fetch fetch;
+ void *opaque;
+ tube_free_element free_element;
+};
+
struct tube {
- struct itc *itc;
+ int nitcs;
+ itc **itcs;
+ tube_free_element *free_element;
+ int nstages;
+ struct stage *stages;
+ pthread_t *stages_thread;
+ struct stage_fetch *fetch;
+ pthread_t fetch_thread;
+ struct stage_sink *sink;
+ pthread_t sink_thread;
};
-struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque)
+tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque,
+ tube_free_element free_element)
{
- struct tube_source *source = calloc(1, sizeof(struct tube_source));
+ struct tube_source *source;
+
+ if (nslots <= 0) {
+ return NULL;
+ }
- source->nqueue = nqueue;
+ source = calloc(1, sizeof(struct tube_source));
+ source->nslots = nslots;
source->fetch = fetch;
source->opaque = opaque;
+ source->free_element = free_element;
return source;
}
-struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque)
+tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque,
+ tube_free_element free_element)
{
- struct tube_stage *stage = calloc(1, sizeof(struct tube_stage));
+ struct tube_stage *stage;
+
+ if (process == NULL || nslots <= 0) {
+ return NULL;
+ }
- stage->nqueue = nqueue;
+ stage = calloc(1, sizeof(struct tube_stage));
+ stage->nslots = nslots;
stage->process = process;
stage->opaque = opaque;
+ stage->free_element = free_element;
return stage;
}
-void tube_stage_free(struct tube_stage **stage)
+void tube_stage_free(tube_stage **stage)
{
if (stage != NULL) {
- free(stage);
+ free(*stage);
*stage = NULL;
}
}
-void tube_source_and_stages_free(struct tube_source **source)
+void tube_source_and_stages_free(tube_source **source)
{
struct tube_stage *stage;
struct tube_stage *next;
@@ -54,11 +118,11 @@ void tube_source_and_stages_free(struct tube_source **source)
}
}
-int tube_stage_append(struct tube_source *source, struct tube_stage *stage)
+int tube_stage_append(tube_source *source, tube_stage *stage)
{
struct tube_stage **next;
- if (source != NULL || stage != NULL) {
+ if (source == NULL || stage == NULL) {
return -1;
}
next = &(source->next);
@@ -70,7 +134,226 @@ int tube_stage_append(struct tube_source *source, struct tube_stage *stage)
return 0;
}
-tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque)
+tube *tube_create(tube_source *source, tube_sink sink, void *opaque)
+{
+ tube *ctx;
+ struct tube_stage *s;
+ int i;
+
+ if (source == NULL) {
+ return NULL;
+ }
+ ctx = calloc(1, sizeof(*ctx));
+
+ for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) {
+ s = s->next;
+ }
+ ctx->itcs = calloc(ctx->nitcs, sizeof(itc *));
+ ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element));
+ ctx->itcs[0] = itc_alloc(source->nslots);
+ ctx->free_element[0] = source->free_element;
+ for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
+ ctx->itcs[i] = itc_alloc(s->nslots);
+ ctx->free_element[i] = s->free_element;
+ }
+
+ ctx->nstages = ctx->nitcs - 1;
+ ctx->stages = calloc(ctx->nstages, sizeof(struct stage));
+ for (i = 1, s = source->next; s != NULL; s = s->next, i++) {
+ ctx->stages[i - 1].input = ctx->itcs[i - 1];
+ ctx->stages[i - 1].output= ctx->itcs[i];
+ ctx->stages[i - 1].process = s->process;
+ ctx->stages[i - 1].opaque = s->opaque;
+ }
+ ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t));
+
+ ctx->fetch = calloc(1, sizeof(struct stage_fetch));
+ ctx->fetch->output = ctx->itcs[0];
+ ctx->fetch->fetch = source->fetch;
+ ctx->fetch->opaque = source->opaque;
+
+ ctx->sink = calloc(1, sizeof(struct stage_sink));
+ ctx->sink->input = ctx->itcs[ctx->nitcs - 1];
+ ctx->sink->sink = sink;
+ ctx->sink->opaque = opaque;
+
+ return ctx;
+}
+
+static void *tube_stage_thread(void *arg)
+{
+ struct stage *stage = arg;
+ void *element;
+
+ if (stage == NULL) {
+ return NULL;
+ }
+
+ while (stage->run) {
+ element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS);
+ if (element) {
+ element = stage->process(element, stage->opaque);
+ if (element) {
+ while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run);
+ }
+ }
+ }
+
+ return NULL;
+}
+
+static void *tube_fetch_thread(void *arg)
+{
+ struct stage_fetch *fetch = arg;
+ void *element;
+
+ if (fetch == NULL || fetch->fetch == NULL) {
+ return NULL;
+ }
+
+ while (fetch->run) {
+ element = fetch->fetch(fetch->opaque);
+ if (element) {
+ while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run);
+ }
+ }
+
+ return NULL;
+}
+
+static void *tube_sink_thread(void *arg)
+{
+ struct stage_sink *sink = arg;
+ void *element;
+
+ if (sink == NULL || sink->sink == NULL) {
+ return NULL;
+ }
+
+ while (sink->run) {
+ element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS);
+ if (element) {
+ sink->sink(element, sink->opaque);
+ }
+ }
+
+ return NULL;
+}
+
+int tube_start(tube *ctx)
+{
+ int i = 0;
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ for (i = 0; i < ctx->nstages; i++) {
+ if (!ctx->stages[i].run) {
+ ctx->stages[i].run = 1;
+ pthread_create(&ctx->stages_thread[0], NULL, tube_stage_thread, &ctx->stages[i]);
+ }
+ }
+ if (!ctx->fetch->run) {
+ ctx->fetch->run = 1;
+ pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch);
+ }
+ if (!ctx->sink->run) {
+ ctx->sink->run = 1;
+ pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink);
+ }
+
+ return 0;
+}
+
+int tube_inject(tube *ctx, int timeout_ms, void *element)
{
+ if (ctx == NULL || !ctx->nitcs) {
+ return -1;
+ }
+
+ return itc_inject(ctx->itcs[0], timeout_ms, element);
+}
+
+void *tube_retrive(tube *ctx, int timeout_ms)
+{
+ if (ctx == NULL || !ctx->nitcs) {
+ return NULL;
+ }
+
+ return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
+}
+
+void tube_flush(tube *ctx)
+{
+ int i;
+
+ if (ctx == NULL) {
+ return;
+ }
+
+ for (i = 0; i < ctx->nitcs; i++) {
+ itc_flush(ctx->itcs[i]);
+ }
+}
+
+void tube_drop(tube *ctx)
+{
+ int i;
+
+ if (ctx == NULL) {
+ return;
+ }
+
+ for (i = 0; i < ctx->nitcs; i++) {
+ itc_drop(ctx->itcs[i], ctx->free_element[i]);
+ }
+}
+
+int tube_stop(tube *ctx)
+{
+ int i = 0;
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ for (i = 0; i < ctx->nstages; i++) {
+ if (ctx->stages[i].run) {
+ ctx->stages[i].run = 0;
+ pthread_join(ctx->stages_thread[0], NULL);
+ }
+ }
+ if (ctx->fetch->run) {
+ ctx->fetch->run = 0;
+ pthread_join(ctx->fetch_thread, NULL);
+ }
+ if (ctx->sink->run) {
+ ctx->sink->run = 0;
+ pthread_join(ctx->sink_thread, NULL);
+ }
+
+ return 0;
+}
+
+
+void tube_free(tube **ctx)
+{
+ int i;
+
+ if (ctx == NULL || *ctx == NULL) {
+ return;
+ }
+
+ tube_stop((*ctx));
+
+ for (i = 0; i < (*ctx)->nitcs; i++) {
+ itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]);
+ }
+ free((*ctx)->itcs);
+ free((*ctx)->stages);
+ free((*ctx)->fetch);
+ free((*ctx)->sink);
+
+ free(*ctx);
+ *ctx = NULL;
}