aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/itc.c104
-rw-r--r--src/itc.h14
-rw-r--r--src/pipeline.h41
-rw-r--r--src/tuberia.c76
-rw-r--r--src/tuberia.h43
-rw-r--r--src/utils.c35
-rw-r--r--src/utils.h10
7 files changed, 282 insertions, 41 deletions
diff --git a/src/itc.c b/src/itc.c
new file mode 100644
index 0000000..6171d4e
--- /dev/null
+++ b/src/itc.c
@@ -0,0 +1,104 @@
+#include "itc.h"
+#include "utils.h"
+
+#include <semaphore.h>
+#include <stdlib.h>
+#include <sys/time.h>
+
+struct itc {
+ int nslots;
+ void **slots;
+ int inputidx;
+ int outputidx;
+ sem_t emptied;
+ sem_t occupied;
+};
+
+itc *itc_alloc(int nslots)
+{
+ itc *ctx;
+
+ if (nslots <= 0) {
+ return NULL;
+ }
+
+ ctx = calloc(1, sizeof(*ctx));
+ ctx->nslots = nslots;
+ ctx->slots = calloc(nslots, sizeof(*ctx->slots));
+ sem_init(&ctx->emptied, 0, nslots);
+ sem_init(&ctx->occupied, 0, 0);
+
+ return ctx;
+}
+
+void itc_free(itc **ctx, itc_free_element free_element)
+{
+ if (ctx == NULL || *ctx == NULL) {
+ return;
+ }
+
+ itc_drop(*ctx, free_element);
+
+ sem_destroy(&(*ctx)->emptied);
+ sem_destroy(&(*ctx)->occupied);
+ free((*ctx)->slots);
+ free(*ctx);
+ *ctx = NULL;
+}
+
+void *itc_retrive(itc *ctx, int timeout_ms)
+{
+ struct timespec ts;
+ void *element;
+
+ if (ctx == NULL) {
+ return NULL;
+ }
+
+ timespec_add_ms(gettimespec(&ts), timeout_ms);
+
+ if (sem_timedwait(&ctx->occupied, &ts)) {
+ return NULL;
+ }
+ element = ctx->slots[ctx->outputidx];
+ ctx->outputidx = (ctx->outputidx + 1) % ctx->nslots;
+ sem_post(&ctx->emptied);
+
+ return element;
+}
+
+int itc_inject(itc *ctx, int timeout_ms, void *element)
+{
+ struct timespec ts;
+
+ if (ctx == NULL) {
+ return -1;
+ }
+
+ timespec_add_ms(gettimespec(&ts), timeout_ms);
+
+ if (sem_timedwait(&ctx->emptied, &ts)) {
+ return -1;
+ }
+ ctx->slots[ctx->inputidx] = element;
+ ctx->inputidx = (ctx->inputidx + 1) % ctx->nslots;
+ sem_post(&ctx->occupied);
+
+ return 0;
+}
+
+void itc_drop(itc *ctx, itc_free_element free_element)
+{
+ void *element;
+
+ if (ctx == NULL) {
+ return;
+ }
+
+ while ((element = itc_retrive(ctx, 0)) != NULL) {
+ if (free_element != NULL) {
+ free_element(element);
+ }
+ }
+}
+
diff --git a/src/itc.h b/src/itc.h
new file mode 100644
index 0000000..0d43e3d
--- /dev/null
+++ b/src/itc.h
@@ -0,0 +1,14 @@
+#ifndef __LIBTUBERIA_ITC_H__
+#define __LIBTUBERIA_ITC_H__
+
+typedef struct itc itc;
+typedef void (*itc_free_element)(void *element);
+
+itc *itc_alloc(int nslots);
+void itc_free(itc **ctx, itc_free_element free_element);
+
+void *itc_retrive(itc *ctx, int timeout_ms);
+int itc_inject(itc *ctx, int timeout_ms, void *element);
+void itc_drop(itc *ctx, itc_free_element free_element);
+
+#endif //__LIBTUBERIA_ITC_H__
diff --git a/src/pipeline.h b/src/pipeline.h
deleted file mode 100644
index 42fed98..0000000
--- a/src/pipeline.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#ifndef __LIBPIPELINE_H__
-#define __LIBPIPELINE_H__
-
-typedef struct pl_pipleline pl_pipeline;
-
-typedef void *(*pl_fetch)(void *opaque);
-typedef void *(*pl_process)(void *element, void *opaque);
-typedef void (*pl_ready)(void *element, void *opaque);
-typedef void (*pl_sink)(void *element, void *opaque);
-
-struct pl_stage {
- struct pl_sage *next;
- int nqueue;
- pl_process *process;
- void *opaque;
-};
-
-struct pl_source {
- struct pl_sage *next;
- int nqueue;
- pl_fetch *fetch;
- void *opaque;
-};
-
-struct pl_source *pl_source_alloc(int nqueue, pl_fetch *fetch);
-struct pl_stage *pl_stage_alloc(int nqueue, pl_process *process);
-void pl_stage_free(struct pl_stage **stage);
-void pl_source_and_stages_free(struct pl_source **stage);
-
-int pl_stage_append(struct pl_source *source, struct pl_stage *stage);
-
-pl_pipeline *pl_create(struct pl_source *source, pl_sink *sink, void *opaque);
-int pl_start(pl_pipeline *pipeline);
-int pl_stop(pl_pipeline *pipeline);
-void pl_free(pl_pipeline **pipeline);
-
-int pl_inject(pl_pipeline *pipeline, void *element);
-void *pl_retrive(pl_pipeline *pipeline, int timeout_ms);
-
-#endif //__LIBPIPELINE_H__
-
diff --git a/src/tuberia.c b/src/tuberia.c
new file mode 100644
index 0000000..3dc5c10
--- /dev/null
+++ b/src/tuberia.c
@@ -0,0 +1,76 @@
+#include "tuberia.h"
+#include "itc.h"
+
+#include <stdlib.h>
+
+struct tube {
+ struct itc *itc;
+};
+
+struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque)
+{
+ struct tube_source *source = calloc(1, sizeof(struct tube_source));
+
+ source->nqueue = nqueue;
+ source->fetch = fetch;
+ source->opaque = opaque;
+
+ return source;
+}
+
+struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque)
+{
+ struct tube_stage *stage = calloc(1, sizeof(struct tube_stage));
+
+ stage->nqueue = nqueue;
+ stage->process = process;
+ stage->opaque = opaque;
+
+ return stage;
+}
+
+void tube_stage_free(struct tube_stage **stage)
+{
+ if (stage != NULL) {
+ free(stage);
+ *stage = NULL;
+ }
+}
+
+void tube_source_and_stages_free(struct tube_source **source)
+{
+ struct tube_stage *stage;
+ struct tube_stage *next;
+
+ if (source != NULL && *source != NULL) {
+ stage = (*source)->next;
+ while (stage != NULL) {
+ next = stage->next;
+ free(stage);
+ stage = next;
+ }
+ free(*source);
+ *source = NULL;
+ }
+}
+
+int tube_stage_append(struct tube_source *source, struct tube_stage *stage)
+{
+ struct tube_stage **next;
+
+ if (source != NULL || stage != NULL) {
+ return -1;
+ }
+ next = &(source->next);
+ while (*next != NULL) {
+ next = &((*next)->next);
+ }
+ *next = stage;
+
+ return 0;
+}
+
+tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque)
+{
+}
+
diff --git a/src/tuberia.h b/src/tuberia.h
new file mode 100644
index 0000000..048b078
--- /dev/null
+++ b/src/tuberia.h
@@ -0,0 +1,43 @@
+#ifndef __LIBTUBERIA_H__
+#define __LIBTUBERIA_H__
+
+typedef struct tube tube;
+
+typedef void *(*tube_fetch)(void *opaque);
+typedef void *(*tube_process)(void *element, void *opaque);
+typedef void (*tube_sink)(void *element, void *opaque);
+typedef void (*tube_free_element)(void *element);
+
+struct tube_stage {
+ struct tube_stage *next;
+ int nqueue;
+ tube_process *process;
+ void *opaque;
+};
+
+struct tube_source {
+ struct tube_stage *next;
+ int nqueue;
+ tube_fetch *fetch;
+ void *opaque;
+};
+
+struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque);
+struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque);
+void tube_stage_free(struct tube_stage **stage);
+void tube_source_and_stages_free(struct tube_source **source);
+
+int tube_stage_append(struct tube_source *source, struct tube_stage *stage);
+
+tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque);
+int tube_start(tube *tube);
+int tube_stop(tube *tube);
+void tube_free(tube **tube, tube_free_element free_element);
+
+int tube_inject(tube *tube, int timeout_ms, void *element);
+void *tube_retrive(tube *tube, int timeout_ms);
+int tube_flush(tube *tube);
+int tube_drop(tube *tube, tube_free_element free_element);
+
+#endif //__LIBTUBERIA_H__
+
diff --git a/src/utils.c b/src/utils.c
new file mode 100644
index 0000000..8505b63
--- /dev/null
+++ b/src/utils.c
@@ -0,0 +1,35 @@
+#include "utils.h"
+
+#include <stdlib.h>
+#include <sys/time.h>
+
+struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms)
+{
+ long long nsec;
+
+ if (!ts) {
+ return NULL;
+ }
+
+ nsec = ts->tv_nsec + timeout_ms * 1000000LL;
+ ts->tv_sec += nsec / 1000000000LL;
+ ts->tv_nsec = nsec % 1000000000LL;
+
+ return ts;
+}
+
+struct timespec *gettimespec(struct timespec *ts)
+{
+ struct timeval tv;
+
+ if (!ts) {
+ return NULL;
+ }
+
+ gettimeofday(&tv, NULL);
+ ts->tv_sec = tv.tv_sec;
+ ts->tv_nsec = tv.tv_usec * 1000L;
+
+ return ts;
+}
+
diff --git a/src/utils.h b/src/utils.h
new file mode 100644
index 0000000..a98763a
--- /dev/null
+++ b/src/utils.h
@@ -0,0 +1,10 @@
+#ifndef __LIBTUBERIA_UTILS_H__
+#define __LIBTUBERIA_UTILS_H__
+
+#include <sys/time.h>
+
+struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms);
+struct timespec *gettimespec(struct timespec *ts);
+
+#endif
+