StarPU Internal Handbook
workers.h
1 /* StarPU --- Runtime system for heterogeneous multicore architectures.
2  *
3  * Copyright (C) 2011-2017 Inria
4  * Copyright (C) 2008-2019 Université de Bordeaux
5  * Copyright (C) 2010-2019 CNRS
6  * Copyright (C) 2013 Thibaut Lambert
7  * Copyright (C) 2016 Uppsala University
8  *
9  * StarPU is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU Lesser General Public License as published by
11  * the Free Software Foundation; either version 2.1 of the License, or (at
12  * your option) any later version.
13  *
14  * StarPU is distributed in the hope that it will be useful, but
15  * WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
17  *
18  * See the GNU Lesser General Public License in COPYING.LGPL for more details.
19  */
20 
21 #ifndef __WORKERS_H__
22 #define __WORKERS_H__
23 
25 /* @{ */
26 
27 #include <limits.h>
28 
29 #include <starpu.h>
30 #include <common/config.h>
31 #include <common/timing.h>
32 #include <common/fxt.h>
33 #include <common/thread.h>
34 #include <common/utils.h>
35 #include <core/jobs.h>
36 #include <core/perfmodel/perfmodel.h>
37 #include <core/sched_policy.h>
38 #include <core/topology.h>
39 #include <core/errorcheck.h>
40 #include <core/sched_ctx.h>
41 #include <core/sched_ctx_list.h>
42 #include <core/simgrid.h>
43 #ifdef STARPU_HAVE_HWLOC
44 #include <hwloc.h>
45 #endif
46 
47 #include <core/drivers.h>
48 #include <drivers/cuda/driver_cuda.h>
49 #include <drivers/opencl/driver_opencl.h>
50 
51 #ifdef STARPU_USE_MIC
52 #include <drivers/mic/driver_mic_source.h>
53 #endif /* STARPU_USE_MIC */
54 
55 #ifdef STARPU_USE_MPI_MASTER_SLAVE
56 #include <drivers/mpi/driver_mpi_source.h>
57 #endif
58 
59 #include <drivers/cpu/driver_cpu.h>
60 
61 #include <datawizard/datawizard.h>
62 
63 #include <starpu_parameters.h>
64 
/** Bound on the depth of a worker's task pipeline; it sizes the
 * current_tasks[] array of struct _starpu_worker. */
#define STARPU_MAX_PIPELINE 4

/** Three-state initialization status. */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING = 1,
	INITIALIZED = 2
};

