Subversion Repositories shark

Rev

Rev 38 | Rev 1618 | Go to most recent revision | Details | Compare with Previous | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 pj 1
/*
2
 * Project: S.Ha.R.K.
3
 *
4
 * Coordinators:
5
 *   Giorgio Buttazzo    <giorgio@sssup.it>
6
 *   Paolo Gai           <pj@gandalf.sssup.it>
7
 *
8
 * Authors     :
9
 *   Paolo Gai           <pj@gandalf.sssup.it>
10
 *   (see the web pages for full authors list)
11
 *
12
 * ReTiS Lab (Scuola Superiore S.Anna - Pisa - Italy)
13
 *
14
 * http://www.sssup.it
15
 * http://retis.sssup.it
16
 * http://shark.sssup.it
17
 */
18
 
19
/**
20
 ------------
318 giacomo 21
 CVS :        $Id: mqueue.c,v 1.4 2003-11-05 15:05:11 giacomo Exp $
2 pj 22
 
23
 File:        $File$
318 giacomo 24
 Revision:    $Revision: 1.4 $
25
 Last update: $Date: 2003-11-05 15:05:11 $
2 pj 26
 ------------
27
 
28
 POSIX message queues
29
 
30
**/
31
 
32
/*
33
 * Copyright (C) 2000 Paolo Gai
34
 *
35
 * This program is free software; you can redistribute it and/or modify
36
 * it under the terms of the GNU General Public License as published by
37
 * the Free Software Foundation; either version 2 of the License, or
38
 * (at your option) any later version.
39
 *
40
 * This program is distributed in the hope that it will be useful,
41
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
42
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
43
 * GNU General Public License for more details.
44
 *
45
 * You should have received a copy of the GNU General Public License
46
 * along with this program; if not, write to the Free Software
47
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
48
 *
49
 */
50
 
51
#include <mqueue.h>
52
#include <ll/string.h>
53
#include <kernel/types.h>
54
#include <kernel/var.h>
55
#include <kernel/func.h>
56
#include <errno.h>
57
#include <stdarg.h>
58
#include <pthread.h>
59
#include <sys/types.h>
60
 
61
/* Bits kept in the `flags` field of a message queue descriptor */
#define MQ_USED                 1   /* the descriptor is allocated        */
#define MQ_NONBLOCK             2   /* stored by mq_open for O_NONBLOCK   */
#define MQ_NOTIFICATION_PRESENT 4   /* mq_notify registered a notification */

/* Non-zero until the first mq_send/mq_receive call has registered the
   cancellation and interruptable points; cleared by that call. */
static int mq_once = 1;
67
 
68
/* Bookkeeping for one message slot of a queue */
struct mq_elem {
  unsigned int mq_prio;   /* priority the message was sent with          */
  ssize_t msglen;         /* number of bytes stored in the slot          */
  int next;               /* index of the following slot in the list the
                             slot currently belongs to                   */
};
73
 
74
/* Semaphores descriptor tables */
75
static struct mq_des {
76
    char *name;              /* a name */
77
    int  flags;              /* flags... */
78
 
79
    long maxmsg;             /* maximum number of messages */
80
    long msgsize;            /* Maximum message size */
81
 
82
    long count;              /* Number of messages currently queued */
83
    long start;              /* first not-empty message */
84
 
85
    BYTE *mq_data;           /* the data... */
86
    struct mq_elem *mq_info; /* the priorities */
87
    int mq_first;            /* the first empty message */
88
 
89
    struct sigevent notification; /* the notification, valid only if the
90
                                     correct bit is set */
91
 
92
    /* the blocked processes queues */
29 pj 93
    IQUEUE blocked_send;
94
    IQUEUE blocked_rcv;
2 pj 95
 
96
    int next;            /* the mq queue */
97
} mq_table[MQ_OPEN_MAX];
98
 
99
 
100
/* Per-task message queue state: an extension of proc_table, indexed by PID */
static struct {
    int intsig;   /* reset to 0 before a task blocks in mq_send/mq_receive;
                     set to 1 by mq_interrupted_by_signal */
    int mqdes;    /* message queue on which a task is blocked (meaningless
                     if the task is not blocked).
                     NOTE(review): confirm that every blocking path stores
                     the descriptor here before the task is suspended */
} mqproc_table[MAX_PROC];

