diff options
-rw-r--r-- | src/itc.c | 104 | ||||
-rw-r--r-- | src/itc.h | 14 | ||||
-rw-r--r-- | src/pipeline.h | 41 | ||||
-rw-r--r-- | src/tuberia.c | 76 | ||||
-rw-r--r-- | src/tuberia.h | 43 | ||||
-rw-r--r-- | src/utils.c | 35 | ||||
-rw-r--r-- | src/utils.h | 10 |
7 files changed, 282 insertions, 41 deletions
diff --git a/src/itc.c b/src/itc.c new file mode 100644 index 0000000..6171d4e --- /dev/null +++ b/src/itc.c @@ -0,0 +1,104 @@ +#include "itc.h" +#include "utils.h" + +#include <semaphore.h> +#include <stdlib.h> +#include <sys/time.h> + +struct itc { + int nslots; + void **slots; + int inputidx; + int outputidx; + sem_t emptied; + sem_t occupied; +}; + +itc *itc_alloc(int nslots) +{ + itc *ctx; + + if (nslots <= 0) { + return NULL; + } + + ctx = calloc(1, sizeof(*ctx)); + ctx->nslots = nslots; + ctx->slots = calloc(nslots, sizeof(*ctx->slots)); + sem_init(&ctx->emptied, 0, nslots); + sem_init(&ctx->occupied, 0, 0); + + return ctx; +} + +void itc_free(itc **ctx, itc_free_element free_element) +{ + if (ctx == NULL || *ctx == NULL) { + return; + } + + itc_drop(*ctx, free_element); + + sem_destroy(&(*ctx)->emptied); + sem_destroy(&(*ctx)->occupied); + free((*ctx)->slots); + free(*ctx); + *ctx = NULL; +} + +void *itc_retrive(itc *ctx, int timeout_ms) +{ + struct timespec ts; + void *element; + + if (ctx == NULL) { + return NULL; + } + + timespec_add_ms(gettimespec(&ts), timeout_ms); + + if (sem_timedwait(&ctx->occupied, &ts)) { + return NULL; + } + element = ctx->slots[ctx->outputidx]; + ctx->outputidx = (ctx->outputidx + 1) % ctx->nslots; + sem_post(&ctx->emptied); + + return element; +} + +int itc_inject(itc *ctx, int timeout_ms, void *element) +{ + struct timespec ts; + + if (ctx == NULL) { + return -1; + } + + timespec_add_ms(gettimespec(&ts), timeout_ms); + + if (sem_timedwait(&ctx->emptied, &ts)) { + return -1; + } + ctx->slots[ctx->inputidx] = element; + ctx->inputidx = (ctx->inputidx + 1) % ctx->nslots; + sem_post(&ctx->occupied); + + return 0; +} + +void itc_drop(itc *ctx, itc_free_element free_element) +{ + void *element; + + if (ctx == NULL) { + return; + } + + while ((element = itc_retrive(ctx, 0)) != NULL) { + if (free_element != NULL) { + free_element(element); + } + } +} + diff --git a/src/itc.h b/src/itc.h new file mode 100644 index 0000000..0d43e3d --- /dev/null +++ b/src/itc.h @@ -0,0 +1,14 @@ +#ifndef __LIBTUBERIA_ITC_H__ +#define __LIBTUBERIA_ITC_H__ + +typedef struct itc itc; +typedef void (*itc_free_element)(void *element); + +itc *itc_alloc(int nslots); +void itc_free(itc **ctx, itc_free_element free_element); + +void *itc_retrive(itc *ctx, int timeout_ms); +int itc_inject(itc *ctx, int timeout_ms, void *element); +void itc_drop(itc *ctx, itc_free_element free_element); + +#endif //__LIBTUBERIA_ITC_H__ diff --git a/src/pipeline.h b/src/pipeline.h deleted file mode 100644 index 42fed98..0000000 --- a/src/pipeline.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef __LIBPIPELINE_H__ -#define __LIBPIPELINE_H__ - -typedef struct pl_pipleline pl_pipeline; - -typedef void *(*pl_fetch)(void *opaque); -typedef void *(*pl_process)(void *element, void *opaque); -typedef void (*pl_ready)(void *element, void *opaque); -typedef void (*pl_sink)(void *element, void *opaque); - -struct pl_stage { - struct pl_sage *next; - int nqueue; - pl_process *process; - void *opaque; -}; - -struct pl_source { - struct pl_sage *next; - int nqueue; - pl_fetch *fetch; - void *opaque; -}; - -struct pl_source *pl_source_alloc(int nqueue, pl_fetch *fetch); -struct pl_stage *pl_stage_alloc(int nqueue, pl_process *process); -void pl_stage_free(struct pl_stage **stage); -void pl_source_and_stages_free(struct pl_source **stage); - -int pl_stage_append(struct pl_source *source, struct pl_stage *stage); - -pl_pipeline *pl_create(struct pl_source *source, pl_sink *sink, void *opaque); -int pl_start(pl_pipeline *pipeline); -int pl_stop(pl_pipeline *pipeline); -void pl_free(pl_pipeline **pipeline); - -int pl_inject(pl_pipeline *pipeline, void *element); -void *pl_retrive(pl_pipeline *pipeline, int timeout_ms); - -#endif //__LIBPIPELINE_H__ - diff --git a/src/tuberia.c b/src/tuberia.c new file mode 100644 index 0000000..3dc5c10 --- /dev/null +++ b/src/tuberia.c @@ -0,0 +1,76 @@ +#include "tuberia.h" +#include "itc.h" + +#include <stdlib.h> + +struct tube { + struct itc *itc; +}; + +struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque) +{ + struct tube_source *source = calloc(1, sizeof(struct tube_source)); + + source->nqueue = nqueue; + source->fetch = fetch; + source->opaque = opaque; + + return source; +} + +struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque) +{ + struct tube_stage *stage = calloc(1, sizeof(struct tube_stage)); + + stage->nqueue = nqueue; + stage->process = process; + stage->opaque = opaque; + + return stage; +} + +void tube_stage_free(struct tube_stage **stage) +{ + if (stage != NULL) { + free(stage); + *stage = NULL; + } +} + +void tube_source_and_stages_free(struct tube_source **source) +{ + struct tube_stage *stage; + struct tube_stage *next; + + if (source != NULL && *source != NULL) { + stage = (*source)->next; + while (stage != NULL) { + next = stage->next; + free(stage); + stage = next; + } + free(*source); + *source = NULL; + } +} + +int tube_stage_append(struct tube_source *source, struct tube_stage *stage) +{ + struct tube_stage **next; + + if (source != NULL || stage != NULL) { + return -1; + } + next = &(source->next); + while (*next != NULL) { + next = &((*next)->next); + } + *next = stage; + + return 0; +} + +tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque) +{ +} + diff --git a/src/tuberia.h b/src/tuberia.h new file mode 100644 index 0000000..048b078 --- /dev/null +++ b/src/tuberia.h @@ -0,0 +1,43 @@ +#ifndef __LIBTUBERIA_H__ +#define __LIBTUBERIA_H__ + +typedef struct tube tube; + +typedef void *(*tube_fetch)(void *opaque); +typedef void *(*tube_process)(void *element, void *opaque); +typedef void (*tube_sink)(void *element, void *opaque); +typedef void (*tube_free_element)(void *element); + +struct tube_stage { + struct tube_stage *next; + int nqueue; + tube_process *process; + void *opaque; +}; + +struct tube_source { + struct tube_stage *next; + int nqueue; + tube_fetch *fetch; + void *opaque; +}; + +struct tube_source *tube_source_alloc(int nqueue, tube_fetch *fetch, void *opaque); +struct tube_stage *tube_stage_alloc(int nqueue, tube_process *process, void *opaque); +void tube_stage_free(struct tube_stage **stage); +void tube_source_and_stages_free(struct tube_source **source); + +int tube_stage_append(struct tube_source *source, struct tube_stage *stage); + +tube *tube_create(struct tube_source *source, tube_sink *sink, void *opaque); +int tube_start(tube *tube); +int tube_stop(tube *tube); +void tube_free(tube **tube, tube_free_element free_element); + +int tube_inject(tube *tube, int timeout_ms, void *element); +void *tube_retrive(tube *tube, int timeout_ms); +int tube_flush(tube *tube); +int tube_drop(tube *tube, tube_free_element free_element); + +#endif //__LIBTUBERIA_H__ + diff --git a/src/utils.c b/src/utils.c new file mode 100644 index 0000000..8505b63 --- /dev/null +++ b/src/utils.c @@ -0,0 +1,35 @@ +#include "utils.h" + +#include <stdlib.h> +#include <sys/time.h> + +struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms) +{ + long long nsec; + + if (!ts) { + return NULL; + } + + nsec = ts->tv_nsec + timeout_ms * 1000000LL; + ts->tv_sec += nsec / 1000000000LL; + ts->tv_nsec = nsec % 1000000000LL; + + return ts; +} + +struct timespec *gettimespec(struct timespec *ts) +{ + struct timeval tv; + + if (!ts) { + return NULL; + } + + gettimeofday(&tv, NULL); + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000L; + + return ts; +} + diff --git a/src/utils.h b/src/utils.h new file mode 100644 index 0000000..a98763a --- /dev/null +++ b/src/utils.h @@ -0,0 +1,10 @@ +#ifndef __LIBTUBERIA_UTILS_H__ +#define __LIBTUBERIA_UTILS_H__ + +#include <sys/time.h> + +struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms); +struct timespec *gettimespec(struct timespec *ts); + +#endif + |