aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.h
blob: a4ce84ddcf174c35623a18f2e67f5d477314116c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
/*
 * Copyright (C) 2023, 2024, 2025 Nicolas Dato
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#ifndef LIBTUBERIA_H__
#define LIBTUBERIA_H__

/** @mainpage
 *
 * @tableofcontents
 *
 * @section introduction Introduction
 *
 * libtuberia: a library to implement a pipeline.
 *
 * A pipeline would be:
 *
 * <em>[Source] -> [queue_1] -> [Stage_1] -> [Queue_2] -> [Stage_2] -> [...] -> [Sink]</em>
 *
 * Each source, stage, and sink runs in a thread, reads from its input queue,
 * processes the element, and writes the result to the output queue.
 *
 * @section quick_guide Quick Guide
 *
 * See the files example/simple.c and example/decode_resize_encode.c (function
 * do_tuberia() in particular)
 *
 * Create a source with @ref tube_source_alloc().
 * If you set fetch to something, there will be a thread running fetch to inject
 * elements into the pipeline.
 * If you set fetch to NULL, you have to inject elements with @ref tube_inject().
 *
 * Create as many stages as you need with @ref tube_stage_alloc(), and append
 * them to the source with @ref tube_stage_append().
 *
 * Build the pipeline with @ref tube_alloc(). If you set sink to something,
 * there will be a thread running sink to get the elements from the pipeline.
 * If you set sink to NULL, you have to retrieve the elements
 * with @ref tube_retrieve().
 *
 * The @ref tube_alloc() function copies the source and all the appended stages,
 * so the source and stages can be freed with @ref tube_source_and_stages_free().
 * Or, you can reutilize the same @ref tube_source to build more pipelines.
 *
 * Start the pipeline with @ref tube_start().
 *
 * Free the pipeline with @ref tube_free().
 *
 * @subsection quick_example Example
 *
 * See example/simple.c
 *
 * Compile with: `gcc -o simple simple.c -ltuberia -lpthread`
 *
 * Or with: ``gcc -o simple simple.c `pkg-config --cflags --libs --static tuberia```
 *
@code {.c}
#include <stdio.h>
#include <stdlib.h>
#include <tuberia.h>

void *stage_one(void *element, void *opaque)
{
    int *i = element;
    *i += 5;
    return i;
}

void *stage_two(void *element, void *opaque)
{
    int *i = element;
    *i *= 10;
    return i;
}

void sink(void *element, void *opaque)
{
    int *i = element;
    printf("sink: %d\n", *i);
    free(i);
}

int main(void)
{
    tube *ctx;
    tube_source *source;
    int *element, n;

    source = tube_source_alloc(2, NULL, NULL, free);
    tube_stage_append(source, tube_stage_alloc(2, stage_one, NULL, free));
    tube_stage_append(source, tube_stage_alloc(2, stage_two, NULL, free));
    ctx = tube_alloc(source, sink, NULL);
    tube_source_and_stages_free(&source);
    tube_start(ctx);
    while (scanf("%d", &n) == 1) {
        element = malloc(sizeof(*element));
        *element = n;
        tube_inject(ctx, 1000, element);
    }
    tube_free(&ctx);

    return 0;
}
@endcode
 */

/**
 * The main structure.
 *
 * Allocate with @ref tube_alloc()
 * Free it with @ref tube_free()
 *
 * @see tube_alloc(), tube_free()
 */
typedef struct tube tube;

/**
 * The source, to generate elements and put them into the pipeline.
 *
 * This is the start of the pipeline, the start of a list with all the stages.
 * Allocate with @ref tube_source_alloc()
 * Free it with @ref tube_source_and_stages_free()
 *
 * @see tube_source_alloc(), tube_source_and_stages_free()
 */
typedef struct tube_source tube_source;

/**
 * An stage in the pipeline, gets an element from a stage (or the source),
 * process it, and puts it into the next stage (or the sink).
 *
 * Allocate with @ref tube_stage_alloc()
 * Append it to a source at the end of the list with @ref tube_stage_append()
 * Free it with @ref tube_source_and_stages_free()
 * If it wasn't appended, free it with @ref tube_stage_free()
 *
 * @see tube_stage_alloc(), tube_source_and_stages_free(), tube_stage_free()
 */
typedef struct tube_stage tube_stage;

/**
 * A function to generate a new element and insert it in the pipeline.
 *
 * @param opaque The opaque pointer in @ref tube_source_alloc()
 *
 * @return The new element
 *
 * @see tube_source_alloc()
 */
