Blokovani pres konstantu
[mirrors/Programs.git] / c / pthread_extra / pthread_msgqueue.c
CommitLineData
90c3a0d4
TM
/*
 * Build with: CFLAGS=-pthread make pthread_msgqueue
 * (-pthread enables POSIX threads for both compiling and linking)
 */
4
5#include <pthread.h>
6#include <time.h>
7#include <string.h>
8#include <stdio.h>
9#include <stdbool.h>
10#include <stdlib.h>
774d1b14 11#include <assert.h>
63658368
TM
12#include <unistd.h>
13
/*
 * Timeout arguments for the *_generic queue functions.
 *
 * PTHREAD_X_NONWAIT: pointer to a zeroed struct timespec; meant to be passed as
 *                    the abs_timeout argument as an already-expired deadline.
 * PTHREAD_X_FOREVER: NULL, i.e. no deadline at all.
 */
#define PTHREAD_X_NONWAIT (&(struct timespec){ 0 })
#define PTHREAD_X_FOREVER NULL
90c3a0d4
TM
16
/*
 * Fixed-capacity queue of equally sized messages stored in a ring buffer.
 * The mutex and the two condition variables synchronize access to it.
 */
typedef struct pthread_mq_t {
	pthread_mutex_t lock;          // Guards every field below
	pthread_cond_t cond_readable;  // Broadcast while msg_count > 0
	pthread_cond_t cond_writable;  // Broadcast while msg_count < msg_count_max
	void *data;                    // Buffer of msg_size * msg_count_max bytes
	size_t msg_size;               // Size of one message in bytes
	size_t msg_count;              // Number of messages currently stored
	size_t msg_count_max;          // Capacity in messages
	size_t head_idx;               // Slot index of the oldest (next read) message
	char *name;                    // Set to NULL by pthread_mq_init
} pthread_mq_t;
28
774d1b14
TM
29bool pthread_mq_readable(pthread_mq_t *mq) { return (mq->msg_count > 0); }
30bool pthread_mq_writable(pthread_mq_t *mq) { return (mq->msg_count < mq->msg_count_max); }
90c3a0d4
TM
31
32bool pthread_mq_init(pthread_mq_t *mq, size_t msg_size, size_t msg_count_max) {
774d1b14
TM
33 pthread_mutex_init(&mq->lock, NULL);
34 pthread_cond_init(&mq->cond_readable, NULL);
35 pthread_cond_init(&mq->cond_writable, NULL);
63658368 36 //pthread_mutexattr_setclock(&mq->lock, CLOCK_MONOTONIC);
90c3a0d4
TM
37 mq->data = malloc(msg_size*msg_count_max);
38 mq->msg_size = msg_size;
39 mq->msg_count_max = msg_count_max;
40 mq->msg_count = 0;
41 mq->head_idx = 0;
42 mq->name = NULL;
43
44 if(((msg_size*msg_count_max) > 0) && mq->data == NULL) return false;
45 return true;
46}
47
48void pthread_mq_free(pthread_mq_t *mq) {
49 free(mq->data);
50}
51
90c3a0d4
TM
52void pthread_mq_cond(pthread_mq_t *mq) {
53 if(pthread_mq_readable(mq)) pthread_cond_broadcast(&mq->cond_readable);
54 if(pthread_mq_writable(mq)) pthread_cond_broadcast(&mq->cond_writable);
55}
56
63658368
TM
57size_t pthread_mq_waiting(pthread_mq_t *mq) {
58 return mq->msg_count;
59}
60
774d1b14
TM
61bool pthread_mq_reset(pthread_mq_t *mq) {
62 if(pthread_mutex_lock(&mq->lock)) return false;
63 mq->msg_count = 0;
64 mq->head_idx = 0;
65 pthread_mq_cond(mq);
66 pthread_mutex_unlock(&mq->lock);
67 return true;
68}
69
63658368 70bool pthread_mq_send_generic(pthread_mq_t *mq, void * data, bool to_front, const struct timespec *restrict abs_timeout) {
3297654e
TM
71 int ret;
72
774d1b14
TM
73 //Lock queue
74 if(pthread_mutex_timedlock(&mq->lock, abs_timeout)) return false;
90c3a0d4 75
774d1b14 76 //Wait for queue to be in writable condition
90c3a0d4 77 while(!pthread_mq_writable(mq)) {
3297654e
TM
78 if(abs_timeout == NULL) {
79 ret = pthread_cond_wait(&mq->cond_writable, &mq->lock);
80 } else {
81 ret = pthread_cond_timedwait(&mq->cond_writable, &mq->lock, abs_timeout);
82 }
83 if(ret) {
774d1b14
TM
84 pthread_mutex_unlock(&mq->lock);
85 return false;
86 }
90c3a0d4
TM
87 }
88
774d1b14
TM
89 //Write data to queue
90 size_t idx = ( ( mq->msg_count_max + mq->head_idx + (to_front?-1:mq->msg_count) ) % mq->msg_count_max );
90c3a0d4
TM
91 void *ptr = mq->data + (idx * mq->msg_size);
92 mq->msg_count++;
774d1b14 93 if(to_front) mq->head_idx = (mq->msg_count_max + mq->head_idx - 1) % mq->msg_count_max;
90c3a0d4
TM
94 memcpy(ptr, data, mq->msg_size);
95
774d1b14 96 //Signal conditions and unlock
90c3a0d4
TM
97 pthread_mq_cond(mq);
98 pthread_mutex_unlock(&mq->lock);
99 return true;
100}
101
63658368
TM
102bool pthread_mq_receive_generic(pthread_mq_t *mq, void * data, bool peek, const struct timespec *restrict abs_timeout) {
103 int ret;
104
105 //Lock queue
106 if(pthread_mutex_timedlock(&mq->lock, abs_timeout)) return false;
107
108 //Wait for queue to be in readable condition
109 while(!pthread_mq_readable(mq)) {
110 if(abs_timeout == NULL) {
111 ret = pthread_cond_wait(&mq->cond_readable, &mq->lock);
112 } else {
113 ret = pthread_cond_timedwait(&mq->cond_readable, &mq->lock, abs_timeout);
114 }
115 if(ret) {
116 pthread_mutex_unlock(&mq->lock);
117 return false;
118 }
119 }
120
121 //Read data from queue
122 void *ptr = mq->data + (mq->head_idx * mq->msg_size);
123 memcpy(data, ptr, mq->msg_size);
124
125 //Delete data from queue if not peeking
126 if(!peek) {
127 mq->msg_count--;
128 mq->head_idx = (mq->head_idx+1) % mq->msg_count_max;
129 }
130
131 //Signal conditions and unlock
132 pthread_mq_cond(mq);
133 pthread_mutex_unlock(&mq->lock);
134 return true;
135}
136
137
138
139
140
141
142
143
144
145
146pthread_mq_t myq;
147
148void *thread_recv(void *args) {
149 char str[128];
150 while(1) {
151 pthread_mq_receive_generic(&myq, &str, false, NULL);
152 printf("RECVD: %.6s\t\t(waiting %d)\n", str, pthread_mq_waiting(&myq));
153 sleep(1);
154 }
155}
156
90c3a0d4 157int main() {
63658368
TM
158 char tmp[128];
159
3297654e 160 pthread_mq_init(&myq, 6, 5);
63658368
TM
161
162 pthread_t t;
163 pthread_create(&t, NULL, thread_recv, NULL);
164
165 pthread_mq_send_generic(&myq, "AHOJ1", false, NULL);
166 pthread_mq_send_generic(&myq, "AHOJ2", false, NULL);
167 pthread_mq_send_generic(&myq, "AHOJ3", true, NULL);
168 pthread_mq_send_generic(&myq, "AHOJ4", true, NULL);
169 pthread_mq_send_generic(&myq, "AHOJ5", false, NULL);
170 pthread_mq_send_generic(&myq, "AHOJ6", true, NULL);
171
172 while(1) {
173 pthread_mq_send_generic(&myq, "B", false, NULL);
174 pthread_mq_send_generic(&myq, "A", true, NULL);
175 pthread_mq_send_generic(&myq, " A", false, NULL);
176 pthread_mq_send_generic(&myq, " B", false, NULL);
3297654e 177 }
63658368
TM
178
179 pthread_join(t, NULL);
90c3a0d4 180}
This page took 0.215218 seconds and 4 git commands to generate.