/* Head of the list of free mq_table entries, chained through their `next`
   field; mq_open treats -1 as "no free descriptor".
   NOTE(review): no code visible here builds the initial chain or sets this
   head to a valid value - confirm where the free list is initialised. */
static int free_mq;
2 pj 109
 
110
mqd_t mq_open(const char *name, int oflag, ...)
111
{
112
    int     i;
113
    int     found = 0;
114
    mode_t  m;
115
    mqd_t   mq;
116
    struct mq_attr *attr;
318 giacomo 117
    SYS_FLAGS f;
2 pj 118
 
318 giacomo 119
    f = kern_fsave();
2 pj 120
 
121
    for (i = 0; i < MQ_OPEN_MAX; i++)
122
      if (mq_table[i].flags & MQ_USED) {
123
        if (strcmp((char*)name, mq_table[i].name) == 0) {
124
          found = 1;
125
          break;
126
        }
127
      }
128
    if (found) {
129
      if (oflag == (O_CREAT | O_EXCL)) {
130
          errno = EEXIST;
318 giacomo 131
          kern_frestore(f);
2 pj 132
          return -1;
133
      } else {
318 giacomo 134
          kern_frestore(f);
2 pj 135
          return i;
136
      }
137
    } else {
138
      if (!(oflag & O_CREAT)) {
139
          errno = ENOENT;
318 giacomo 140
          kern_frestore(f);
2 pj 141
          return -1;
142
      } else if (!(oflag & O_RDWR)) {
143
          errno = EACCES;
318 giacomo 144
          kern_frestore(f);
2 pj 145
          return -1;
146
      } else {
147
          va_list l;
148
 
149
          va_start(l, oflag);
150
            m = va_arg(l,mode_t);
151
            attr = va_arg(l, struct mq_attr *);
152
          va_end(l);
153
 
154
          mq = free_mq;
155
          if (mq != -1) {
156
            mq_table[mq].name = kern_alloc(strlen((char *)name)+1);
157
            if (!mq_table[mq].name) {
158
              errno = ENOSPC;
318 giacomo 159
              kern_frestore(f);
2 pj 160
              return -1;
161
            }
162
            strcpy(mq_table[mq].name, (char *)name);
163
 
164
            if (attr) {
165
              mq_table[mq].maxmsg  = attr->mq_maxmsg;
166
              mq_table[mq].msgsize = attr->mq_msgsize;
167
            }
168
            else {
169
              mq_table[mq].maxmsg  = MQ_DEFAULT_MAXMSG;
170
              mq_table[mq].msgsize = MQ_DEFAULT_MSGSIZE;
171
            }
29 pj 172
            iq_init(&mq_table[mq].blocked_send, &freedesc, 0);
173
            iq_init(&mq_table[mq].blocked_rcv, &freedesc, 0);
2 pj 174
 
175
            mq_table[mq].count = 0;
176
            mq_table[mq].start = -1;
177
 
178
            mq_table[mq].mq_first = 0;
179
 
180
            if (oflag & O_NONBLOCK)
181
              mq_table[mq].flags = MQ_USED | MQ_NONBLOCK;
182
            else
183
              mq_table[mq].flags = MQ_USED;
184
 
185
            mq_table[mq].mq_data = (BYTE *)
186
              kern_alloc(mq_table[mq].maxmsg * mq_table[mq].msgsize);
187
            if (!mq_table[mq].mq_data) {
188
              kern_free(mq_table[mq].name,strlen((char *)name)+1);
189
 
190
              errno = ENOSPC;
318 giacomo 191
              kern_frestore(f);
2 pj 192
              return -1;
193
            }
194
 
195
            mq_table[mq].mq_info = (struct mq_elem *)
196
              kern_alloc(mq_table[mq].maxmsg * sizeof(struct mq_elem));
197
            if (!mq_table[mq].mq_info) {
198
              kern_free(mq_table[mq].name,strlen((char *)name)+1);
199
              kern_free(mq_table[mq].mq_data,
200
                        mq_table[mq].maxmsg * mq_table[mq].msgsize);
201
 
202
              errno = ENOSPC;
318 giacomo 203
              kern_frestore(f);
2 pj 204
              return -1;
205
            }
206
 
207
            /* set up the element queue */
208
            for (i=0; i<mq_table[mq].maxmsg-1; i++)
209
              mq_table[mq].mq_info[i].next = i+1;
210
            mq_table[mq].mq_info[mq_table[mq].maxmsg-1].next = -1;
211
            mq_table[mq].mq_first = 0;
212
 
213
            free_mq = mq_table[mq].next;
318 giacomo 214
            kern_frestore(f);
2 pj 215
            return mq;
216
          }
217
          else {
218
            errno = ENOSPC;
318 giacomo 219
            kern_frestore(f);
2 pj 220
            return -1;
221
          }
222
      }
223
    }
224
}
225
 
