Subversion Repositories shark

Rev

Go to most recent revision | Details | Last modification | View Log | RSS feed

Rev Author Line No. Line
2 pj 1
/*
2
 * Copyright 1995, Brown University, Providence, RI
3
 *
4
 * Permission to use and modify this software and its documentation for
5
 * any purpose other than its incorporation into a commercial product is
6
 * hereby granted without fee.  Permission to copy and distribute this
7
 * software and its documentation only for non-commercial use is also
8
 * granted without fee, provided, however, that the above copyright notice
9
 * appear in all copies, that both that copyright notice and this permission
10
 * notice appear in supporting documentation, that the name of Brown
11
 * University not be used in advertising or publicity pertaining to
12
 * distribution of the software without specific, written prior permission,
13
 * and that the person doing the distribution notify Brown University of
14
 * such distributions outside of his or her organization. Brown University
15
 * makes no representations about the suitability of this software for
16
 * any purpose.  It is provided "as is" without express or implied warranty.
17
 * Brown University requests notification of any modifications to this
18
 * software or its documentation.
19
 *
20
 * Send the following redistribution information:
21
 *
22
 *      Name:
23
 *      Organization:
24
 *      Address (postal and/or electronic):
25
 *
26
 * To:
27
 *      Software Librarian
28
 *      Computer Science Department, Box 1910
29
 *      Brown University
30
 *      Providence, RI 02912
31
 *
32
 *              or
33
 *
34
 *      brusd@cs.brown.edu
35
 *
36
 * We will acknowledge all electronic notifications.
37
 */
38
 
39
static char Version[] =
40
   "$Id: threadmp.c,v 1.1.1.1 2002-03-29 14:12:59 pj Exp $";
41
static char Trick_to_use_Version() {char *unused = Version; return unused[0];}
42
 
43
/* -------------------------------------------------------------------------
44
 *
45
 * Original code by Tom Meyer
46
 * Altered by Loring Holden to work with MPEGmovie object
47
 *
48
 * ------------------------------------------------------------------------- */
49
 
50
#include "thread_mp.H"
51
#include <iostream.h>
52
#include <unistd.h>
53
#include <signal.h>
54
#include <stdlib.h>
55
#include <stdio.h>
56
 
57
#if defined(sol)
58
#elif defined(sgi)
59
usptr_t *THREADprocessor_arena;
60
#endif
61
 
62
/* -----------------------  Private Methods  ------------------------------- */
63
//
64
//  ------------  SOL-specific stuff ----------------------
65
//
66
#if defined(sol)
67
#ifdef CPLPLTHREADS
68
void THREAD_processor::startup()
69
{
70
   while (1) {
71
      sema->wait(__FILE__, __LINE__);
72
 
73
      _start_routine(_arg,_arg2);
74
 
75
      semaDone->post(__FILE__, __LINE__);
76
   }
77
}
78
 
79
int THREAD_processor::wait_for_done()
80
{
81
   semaDone->wait(__FILE__, __LINE__);
82
   running=0;
83
   return 1;
84
}
85
 
86
int THREAD_processor::still_running()
87
{
88
        //try_wait returns 0 if semaphore could be decremented
89
        //         returns 1 if semaphore could not be decremented
90
        if (semaDone->try_wait()) {
91
                return 1;
92
        } else {
93
                running=0;
94
                return 0;
95
        }
96
}
97
#else
98
 
99
void *THREADprocessor_startup_func(void *proc)
100
{
101
   ((THREADprocessor *)proc)->startup_func();
102
   return NULL;
103
}
104
 
105
void THREAD_processor::startup_func()
106
{
107
   while (1) {
108
      sema_wait(&sema);
109
 
110
      _start_routine(_arg,_arg2);
111
 
112
      sema_post(&semaDone);
113
   }
114
}
115
 
116
int THREAD_processor::wait_for_done()
117
{
118
   sema_wait(&semaDone);
119
   running=0;
120
   return 1;
121
}
122
 
123
int THREAD_processor::still_running()
124
{
125
        //try_wait returns 0 if semaphore could be decremented
126
        //         returns 1 if semaphore could not be decremented
127
        if (sema_trywait(&semaDone)) {
128
                return 1;
129
        } else {
130
                running=0;
131
                return 0;
132
        }
133
}
134
#endif
135
 
