/*
 * Project: S.Ha.R.K.
 *
 * Coordinators:
 *   Giorgio Buttazzo    <giorgio@sssup.it>
 *   Paolo Gai           <pj@gandalf.sssup.it>
 *
 * Authors     :
 *   Paolo Gai           <pj@gandalf.sssup.it>
 *   (see the web pages for full authors list)
 *
 * ReTiS Lab (Scuola Superiore S.Anna - Pisa - Italy)
 *
 * http://www.sssup.it
 * http://retis.sssup.it
 * http://shark.sssup.it
 */


/**
 ------------
 CVS :        $Id: mqueue.c,v 1.1.1.1 2002-03-29 14:12:52 pj Exp $

 File:        $File$
 Revision:    $Revision: 1.1.1.1 $
 Last update: $Date: 2002-03-29 14:12:52 $
 ------------

 POSIX message queues

**/
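
/*
 * Usage sketch (illustrative only, not part of this module): how a task
 * might use the message queue API implemented below.  The queue name
 * "/my_mq" and the attribute values are arbitrary examples; note that
 * this implementation requires O_RDWR when creating a queue, otherwise
 * mq_open() fails with EACCES.
 *
 *   struct mq_attr attr;
 *   char buf[64];
 *   unsigned int prio;
 *   mqd_t mq;
 *
 *   attr.mq_maxmsg  = 8;              // at most 8 queued messages
 *   attr.mq_msgsize = sizeof(buf);    // each at most 64 bytes
 *
 *   mq = mq_open("/my_mq", O_CREAT | O_RDWR, 0, &attr);
 *   if (mq != -1) {
 *     mq_send(mq, "hello", 6, 1);               // priority 1
 *     mq_receive(mq, buf, sizeof(buf), &prio);  // gets "hello", prio == 1
 *     mq_close(mq);
 *     mq_unlink("/my_mq");
 *   }
 */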


/*
 * Copyright (C) 2000 Paolo Gai
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */


#include <mqueue.h>
#include <ll/string.h>
#include <kernel/types.h>
#include <kernel/var.h>
#include <kernel/func.h>
#include <errno.h>
#include <stdarg.h>
#include <pthread.h>
#include <sys/types.h>

/* some flags... */
#define MQ_USED                 1
#define MQ_NONBLOCK             2
#define MQ_NOTIFICATION_PRESENT 4

static int mq_once = 1;

struct mq_elem {
  unsigned int mq_prio;   /* the priority of a message */
  ssize_t msglen;         /* the length of a message   */
  int next;               /* next message in the priority / free list */
};

/* Message queue descriptor table */
static struct mq_des {
    char *name;              /* a name */
    int  flags;              /* flags... */

    long maxmsg;             /* maximum number of messages */
    long msgsize;            /* Maximum message size */

    long count;              /* Number of messages currently queued */
    long start;              /* index of the first queued message (-1 if empty) */

    BYTE *mq_data;           /* the data... */
    struct mq_elem *mq_info; /* the priorities */
    int mq_first;            /* head of the free-element list */

    struct sigevent notification; /* the notification, valid only if the
                                     correct bit is set */


    /* the blocked processes queues */
    QQUEUE blocked_send;
    QQUEUE blocked_rcv;

    int next;            /* the mq queue */
} mq_table[MQ_OPEN_MAX];
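
/* Storage layout: mq_data holds maxmsg slots of msgsize bytes each, and
   mq_info is a parallel array of per-slot descriptors.  Queued messages
   form a singly linked list headed by 'start' (-1 when empty), kept in
   decreasing priority order by insert_mq_entry(); free slots form a
   second list headed by 'mq_first' (-1 when the queue is full). */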


/* this -IS- an extension to the proc_table!!! */
static struct {
    int intsig;   /* Normally 0; set to 1 when a task is woken up
                     by a signal */

    int mqdes;    /* message queue on which a task is blocked (meaningless
                     if the task is not blocked...) */

} mqproc_table[MAX_PROC];

static QUEUE free_mq;         /* Queue of free message queue descriptors */

mqd_t mq_open(const char *name, int oflag, ...)
{
    int     i;
    int     found = 0;
    mode_t  m;
    mqd_t   mq;
    struct mq_attr *attr;

    kern_cli();

    for (i = 0; i < MQ_OPEN_MAX; i++)
      if (mq_table[i].flags & MQ_USED) {
        if (strcmp((char*)name, mq_table[i].name) == 0) {
          found = 1;
          break;
        }
      }
    if (found) {
      if ((oflag & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL)) {
          errno = EEXIST;
          kern_sti();
          return -1;
      } else {
          kern_sti();
          return i;
      }
    } else {
      if (!(oflag & O_CREAT)) {
          errno = ENOENT;
          kern_sti();
          return -1;
      } else if (!(oflag & O_RDWR)) {
          errno = EACCES;
          kern_sti();
          return -1;
      } else {
          va_list l;

          va_start(l, oflag);
            m = va_arg(l,mode_t);
            attr = va_arg(l, struct mq_attr *);
          va_end(l);

          mq = free_mq;
          if (mq != -1) {
            mq_table[mq].name = kern_alloc(strlen((char *)name)+1);
            if (!mq_table[mq].name) {
              errno = ENOSPC;
              kern_sti();
              return -1;
            }
            strcpy(mq_table[mq].name, (char *)name);

            if (attr) {
              mq_table[mq].maxmsg  = attr->mq_maxmsg;
              mq_table[mq].msgsize = attr->mq_msgsize;
            }
            else {
              mq_table[mq].maxmsg  = MQ_DEFAULT_MAXMSG;
              mq_table[mq].msgsize = MQ_DEFAULT_MSGSIZE;
            }
            qq_init(&mq_table[mq].blocked_send);
            qq_init(&mq_table[mq].blocked_rcv);

            mq_table[mq].count = 0;
            mq_table[mq].start = -1;
       
            mq_table[mq].mq_first = 0;

            if (oflag & O_NONBLOCK)
              mq_table[mq].flags = MQ_USED | MQ_NONBLOCK;
            else
              mq_table[mq].flags = MQ_USED;

            mq_table[mq].mq_data = (BYTE *)
              kern_alloc(mq_table[mq].maxmsg * mq_table[mq].msgsize);
            if (!mq_table[mq].mq_data) {
              kern_free(mq_table[mq].name,strlen((char *)name)+1);

              errno = ENOSPC;
              kern_sti();
              return -1;
            }

            mq_table[mq].mq_info = (struct mq_elem *)
              kern_alloc(mq_table[mq].maxmsg * sizeof(struct mq_elem));
            if (!mq_table[mq].mq_info) {
              kern_free(mq_table[mq].name,strlen((char *)name)+1);
              kern_free(mq_table[mq].mq_data,
                        mq_table[mq].maxmsg * mq_table[mq].msgsize);

              errno = ENOSPC;
              kern_sti();
              return -1;
            }

            /* set up the element queue */
            for (i=0; i<mq_table[mq].maxmsg-1; i++)
              mq_table[mq].mq_info[i].next = i+1;
            mq_table[mq].mq_info[mq_table[mq].maxmsg-1].next = -1;
            mq_table[mq].mq_first = 0;

            free_mq = mq_table[mq].next;
            kern_sti();
            return mq;
          }
          else {
            errno = ENOSPC;
            kern_sti();
            return -1;
          }
      }
    }
}

int mq_close(mqd_t mqdes)
{
    kern_cli();

    if (mqdes < 0 ||
        mqdes >= MQ_OPEN_MAX ||
        !(mq_table[mqdes].flags & MQ_USED) ) {
      errno = EBADF;
      kern_sti();
      return -1;
    }

    kern_free(mq_table[mqdes].name, strlen(mq_table[mqdes].name)+1);
    kern_free(mq_table[mqdes].mq_data,
              mq_table[mqdes].maxmsg * mq_table[mqdes].msgsize);
    kern_free(mq_table[mqdes].mq_info,
              mq_table[mqdes].maxmsg * sizeof(struct mq_elem));

    mq_table[mqdes].flags = 0;
    mq_table[mqdes].next = free_mq;
    free_mq = mqdes;

    kern_sti();
    return 0;
}

int mq_unlink(const char *name)
{
    int i;
    int found = 0;

    kern_cli();

    for (i = 0; i < MQ_OPEN_MAX; i++)
      if (mq_table[i].flags & MQ_USED) {
        if (strcmp((char*)name, mq_table[i].name) == 0) {
          found = 1;
          break;
        }
      }

    if (found) {
      kern_free(mq_table[i].name, strlen((char *)name)+1);
      kern_free(mq_table[i].mq_data,
                mq_table[i].maxmsg * mq_table[i].msgsize);
      kern_free(mq_table[i].mq_info,
                mq_table[i].maxmsg * sizeof(struct mq_elem));
 
      mq_table[i].flags = 0;
      mq_table[i].next = free_mq;
      free_mq = i;
      kern_sti();
      return 0;
    } else {
      errno = ENOENT;
      kern_sti();
      return -1;
    }
}

/* this function inserts a message into a message queue, maintaining the
   priority order */

static void insert_mq_entry(mqd_t mqdes, int newmsg)
{
    int prio; /* the priority of the message to insert */
    int p,q;  /* the messages... */

    p = NIL;
    q = mq_table[mqdes].start;
    prio = mq_table[mqdes].mq_info[ newmsg ].mq_prio;

    while ((q != NIL) && (prio <= mq_table[mqdes].mq_info[ q ].mq_prio)) {
        p = q;
        q = mq_table[mqdes].mq_info[ q ].next;
    }

    if (p != NIL)
      mq_table[mqdes].mq_info[ p ].next = newmsg;
    else
      mq_table[mqdes].start = newmsg;

    mq_table[mqdes].mq_info[ newmsg ].next = q;
}
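
/* Note that the '<=' comparison above places a new message after any
   already-queued message with the same priority, so equal-priority
   messages are delivered in FIFO order, as POSIX requires. */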






/* this is the test that is done when a task is being killed
   while it is blocked on a message queue */

static int mq_cancellation_point(PID i, void *arg)
{
    LEVEL l;

    if (proc_table[i].status == WAIT_MQSEND) {
      /* the task that has to be killed is waiting on a mq_send */

      /* we have to extract the task from the blocked queue... */
      qq_extract(i,&mq_table[mqproc_table[i].mqdes].blocked_send);

      /* and the task has to be reinserted into the ready queue, so it
         will fall into task_testcancel */

      l = proc_table[i].task_level;
      level_table[l]->task_insert(l,i);

      return 1;
    }

    if (proc_table[i].status == WAIT_MQRECEIVE) {
      /* the task that has to be killed is waiting on a mq_receive */

      /* we have to extract the task from the blocked queue... */
      qq_extract(i, &mq_table[mqproc_table[i].mqdes].blocked_rcv);

      /* and the task has to be reinserted into the ready queue, so it
         will fall into task_testcancel */

      l = proc_table[i].task_level;
      level_table[l]->task_insert(l,i);

      return 1;
    }

    return 0;
}

int mq_interrupted_by_signal(PID i, void *arg)
{
    LEVEL l;

    if (proc_table[i].status == WAIT_MQSEND) {
      /* the task is blocked on a mq_send and a signal has just been
         delivered to it... */

      mqproc_table[i].intsig = 1;

      /* we have to extract the task from the blocked queue... */
      qq_extract(i, &mq_table[mqproc_table[i].mqdes].blocked_send);

      /* and the task have to be reinserted into the ready queues, so it
         will fall into task_testcancel */

      l = proc_table[i].task_level;
      level_table[l]->task_insert(l,i);

      return 1;
    }

    if (proc_table[i].status == WAIT_MQRECEIVE) {
      /* the task is blocked on a mq_receive and a signal has just been
         delivered to it... */

      mqproc_table[i].intsig = 1;

      /* we have to extract the task from the blocked queue... */
      qq_extract(i, &mq_table[mqproc_table[i].mqdes].blocked_rcv);

      /* and the task have to be reinserted into the ready queues, so it
         will fall into task_testcancel */

      l = proc_table[i].task_level;
      level_table[l]->task_insert(l,i);

      return 1;
    }

    return 0;
}





int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
            unsigned int msg_prio)
{
  int newmsg;

  task_testcancel();

  kern_cli();

  /* first, if it is the first time that mq_receive or mq_send is called,
     register the cancellation point */

  if (mq_once) {
    mq_once = 0;
    register_cancellation_point(mq_cancellation_point, NULL);
    register_interruptable_point(mq_interrupted_by_signal, NULL);
  }

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_sti();
    return -1;
  }

  if (msg_len > mq_table[mqdes].msgsize) {
    errno = EMSGSIZE;
    kern_sti();
    return -1;
  }

  if (msg_prio >= MQ_PRIO_MAX) {
    errno = EINVAL;
    kern_sti();
    return -1;
  }

  /* block the task if necessary */
  if (mq_table[mqdes].mq_first == -1) {
    /* the message queue is full!!! */
    if (mq_table[mqdes].flags & MQ_NONBLOCK) {
      errno = EAGAIN;
      kern_sti();
      return -1;
    }
    else {
      LEVEL l;
      struct timespec ty;
      TIME tx;

      /* we block the task until:
         - a message is received, or
         - a signal is sent to the task, or
         - the task is killed               */


      mqproc_table[exec_shadow].intsig = 0;

      /* SAME AS SCHEDULER... manage the capacity event and the load_info */
      ll_gettime(TIME_EXACT, &schedule_time);
      SUBTIMESPEC(&schedule_time, &cap_lasttime, &ty);
      tx = TIMESPEC2USEC(&ty);
      proc_table[exec_shadow].avail_time -= tx;
      jet_update_slice(tx);
      if (cap_timer != NIL) {
        event_delete(cap_timer);
        cap_timer = NIL;
      }
   
      l = proc_table[exec_shadow].task_level;
      level_table[l]->task_extract(l,exec_shadow);

      /* we insert the task in the message queue */
      proc_table[exec_shadow].status = WAIT_MQSEND;
      qq_insert(exec_shadow,&mq_table[mqdes].blocked_send);

      /* and finally we reschedule */
      exec = exec_shadow = -1;
      scheduler();
      ll_context_to(proc_table[exec_shadow].context);
      kern_deliver_pending_signals();

      /* mq_send is a cancellation point... */
      task_testcancel();

      if (mqproc_table[exec_shadow].intsig) {
        errno = EINTR;
        kern_sti();
        return -1;
      }
    }
  }

  /* Now there is space to insert a new message */
  /* alloc a descriptor */
  newmsg = mq_table[mqdes].mq_first;
  mq_table[mqdes].mq_first = mq_table[mqdes].mq_info[newmsg].next;
  mq_table[mqdes].count++;

  /* fill the data */
  memcpy(mq_table[mqdes].mq_data + newmsg * mq_table[mqdes].msgsize,
         msg_ptr, msg_len);
  mq_table[mqdes].mq_info[ newmsg ].mq_prio = msg_prio;
  mq_table[mqdes].mq_info[ newmsg ].msglen  = msg_len;

  /* insert the data in an ordered way */
  insert_mq_entry(mqdes, newmsg);

//  kern_printf("[mq_des=%d, newmsg=%d, count=%d]",
//              mqdes, newmsg, mq_table[mqdes].count);

  if (mq_table[mqdes].count == 1) {
    /* the mq was empty */
    PID p;

    p = qq_getfirst(&mq_table[mqdes].blocked_rcv);

    if ( p != NIL) {
      /* The first blocked task has to be woken up */
      LEVEL l;

      proc_table[exec_shadow].context = ll_context_from();

      l = proc_table[p].task_level;
      level_table[l]->task_insert(l,p);

      /* Preempt if necessary */
      scheduler();
      kern_context_load(proc_table[exec_shadow].context);
      return 0;
    }
    else if (mq_table[mqdes].flags & MQ_NOTIFICATION_PRESENT) {
      mq_table[mqdes].flags &= ~MQ_NOTIFICATION_PRESENT;

      // manage the notification...
      if (mq_table[mqdes].notification.sigev_notify == SIGEV_SIGNAL) {
          // there is no signal pending... post the signal!!!
          sigqueue_internal(0,
                            mq_table[mqdes].notification.sigev_signo,
                            mq_table[mqdes].notification.sigev_value,
                            SI_MESGQ);
      } else if (mq_table[mqdes].notification.sigev_notify == SIGEV_THREAD) {
        /* a new thread must be created; note that pthread_create calls
           task_createn and task_activate; task_activate, when invoked from
           a signal handler, calls event_need_reschedule */

        pthread_t new_thread;
   
        if (mq_table[mqdes].notification.sigev_notify_attributes)
          pthread_create(&new_thread,
                         mq_table[mqdes].notification.sigev_notify_attributes,
                         (void *(*)(void *))mq_table[mqdes].notification.sigev_notify_function,
                         mq_table[mqdes].notification.sigev_value.sival_ptr);
        else {
          pthread_attr_t new_attr;
          // the task must be created detached
          pthread_attr_init(&new_attr);
          pthread_attr_setdetachstate(&new_attr, PTHREAD_CREATE_DETACHED);
   
          pthread_create(&new_thread,
                         &new_attr,
                         (void *(*)(void *))mq_table[mqdes].notification.sigev_notify_function,
                         &mq_table[mqdes].notification.sigev_value);
        }
      }
    }
  }

  kern_sti();
  return 0;
}

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
                   unsigned int *msg_prio)
{
  int msg;
  PID p;
  ssize_t returnvalue;

  task_testcancel();

  kern_cli();

  /* first, if it is the first time that mq_receive or mq_send is called,
     register the cancellation point */

  if (mq_once) {
    mq_once = 0;
    register_cancellation_point(mq_cancellation_point, NULL);
    register_interruptable_point(mq_interrupted_by_signal, NULL);
  }

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_sti();
    return -1;
  }

  if (msg_len < mq_table[mqdes].msgsize) {
    errno = EMSGSIZE;
    kern_sti();
    return -1;
  }

  /* block the task if necessary */
  if (mq_table[mqdes].start == -1) {
    /* the message queue is empty!!! */
    if (mq_table[mqdes].flags & MQ_NONBLOCK) {
      errno = EAGAIN;
      kern_sti();
      return -1;
    }
    else {
      LEVEL l;
      struct timespec ty;
      TIME tx;

      /* we block the task until:
         - a message arrives, or
         - a signal is sent to the task, or
         - the task is killed               */


      mqproc_table[exec_shadow].intsig = 0;

      /* SAME AS SCHEDULER... manage the capacity event and the load_info */
      ll_gettime(TIME_EXACT, &schedule_time);
      SUBTIMESPEC(&schedule_time, &cap_lasttime, &ty);
      tx = TIMESPEC2USEC(&ty);
      proc_table[exec_shadow].avail_time -= tx;
      jet_update_slice(tx);
      if (cap_timer != NIL) {
        event_delete(cap_timer);
        cap_timer = NIL;
      }
   
      l = proc_table[exec_shadow].task_level;
      level_table[l]->task_extract(l,exec_shadow);

      /* we insert the task into the message queue */
      proc_table[exec_shadow].status = WAIT_MQRECEIVE;
      qq_insert(exec_shadow,&mq_table[mqdes].blocked_rcv);

      /* and finally we reschedule */
      exec = exec_shadow = -1;
      scheduler();
      ll_context_to(proc_table[exec_shadow].context);
      kern_deliver_pending_signals();

      /* mq_receive is a cancellation point... */
      task_testcancel();

      if (mqproc_table[exec_shadow].intsig) {
        errno = EINTR;
        kern_sti();
        return -1;
      }
    }
  }

  /* Now there is at least one message...
     copy it to the destination, ... */

  msg = mq_table[mqdes].start;
  memcpy(msg_ptr,
         mq_table[mqdes].mq_data + msg * mq_table[mqdes].msgsize,
         mq_table[mqdes].msgsize);

  /* ...update the first message and the counters, ... */
  mq_table[mqdes].count--;
  mq_table[mqdes].start = mq_table[mqdes].mq_info[ msg ].next;
  /* and finally the free message queue */
  mq_table[mqdes].mq_info[ msg ].next = mq_table[mqdes].mq_first;
  mq_table[mqdes].mq_first = msg;

  /* return the priority if required */
  if (msg_prio) {
    *msg_prio = mq_table[mqdes].mq_info[ msg ].mq_prio;
  }

  /* set the returnvalue */
  returnvalue = mq_table[mqdes].mq_info[ msg ].msglen;

  /* if the mq was full, there may be a task in the blocked-send queue */
  p = qq_getfirst(&mq_table[mqdes].blocked_send);

  if ( p != NIL) {
    /* The first blocked task on send has to be woken up */
    LEVEL l;

    proc_table[exec_shadow].context = ll_context_from();

    l = proc_table[p].task_level;
    level_table[l]->task_insert(l,p);

    /* Preempt if necessary */
    scheduler();
    kern_context_load(proc_table[exec_shadow].context);
    return returnvalue;
  }

  kern_sti();
  return returnvalue;
}

int mq_notify(mqd_t mqdes, const struct sigevent *notification)
{
  kern_cli();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_sti();
    return -1;
  }

  if (mq_table[mqdes].flags & MQ_NOTIFICATION_PRESENT) {
    if (!notification) {
      mq_table[mqdes].flags &= ~MQ_NOTIFICATION_PRESENT;
      kern_sti();
      return 0;
    }
    else {
      errno = EBUSY;
      kern_sti();
      return -1;
    }
  }

  mq_table[mqdes].flags |= MQ_NOTIFICATION_PRESENT;

  memcpy(&mq_table[mqdes].notification, notification,sizeof(struct sigevent));

  kern_sti();
  return 0;
}
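
/*
 * Notification sketch (illustrative only): how a task might ask, via
 * mq_notify() above, for a signal to be queued when a message arrives
 * on the empty queue.  The signal number and value are arbitrary
 * examples; SIGEV_THREAD notifications are set up the same way, with
 * sigev_notify_function/sigev_notify_attributes filled in instead.
 *
 *   struct sigevent ev;
 *   ev.sigev_notify          = SIGEV_SIGNAL;
 *   ev.sigev_signo           = SIGUSR1;
 *   ev.sigev_value.sival_int = 42;
 *   mq_notify(mqdes, &ev);
 */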

int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
               struct mq_attr *omqstat)
{
  kern_cli();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_sti();
    return -1;
  }

  if (omqstat) {
    omqstat->mq_flags   = (mq_table[mqdes].flags & MQ_NONBLOCK) ? O_NONBLOCK : 0;
    omqstat->mq_maxmsg  = mq_table[mqdes].maxmsg;
    omqstat->mq_msgsize = mq_table[mqdes].msgsize;
    omqstat->mq_curmsgs = mq_table[mqdes].count;
  }

  if (mqstat->mq_flags & O_NONBLOCK)
    mq_table[mqdes].flags |= MQ_NONBLOCK;
  else
    mq_table[mqdes].flags &= ~MQ_NONBLOCK;
  kern_sti();
  return 0;
}

int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
{
  kern_cli();

  if (mqdes < 0 ||
      mqdes >= MQ_OPEN_MAX ||
      !(mq_table[mqdes].flags & MQ_USED) ) {
    errno = EBADF;
    kern_sti();
    return -1;
  }

  mqstat->mq_flags   = (mq_table[mqdes].flags & MQ_NONBLOCK) ? O_NONBLOCK : 0;
  mqstat->mq_maxmsg  = mq_table[mqdes].maxmsg;
  mqstat->mq_msgsize = mq_table[mqdes].msgsize;
  mqstat->mq_curmsgs = mq_table[mqdes].count;

  kern_sti();
  return 0;
}