/*
 * Copyright (C) 2023, 2024, 2025 Nicolas Dato
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef LIBTUBERIA_H__
#define LIBTUBERIA_H__

/** @mainpage
 *
 * @tableofcontents
 *
 * @section introduction Introduction
 *
 * libtuberia: a library to implement a pipeline.
 *
 * A pipeline would be:
 *
 * [Source] -> [Queue_1] -> [Stage_1] -> [Queue_2] -> [Stage_2] -> [...] -> [Sink]
 *
 * Each source, stage, and sink runs in a thread, reads from its input queue,
 * processes the element, and writes the result to the output queue.
 *
 * @section quick_guide Quick Guide
 *
 * See the files example/simple.c and example/decode_resize_encode.c (function
 * do_tuberia() in particular)
 *
 * Create a source with @ref tube_source_alloc().
 * If you set fetch to a function, there will be a thread running fetch to
 * inject elements into the pipeline.
 * If you set fetch to NULL, you have to inject elements with @ref tube_inject().
 *
 * Create as many stages as you need with @ref tube_stage_alloc(), and append
 * them to the source with @ref tube_stage_append().
 *
 * Build the pipeline with @ref tube_alloc(). If you set sink to a function,
 * there will be a thread running sink to get the elements from the pipeline.
 * If you set sink to NULL, you have to retrieve the elements
 * with @ref tube_retrieve().
 *
 * The @ref tube_alloc() function copies the source and all the appended stages,
 * so the source and stages can be freed with @ref tube_source_and_stages_free().
 * Or, you can reuse the same @ref tube_source to build more pipelines.
 *
 * Start the pipeline with @ref tube_start().
 *
 * Free the pipeline with @ref tube_free().
 *
 * @subsection quick_example Example
 *
 * See example/simple.c
 *
 * Compile with: `gcc -o simple simple.c -ltuberia -lpthread`
 *
 * Or with: ``gcc -o simple simple.c `pkg-config --cflags --libs --static tuberia```
 *
@code {.c}
#include <stdio.h>
#include <stdlib.h>
#include <tuberia.h>

void *stage_one(void *element, void *opaque)
{
	int *i = element;
	*i += 5;
	return i;
}

void *stage_two(void *element, void *opaque)
{
	int *i = element;
	*i *= 10;
	return i;
}

void sink(void *element, void *opaque)
{
	int *i = element;
	printf("sink: %d\n", *i);
	free(i);
}

int main(void)
{
	tube *ctx;
	tube_source *source;
	int *element, n;

	source = tube_source_alloc(2, NULL, NULL, free);
	tube_stage_append(source, tube_stage_alloc(2, stage_one, NULL, free));
	tube_stage_append(source, tube_stage_alloc(2, stage_two, NULL, free));
	ctx = tube_alloc(source, sink, NULL);
	tube_source_and_stages_free(&source);
	tube_start(ctx);
	while (scanf("%d", &n) == 1) {
		element = malloc(sizeof(*element));
		*element = n;
		tube_inject(ctx, 1000, element);
	}
	tube_free(&ctx);

	return 0;
}
@endcode
 */

