aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorNicolas Dato <nicolas.dato@gmail.com>2024-10-13 19:57:49 -0300
committerNicolas Dato <nicolas.dato@gmail.com>2024-10-13 19:57:49 -0300
commit4a4e956c26d2f73534e504c36e2ca6a2617644b8 (patch)
tree72bc232f93514b3206da986e4d4aad1e3024e7d8 /src
parent37e2f4899b5d3e594c1080e25c04b40bc29d53a4 (diff)
downloadlibtuberia-4a4e956c26d2f73534e504c36e2ca6a2617644b8.tar.gz
adding tests for the main library
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.in1
-rw-r--r--src/itc.c28
-rw-r--r--src/itc.h13
-rw-r--r--src/tuberia.c34
-rw-r--r--src/tuberia.h15
-rw-r--r--src/utils.h6
6 files changed, 71 insertions, 26 deletions
diff --git a/src/Makefile.in b/src/Makefile.in
index 7765882..e7f1714 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -263,6 +263,7 @@ datarootdir = @datarootdir@
docdir = @docdir@
dvidir = @dvidir@
exec_prefix = @exec_prefix@
+has_valgrind = @has_valgrind@
host_alias = @host_alias@
htmldir = @htmldir@
includedir = @includedir@
diff --git a/src/itc.c b/src/itc.c
index 5b6fee9..e4698e8 100644
--- a/src/itc.c
+++ b/src/itc.c
@@ -37,7 +37,7 @@ void itc_free(itc **ctx, itc_free_element free_element)
return;
}
- itc_drop(*ctx, free_element);
+ itc_discard_all(*ctx, free_element);
sem_destroy(&(*ctx)->emptied);
sem_destroy(&(*ctx)->occupied);
@@ -71,7 +71,7 @@ int itc_inject(itc *ctx, int timeout_ms, void *element)
{
struct timespec ts;
- if (ctx == NULL) {
+ if (ctx == NULL || element == NULL) {
return -1;
}
@@ -87,7 +87,7 @@ int itc_inject(itc *ctx, int timeout_ms, void *element)
return 0;
}
-void itc_flush(itc *ctx)
+void itc_wait_empty(itc *ctx)
{
int i;
@@ -97,11 +97,13 @@ void itc_flush(itc *ctx)
for (i = 0; i < ctx->nslots; i++) {
sem_wait(&ctx->emptied);
+ }
+ for (i = 0; i < ctx->nslots; i++) {
sem_post(&ctx->emptied);
}
}
-void itc_drop(itc *ctx, itc_free_element free_element)
+void itc_discard_all(itc *ctx, itc_free_element free_element)
{
void *element;
@@ -116,3 +118,21 @@ void itc_drop(itc *ctx, itc_free_element free_element)
}
}
+int itc_get_queued(itc *ctx)
+{
+ int val;
+ if (ctx == NULL) {
+ return -1;
+ }
+ sem_getvalue(&ctx->occupied, &val);
+ return val;
+}
+
+int itc_get_slots(itc *ctx)
+{
+ if (ctx == NULL) {
+ return -1;
+ }
+ return ctx->nslots;
+}
+
diff --git a/src/itc.h b/src/itc.h
index 01a0564..07dc03a 100644
--- a/src/itc.h
+++ b/src/itc.h
@@ -1,5 +1,5 @@
-#ifndef __LIBTUBERIA_ITC_H__
-#define __LIBTUBERIA_ITC_H__
+#ifndef LIBTUBERIA_ITC_H__
+#define LIBTUBERIA_ITC_H__
typedef struct itc itc;
typedef void (*itc_free_element)(void *element);
@@ -9,7 +9,10 @@ void itc_free(itc **ctx, itc_free_element free_element);
void *itc_retrive(itc *ctx, int timeout_ms);
int itc_inject(itc *ctx, int timeout_ms, void *element);
-void itc_flush(itc *ctx);
-void itc_drop(itc *ctx, itc_free_element free_element);
+void itc_wait_empty(itc *ctx);
+void itc_discard_all(itc *ctx, itc_free_element free_element);
-#endif //__LIBTUBERIA_ITC_H__
+int itc_get_queued(itc *ctx);
+int itc_get_slots(itc *ctx);
+
+#endif //LIBTUBERIA_ITC_H__
diff --git a/src/tuberia.c b/src/tuberia.c
index 7e3a013..e6cd398 100644
--- a/src/tuberia.c
+++ b/src/tuberia.c
@@ -134,7 +134,7 @@ int tube_stage_append(tube_source *source, tube_stage *stage)
return 0;
}
-tube *tube_create(tube_source *source, tube_sink sink, void *opaque)
+tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque)
{
tube *ctx;
struct tube_stage *s;
@@ -250,7 +250,7 @@ int tube_start(tube *ctx)
for (i = 0; i < ctx->nstages; i++) {
if (!ctx->stages[i].run) {
ctx->stages[i].run = 1;
- pthread_create(&ctx->stages_thread[0], NULL, tube_stage_thread, &ctx->stages[i]);
+ pthread_create(&ctx->stages_thread[i], NULL, tube_stage_thread, &ctx->stages[i]);
}
}
if (!ctx->fetch->run) {
@@ -283,7 +283,7 @@ void *tube_retrive(tube *ctx, int timeout_ms)
return itc_retrive(ctx->itcs[ctx->nitcs - 1], timeout_ms);
}
-void tube_flush(tube *ctx)
+void tube_wait_empty(tube *ctx)
{
int i;
@@ -292,11 +292,11 @@ void tube_flush(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
- itc_flush(ctx->itcs[i]);
+ itc_wait_empty(ctx->itcs[i]);
}
}
-void tube_drop(tube *ctx)
+void tube_discard_all(tube *ctx)
{
int i;
@@ -305,7 +305,7 @@ void tube_drop(tube *ctx)
}
for (i = 0; i < ctx->nitcs; i++) {
- itc_drop(ctx->itcs[i], ctx->free_element[i]);
+ itc_discard_all(ctx->itcs[i], ctx->free_element[i]);
}
}
@@ -319,7 +319,7 @@ int tube_stop(tube *ctx)
for (i = 0; i < ctx->nstages; i++) {
if (ctx->stages[i].run) {
ctx->stages[i].run = 0;
- pthread_join(ctx->stages_thread[0], NULL);
+ pthread_join(ctx->stages_thread[i], NULL);
}
}
if (ctx->fetch->run) {
@@ -343,13 +343,15 @@ void tube_free(tube **ctx)
return;
}
- tube_stop((*ctx));
+ tube_stop(*ctx);
for (i = 0; i < (*ctx)->nitcs; i++) {
itc_free(&(*ctx)->itcs[i], (*ctx)->free_element[i]);
}
free((*ctx)->itcs);
+ free((*ctx)->free_element);
free((*ctx)->stages);
+ free((*ctx)->stages_thread);
free((*ctx)->fetch);
free((*ctx)->sink);
@@ -357,3 +359,19 @@ void tube_free(tube **ctx)
*ctx = NULL;
}
+int tube_get_queued(const tube *ctx, int stage)
+{
+ if (ctx == NULL || stage < 0 || stage >= ctx->nstages) {
+ return -1;
+ }
+ return itc_get_queued(ctx->itcs[stage]);
+}
+
+int tube_get_slots(const tube *ctx, int stage)
+{
+ if (ctx == NULL || stage < 0 || stage >= ctx->nitcs) {
+ return -1;
+ }
+ return itc_get_slots(ctx->itcs[stage]);
+}
+
diff --git a/src/tuberia.h b/src/tuberia.h
index d410bab..9b3c56c 100644
--- a/src/tuberia.h
+++ b/src/tuberia.h
@@ -1,5 +1,5 @@
-#ifndef __LIBTUBERIA_H__
-#define __LIBTUBERIA_H__
+#ifndef LIBTUBERIA_H__
+#define LIBTUBERIA_H__
typedef struct tube tube;
typedef struct tube_source tube_source;
@@ -17,15 +17,18 @@ void tube_source_and_stages_free(tube_source **source);
int tube_stage_append(tube_source *source, tube_stage *stage);
-tube *tube_create(tube_source *source, tube_sink sink, void *opaque);
+tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque);
int tube_start(tube *ctx);
int tube_stop(tube *ctx);
void tube_free(tube **ctx);
int tube_inject(tube *ctx, int timeout_ms, void *element);
void *tube_retrive(tube *ctx, int timeout_ms);
-void tube_flush(tube *ctx);
-void tube_drop(tube *ctx);
+void tube_wait_empty(tube *ctx);
+void tube_discard_all(tube *ctx);
-#endif //__LIBTUBERIA_H__
+int tube_get_queued(const tube *ctx, int stage);
+int tube_get_slots(const tube *ctx, int stage);
+
+#endif //LIBTUBERIA_H__
diff --git a/src/utils.h b/src/utils.h
index a98763a..0fde429 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -1,10 +1,10 @@
-#ifndef __LIBTUBERIA_UTILS_H__
-#define __LIBTUBERIA_UTILS_H__
+#ifndef LIBTUBERIA_UTILS_H__
+#define LIBTUBERIA_UTILS_H__
#include <sys/time.h>
struct timespec *timespec_add_ms(struct timespec *ts, int timeout_ms);
struct timespec *gettimespec(struct timespec *ts);
-#endif
+#endif //LIBTUBERIA_UTILS_H__