136
char new_name[255];
137
 
138
THREAD_processor::THREAD_processor(char *name, char *thrname)
139
#if CPLPLTHREADS
140
: Thread (thrname)
141
#endif
142
{
143
#if CPLPLTHREADS
144
   char *newname,*newname2;
145
 
146
   newname = (char *)malloc(sizeof(char)*(strlen(name) + strlen("/sema")));
147
   sprintf(newname, "%s/sema", name);
148
 
149
   newname2 = (char *)malloc(sizeof(char)*(strlen(name) + strlen("/doneSema")));
150
   sprintf(newname2, "%s/doneSema", name);
151
   sema = new Semaphore(0, newname);
152
   if (sema==NULL) {
153
      cerr << "Could not allocate semaphore (sema)" << endl<<flush;
154
   }
155
   semaDone = new Semaphore(0, newname2);
156
   if (semaDone==NULL) {
157
      cerr << "Could not allocate semaphore (semaDone)" << endl<<flush;
158
   }
159
   make_runnable();
160
#else
161
   sema_init(&sema, 0, USYNC_THREAD, NULL);
162
   sema_init(&semaDone, 0, USYNC_THREAD, NULL);
163
   thr_create(NULL, NULL, THREADprocessor_startup_func, (void *)this,
164
              THR_BOUND, &thr);
165
#endif
166
   running=0;
167
}
168
 
169
 
170
 
171
void THREAD_processor::start_execute(void *(*start_routine)(void *, void*),
172
                                    void *arg, void* arg2)
173
{
174
   if (running) {
175
     fprintf(stderr,"start_execute: already executing\n");
176
     fflush(stderr);
177
   }
178
//   cerr << "Waking up a process" << endl << flush;
179
   _start_routine = start_routine;
180
   _arg = arg;
181
   _arg2 = arg2;
182
   running=1;
183
#ifdef CPLPLTHREADS
184
   sema->post(__FILE__, __LINE__);
185
#else
186
   sema_post(&sema);
187
#endif
188
}
189
 
190
int THREAD_mp_manager::num_procs() const
191
{
192
   return sysconf(_SC_NPROCESSORS_ONLN);
193
}
194
 
195
THREAD_mp_manager::THREAD_mp_manager(int _procs)
196
{
197
}
198
 
199
#elif defined(sgi)
200
//
201
//  ------------  SGI-specific stuff ----------------------
202
//
203
 
204
void THREADprocessor_startup_func(void *proc)
205
{
206
   ((THREADprocessor *)proc)->startup();
207
}
208
 
209
void THREAD_processor::startup()
210
{
211
   while (1) {
212
      // Wait for us to get restarted
213
      uspsema(_sema);
214
      // Execute the routine that got left for us to do
215
      _start_routine(_arg,_arg2);
216
 
217
      // If someone's blocked on us, unblock them
218
      usvsema(semaDone);
219
   }
220
}
221
 
222
 
223
void THREAD_processor::start_execute(void *(*start_routine)(void *, void *),
224
                                    void *arg, void *arg2)
225
{
226
   _start_routine = start_routine;
227
   _arg = arg;
228
   _arg2 = arg2;
229
 
230
   usvsema(_sema);
231
   running=1;
232
}
233
 
234
int THREAD_processor::wait_for_done()
235
{
236
   // Block the calling process
237
   uspsema(semaDone);
238
   running=0;
239
   return 1;
240
}
241
 
242
int THREAD_processor::still_running()
243
{
244
        if (ustestsema(semaDone)){
245
                uspsema(semaDone);
246
                running=0;
247
                return 0;
248
        } else {
249
                return 1;
250
        }
251
}
252
 
253
THREAD_processor::THREAD_processor(char *, char *)
254
{
255
   _sema = usnewsema(THREADprocessor_arena,0);     // Create our semaphore
256
   semaDone = usnewsema(THREADprocessor_arena,0);
257
   if (_sema==NULL) {
258
        cerr << "problem allocating _sema" << endl;
259
   }
260
   if (semaDone==NULL) {
261
        cerr << "problem allocating semaDone" << endl;
262
   }
263
   // Share all vertual space of the parent, Synchronize file open table
264
   if (sproc(THREADprocessor_startup_func, PR_SADDR | PR_SFDS,
265
          (void *)this)==-1) {
266
        cerr << "sproc problem" << endl;
267
   }
268
}
269
 