226
/*
 * Release the descriptor mqdes: frees the name, the message storage and
 * the slot table, clears the flags and pushes the descriptor on the free
 * list headed by free_mq.
 *
 * Returns 0, or -1 with errno = EBADF if mqdes is out of range or the
 * descriptor is not in use.
 */
int mq_close(mqd_t mqdes)
{
    struct mq_des *q;
    int in_use;
    SYS_FLAGS f;

    f = kern_fsave();

    in_use = mqdes >= 0 &&
             mqdes < MQ_OPEN_MAX &&
             (mq_table[mqdes].flags & MQ_USED);
    if (!in_use) {
      errno = EBADF;
      kern_frestore(f);
      return -1;
    }

    q = &mq_table[mqdes];

    /* give back the three areas allocated by mq_open */
    kern_free(q->name, strlen(q->name)+1);
    kern_free(q->mq_data, q->maxmsg * q->msgsize);
    kern_free(q->mq_info, q->maxmsg * sizeof(struct mq_elem));

    /* the descriptor becomes the new head of the free list */
    q->flags = 0;
    q->next = free_mq;
    free_mq = mqdes;

    kern_frestore(f);
    return 0;
}
253
 
254
int mq_unlink(const char *name)
255
{
256
    int i;
257
    int found = 0;
318 giacomo 258
    SYS_FLAGS f;
2 pj 259
 
318 giacomo 260
    f = kern_fsave();
2 pj 261
 
262
    for (i = 0; i < MQ_OPEN_MAX; i++)
263
      if (mq_table[i].flags & MQ_USED) {
264
        if (strcmp((char*)name, mq_table[i].name) == 0) {
265
          found = 1;
266
        }
267
      }
268
 
269
    if (found) {
270
      kern_free(mq_table[i].name, strlen((char *)name)+1);
271
      kern_free(mq_table[i].mq_data,
272
                mq_table[i].maxmsg * mq_table[i].msgsize);
273
      kern_free(mq_table[i].mq_info,
274
                mq_table[i].maxmsg * sizeof(struct mq_elem));
275
 
276
      mq_table[i].flags = 0;
277
      mq_table[i].next = free_mq;
278
      free_mq = i;
318 giacomo 279
      kern_frestore(f);
2 pj 280
      return 0;
281
    } else {
282
      errno = ENOENT;
318 giacomo 283
      kern_frestore(f);
2 pj 284
      return -1;
285
    }
286
}
287
 
288
/* this function inserts a message in amessage queue mantaining the
289
   priority order */
290
static void insert_mq_entry(mqd_t mqdes, int newmsg)
291
{
292
    int prio; /* the priority of the message to insert */
293
    int p,q;  /* the messages... */
294
 
295
    p = NIL;
296
    q = mq_table[mqdes].start;
297
    prio = mq_table[mqdes].mq_info[ newmsg ].mq_prio;
298
 
299
    while ((q != NIL) && (prio <= mq_table[mqdes].mq_info[ q ].mq_prio)) {
300
        p = q;
301
        q = mq_table[mqdes].mq_info[ q ].next;
302
    }
303
 
304
    if (p != NIL)
305
      mq_table[mqdes].mq_info[ p ].next = newmsg;
306
    else
307
      mq_table[mqdes].start = newmsg;
308
 
309
    mq_table[mqdes].mq_info[ newmsg ].next = q;
310
}
311
 
312
 
313
 
314
 
315
 
316
 
317
/* this is the test that is done when a task is being killed
318
   and it is waiting on a sigwait */