/* Forward declaration; the type is used by struct _starpu_worker (ctx_change_list). */
struct _starpu_ctx_change_list;
70 
72 LIST_TYPE(_starpu_worker,
73  struct _starpu_machine_config *config;
74  starpu_pthread_mutex_t mutex;
75  enum starpu_worker_archtype arch;
76  uint32_t worker_mask;
77  struct starpu_perfmodel_arch perf_arch;
78  starpu_pthread_t worker_thread;
79  unsigned devid;
80  unsigned subworkerid;
81  int bindid;
82  int workerid;
86  starpu_pthread_cond_t started_cond;
87  starpu_pthread_cond_t ready_cond;
88  unsigned memory_node;
89  unsigned numa_memory_node;
94  starpu_pthread_cond_t sched_cond;
95  starpu_pthread_mutex_t sched_mutex;
96  unsigned state_relax_refcnt;
97 #ifdef STARPU_SPINLOCK_CHECK
98  const char *relax_on_file;
99  int relax_on_line;
100  const char *relax_on_func;
101  const char *relax_off_file;
102  int relax_off_line;
103  const char *relax_off_func;
104 #endif
122  starpu_pthread_t thread_changing_ctx;
130  struct _starpu_ctx_change_list ctx_change_list;
131  struct starpu_task_list local_tasks;
132  struct starpu_task **local_ordered_tasks;
136  struct starpu_task *current_task;
137  struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
138 #ifdef STARPU_SIMGRID
139  starpu_pthread_wait_t wait;
140 #endif
141 
142  struct timespec cl_start;
143  struct timespec cl_end;
144  unsigned char first_task;
145  unsigned char ntasks;
146  unsigned char pipeline_length;
147  unsigned char pipeline_stuck;
149  unsigned worker_is_running;
150  unsigned worker_is_initialized;
151  enum _starpu_worker_status status;
152  unsigned state_keep_awake;
153  char name[128];
154  char short_name[32];
155  unsigned run_by_starpu;
156  struct _starpu_driver_ops *driver_ops;
157 
158  struct _starpu_sched_ctx_list *sched_ctx_list;
159  int tmp_sched_ctx;
160  unsigned nsched_ctxs;
161  struct _starpu_barrier_counter tasks_barrier;
163  unsigned has_prev_init;
165  unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
166 
167  unsigned spinning_backoff ;
171  struct starpu_task *task_transferring;
177  unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
178 
179  unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
185  unsigned reverse_phase[2];
186 
187  unsigned pop_ctx_priority;
190  struct _starpu_sched_ctx *stream_ctx;
191 
192 #ifdef __GLIBC__
193  cpu_set_t cpu_set;
194 #endif /* __GLIBC__ */
195 #ifdef STARPU_HAVE_HWLOC
196  hwloc_bitmap_t hwloc_cpu_set;
197  hwloc_obj_t hwloc_obj;
198 #endif
199 );
200 
202 {
203  struct starpu_perfmodel_arch perf_arch;
204  uint32_t worker_mask;
205  int worker_size;
206  unsigned memory_node;
207  int combined_workerid[STARPU_NMAXWORKERS];
208 #ifdef STARPU_USE_MP
209  int count;
210  starpu_pthread_mutex_t count_mutex;
211 #endif
212 
213 #ifdef __GLIBC__
214  cpu_set_t cpu_set;
215 #endif /* __GLIBC__ */
216 #ifdef STARPU_HAVE_HWLOC
217  hwloc_bitmap_t hwloc_cpu_set;
218 #endif
219 };
220 
226 {
227  starpu_pthread_mutex_t mutex;
228  starpu_pthread_t worker_thread;
229  unsigned nworkers;
230  unsigned started;
231  void *retval;
232  struct _starpu_worker *workers;
233  starpu_pthread_cond_t ready_cond;
234  unsigned set_is_initialized;
235 };
236 
#ifdef STARPU_USE_MPI_MASTER_SLAVE
/* One worker set per MPI master-slave device; defined in another translation unit. */
extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
#endif /* STARPU_USE_MPI_MASTER_SLAVE */
240 
242 {
244  unsigned nworkers;
245 
248 
249  unsigned nsched_ctxs;
250 
251 #ifdef STARPU_HAVE_HWLOC
252 
253  hwloc_topology_t hwtopology;
254 #endif
255 
256  struct starpu_tree *tree;
257 
261  unsigned nhwcpus;
262 
266  unsigned nhwpus;
267 
271  unsigned nhwcudagpus;
272 
276  unsigned nhwopenclgpus;
277 
281  unsigned nhwmpi;
282 
284  unsigned ncpus;
285 
287  unsigned ncudagpus;
288  unsigned nworkerpercuda;
289  int cuda_th_per_stream;
290  int cuda_th_per_dev;
291 
293  unsigned nopenclgpus;
294 
296  unsigned nmpidevices;
297  unsigned nhwmpidevices;
298 
299  unsigned nhwmpicores[STARPU_MAXMPIDEVS];
300  unsigned nmpicores[STARPU_MAXMPIDEVS];
301 
304  unsigned nhwmicdevices;
305  unsigned nmicdevices;
306 
307  unsigned nhwmiccores[STARPU_MAXMICDEVS];
308  unsigned nmiccores[STARPU_MAXMICDEVS];
309 
317  unsigned workers_bindid[STARPU_NMAXWORKERS];
318 
325  unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS];
326 
333  unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS];
334 
335  /*** Indicates the successive MIC devices that should be used
336  * by the MIC driver. It is either filled according to the
337  * user's explicit parameters (from starpu_conf) or according
338  * to the STARPU_WORKERS_MICID env. variable. Otherwise, they
339  * are taken in ID order. */
343  unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS];
344 };
345 
347 {
348  struct _starpu_machine_topology topology;
349 
350 #ifdef STARPU_HAVE_HWLOC
351  int cpu_depth;
352  int pu_depth;
353 #endif
354 
357  char currently_bound[STARPU_NMAXWORKERS];
358  char currently_shared[STARPU_NMAXWORKERS];
359 
362 
365 
368 
371 
382 
385  struct _starpu_worker workers[STARPU_NMAXWORKERS];
386 
389  struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS];
390 
392  struct
393  {
394  int *workerids;
395  unsigned nworkers;
396  } *bindid_workers;
397  unsigned nbindid;
402  uint32_t worker_mask;
403 
405  struct starpu_conf conf;
406 
408  unsigned running;
409 
410  int disable_kernels;
411 
415 
417  struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
418 
420  unsigned submitting;
421 
422  int watchdog_ok;
423 
424  starpu_pthread_mutex_t submitted_mutex;
425 };
426 
427 extern int _starpu_worker_parallel_blocks;
428 
429 extern struct _starpu_machine_config _starpu_config STARPU_ATTRIBUTE_INTERNAL;
430 extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
431 extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
432 extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
433 
435 void _starpu_set_argc_argv(int *argc, char ***argv);
436 int *_starpu_get_argc();
437 char ***_starpu_get_argv();
438 
440 void _starpu_conf_check_environment(struct starpu_conf *conf);
441 
443 void _starpu_may_pause(void);
444 
446 static inline unsigned _starpu_machine_is_running(void)
447 {
448  unsigned ret;
449  /* running is just protected by a memory barrier */
450  STARPU_RMB();
451 
452  ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
453  ret = _starpu_config.running;
454  ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
455  return ret;
456 }
457 
458 
460 void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig);
461 
463 uint32_t _starpu_worker_exists(struct starpu_task *);
464 
466 uint32_t _starpu_can_submit_cuda_task(void);
467 
469 uint32_t _starpu_can_submit_cpu_task(void);
470 
472 uint32_t _starpu_can_submit_opencl_task(void);
473 
476 unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker);
477 
481 void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
482 
484 void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
486 void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
487 
488 static inline unsigned _starpu_worker_get_count(void)
489 {
490  return _starpu_config.topology.nworkers;
491 }
492 #define starpu_worker_get_count _starpu_worker_get_count
493 
497 static inline void _starpu_set_local_worker_key(struct _starpu_worker *worker)
498 {
499  STARPU_ASSERT(_starpu_keys_initialized);
500  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
501 }
502 
505 static inline struct _starpu_worker *_starpu_get_local_worker_key(void)
506 {
507  if (!_starpu_keys_initialized)
508  return NULL;
509  return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
510 }
511 
515 static inline void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
516 {
517  STARPU_ASSERT(_starpu_keys_initialized);
518  STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
519 }
520 
523 static inline struct _starpu_worker_set *_starpu_get_local_worker_set_key(void)
524 {
525  if (!_starpu_keys_initialized)
526  return NULL;
527  return (struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
528 }
529 
532 static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
533 {
534  STARPU_ASSERT(id < starpu_worker_get_count());
535  return &_starpu_config.workers[id];
536 }
537 
540 static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
541 {
542  return (id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
543 }
544 
545 struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id);
546 
549 static inline struct _starpu_machine_config *_starpu_get_machine_config(void)
550 {
551  return &_starpu_config;
552 }
553 
555 static inline int _starpu_get_disable_kernels(void)
556 {
557  return _starpu_config.disable_kernels;
558 }
559 
561 static inline enum _starpu_worker_status _starpu_worker_get_status(int workerid)
562 {
563  return _starpu_config.workers[workerid].status;
564 }
565 
568 static inline void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
569 {
570  _starpu_config.workers[workerid].status = status;
571 }
572 
574 static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
575 {
576  return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
577 }
578 
579 int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
580 
583 int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
584 
585 static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
586 {
587  struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
588  return &w->sched_mutex == mutex;
589 }
590 
591 static inline int _starpu_worker_get_nsched_ctxs(int workerid)
592 {
593  return _starpu_config.workers[workerid].nsched_ctxs;
594 }
595 
597 static inline unsigned _starpu_get_nsched_ctxs(void)
598 {
599  /* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
600  STARPU_RMB();
601  return _starpu_config.topology.nsched_ctxs;
602 }
603 
605 static inline int _starpu_worker_get_id(void)
606 {
607  struct _starpu_worker * worker;
608 
609  worker = _starpu_get_local_worker_key();
610  if (worker)
611  {
612  return worker->workerid;
613  }
614  else
615  {
616  /* there is no worker associated to that thread, perhaps it is
617  * a thread from the application or this is some SPU worker */
618  return -1;
619  }
620 }
621 #define starpu_worker_get_id _starpu_worker_get_id
622 
625 static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
626 {
627  (void) l;
628  (void) f;
629  int id = starpu_worker_get_id();
630  STARPU_ASSERT_MSG(id>=0, "%s:%d Cannot be called from outside a worker\n", f, l);
631  return id;
632 }
633 #define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
634 
635 enum starpu_node_kind _starpu_worker_get_node_kind(enum starpu_worker_archtype type);
636 
637 void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *sched_ctx);
638 
639 struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
640 
/* Ask `worker` to enter the blocked-in-parallel state and wait until it has.
 *
 * The caller must hold worker->sched_mutex: it is the mutex passed to
 * STARPU_PTHREAD_COND_WAIT on worker->sched_cond below.
 *
 * Requests nest through block_in_parallel_ref_count. Only the 0 -> 1
 * transition does the following:
 * - posts state_block_in_parallel_req;
 * - broadcasts sched_cond (and, under STARPU_SIMGRID, the worker's simgrid
 *   task queue);
 * - waits for state_block_in_parallel_ack, which
 *   _starpu_worker_process_block_in_parallel_requests() sets after marking the
 *   worker blocked;
 * - clears both req and ack again.
 * Nested calls only increment the count. The function also sets the global
 * _starpu_worker_parallel_blocks flag.
 */
