aboutsummaryrefslogtreecommitdiff
path: root/src/itc.c
blob: e4698e8ac97544675e6cf8a867f1840837f65adc (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "itc.h"
#include "utils.h"

#include <semaphore.h>
#include <stdlib.h>
#include <sys/time.h>

struct itc {
        int nslots;
        void **slots;
        int inputidx;
        int outputidx;
        sem_t emptied;
        sem_t occupied;
};

itc *itc_alloc(int nslots)
{
        itc *ctx;

        if (nslots <= 0) {
                return NULL;
        }

        ctx = calloc(1, sizeof(*ctx));
        ctx->nslots = nslots;
        ctx->slots = calloc(nslots, sizeof(*ctx->slots));
        sem_init(&ctx->emptied, 0, nslots);
        sem_init(&ctx->occupied, 0, 0);

        return ctx;
}

void itc_free(itc **ctx, itc_free_element free_element)
{
        if (ctx == NULL || *ctx == NULL) {
                return;
        }

        itc_discard_all(*ctx, free_element);

        sem_destroy(&(*ctx)->emptied);
        sem_destroy(&(*ctx)->occupied);
        free((*ctx)->slots);
        free(*ctx);
        *ctx = NULL;
}

void *itc_retrive(itc *ctx, int timeout_ms)
{
        struct timespec ts;
        void *element;

        if (ctx == NULL) {
                return NULL;
        }

        timespec_add_ms(gettimespec(&ts), timeout_ms);

        if (sem_timedwait(&ctx->occupied, &ts)) {
                return NULL;
        }
        element = ctx->slots[ctx->outputidx];
        ctx->outputidx = (ctx->outputidx + 1) % ctx->nslots;
        sem_post(&ctx->emptied);

        return element;
}

int itc_inject(itc *ctx, int timeout_ms, void *element)
{
        struct timespec ts;

        if (ctx == NULL || element == NULL) {
                return -1;
        }

        timespec_add_ms(gettimespec(&ts), timeout_ms);

        if (sem_timedwait(&ctx->emptied, &ts)) {
                return -1;
        }
        ctx->slots[ctx->inputidx] = element;
        ctx->inputidx = (ctx->inputidx + 1) % ctx->nslots;
        sem_post(&ctx->occupied);

        return 0;
}

void itc_wait_empty(itc *ctx)
{
        int i;

        if (ctx == NULL) {
                return;
        }

        for (i = 0; i < ctx->nslots; i++) {
                sem_wait(&ctx->emptied);
        }
        for (i = 0; i < ctx->nslots; i++) {
                sem_post(&ctx->emptied);
        }
}

void itc_discard_all(itc *ctx, itc_free_element free_element)
{
        void *element;

        if (ctx == NULL) {
                return;
        }

        while ((element = itc_retrive(ctx, 0)) != NULL) {
                if (free_element != NULL) {
                        free_element(element);
                }
        }
}

int itc_get_queued(itc *ctx)
{
        int val;
        if (ctx == NULL) {
                return -1;
        }
        sem_getvalue(&ctx->occupied, &val);
        return val;
}

int itc_get_slots(itc *ctx)
{
        if (ctx == NULL) {
                return -1;
        }
        return ctx->nslots;
}