319
static int mq_cancellation_point(PID i, void *arg)
320
{
321
    LEVEL l;
322
 
323
    if (proc_table[i].status == WAIT_MQSEND) {
324
      /* the task that have to be killed is waiting on a mq_send */
325
 
326
      /* we have to extract the task from the blocked queue... */
29 pj 327
      iq_extract(i,&mq_table[mqproc_table[i].mqdes].blocked_send);
2 pj 328
 
329
      /* and the task have to be reinserted into the ready queues, so it
330
         will fall into task_testcancel */
331
      l = proc_table[i].task_level;
38 pj 332
      level_table[l]->public_unblock(l,i);
2 pj 333
 
334
      return 1;
335
    }
336
 
337
    if (proc_table[i].status == WAIT_MQRECEIVE) {
338
      /* the task that have to be killed is waiting on a mq_send */
339
 
340
      /* we have to extract the task from the blocked queue... */
29 pj 341
      iq_extract(i, &mq_table[mqproc_table[i].mqdes].blocked_rcv);
2 pj 342
 
343
      /* and the task have to be reinserted into the ready queues, so it
344
         will fall into task_testcancel */
345
      l = proc_table[i].task_level;
38 pj 346
      level_table[l]->public_unblock(l,i);
2 pj 347
 
348
      return 1;
349
    }
350
 
351
    return 0;
352
}
353
 
354
/* Callback passed to register_interruptable_point() by mq_send and
   mq_receive.
   If task i is in the WAIT_MQSEND or WAIT_MQRECEIVE state, its intsig
   mark is set, it is removed from the corresponding blocked queue of the
   message queue recorded in mqproc_table[i].mqdes and it is handed to the
   public_unblock function of its level; 1 is returned. In any other state
   nothing is done and 0 is returned. arg is not used. */
int mq_interrupted_by_signal(PID i, void *arg)
{
    IQUEUE *waitq;
    LEVEL l;

    if (proc_table[i].status == WAIT_MQSEND)
      waitq = &mq_table[mqproc_table[i].mqdes].blocked_send;
    else if (proc_table[i].status == WAIT_MQRECEIVE)
      waitq = &mq_table[mqproc_table[i].mqdes].blocked_rcv;
    else
      return 0;   /* the task is not blocked on a message queue */

    /* Mark the task being woken up (i), not exec_shadow: it is task i that
       tests its own intsig after resuming in mq_send/mq_receive and that
       must fail with EINTR */
    mqproc_table[i].intsig = 1;

    /* extract the task from the blocked queue... */
    iq_extract(i, waitq);

    /* ...and unblock it */
    l = proc_table[i].task_level;
    level_table[l]->public_unblock(l,i);

    return 1;
}
392
 
393
 
394
 
395
 
396
 