646 static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
647 {
648  _starpu_worker_parallel_blocks = 1;
649  /* flush pending requests to start on a fresh transaction epoch */
650  while (worker->state_unblock_in_parallel_req)
651  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
652 
653  /* announce blocking intent */
654  STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
655  worker->block_in_parallel_ref_count++;
656 
657  if (worker->block_in_parallel_ref_count == 1)
658  {
659  /* only the transition from 0 to 1 triggers the block_in_parallel_req */
660 
661  STARPU_ASSERT(!worker->state_blocked_in_parallel);
662  STARPU_ASSERT(!worker->state_block_in_parallel_req);
663  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
664  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
665  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
666 
667  /* trigger the block_in_parallel_req */
668  worker->state_block_in_parallel_req = 1;
669  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
670 #ifdef STARPU_SIMGRID
671  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
672 #endif
673 
674  /* wait for block_in_parallel_req to be processed */
675  while (!worker->state_block_in_parallel_ack)
676  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
677 
678  STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
679  STARPU_ASSERT(worker->state_block_in_parallel_req);
680  STARPU_ASSERT(worker->state_blocked_in_parallel);
681 
682  /* reset block_in_parallel_req state flags */
683  worker->state_block_in_parallel_req = 0;
684  worker->state_block_in_parallel_ack = 0;
685 
686  /* broadcast block_in_parallel_req state flags reset */
687  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
688  }
689 }
690 
/* Release one block-in-parallel request previously made on `worker` with
 * _starpu_worker_request_blocking_in_parallel().
 *
 * The caller must hold worker->sched_mutex (it is passed to
 * STARPU_PTHREAD_COND_WAIT).
 * - If the worker is not in the blocked-in-parallel state, nothing happens.
 * - If this is the last outstanding request (block_in_parallel_ref_count == 1),
 *   the call posts state_unblock_in_parallel_req, broadcasts sched_cond, and
 *   waits for state_unblock_in_parallel_ack before clearing both flags.
 * - Whenever the worker was blocked, the reference count is decremented.
 */
