Blame |
Last modification |
View Log
| RSS feed
/*
* Copyright 1995, Brown University, Providence, RI
*
* Permission to use and modify this software and its documentation for
* any purpose other than its incorporation into a commercial product is
* hereby granted without fee. Permission to copy and distribute this
* software and its documentation only for non-commercial use is also
* granted without fee, provided, however, that the above copyright notice
* appear in all copies, that both that copyright notice and this permission
* notice appear in supporting documentation, that the name of Brown
* University not be used in advertising or publicity pertaining to
* distribution of the software without specific, written prior permission,
* and that the person doing the distribution notify Brown University of
* such distributions outside of his or her organization. Brown University
* makes no representations about the suitability of this software for
* any purpose. It is provided "as is" without express or implied warranty.
* Brown University requests notification of any modifications to this
* software or its documentation.
*
* Send the following redistribution information:
*
* Name:
* Organization:
* Address (postal and/or electronic):
*
* To:
* Software Librarian
* Computer Science Department, Box 1910
* Brown University
* Providence, RI 02912
*
* or
*
* brusd@cs.brown.edu
*
* We will acknowledge all electronic notifications.
*/
static char Version
[] =
"$Id: threadmp.c,v 1.1.1.1 2002-03-29 14:12:59 pj Exp $";
static char Trick_to_use_Version
() {char *unused
= Version
; return unused
[0];}
/* -------------------------------------------------------------------------
*
* Original code by Tom Meyer
* Altered by Loring Holden to work with MPEGmovie object
*
* ------------------------------------------------------------------------- */
#include "thread_mp.H"
#include <iostream.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>
#if defined(sol)
#elif defined(sgi)
usptr_t
*THREADprocessor_arena
;
#endif
/* ----------------------- Private Methods ------------------------------- */
//
// ------------ SOL-specific stuff ----------------------
//
#if defined(sol)
#ifdef CPLPLTHREADS
void THREAD_processor
::startup()
{
while (1) {
sema
->wait
(__FILE__
, __LINE__
);
_start_routine
(_arg
,_arg2
);
semaDone
->post
(__FILE__
, __LINE__
);
}
}
int THREAD_processor
::wait_for_done()
{
semaDone
->wait
(__FILE__
, __LINE__
);
running
=0;
return 1;
}
int THREAD_processor
::still_running()
{
//try_wait returns 0 if semaphore could be decremented
// returns 1 if semaphore could not be decremented
if (semaDone
->try_wait
()) {
return 1;
} else {
running
=0;
return 0;
}
}
#else
void *THREADprocessor_startup_func
(void *proc
)
{
((THREADprocessor
*)proc
)->startup_func
();
return NULL
;
}
void THREAD_processor
::startup_func()
{
while (1) {
sema_wait
(&sema
);
_start_routine
(_arg
,_arg2
);
sema_post
(&semaDone
);
}
}
int THREAD_processor
::wait_for_done()
{
sema_wait
(&semaDone
);
running
=0;
return 1;
}
int THREAD_processor
::still_running()
{
//try_wait returns 0 if semaphore could be decremented
// returns 1 if semaphore could not be decremented
if (sema_trywait
(&semaDone
)) {
return 1;
} else {
running
=0;
return 0;
}
}
#endif
char new_name
[255];
THREAD_processor
::THREAD_processor(char *name
, char *thrname
)
#if CPLPLTHREADS
: Thread
(thrname
)
#endif
{
#if CPLPLTHREADS
char *newname
,*newname2
;
newname
= (char *)malloc(sizeof(char)*(strlen(name
) + strlen("/sema")));
sprintf(newname
, "%s/sema", name
);
newname2
= (char *)malloc(sizeof(char)*(strlen(name
) + strlen("/doneSema")));
sprintf(newname2
, "%s/doneSema", name
);
sema
= new Semaphore
(0, newname
);
if (sema
==NULL
) {
cerr
<< "Could not allocate semaphore (sema)" << endl
<<flush
;
}
semaDone
= new Semaphore
(0, newname2
);
if (semaDone
==NULL
) {
cerr
<< "Could not allocate semaphore (semaDone)" << endl
<<flush
;
}
make_runnable
();
#else
sema_init
(&sema
, 0, USYNC_THREAD
, NULL
);
sema_init
(&semaDone
, 0, USYNC_THREAD
, NULL
);
thr_create
(NULL
, NULL
, THREADprocessor_startup_func
, (void *)this
,
THR_BOUND
, &thr
);
#endif
running
=0;
}
void THREAD_processor
::start_execute(void *(*start_routine
)(void *, void*),
void *arg
, void* arg2
)
{
if (running
) {
fprintf(stderr
,"start_execute: already executing\n");
fflush(stderr
);
}
// cerr << "Waking up a process" << endl << flush;
_start_routine
= start_routine
;
_arg
= arg
;
_arg2
= arg2
;
running
=1;
#ifdef CPLPLTHREADS
sema
->post
(__FILE__
, __LINE__
);
#else
sema_post
(&sema
);
#endif
}
int THREAD_mp_manager
::num_procs() const
{
return sysconf
(_SC_NPROCESSORS_ONLN
);
}
THREAD_mp_manager
::THREAD_mp_manager(int _procs
)
{
}
#elif defined(sgi)
//
// ------------ SGI-specific stuff ----------------------
//
void THREADprocessor_startup_func
(void *proc
)
{
((THREADprocessor
*)proc
)->startup
();
}
void THREAD_processor
::startup()
{
while (1) {
// Wait for us to get restarted
uspsema
(_sema
);
// Execute the routine that got left for us to do
_start_routine
(_arg
,_arg2
);
// If someone's blocked on us, unblock them
usvsema
(semaDone
);
}
}
void THREAD_processor
::start_execute(void *(*start_routine
)(void *, void *),
void *arg
, void *arg2
)
{
_start_routine
= start_routine
;
_arg
= arg
;
_arg2
= arg2
;
usvsema
(_sema
);
running
=1;
}
int THREAD_processor
::wait_for_done()
{
// Block the calling process
uspsema
(semaDone
);
running
=0;
return 1;
}
int THREAD_processor
::still_running()
{
if (ustestsema
(semaDone
)){
uspsema
(semaDone
);
running
=0;
return 0;
} else {
return 1;
}
}
THREAD_processor
::THREAD_processor(char *, char *)
{
_sema
= usnewsema
(THREADprocessor_arena
,0); // Create our semaphore
semaDone
= usnewsema
(THREADprocessor_arena
,0);
if (_sema
==NULL
) {
cerr
<< "problem allocating _sema" << endl
;
}
if (semaDone
==NULL
) {
cerr
<< "problem allocating semaDone" << endl
;
}
// Share all vertual space of the parent, Synchronize file open table
if (sproc
(THREADprocessor_startup_func
, PR_SADDR
| PR_SFDS
,
(void *)this
)==-1) {
cerr
<< "sproc problem" << endl
;
}
}
int THREAD_mp_manager
::num_procs() const
{
return prctl
(PR_MAXPPROCS
);
}
THREAD_mp_manager
::THREAD_mp_manager(int)
{
// Set us up so that everyone dies simultaneously
prctl
(PR_SETEXITSIG
, SIGQUIT
);
// Allocate shared memory arena
usconfig
(CONF_INITUSERS
, num_procs
()*2 + 8);
//Should not be a hard coded name in case we are running more than once
//on the same machine
char *tmpFile
=tempnam
("/tmp","THRED");
THREADprocessor_arena
= usinit
(tmpFile
);
unlink
(tmpFile
);
free(tmpFile
);
if (!THREADprocessor_arena
) {
cerr
<< "fatal error, shared memory arena could not be created"<<endl
;
exit(1);
}
}
#else
//
// ------------ Specific stuff for platforms without multi-threading support
//
THREAD_processor
::THREAD_processor(char *, char *)
{
running
=0;
}
void THREAD_processor
::start_execute(void *(*start_routine
)(void *, void*),
void *arg
,void *arg2
)
{
start_routine
(arg
,arg2
);
}
void THREAD_processor
::startup()
{
}
int THREAD_processor
::wait_for_done() {
return 1;
}
int THREAD_processor
::still_running()
{
return 1;
}
THREAD_mp_manager
::THREAD_mp_manager(int)
{
cerr
<< "No multithreading..." << endl
;
}
THREAD_mp_manager
::num_procs() const
{
return 1;
}
#endif
//
// General stuff for all platforms
//
void
THREADmp_manager
::THREADmp_initproc(int i
) {
char *name
, *thrname
;
name
= (char *)malloc(sizeof(char)*50);
thrname
= (char *)malloc(sizeof(char)*50);
sprintf(name
, "Proc_%d", i
);
sprintf(thrname
, "Proc_%d/thread", i
);
procs
[i
] = new THREADprocessor
(name
, thrname
);
enqThread
(i
);
}
void
THREADmp_manager
::THREADmp_newproc()
{
procs
=(THREADprocessor
**) realloc((void *) procs
,++_num_procs
*
sizeof(THREADprocessor
*));
THREADmp_initproc
(_num_procs
-1);
}
THREADmp_manager
::THREADmp_manager(int _procs
)
: THREAD_mp_manager
(_procs
)
{
cerr
<< "Creating mp manager\n";
if (_procs
== 0) {
_num_procs
= num_procs
();
} else {
_num_procs
= _procs
;
}
cerr
<< "Num procs: " << num_procs
() << endl
<< flush
;
procs
= (THREADprocessor
**)malloc(sizeof(THREADprocessor
*) *_num_procs
);
for (int i
= 0; i
< _num_procs
; i
++) {
THREADmp_initproc
(i
);
}
cerr
<< "Done\n";
}
void
THREADmp_manager
::enqThread(int threadNumber
)
{
queue.
enQueue(threadNumber
);
}
int
THREADmp_manager
::deqThread()
{
int itemOk
;
int retVal
=queue.
deQueue(itemOk
);
if (!itemOk
) {
// This better allocate a thread, or we're not halting
THREADmp_newproc
();
return deqThread
();
} else return retVal
;
}
THREADmp_manager
::~THREADmp_manager
()
{
for (int i
= 0; i
< _num_procs
; i
++)
delete procs
[i
];
free(procs
);
}