397
/*
 * Queue the msg_len bytes at msg_ptr on mqdes with priority msg_prio.
 *
 * If the queue has no free slot the call fails with EAGAIN when the queue
 * is non-blocking, otherwise the calling task is suspended in the
 * WAIT_MQSEND state. After queuing the message, if it is the only one in
 * the queue, the first task blocked in mq_receive is unblocked or, when
 * there is none, a registered notification is consumed and delivered.
 *
 * Returns 0, or -1 with errno set to
 *   EBADF     mqdes is out of range or not in use
 *   EMSGSIZE  msg_len is larger than the queue message size
 *   EINVAL    msg_prio is larger than MQ_PRIO_MAX
 *   EAGAIN    the queue is full and non-blocking
 *   EINTR     the task was woken up by mq_interrupted_by_signal
 */
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
            unsigned int msg_prio)
{
  int newmsg;
  SYS_FLAGS f;

  task_testcancel();

  f = kern_fsave();

  /* first, if it is the first time that mq_receive or mq_send is called,
     register the cancellation point */
  if (mq_once) {
    mq_once = 0;
    register_cancellation_point(mq_cancellation_point, NULL);
    register_interruptable_point(mq_interrupted_by_signal, NULL);
  }

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_frestore(f);
    return -1;
  }

  if (msg_len > mq_table[mqdes].msgsize) {
    errno = EMSGSIZE;
    kern_frestore(f);
    return -1;
  }

  if (msg_prio > MQ_PRIO_MAX) {
    errno = EINVAL;
    kern_frestore(f);
    return -1;
  }

  /* block the task if necessary */
  if (mq_table[mqdes].mq_first == -1) {
    /* the message queue is full!!! */

    /* mq_open records the non-blocking mode as MQ_NONBLOCK while
       mq_setattr stores the O_NONBLOCK bit itself: accept both */
    if (mq_table[mqdes].flags & (MQ_NONBLOCK | O_NONBLOCK)) {
      errno = EAGAIN;
      kern_frestore(f);
      return -1;
    }
    else {
      LEVEL l;

      /* we block the task until:
         - a message is received, or
         - a signal is sent to the task, or
         - the task is killed               */

      mqproc_table[exec_shadow].intsig = 0;
      /* remember the queue: mq_cancellation_point and
         mq_interrupted_by_signal use it to find the blocked queue */
      mqproc_table[exec_shadow].mqdes = mqdes;

      kern_epilogue_macro();

      l = proc_table[exec_shadow].task_level;
      level_table[l]->public_block(l,exec_shadow);

      /* we insert the task in the message queue */
      proc_table[exec_shadow].status = WAIT_MQSEND;
      iq_priority_insert(exec_shadow,&mq_table[mqdes].blocked_send);

      /* and finally we reschedule */
      exec = exec_shadow = -1;
      scheduler();
      ll_context_to(proc_table[exec_shadow].context);
      kern_deliver_pending_signals();

      /* mq_send is a cancellation point... */
      task_testcancel();

      if (mqproc_table[exec_shadow].intsig) {
        errno = EINTR;
        kern_frestore(f);
        return -1;
      }
    }
  }

  /* Now there is space to insert a new message */
  /* alloc a descriptor */
  newmsg = mq_table[mqdes].mq_first;
  mq_table[mqdes].mq_first = mq_table[mqdes].mq_info[newmsg].next;
  mq_table[mqdes].count++;

  /* fill the data */
  memcpy(mq_table[mqdes].mq_data + newmsg * mq_table[mqdes].msgsize,
         msg_ptr, msg_len);
  mq_table[mqdes].mq_info[ newmsg ].mq_prio = msg_prio;
  mq_table[mqdes].mq_info[ newmsg ].msglen  = msg_len;

  /* insert the data in an ordered way */
  insert_mq_entry(mqdes, newmsg);

  if (mq_table[mqdes].count == 1) {
    /* the mq was empty */
    PID p;

    p = iq_getfirst(&mq_table[mqdes].blocked_rcv);

    if ( p != NIL) {
      /* The first blocked task has to be woken up */
      LEVEL l;

      proc_table[exec_shadow].context = ll_context_from();

      l = proc_table[p].task_level;
      level_table[l]->public_unblock(l,p);

      /* Preempt if necessary */
      scheduler();
      kern_context_load(proc_table[exec_shadow].context);
      return 0;
    }
    else if (mq_table[mqdes].flags & MQ_NOTIFICATION_PRESENT) {
      /* the registration is consumed by this delivery */
      mq_table[mqdes].flags &= ~MQ_NOTIFICATION_PRESENT;

      /* manage the notification... */
      if (mq_table[mqdes].notification.sigev_notify == SIGEV_SIGNAL) {
          /* post the signal */
          sigqueue_internal(0,
                            mq_table[mqdes].notification.sigev_signo,
                            mq_table[mqdes].notification.sigev_value,
                            SI_MESGQ);
      } else if (mq_table[mqdes].notification.sigev_notify == SIGEV_THREAD) {
        /* a new thread must be created.
           NOTE(review): the two branches below pass different arguments to
           the notify function (sival_ptr vs. the address of sigev_value);
           confirm which one is intended */
        pthread_t new_thread;

        if (mq_table[mqdes].notification.sigev_notify_attributes)
          pthread_create(&new_thread,
                         mq_table[mqdes].notification.sigev_notify_attributes,
                         (void *(*)(void *))mq_table[mqdes].notification.sigev_notify_function,
                         mq_table[mqdes].notification.sigev_value.sival_ptr);
        else {
          pthread_attr_t new_attr;
          /* the task must be created detached */
          pthread_attr_init(&new_attr);
          pthread_attr_setdetachstate(&new_attr, PTHREAD_CREATE_DETACHED);

          pthread_create(&new_thread,
                         &new_attr,
                         (void *(*)(void *))mq_table[mqdes].notification.sigev_notify_function,
                         &mq_table[mqdes].notification.sigev_value);
        }
      }
    }
  }

  kern_frestore(f);
  return 0;
}
556
 