/* Give the declarations C linkage when the header is included from C++. */
#ifdef __cplusplus
extern "C" {
#endif

/**
 * The main structure.
 *
 * Allocate with @ref tube_alloc()
 * Free it with @ref tube_free()
 *
 * @see tube_alloc(), tube_free()
 */
typedef struct tube tube;

/**
 * The source, to generate elements and put them into the pipeline.
 *
 * This is the start of the pipeline, the start of a list with all the stages.
 * Allocate with @ref tube_source_alloc()
 * Free it with @ref tube_source_and_stages_free()
 *
 * @see tube_source_alloc(), tube_source_and_stages_free()
 */
typedef struct tube_source tube_source;

/**
 * A stage in the pipeline, gets an element from a stage (or the source),
 * processes it, and puts it into the next stage (or the sink).
 *
 * Allocate with @ref tube_stage_alloc()
 * Append it to a source at the end of the list with @ref tube_stage_append()
 * Free it with @ref tube_source_and_stages_free()
 * If it wasn't appended, free it with @ref tube_stage_free()
 *
 * @see tube_stage_alloc(), tube_source_and_stages_free(), tube_stage_free()
 */
typedef struct tube_stage tube_stage;

/**
 * A function to generate a new element and insert it in the pipeline.
 *
 * @param opaque The opaque pointer in @ref tube_source_alloc()
 *
 * @return The new element
 *
 * @see tube_source_alloc()
 */
typedef void *(*tube_fetch)(void *opaque);

/**
 * A function to process an element and put the result in the next stage.
 *
 * @param element The input element. Remember to free it if needed.
 * @param opaque The opaque pointer in @ref tube_stage_alloc()
 *
 * @return The output element
 *
 * @see tube_stage_alloc()
 */
typedef void *(*tube_process)(void *element, void *opaque);

/**
 * A function to get the last element from the pipeline
 *
 * @param element The element from the last stage.
 * @param opaque The opaque pointer in @ref tube_alloc()
 *
 * @see tube_alloc()
 */
typedef void (*tube_sink)(void *element, void *opaque);

/**
 * A function to free an element from the pipeline.
 *
 * This is used by @ref tube_discard_all() and @ref tube_free()
 *
 * @param element The element to be freed.
 *
 * @see tube_discard_all(), tube_free()
 */
typedef void (*tube_free_element)(void *element);

/**
 * Allocates a source, where elements are created and inserted into the pipeline.
 *
 * Free it with @ref tube_source_and_stages_free()
 * Append stages at the end of the pipeline with @ref tube_stage_append()
 *
 * @param nslots Number of slots available in the queue between this source and
 *               the next stage. If the queue is full, the source will block
 * @param fetch A function to generate a new element. A thread will run and call
 *              this function to get the new element and put it in the queue.
 *              If this is NULL, you have to insert the elements
 *              with @ref tube_inject().
 * @param opaque A data pointer passed to the fetch function.
 * @param free_element A function to free the elements generated by the fetch
 *                     function, or inserted with @ref tube_inject().
 *
 * @return A new source
 *
 * @see tube_source_and_stages_free()
 */
tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque,
		tube_free_element free_element);

/**
 * Allocates a stage, a function to receive an element, process it, and send it
 * to the next stage.
 *
 * Free it with @ref tube_source_and_stages_free(), unless it was never appended
 * then you free it with @ref tube_stage_free().
 *
 * @param nslots Number of slots available in the queue between this stage and
 *               the next stage. If the queue is full, the stage will block.
 * @param process A function to process the element. Can't be NULL.
 * @param opaque A data pointer passed to the process function.
 * @param free_element A function to free the elements generated by the process
 *                     function.
 *
 * @return A new stage
 *
 * @see tube_stage_append(), tube_source_and_stages_free(), tube_stage_free()
 */
tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque,
		tube_free_element free_element);

/**
 * Frees a stage and sets the pointer to NULL
 *
 * @note If the stage was appended to a source,
 * you must use @ref tube_source_and_stages_free().
 *
 * @param stage A pointer to the stage to be freed
 *
 * @see tube_stage_alloc(), tube_source_and_stages_free()
 */
void tube_stage_free(tube_stage **stage);

/**
 * Frees a source and all stages appended to it, and sets the pointer to NULL
 *
 * @note This also frees all stages appended to the source with @ref tube_stage_append().
 *
 * @param source A pointer to the source to be freed
 *
 * @see tube_source_alloc(), tube_stage_alloc(), tube_stage_free()
 */
void tube_source_and_stages_free(tube_source **source);

/**
 * Appends a stage to the source, at the end of the pipeline.
 *
 * You can use tube_stage_append(source, tube_stage_alloc(...)).
 *
 * @note The owner of the stage pointer will be the source. Don't use it anymore.
 *
 * @param source The source to append the stage.
 * @param stage The stage to be appended to the source. The source will become
 *              the owner of this pointer.
 *
 * @return 0 if OK, less than 0 in case of error.
 */
int tube_stage_append(tube_source *source, tube_stage *stage);

/**
 * Creates the pipeline, using the source allocated with @ref tube_source_alloc()
 * and all the stages allocated with @ref tube_stage_alloc() and appended with
 * @ref tube_stage_append().
 *
 * Free it with @ref tube_free().
 * The pipeline won't start until you call @ref tube_start().
 *
 * @param source The source with all the stages appended
 * @param sink A function to receive the last element from the pipeline.
 *             If this is NULL, you have to use @ref tube_retrieve().
 * @param opaque A data pointer passed to the sink function.
 *
 * @return The pipeline, or NULL in case of error
 *
 * @see tube_free()
 */
tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque);

