# libtuberia

libtuberia: a library to implement a pipeline.

A pipeline would be:

*[Source] -> [Queue_1] -> [Stage_1] -> [Queue_2] -> [Stage_2] -> [...] -> [Sink]*

Each source, stage, and sink runs in a thread, reads from its input queue, and writes to its output queue.

You can omit the source and sink, and inject elements into or retrieve them from the pipeline yourself.

## Building, Compiling, Installing

If you don't have a `./configure` file, run:

- `autoreconf -fi`

When you have the `./configure` file, run:

- `./configure`
- `make`
- `make install`

You can see some configuration options with:

- `./configure --help`

## Using this library

Include `tuberia.h`, and link `libtuberia.a` and pthread: `-ltuberia -lpthread`

You can also use: `pkg-config --cflags --libs --static tuberia`

## Quick Guide

See the files `example/simple.c` and `example/decode_resize_encode.c` (function `do_tuberia()` in particular).

Create a source with `tube_source_alloc()`. If you set fetch to something, there will be a thread running fetch to inject elements into the pipeline. If you set fetch to NULL, you have to inject the elements with `tube_inject()`.

Create as many stages as you need with `tube_stage_alloc()`, and append them to the source with `tube_stage_append()`.

Build the pipeline with `tube_alloc()`. If you set sink to something, there will be a thread running sink to get the elements from the pipeline. If you set sink to NULL, you have to retrieve the elements with `tube_retrieve()`.

The `tube_alloc()` function copies the source and all the appended stages, so you can free the source and stages with `tube_source_and_stages_free()`. Alternatively, you can reuse the same `tube_source` to build more pipelines.

Start the pipeline with `tube_start()`.

Free the pipeline with `tube_free()`.

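To make the "one thread per stage, one queue between stages" idea concrete, here is a minimal sketch of a single stage built directly on pthreads. It is not libtuberia's implementation: the `queue` and `stage` types, `run_stage_demo()`, and the use of a NULL element as an end-of-stream marker are all assumptions made for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define QUEUE_SLOTS 2   /* small capacity, so a push blocks while the queue is full */

/* Hypothetical bounded FIFO of pointers; not libtuberia's own queue type. */
typedef struct {
    void *slot[QUEUE_SLOTS];
    int head, count;
    pthread_mutex_t lock;
    pthread_cond_t not_empty, not_full;
} queue;

static void queue_init(queue *q)
{
    q->head = q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

static void queue_destroy(queue *q)
{
    pthread_cond_destroy(&q->not_full);
    pthread_cond_destroy(&q->not_empty);
    pthread_mutex_destroy(&q->lock);
}

/* Blocks while the queue is full. */
static void queue_push(queue *q, void *element)
{
    pthread_mutex_lock(&q->lock);
    while (q->count == QUEUE_SLOTS)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->slot[(q->head + q->count) % QUEUE_SLOTS] = element;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty. */
static void *queue_pop(queue *q)
{
    void *element;

    pthread_mutex_lock(&q->lock);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->lock);
    element = q->slot[q->head];
    q->head = (q->head + 1) % QUEUE_SLOTS;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return element;
}

/* One stage: pop from the input queue, process, push to the output queue. */
typedef struct {
    queue *in, *out;
    void *(*process)(void *element);
} stage;

static void *stage_thread(void *arg)
{
    stage *s = arg;
    void *element;

    /* A NULL element is this sketch's end-of-stream marker. */
    while ((element = queue_pop(s->in)) != NULL)
        queue_push(s->out, s->process(element));
    queue_push(s->out, NULL);   /* forward the marker downstream */
    return NULL;
}

static void *add_five(void *element)
{
    int *i = element;
    *i += 5;
    return i;
}

/* Pushes n values through one stage thread; values are updated in place.
 * Returns how many elements came out in order, or -1 on thread failure. */
int run_stage_demo(int *values, int n)
{
    queue in, out;
    stage s = { &in, &out, add_five };
    pthread_t tid;
    int i, seen = 0;

    assert(n >= 0);
    queue_init(&in);
    queue_init(&out);
    if (pthread_create(&tid, NULL, stage_thread, &s) != 0)
        return -1;

    for (i = 0; i < n; i++) {
        queue_push(&in, &values[i]);         /* inject */
        if (queue_pop(&out) == &values[i])   /* retrieve */
            seen++;
    }
    queue_push(&in, NULL);    /* ask the stage to stop */
    (void)queue_pop(&out);    /* consume the forwarded marker */
    pthread_join(tid, NULL);
    queue_destroy(&in);
    queue_destroy(&out);
    return seen;
}
```

Calling `run_stage_demo()` on an array holding 1, 2, 3 leaves 6, 7, 8 in the array. The caller plays the role of both source and sink here, which mirrors the case where fetch and sink are NULL and elements are injected and retrieved by hand.
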
### Quick example

See `examples/simple.c`

Compile with: `gcc -o simple simple.c -ltuberia -lpthread`

Or with: ``gcc -o simple simple.c `pkg-config --cflags --libs --static tuberia` ``

```C
#include <stdio.h>
#include <stdlib.h>
#include <tuberia.h>

/* First stage: add 5 to the element. */
void *stage_one(void *element, void *opaque)
{
    int *i = element;
    *i += 5;
    return i;
}

/* Second stage: multiply the element by 10. */
void *stage_two(void *element, void *opaque)
{
    int *i = element;
    *i *= 10;
    return i;
}

/* Sink: print the element and release it. */
void sink(void *element, void *opaque)
{
    int *i = element;
    printf("sink: %d\n", *i);
    free(i);
}

int main(void)
{
    tube *ctx;
    tube_source *source;
    int *element, n;

    /* fetch is NULL, so elements are injected with tube_inject() */
    source = tube_source_alloc(2, NULL, NULL, free);
    tube_stage_append(source, tube_stage_alloc(2, stage_one, NULL, free));
    tube_stage_append(source, tube_stage_alloc(2, stage_two, NULL, free));

    /* tube_alloc() copies the source and stages, so they can be freed */
    ctx = tube_alloc(source, sink, NULL);
    tube_source_and_stages_free(&source);

    tube_start(ctx);

    /* Read integers from stdin and inject each one into the pipeline */
    while (scanf("%d", &n) == 1) {
        element = malloc(sizeof(*element));
        if (element == NULL)
            break;
        *element = n;
        tube_inject(ctx, 1000, element);
    }

    tube_free(&ctx);
    return 0;
}
```
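To check what the example should print without building the library, the two stage functions can be called directly in the order they were appended. This is only a sketch of the arithmetic: `pipeline_result()` and `check_examples()` are hypothetical helpers, and no threads or queues are involved.

```c
#include <assert.h>
#include <stddef.h>

/* Same bodies as stage_one() and stage_two() in the example above. */
static void *stage_one(void *element, void *opaque)
{
    int *i = element;
    (void)opaque;   /* unused */
    *i += 5;
    return i;
}

static void *stage_two(void *element, void *opaque)
{
    int *i = element;
    (void)opaque;   /* unused */
    *i *= 10;
    return i;
}

/* Hypothetical helper: the value the sink would receive for input n,
 * assuming stages run in the order they were appended. */
int pipeline_result(int n)
{
    int value = n;

    stage_two(stage_one(&value, NULL), NULL);
    return value;
}

/* Hypothetical self-check: aborts if the arithmetic is not (n + 5) * 10. */
void check_examples(void)
{
    assert(pipeline_result(1) == 60);
    assert(pipeline_result(10) == 150);
}
```

So typing `1` into the example program should produce `sink: 60`, and `10` should produce `sink: 150`.
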