typedef void *(*tube_fetch)(void *opaque);

/**
 * A function to process an element and put the result in the next stage.
 *
 * @param element The input element. Remember to free it if needed.
 * @param opaque The opaque pointer in @ref tube_stage_alloc()
 *
 * @return The output element
 *
 * @see tube_stage_alloc()
 */
typedef void *(*tube_process)(void *element, void *opaque);

/**
 * A function to get the last element from the pipeline
 *
 * @param element The element from the last stage.
 * @param opaque The opaque pointer in @ref tube_alloc()
 *
 * @see tube_alloc()
 */
typedef void (*tube_sink)(void *element, void *opaque);

/**
 * A function to free an element from the pipeline.
 *
 * This is used by @ref tube_discard_all() and @ref tube_free()
 *
 * @see tube_discard_all(), tube_free()
 */
typedef void (*tube_free_element)(void *element);

/**
 * Allocates a source, where elements are created and inserted into the pipeline.
 *
 * Free it with @ref tube_source_and_stages_free()
 * Append stages at the end of the pipeline with @ref tube_stage_append()
 *
 * @param nslots Number of slots available in the queue between this source and
 *               the next stage. If the queue is full, the source will block
 * @param fetch  A function to generate a new element. A thread will run and call
 *               this function to get the new element and put it in the queue.
 *               If this is NULL, you have to insert the elements
 *               with @ref tube_inject().
 * @param opaque A data pointer passed to the fetch function.
 * @param free_element A function to free the elements generated by the fetch
 *                     function, or inserted with @ref tube_inject().
 *
 * @return A new source
 *
 * @see tube_source_and_stages_free()
 */
tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque,
		tube_free_element free_element);

/**
 * Allocates a source, a function to receive an element, process it, and send it
 * to the next stage.
 *
 * Free it with @ref tube_source_and_stages_free(), unless it was never appended
 * then you free it with @ref tube_stage_free().
 *
 * @param nslots Number of slots available in the queue between this stage and
 *               the next stage. If the queue is full, the stage will block.
 * @param process A function to process the element. Can't be NULL.
 * @param opaque A data pointer passed to the process function.
 * @param free_element A function to free the elements generated by the process
 *                     function.
 *
 * @return A new stage
 *
 * @see tube_stage_append(), tube_source_and_stages_free(), tube_stage_free()
 */
tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque,
		tube_free_element free_element);

/**
 * Frees an stage and sets the pointer to NULL
 *
 * @note If the stage was appended to a pipeline,
 *       you must use @ref tube_source_and_stages_free().
 *
 * @param stage A pointer to the stage to be freed
 *
 * @see tube_source_alloc(), tube_source_and_stages_free()
 */
void tube_stage_free(tube_stage **stage);

/**
 * Frees a source and all stages appended to it, and sets the pointer to NULL
 *
 * @note This also frees all stages appended to the source with @ref tube_stage_append().
 *
 * @param source A pointer to the source to be freed
 *
 * @see tube_source_alloc(), tube_stage_alloc(), tube_stage_free()
 */
void tube_source_and_stages_free(tube_source **source);

/**
 * Appends an stage to the source, and the end of the pipeline.
 *
 * You can use tube_stage_append(source, tube_source_alloc(...)).
 *
 * @note The owner of the stage pointer will be the source. Don't use it anymore.
 *
 * @param source The source to append the stage.
 * @param stage  The stage to be appended to the source. The source will become
 *               the owner of this pointer.
 *
 * @return 0 if OK, less than 0 in case of error.
 */
int tube_stage_append(tube_source *source, tube_stage *stage);

/**
 * Creates the pipeline, using the source allocated with @ref tube_source_alloc()
 * and all the stages allocated with @ref tube_stage_alloc() and appended with
 * @ref tube_stage_append().
 *
 * Free it with @ref tube_free().
 * The pipeline won't start until you call @ref tube_start().
 *
 * @param source The source with all the stages appended
 * @param sink   A function to receive the last element from the pipeline.
 *               If this is NULL, you have to use @ref tube_retrieve().
 * @param opaque A data pointer passed to the sink function.
 *
 * @return The pipeline, or NULL in case of error
 *
 * @see tube_free()
 */
tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque);

