%----------------------------------------------------------------------------
\chapter{Synchronization and communication}
%----------------------------------------------------------------------------

This chapter describes the task interaction mechanisms provided by the
S.Ha.R.K. kernel. The kernel implements several such mechanisms in order to
improve programming flexibility without jeopardizing the a priori guarantee of
hard tasks.

In general, hard tasks should not use system calls that can cause an unbounded
(or unknown) blocking time, since they can jeopardize the system schedulability.

\begin{figure}
\begin{center}\includegraphics{over.eps}\end{center}
\caption{EDF Scheduling - Overload due to a task\_delay().}
\label{fg:semaf-over}
\end{figure}

For example, consider Figure~\ref{fg:semaf-over}: in this case there are two
periodic tasks, $\tau_{1}$ (execution time $C_{1}=5$ and period $T_{1}=15$) and
$\tau_{2}$ (execution time $C_{2}=2$ and period $T_{2}=4$). Since the total
utilization factor is $U=C_{1}/T_{1}+C_{2}/T_{2}=1/3+1/2=5/6<1$, the system is
schedulable by EDF; however, if $\tau_{1}$ blocks for $8$ time units at time
$t=4$, $\tau_{2}$ misses a deadline at time $t=16$.
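
The utilization test used in the example above can be sketched in C as follows.
This is a minimal illustration, not kernel code: the names
\texttt{edf\_utilization} and \texttt{edf\_schedulable} are hypothetical, and
the check $U\leq 1$ assumes independent, preemptable periodic tasks whose
relative deadlines equal their periods. It does not account for blocking, which
is exactly the situation shown in the figure.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper: total utilization U = sum of C_i / T_i. */
double edf_utilization(const double c[], const double t[], size_t n)
{
    double u = 0.0;

    for (size_t i = 0; i < n; i++) {
        assert(t[i] > 0.0);     /* a period must be strictly positive */
        u += c[i] / t[i];
    }
    return u;
}

/* Hypothetical helper: EDF utilization test (assumes deadline == period
 * and no blocking). Returns 1 if U <= 1, 0 otherwise. */
int edf_schedulable(const double c[], const double t[], size_t n)
{
    return edf_utilization(c, t, n) <= 1.0;
}
```

For the task set of the example ($C=\{5,2\}$, $T=\{15,4\}$) the utilization is
$5/6$ and the test passes, although a blocking call can still cause the overload
described above.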

For efficiency reasons, the system does not perform any check to prevent the use
of blocking primitives in hard tasks; this aspect is left to the programmer's
responsibility.

%----------------------------------------------------------------------------
\section{POSIX Semaphores}
%----------------------------------------------------------------------------

The primitives described in this section cover the semaphore interface that can
be used by S.Ha.R.K. applications. The semaphore interface directly follows the
POSIX semaphore interface; the S.Ha.R.K. Kernel also adds some other primitives
that allow the semaphore counter to be incremented or decremented by more than
one unit at a time.

These primitives can be used both for synchronization and mutual exclusion. It
is worth noting that the traditional semaphore mechanism can cause unbounded
\emph{priority inversion}, so it is not suitable for hard real-time tasks.
Concerning synchronization, we note that the guarantee mechanism does not
take synchronization into account; therefore the programmer should avoid
explicitly synchronizing hard tasks by means of blocking primitives. It is
instead possible to use a weak synchronization between hard real-time tasks,
realized through non-blocking semaphores.

Only \texttt{SEM\_NSEMS\_MAX} semaphores can be created in the system. If an
application needs to use the POSIX semaphores, it has to add a call to the
function

\texttt{void SEM\_register\_module(void);}

into the \texttt{\_\_kernel\_register\_levels\_\_} function of the
initialization file (see Volume III - S.Ha.R.K. Modules).

This section briefly describes the POSIX semaphore interface\footnote{This
section only describes unnamed semaphores. The interface for named semaphores is
also provided, although it does not use a file system but resolves the names
internally (as allowed by the POSIX 1003.13 PSE51 profile).}.
For a complete reference see the POSIX standard (the Linux manpage also works
well).

%----------------------------------------------------------------------------
\begin{intest}
SEM\_INIT\index{sem\_init()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_init(sem\_t {*}sem, int pshared, unsigned int value);}]

\item [\textbf{Description:}] It is used to initialize the semaphore referred to
by \texttt{sem}. The initial value of the semaphore is \texttt{value}. The
\texttt{pshared} argument is ignored. After the call to the primitive,
\texttt{sem} can be used to refer to the semaphore.

\item [\textbf{Return value:}] on successful completion, the function
initializes the semaphore in sem and returns 0. Otherwise, it returns -1 and
\texttt{errno} is set according to the POSIX standard.

\item [\textbf{See also}:] sem\_wait(), sem\_trywait(), sem\_post(),
sem\_destroy().

\end{description}

\begin{description}
\item [\textbf{Example:}]
\end{description}

\begin{verbatim}
sem_t mutex;

TASK demo(void *arg) {
    ...
   
    /* The task enters a critical section protected by a mutex semaphore */
    sem_wait(&mutex);
    <critical section>
    sem_post(&mutex);
    ...
}

int main(int argc, char**argv) {
    ...
    sem_init(&mutex, 0, 1);
    ...
}
\end{verbatim}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_DESTROY\index{sem\_destroy()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_destroy(sem\_t {*}sem);}]

\item [\textbf{Description:}] It is used to destroy the semaphore indicated by
\texttt{sem}. Only a semaphore that was created using sem\_init() may be
destroyed using sem\_destroy(). \textbf{Warning}: This system call does not
check whether the semaphore queue is empty, and does not awaken tasks blocked on
the semaphore. The programmer has to make sure that \texttt{sem} is free before
destroying it.

\item [\textbf{Return value}:] on successful completion, the function destroys
the sem semaphore and returns 0. Otherwise, it returns -1 and \texttt{errno} is
set according to the POSIX standard.

\item [\textbf{See also}:] sem\_init().
 
\end{description}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_WAIT\index{sem\_wait()} and SEM\_TRYWAIT\index{sem\_trywait()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_wait(sem\_t {*}sem);}]

\item [\textbf{int sem\_trywait(sem\_t {*}sem);}]

\item [\textbf{Description:}] \texttt{sem\_wait} is used to lock the semaphore
referenced by \texttt{sem}. If the semaphore value is currently zero, the
calling task does not return from the call to \texttt{sem\_wait()} until it
locks the semaphore. \texttt{sem\_trywait} locks the semaphore referenced by
\texttt{sem} only if the semaphore is currently not locked; that is, if the
semaphore value is currently positive. Otherwise, it does not lock the
semaphore. \texttt{sem\_wait} is a cancellation point.

\item [\textbf{Return value:}] on successful completion the functions return 0.
Otherwise, they return -1 and \texttt{errno} is set according to the POSIX
standard.

\item [\textbf{See also}:] sem\_post().

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_XWAIT\index{sem\_xwait()}
\end{intest}

\begin{description}

\item [\textbf{BYTE sem\_xwait(sem\_t {*}s, int n, int wait);}]

\item [\textbf{Description:}]\texttt{sem\_xwait()} is a non-portable extension
to the POSIX semaphores that decreases the semaphore counter by \texttt{n}. If
the counter is greater than or equal to \texttt{n} and there are no tasks
blocked on semaphore \texttt{s}, the counter is decreased by \texttt{n} and
\texttt{sem\_xwait} returns 0; otherwise the behavior of the system call depends
on the \texttt{wait} parameter. If \texttt{wait} is \texttt{BLOCK}, the calling
task blocks on the semaphore. If \texttt{wait} is \texttt{NON\_BLOCK},
\texttt{sem\_xwait} returns -1, \texttt{errno} is set to \texttt{EAGAIN} and the
calling task does not block. The semaphore queue is ordered using a FIFO
strategy, in order to avoid starvation. Hard tasks should not use blocking
system calls, so they should use \texttt{sem\_trywait()} or
\texttt{sem\_xwait()} (the latter only with \texttt{wait = NON\_BLOCK}).
\texttt{sem\_xwait} is a cancellation point.

\item [\textbf{Return value:}] on successful completion, the function returns 0.
Otherwise, it returns -1 and \texttt{errno} is set according to the POSIX
standard.

\item [\textbf{See also}:] sem\_wait(), sem\_trywait(), sem\_post().

\end{description}

\begin{description}
\item [\textbf{Example:}]
\end{description}

\begin{verbatim}
sem_t sync;

TASK demo(void *arg) {
    ...
    /* The demo task synchronizes itself */
    /* with the wake task, waiting */
    /* for 5 signals on the sync semaphore */
    sem_xwait(&sync, 5, BLOCK);
    ...
}

TASK wake(void *arg) {
    while (1) {
        ...
        /* Post one unit on the sync semaphore at every cycle */
        sem_xpost(&sync, 1);
        ...
        task_endcycle();
    }
}

int main(int argc, char **argv) {
    ...
    sem_init(&sync, 0, 0);
    ...
}
\end{verbatim}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_POST\index{sem\_post()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_post(sem\_t {*}sem);}]

\item [\textbf{Description:}]It unlocks the semaphore referenced by \texttt{sem}
by performing the semaphore unlock operation on that semaphore. If the semaphore
queue is not empty and the first task in the queue requests a feasible counter
decrement, that task is awakened. The task is put in the ready queue and the
scheduler is invoked: for this reason this system call can cause a preemption.
The semaphore queue is a FIFO queue: tasks are awakened in FIFO order according
to resource availability.

\item [\textbf{Return value:}] on successful completion, the function returns 0.
Otherwise, it returns -1 and \texttt{errno} is set according to the POSIX
standard.

\item [\textbf{See also}:] sem\_wait(), sem\_trywait().

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_XPOST\index{sem\_xpost()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_xpost(sem\_t {*}s, int n);}]

\item [\textbf{Description:}]\texttt{sem\_xpost()} is a non-portable extension
to the POSIX semaphores that implements the classical signal primitive on
semaphore \texttt{s}, increasing the counter by \texttt{n}. If the semaphore
queue is not empty and the first task in the queue requests a feasible counter
decrement, that task is awakened. The task is put in the \texttt{READY} queue
and the scheduler is invoked: for this reason this system call can cause a
preemption. The semaphore queue is a FIFO queue: tasks are awakened in FIFO
order according to resource availability.

\item [\textbf{Return value:}] on successful completion, the function returns 0.
Otherwise, it returns -1 and \texttt{errno} is set according to the POSIX
standard.

\item [\textbf{See also}:] \texttt{sem\_init(), sem\_wait(), sem\_destroy()}.

\item [\textbf{Example:}] see \texttt{sem\_xwait()}.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
SEM\_GETVALUE\index{sem\_getvalue()}
\end{intest}

\begin{description}

\item [\textbf{int sem\_getvalue(sem\_t {*}sem, int {*}sval);}]

\item [\textbf{Description:}]\texttt{sem\_getvalue()} updates the location
referenced by the sval argument to have the value of the semaphore referenced by
sem without affecting the state of the semaphore. If sem is locked the value
returned by sem\_getvalue is a negative number whose absolute value represents
the number of processes waiting for the semaphore at some unspecified time
during the call.

\item [\textbf{Return value:}] on successful completion, the function returns 0.
Otherwise, it returns -1 and \texttt{errno} is set according to the POSIX
standard.

\item [\textbf{See also}:] \texttt{sem\_init(), sem\_wait(), sem\_destroy()}.

\end{description}

%----------------------------------------------------------------------------
\section{Internal Semaphores}
%----------------------------------------------------------------------------

When developing a complex kernel driver, a designer usually needs to manage many
shared resources that have to be accessed in mutual exclusion, and also needs
many synchronization points that are not cancellation points. POSIX semaphores
are not suitable for these purposes, because they are limited in number and they
are cancellation points.

For this purpose the S.Ha.R.K. Kernel provides lightweight semaphores called
\emph{internal} semaphores, which fulfill the designer's needs\footnote{The
existence of two types of semaphores is not new in kernel development. For
example, the Linux kernel differentiates the semaphores used by the applications
from the semaphores used by the kernel.}: they are not cancellation points and
there is no limit on the number of semaphores that can be created in a
system\footnote{Only 8 bytes are taken for each internal semaphore. In some
sense the internal semaphores are similar to the POSIX mutexes...}. The
interface of the internal semaphores is very similar to the POSIX semaphore
interface.

To use the Internal Semaphores, you don't need to call any registration function
at kernel startup time.

%----------------------------------------------------------------------------
\begin{intest}
INTERNAL\_SEM\_INIT\index{internal\_sem\_init()}
\end{intest}

\begin{description}

\item [\textbf{void internal\_sem\_init(internal\_sem\_t {*}s, int value);}]

\item [\textbf{Description:}]It initializes the internal semaphore \texttt{s}
with a specified \texttt{value}.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
INTERNAL\_SEM\_WAIT\index{internal\_sem\_wait()}
\end{intest}

\begin{description}

\item [\textbf{void internal\_sem\_wait(internal\_sem\_t {*}s);}]

\item [\textbf{Description:}]It implements a blocking wait. The semaphore
counter is decremented by one.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
INTERNAL\_SEM\_TRYWAIT\index{internal\_sem\_trywait()}
\end{intest}

\begin{description}

\item [\textbf{int internal\_sem\_trywait(internal\_sem\_t {*}s);}]

\item [\textbf{Description:}]It implements a non-blocking wait. It returns 0 if
the counter is decremented, -1 if not.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
INTERNAL\_SEM\_POST\index{internal\_sem\_post()}
\end{intest}

\begin{description}

\item [\textbf{void internal\_sem\_post(internal\_sem\_t {*}s);}]

\item [\textbf{Description:}]It implements a post operation.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
INTERNAL\_SEM\_GETVALUE\index{internal\_sem\_getvalue()}
\end{intest}

\begin{description}

\item [\textbf{int internal\_sem\_getvalue(internal\_sem\_t {*}s);}]

\item [\textbf{Description:}]It returns a value greater than or equal to 0 if
there are no tasks blocked on \texttt{s}, -1 otherwise.

\end{description}

%----------------------------------------------------------------------------
\section{Mutexes and Condition Variables}
%----------------------------------------------------------------------------

The primitives described in this section allow the user to define and use
\emph{mutexes} and \emph{condition variables}. A mutex can be thought of as a
binary semaphore initialized to 1. In that way, a critical section can be
specified using the \emph{mutex\_lock} and \emph{mutex\_unlock} primitives.
Moreover, using condition variables a task can block itself waiting for an
event.

The provided implementation extends the POSIX standard mutex functions
implementing protocols like Stack Resource Policy and Non Preemptive
Protocol, that are not part of the standard. To do that, the mutex
initialization interface is different from the standard to allow the
specification of the various policies. In any case, the standard interface
is provided based on the extended interface.

%----------------------------------------------------------------------------
\subsection{Mutex attributes}
%----------------------------------------------------------------------------

A mutex can be used to implement critical sections that use different
policies (for example, the Priority Inheritance, Priority Ceiling
or Stack Resource Policy protocol). The S.Ha.R.K. Kernel provides
a set of structures derived from the basic structure
mutexattr\_t\footnote{Similar to the pthread\_mutexattr\_t structures of the
POSIX standard.} that allow the specification of the different policies.

The mutex attributes are different for every policy, each of which is
implemented by a Resource Module. For the description of the mutex attributes
of every policy, see the S.Ha.R.K. Modules Manual.

%----------------------------------------------------------------------------
\subsection{Functions}
%----------------------------------------------------------------------------

This subsection describes the functions that handle mutexes and condition
variables.

%----------------------------------------------------------------------------
\begin{intest}
MUTEX\_INIT\index{mutex\_init()}
\end{intest}

\begin{description}

\item [\textbf{int mutex\_init(mutex\_t {*}mutex, const mutexattr\_t {*}attr);}]

\item [\textbf{Description:}]The mutex\_init function initializes the mutex
referenced by \emph{mutex} with attributes specified by \emph{attr}. \emph{attr}
shall not be NULL. Upon successful initialization, the state
of the mutex becomes initialized and unlocked.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\item [\textbf{See also}:] mutex\_destroy().

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
MUTEX\_DESTROY\index{mutex\_destroy()}
\end{intest}

\begin{description}

\item [\textbf{int mutex\_destroy(mutex\_t {*}mutex);}]

\item [\textbf{Description:}]The mutex\_destroy function destroys the mutex
object referenced by mutex. It is safe to destroy an initialized mutex that
is unlocked.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\item [\textbf{See also}:] mutex\_init().

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
MUTEX\_LOCK\index{mutex\_lock()}
\end{intest}

\begin{description}

\item [\textbf{int mutex\_lock(mutex\_t {*}mutex);}]

\item [\textbf{Description:}]The mutex\_lock function locks an unlocked mutex.
If the mutex is already locked, the calling thread waits until the mutex
becomes available. The behaviour of the function may change depending
on the particular policy passed with the mutexattr\_t parameter at
mutex initialization. The function is \emph{not} a cancellation point.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}

\pagebreak

%----------------------------------------------------------------------------
\begin{intest}
MUTEX\_TRYLOCK\index{mutex\_trylock()}
\end{intest}

\begin{description}

\item [\textbf{int mutex\_trylock(mutex\_t {*}mutex);}]

\item [\textbf{Description:}]The mutex\_trylock function is identical to
mutex\_lock, except that if the mutex is locked when the function is called, the
calling task does not block but returns -1 and an errno value of EBUSY,
as specified by the POSIX standard.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
MUTEX\_UNLOCK\index{mutex\_unlock()}
\end{intest}

\begin{description}

\item [\textbf{int mutex\_unlock(mutex\_t {*}mutex);}]

\item [\textbf{Description:}]The mutex\_unlock function is called by the owner
of the mutex object to release it. If there are threads blocked on the
mutex object referenced by mutex when mutex\_unlock is called, the mutex
becomes available, and the task that will acquire the mutex depends
on the policy with which the mutex was initialized.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}
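
The lock, trylock and unlock pattern described above can be sketched as follows.
Because the S.Ha.R.K. mutex attributes are documented in the Modules Manual and
not here, this sketch uses the standard POSIX \texttt{pthread\_mutex\_*} calls
as a stand-in; that substitution is an assumption, and the type
\texttt{shared\_counter\_t} and its helper functions are hypothetical. One
visible difference is that the POSIX calls return the error number directly
(for example \texttt{EBUSY}), whereas this manual describes a return value of
-1 with \texttt{errno} set.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>

/* Hypothetical shared resource protected by a mutex. */
typedef struct {
    pthread_mutex_t lock;
    long            value;
} shared_counter_t;

/* Returns 0 on success, an error number otherwise. */
int counter_init(shared_counter_t *c)
{
    c->value = 0;
    return pthread_mutex_init(&c->lock, NULL);
}

/* Blocking update: waits until the mutex is available.
 * Returns the new value of the counter. */
long counter_add(shared_counter_t *c, long delta)
{
    long v;
    int rc = pthread_mutex_lock(&c->lock);

    assert(rc == 0);
    (void)rc;                   /* silence "unused" when NDEBUG is set */

    c->value += delta;          /* critical section */
    v = c->value;

    pthread_mutex_unlock(&c->lock);
    return v;
}

/* Non-blocking update: returns 0 if the counter was updated, or the
 * error number (EBUSY if the mutex is already locked) otherwise. */
int counter_try_add(shared_counter_t *c, long delta)
{
    int rc = pthread_mutex_trylock(&c->lock);

    if (rc != 0)
        return rc;              /* do not touch the shared data */

    c->value += delta;          /* critical section */

    pthread_mutex_unlock(&c->lock);
    return 0;
}
```

A task that must not block would use only the non-blocking variant and treat
\texttt{EBUSY} as a request to retry in a later cycle.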

%----------------------------------------------------------------------------
\begin{intest}
COND\_INIT\index{cond\_init()}
\end{intest}

\begin{description}

\item [\textbf{int cond\_init(cond\_t {*}cond);}]

\item [\textbf{Description:}]The function initializes the condition variable
referenced by cond.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
COND\_DESTROY\index{cond\_destroy()}
\end{intest}

\begin{description}

\item [\textbf{int cond\_destroy(cond\_t {*}cond);}]

\item [\textbf{Description:}]The function destroys the given condition variable
specified by cond.

\item [\textbf{Return value:}] on successful completion the function returns
0. Otherwise, it returns -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
COND\_SIGNAL\index{cond\_signal()} and COND\_BROADCAST\index{cond\_broadcast()}
\end{intest}

\begin{description}

\item [\textbf{int cond\_signal(cond\_t {*}cond);}]

\item [\textbf{int cond\_broadcast(cond\_t {*}cond);}]

\item [\textbf{Description:}]The function cond\_signal unblocks at least one of
the threads that are blocked on the specified condition variable cond.
The function cond\_broadcast unblocks all threads currently blocked
on the specified condition variable cond. These functions have no
effect if there are no threads currently blocked on cond.

\item [\textbf{Return value:}] on successful completion the functions return
0. Otherwise, they return -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}

%----------------------------------------------------------------------------
\begin{intest}
COND\_WAIT\index{cond\_wait()} and COND\_TIMEDWAIT\index{cond\_timedwait()}
\end{intest}

\begin{description}

\item [\textbf{int cond\_wait(cond\_t {*}cond, mutex\_t {*}mutex);}]

\item [\textbf{int cond\_timedwait(cond\_t {*}cond, mutex\_t {*}mutex,
const struct timespec {*}abstime);}]

\item [\textbf{Description:}]These functions are used to block on a condition
variable. They shall be called with \emph{mutex} locked by the calling task.
These functions release mutex and cause the calling task to block
on the condition variable cond. Upon successful return, the mutex
is locked and is owned by the calling task. When using condition variables,
there is always a Boolean predicate involving shared variables associated
with each condition wait that is true if the thread should proceed.
Spurious wakeups from the \texttt{cond\_wait} or \texttt{cond\_timedwait}
functions may occur. Since the return from \texttt{cond\_wait} or
\texttt{cond\_timedwait} does not imply anything about the value of
this predicate, the predicate should be re-evaluated upon each return.

The \texttt{cond\_wait} and \texttt{cond\_timedwait} functions are cancellation
points. When the cancelability enable state of a task is set to
\texttt{TASK\_CANCEL\_DEFERRED},
a side effect of acting upon a cancellation request while in a condition
wait is that the mutex is (in effect) reacquired before calling the
first cancellation cleanup handler. To ensure a correct cancellation,
a cleanup function should be pushed before the \texttt{cond\_wait}
call (in case of cancellation it simply unlocks the mutex).

The \texttt{cond\_timedwait} function is the same as the \texttt{cond\_wait}
function except that an error is returned if the absolute time specified
by abstime passes before the condition cond is signaled or broadcast,
or if the absolute time specified by abstime has already passed
at the time of the call.

\item [\textbf{Return value:}] on successful completion the functions return
0. Otherwise, they return -1 and \texttt{errno} is set according to
the POSIX standard.

\end{description}
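
The predicate loop, the cleanup handler and the absolute timeout described above
can be sketched together as follows. As an assumption, the sketch uses the
standard POSIX \texttt{pthread\_cond\_*} and \texttt{pthread\_cleanup\_*} calls
as a stand-in for the S.Ha.R.K. functions; the type \texttt{event\_t} and the
functions \texttt{event\_init}, \texttt{event\_set} and
\texttt{event\_wait\_ms} are hypothetical names. The POSIX calls return the
error number directly (here \texttt{ETIMEDOUT}) instead of -1 with
\texttt{errno} set.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical event object: `ready` is the Boolean predicate. */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    int             ready;
} event_t;

void event_init(event_t *e)
{
    pthread_mutex_init(&e->lock, NULL);
    pthread_cond_init(&e->cond, NULL);
    e->ready = 0;
}

/* Makes the predicate true and wakes up all the waiting tasks. */
void event_set(event_t *e)
{
    pthread_mutex_lock(&e->lock);
    e->ready = 1;
    pthread_cond_broadcast(&e->cond);
    pthread_mutex_unlock(&e->lock);
}

/* Cleanup handler: the mutex is owned again when a cancellation is
 * acted upon inside the wait, so it only has to be released. */
static void unlock_on_cancel(void *arg)
{
    pthread_mutex_unlock((pthread_mutex_t *)arg);
}

/* Waits at most timeout_ms milliseconds for the event.
 * Returns 0 if the event was observed, ETIMEDOUT (or another error
 * number) otherwise. */
int event_wait_ms(event_t *e, long timeout_ms)
{
    struct timespec abstime;
    int rc = 0;
    int seen;

    assert(timeout_ms >= 0);

    /* Build the absolute deadline from the current time. */
    clock_gettime(CLOCK_REALTIME, &abstime);
    abstime.tv_sec  += timeout_ms / 1000;
    abstime.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (abstime.tv_nsec >= 1000000000L) {
        abstime.tv_sec  += 1;
        abstime.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&e->lock);
    pthread_cleanup_push(unlock_on_cancel, &e->lock);

    /* Re-evaluate the predicate after every return from the wait. */
    while (!e->ready && rc == 0)
        rc = pthread_cond_timedwait(&e->cond, &e->lock, &abstime);
    seen = e->ready;            /* sampled while the mutex is held */

    pthread_cleanup_pop(1);     /* runs the handler: unlocks the mutex */
    return seen ? 0 : rc;
}
```

The loop, not the return value of the wait, decides whether the task proceeds,
which is what makes the code robust against spurious wakeups.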

%
% Tool: still in use?
%

% %----------------------------------------------------------------------------
% \section{Communication Ports \footnote{The S.Ha.R.K. communication ports are
% directly derived from the previous varsions of the Hartik Kernel.}}
% %----------------------------------------------------------------------------
%
% S.Ha.R.K. communication ports allow tasks to exchange messages. Each
% port is uniquely identified by a symbolic name (i.e., a string of
% characters); a task willing to use this communication facility has
% to open the channel using the \texttt{port\_create()} call, thus becoming
% the owner of the resource. Any other task that wants to use this communication
% end-point to send or receive data needs to connect to it by using
% the \texttt{port\_connect()} primitive.
%
% S.Ha.R.K. offers three types of ports:
%
% \begin{itemize}
%
% \item \texttt{STREAM}: it is a one-to-one communication facility, which
% can be opened either by the reader or by the writer task. The task
% executing the \texttt{port\_create()} must specify the message size
% and maximum number of messages in the queue. The task executing the
%
% \texttt{port\_connect()} must only specify the size of the messages
% it wants to receive/send, which can be different from the one specified
% by the owner. For example, a task may open a port for reading messages
% of 4 bytes, while another task can connect to it to write one-byte
% messages. This mechanism turns out to be useful for character oriented
% device drivers which need to fill a given structure, before the message
% can be processed further by a higher-level task.
%
% \item \texttt{MAILBOX}: it is a many-to-one communication facility, thought
% for being used in classical client/server mechanisms. This kind of
% port can only be opened by the reader task (the server) which wants
% to receive data from writer tasks (the clients). Message size is fixed
% and defined by the reader.
%
% \item \texttt{STICK}: it is a one-to-many communication facility intended
% to be used for exchanging periodic state-messages, for which the most
% recent information is relevant. It can be opened only by the (unique)
% writer task and the reading tasks must connect to it. It contains
% just one message and any new message posted by the writer will overwrite
% the previous one. Messages are non-consumable: a reader task can perform
% many readings of a given message until the writer posts a new one.
% \end{itemize}
%
% The first two kinds of port implement the synchronous communication
% paradigm, while \texttt{STICK} ports implement an asynchronous (state-message)
% paradigm. It is worth noting that in order to protect the internal
% data structures, \texttt{STREAM} ports use semaphores for synchronizing
% the accesses, \texttt{STICK} ports just use a mutual exclusion semaphore,
% and the \texttt{MAILBOX} ports use both kinds of semaphores.
%
% For this reason, \texttt{MAILBOX} and \texttt{STICK} ports should
% not be used by critical tasks, whereas \texttt{STREAM} ports can be
% used by any task requiring a state-message non-blocking semantics.
% Moreover, the execution time of a transaction depends on the message
% size (the message is copied in/from the buffer when a send/receive
% is performed). The semantics associated with each port is graphically
% illustrated in Figure \ref{fg:port-type}.
%
% An application that uses the communication ports, must register the
% HARTPORT Module. Please see Volume III - S.Ha.R.K. Modules for details.
%
% \begin{figure}
% \begin{center}\includegraphics[width=8cm]{port.eps}\end{center}
% \caption{HARTIK ports.\label{fg:port-type}}
% \end{figure}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_CREATE\index{port\_create()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{PORT port\_create(char {*}name, int dim, int num,
% int type, int mode);}]
%
% \item [\textbf{Description:}]It opens the port identified by the string
% \texttt{name}.
% The argument \texttt{dim} specifies the message size in bytes, \texttt{num}
% specifies the queue size, \texttt{type} the port type (\texttt{STREAM},
% \texttt{MAILBOX}, or \texttt{STICK}), and \texttt{mode} the access
% mode (\texttt{READ} or \texttt{WRITE}).
%
% \item [\textbf{Return Value:}] The primitive returns the port identifier,
% which identifies the connection between the port and the task, and
% not the port itself, which is identified through its name. A return
% value -1 indicates that an error is occurred.
%
% \item [\textbf{See also}:] \texttt{port\_delete(), port\_connect(),
% port\_disconnect(), port\_send(), port\_receive()}.
%
% \end{description}
%
% \begin{description}
% \item [Example:\label{pg:port-ex}]
% \item \texttt{TASK demo(void)} \{
% \item \texttt{~~PORT p; }
% \item \texttt{~~char msg{[}6{]}; }
% \item \texttt{~~\ldots{}}
% \item \texttt{~~/{*} Demo task, of NRT type, opens the \char`\"{}goofy\char`\"{}
% port {*}/}
% \item \texttt{~~/{*} and sends a message of 6 bytes. {*}/}
% \item \texttt{~~p = port\_create(\char`\"{}goofy\char`\"{}, 6, 8, STREAM,
% WRITE);}
% \item \texttt{~~\ldots{}}
% \item \texttt{~~port\_send(p, msg, BLOCK); }
% \item \texttt{\}}
% \item \texttt{~}
% \item \texttt{TASK duro(void)} \{
% \item \texttt{~~PORT q; }
% \item \texttt{~~char msg{[}2{]}; }
% \item \texttt{~~/{*} Duro task (HARD) connects to the \char`\"{}goofy\char`\"{}
% {*}/}
% \item \texttt{~~/{*} port and receives messages of 2 bytes {*}/ }
% \item \texttt{~~q = port\_connect(\char`\"{}goofy\char`\"{}, 2, STREAM,
% READ);}
% \item \texttt{~~while (condition) \{}
% \item \texttt{~~~~\ldots{}}
% \item \texttt{~~~~if (port\_receive(q, msg, NON\textbackslash{}\_BLOCK) \{}
% \item \texttt{~~~~~~<action 1>;~/{*} Ready Message! {*}/}
% \item \texttt{~~~~\}}
% \item \texttt{~~~~else \{}
% \item \texttt{~~~~~~<action 2>;~/{*} Message not Ready! {*}/}
% \item \texttt{~~~~\}}
% \item \texttt{~~~~\ldots{}}
% \item \texttt{~~~~task\_endcycle();}
% \item \texttt{~~\}}
% \item \texttt{\}}
%
% \end{description}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_DELETE\index{port\_delete()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{void port\_delete(PORT p)};]
%
% \item [\textbf{Description:}]It destroys the port identified by \texttt{p}.
%
% \item [\textbf{See also}:] \texttt{port\_create(), port\_connect(),
% port\_disconnect(),
% port\_send(), port\_receive()}.
%
% \item [\textbf{Example:}]see the example at page \pageref{pg:port-ex}.
%
% \end{description}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_CONNECT\index{port\_connect()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{PORT port\_connect(char {*}name, int dim, int type,
% int mode);}]
%
% \item [\textbf{Description:}]It connects the calling task to the port identified
% by \texttt{name}. The argument \texttt{dim} specifies the message
% size in bytes, \texttt{type} the port type (\texttt{STREAM}, \texttt{MAILBOX},
% or \texttt{STICK}), and \texttt{mode} the access mode (\texttt{READ}
% or \texttt{WRITE}). If the port has not been opened by \texttt{port\_create()},
% the task is blocked, waiting for port creation. To avoid synchronization
% delays, connection should be established only \underbar{after} opening
% the port.
%
% \item [\textbf{Return value:}] The function returns the port identification
% number in the case of successful operation; else -1 is returned.
%
% \item [\textbf{See also}:] \texttt{port\_create(), port\_delete(),
% port\_disconnect(), port\_send(), port\_receive()}.
%
% \end{description}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_DISCONNECT\index{port\_disconnect()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{void port\_disconnect(PORT p)};]
%
% \item [\textbf{Description:}]It closes the connection identified by \texttt{p}.
%
% \item [\textbf{See also}:] \texttt{port\_create(), port\_connect(),
% port\_delete(), port\_send(), port\_receive()}.
%
% \end{description}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_SEND\index{port\_send()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{int port\_send(PORT p, char {*}msg, BYTE b);}]
%
% \item [\textbf{Description:}]It sends a message pointed by \texttt{msg} to the
% port identified by \texttt{p}. Message dimension is defined through
% \texttt{port\_create()}
% or \texttt{port\_connect()} and cannot be dynamically changed. The
% argument \texttt{b} can be \texttt{BLOCK} or \texttt{NON\_BLOCK}.
% If \texttt{b = BLOCK} and the port queue is full, then the task is
% blocked until the buffer is freed. If \texttt{b = NON\_BLOCK} and
% the port queue is full, then the primitive returns 0 and the message
% is not sent.
%
% \item [\textbf{Return value:}] 1 (TRUE) if the operation can be performed,
% 0 otherwise.
%
% \item [\textbf{See also}:] \texttt{port\_create(), port\_connect(),
% port\_disconnect(),
% port\_send(), port\_receive()}.
%
% \item [\textbf{Example:}]see the example at page \pageref{pg:port-ex}.
%
% \end{description}
%
% %----------------------------------------------------------------------------
% \begin{intest}
% PORT\_RECEIVE\index{port\_receive()}
% \end{intest}
%
% \begin{description}
%
% \item [\textbf{int port\_receive(PORT p, char {*}msg, BYTE b);}]
%
% \item [\textbf{Description:}]It receives a message from the port identified by
% \texttt{p} and copies it in a memory buffer pointed by \texttt{msg}. Message
% dimension is defined through \texttt{port\_create()} or \texttt{port\_connect()}
% and cannot be dynamically changed. The argument \texttt{b} can be
% \texttt{BLOCK} or \texttt{NON\_BLOCK}. If \texttt{b = BLOCK} and the
% port queue is empty, then the task is blocked until a message is available.
% If \texttt{b = NON\_BLOCK} and the port queue is empty, then the primitive
% returns 0 and no message is received.
%
% \item [\textbf{Return value:}] 1 (TRUE) if the operation can be performed,
% 0 otherwise.
%
% \item [\textbf{See also}:] \texttt{port\_create(), port\_connect(),
% port\_disconnect(), port\_send(), port\_receive()}.
%
% \item [\textbf{Example:}]see the example at page \pageref{pg:port-ex}.
%
% \end{description}

%----------------------------------------------------------------------------
\section{Cyclical Asynchronous Buffers}
%----------------------------------------------------------------------------

\textbf{Cyclical Asynchronous Buffers} (or CABs) represent a particular
mechanism purposely designed for the cooperation among periodic activities
with different activation rates. See \cite{But97} for implementation
details.

A CAB provides a one-to-many communication channel, which at any instant
contains the most recent message inserted into it. A message is not
consumed (that is, extracted) by a receiving process but is kept
in the CAB structure until a new message overwrites it. As a consequence,
once the first message has been put in a CAB, a task can never be blocked
during a receive operation. Similarly, since a new message overwrites
the old one, a sender can never be blocked.

Notice that, with these semantics, a message can be read more than
once if the receiver is faster than the sender, while messages can
be lost if the sender is faster than the receiver. However, this is
not a problem in many control applications, where tasks are interested
only in fresh sensory data rather than in the complete message history
produced by a sensory acquisition task.

Notice that several tasks can simultaneously access the same buffer in
a CAB for reading. Also, if a task $P$ reserves a CAB for writing
while another task $Q$ is using that CAB, a new buffer is created,
so that $P$ can write its message without interfering with $Q$.
When $P$ finishes writing, its message becomes the most recent one
in that CAB. The maximum number of buffers that can be created in
a CAB is specified as a parameter of the \emph{cab\_create} primitive.
To avoid blocking, this number must be at least the number of tasks
that use the CAB plus one.
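
The buffer handling described above can be illustrated with a small,
self-contained model in plain C. This is only a sketch and not the
S.Ha.R.K. implementation: the \texttt{toy\_} names, the fixed sizes and
the single-threaded setting (no protection against preemption) are
assumptions made for illustration.

\begin{small}
\begin{verbatim}
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_BUF  8    /* hypothetical upper bound on buffers  */
#define TOY_MSG_SIZE 32   /* hypothetical message size in bytes   */

typedef struct {
    int  readers;              /* tasks currently reading this buffer */
    int  is_free;              /* 1 if the buffer can be reserved     */
    char data[TOY_MSG_SIZE];   /* message payload                     */
} toy_buf;

typedef struct {
    int      num_buf;          /* buffers owned by this CAB           */
    toy_buf *mrb;              /* most recent message, NULL if none   */
    toy_buf  buf[TOY_MAX_BUF];
} toy_cab;

/* Map a payload pointer back to its buffer descriptor. */
static toy_buf *toy_find(toy_cab *c, const char *p) {
    for (int i = 0; i < c->num_buf; i++)
        if (c->buf[i].data == p)
            return &c->buf[i];
    return NULL;
}

void toy_cab_init(toy_cab *c, int num_buf) {
    assert(num_buf > 0 && num_buf <= TOY_MAX_BUF);
    c->num_buf = num_buf;
    c->mrb = NULL;
    for (int i = 0; i < num_buf; i++) {
        c->buf[i].readers = 0;
        c->buf[i].is_free = 1;
    }
}

/* Writer side: take any free buffer, or NULL if none is left. */
char *toy_cab_reserve(toy_cab *c) {
    for (int i = 0; i < c->num_buf; i++) {
        if (c->buf[i].is_free) {
            c->buf[i].is_free = 0;
            return c->buf[i].data;
        }
    }
    return NULL;
}

/* Writer side: make the filled buffer the most recent message. */
void toy_cab_putmes(toy_cab *c, char *p) {
    toy_buf *b = toy_find(c, p);
    assert(b != NULL);
    /* The old message is recycled only if nobody is reading it. */
    if (c->mrb != NULL && c->mrb->readers == 0)
        c->mrb->is_free = 1;
    c->mrb = b;
}

/* Reader side: get the most recent message without consuming it. */
char *toy_cab_getmes(toy_cab *c) {
    if (c->mrb == NULL)
        return NULL;           /* nothing written yet */
    c->mrb->readers++;
    return c->mrb->data;
}

/* Reader side: release a buffer obtained with toy_cab_getmes(). */
void toy_cab_unget(toy_cab *c, char *p) {
    toy_buf *b = toy_find(c, p);
    assert(b != NULL && b->readers > 0);
    b->readers--;
    /* An outdated buffer is recycled when its last reader leaves. */
    if (b->readers == 0 && b != c->mrb)
        b->is_free = 1;
}
\end{verbatim}
\end{small}

In this model, with three buffers, a reader that still holds the first
message does not prevent a writer from reserving a buffer and publishing
a second message; the reader's pointer stays valid until it calls
\texttt{toy\_cab\_unget}.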

CABs can be created and initialized by the \emph{cab\_create} primitive,
which requires the CAB name, the dimension of the message, and the
number of messages that the CAB may contain simultaneously. The
\emph{cab\_delete} primitive removes a CAB from the system and releases
the memory space used by its data structures.
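
As a rough sizing sketch, the constraint on the number of buffers can be
written as follows. The symbols $N_{buf}$ (number of buffers passed to
\emph{cab\_create}) and $N_{task}$ (number of tasks using the CAB) are
introduced here only for illustration and are not part of the kernel
interface:
\[
N_{buf} \geq N_{task} + 1 .
\]
In the worst case each task using the CAB holds one buffer (a reader
after \emph{cab\_getmes}, a writer after \emph{cab\_reserve}), and one
further buffer holds the most recent message, which may not be held by
any task. For instance, with one writer and one reader, $N_{task}=2$ and
\[
N_{buf} \geq 2 + 1 = 3 .
\]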

To insert a message in a CAB, a task must first reserve a buffer from
the CAB memory space, then copy the message into the buffer, and finally
put the buffer into the CAB structure, where it becomes the most recent
message. This is done according to the following scheme:

\vspace{5mm} \begin{tt}
\begin{tabbing}
\={b}uf\_pointer = {\textbf{cab\_reserve}}(cab\_id);\\
\>\(<\)copy message in *buf\_pointer\(>\)\\
\>{\textbf{cab\_putmes}}(cab\_id, buf\_pointer);\\
\end{tabbing}
\end{tt}

\noindent Similarly, to get a message from a CAB, a task has to get
the pointer to the most recent message, use the data, and release
the pointer. This is done according to the following scheme:

\vspace{5mm} \begin{tt}
\begin{tabbing}
\={m}es\_pointer = {\textbf{cab\_getmes}}(cab\_id);\\
\>\(<\)use message\(>\)\\
\>{\textbf{cab\_unget}}(cab\_id, mes\_pointer);
\end{tabbing}
\end{tt}

A simple example of CAB usage is reported below.

\label{pg:cab-ex} \begin{minipage}{\columnwidth}
\begin{small}
\begin{tt}
\begin{verbatim}
CAB cc;

void main(void) {
    SYS_PARMS parms = BASE_SYS;
 
    /* global declaration */
    sys_def_tick(parms, 1, mSec);
    sys_init(&parms);
   
    /* The CAB named cc contains a message of */
    /* 5 floats and can be used by two tasks */
    cc = cab_create("my_cab", 5 * sizeof(float), 3);
    task_activate(task_create("ll", read, HARD, APERIODIC, 100, NULL));
    task_activate(task_create("ss", write, HARD, PERIODIC, 333, NULL));
    ...
}

/*---------------------------------------------------------------------*/
TASK write(void) {
    float msg[5];
    char *pun;
    ...
   
    while (1) {
     
        /* send a message to the `cc' cab */
        pun = cab_reserve(cc);                /* reserve a buffer */
        memcpy(pun, msg, 5 * sizeof(float));
        cab_putmes(cc, pun);                  /* publish the message */
        task_endcycle();
    }
}

/*---------------------------------------------------------------------*/
TASK read(void) {
    float msg[5];
    char *pun;
    ...
   
    while (1) {
   
        /* get a message from the `cc' CAB */
        pun = cab_getmes(cc);                /* get the latest message */
        memcpy(msg, pun, 5 * sizeof(float));
        cab_unget(cc, pun);                  /* release the buffer */
        task_endcycle();
    }
}
\end{verbatim}
\end{tt}
\end{small}
\end{minipage}

%----------------------------------------------------------------------------
\vspace{7mm} \begin{intest}
CAB\_CREATE\index{cab\_create()}
\end{intest}

\begin{description}

\item [\textbf{CAB cab\_create(char {*}name, int dim\_mes, BYTE num\_mes);}]

\item [\textbf{Description:}]It initializes a CAB. \texttt{name} is a pointer to
an identification string (used only for debugging purposes); \texttt{dim\_mes}
is the size of the messages contained in the CAB; \texttt{num\_mes}
is the number of buffers the CAB is composed of. Notice that such
a number must be greater than or equal to the number of tasks that
use the CAB plus one.

\item [\textbf{Return value:}] It returns the index of the created CAB.

\item [\textbf{See also}:] \texttt{cab\_delete(), cab\_reserve(), cab\_putmes(),
cab\_getmes(), cab\_unget()}.

\end{description}

%----------------------------------------------------------------------------
\vspace{7mm} \begin{intest}
CAB\_DELETE\index{cab\_delete()}
\end{intest}

\begin{description}

\item [\textbf{void cab\_delete(CAB cc);}]

\item [\textbf{Description:}]It removes the \texttt{cc} CAB from the system,
deallocating its buffers and data structures.

\item [\textbf{See also}:] \texttt{cab\_create(), cab\_reserve(), cab\_putmes(),
cab\_getmes(), cab\_unget()}.

\end{description}

%----------------------------------------------------------------------------
\vspace{7mm} \begin{intest}
CAB\_RESERVE\index{cab\_reserve()}
\end{intest}

\begin{description}

\item [\textbf{char {*}cab\_reserve(CAB cc);}]

\item [\textbf{Description:}]It reserves a buffer belonging to the \texttt{cc}
CAB and returns a pointer to it. The primitive has to be used only by
writers and \emph{never} by readers.

\item [\textbf{Return value:}] It returns a pointer to the reserved buffer.

\item [\textbf{See also}:] \texttt{cab\_delete(), cab\_create(), cab\_putmes(),
cab\_getmes(), cab\_unget()}.

\end{description}

\pagebreak

%----------------------------------------------------------------------------
\begin{intest}
CAB\_PUTMES\index{cab\_putmes()}
\end{intest}

\begin{description}

\item [\textbf{void cab\_putmes(CAB id, char {*}pun);}]

\item [\textbf{Description:}]It inserts the message pointed to by \texttt{pun}
into the CAB identified by \texttt{id}. This primitive must be used \emph{only}
by writing tasks.

\item [\textbf{See also}:] \texttt{cab\_delete(), cab\_create(), cab\_reserve(),
cab\_getmes(), cab\_unget()}.

\end{description}

%----------------------------------------------------------------------------
\vspace{7mm} \begin{intest}
CAB\_GETMES\index{cab\_getmes()}
\end{intest}

\begin{description}

\item [\textbf{char {*}cab\_getmes(CAB cc);}]

\item [\textbf{Description:}]It returns a pointer to the latest message written
into the \texttt{cc} CAB. This primitive must be used \emph{only}
by reading tasks.

\item [\textbf{Return value:}] It returns a pointer to the most recent
message contained in the CAB.

\item [\textbf{See also}:] \texttt{cab\_delete(), cab\_create(), cab\_putmes(),
cab\_reserve(), cab\_unget()}.

\end{description}

%----------------------------------------------------------------------------
\vspace{7mm} \begin{intest}
CAB\_UNGET\index{cab\_unget()}
\end{intest}

\begin{description}

\item [\textbf{void cab\_unget(CAB cc, char {*}pun);}]

\item [\textbf{Description:}]It notifies the system that the buffer pointed to
by \texttt{pun} belonging to the \texttt{cc} CAB is no longer used by the
calling task.

\item [\textbf{See also}:] \texttt{cab\_delete(), cab\_create(), cab\_reserve(),
cab\_getmes(), cab\_putmes()}.

\end{description}

\section{POSIX Message Queues}

S.Ha.R.K. provides the message passing functions defined in the POSIX
standard. For more information, see Section 15 of the POSIX standard,
Message Passing.