aboutsummaryrefslogtreecommitdiff
path: root/src/tuberia.h
blob: f83ea6ffdd27038d1c18a3395d0dc8a32643bbdf (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#ifndef LIBTUBERIA_H__
#define LIBTUBERIA_H__

typedef struct tube tube;
typedef struct tube_source tube_source;
typedef struct tube_stage tube_stage;

typedef void *(*tube_fetch)(void *opaque);
typedef void *(*tube_process)(void *element, void *opaque);
typedef void (*tube_sink)(void *element, void *opaque);
typedef void (*tube_free_element)(void *element);

tube_source *tube_source_alloc(int nslots, tube_fetch fetch, void *opaque, tube_free_element free_element);
tube_stage *tube_stage_alloc(int nslots, tube_process process, void *opaque, tube_free_element free_element);
void tube_stage_free(tube_stage **stage);
void tube_source_and_stages_free(tube_source **source);

int tube_stage_append(tube_source *source, tube_stage *stage);

tube *tube_alloc(const tube_source *source, tube_sink sink, void *opaque);
int tube_start(tube *ctx);
int tube_stop(tube *ctx);
void tube_free(tube **ctx);

int tube_inject(tube *ctx, int timeout_ms, void *element);
int tube_inject_at(tube *ctx, int stage, int timeout_ms, void *element);
void *tube_retrive(tube *ctx, int timeout_ms);
void tube_stop_and_wait_empty(tube *ctx);
void tube_discard_all(tube *ctx);

int tube_get_queued(const tube *ctx, int stage);
int tube_get_slots(const tube *ctx, int stage);

#endif //LIBTUBERIA_H__