diff options
Diffstat (limited to 'src/tuberia.h')
-rw-r--r-- | src/tuberia.h | 328 |
1 files changed, 327 insertions, 1 deletions
diff --git a/src/tuberia.h b/src/tuberia.h index f83ea6f..e965840 100644 --- a/src/tuberia.h +++ b/src/tuberia.h @@ -1,34 +1,360 @@ #ifndef LIBTUBERIA_H__ #define LIBTUBERIA_H__ +/** @mainpage + * + * @tableofcontents + * + * @section introduction Introduction + * + * libtuberia: a library to implement a pipeline. + * + * A pipeline would be: + * + * <em>[Source] -> [queue_1] -> [Stage_1] -> [Queue_2] -> [Stage_2] -> [...] -> [Sink]</em> + * + * Each source, stage, and sink runs in a thread, reads from its input queue and + * write to the output queue. + * + * You can omit the source and/or sink, and inject and/or retrieve elements from the pipeline. + * + * @section building Building + * + * If you don't have a ./configure file, run: + * - autoreconf -fi + * + * When you have the ./configure file, run: + * - ./configure + * - make + * - make install + * + * You can see some configure options with + * - ./configure --help + * + * @section quick_guide Quick Guide + * + * See the example/decode_resize_encode.c and the function do_tuberia() + * + * Create a source with @ref tube_source_alloc(). + * If you set fetch to something, there will be a thread running fetch to inject + * elements into the pipeline. + * If you set fetch to NULL, you have to inject the elements with @ref tube_inject(). + * + * Create as many stages as you need with @ref tube_stage_alloc(), and append them + * to the source with @ref tube_stage_append(). + * + * Build the pipeline with @ref tube_alloc(). If you set sink to something, + * there will be a thread running sink to get the elements from the pipeline. + * If you set sink to NULL, you have to retrieve the elements with @ref tube_retrieve(). + * + * The @ref tube_alloc() function copies the source and all the appended stages, + * so you can free the source and stages with @ref tube_source_and_stages_free(). + * Or, you can reutilize the same tube_source to build more pipelines. + * + * Start the pipeline with @ref tube_start(). + * + * Free the pipeline with @ref tube_free(). + * + * @subsection quick_example Example + */ + +/** + * The main structure. + * + * Allocate with @ref tube_alloc() + * Free it with @ref tube_free() + * + * @see tube_alloc(), tube_free() + */ typedef struct tube tube; + +/** + * The source, to generate elements and put them into the pipeline. + * + * This is the start of the pipeline, the start of a list with all the stages. + * Allocate with @ref tube_source_alloc() + * Free it with @ref tube_source_and_stages_free() + * + * @see tube_source_alloc(), tube_source_and_stages_free() + */ typedef struct tube_source tube_source; + +/** + * An stage in the pipeline, gets an element from a stage (or the source), + * process it, and puts it into the next stage (or the sink). + * + * Allocate with @ref tube_stage_alloc() + * Append it to a source at the end of the list with @ref tube_stage_append() + * Free it with @ref tube_source_and_stages_free() + * If it wasn't appended, free it with @ref tube_stage_free() + * + * @see tube_stage_alloc(), tube_source_and_stages_free(), tube_stage_free() + */ typedef struct tube_stage tube_stage; +/** + * A function to generate a new element and insert it in the pipeline. + * + * @param opaque The opaque pointer in @ref tube_source_alloc() + * + * @return The new element + * + * @see tube_source_alloc() + */ typedef void *(*tube_fetch)(void *opaque); + +/** + * A function to process an element and put the result in the next stage. + * + * @param element The input element. Remember to free it if needed. + * @param opaque The opaque pointer in @ref tube_stage_alloc() + * + * @return The output element + * + * @see tube_stage_alloc() + */ typedef void *(*tube_process)(void *element, void *opaque); + +/** + * A function to get the last element from the pipeline + * + * @param element The element from the last stage. + * @param opaque The opaque pointer in @ref tube_alloc() + * + * @see tube_alloc() + */ typedef void (*tube_sink)(void *element, void *opaque); + +/** + * A function to free an element from the pipeline. + * + * This is used by @ref tube_discard_all() and @ref tube_free() + * + * @see tube_discard_all(), tube_free() + */ typedef void (*tube_free_element)(void *element); +/** + * Allocates a source, where elements are created and inserted into the pipeline. + * + * Free it with @ref tube_source_and_stages_free() + * Append stages at the end of the pipeline with @ref tube_stage_append() + * + * @param nslots Number of slots available in the queue between this source and + * the next stage. If the queue is full, the source will block + * @param fetch A function to generate a new element. A thread will run and call + * this function to get the new element and put it in the queue. + * If this is NULL, you have to insert the elements with @ref tube_inject(). + * @param opaque A data pointer passed to the fetch function. + * @param free_element A function to free the elements generated by the fetch + * function, or inserted with @ref tube_inject(). + * + * @return A new source + * + * @see tube_source_and_stages_free() + */ tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque, tube_free_element free_element); + +/** + * Allocates a source, a function to receive an element, process it, and send it to the next stage. + * + * Free it with @ref tube_source_and_stages_free(), unless it was never appended then you free it with @ref tube_stage_free() + * + * @param nslots Number of slots available in the queue between this stage and + * the next stage. If the queue is full, the stage will block. + * @param process A function to process the element. Can't be NULL. + * @param opaque A data pointer passed to the process function. + * @param free_element A function to free the elements generated by the process + * function. + * + * @return A new stage + * + * @see tube_stage_append(), tube_source_and_stages_free(), tube_stage_free() + */ tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque, tube_free_element free_element); + +/** + * Frees an stage and sets the pointer to NULL + * + * @note If the stage was appended to a pipeline, + * you must use @ref tube_source_and_stages_free(). + * + * @param stage A pointer to the stage to be freed + * + * @see tube_source_alloc(), tube_source_and_stages_free() + */ void tube_stage_free(tube_stage **stage); + +/** + * Frees a source and all stages appended to it, and sets the pointer to NULL + * + * @note This also frees all stages appended to the source with @ref tube_stage_append(). + * + * @param source A pointer to the source to be freed + * + * @see tube_source_alloc(), tube_stage_alloc(), tube_stage_free() + */ void tube_source_and_stages_free(tube_source **source); +/** + * Appends an stage to the source, and the end of the pipeline. + * + * You can use tube_stage_append(source, tube_source_alloc(...)). + * + * @note The owner of the stage pointer will be the source. Don't use it anymore. + * + * @param source The source to append the stage. + * @param stage The stage to be appended to the source. The source will become the owner of this pointer. + * + * @return 0 if OK, less than 0 in case of error. + */ int tube_stage_append(tube_source *source, tube_stage *stage); +/** + * Creates the pipeline, using the source allocated with @ref tube_source_alloc() + * and all the stages allocated with @ref tube_stage_alloc() and appended with + * @ref tube_stage_append(). + * + * Free it with @ref tube_free(). + * The pipeline won't start until you call @ref tube_start(). + * + * @param source The source with all the stages appended + * @param sink A function to receive the last element from the pipeline. If this is NULL, you have to use @ref tube_retrieve(). + * @param opaque A data pointer passed to the sink function. + * + * @return The pipeline, or NULL in case of error + * + * @see tube_free() + */ tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque); + +/** + * Starts the pipeline. + * + * This will start the threads to read from queues, process the elements, and write to queues the result. Can be stopped with @ref tube_stop() or @ref tube_stop_and_wait_empty(). + * + * @param ctx The pipeline to be started + * + * @return 0 if OK, less than 0 in case of error. + * + * @see tube_stop(), tube_stop_and_wait_empty() + */ int tube_start(tube *ctx); + +/** + * Stops the pipeline. + * + * This will stop the threads. Can be resumed later with @ref tube_start(). + * This won't flush the queues, if there are pending elements they will still be in the queues. Use @ref tube_stop_and_wait_empty() to stop and flush the queues. + * + * @param ctx The pipeline to be stopped + * + * @return 0 if OK, less than 0 in case of error. + * + * @see tube_start(), tube_stop_and_wait_empty() + */ int tube_stop(tube *ctx); + +/** + * Frees the pipeline and sets the pointer to NULL. + * + * This also frees all pending elements in the queue. + * + * @param ctx A pointer to the pipeline to be freed. + * + * @see tube_alloc() + */ void tube_free(tube **ctx); +/** + * Insert an element to the first stage. + * + * Use this function if you set NULL to the source at @ref tube_source_alloc(). + * You can call this if the source is not NULL, i.e. if there is a thread running + * @ref tube_source, but be aware that the element will be inserted out of order, + * if that's important for the pipeline. + * + * @param ctx The pipeline where to insert the element + * @param timeout_ms A timeout in milliseconds, can be 0 to return without blocking + * @param element The element to be inserted into the pipeline. Can't be NULL + * + * @return 0 if OK, less than 0 if the timeout is reached or if there is an error + * + * @see tube_inject_at(), tube_retrieve() + */ int tube_inject(tube *ctx, int timeout_ms, void *element); + +/** + * Insert an element to an specific stage. + * + * If stage is 0, this is equivalent to calling @ref tube_inject(). + * + * @warning This will insert an out of order element. Use this function if the order of the element inserted is not important. + * + * @param ctx The pipeline where to insert the element + * @param timeout_ms A timeout in milliseconds, can be 0 to return without blocking + * @param stage The 0-based stage that will process this element. If there are 2 stages, stage = 0 means the element will be inserted to the input queue of the first stage, stage = 1 means the input queue of the second stage, and stage = 3 means the input queue of the sink. + * @param element The element to be inserted into the pipeline. Can't be NULL + * + * @return 0 if OK, less than 0 if the timeout is reached or if there is an error + * + * @see tube_inject(), tube_retrieve() + */ int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element); -void *tube_retrive(tube *ctx, int timeout_ms); + +/** + * Gets an element from the last stage. + * + * Use this function if you set NULL to the sink at @ref tube_alloc(). + * You can call this if the sink is not NULL, i.e. if there is a thread running + * @ref tube_sink, but be aware that the element will be removed out of order, + * if that's important for the pipeline. + * + * @param ctx The pipeline from where to get the element + * @param timeout_ms A timeout in milliseconds, can be 0 to return without blocking + * + * @return The element, NULL if timeout is reached or if there is an error + * + * @see tube_inject() + */ +void *tube_retrieve(tube *ctx, int timeout_ms); + +/** + * Stops the pipeline, waiting for all queues to be empty. + * + * @param ctx The pipeline to be stopped + * + * @see tube_stop(), tube_start() + */ void tube_stop_and_wait_empty(tube *ctx); + +/** + * Discards and frees all elements in the queue + * + * @param ctx The pipeline to discard elements + * + * @see tube_free_element() + */ void tube_discard_all(tube *ctx); +/** + * Returns the number of queued elements at a stage. + * + * @param ctx The pipeline + * @param stage The 0-based stage. If there are 2 stages, stage = 0 means the elements queued for the first stage, stage = 1 means the elements queued for the second stage, and stage = 3 means the elements queued for the sink. + * + * @return The number of queued elements + */ int tube_get_queued(const tube *ctx, int stage); + +/** + * Returns the size (slots) of the queue at a stage. + * + * @param ctx The pipeline + * @param stage The 0-based stage. If there are 2 stages, stage = 0 means the queue for the first stage, stage = 1 means the queue for the second stage, and stage = 3 means the queue for the sink. + * + * @return The number of slots of the queue + */ int tube_get_slots(const tube *ctx, int stage); #endif //LIBTUBERIA_H__ |