557
/*
 * Remove the first queued message of mqdes (the list is kept in priority
 * order by insert_mq_entry) and copy it to msg_ptr.
 *
 * msg_len   size of the buffer at msg_ptr; it must be at least the queue
 *           message size.
 * msg_prio  if not NULL, receives the priority of the message.
 *
 * If the queue is empty the call fails with EAGAIN when the queue is
 * non-blocking, otherwise the calling task is suspended in the
 * WAIT_MQRECEIVE state. After the message has been removed, the first task
 * blocked in mq_send (if any) is unblocked.
 *
 * Returns the length of the message, or -1 with errno set to
 *   EBADF     mqdes is out of range or not in use
 *   EMSGSIZE  msg_len is smaller than the queue message size
 *   EAGAIN    the queue is empty and non-blocking
 *   EINTR     the task was woken up by mq_interrupted_by_signal
 */
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
                   unsigned int *msg_prio)
{
  int msg;
  PID p;
  ssize_t returnvalue;
  SYS_FLAGS f;

  task_testcancel();

  f = kern_fsave();

  /* first, if it is the first time that mq_receive or mq_send is called,
     register the cancellation point */
  if (mq_once) {
    mq_once = 0;
    register_cancellation_point(mq_cancellation_point, NULL);
    register_interruptable_point(mq_interrupted_by_signal, NULL);
  }

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_frestore(f);
    return -1;
  }

  /* the buffer must be able to hold the largest possible message: a
     buffer that is too SMALL is the error, a larger one is fine */
  if (msg_len < mq_table[mqdes].msgsize) {
    errno = EMSGSIZE;
    kern_frestore(f);
    return -1;
  }

  /* block the task if necessary */
  if (mq_table[mqdes].start == -1) {
    /* the message queue is empty!!! */

    /* mq_open records the non-blocking mode as MQ_NONBLOCK while
       mq_setattr stores the O_NONBLOCK bit itself: accept both */
    if (mq_table[mqdes].flags & (MQ_NONBLOCK | O_NONBLOCK)) {
      errno = EAGAIN;
      kern_frestore(f);
      return -1;
    }
    else {
      LEVEL l;

      /* we block the task until:
         - a message arrives, or
         - a signal is sent to the task, or
         - the task is killed               */

      mqproc_table[exec_shadow].intsig = 0;
      /* remember the queue: mq_cancellation_point and
         mq_interrupted_by_signal use it to find the blocked queue */
      mqproc_table[exec_shadow].mqdes = mqdes;

      kern_epilogue_macro();

      l = proc_table[exec_shadow].task_level;
      level_table[l]->public_block(l,exec_shadow);

      /* we insert the task into the message queue */
      proc_table[exec_shadow].status = WAIT_MQRECEIVE;
      iq_priority_insert(exec_shadow,&mq_table[mqdes].blocked_rcv);

      /* and finally we reschedule */
      exec = exec_shadow = -1;
      scheduler();
      ll_context_to(proc_table[exec_shadow].context);
      kern_deliver_pending_signals();

      /* mq_receive is a cancellation point... */
      task_testcancel();

      if (mqproc_table[exec_shadow].intsig) {
        errno = EINTR;
        kern_frestore(f);
        return -1;
      }
    }
  }

  /* Now there is at least one message...
     copy it to the destination (only the bytes that were sent), ... */
  msg = mq_table[mqdes].start;
  returnvalue = mq_table[mqdes].mq_info[ msg ].msglen;
  memcpy(msg_ptr,
         mq_table[mqdes].mq_data + msg * mq_table[mqdes].msgsize,
         returnvalue);

  /* ...update the first message and the counter (one message less), ... */
  mq_table[mqdes].count--;
  mq_table[mqdes].start = mq_table[mqdes].mq_info[ msg ].next;
  /* and finally put the slot back in the free slot list */
  mq_table[mqdes].mq_info[ msg ].next = mq_table[mqdes].mq_first;
  mq_table[mqdes].mq_first = msg;

  /* return the priority if required */
  if (msg_prio) {
    *msg_prio = mq_table[mqdes].mq_info[ msg ].mq_prio;
  }

  /* if the mq was full, there may be a task into blocked-send queue */
  p = iq_getfirst(&mq_table[mqdes].blocked_send);

  if ( p != NIL) {
    /* The first blocked task on send has to be woken up */
    LEVEL l;

    proc_table[exec_shadow].context = ll_context_from();

    l = proc_table[p].task_level;
    level_table[l]->public_unblock(l,p);

    /* Preempt if necessary */
    scheduler();
    kern_context_load(proc_table[exec_shadow].context);
    return returnvalue;
  }

  kern_frestore(f);
  return returnvalue;
}
678
 
