User Manual first draft.
[Faustine.git] / interpreter / preprocessor / faust-0.9.47mr3 / architecture / scheduler.h
1
2 #include <stdlib.h>
3 #include <assert.h>
4 #include <pthread.h>
5 #include <stdio.h>
6 #include <errno.h>
7 #include <string.h>
8 #include <semaphore.h>
9 #include <sys/types.h>
10 #include <sys/stat.h>
11 #include <errno.h>
12 #include <fcntl.h>
13 #include <unistd.h>
14 #include <math.h>
15
16 using namespace std;
17
18 // Globals
19
20 #define THREAD_SIZE 64
21 #define QUEUE_SIZE 4096
22
23 #define WORK_STEALING_INDEX 0
24 #define LAST_TASK_INDEX 1
25
26
27 #ifdef __ICC
28 #define INLINE __forceinline
29 #else
30 #define INLINE inline
31 #endif
32
33
34 // On Intel set FZ (Flush to Zero) and DAZ (Denormals Are Zero)
35 // flags to avoid costly denormals
36 #ifdef __SSE__
37 #include <xmmintrin.h>
38 #ifdef __SSE2__
39 #define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8040)
40 #else
41 #define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8000)
42 #endif
43 #else
44 #define AVOIDDENORMALS
45 #endif
46
47 #ifdef __linux__
48
49 // handle 32/64 bits int size issues
50 #ifdef __x86_64__
51 #define UInt32 unsigned int
52 #define UInt64 unsigned long int
53 #else
54 #define UInt32 unsigned int
55 #define UInt64 unsigned long long int
56 #endif
57
58 #endif
59
60 #ifdef __APPLE__
61 #include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
62 #endif
63
64 class TaskQueue;
65 struct DSPThreadPool;
66
67 extern TaskQueue* gTaskQueueList[THREAD_SIZE];
68 extern DSPThreadPool* gThreadPool;
69 extern int gClientCount;
70 extern UInt64 gMaxStealing;
71
72 void Yield();
73
74 /**
75 * Returns the number of clock cycles elapsed since the last reset
76 * of the processor
77 */
78 static INLINE UInt64 DSP_rdtsc(void)
79 {
80 union {
81 UInt32 i32[2];
82 UInt64 i64;
83 } count;
84
85 __asm__ __volatile__("rdtsc" : "=a" (count.i32[0]), "=d" (count.i32[1]));
86 return count.i64;
87 }
88
89 #if defined(__i386__) || defined(__x86_64__)
90
91 #define LOCK "lock ; "
92
93 static INLINE void NOP(void)
94 {
95 __asm__ __volatile__("nop \n\t");
96 }
97
98 static INLINE char CAS1(volatile void* addr, volatile int value, int newvalue)
99 {
100 register char ret;
101 __asm__ __volatile__ (
102 "# CAS \n\t"
103 LOCK "cmpxchg %2, (%1) \n\t"
104 "sete %0 \n\t"
105 : "=a" (ret)
106 : "c" (addr), "d" (newvalue), "a" (value)
107 : "memory"
108 );
109 return ret;
110 }
111
112 static INLINE int atomic_xadd(volatile int* atomic, int val)
113 {
114 register int result;
115 __asm__ __volatile__ ("# atomic_xadd \n\t"
116 LOCK "xaddl %0,%1 \n\t"
117 : "=r" (result), "=m" (*atomic)
118 : "0" (val), "m" (*atomic));
119 return result;
120 }
121
122 #endif
123
124
125 /*
126 static INLINE int INC_ATOMIC(volatile int* val)
127 {
128 int actual;
129 do {
130 actual = *val;
131 } while (!CAS1(val, actual, actual + 1));
132 return actual;
133 }
134
135 static INLINE int DEC_ATOMIC(volatile int* val)
136 {
137 int actual;
138 do {
139 actual = *val;
140 } while (!CAS1(val, actual, actual - 1));
141 return actual;
142 }
143 */
144
145 static INLINE int INC_ATOMIC(volatile int* val)
146 {
147 return atomic_xadd(val, 1);
148 }
149
150 static INLINE int DEC_ATOMIC(volatile int* val)
151 {
152 return atomic_xadd(val, -1);
153 }
154
155 // To be used in lock-free queue
156 struct AtomicCounter
157 {
158 union {
159 struct {
160 short fHead;
161 short fTail;
162 }
163 scounter;
164 int fValue;
165 }info;
166
167 INLINE AtomicCounter()
168 {
169 info.fValue = 0;
170 }
171
172 INLINE AtomicCounter& operator=(AtomicCounter& obj)
173 {
174 info.fValue = obj.info.fValue;
175 return *this;
176 }
177
178 INLINE AtomicCounter& operator=(volatile AtomicCounter& obj)
179 {
180 info.fValue = obj.info.fValue;
181 return *this;
182 }
183
184 };
185
/**
 * Number of processors currently online, as reported by sysconf().
 *
 * sysconf() returns a long and -1 on failure; the result is clamped so that
 * callers always get at least 1 instead of a negative or zero count.
 */
