Subversion Repositories shark

Rev

Blame | Last modification | View Log | RSS feed

/*
 * Copyright 1995, Brown University, Providence, RI
 *
 * Permission to use and modify this software and its documentation for
 * any purpose other than its incorporation into a commercial product is
 * hereby granted without fee.  Permission to copy and distribute this
 * software and its documentation only for non-commercial use is also
 * granted without fee, provided, however, that the above copyright notice
 * appear in all copies, that both that copyright notice and this permission
 * notice appear in supporting documentation, that the name of Brown
 * University not be used in advertising or publicity pertaining to
 * distribution of the software without specific, written prior permission,
 * and that the person doing the distribution notify Brown University of
 * such distributions outside of his or her organization. Brown University
 * makes no representations about the suitability of this software for
 * any purpose.  It is provided "as is" without express or implied warranty.
 * Brown University requests notification of any modifications to this
 * software or its documentation.
 *
 * Send the following redistribution information:
 *
 *      Name:
 *      Organization:
 *      Address (postal and/or electronic):
 *
 * To:
 *      Software Librarian
 *      Computer Science Department, Box 1910
 *      Brown University
 *      Providence, RI 02912
 *
 *              or
 *
 *      brusd@cs.brown.edu
 *
 * We will acknowledge all electronic notifications.
 */


/* Revision-control identification string for this file. */
static char Version[] =
   "$Id: threadmp.c,v 1.1.1.1 2002-03-29 14:12:59 pj Exp $";

/* References Version so that it is not an unused variable; returns its
 * first character. */
static char Trick_to_use_Version()
{
   return *Version;
}

/* -------------------------------------------------------------------------
 *
 * Original code by Tom Meyer
 * Altered by Loring Holden to work with MPEGmovie object
 *
 * ------------------------------------------------------------------------- */


#include "thread_mp.H"
#include <iostream.h>
#include <unistd.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>

#if defined(sol)
/* The sol build declares no file-scope state here. */
#elif defined(sgi)
/* Arena handle: assigned from usinit() in the sgi THREAD_mp_manager
 * constructor and passed to usnewsema() when each THREAD_processor creates
 * its semaphores. */
usptr_t *THREADprocessor_arena;
#endif

/* -----------------------  Private Methods  ------------------------------- */
//
//  ------------  SOL-specific stuff ----------------------
//
#if defined(sol)
#ifdef CPLPLTHREADS
void THREAD_processor::startup()
{
   while (1) {
      sema->wait(__FILE__, __LINE__);

      _start_routine(_arg,_arg2);

      semaDone->post(__FILE__, __LINE__);
   }
}

int THREAD_processor::wait_for_done()
{
   semaDone->wait(__FILE__, __LINE__);
   running=0;
   return 1;
}

int THREAD_processor::still_running()
{
        //try_wait returns 0 if semaphore could be decremented
        //         returns 1 if semaphore could not be decremented
        if (semaDone->try_wait()) {
                return 1;
        } else {
                running=0;
                return 0;
        }
}
#else

void *THREADprocessor_startup_func(void *proc)
{
   ((THREADprocessor *)proc)->startup_func();
   return NULL;
}

void THREAD_processor::startup_func()
{
   while (1) {
      sema_wait(&sema);

      _start_routine(_arg,_arg2);

      sema_post(&semaDone);
   }
}

int THREAD_processor::wait_for_done()
{
   sema_wait(&semaDone);
   running=0;
   return 1;
}

int THREAD_processor::still_running()
{
        //try_wait returns 0 if semaphore could be decremented
        //         returns 1 if semaphore could not be decremented
        if (sema_trywait(&semaDone)) {
                return 1;
        } else {
                running=0;
                return 0;
        }
}
#endif

// NOTE(review): this buffer is not referenced anywhere else in this file; it
// may be used from another translation unit -- confirm before removing.
char new_name[255];