695 static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
696 {
697  /* flush pending requests to start on a fresh transaction epoch */
698  while (worker->state_block_in_parallel_req)
699  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
700 
701  /* unblocking may be requested unconditionally
702  * thus, check if unblocking is really needed */
703  if (worker->state_blocked_in_parallel)
704  {
705  if (worker->block_in_parallel_ref_count == 1)
706  {
707  /* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
708 
709  STARPU_ASSERT(!worker->state_block_in_parallel_req);
710  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
711  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
712  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
713 
714  /* trigger the unblock_in_parallel_req */
715  worker->state_unblock_in_parallel_req = 1;
716  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
717 
718  /* wait for the unblock_in_parallel_req to be processed */
719  while (!worker->state_unblock_in_parallel_ack)
720  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
721 
722  STARPU_ASSERT(worker->state_unblock_in_parallel_req);
723  STARPU_ASSERT(!worker->state_blocked_in_parallel);
724 
725  /* reset unblock_in_parallel_req state flags */
726  worker->state_unblock_in_parallel_req = 0;
727  worker->state_unblock_in_parallel_ack = 0;
728 
729  /* broadcast unblock_in_parallel_req state flags reset */
730  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
731  }
732 
733  /* announce unblocking complete */
734  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
735  worker->block_in_parallel_ref_count--;
736  }
737 }
738 
/* Worker-side handling of block-in-parallel requests (called from
 * _starpu_worker_enter_sched_op with worker->sched_mutex held, the mutex passed
 * to STARPU_PTHREAD_COND_WAIT).
 *
 * While state_block_in_parallel_req is set, the worker:
 * 1. marks itself blocked (state_blocked_in_parallel);
 * 2. acknowledges the request (state_block_in_parallel_ack) and broadcasts
 *    sched_cond;
 * 3. sleeps on sched_cond until state_unblock_in_parallel_req is posted;
 * 4. leaves the blocked state, acknowledges the unblock
 *    (state_unblock_in_parallel_ack) and broadcasts again.
 * The loop ends once no block request is pending.
 */