270
int THREAD_mp_manager::num_procs() const
271
{
272
   return prctl(PR_MAXPPROCS);
273
}
274
 
275
THREAD_mp_manager::THREAD_mp_manager(int)
276
{
277
   // Set us up so that everyone dies simultaneously
278
   prctl(PR_SETEXITSIG, SIGQUIT);
279
 
280
   // Allocate shared memory arena 
281
   usconfig(CONF_INITUSERS, num_procs()*2 + 8);
282
   //Should not be a hard coded name in case we are running more than once
283
   //on the same machine
284
 
285
   char *tmpFile=tempnam("/tmp","THRED");
286
   THREADprocessor_arena = usinit(tmpFile);
287
   unlink(tmpFile);
288
   free(tmpFile);
289
 
290
   if (!THREADprocessor_arena) {
291
      cerr << "fatal error, shared memory arena could not be created"<<endl;
292
      exit(1);
293
   }
294
}
295
 
296
#else
297
//
298
//  ------------  Specific stuff for platforms without multi-threading support
299
//
300
THREAD_processor::THREAD_processor(char *, char *)
301
{
302
    running=0;
303
}
304
 
305
void THREAD_processor::start_execute(void *(*start_routine)(void *, void*),
306
     void *arg,void *arg2)
307
{
308
   start_routine(arg,arg2);
309
}
310
 
311
void THREAD_processor::startup()
312
{
313
}
314
 
315
 
316
int THREAD_processor::wait_for_done() {
317
   return 1;
318
}
319
 
320
int THREAD_processor::still_running()
321
{
322
   return 1;
323
}
324
 
325
THREAD_mp_manager::THREAD_mp_manager(int)
326
{
327
    cerr << "No multithreading..." << endl;
328
}
329
 
330
THREAD_mp_manager::num_procs() const
331
{
332
   return 1;
333
}
334
 
335
#endif
336
 
337
//
338
// General stuff for all platforms
339
//
340
void
341
THREADmp_manager::THREADmp_initproc(int i) {
342
   char *name, *thrname;
343
 
344
   name = (char *)malloc(sizeof(char)*50);
345
   thrname = (char *)malloc(sizeof(char)*50);
346
   sprintf(name, "Proc_%d", i);
347
   sprintf(thrname, "Proc_%d/thread", i);
348
   procs[i] = new THREADprocessor(name, thrname);
349
   enqThread(i);
350
}
351
 
352
void
353
THREADmp_manager::THREADmp_newproc()
354
{
355
   procs=(THREADprocessor **) realloc((void *) procs,++_num_procs*
356
       sizeof(THREADprocessor *));
357
   THREADmp_initproc(_num_procs-1);
358
}
359
 
360
THREADmp_manager::THREADmp_manager(int _procs)
361
: THREAD_mp_manager(_procs)
362
{
363
   cerr << "Creating mp manager\n";
364
 
365
   if (_procs == 0) {
366
      _num_procs = num_procs();
367
   } else {
368
      _num_procs = _procs;
369
   }
370
 
371
   cerr << "Num procs: " << num_procs() << endl << flush;
372
   procs = (THREADprocessor **)malloc(sizeof(THREADprocessor *) *_num_procs);
373
 
374
   for (int i = 0; i < _num_procs; i++) {
375
      THREADmp_initproc(i);
376
   }
377
   cerr << "Done\n";
378
}
379
 
380
void
381
THREADmp_manager::enqThread(int threadNumber)
382
{
383
        queue.enQueue(threadNumber);
384
}
385
 
386
int
387
THREADmp_manager::deqThread()
388
{
389
        int itemOk;
390
        int retVal=queue.deQueue(itemOk);
391
 
392
        if (!itemOk) {
393
                // This better allocate a thread, or we're not halting
394
                THREADmp_newproc();
395
                return deqThread();
396
        } else return retVal;
397
}
398
 
399
THREADmp_manager::~THREADmp_manager()
400
{
401
   for (int i = 0; i < _num_procs; i++)
402
      delete procs[i];
403
   free(procs);
404
}