/**
 * Starts the pipeline.
 *
 * This will start the threads to read from queues, process the elements, and
 * write to queues the result.
 * Can be stopped with @ref tube_stop() or @ref tube_stop_and_wait_empty().
 *
 * @param ctx The pipeline to be started
 *
 * @return 0 if OK, less than 0 in case of error.
 *
 * @see tube_stop(), tube_stop_and_wait_empty()
 */
int tube_start(tube *ctx);

/**
 * Stops the pipeline.
 *
 * This will stop the threads. Can be resumed later with @ref tube_start().
 * This won't flush the queues, if there are pending elements they will still be
 * in the queues.
 * Use @ref tube_stop_and_wait_empty() to stop and flush the queues.
 *
 * @param ctx The pipeline to be stopped
 *
 * @return 0 if OK, less than 0 in case of error.
 *
 * @see tube_start(), tube_stop_and_wait_empty()
 */
int tube_stop(tube *ctx);

/**
 * Frees the pipeline and sets the pointer to NULL.
 *
 * This also frees all pending elements in the queue.
 *
 * @param ctx A pointer to the pipeline to be freed.
 *
 * @see tube_alloc()
 */
void tube_free(tube **ctx);

/**
 * Insert an element to the first stage.
 *
 * Use this function if you set fetch to NULL at @ref tube_source_alloc().
 * You can call this if fetch is not NULL, i.e. if there is a thread
 * running @ref tube_fetch, but be aware that the element will be inserted out
 * of order, if that's important for the pipeline.
 *
 * @param ctx The pipeline where to insert the element
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
 * @param element The element to be inserted into the pipeline. Can't be NULL
 *
 * @return 0 if OK, less than 0 if timeout is reached or if there is an error
 *
 * @see tube_inject_at(), tube_retrieve()
 */
int tube_inject(tube *ctx, int timeout_ms, void *element);

/**
 * Insert an element to a specific stage.
 *
 * If stage is 0, this is equivalent to calling @ref tube_inject().
 *
 * @warning This will insert an out of order element. Use this function if the
 * order of the element inserted is not important.
 *
 * @param ctx The pipeline where to insert the element.
 * @param stage The 0-based stage that will process this element.
 *              If there are 2 stages, stage = 0 means the element will be
 *              inserted to the input queue of the first stage,
 *              stage = 1 means the input queue of the second stage,
 *              and stage = 2 means the input queue of the sink.
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking.
 * @param element The element to be inserted into the pipeline. Can't be NULL
 *
 * @return 0 if OK, less than 0 if timeout is reached or if there is an error
 *
 * @see tube_inject(), tube_retrieve()
 */
int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element);

/**
 * Gets an element from the last stage.
 *
 * Use this function if you set sink to NULL at @ref tube_alloc().
 * You can call this if the sink is not NULL, i.e. if there is a thread running
 * @ref tube_sink, but be aware that the element will be removed out of order,
 * if that's important for the pipeline.
 *
 * @param ctx The pipeline from where to get the element
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
 *
 * @return The element, NULL if timeout is reached or if there is an error
 *
 * @see tube_inject()
 */
void *tube_retrieve(tube *ctx, int timeout_ms);

/**
 * Stops the pipeline, waiting for all queues to be empty.
 *
 * @param ctx The pipeline to be stopped
 *
 * @see tube_stop(), tube_start()
 */
void tube_stop_and_wait_empty(tube *ctx);

/**
 * Discards and frees all elements in the queue
 *
 * @param ctx The pipeline to discard elements
 *
 * @see tube_free_element
 */
void tube_discard_all(tube *ctx);

/**
 * Returns the number of queued elements at a stage.
 *
 * @param ctx The pipeline
 * @param stage The 0-based stage. If there are 2 stages,
 *              stage = 0 means the elements queued for the first stage,
 *              stage = 1 means the elements queued for the second stage,
 *              and stage = 2 means the elements queued for the sink.
 *
 * @return The number of queued elements
 */
int tube_get_queued(const tube *ctx, int stage);

/**
 * Returns the size (slots) of the queue at a stage.
 *
 * @param ctx The pipeline
 * @param stage The 0-based stage. If there are 2 stages,
 *              stage = 0 means the queue for the first stage,
 *              stage = 1 means the queue for the second stage,
 *              and stage = 2 means the queue for the sink.
 *
 * @return The number of slots of the queue
 */
int tube_get_slots(const tube *ctx, int stage);

#ifdef __cplusplus
}
#endif

#endif /* LIBTUBERIA_H__ */