/*
* Copyright (c) 2025 Nicolas Dato
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
#ifndef LIBTUBERIA_H__
#define LIBTUBERIA_H__
/** @mainpage
*
* @tableofcontents
*
* @section introduction Introduction
*
* libtuberia: a library to implement a pipeline.
*
* A pipeline would be:
*
* [Source] -> [queue_1] -> [Stage_1] -> [Queue_2] -> [Stage_2] -> [...] -> [Sink]
*
* Each source, stage, and sink runs in a thread, reads from its input queue and
* write to the output queue.
*
* You can omit the source and/or sink, and inject and/or retrieve elements from
* the pipeline.
*
* @section building Building
*
* If you don't have a ./configure file, run:
* - `autoreconf -fi`
*
* When you have the ./configure file, run:
* - `./configure`
* - `make`
* - `make install`
*
* You can see some configure options with
* - `./configure --help`
*
* @section using Using this library
*
* Include tuberia.h, and link libtuberia.a and pthread: `-ltuberia -lpthread`
*
* You can use: `pkg-config --cflags --libs --static tuberia`
*
* @section quick_guide Quick Guide
*
* See the files example/simple.c and example/decode_resize_encode.c (function
* do_tuberia() in particular)
*
* Create a source with @ref tube_source_alloc().
* If you set fetch to something, there will be a thread running fetch to inject
* elements into the pipeline.
* If you set fetch to NULL, you have to inject elements with @ref tube_inject().
*
* Create as many stages as you need with @ref tube_stage_alloc(), and append
* them to the source with @ref tube_stage_append().
*
* Build the pipeline with @ref tube_alloc(). If you set sink to something,
* there will be a thread running sink to get the elements from the pipeline.
* If you set sink to NULL, you have to retrieve the elements
* with @ref tube_retrieve().
*
* The @ref tube_alloc() function copies the source and all the appended stages,
* so the source and stages can be freed with @ref tube_source_and_stages_free().
* Or, you can reutilize the same @ref tube_source to build more pipelines.
*
* Start the pipeline with @ref tube_start().
*
* Free the pipeline with @ref tube_free().
*
* @subsection quick_example Example
*
* See example/simple.c
*
* Compile with: `gcc -o simple simple.c -ltuberia -lpthread`
*
* Or with: ``gcc -o simple simple.c `pkg-config --cflags --libs --static tuberia```
*
@code {.c}
#include
#include
#include
void *stage_one(void *element, void *opaque)
{
int *i = element;
*i += 5;
return i;
}
void *stage_two(void *element, void *opaque)
{
int *i = element;
*i *= 10;
return i;
}
void sink(void *element, void *opaque)
{
int *i = element;
printf("sink: %d\n", *i);
free(i);
}
int main(void)
{
tube *ctx;
tube_source *source;
int *element, n;
source = tube_source_alloc(2, NULL, NULL, free);
tube_stage_append(source, tube_stage_alloc(2, stage_one, NULL, free));
tube_stage_append(source, tube_stage_alloc(2, stage_two, NULL, free));
ctx = tube_alloc(source, sink, NULL);
tube_source_and_stages_free(&source);
tube_start(ctx);
while (scanf("%d", &n) == 1) {
element = malloc(sizeof(*element));
*element = n;
tube_inject(ctx, 1000, element);
}
tube_free(&ctx);
return 0;
}
@endcode
*/
/**
* The main structure.
*
* Allocate with @ref tube_alloc()
* Free it with @ref tube_free()
*
* @see tube_alloc(), tube_free()
*/
typedef struct tube tube;
/**
* The source, to generate elements and put them into the pipeline.
*
* This is the start of the pipeline, the start of a list with all the stages.
* Allocate with @ref tube_source_alloc()
* Free it with @ref tube_source_and_stages_free()
*
* @see tube_source_alloc(), tube_source_and_stages_free()
*/
typedef struct tube_source tube_source;
/**
* An stage in the pipeline, gets an element from a stage (or the source),
* process it, and puts it into the next stage (or the sink).
*
* Allocate with @ref tube_stage_alloc()
* Append it to a source at the end of the list with @ref tube_stage_append()
* Free it with @ref tube_source_and_stages_free()
* If it wasn't appended, free it with @ref tube_stage_free()
*
* @see tube_stage_alloc(), tube_source_and_stages_free(), tube_stage_free()
*/
typedef struct tube_stage tube_stage;
/**
* A function to generate a new element and insert it in the pipeline.
*
* @param opaque The opaque pointer in @ref tube_source_alloc()
*
* @return The new element
*
* @see tube_source_alloc()
*/
typedef void *(*tube_fetch)(void *opaque);
/**
* A function to process an element and put the result in the next stage.
*
* @param element The input element. Remember to free it if needed.
* @param opaque The opaque pointer in @ref tube_stage_alloc()
*
* @return The output element
*
* @see tube_stage_alloc()
*/
typedef void *(*tube_process)(void *element, void *opaque);
/**
* A function to get the last element from the pipeline
*
* @param element The element from the last stage.
* @param opaque The opaque pointer in @ref tube_alloc()
*
* @see tube_alloc()
*/
typedef void (*tube_sink)(void *element, void *opaque);
/**
* A function to free an element from the pipeline.
*
* This is used by @ref tube_discard_all() and @ref tube_free()
*
* @see tube_discard_all(), tube_free()
*/
typedef void (*tube_free_element)(void *element);
/**
* Allocates a source, where elements are created and inserted into the pipeline.
*
* Free it with @ref tube_source_and_stages_free()
* Append stages at the end of the pipeline with @ref tube_stage_append()
*
* @param nslots Number of slots available in the queue between this source and
* the next stage. If the queue is full, the source will block
* @param fetch A function to generate a new element. A thread will run and call
* this function to get the new element and put it in the queue.
* If this is NULL, you have to insert the elements
* with @ref tube_inject().
* @param opaque A data pointer passed to the fetch function.
* @param free_element A function to free the elements generated by the fetch
* function, or inserted with @ref tube_inject().
*
* @return A new source
*
* @see tube_source_and_stages_free()
*/
tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque,
tube_free_element free_element);
/**
* Allocates a source, a function to receive an element, process it, and send it
* to the next stage.
*
* Free it with @ref tube_source_and_stages_free(), unless it was never appended
* then you free it with @ref tube_stage_free().
*
* @param nslots Number of slots available in the queue between this stage and
* the next stage. If the queue is full, the stage will block.
* @param process A function to process the element. Can't be NULL.
* @param opaque A data pointer passed to the process function.
* @param free_element A function to free the elements generated by the process
* function.
*
* @return A new stage
*
* @see tube_stage_append(), tube_source_and_stages_free(), tube_stage_free()
*/
tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque,
tube_free_element free_element);
/**
* Frees an stage and sets the pointer to NULL
*
* @note If the stage was appended to a pipeline,
* you must use @ref tube_source_and_stages_free().
*
* @param stage A pointer to the stage to be freed
*
* @see tube_source_alloc(), tube_source_and_stages_free()
*/
void tube_stage_free(tube_stage **stage);
/**
* Frees a source and all stages appended to it, and sets the pointer to NULL
*
* @note This also frees all stages appended to the source with @ref tube_stage_append().
*
* @param source A pointer to the source to be freed
*
* @see tube_source_alloc(), tube_stage_alloc(), tube_stage_free()
*/
void tube_source_and_stages_free(tube_source **source);
/**
* Appends an stage to the source, and the end of the pipeline.
*
* You can use tube_stage_append(source, tube_source_alloc(...)).
*
* @note The owner of the stage pointer will be the source. Don't use it anymore.
*
* @param source The source to append the stage.
* @param stage The stage to be appended to the source. The source will become
* the owner of this pointer.
*
* @return 0 if OK, less than 0 in case of error.
*/
int tube_stage_append(tube_source *source, tube_stage *stage);
/**
* Creates the pipeline, using the source allocated with @ref tube_source_alloc()
* and all the stages allocated with @ref tube_stage_alloc() and appended with
* @ref tube_stage_append().
*
* Free it with @ref tube_free().
* The pipeline won't start until you call @ref tube_start().
*
* @param source The source with all the stages appended
* @param sink A function to receive the last element from the pipeline.
* If this is NULL, you have to use @ref tube_retrieve().
* @param opaque A data pointer passed to the sink function.
*
* @return The pipeline, or NULL in case of error
*
* @see tube_free()
*/
tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque);
/**
* Starts the pipeline.
*
* This will start the threads to read from queues, process the elements, and
* write to queues the result.
* Can be stopped with @ref tube_stop() or @ref tube_stop_and_wait_empty().
*
* @param ctx The pipeline to be started
*
* @return 0 if OK, less than 0 in case of error.
*
* @see tube_stop(), tube_stop_and_wait_empty()
*/
int tube_start(tube *ctx);
/**
* Stops the pipeline.
*
* This will stop the threads. Can be resumed later with @ref tube_start().
* This won't flush the queues, if there are pending elements they will still be
* in the queues.
* Use @ref tube_stop_and_wait_empty() to stop and flush the queues.
*
* @param ctx The pipeline to be stopped
*
* @return 0 if OK, less than 0 in case of error.
*
* @see tube_start(), tube_stop_and_wait_empty()
*/
int tube_stop(tube *ctx);
/**
* Frees the pipeline and sets the pointer to NULL.
*
* This also frees all pending elements in the queue.
*
* @param ctx A pointer to the pipeline to be freed.
*
* @see tube_alloc()
*/
void tube_free(tube **ctx);
/**
* Insert an element to the first stage.
*
* Use this function if you set NULL to the source at @ref tube_source_alloc().
* You can call this if the source is not NULL, i.e. if there is a thread
* running @ref tube_source, but be aware that the element will be inserted out
* of order, if that's important for the pipeline.
*
* @param ctx The pipeline where to insert the element
* @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
* @param element The element to be inserted into the pipeline. Can't be NULL
*
* @return 0 if OK, less than 0 if timeout is reached or if there is an error
*
* @see tube_inject_at(), tube_retrieve()
*/
int tube_inject(tube *ctx, int timeout_ms, void *element);
/**
* Insert an element to an specific stage.
*
* If stage is 0, this is equivalent to calling @ref tube_inject().
*
* @warning This will insert an out of order element. Use this function if the
* order of the element inserted is not important.
*
* @param ctx The pipeline where to insert the element.
* @param timeout_ms A timeout in milliseconds, use 0 to return without blocking.
* @param stage The 0-based stage that will process this element.
* If there are 2 stages, stage = 0 means the element will be
* inserted to the input queue of the first stage,
* stage = 1 means the input queue of the second stage,
* and stage = 3 means the input queue of the sink.
* @param element The element to be inserted into the pipeline. Can't be NULL
*
* @return 0 if OK, less than 0 if timeout is reached or if there is an error
*
* @see tube_inject(), tube_retrieve()
*/
int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element);
/**
* Gets an element from the last stage.
*
* Use this function if you set NULL to the sink at @ref tube_alloc().
* You can call this if the sink is not NULL, i.e. if there is a thread running
* @ref tube_sink, but be aware that the element will be removed out of order,
* if that's important for the pipeline.
*
* @param ctx The pipeline from where to get the element
* @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
*
* @return The element, NULL if timeout is reached or if there is an error
*
* @see tube_inject()
*/
void *tube_retrieve(tube *ctx, int timeout_ms);
/**
* Stops the pipeline, waiting for all queues to be empty.
*
* @param ctx The pipeline to be stopped
*
* @see tube_stop(), tube_start()
*/
void tube_stop_and_wait_empty(tube *ctx);
/**
* Discards and frees all elements in the queue
*
* @param ctx The pipeline to discard elements
*
* @see tube_free_element()
*/
void tube_discard_all(tube *ctx);
/**
* Returns the number of queued elements at a stage.
*
* @param ctx The pipeline
* @param stage The 0-based stage. If there are 2 stages,
* stage = 0 means the elements queued for the first stage,
* stage = 1 means the elements queued for the second stage,
* and stage = 3 means the elements queued for the sink.
*
* @return The number of queued elements
*/
int tube_get_queued(const tube *ctx, int stage);
/**
* Returns the size (slots) of the queue at a stage.
*
* @param ctx The pipeline
* @param stage The 0-based stage. If there are 2 stages,
* stage = 0 means the queue for the first stage,
* stage = 1 means the queue for the second stage,
* and stage = 3 means the queue for the sink.
*
* @return The number of slots of the queue
*/
int tube_get_slots(const tube *ctx, int stage);
#endif //LIBTUBERIA_H__