/**
 * Starts the pipeline.
 *
 * This will start the threads to read from queues, process the elements, and
 * write to queues the result.
 * Can be stopped with @ref tube_stop() or @ref tube_stop_and_wait_empty().
 *
 * @param ctx The pipeline to be started
 *
 * @return 0 if OK, less than 0 in case of error.
 *
 * @see tube_stop(), tube_stop_and_wait_empty()
 */
int tube_start(tube *ctx);

/**
 * Stops the pipeline.
 *
 * This will stop the threads. Can be resumed later with @ref tube_start().
 * This won't flush the queues, if there are pending elements they will still be
 * in the queues.
 * Use @ref tube_stop_and_wait_empty() to stop and flush the queues.
 *
 * @param ctx The pipeline to be stopped
 *
 * @return 0 if OK, less than 0 in case of error.
 *
 * @see tube_start(), tube_stop_and_wait_empty()
 */
int tube_stop(tube *ctx);

/**
 * Frees the pipeline and sets the pointer to NULL.
 *
 * This also frees all pending elements in the queue.
 *
 * @param ctx A pointer to the pipeline to be freed.
 *
 * @see tube_alloc()
 */
void tube_free(tube **ctx);

/**
 * Insert an element to the first stage.
 *
 * Use this function if you set NULL to the source at @ref tube_source_alloc().
 * You can call this if the source is not NULL, i.e. if there is a thread
 * running @ref tube_source, but be aware that the element will be inserted out
 * of order, if that's important for the pipeline.
 *
 * @param ctx        The pipeline where to insert the element
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
 * @param element    The element to be inserted into the pipeline. Can't be NULL
 *
 * @return 0 if OK, less than 0 if timeout is reached or if there is an error
 *
 * @see tube_inject_at(), tube_retrieve()
 */
int tube_inject(tube *ctx, int timeout_ms, void *element);

/**
 * Insert an element to an specific stage.
 *
 * If stage is 0, this is equivalent to calling @ref tube_inject().
 *
 * @warning This will insert an out of order element. Use this function if the
 *          order of the element inserted is not important.
 *
 * @param ctx        The pipeline where to insert the element.
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking.
 * @param stage      The 0-based stage that will process this element.
 *                   If there are 2 stages, stage = 0 means the element will be
 *                   inserted to the input queue of the first stage,
 *                   stage = 1 means the input queue of the second stage,
 *                   and stage = 3 means the input queue of the sink.
 * @param element    The element to be inserted into the pipeline. Can't be NULL
 *
 * @return 0 if OK, less than 0 if timeout is reached or if there is an error
 *
 * @see tube_inject(), tube_retrieve()
 */
int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element);

/**
 * Gets an element from the last stage.
 *
 * Use this function if you set NULL to the sink at @ref tube_alloc().
 * You can call this if the sink is not NULL, i.e. if there is a thread running
 * @ref tube_sink, but be aware that the element will be removed out of order,
 * if that's important for the pipeline.
 *
 * @param ctx        The pipeline from where to get the element
 * @param timeout_ms A timeout in milliseconds, use 0 to return without blocking
 *
 * @return The element, NULL if timeout is reached or if there is an error
 *
 * @see tube_inject()
 */
void *tube_retrieve(tube *ctx, int timeout_ms);

/**
 * Stops the pipeline, waiting for all queues to be empty.
 *
 * @param ctx The pipeline to be stopped
 *
 * @see tube_stop(), tube_start()
 */
void tube_stop_and_wait_empty(tube *ctx);

/**
 * Discards and frees all elements in the queue
 *
 * @param ctx The pipeline to discard elements
 *
 * @see tube_free_element()
 */
void tube_discard_all(tube *ctx);

/**
 * Returns the number of queued elements at a stage.
 *
 * @param ctx   The pipeline
 * @param stage The 0-based stage. If there are 2 stages,
 *              stage = 0 means the elements queued for the first stage,
 *              stage = 1 means the elements queued for the second stage,
 *              and stage = 3 means the elements queued for the sink.
 *
 * @return The number of queued elements
 */
int tube_get_queued(const tube *ctx, int stage);

/**
 * Returns the size (slots) of the queue at a stage.
 *
 * @param ctx   The pipeline
 * @param stage The 0-based stage. If there are 2 stages,
 *              stage = 0 means the queue for the first stage,
 *              stage = 1 means the queue for the second stage,
 *              and stage = 3 means the queue for the sink.
 *
 * @return The number of slots of the queue
 */
int tube_get_slots(const tube *ctx, int stage);

#endif //LIBTUBERIA_H__