679
/*
 * Register or remove the notification of mqdes.
 *
 * notification  the event to copy into the descriptor, or NULL to remove
 *               the current registration (a NULL with nothing registered
 *               is a successful no-op).
 *
 * The registration is consumed by mq_send when it delivers it.
 *
 * Returns 0, or -1 with errno set to
 *   EBADF  mqdes is out of range or not in use
 *   EBUSY  a notification is already registered and notification != NULL
 */
int mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
  SYS_FLAGS f;

  f = kern_fsave();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_frestore(f);
    return -1;
  }

  if (mq_table[mqdes].flags & MQ_NOTIFICATION_PRESENT) {
    if (notification) {
      errno = EBUSY;
      kern_frestore(f);
      return -1;
    }

    /* NULL: remove the existing registration */
    mq_table[mqdes].flags &= ~MQ_NOTIFICATION_PRESENT;
    kern_frestore(f);
    return 0;
  }

  if (!notification) {
    /* nothing is registered and nothing has to be registered: do not mark
       the queue and do not copy from a NULL pointer */
    kern_frestore(f);
    return 0;
  }

  mq_table[mqdes].flags |= MQ_NOTIFICATION_PRESENT;

  memcpy(&mq_table[mqdes].notification, notification,sizeof(struct sigevent));

  kern_frestore(f);
  return 0;
}
713
 
714
/*
 * Change the non-blocking mode of mqdes; no other attribute is modified.
 *
 * mqstat   new attributes: only the O_NONBLOCK bit of mq_flags is used.
 *          Must not be NULL (it is dereferenced unconditionally).
 * omqstat  if not NULL, receives the attributes in force before the
 *          change.
 *
 * Returns 0, or -1 with errno = EBADF if mqdes is out of range or not in
 * use.
 */
int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
               struct mq_attr *omqstat)
{
  SYS_FLAGS f;

  f = kern_fsave();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_frestore(f);
    return -1;
  }

  /* The non-blocking mode may be recorded as MQ_NONBLOCK (by mq_open) or
     as the O_NONBLOCK bit (by a previous call to this function): both
     are taken into account when reporting and when clearing it */
  if (omqstat) {
    omqstat->mq_flags   =
      (mq_table[mqdes].flags & (MQ_NONBLOCK | O_NONBLOCK)) ? O_NONBLOCK : 0;
    omqstat->mq_maxmsg  = mq_table[mqdes].maxmsg;
    omqstat->mq_msgsize = mq_table[mqdes].msgsize;
    omqstat->mq_curmsgs = mq_table[mqdes].count;
  }

  mq_table[mqdes].flags =
                  (mq_table[mqdes].flags & ~(MQ_NONBLOCK | O_NONBLOCK)) |
                  (mqstat->mq_flags & O_NONBLOCK);
  kern_frestore(f);
  return 0;
}
741
 
742
/*
 * Copy the attributes of mqdes into *mqstat: non-blocking mode (reported
 * as O_NONBLOCK in mq_flags), maximum number of messages, message size and
 * number of messages currently queued. mqstat must not be NULL (it is
 * dereferenced unconditionally).
 *
 * Returns 0, or -1 with errno = EBADF if mqdes is out of range or not in
 * use.
 */
int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
  SYS_FLAGS f;

  f = kern_fsave();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_frestore(f);
    return -1;
  }

  /* The non-blocking mode may be recorded as MQ_NONBLOCK (by mq_open) or
     as the O_NONBLOCK bit (by mq_setattr): report it in both cases */
  mqstat->mq_flags   =
    (mq_table[mqdes].flags & (MQ_NONBLOCK | O_NONBLOCK)) ? O_NONBLOCK : 0;
  mqstat->mq_maxmsg  = mq_table[mqdes].maxmsg;
  mqstat->mq_msgsize = mq_table[mqdes].msgsize;
  mqstat->mq_curmsgs = mq_table[mqdes].count;

  kern_frestore(f);
  return 0;
}