int get_max_cpu()
{
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    return (online < 1) ? 1 : static_cast<int>(online);
}
190
// Returns the id of the current process (_getpid() when WIN32 is defined, getpid() otherwise).
static int GetPID()
{
#ifndef WIN32
    return getpid();
#else
    return _getpid();
#endif
}
199
200 #define Value(e) (e).info.fValue
201
202 #define Head(e) (e).info.scounter.fHead
203 #define IncHead(e) (e).info.scounter.fHead++
204 #define DecHead(e) (e).info.scounter.fHead--
205
206 #define Tail(e) (e).info.scounter.fTail
207 #define IncTail(e) (e).info.scounter.fTail++
208 #define DecTail(e) (e).info.scounter.fTail--
209
210 #define MASTER_THREAD 0
211
212 #define MAX_STEAL_DUR 50 // in usec
213 #define DEFAULT_CLOCKSPERSEC 2500000000 // in cycles (2,5 Ghz)
214
215 class TaskQueue
216 {
217 private:
218
219 int fTaskList[QUEUE_SIZE];
220 volatile AtomicCounter fCounter;
221 UInt64 fStealingStart;
222
223 public:
224
225 INLINE TaskQueue(int cur_thread)
226 {
227 for (int i = 0; i < QUEUE_SIZE; i++) {
228 fTaskList[i] = -1;
229 }
230 gTaskQueueList[cur_thread] = this;
231 fStealingStart = 0;
232 }
233
234 INLINE void PushHead(int item)
235 {
236 fTaskList[Head(fCounter)] = item;
237 IncHead(fCounter);
238 }
239
240 INLINE int PopHead()
241 {
242 AtomicCounter old_val;
243 AtomicCounter new_val;
244
245 do {
246 old_val = fCounter;
247 new_val = old_val;
248 if (Head(old_val) == Tail(old_val)) {
249 return WORK_STEALING_INDEX;
250 } else {
251 DecHead(new_val);
252 }
253 } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
254
255 return fTaskList[Head(old_val) - 1];
256 }
257
258 INLINE int PopTail()
259 {
260 AtomicCounter old_val;
261 AtomicCounter new_val;
262
263 do {
264 old_val = fCounter;
265 new_val = old_val;
266 if (Head(old_val) == Tail(old_val)) {
267 return WORK_STEALING_INDEX;
268 } else {
269 IncTail(new_val);
270 }
271 } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
272
273 return fTaskList[Tail(old_val)];
274 }
275
276 INLINE void MeasureStealingDur()
277 {
278 // Takes first timetamp
279 if (fStealingStart == 0) {
280 fStealingStart = DSP_rdtsc();
281 } else if ((DSP_rdtsc() - fStealingStart) > gMaxStealing) {
282 Yield();
283 }
284 }
285
286 INLINE void ResetStealingDur()
287 {
288 fStealingStart = 0;
289 }
290
291 static INLINE int GetNextTask(int thread, int num_threads)
292 {
293 int tasknum;
294 for (int i = 0; i < num_threads; i++) {
295 if ((i != thread) && gTaskQueueList[i] && (tasknum = gTaskQueueList[i]->PopTail()) != WORK_STEALING_INDEX) {
296 #ifdef __linux__
297 //if (thread != MASTER_THREAD)
298 gTaskQueueList[thread]->ResetStealingDur();
299 #endif
300 return tasknum; // Task is found
301 }
302 }
303 NOP();
304 #ifdef __linux__
305 //if (thread != MASTER_THREAD)
306 gTaskQueueList[thread]->MeasureStealingDur();
307 #endif
308 return WORK_STEALING_INDEX; // Otherwise will try "workstealing" again next cycle...
309 }
310
311 INLINE void InitTaskList(int task_list_size, int* task_list, int thread_num, int cur_thread, int& tasknum)
312 {
313 int task_slice = task_list_size / thread_num;
314 int task_slice_rest = task_list_size % thread_num;
315
316 if (task_slice == 0) {
317 // Each thread directly executes one task
318 tasknum = task_list[cur_thread];
319 // Thread 0 takes remaining ready tasks
320 if (cur_thread == 0) {
321 for (int index = 0; index < task_slice_rest - thread_num; index++) {
322 PushHead(task_list[task_slice_rest + index]);
323 }
324 }
325 } else {
326 // Each thread takes a part of ready tasks
327 int index;
328 for (index = 0; index < task_slice - 1; index++) {
329 PushHead(task_list[cur_thread * task_slice + index]);
330 }
331 // Each thread directly executes one task
332 tasknum = task_list[cur_thread * task_slice + index];
333 // Thread 0 takes remaining ready tasks
334 if (cur_thread == 0) {
335 for (index = 0; index < task_slice_rest; index++) {
336 PushHead(task_list[thread_num * task_slice + index]);
337 }
338 }
339 }
340 }
341
342 static INLINE void Init()
343 {
344 for (int i = 0; i < THREAD_SIZE; i++) {
345 gTaskQueueList[i] = 0;
346 }
347 }
348
349 };
350
351 struct TaskGraph
352 {
353 volatile int gTaskList[QUEUE_SIZE];
354
355 TaskGraph()
356 {
357 for (int i = 0; i < QUEUE_SIZE; i++) {
358 gTaskList[i] = 0;
359 }
360 }
361
362 INLINE void InitTask(int task, int val)
363 {
364 gTaskList[task] = val;
365 }
366
367 void Display()
368 {
369 for (int i = 0; i < QUEUE_SIZE; i++) {
370 printf("Task = %d activation = %d\n", i, gTaskList[i]);
371 }
372 }
373
374 INLINE void ActivateOutputTask(TaskQueue& queue, int task, int& tasknum)
375 {
376 if (DEC_ATOMIC(&gTaskList[task]) == 1) {
377 if (tasknum == WORK_STEALING_INDEX) {
378 tasknum = task;
379 } else {
380 queue.PushHead(task);
381 }
382 }
383 }
384
385 INLINE void ActivateOutputTask(TaskQueue& queue, int task)
386 {
387 if (DEC_ATOMIC(&gTaskList[task]) == 1) {
388 queue.PushHead(task);
389 }
390 }
391
392 INLINE void ActivateOneOutputTask(TaskQueue& queue, int task, int& tasknum)
393 {
394 if (DEC_ATOMIC(&gTaskList[task]) == 1) {
395 tasknum = task;
396 } else {
397 tasknum = queue.PopHead();
398 }
399 }
400
401 INLINE void GetReadyTask(TaskQueue& queue, int& tasknum)
402 {
403 if (tasknum == WORK_STEALING_INDEX) {
404 tasknum = queue.PopHead();
405 }
406 }
407
408 };
409
410
411 #define THREAD_POOL_SIZE 16
412 #define JACK_SCHED_POLICY SCHED_FIFO
413
414 /* use 512KB stack per thread - the default is way too high to be feasible
415 * with mlockall() on many systems */
416 #define THREAD_STACK 524288
417
418
419 #ifdef __APPLE__
420
421 #include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
422 #include <mach/thread_policy.h>
423 #include <mach/thread_act.h>
424
425 #define THREAD_SET_PRIORITY 0
426 #define THREAD_SCHEDULED_PRIORITY 1
427
428 static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority);
429
430 // returns the thread's priority as it was last set by the API
431 static UInt32 GetThreadSetPriority(pthread_t thread)
432 {
433 return GetThreadPriority(thread, THREAD_SET_PRIORITY);
434 }
435
436 // returns the thread's priority as it was last scheduled by the Kernel
437 static UInt32 GetThreadScheduledPriority(pthread_t thread)
438 {
439 return GetThreadPriority(thread, THREAD_SCHEDULED_PRIORITY);
440 }
441
442 static int SetThreadToPriority(pthread_t thread, UInt32 inPriority, Boolean inIsFixed, UInt64 period, UInt64 computation, UInt64 constraint)
443 {
444 if (inPriority == 96) {
445 // REAL-TIME / TIME-CONSTRAINT THREAD
446 thread_time_constraint_policy_data_t theTCPolicy;
447 theTCPolicy.period = period;
448 theTCPolicy.computation = computation;
449 theTCPolicy.constraint = constraint;
450 theTCPolicy.preemptible = true;
451 kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t)&theTCPolicy, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
452 return (res == KERN_SUCCESS) ? 0 : -1;
453 } else {
454 // OTHER THREADS
455 thread_extended_policy_data_t theFixedPolicy;
456 thread_precedence_policy_data_t thePrecedencePolicy;
457 SInt32 relativePriority;
458
459 // [1] SET FIXED / NOT FIXED
460 theFixedPolicy.timeshare = !inIsFixed;
461 thread_policy_set(pthread_mach_thread_np(thread), THREAD_EXTENDED_POLICY, (thread_policy_t)&theFixedPolicy, THREAD_EXTENDED_POLICY_COUNT);
462
463 // [2] SET PRECEDENCE
464 // N.B.: We expect that if thread A created thread B, and the program wishes to change
465 // the priority of thread B, then the call to change the priority of thread B must be
466 // made by thread A.
467 // This assumption allows us to use pthread_self() to correctly calculate the priority
468 // of the feeder thread (since precedency policy's importance is relative to the
469 // spawning thread's priority.)
470 relativePriority = inPriority - GetThreadSetPriority(pthread_self());
471
472 thePrecedencePolicy.importance = relativePriority;
473 kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_PRECEDENCE_POLICY, (thread_policy_t)&thePrecedencePolicy, THREAD_PRECEDENCE_POLICY_COUNT);
474 return (res == KERN_SUCCESS) ? 0 : -1;
475 }
476 }
477
478 static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority)
479 {
480 thread_basic_info_data_t threadInfo;
481 policy_info_data_t thePolicyInfo;
482 unsigned int count;
483
484 // get basic info
485 count = THREAD_BASIC_INFO_COUNT;
486 thread_info(pthread_mach_thread_np(thread), THREAD_BASIC_INFO, (thread_info_t)&threadInfo, &count);
487
488 switch (threadInfo.policy) {
489 case POLICY_TIMESHARE:
490 count = POLICY_TIMESHARE_INFO_COUNT;
491 thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_TIMESHARE_INFO, (thread_info_t)&(thePolicyInfo.ts), &count);
492 if (inWhichPriority == THREAD_SCHEDULED_PRIORITY) {
493 return thePolicyInfo.ts.cur_priority;
494 } else {
495 return thePolicyInfo.ts.base_priority;
496 }
497 break;
498
499 case POLICY_FIFO:
500 count = POLICY_FIFO_INFO_COUNT;
501 thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_FIFO_INFO, (thread_info_t)&(thePolicyInfo.fifo), &count);
502 if ((thePolicyInfo.fifo.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
503 return thePolicyInfo.fifo.depress_priority;
504 }
505 return thePolicyInfo.fifo.base_priority;
506 break;
507
508 case POLICY_RR:
509 count = POLICY_RR_INFO_COUNT;
510 thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_RR_INFO, (thread_info_t)&(thePolicyInfo.rr), &count);
511 if ((thePolicyInfo.rr.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
512 return thePolicyInfo.rr.depress_priority;
513 }
514 return thePolicyInfo.rr.base_priority;
515 break;
516 }
517
518 return 0;
519 }
520
521 static int GetParams(pthread_t thread, UInt64* period, UInt64* computation, UInt64* constraint)
522 {
523 thread_time_constraint_policy_data_t theTCPolicy;
524 mach_msg_type_number_t count = THREAD_TIME_CONSTRAINT_POLICY_COUNT;
525 boolean_t get_default = false;
526
527 kern_return_t res = thread_policy_get(pthread_mach_thread_np(thread),
528 THREAD_TIME_CONSTRAINT_POLICY,
529 (thread_policy_t)&theTCPolicy,
530 &count,
531 &get_default);
532 if (res == KERN_SUCCESS) {
533 *period = theTCPolicy.period;
534 *computation = theTCPolicy.computation;
535 *constraint = theTCPolicy.constraint;
536 return 0;
537 } else {
538 return -1;
539 }
540 }
541
542 static UInt64 period = 0;
543 static UInt64 computation = 0;
544 static UInt64 constraint = 0;
545
546 INLINE void GetRealTime()
547 {
548 if (period == 0) {
549 GetParams(pthread_self(), &period, &computation, &constraint);
550 }
551 }
552
553 INLINE void SetRealTime()
554 {
555 SetThreadToPriority(pthread_self(), 96, true, period, computation, constraint);
556 }
557
558 void CancelThread(pthread_t fThread)
559 {
560 mach_port_t machThread = pthread_mach_thread_np(fThread);
561 thread_terminate(machThread);
562 }
563
564 INLINE void Yield()
565 {
566 //sched_yield();
567 }
568
569 #endif
570
571 #ifdef __linux__
572
573 static int faust_sched_policy = -1;
574 static struct sched_param faust_rt_param;
575
576 INLINE void GetRealTime()
577 {
578 if (faust_sched_policy == -1) {
579 memset(&faust_rt_param, 0, sizeof(faust_rt_param));
580 pthread_getschedparam(pthread_self(), &faust_sched_policy, &faust_rt_param);
581 }
582 }
583
584 INLINE void SetRealTime()
585 {
586 faust_rt_param.sched_priority--;
587 pthread_setschedparam(pthread_self(), faust_sched_policy, &faust_rt_param);
588 }
589
// Requests cancellation of 'fThread', then joins it (its return value is discarded).
void CancelThread(pthread_t fThread)
{
    pthread_cancel(fThread);
    void** ignoredResult = NULL;
    pthread_join(fThread, ignoredResult);
}
595
596 INLINE void Yield()
597 {
598 pthread_yield();
599 }
600
601
602 #endif
603
604 #define KDSPMESURE 50
605
606 static INLINE int Range(int min, int max, int val)
607 {
608 if (val < min) {
609 return min;
610 } else if (val > max) {
611 return max;
612 } else {
613 return val;
614 }
615 }
616
617 struct Runnable {
618
619 UInt64 fTiming[KDSPMESURE];
620 UInt64 fStart;
621 UInt64 fStop;
622 int fCounter;
623 float fOldMean;
624 int fOldfDynamicNumThreads;
625 bool fDynAdapt;
626
627 virtual void computeThread(int cur_thread) = 0;
628
629 Runnable():fCounter(0), fOldMean(1000000000.f), fOldfDynamicNumThreads(1)
630 {
631 memset(fTiming, 0, sizeof(long long int ) * KDSPMESURE);
632 fDynAdapt = getenv("OMP_DYN_THREAD") ? strtol(getenv("OMP_DYN_THREAD"), NULL, 10) : false;
633 }
634
635 INLINE float ComputeMean()
636 {
637 float mean = 0;
638 for (int i = 0; i < KDSPMESURE; i++) {
639 mean += float(fTiming[i]);
640 }
641 mean /= float(KDSPMESURE);
642 return mean;
643 }
644
645 INLINE void StartMeasure()
646 {
647 if (!fDynAdapt)
648 return;
649
650 fStart = DSP_rdtsc();
651 }
652
653 INLINE void StopMeasure(int staticthreadnum, int& dynthreadnum)
654 {
655 if (!fDynAdapt)
656 return;
657
658 fStop = DSP_rdtsc();
659 fCounter = (fCounter + 1) % KDSPMESURE;
660 if (fCounter == 0) {
661 float mean = ComputeMean();
662 if (fabs(mean - fOldMean) > 5000) {
663 if (mean > fOldMean) { // Worse...
664 //printf("Worse %f %f\n", mean, fOldMean);
665 if (fOldfDynamicNumThreads > dynthreadnum) {
666 fOldfDynamicNumThreads = dynthreadnum;
667 dynthreadnum += 1;
668 } else {
669 fOldfDynamicNumThreads = dynthreadnum;
670 dynthreadnum -= 1;
671 }
672 } else { // Better...
673 //printf("Better %f %f\n", mean, fOldMean);
674 if (fOldfDynamicNumThreads > dynthreadnum) {
675 fOldfDynamicNumThreads = dynthreadnum;
676 dynthreadnum -= 1;
677 } else {
678 fOldfDynamicNumThreads = dynthreadnum;
679 dynthreadnum += 1;
680 }
681 }
682 fOldMean = mean;
683 dynthreadnum = Range(1, staticthreadnum, dynthreadnum);
684 //printf("dynthreadnum %d\n", dynthreadnum);
685 }
686 }
687 fTiming[fCounter] = fStop - fStart;
688 }
689 };
690
691 struct DSPThread;
692
693 struct DSPThreadPool {
694
695 DSPThread* fThreadPool[THREAD_POOL_SIZE];
696 int fThreadCount;
697 volatile int fCurThreadCount;
698
699 DSPThreadPool();
700 ~DSPThreadPool();
701
702 void StartAll(int num, bool realtime);
703 void StopAll();
704 void SignalAll(int num, Runnable* runnable);
705
706 void SignalOne();
707 bool IsFinished();
708
709 static DSPThreadPool* Init();
710 static void Destroy();
711
712 };
713
714 struct DSPThread {
715
716 pthread_t fThread;
717 DSPThreadPool* fThreadPool;
718 Runnable* fRunnable;
719 sem_t* fSemaphore;
720 char fName[128];
721 bool fRealTime;
722 int fNum;
723
724 DSPThread(int num, DSPThreadPool* pool)
725 {
726 fNum = num;
727 fThreadPool = pool;
728 fRunnable = NULL;
729 fRealTime = false;
730
731 sprintf(fName, "faust_sem_%d_%p", GetPID(), this);
732
733 if ((fSemaphore = sem_open(fName, O_CREAT, 0777, 0)) == (sem_t*)SEM_FAILED) {
734 printf("Allocate: can't check in named semaphore name = %s err = %s", fName, strerror(errno));
735 }
736 }
737
738 virtual ~DSPThread()
739 {
740 sem_unlink(fName);
741 sem_close(fSemaphore);
742 }
743
744 void Run()
745 {
746 while (sem_wait(fSemaphore) != 0) {}
747 fRunnable->computeThread(fNum + 1);
748 fThreadPool->SignalOne();
749 }
750
751 static void* ThreadHandler(void* arg)
752 {
753 DSPThread* thread = static_cast<DSPThread*>(arg);
754
755 AVOIDDENORMALS;
756
757 // One "dummy" cycle to setup thread
758 if (thread->fRealTime) {
759 thread->Run();
760 SetRealTime();
761 }
762
763 while (true) {
764 thread->Run();
765 }
766
767 return NULL;
768 }
769
770 int Start(bool realtime)
771 {
772 pthread_attr_t attributes;
773 struct sched_param rt_param;
774 pthread_attr_init(&attributes);
775
776 int priority = 60; // TODO
777 int res;
778
779 if (realtime) {
780 fRealTime = true;
781 }else {
782 fRealTime = getenv("OMP_REALTIME") ? strtol(getenv("OMP_REALTIME"), NULL, 10) : true;
783 }
784
785 if ((res = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE))) {
786 printf("Cannot request joinable thread creation for real-time thread res = %d err = %s\n", res, strerror(errno));
787 return -1;
788 }
789
790 if ((res = pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM))) {
791 printf("Cannot set scheduling scope for real-time thread res = %d err = %s\n", res, strerror(errno));
792 return -1;
793 }
794
795 if (realtime) {
796
797 if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED))) {
798 printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
799 return -1;
800 }
801
802 if ((res = pthread_attr_setschedpolicy(&attributes, JACK_SCHED_POLICY))) {
803 printf("Cannot set RR scheduling class for RT thread res = %d err = %s\n", res, strerror(errno));
804 return -1;
805 }
806
807 memset(&rt_param, 0, sizeof(rt_param));
808 rt_param.sched_priority = priority;
809
810 if ((res = pthread_attr_setschedparam(&attributes, &rt_param))) {
811 printf("Cannot set scheduling priority for RT thread res = %d err = %s\n", res, strerror(errno));
812 return -1;
813 }
814
815 } else {
816
817 if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_INHERIT_SCHED))) {
818 printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
819 return -1;
820 }
821 }
822
823 if ((res = pthread_attr_setstacksize(&attributes, THREAD_STACK))) {
824 printf("Cannot set thread stack size res = %d err = %s\n", res, strerror(errno));
825 return -1;
826 }
827
828 if ((res = pthread_create(&fThread, &attributes, ThreadHandler, this))) {
829 printf("Cannot create thread res = %d err = %s\n", res, strerror(errno));
830 return -1;
831 }
832
833 pthread_attr_destroy(&attributes);
834 return 0;
835 }
836
837 void Signal(bool stop, Runnable* runnable)
838 {
839 fRunnable = runnable;
840 sem_post(fSemaphore);
841 }
842
843 void Stop()
844 {
845 CancelThread(fThread);
846 }
847
848 };
849
850 DSPThreadPool::DSPThreadPool()
851 {
852 for (int i = 0; i < THREAD_POOL_SIZE; i++) {
853 fThreadPool[i] = NULL;
854 }
855 fThreadCount = 0;
856 fCurThreadCount = 0;
857 }
858
859 DSPThreadPool::~DSPThreadPool()
860 {
861 StopAll();
862
863 for (int i = 0; i < fThreadCount; i++) {
864 delete(fThreadPool[i]);
865 fThreadPool[i] = NULL;
866 }
867
868 fThreadCount = 0;
869 }
870
871 void DSPThreadPool::StartAll(int num, bool realtime)
872 {
873 if (fThreadCount == 0) { // Protection for multiple call... (like LADSPA plug-ins in Ardour)
874 for (int i = 0; i < num; i++) {
875 fThreadPool[i] = new DSPThread(i, this);
876 fThreadPool[i]->Start(realtime);
877 fThreadCount++;
878 }
879 }
880 }
881
882 void DSPThreadPool::StopAll()
883 {
884 for (int i = 0; i < fThreadCount; i++) {
885 fThreadPool[i]->Stop();
886 }
887 }
888
889 void DSPThreadPool::SignalAll(int num, Runnable* runnable)
890 {
891 fCurThreadCount = num;
892
893 for (int i = 0; i < num; i++) { // Important : use local num here...
894 fThreadPool[i]->Signal(false, runnable);
895 }
896 }
897
898 void DSPThreadPool::SignalOne()
899 {
900 DEC_ATOMIC(&fCurThreadCount);
901 }
902
903 bool DSPThreadPool::IsFinished()
904 {
905 return (fCurThreadCount == 0);
906 }
907
908 DSPThreadPool* DSPThreadPool::Init()
909 {
910 if (gClientCount++ == 0 && !gThreadPool) {
911 gThreadPool = new DSPThreadPool();
912 }
913 return gThreadPool;
914 }
915
916 void DSPThreadPool::Destroy()
917 {
918 if (--gClientCount == 0 && gThreadPool) {
919 delete gThreadPool;
920 gThreadPool = NULL;
921 }
922 }
923
924 #ifndef PLUG_IN
925
926 // Globals
927 TaskQueue* gTaskQueueList[THREAD_SIZE] = {0};
928
929 DSPThreadPool* gThreadPool = 0;
930 int gClientCount = 0;
931
932 int clock_per_microsec = (getenv("CLOCKSPERSEC")
933 ? strtoll(getenv("CLOCKSPERSEC"), NULL, 10)
934 : DEFAULT_CLOCKSPERSEC) / 1000000;
935
936 UInt64 gMaxStealing = getenv("OMP_STEALING_DUR")
937 ? strtoll(getenv("OMP_STEALING_DUR"), NULL, 10) * clock_per_microsec
938 : MAX_STEAL_DUR * clock_per_microsec;
939
940 #endif
941