diff options
author | Nicolas Dato <nicolas.dato@gmail.com> | 2025-06-26 20:45:48 -0300 |
---|---|---|
committer | Nicolas Dato <nicolas.dato@gmail.com> | 2025-06-26 20:45:48 -0300 |
commit | 2ce4fb4a3cfb0d1cb92e9cbd71087fa16414631e (patch) | |
tree | 37b6692499de8bd8fcdcd402a505f1eec015eb49 | |
parent | 8d8819359d33b24454289a8a4c456a1c4caa5e26 (diff) | |
download | libtuberia-2ce4fb4a3cfb0d1cb92e9cbd71087fa16414631e.tar.gz |
code indentation
-rw-r--r-- | src/itc.c | 162 | ||||
-rw-r--r-- | src/tuberia.c | 520 | ||||
-rw-r--r-- | src/utils.c | 32 |
3 files changed, 357 insertions, 357 deletions
@@ -6,133 +6,133 @@ #include <sys/time.h> struct itc { - int nslots; - void **slots; - int inputidx; - int outputidx; - sem_t emptied; - sem_t occupied; + int nslots; + void **slots; + int inputidx; + int outputidx; + sem_t emptied; + sem_t occupied; }; itc *itc_alloc(int nslots) { - itc *ctx; + itc *ctx; - if (nslots <= 0) { - return NULL; - } + if (nslots <= 0) { + return NULL; + } - ctx = calloc(1, sizeof(*ctx)); - ctx->nslots = nslots; - ctx->slots = calloc(nslots, sizeof(*ctx->slots)); - sem_init(&ctx->emptied, 0, nslots); - sem_init(&ctx->occupied, 0, 0); + ctx = calloc(1, sizeof(*ctx)); + ctx->nslots = nslots; + ctx->slots = calloc(nslots, sizeof(*ctx->slots)); + sem_init(&ctx->emptied, 0, nslots); + sem_init(&ctx->occupied, 0, 0); - return ctx; + return ctx; } void itc_free(itc **ctx, itc_free_element free_element) { - if (ctx == NULL || *ctx == NULL) { - return; - } + if (ctx == NULL || *ctx == NULL) { + return; + } - itc_discard_all(*ctx, free_element); + itc_discard_all(*ctx, free_element); - sem_destroy(&(*ctx)->emptied); - sem_destroy(&(*ctx)->occupied); - free((*ctx)->slots); - free(*ctx); - *ctx = NULL; + sem_destroy(&(*ctx)->emptied); + sem_destroy(&(*ctx)->occupied); + free((*ctx)->slots); + free(*ctx); + *ctx = NULL; } void *itc_retrive(itc *ctx, int timeout_ms) { - struct timespec ts; - void *element; + struct timespec ts; + void *element; - if (ctx == NULL) { - return NULL; - } + if (ctx == NULL) { + return NULL; + } - timespec_add_ms(gettimespec(&ts), timeout_ms); + timespec_add_ms(gettimespec(&ts), timeout_ms); - if (sem_timedwait(&ctx->occupied, &ts)) { - return NULL; - } - element = ctx->slots[ctx->outputidx]; - ctx->outputidx = (ctx->outputidx + 1) % ctx->nslots; - sem_post(&ctx->emptied); + if (sem_timedwait(&ctx->occupied, &ts)) { + return NULL; + } + element = ctx->slots[ctx->outputidx]; + ctx->outputidx = (ctx->outputidx + 1) % ctx->nslots; + sem_post(&ctx->emptied); - return element; + return element; } int itc_inject(itc *ctx, int timeout_ms, void *element) { - struct timespec ts; + struct timespec ts; - if (ctx == NULL || element == NULL) { - return -1; - } + if (ctx == NULL || element == NULL) { + return -1; + } - timespec_add_ms(gettimespec(&ts), timeout_ms); + timespec_add_ms(gettimespec(&ts), timeout_ms); - if (sem_timedwait(&ctx->emptied, &ts)) { - return -1; - } - ctx->slots[ctx->inputidx] = element; - ctx->inputidx = (ctx->inputidx + 1) % ctx->nslots; - sem_post(&ctx->occupied); + if (sem_timedwait(&ctx->emptied, &ts)) { + return -1; + } + ctx->slots[ctx->inputidx] = element; + ctx->inputidx = (ctx->inputidx + 1) % ctx->nslots; + sem_post(&ctx->occupied); - return 0; + return 0; } void itc_wait_empty(itc *ctx) { - int i; - - if (ctx == NULL) { - return; - } - - for (i = 0; i < ctx->nslots; i++) { - sem_wait(&ctx->emptied); - } - for (i = 0; i < ctx->nslots; i++) { - sem_post(&ctx->emptied); - } + int i; + + if (ctx == NULL) { + return; + } + + for (i = 0; i < ctx->nslots; i++) { + sem_wait(&ctx->emptied); + } + for (i = 0; i < ctx->nslots; i++) { + sem_post(&ctx->emptied); + } } void itc_discard_all(itc *ctx, itc_free_element free_element) { - void *element; + void *element; - if (ctx == NULL) { - return; - } + if (ctx == NULL) { + return; + } - while ((element = itc_retrive(ctx, 0)) != NULL) { - if (free_element != NULL) { - free_element(element); - } - } + while ((element = itc_retrive(ctx, 0)) != NULL) { + if (free_element != NULL) { + free_element(element); + } + } } int itc_get_queued(itc *ctx) { - int val; - if (ctx == NULL) { - return -1; - } - sem_getvalue(&ctx->occupied, &val); - return val; + int val; + if (ctx == NULL) { + return -1; + } + sem_getvalue(&ctx->occupied, &val); + return val; } int itc_get_slots(itc *ctx) { - if (ctx == NULL) { - return -1; - } - return ctx->nslots; + if (ctx == NULL) { + return -1; + } + return ctx->nslots; } diff --git a/src/tuberia.c b/src/tuberia.c index 368a06d..d8b5688 100644 --- a/src/tuberia.c +++ b/src/tuberia.c @@ -7,371 +7,371 @@ #define ITC_DEFAULT_TIMEOUT_MS 250 struct stage_fetch { - struct itc *output; - tube_fetch fetch; - void *opaque; - int run; + struct itc *output; + tube_fetch fetch; + void *opaque; + int run; }; struct stage_sink { - struct itc *input; - tube_sink sink; - void *opaque; - int run; + struct itc *input; + tube_sink sink; + void *opaque; + int run; }; struct stage { - struct itc *input; - struct itc *output; - tube_process process; - void *opaque; - int run; + struct itc *input; + struct itc *output; + tube_process process; + void *opaque; + int run; }; struct tube_stage { - struct tube_stage *next; - int nslots; - tube_process process; - void *opaque; - tube_free_element free_element; + struct tube_stage *next; + int nslots; + tube_process process; + void *opaque; + tube_free_element free_element; }; struct tube_source { - struct tube_stage *next; - int nslots; - tube_fetch fetch; - void *opaque; - tube_free_element free_element; + struct tube_stage *next; + int nslots; + tube_fetch fetch; + void *opaque; + tube_free_element free_element; }; struct tube { - int nitcs; - itc **itcs; - tube_free_element *free_element; - int nstages; - struct stage *stages; - pthread_t *stages_thread; - struct stage_fetch *fetch; - pthread_t fetch_thread; - struct stage_sink *sink; - pthread_t sink_thread; + int nitcs; + itc **itcs; + tube_free_element *free_element; + int nstages; + struct stage *stages; + pthread_t *stages_thread; + struct stage_fetch *fetch; + pthread_t fetch_thread; + struct stage_sink *sink; + pthread_t sink_thread; }; tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque, - tube_free_element free_element) + tube_free_element free_element) { - struct tube_source *source; + struct tube_source *source; - if (nslots <= 0) { - return NULL; - } + if (nslots <= 0) { + return NULL; + } - source = calloc(1, sizeof(struct tube_source)); - source->nslots = nslots; - source->fetch = fetch; - source->opaque = opaque; - source->free_element = free_element; + source = calloc(1, sizeof(struct tube_source)); + source->nslots = nslots; + source->fetch = fetch; + source->opaque = opaque; + source->free_element = free_element; - return source; + return source; } tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque, - tube_free_element free_element) + tube_free_element free_element) { - struct tube_stage *stage; + struct tube_stage *stage; - if (process == NULL || nslots <= 0) { - return NULL; - } + if (process == NULL || nslots <= 0) { + return NULL; + } - stage = calloc(1, sizeof(struct tube_stage)); - stage->nslots = nslots; - stage->process = process; - stage->opaque = opaque; - stage->free_element = free_element; + stage = calloc(1, sizeof(struct tube_stage)); + stage->nslots = nslots; + stage->process = process; + stage->opaque = opaque; + stage->free_element = free_element; - return stage; + return stage; } void tube_stage_free(tube_stage **stage) { - if (stage != NULL) { - free(*stage); - *stage = NULL; - } + if (stage != NULL) { + free(*stage); + *stage = NULL; + } } void tube_source_and_stages_free(tube_source **source) { - struct tube_stage *stage; - struct tube_stage *next; - - if (source != NULL && *source != NULL) { - stage = (*source)->next; - while (stage != NULL) { - next = stage->next; - free(stage); - stage = next; - } - free(*source); - *source = NULL; - } + struct tube_stage *stage; + struct tube_stage *next; + + if (source != NULL && *source != NULL) { + stage = (*source)->next; + while (stage != NULL) { + next = stage->next; + free(stage); + stage = next; + } + free(*source); + *source = NULL; + } } int tube_stage_append(tube_source *source, tube_stage *stage) { - struct tube_stage **next; - - if (source == NULL || stage == NULL) { - return -1; - } - next = &(source->next); - while (*next != NULL) { - next = &((*next)->next); - } - *next = stage; - - return 0; + struct tube_stage **next; + + if (source == NULL || stage == NULL) { + return -1; + } + next = &(source->next); + while (*next != NULL) { + next = &((*next)->next); + } + *next = stage; + + return 0; } tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque) { - tube *ctx; - struct tube_stage *s; - int i; - - if (source == NULL) { - return NULL; - } - ctx = calloc(1, sizeof(*ctx)); - - for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) { - s = s->next; - } - ctx->itcs = calloc(ctx->nitcs, sizeof(itc *)); - ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element)); - ctx->itcs[0] = itc_alloc(source->nslots); - ctx->free_element[0] = source->free_element; - for (i = 1, s = source->next; s != NULL; s = s->next, i++) { - ctx->itcs[i] = itc_alloc(s->nslots); - ctx->free_element[i] = s->free_element; - } - - ctx->nstages = ctx->nitcs - 1; - ctx->stages = calloc(ctx->nstages, sizeof(struct stage)); - for (i = 1, s = source->next; s != NULL; s = s->next, i++) { - ctx->stages[i - 1].input = ctx->itcs[i - 1]; - ctx->stages[i - 1].output= ctx->itcs[i]; - ctx->stages[i - 1].process = s->process; - ctx->stages[i - 1].opaque = s->opaque; - } - ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t)); - - ctx->fetch = calloc(1, sizeof(struct stage_fetch)); - ctx->fetch->output = ctx->itcs[0]; - ctx->fetch->fetch = source->fetch; - ctx->fetch->opaque = source->opaque; - - ctx->sink = calloc(1, sizeof(struct stage_sink)); - ctx->sink->input = ctx->itcs[ctx->nitcs - 1]; - ctx->sink->sink = sink; - ctx->sink->opaque = opaque; - - return ctx; + tube *ctx; + struct tube_stage *s; + int i; + + if (source == NULL) { + return NULL; + } + ctx = calloc(1, sizeof(*ctx)); + + for (ctx->nitcs = 1, s = source->next; s != NULL; ctx->nitcs++) { + s = s->next; + } + ctx->itcs = calloc(ctx->nitcs, sizeof(itc *)); + ctx->free_element = calloc(ctx->nitcs, sizeof(tube_free_element)); + ctx->itcs[0] = itc_alloc(source->nslots); + ctx->free_element[0] = source->free_element; + for (i = 1, s = source->next; s != NULL; s = s->next, i++) { + ctx->itcs[i] = itc_alloc(s->nslots); + ctx->free_element[i] = s->free_element; + } + + ctx->nstages = ctx->nitcs - 1; + ctx->stages = calloc(ctx->nstages, sizeof(struct stage)); + for (i = 1, s = source->next; s != NULL; s = s->next, i++) { + ctx->stages[i - 1].input = ctx->itcs[i - 1]; + ctx->stages[i - 1].output= ctx->itcs[i]; + ctx->stages[i - 1].process = s->process; + ctx->stages[i - 1].opaque = s->opaque; + } + ctx->stages_thread = calloc(ctx->nstages, sizeof(pthread_t)); + + ctx->fetch = calloc(1, sizeof(struct stage_fetch)); + ctx->fetch->output = ctx->itcs[0]; + ctx->fetch->fetch = source->fetch; + ctx->fetch->opaque = source->opaque; + + ctx->sink = calloc(1, sizeof(struct stage_sink)); + ctx->sink->input = ctx->itcs[ctx->nitcs - 1]; + ctx->sink->sink = sink; + ctx->sink->opaque = opaque; + + return ctx; } static void *tube_stage_thread(void *arg) { - struct stage *stage = arg; - void *element; - - if (stage == NULL) { - return NULL; - } - - while (stage->run) { - element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS); - if (element) { - element = stage->process(element, stage->opaque); - if (element) { - while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run); - } - } - } - - return NULL; + struct stage *stage = arg; + void *element; + + if (stage == NULL) { + return NULL; + } + + while (stage->run) { + element = itc_retrive(stage->input, ITC_DEFAULT_TIMEOUT_MS); + if (element) { + element = stage->process(element, stage->opaque); + if (element) { + while (itc_inject(stage->output, ITC_DEFAULT_TIMEOUT_MS, element) && stage->run); + } + } + } + + return NULL; } static void *tube_fetch_thread(void *arg) { - struct stage_fetch *fetch = arg; - void *element; + struct stage_fetch *fetch = arg; + void *element; - if (fetch == NULL || fetch->fetch == NULL) { - return NULL; - } + if (fetch == NULL || fetch->fetch == NULL) { + return NULL; + } - while (fetch->run) { - element = fetch->fetch(fetch->opaque); - if (element) { - while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run); - } - } + while (fetch->run) { + element = fetch->fetch(fetch->opaque); + if (element) { + while (itc_inject(fetch->output, ITC_DEFAULT_TIMEOUT_MS, element) && fetch->run); + } + } - return NULL; + return NULL; } static void *tube_sink_thread(void *arg) { - struct stage_sink *sink = arg; - void *element; + struct stage_sink *sink = arg; + void *element; - if (sink == NULL || sink->sink == NULL) { - return NULL; - } + if (sink == NULL || sink->sink == NULL) { + return NULL; + } - while (sink->run) { - element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS); - if (element) { - sink->sink(element, sink->opaque); - } - } + while (sink->run) { + element = itc_retrive(sink->input, ITC_DEFAULT_TIMEOUT_MS); + if (element) { + sink->sink(element, sink->opaque); + } + } - return NULL; + return NULL; } int tube_start(tube *ctx) { - int i = 0; - if (ctx == NULL) { - return -1; - } - - for (i = 0; i < ctx->nstages; i++) { - if (!ctx->stages[i].run) { - ctx->stages[i].run = 1; - pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]); - } - } - if (!ctx->fetch->run) { - ctx->fetch->run = 1; - pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch); - } - if (!ctx->sink->run) { - ctx->sink->run = 1; - pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink); - } - - return 0; + int i = 0; + if (ctx == NULL) { + return -1; + } + + for (i = 0; i < ctx->nstages; i++) { + if (!ctx->stages[i].run) { + ctx->stages[i].run = 1; + pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]); + } + } + if (!ctx->fetch->run) { + ctx->fetch->run = 1; + pthread_create(&ctx->fetch_thread, NULL, tube_fetch_thread, ctx->fetch); + } + if (!ctx->sink->run) { + ctx->sink->run = 1; + pthread_create(&ctx->sink_thread, NULL, tube_sink_thread, ctx->sink); + } + + return 0; } int tube_inject(tube *ctx, int timeout_ms, void *element) { - if (ctx == NULL || !ctx->nitcs) { - return -1; - } + if (ctx == NULL || !ctx->nitcs) { + return -1; + } - return itc_inject(ctx->itcs[0], timeout_ms, element); + return itc_inject(ctx->itcs[0], timeout_ms, element); } void *tube_retrive(tube *ctx, int timeout_ms) { - if (ctx == NULL || !ctx->nitcs) { - return NULL; - } + if (ctx == NULL || !ctx->nitcs) { + return NULL; + } - return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); + return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms); } void tube_wait_empty(tube *ctx) { - int i; + int i; - if (ctx == NULL) { - return; - } + if (ctx == NULL) { + return; + } - for (i = 0; i < ctx->nitcs; i++) { - itc_wait_empty(ctx->itcs[i]); - } + for (i = 0; i < ctx->nitcs; i++) { + itc_wait_empty(ctx->itcs[i]); + } } void tube_discard_all(tube *ctx) { - int i; + int i; - if (ctx == NULL) { - return; - } + if (ctx == NULL) { + return; + } - for (i = 0; i < ctx->nitcs; i++) { - itc_discard_all(ctx->itcs[i], ctx->free_element[i]); - } + for (i = 0; i < ctx->nitcs; i++) { + itc_discard_all(ctx->itcs[i], ctx->free_element[i]); + } } int tube_stop(tube *ctx) { - int i = 0; - if (ctx == NULL) { - return -1; - } - - for (i = 0; i < ctx->nstages; i++) { - if (ctx->stages[i].run) { - ctx->stages[i].run = 0; - pthread_join(ctx->stages_thread[i], NULL); - } - } - if (ctx->fetch->run) { - ctx->fetch->run = 0; - pthread_join(ctx->fetch_thread, NULL); - } - if (ctx->sink->run) { - ctx->sink->run = 0; - pthread_join(ctx->sink_thread, NULL); - } - - return 0; + int i = 0; + if (ctx == NULL) { + return -1; + } + + for (i = 0; i < ctx->nstages; i++) { + if (ctx->stages[i].run) { + ctx->stages[i].run = 0; + pthread_join(ctx->stages_thread[i], NULL); + } + } + if (ctx->fetch->run) { + ctx->fetch->run = 0; + pthread_join(ctx->fetch_thread, NULL); + } + if (ctx->sink->run) { + ctx->sink->run = 0; + pthread_join(ctx->sink_thread, NULL); + } + + return 0; } void tube_free(tube **ctx) { - int i; - - if (ctx == NULL || *ctx == NULL) { - return; - } - - tube_stop(*ctx); - - for (i = 0; i < (*ctx)->nitcs; i++) { - itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]); - } - free((*ctx)->itcs); - free((*ctx)->free_element); - free((*ctx)->stages); - free((*ctx)->stages_thread); - free((*ctx)->fetch); - free((*ctx)->sink); - - free(*ctx); - *ctx = NULL; + int i; + + if (ctx == NULL || *ctx == NULL) { + return; + } + + tube_stop(*ctx); + + for (i = 0; i < (*ctx)->nitcs; i++) { + itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]); + } + free((*ctx)->itcs); + free((*ctx)->free_element); + free((*ctx)->stages); + free((*ctx)->stages_thread); + free((*ctx)->fetch); + free((*ctx)->sink); + + free(*ctx); + *ctx = NULL; } int tube_get_queued(const tube *ctx, int stage) { - if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { - return -1; - } - return itc_get_queued(ctx->itcs[stage]); + if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { + return -1; + } + return itc_get_queued(ctx->itcs[stage]); } int tube_get_slots(const tube *ctx, int stage) { - if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { - return -1; - } - return itc_get_slots(ctx->itcs[stage]); + if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) { + return -1; + } + return itc_get_slots(ctx->itcs[stage]); } diff --git a/src/utils.c b/src/utils.c index 8505b63..6b2ff46 100644 --- a/src/utils.c +++ b/src/utils.c @@ -5,31 +5,31 @@ struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms) { - long long nsec; + long long nsec; - if (!ts) { - return NULL; - } + if (!ts) { + return NULL; + } - nsec = ts->tv_nsec + timeout_ms * 1000000LL; - ts->tv_sec += nsec / 1000000000LL; - ts->tv_nsec = nsec % 1000000000LL; + nsec = ts->tv_nsec + timeout_ms * 1000000LL; + ts->tv_sec += nsec / 1000000000LL; + ts->tv_nsec = nsec % 1000000000LL; - return ts; + return ts; } struct timespec *gettimespec(struct timespec *ts) { - struct timeval tv; + struct timeval tv; - if (!ts) { - return NULL; - } + if (!ts) { + return NULL; + } - gettimeofday(&tv, NULL); - ts->tv_sec = tv.tv_sec; - ts->tv_nsec = tv.tv_usec * 1000L; + gettimeofday(&tv, NULL); + ts->tv_sec = tv.tv_sec; + ts->tv_nsec = tv.tv_usec * 1000L; - return ts; + return ts; } |