// Builds a processor: creates the two semaphores (`sema` wakes the worker,
// `semaDone` is posted by the worker after each routine), launches the
// worker, and clears `running`.
//
//   name    - prefix for the semaphore names (CPLPLTHREADS build only)
//   thrname - forwarded to the Thread base class (CPLPLTHREADS build only)
//
// The guards use #ifdef to agree with the other CPLPLTHREADS tests in this
// file.
THREAD_processor::THREAD_processor(char *name, char *thrname)
#ifdef CPLPLTHREADS
: Thread (thrname)
#endif
{
#ifdef CPLPLTHREADS
   // "+ 1" leaves room for the terminating NUL that sprintf() writes.
   char *newname  = (char *)malloc(strlen(name) + strlen("/sema") + 1);
   char *newname2 = (char *)malloc(strlen(name) + strlen("/doneSema") + 1);

   if (newname==NULL || newname2==NULL) {
      cerr << "fatal error, could not allocate semaphore names" << endl;
      exit(1);
   }
   sprintf(newname, "%s/sema", name);
   sprintf(newname2, "%s/doneSema", name);

   // NOTE(review): the name buffers are deliberately not freed here, since
   // Semaphore may keep the pointers -- confirm against the Semaphore class.
   sema = new Semaphore(0, newname);
   if (sema==NULL) {
      cerr << "Could not allocate semaphore (sema)" << endl<<flush;
   }
   semaDone = new Semaphore(0, newname2);
   if (semaDone==NULL) {
      cerr << "Could not allocate semaphore (semaDone)" << endl<<flush;
   }
   make_runnable();
#else
   sema_init(&sema, 0, USYNC_THREAD, NULL);
   sema_init(&semaDone, 0, USYNC_THREAD, NULL);
   // The second argument is the stack size (a size_t, not a pointer).
   if (thr_create(NULL, 0, THREADprocessor_startup_func, (void *)this,
                  THR_BOUND, &thr) != 0) {
      cerr << "thr_create problem" << endl;
   }
#endif
   running=0;
}



void THREAD_processor::start_execute(void *(*start_routine)(void *, void*),
                                    void *arg, void* arg2)
{
   if (running) {
     fprintf(stderr,"start_execute: already executing\n");
     fflush(stderr);
   }
//   cerr << "Waking up a process" << endl << flush;
   _start_routine = start_routine;
   _arg = arg;
   _arg2 = arg2;
   running=1;
#ifdef CPLPLTHREADS
   sema->post(__FILE__, __LINE__);
#else
   sema_post(&sema);
#endif
}

// Returns the number of processors currently online, as reported by
// sysconf(_SC_NPROCESSORS_ONLN).  sysconf() returns -1 on failure; in that
// case (or for any value below 1) this returns 1, so callers that size
// arrays from the result never see a non-positive count.
int THREAD_mp_manager::num_procs() const
{
   const long online = sysconf(_SC_NPROCESSORS_ONLN);

   return (online < 1) ? 1 : (int)online;
}

// This build needs no manager-level setup; the argument is ignored.
THREAD_mp_manager::THREAD_mp_manager(int /* _procs */)
{
}

#elif defined(sgi)
//
//  ------------  SGI-specific stuff ----------------------
//

void THREADprocessor_startup_func(void *proc)
{
   ((THREADprocessor *)proc)->startup();
}

void THREAD_processor::startup()
{
   while (1) {
      // Wait for us to get restarted
      uspsema(_sema);
      // Execute the routine that got left for us to do
      _start_routine(_arg,_arg2);
     
      // If someone's blocked on us, unblock them
      usvsema(semaDone);
   }
}


void THREAD_processor::start_execute(void *(*start_routine)(void *, void *),
                                    void *arg, void *arg2)
{
   _start_routine = start_routine;
   _arg = arg;
   _arg2 = arg2;

   usvsema(_sema);
   running=1;
}

int THREAD_processor::wait_for_done()
{
   // Block the calling process
   uspsema(semaDone);
   running=0;
   return 1;
}

int THREAD_processor::still_running()
{
        if (ustestsema(semaDone)){
                uspsema(semaDone);
                running=0;
                return 0;
        } else {
                return 1;
        }
}

// Builds a processor: clears `running`, allocates the two semaphores from
// THREADprocessor_arena (`_sema` wakes the worker, `semaDone` is released by
// the worker after each routine), and starts the worker with sproc().
// Both name arguments are ignored in this build.  Failures are reported on
// cerr and construction continues.
THREAD_processor::THREAD_processor(char *, char *)
{
   // No routine has been handed over yet; without this the flag would be
   // read uninitialised.
   running = 0;

   _sema = usnewsema(THREADprocessor_arena,0);     // Create our semaphore
   semaDone = usnewsema(THREADprocessor_arena,0);
   if (_sema==NULL) {
        cerr << "problem allocating _sema" << endl;
   }
   if (semaDone==NULL) {
        cerr << "problem allocating semaDone" << endl;
   }
   // Share all virtual space of the parent, synchronize file open table
   if (sproc(THREADprocessor_startup_func, PR_SADDR | PR_SFDS,
          (void *)this)==-1) {
        cerr << "sproc problem" << endl;
   }
}