744 static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
745 {
746  while (worker->state_block_in_parallel_req)
747  {
748  STARPU_ASSERT(!worker->state_blocked_in_parallel);
749  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
750  STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
751  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
752  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
753 
754  /* enter effective blocked state */
755  worker->state_blocked_in_parallel = 1;
756 
757  /* notify block_in_parallel_req processing */
758  worker->state_block_in_parallel_ack = 1;
759  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
760 
761  /* block */
762  while (!worker->state_unblock_in_parallel_req)
763  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
764 
765  STARPU_ASSERT(worker->state_blocked_in_parallel);
766  STARPU_ASSERT(!worker->state_block_in_parallel_req);
767  STARPU_ASSERT(!worker->state_block_in_parallel_ack);
768  STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
769  STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
770 
771  /* leave effective blocked state */
772  worker->state_blocked_in_parallel = 0;
773 
774  /* notify unblock_in_parallel_req processing */
775  worker->state_unblock_in_parallel_ack = 1;
776  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
777  }
778 }
779 
796 #ifdef STARPU_SPINLOCK_CHECK
797 static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
798 #else
799 static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
800 #endif
801 {
802  STARPU_ASSERT(!worker->state_sched_op_pending);
804  {
805  /* process pending block requests before entering a sched_op region */
806  _starpu_worker_process_block_in_parallel_requests(worker);
807  while (worker->state_changing_ctx_notice)
808  {
809  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
810 
811  /* new block requests may have been triggered during the wait,
812  * need to check again */
813  _starpu_worker_process_block_in_parallel_requests(worker);
814  }
815  }
816  else
817  {
818  /* if someone observed the worker state since the last call, postpone block request
819  * processing for one sched_op turn more, because the observer will not have seen
820  * new block requests between its observation and now.
821  *
822  * however, the worker still has to wait for context change operations to complete
823  * before entering sched_op again*/
824  while (worker->state_changing_ctx_notice)
825  {
826  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
827  }
828  }
829 
830  /* no block request and no ctx change ahead,
831  * enter sched_op */
832  worker->state_sched_op_pending = 1;
834  worker->state_relax_refcnt = 0;
835 #ifdef STARPU_SPINLOCK_CHECK
836  worker->relax_on_file = file;
837  worker->relax_on_line = line;
838  worker->relax_on_func = func;
839 #endif
840 }
841 #ifdef STARPU_SPINLOCK_CHECK
842 #define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
843 #endif
844 
849 void _starpu_worker_apply_deferred_ctx_changes(void);
850 #ifdef STARPU_SPINLOCK_CHECK
851 static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
852 #else
853 static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
854 #endif
855 {
856  STARPU_ASSERT(worker->state_sched_op_pending);
857  worker->state_relax_refcnt = 1;
858 #ifdef STARPU_SPINLOCK_CHECK
859  worker->relax_off_file = file;
860  worker->relax_off_line = line;
861  worker->relax_off_func = func;
862 #endif
863  worker->state_sched_op_pending = 0;
864  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
865  _starpu_worker_apply_deferred_ctx_changes();
866 }
867 #ifdef STARPU_SPINLOCK_CHECK
868 #define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
869 #endif
870 
871 static inline int _starpu_worker_sched_op_pending(void)
872 {
873  int workerid = starpu_worker_get_id();
874  if (workerid == -1)
875  return 0;
876  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
877  STARPU_ASSERT(worker != NULL);
878  return worker->state_sched_op_pending;
879 }
880 
/* Announce and start a scheduling-context change operation on `worker`.
 *
 * The caller must hold worker->sched_mutex (it is passed to
 * STARPU_PTHREAD_COND_WAIT). The caller must not already be the thread
 * recorded in thread_changing_ctx (asserted).
 *
 * The call:
 * 1. waits until any previous state_changing_ctx_notice is cleared;
 * 2. sets the notice and records the calling thread; this keeps
 *    _starpu_worker_enter_sched_op() from starting a new sched_op;
 * 3. if a sched_op is already in progress, broadcasts and waits until
 *    state_sched_op_pending is cleared (done by _starpu_worker_leave_sched_op()).
 *
 * Pair with _starpu_worker_leave_changing_ctx_op().
 */
