44ad399ecb9ef195b78cae64994af93cd4c685db
2 #include <pthread_extra.h>
10 bool pthread_mq_readable(pthread_mq_t
*mq
) { return (mq
->msg_count
> 0); }
11 bool pthread_mq_writable(pthread_mq_t
*mq
) { return (mq
->msg_count
< mq
->msg_count_max
); }
13 bool pthread_mq_init(pthread_mq_t
*mq
, size_t msg_size
, size_t msg_count_max
) {
14 pthread_mutex_init(&mq
->lock
, NULL
);
15 pthread_cond_init(&mq
->cond_readable
, NULL
);
16 pthread_cond_init(&mq
->cond_writable
, NULL
);
17 //pthread_mutexattr_setclock(&mq->lock, CLOCK_MONOTONIC);
18 mq
->data
= malloc(msg_size
*msg_count_max
);
19 mq
->msg_size
= msg_size
;
20 mq
->msg_count_max
= msg_count_max
;
25 if(((msg_size
*msg_count_max
) > 0) && mq
->data
== NULL
) return false;
29 void pthread_mq_free(pthread_mq_t
*mq
) {
33 void pthread_mq_cond(pthread_mq_t
*mq
) {
34 if(pthread_mq_readable(mq
)) pthread_cond_broadcast(&mq
->cond_readable
);
35 if(pthread_mq_writable(mq
)) pthread_cond_broadcast(&mq
->cond_writable
);
38 size_t pthread_mq_waiting(pthread_mq_t
*mq
) {
42 size_t pthread_mq_vacant(pthread_mq_t
*mq
) {
43 return (mq
->msg_count_max
- mq
->msg_count
);
46 bool pthread_mq_reset(pthread_mq_t
*mq
) {
47 if(pthread_mutex_lock(&mq
->lock
)) return false;
51 pthread_mutex_unlock(&mq
->lock
);
55 bool pthread_mq_send_generic(pthread_mq_t
*mq
, void * data
, pthread_mq_flags_t flags
, const struct timespec
*restrict abs_timeout
) {
56 //printf("S-Timed: %p\n", abs_timeout);
60 if(pthread_mutex_timedlock(&mq
->lock
, abs_timeout
)) return false;
62 //Wait for queue to be in writable condition
63 while(!pthread_mq_writable(mq
)) {
64 //printf("S+Timed: %p\n", abs_timeout);
65 if(abs_timeout
== NULL
) {
66 ret
= pthread_cond_wait(&mq
->cond_writable
, &mq
->lock
);
68 //printf("STimed: %p\n", abs_timeout);
69 //assert(abs_timeout != NULL);
70 ret
= pthread_cond_timedwait(&mq
->cond_writable
, &mq
->lock
, abs_timeout
);
73 pthread_mutex_unlock(&mq
->lock
);
79 assert(!(flags
& PTHREAD_XMQ_OVERW
) && "FIXME: Overwrite not implemented yet!");
82 bool to_front
= (flags
& PTHREAD_XMQ_FRONT
);
83 size_t idx
= ( ( mq
->head_idx
+ (to_front
?mq
->msg_count_max
-1:mq
->msg_count
) ) % mq
->msg_count_max
);
84 void *ptr
= mq
->data
+ (idx
* mq
->msg_size
);
86 if(to_front
) mq
->head_idx
= (mq
->msg_count_max
+ mq
->head_idx
- 1) % mq
->msg_count_max
;
87 memcpy(ptr
, data
, mq
->msg_size
);
89 //Signal conditions and unlock
91 pthread_mutex_unlock(&mq
->lock
);
95 bool pthread_mq_receive_generic(pthread_mq_t
*mq
, void * data
, pthread_mq_flags_t flags
, const struct timespec
*restrict abs_timeout
) {
99 if(pthread_mutex_timedlock(&mq
->lock
, abs_timeout
)) return false;
101 //Wait for queue to be in readable condition
102 while(!pthread_mq_readable(mq
)) {
103 if(abs_timeout
== NULL
) {
104 ret
= pthread_cond_wait(&mq
->cond_readable
, &mq
->lock
);
106 //printf("RTimed: %p\n", abs_timeout);
107 //assert(abs_timeout != NULL);
108 ret
= pthread_cond_timedwait(&mq
->cond_readable
, &mq
->lock
, abs_timeout
);
111 pthread_mutex_unlock(&mq
->lock
);
116 //Read data from queue
117 void *ptr
= mq
->data
+ (mq
->head_idx
* mq
->msg_size
);
118 memcpy(data
, ptr
, mq
->msg_size
);
120 //Delete data from queue if not peeking
121 bool peek
= (flags
& PTHREAD_XMQ_PEEK
);
124 mq
->head_idx
= (mq
->head_idx
+1) % mq
->msg_count_max
;
127 //Signal conditions and unlock
129 pthread_mutex_unlock(&mq
->lock
);
This page took 0.28866 seconds and 3 git commands to generate.