2 * CFLAGS=-lpthread make pthread_msgqueue
/*
 * Timeout arguments for the queue send/receive functions.
 * PTHREAD_X_NONWAIT expands to a pointer to a zeroed struct timespec
 * compound literal; PTHREAD_X_FOREVER is NULL, which makes the queue
 * functions use the untimed pthread_cond_wait() instead of
 * pthread_cond_timedwait().
 */
14 #define PTHREAD_X_NONWAIT (&(struct timespec){ .tv_sec = 0, .tv_nsec = 0 })
15 #define PTHREAD_X_FOREVER NULL
17 typedef struct pthread_mq_t
{
19 pthread_cond_t cond_readable
;
20 pthread_cond_t cond_writable
;
29 bool pthread_mq_readable(pthread_mq_t
*mq
) { return (mq
->msg_count
> 0); }
30 bool pthread_mq_writable(pthread_mq_t
*mq
) { return (mq
->msg_count
< mq
->msg_count_max
); }
/*
 * Initialise a message queue.
 * Sets up the mutex and both condition variables with default attributes,
 * allocates msg_size * msg_count_max bytes of storage and records both sizes.
 * Returns false when a non-empty buffer was requested but malloc() returned NULL.
 * NOTE(review): the return values of pthread_mutex_init()/pthread_cond_init()
 * are ignored, and msg_size * msg_count_max is not checked for overflow.
 * NOTE(review): some lines of this function are not visible in this listing
 * (remaining field initialisation and the success return) - confirm against
 * the full file.
 */
32 bool pthread_mq_init(pthread_mq_t
*mq
, size_t msg_size
, size_t msg_count_max
) {
33 pthread_mutex_init(&mq
->lock
, NULL
);
34 pthread_cond_init(&mq
->cond_readable
, NULL
);
35 pthread_cond_init(&mq
->cond_writable
, NULL
);
36 //pthread_mutexattr_setclock(&mq->lock, CLOCK_MONOTONIC);
/* One contiguous buffer holding msg_count_max slots of msg_size bytes each. */
37 mq
->data
= malloc(msg_size
*msg_count_max
);
38 mq
->msg_size
= msg_size
;
39 mq
->msg_count_max
= msg_count_max
;
/* A NULL buffer is only an error when a non-zero size was requested. */
44 if(((msg_size
*msg_count_max
) > 0) && mq
->data
== NULL
) return false;
48 void pthread_mq_free(pthread_mq_t
*mq
) {
/*
 * Wake waiters according to the current queue state: broadcast on
 * cond_readable when the queue holds messages and on cond_writable when it
 * has free capacity. Both broadcasts can fire in the same call.
 * NOTE(review): no locking is visible in this function; presumably callers
 * hold mq->lock - confirm.
 */
52 void pthread_mq_cond(pthread_mq_t
*mq
) {
53 if(pthread_mq_readable(mq
)) pthread_cond_broadcast(&mq
->cond_readable
);
54 if(pthread_mq_writable(mq
)) pthread_cond_broadcast(&mq
->cond_writable
);
57 size_t pthread_mq_waiting(pthread_mq_t
*mq
) {
/*
 * Reset the queue while holding mq->lock.
 * Returns false if the mutex cannot be acquired.
 * NOTE(review): the statements executed between lock and unlock are not
 * visible in this listing - confirm what state is reset.
 */
61 bool pthread_mq_reset(pthread_mq_t
*mq
) {
62 if(pthread_mutex_lock(&mq
->lock
)) return false;
66 pthread_mutex_unlock(&mq
->lock
);
/*
 * Copy one message of mq->msg_size bytes from data into the queue, either at
 * the tail or, when to_front is true, into the slot just before the head.
 * Returns false if pthread_mutex_timedlock() fails.
 * While the queue is not writable it waits on cond_writable: untimed when
 * abs_timeout is NULL, otherwise via pthread_cond_timedwait().
 * memcpy() always reads mq->msg_size bytes from data, so the caller's buffer
 * must be at least that large.
 * NOTE(review): abs_timeout is passed to pthread_mutex_timedlock() even when
 * it is NULL (PTHREAD_X_FOREVER); POSIX does not describe a NULL timeout for
 * that call - consider pthread_mutex_lock() for the NULL case.
 * NOTE(review): mq->data + offset looks like arithmetic on a void pointer
 * (a GNU extension) - confirm the declared type of the data field.
 * NOTE(review): several lines of this function are not visible in this
 * listing (declaration of ret, wait-failure handling, count update, return).
 */
70 bool pthread_mq_send_generic(pthread_mq_t
*mq
, void * data
, bool to_front
, const struct timespec
*restrict abs_timeout
) {
74 if(pthread_mutex_timedlock(&mq
->lock
, abs_timeout
)) return false;
76 //Wait for queue to be in writable condition
77 while(!pthread_mq_writable(mq
)) {
78 if(abs_timeout
== NULL
) {
79 ret
= pthread_cond_wait(&mq
->cond_writable
, &mq
->lock
);
81 ret
= pthread_cond_timedwait(&mq
->cond_writable
, &mq
->lock
, abs_timeout
);
/* Presumably the unlock on the wait-failure path (surrounding lines elided) - confirm. */
84 pthread_mutex_unlock(&mq
->lock
);
/*
 * Slot to write: one before the head for to_front, otherwise
 * head + msg_count (the tail). Adding msg_count_max first keeps the
 * sum from dropping below zero before the modulo wraps it.
 */
90 size_t idx
= ( ( mq
->msg_count_max
+ mq
->head_idx
+ (to_front
?-1:mq
->msg_count
) ) % mq
->msg_count_max
);
91 void *ptr
= mq
->data
+ (idx
* mq
->msg_size
);
/* Front insert: the head itself moves back one slot, wrapping around. */
93 if(to_front
) mq
->head_idx
= (mq
->msg_count_max
+ mq
->head_idx
- 1) % mq
->msg_count_max
;
94 memcpy(ptr
, data
, mq
->msg_size
);
96 //Signal conditions and unlock
98 pthread_mutex_unlock(&mq
->lock
);
/*
 * Copy the message at the head of the queue (mq->msg_size bytes) into data.
 * Returns false if pthread_mutex_timedlock() fails.
 * While the queue is not readable it waits on cond_readable: untimed when
 * abs_timeout is NULL, otherwise via pthread_cond_timedwait().
 * memcpy() always writes mq->msg_size bytes, so the caller's buffer must be
 * at least that large.
 * NOTE(review): abs_timeout is passed to pthread_mutex_timedlock() even when
 * it is NULL (PTHREAD_X_FOREVER); POSIX does not describe a NULL timeout for
 * that call - consider pthread_mutex_lock() for the NULL case.
 * NOTE(review): mq->data + offset looks like arithmetic on a void pointer
 * (a GNU extension) - confirm the declared type of the data field.
 * NOTE(review): several lines of this function are not visible in this
 * listing (declaration of ret, wait-failure handling, the peek check,
 * count update, return).
 */
102 bool pthread_mq_receive_generic(pthread_mq_t
*mq
, void * data
, bool peek
, const struct timespec
*restrict abs_timeout
) {
106 if(pthread_mutex_timedlock(&mq
->lock
, abs_timeout
)) return false;
108 //Wait for queue to be in readable condition
109 while(!pthread_mq_readable(mq
)) {
110 if(abs_timeout
== NULL
) {
111 ret
= pthread_cond_wait(&mq
->cond_readable
, &mq
->lock
);
113 ret
= pthread_cond_timedwait(&mq
->cond_readable
, &mq
->lock
, abs_timeout
);
/* Presumably the unlock on the wait-failure path (surrounding lines elided) - confirm. */
116 pthread_mutex_unlock(&mq
->lock
);
121 //Read data from queue
122 void *ptr
= mq
->data
+ (mq
->head_idx
* mq
->msg_size
);
123 memcpy(data
, ptr
, mq
->msg_size
);
125 //Delete data from queue if not peeking
/* Advance the head by one slot, wrapping at msg_count_max. */
128 mq
->head_idx
= (mq
->head_idx
+1) % mq
->msg_count_max
;
131 //Signal conditions and unlock
133 pthread_mutex_unlock(&mq
->lock
);
/*
 * Receiver thread entry point for the demo: takes a message from myq with no
 * timeout (NULL) and prints up to six characters of it together with the
 * value returned by pthread_mq_waiting().
 * NOTE(review): pthread_mq_waiting() is declared to return size_t but is
 * printed with %d; the matching conversion is %zu.
 * NOTE(review): the declaration of str and any surrounding loop are not
 * visible in this listing.
 */
148 void *thread_recv(void *args
) {
151 pthread_mq_receive_generic(&myq
, &str
, false, NULL
);
152 printf("RECVD: %.6s\t\t(waiting %d)\n", str
, pthread_mq_waiting(&myq
));
160 pthread_mq_init(&myq
, 6, 5);
163 pthread_create(&t
, NULL
, thread_recv
, NULL
);
165 pthread_mq_send_generic(&myq
, "AHOJ1", false, NULL
);
166 pthread_mq_send_generic(&myq
, "AHOJ2", false, NULL
);
167 pthread_mq_send_generic(&myq
, "AHOJ3", true, NULL
);
168 pthread_mq_send_generic(&myq
, "AHOJ4", true, NULL
);
169 pthread_mq_send_generic(&myq
, "AHOJ5", false, NULL
);
170 pthread_mq_send_generic(&myq
, "AHOJ6", true, NULL
);
173 pthread_mq_send_generic(&myq
, "B", false, NULL
);
174 pthread_mq_send_generic(&myq
, "A", true, NULL
);
175 pthread_mq_send_generic(&myq
, " A", false, NULL
);
176 pthread_mq_send_generic(&myq
, " B", false, NULL
);
179 pthread_join(t
, NULL
);