890 static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
891 {
892  STARPU_ASSERT(!starpu_pthread_equal(worker->thread_changing_ctx, starpu_pthread_self()));
893  /* flush pending requests to start on a fresh transaction epoch */
894  while (worker->state_changing_ctx_notice)
895  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
896 
897  /* announce changing_ctx intent
898  *
899  * - an already started sched_op is allowed to complete
900  * - no new sched_op may be started
901  */
902  worker->state_changing_ctx_notice = 1;
903 
904  worker->thread_changing_ctx = starpu_pthread_self();
905 
906  /* allow for an already started sched_op to complete */
907  if (worker->state_sched_op_pending)
908  {
909  /* request sched_op to broadcast when way is cleared */
910  worker->state_changing_ctx_waiting = 1;
911 
912  /* wait for sched_op completion */
913  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
914 #ifdef STARPU_SIMGRID
915  starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
916 #endif
917  do
918  {
919  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
920  }
921  while (worker->state_sched_op_pending);
922 
923  /* reset flag so other sched_ops won't have to broadcast state */
924  worker->state_changing_ctx_waiting = 0;
925  }
926 }
927 
932 static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
933 {
934  worker->thread_changing_ctx = (starpu_pthread_t)0;
935  worker->state_changing_ctx_notice = 0;
936  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
937 }
938 
941 #ifdef STARPU_SPINLOCK_CHECK
942 static inline void __starpu_worker_relax_on(const char*file, int line, const char* func)
943 #else
944 static inline void _starpu_worker_relax_on(void)
945 #endif
946 {
947  struct _starpu_worker *worker = _starpu_get_local_worker_key();
948  if (worker == NULL)
949  return;
950  if (!worker->state_sched_op_pending)
951  return;
952  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
953 #ifdef STARPU_SPINLOCK_CHECK
954  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
955 #else
956  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
957 #endif
958  worker->state_relax_refcnt++;
959 #ifdef STARPU_SPINLOCK_CHECK
960  worker->relax_on_file = file;
961  worker->relax_on_line = line;
962  worker->relax_on_func = func;
963 #endif
964  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
965  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
966 }
967 #ifdef STARPU_SPINLOCK_CHECK
968 #define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
969 #endif
970 #define starpu_worker_relax_on _starpu_worker_relax_on
971 
973 #ifdef STARPU_SPINLOCK_CHECK
974 static inline void __starpu_worker_relax_on_locked(struct _starpu_worker *worker, const char*file, int line, const char* func)
975 #else
976 static inline void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
977 #endif
978 {
979  if (!worker->state_sched_op_pending)
980  return;
981 #ifdef STARPU_SPINLOCK_CHECK
982  STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
983 #else
984  STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
985 #endif
986  worker->state_relax_refcnt++;
987 #ifdef STARPU_SPINLOCK_CHECK
988  worker->relax_on_file = file;
989  worker->relax_on_line = line;
990  worker->relax_on_func = func;
991 #endif
992  STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
993 }
994 #ifdef STARPU_SPINLOCK_CHECK
995 #define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
996 #endif
997 
998 #ifdef STARPU_SPINLOCK_CHECK
999 static inline void __starpu_worker_relax_off(const char*file, int line, const char* func)
1000 #else
1001 static inline void _starpu_worker_relax_off(void)
1002 #endif
1003 {
1004  int workerid = starpu_worker_get_id();
1005  if (workerid == -1)
1006  return;
1007  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1008  STARPU_ASSERT(worker != NULL);
1009  if (!worker->state_sched_op_pending)
1010  return;
1011  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1012 #ifdef STARPU_SPINLOCK_CHECK
1013  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1014 #else
1015  STARPU_ASSERT(worker->state_relax_refcnt>0);
1016 #endif
1017  worker->state_relax_refcnt--;
1018 #ifdef STARPU_SPINLOCK_CHECK
1019  worker->relax_off_file = file;
1020  worker->relax_off_line = line;
1021  worker->relax_off_func = func;
1022 #endif
1023  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1024 }
1025 #ifdef STARPU_SPINLOCK_CHECK
1026 #define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1027 #endif
1028 #define starpu_worker_relax_off _starpu_worker_relax_off
1029 
1030 #ifdef STARPU_SPINLOCK_CHECK
1031 static inline void __starpu_worker_relax_off_locked(const char*file, int line, const char* func)
1032 #else
1033 static inline void _starpu_worker_relax_off_locked(void)
1034 #endif
1035 {
1036  int workerid = starpu_worker_get_id();
1037  if (workerid == -1)
1038  return;
1039  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1040  STARPU_ASSERT(worker != NULL);
1041  if (!worker->state_sched_op_pending)
1042  return;
1043 #ifdef STARPU_SPINLOCK_CHECK
1044  STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1045 #else
1046  STARPU_ASSERT(worker->state_relax_refcnt>0);
1047 #endif
1048  worker->state_relax_refcnt--;
1049 #ifdef STARPU_SPINLOCK_CHECK
1050  worker->relax_off_file = file;
1051  worker->relax_off_line = line;
1052  worker->relax_off_func = func;
1053 #endif
1054 }
1055 #ifdef STARPU_SPINLOCK_CHECK
1056 #define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1057 #endif
1058 
1059 static inline int _starpu_worker_get_relax_state(void)
1060 {
1061  int workerid = starpu_worker_get_id();
1062  if (workerid < 0)
1063  return 1;
1064  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1065  STARPU_ASSERT(worker != NULL);
1066  return worker->state_relax_refcnt != 0;
1067 }
1068 #define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1069 
1074 static inline void _starpu_worker_lock(int workerid)
1075 {
1076  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1077  STARPU_ASSERT(worker != NULL);
1078  int cur_workerid = starpu_worker_get_id();
1079  if (workerid != cur_workerid)
1080  {
1081  starpu_worker_relax_on();
1082 
1083  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1084  while (!worker->state_relax_refcnt)
1085  {
1086  STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
1087  }
1088  }
1089  else
1090  {
1091  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1092  }
1093 }
1094 
1095 static inline int _starpu_worker_trylock(int workerid)
1096 {
1097  struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1098  int cur_workerid = cur_worker->workerid;
1099  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1100  STARPU_ASSERT(worker != NULL);
1101 
1102  /* Start with ourself */
1103  int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->sched_mutex);
1104  if (ret)
1105  return ret;
1106  if (workerid == cur_workerid)
1107  /* We only needed to lock ourself */
1108  return 0;
1109 
1110  /* Now try to lock the other worker */
1111  ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
1112  if (!ret)
1113  {
1114  /* Good, check that it is relaxed */
1115  ret = !worker->state_relax_refcnt;
1116  if (ret)
1117  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1118  }
1119  if (!ret)
1120  _starpu_worker_relax_on_locked(cur_worker);
1121  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
1122  return ret;
1123 }
1124 
1125 static inline void _starpu_worker_unlock(int workerid)
1126 {
1127  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1128  STARPU_ASSERT(worker != NULL);
1129  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1130  int cur_workerid = starpu_worker_get_id();
1131  if (workerid != cur_workerid)
1132  {
1133  starpu_worker_relax_off();
1134  }
1135 }
1136 
1137 static inline void _starpu_worker_lock_self(void)
1138 {
1139  int workerid = starpu_worker_get_id_check();
1140  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1141  STARPU_ASSERT(worker != NULL);
1142  STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1143 }
1144 
1145 static inline void _starpu_worker_unlock_self(void)
1146 {
1147  int workerid = starpu_worker_get_id_check();
1148  struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1149  STARPU_ASSERT(worker != NULL);
1150  STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1151 }
1152 
1153 static inline int _starpu_wake_worker_relax(int workerid)
1154 {
1155  _starpu_worker_lock(workerid);
1156  int ret = starpu_wake_worker_locked(workerid);
1157  _starpu_worker_unlock(workerid);
1158  return ret;
1159 }
1160 
1161 int starpu_wake_worker_relax_light(int workerid);
1162 
1167 void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
1168 
1169 /* @}*/
1170 
1171 #endif // __WORKERS_H__
_starpu_machine_config::opencl_nodeid
int opencl_nodeid
Definition: workers.h:377
_starpu_worker::has_prev_init
unsigned has_prev_init
Definition: workers.h:163
_starpu_machine_topology::workers_bindid
unsigned workers_bindid[STARPU_NMAXWORKERS]
Definition: workers.h:317
_starpu_machine_config::bindid_workers
struct _starpu_machine_config::@0 * bindid_workers
_starpu_machine_topology::nhwmiccores
unsigned nhwmiccores[STARPU_MAXMICDEVS]
Definition: workers.h:307
_starpu_machine_topology::nhwmpicores
unsigned nhwmpicores[STARPU_MAXMPIDEVS]
Definition: workers.h:299
_starpu_machine_config::workers
struct _starpu_worker workers[STARPU_NMAXWORKERS]
Definition: workers.h:385
_starpu_machine_topology::workers_cuda_gpuid
unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:325
_starpu_worker::state_blocked_in_parallel_observed
unsigned state_blocked_in_parallel_observed
Definition: workers.h:109
_starpu_worker::worker_mask
uint32_t worker_mask
Definition: workers.h:76
_starpu_worker::is_slave_somewhere
unsigned is_slave_somewhere
Definition: workers.h:188
_starpu_worker::current_rank
int current_rank
Definition: workers.h:84
_starpu_worker::state_block_in_parallel_req
unsigned state_block_in_parallel_req
Definition: workers.h:110
_starpu_worker::thread_changing_ctx
starpu_pthread_t thread_changing_ctx
Definition: workers.h:122
_starpu_machine_topology::workers_mpi_ms_deviceid
unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS]
Definition: workers.h:343
_starpu_combined_worker::memory_node
unsigned memory_node
Definition: workers.h:206
_starpu_machine_config::cpus_nodeid
int cpus_nodeid
Definition: workers.h:373
_starpu_machine_topology::ncombinedworkers
unsigned ncombinedworkers
Definition: workers.h:247
_starpu_worker::pipeline_length
unsigned char pipeline_length
Definition: workers.h:146
_starpu_worker::state_unblock_in_parallel_req
unsigned state_unblock_in_parallel_req
Definition: workers.h:112
_starpu_worker::state_relax_refcnt
unsigned state_relax_refcnt
Definition: workers.h:96
_starpu_worker::sched_cond
starpu_pthread_cond_t sched_cond
Definition: workers.h:94
_starpu_machine_topology::nhwopenclgpus
unsigned nhwopenclgpus
Definition: workers.h:276
_starpu_worker::memory_node
unsigned memory_node
Definition: workers.h:88
_starpu_machine_topology::nmpidevices
unsigned nmpidevices
Definition: workers.h:296
_starpu_machine_config::sched_ctxs
struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1]
Definition: workers.h:417
_starpu_worker::subworkerid
unsigned subworkerid
Definition: workers.h:80
_starpu_machine_topology::nhwmpi
unsigned nhwmpi
Definition: workers.h:281
_starpu_worker::state_unblock_in_parallel_ack
unsigned state_unblock_in_parallel_ack
Definition: workers.h:113
_starpu_worker::bindid
int bindid
Definition: workers.h:81
_starpu_combined_worker::perf_arch
struct starpu_perfmodel_arch perf_arch
Definition: workers.h:203
_starpu_worker::state_blocked_in_parallel
unsigned state_blocked_in_parallel
Definition: workers.h:108
_starpu_machine_config::conf
struct starpu_conf conf
Definition: workers.h:405
_starpu_worker::state_changing_ctx_waiting
unsigned state_changing_ctx_waiting
Definition: workers.h:106
_starpu_worker::local_ordered_tasks_size
unsigned local_ordered_tasks_size
Definition: workers.h:133
_starpu_worker::spinning_backoff
unsigned spinning_backoff
Definition: workers.h:167
_starpu_worker::state_keep_awake
unsigned state_keep_awake
Definition: workers.h:152
_starpu_worker
Definition: workers.h:72
_starpu_machine_config::pause_depth
int pause_depth
Definition: workers.h:414
_starpu_machine_topology::nhwpus
unsigned nhwpus
Definition: workers.h:266
_starpu_machine_config::submitting
unsigned submitting
Definition: workers.h:420
_starpu_machine_topology::nworkers
unsigned nworkers
Definition: workers.h:244
_starpu_worker::pipeline_stuck
unsigned char pipeline_stuck
Definition: workers.h:147
_starpu_worker_set::started
unsigned started
Definition: workers.h:230
_starpu_worker::worker_thread
starpu_pthread_t worker_thread
Definition: workers.h:78
_starpu_worker::run_by_starpu
unsigned run_by_starpu
Definition: workers.h:155
_starpu_machine_topology::nhwmicdevices
unsigned nhwmicdevices
Definition: workers.h:304
_starpu_machine_config::current_bindid
int current_bindid
Definition: workers.h:356
_starpu_machine_config::worker_mask
uint32_t worker_mask
Definition: workers.h:402
_starpu_worker::state_changing_ctx_notice
unsigned state_changing_ctx_notice
Definition: workers.h:107
_starpu_machine_config::combined_workers
struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS]
Definition: workers.h:389
_starpu_worker::nb_buffers_transferred
unsigned nb_buffers_transferred
Definition: workers.h:169
_starpu_machine_config::nbindid
unsigned nbindid
Definition: workers.h:397
_starpu_machine_config::current_mpi_deviceid
int current_mpi_deviceid
Definition: workers.h:370
_starpu_worker::devid
unsigned devid
Definition: workers.h:79
_starpu_machine_config::running
unsigned running
Definition: workers.h:408
_starpu_worker::ntasks
unsigned char ntasks
Definition: workers.h:145
_starpu_worker::task_transferring
struct starpu_task * task_transferring
Definition: workers.h:171
_starpu_machine_config::current_cuda_gpuid
int current_cuda_gpuid
Definition: workers.h:361
_starpu_machine_topology::hwtopology
hwloc_topology_t hwtopology
Definition: workers.h:253
_starpu_combined_worker
Definition: workers.h:201
_starpu_machine_topology::nhwcudagpus
unsigned nhwcudagpus
Definition: workers.h:271
_starpu_worker::current_task
struct starpu_task * current_task
Definition: workers.h:136
_starpu_machine_config::mpi_nodeid
int mpi_nodeid
Definition: workers.h:381
_starpu_machine_topology::ncpus
unsigned ncpus
Definition: workers.h:284
_starpu_machine_config::current_opencl_gpuid
int current_opencl_gpuid
Definition: workers.h:364
_starpu_machine_topology::ncudagpus
unsigned ncudagpus
Definition: workers.h:287
_starpu_worker::nsched_ctxs
unsigned nsched_ctxs
Definition: workers.h:160
_starpu_worker::worker_size
int worker_size
Definition: workers.h:85
_starpu_worker_set::ready_cond
starpu_pthread_cond_t ready_cond
Definition: workers.h:233
_starpu_machine_topology::nopenclgpus
unsigned nopenclgpus
Definition: workers.h:293
_starpu_machine_config
Definition: workers.h:346
_starpu_machine_config::cuda_nodeid
int cuda_nodeid
Definition: workers.h:375
_starpu_machine_topology::nhwcpus
unsigned nhwcpus
Definition: workers.h:261
_starpu_machine_config::mic_nodeid
int mic_nodeid
Definition: workers.h:379
_starpu_machine_topology::workers_opencl_gpuid
unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:333
_starpu_worker::local_ordered_tasks
struct starpu_task ** local_ordered_tasks
Definition: workers.h:132
_starpu_worker::set
struct _starpu_worker_set * set
Definition: workers.h:148
_starpu_worker_set::worker_thread
starpu_pthread_t worker_thread
Definition: workers.h:228
_starpu_worker::combined_workerid
int combined_workerid
Definition: workers.h:83
_starpu_worker::first_task
unsigned char first_task
Definition: workers.h:144
_starpu_worker::current_ordered_task
unsigned current_ordered_task
Definition: workers.h:134
_starpu_machine_topology::tree
struct starpu_tree * tree
Definition: workers.h:256
_starpu_worker::state_sched_op_pending
unsigned state_sched_op_pending
Definition: workers.h:105
_starpu_worker::nb_buffers_totransfer
unsigned nb_buffers_totransfer
Definition: workers.h:170
_starpu_worker::workerid
int workerid
Definition: workers.h:82
_starpu_worker::current_ordered_task_order
unsigned current_ordered_task_order
Definition: workers.h:135
_starpu_worker::numa_memory_node
unsigned numa_memory_node
Definition: workers.h:89
_starpu_worker::block_in_parallel_ref_count
unsigned block_in_parallel_ref_count
Definition: workers.h:121
_starpu_worker::state_block_in_parallel_ack
unsigned state_block_in_parallel_ack
Definition: workers.h:111
_starpu_worker::sched_mutex
starpu_pthread_mutex_t sched_mutex
Definition: workers.h:95
_starpu_machine_topology
Definition: workers.h:241
_starpu_machine_config::current_mic_deviceid
int current_mic_deviceid
Definition: workers.h:367
_starpu_worker::ready_cond
starpu_pthread_cond_t ready_cond
Definition: workers.h:87
_starpu_worker::started_cond
starpu_pthread_cond_t started_cond
Definition: workers.h:86
_starpu_worker_set
Definition: workers.h:225
_starpu_combined_worker::worker_mask
uint32_t worker_mask
Definition: workers.h:204
_starpu_worker::pop_ctx_priority
unsigned pop_ctx_priority
Definition: workers.h:187