6e509c775eff38300699a3d65a321d4a1e8937ec
[mirrors/Programs.git] / c / pthread_extra / pthread_msgqueue.c
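/*
 * Build: CFLAGS=-pthread make pthread_msgqueue
 * (-pthread is honoured at both compile and link time, whereas -lpthread
 * placed in CFLAGS ends up in front of the source file on the link line)
 */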
4
5 #include <pthread.h>
6 #include <time.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <stdbool.h>
10 #include <stdlib.h>
11 #include <assert.h>
12 #include <unistd.h>
13
/*
 * Ready-made values for the abs_timeout argument of the queue calls.
 *
 * PTHREAD_X_NONWAIT points at a zeroed timespec, i.e. an absolute deadline
 * that lies in the past (presumably meant for "do not wait" calls).
 * PTHREAD_X_FOREVER is NULL, which the send/receive functions treat as
 * "no timeout".
 */
#define PTHREAD_X_NONWAIT \
	(&(struct timespec){ .tv_sec = 0, .tv_nsec = 0 })
#define PTHREAD_X_FOREVER NULL
16
/*
 * Fixed-capacity queue of fixed-size messages shared between threads.
 *
 * Messages live in a ring buffer of msg_count_max slots. The send, receive
 * and reset functions take `lock` before touching the other fields.
 */
typedef struct pthread_mq_t {
	pthread_mutex_t lock;          /* serialises access to the queue state */
	pthread_cond_t  cond_readable; /* broadcast when msg_count > 0 */
	pthread_cond_t  cond_writable; /* broadcast when msg_count < msg_count_max */
	void           *data;          /* ring buffer, msg_count_max * msg_size bytes */
	size_t          msg_size;      /* size of one message in bytes */
	size_t          msg_count;     /* number of messages currently stored */
	size_t          msg_count_max; /* capacity in messages */
	size_t          head_idx;      /* slot of the message that is read next */
	char           *name;          /* set to NULL by pthread_mq_init(); unused otherwise in this file */
} pthread_mq_t;
28
29 bool pthread_mq_readable(pthread_mq_t *mq) { return (mq->msg_count > 0); }
30 bool pthread_mq_writable(pthread_mq_t *mq) { return (mq->msg_count < mq->msg_count_max); }
31
32 bool pthread_mq_init(pthread_mq_t *mq, size_t msg_size, size_t msg_count_max) {
33 pthread_mutex_init(&mq->lock, NULL);
34 pthread_cond_init(&mq->cond_readable, NULL);
35 pthread_cond_init(&mq->cond_writable, NULL);
36 //pthread_mutexattr_setclock(&mq->lock, CLOCK_MONOTONIC);
37 mq->data = malloc(msg_size*msg_count_max);
38 mq->msg_size = msg_size;
39 mq->msg_count_max = msg_count_max;
40 mq->msg_count = 0;
41 mq->head_idx = 0;
42 mq->name = NULL;
43
44 if(((msg_size*msg_count_max) > 0) && mq->data == NULL) return false;
45 return true;
46 }
47
48 void pthread_mq_free(pthread_mq_t *mq) {
49 free(mq->data);
50 }
51
52 void pthread_mq_cond(pthread_mq_t *mq) {
53 if(pthread_mq_readable(mq)) pthread_cond_broadcast(&mq->cond_readable);
54 if(pthread_mq_writable(mq)) pthread_cond_broadcast(&mq->cond_writable);
55 }
56
57 size_t pthread_mq_waiting(pthread_mq_t *mq) {
58 return mq->msg_count;
59 }
60
61 bool pthread_mq_reset(pthread_mq_t *mq) {
62 if(pthread_mutex_lock(&mq->lock)) return false;
63 mq->msg_count = 0;
64 mq->head_idx = 0;
65 pthread_mq_cond(mq);
66 pthread_mutex_unlock(&mq->lock);
67 return true;
68 }
69
70 bool pthread_mq_send_generic(pthread_mq_t *mq, void * data, bool to_front, const struct timespec *restrict abs_timeout) {
71 int ret;
72
73 //Lock queue
74 if(pthread_mutex_timedlock(&mq->lock, abs_timeout)) return false;
75
76 //Wait for queue to be in writable condition
77 while(!pthread_mq_writable(mq)) {
78 if(abs_timeout == NULL) {
79 ret = pthread_cond_wait(&mq->cond_writable, &mq->lock);
80 } else {
81 ret = pthread_cond_timedwait(&mq->cond_writable, &mq->lock, abs_timeout);
82 }
83 if(ret) {
84 pthread_mutex_unlock(&mq->lock);
85 return false;
86 }
87 }
88
89 //Write data to queue
90 size_t idx = ( ( mq->msg_count_max + mq->head_idx + (to_front?-1:mq->msg_count) ) % mq->msg_count_max );
91 void *ptr = mq->data + (idx * mq->msg_size);
92 mq->msg_count++;
93 if(to_front) mq->head_idx = (mq->msg_count_max + mq->head_idx - 1) % mq->msg_count_max;
94 memcpy(ptr, data, mq->msg_size);
95
96 //Signal conditions and unlock
97 pthread_mq_cond(mq);
98 pthread_mutex_unlock(&mq->lock);
99 return true;
100 }
101
102 bool pthread_mq_receive_generic(pthread_mq_t *mq, void * data, bool peek, const struct timespec *restrict abs_timeout) {
103 int ret;
104
105 //Lock queue
106 if(pthread_mutex_timedlock(&mq->lock, abs_timeout)) return false;
107
108 //Wait for queue to be in readable condition
109 while(!pthread_mq_readable(mq)) {
110 if(abs_timeout == NULL) {
111 ret = pthread_cond_wait(&mq->cond_readable, &mq->lock);
112 } else {
113 ret = pthread_cond_timedwait(&mq->cond_readable, &mq->lock, abs_timeout);
114 }
115 if(ret) {
116 pthread_mutex_unlock(&mq->lock);
117 return false;
118 }
119 }
120
121 //Read data from queue
122 void *ptr = mq->data + (mq->head_idx * mq->msg_size);
123 memcpy(data, ptr, mq->msg_size);
124
125 //Delete data from queue if not peeking
126 if(!peek) {
127 mq->msg_count--;
128 mq->head_idx = (mq->head_idx+1) % mq->msg_count_max;
129 }
130
131 //Signal conditions and unlock
132 pthread_mq_cond(mq);
133 pthread_mutex_unlock(&mq->lock);
134 return true;
135 }
136
137
138
139
140
141
142
143
144
145
146 pthread_mq_t myq;
147
148 void *thread_recv(void *args) {
149 char str[128];
150 while(1) {
151 pthread_mq_receive_generic(&myq, &str, false, NULL);
152 printf("RECVD: %.6s\t\t(waiting %d)\n", str, pthread_mq_waiting(&myq));
153 sleep(1);
154 }
155 }
156
157 int main() {
158 char tmp[128];
159
160 pthread_mq_init(&myq, 6, 5);
161
162 pthread_t t;
163 pthread_create(&t, NULL, thread_recv, NULL);
164
165 pthread_mq_send_generic(&myq, "AHOJ1", false, NULL);
166 pthread_mq_send_generic(&myq, "AHOJ2", false, NULL);
167 pthread_mq_send_generic(&myq, "AHOJ3", true, NULL);
168 pthread_mq_send_generic(&myq, "AHOJ4", true, NULL);
169 pthread_mq_send_generic(&myq, "AHOJ5", false, NULL);
170 pthread_mq_send_generic(&myq, "AHOJ6", true, NULL);
171
172 while(1) {
173 pthread_mq_send_generic(&myq, "B", false, NULL);
174 pthread_mq_send_generic(&myq, "A", true, NULL);
175 pthread_mq_send_generic(&myq, " A", false, NULL);
176 pthread_mq_send_generic(&myq, " B", false, NULL);
177 }
178
179 pthread_join(t, NULL);
180 }
This page took 0.315333 seconds and 3 git commands to generate.