int THREAD_mp_manager::num_procs() const
{
   return prctl(PR_MAXPPROCS);
}

// Manager setup for this build: sets the exit signal, configures the arena
// user count, then creates the shared arena that THREAD_processor allocates
// its semaphores from.  The argument is ignored.  Exits the program with
// status 1 if the arena cannot be created.
THREAD_mp_manager::THREAD_mp_manager(int)
{
   // Set us up so that everyone dies simultaneously
   prctl(PR_SETEXITSIG, SIGQUIT);

   // Allocate shared memory arena; CONF_INITUSERS is configured before the
   // arena itself is created by usinit() below
   usconfig(CONF_INITUSERS, num_procs()*2 + 8);
   // The arena file name should not be hard coded, in case we are running
   // more than once on the same machine, so a temporary name is generated

   // NOTE(review): the tempnam() result is not checked for NULL before use
   char *tmpFile=tempnam("/tmp","THRED");
   THREADprocessor_arena = usinit(tmpFile);
   // The name is unlinked and its buffer released right after usinit()
   unlink(tmpFile);
   free(tmpFile);

   if (!THREADprocessor_arena) {
      cerr << "fatal error, shared memory arena could not be created"<<endl;
      exit(1);
   }
}

#else
//
//  ------------  Specific stuff for platforms without multi-threading support
//
// No worker is created in this build; both names are ignored and the
// `running` flag is simply cleared.
THREAD_processor::THREAD_processor(char *, char *)
{
    running = 0;
}

// Without threads the routine is simply called in place, on the caller's
// own stack; its return value is discarded.
void THREAD_processor::start_execute(void *(*start_routine)(void *, void*),
     void *arg,void *arg2)
{
   (*start_routine)(arg, arg2);
}

// There is no worker loop in this build; intentionally empty.
void THREAD_processor::startup()
{
}


// Nothing to wait for in this build; always returns 1.
int THREAD_processor::wait_for_done()
{
   return 1;
}

int THREAD_processor::still_running()
{
   return 1;
}

// Manager for the build without threads: only announces on cerr that
// multithreading is unavailable.  The argument is ignored.
THREAD_mp_manager::THREAD_mp_manager(int)
{
    cerr << "No multithreading..."
         << endl;
}

THREAD_mp_manager::num_procs() const
{
   return 1;
}

#endif

//
// General stuff for all platforms
//
void
THREADmp_manager::THREADmp_initproc(int i) {
   char *name, *thrname;

   name = (char *)malloc(sizeof(char)*50);
   thrname = (char *)malloc(sizeof(char)*50);
   sprintf(name, "Proc_%d", i);
   sprintf(thrname, "Proc_%d/thread", i);
   procs[i] = new THREADprocessor(name, thrname);
   enqThread(i);
}

void
THREADmp_manager::THREADmp_newproc()
{
   procs=(THREADprocessor **) realloc((void *) procs,++_num_procs*
       sizeof(THREADprocessor *));
   THREADmp_initproc(_num_procs-1);
}

// Builds the manager and its initial set of processors.
//   _procs - number of processors to create; 0 means "use num_procs()".
// Progress messages are written to cerr.
THREADmp_manager::THREADmp_manager(int _procs)
: THREAD_mp_manager(_procs)
{
   cerr << "Creating mp manager\n";

   _num_procs = (_procs != 0) ? _procs : num_procs();

   cerr << "Num procs: " << num_procs() << endl << flush;
   procs = (THREADprocessor **)malloc(sizeof(THREADprocessor *) *_num_procs);

   int slot = 0;
   while (slot < _num_procs) {
      THREADmp_initproc(slot);
      ++slot;
   }

   cerr << "Done\n";
}

void
THREADmp_manager::enqThread(int threadNumber)
{
        queue.enQueue(threadNumber);
}

int
THREADmp_manager::deqThread()
{
        int itemOk;
        int retVal=queue.deQueue(itemOk);

        if (!itemOk) {
                // This better allocate a thread, or we're not halting
                THREADmp_newproc();
                return deqThread();
        } else return retVal;
}

// Deletes every processor in `procs`, in index order, then frees the table.
THREADmp_manager::~THREADmp_manager()
{
   int slot = 0;

   while (slot < _num_procs) {
      delete procs[slot];
      ++slot;
   }

   free(procs);
}