Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
20 #include "../tbb_stddef.h"
21 #include "../task.h"
22 #include "../task_arena.h"
23 #include "../flow_graph_abstractions.h"
24 
25 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
26 #include "../concurrent_priority_queue.h"
27 #endif
28 
29 #include <list>
30 
31 #if TBB_DEPRECATED_FLOW_ENQUEUE
32 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
33 #else
34 #define FLOW_SPAWN(a) tbb::task::spawn((a))
35 #endif
36 
37 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
38 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
39 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
40 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
41 #else
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
45 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
46 
47 #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
48 #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
49 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
50 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
51 #else
52 #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
53 #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
54 #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
55 #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
56 
57 namespace tbb {
58 namespace flow {
59 
60 namespace internal {
61 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
62 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
63 typedef unsigned int node_priority_t;
64 static const node_priority_t no_priority = node_priority_t(0);
65 #endif
66 }
67 
68 namespace interface10 {
69 
71 
72 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
76 struct graph_task : public task {
77  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
79 };
80 #else
81 typedef task graph_task;
82 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
83 
84 
85 class graph;
86 class graph_node;
87 
88 template <typename GraphContainerType, typename GraphNodeType>
90  friend class graph;
91  friend class graph_node;
92 public:
93  typedef size_t size_type;
94  typedef GraphNodeType value_type;
95  typedef GraphNodeType* pointer;
96  typedef GraphNodeType& reference;
97  typedef const GraphNodeType& const_reference;
98  typedef std::forward_iterator_tag iterator_category;
99 
101  graph_iterator() : my_graph(NULL), current_node(NULL) {}
102 
105  my_graph(other.my_graph), current_node(other.current_node)
106  {}
107 
110  if (this != &other) {
111  my_graph = other.my_graph;
112  current_node = other.current_node;
113  }
114  return *this;
115  }
116 
118  reference operator*() const;
119 
121  pointer operator->() const;
122 
124  bool operator==(const graph_iterator& other) const {
125  return ((my_graph == other.my_graph) && (current_node == other.current_node));
126  }
127 
129  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
130 
133  internal_forward();
134  return *this;
135  }
136 
139  graph_iterator result = *this;
140  operator++();
141  return result;
142  }
143 
144 private:
145  // the graph over which we are iterating
146  GraphContainerType *my_graph;
147  // pointer into my_graph's my_nodes list
148  pointer current_node;
149 
151  graph_iterator(GraphContainerType *g, bool begin);
152  void internal_forward();
153 }; // class graph_iterator
154 
155 // flags to modify the behavior of the graph reset(). Can be combined.
158  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
159  rf_clear_edges = 1 << 1 // delete edges
160 };
161 
162 namespace internal {
163 
164 void activate_graph(graph& g);
165 void deactivate_graph(graph& g);
166 bool is_graph_active(graph& g);
167 tbb::task& prioritize_task(graph& g, tbb::task& arena_task);
168 void spawn_in_graph_arena(graph& g, tbb::task& arena_task);
169 void enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
171 
172 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
174  bool operator()(const graph_task* left, const graph_task* right) {
175  return left->priority < right->priority;
176  }
177 };
178 
180 
181 class priority_task_selector : public task {
182 public:
183  priority_task_selector(graph_task_priority_queue_t& priority_queue)
184  : my_priority_queue(priority_queue) {}
186  graph_task* t = NULL;
187  bool result = my_priority_queue.try_pop(t);
188  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
189  " in graph's priority queue mismatched" );
191  "Incorrect task submitted to graph priority queue" );
193  "Tasks from graph's priority queue must have priority" );
194  task* t_next = t->execute();
195  task::destroy(*t);
196  return t_next;
197  }
198 private:
199  graph_task_priority_queue_t& my_priority_queue;
200 };
201 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
202 
203 }
204 
206 
208  friend class graph_node;
209 
210  template< typename Body >
211  class run_task : public graph_task {
212  public:
213  run_task(Body& body
215  , node_priority_t node_priority = no_priority
216  ) : graph_task(node_priority),
217 #else
218  ) :
219 #endif
220  my_body(body) { }
222  my_body();
223  return NULL;
224  }
225  private:
226  Body my_body;
227  };
228 
229  template< typename Receiver, typename Body >
230  class run_and_put_task : public graph_task {
231  public:
232  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
234  tbb::task *res = my_receiver.try_put_task(my_body());
235  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
236  return res;
237  }
238  private:
239  Receiver &my_receiver;
240  Body my_body;
241  };
242  typedef std::list<tbb::task *> task_list_type;
243 
244  class wait_functor {
246  public:
247  wait_functor(tbb::task* t) : graph_root_task(t) {}
248  void operator()() const { graph_root_task->wait_for_all(); }
249  };
250 
254  public:
255  spawn_functor(tbb::task& t) : spawn_task(t) {}
256  void operator()() const {
257  FLOW_SPAWN(spawn_task);
258  }
259  };
260 
261  void prepare_task_arena(bool reinit = false) {
262  if (reinit) {
263  __TBB_ASSERT(my_task_arena, "task arena is NULL");
264  my_task_arena->terminate();
265  my_task_arena->initialize(tbb::task_arena::attach());
266  }
267  else {
268  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
269  my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
270  }
271  if (!my_task_arena->is_active()) // failed to attach
272  my_task_arena->initialize(); // create a new, default-initialized arena
273  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
274  }
275 
276 public:
278  graph();
279 
281  explicit graph(tbb::task_group_context& use_this_context);
282 
284 
285  ~graph();
286 
287 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
288  void set_name(const char *name);
289 #endif
290 
292  reserve_wait();
293  }
294 
296  release_wait();
297  }
298 
300 
302  void reserve_wait() __TBB_override;
303 
305 
307  void release_wait() __TBB_override;
308 
310 
312  template< typename Receiver, typename Body >
313  void run(Receiver &r, Body body) {
314  if (internal::is_graph_active(*this)) {
315  task* rtask = new (task::allocate_additional_child_of(*root_task()))
317  my_task_arena->execute(spawn_functor(*rtask));
318  }
319  }
320 
322 
324  template< typename Body >
325  void run(Body body) {
326  if (internal::is_graph_active(*this)) {
327  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
328  my_task_arena->execute(spawn_functor(*rtask));
329  }
330  }
331 
333 
334  void wait_for_all() {
335  cancelled = false;
336  caught_exception = false;
337  if (my_root_task) {
338 #if TBB_USE_EXCEPTIONS
339  try {
340 #endif
341  my_task_arena->execute(wait_functor(my_root_task));
342  cancelled = my_context->is_group_execution_cancelled();
343 #if TBB_USE_EXCEPTIONS
344  }
345  catch (...) {
346  my_root_task->set_ref_count(1);
347  my_context->reset();
348  caught_exception = true;
349  cancelled = true;
350  throw;
351  }
352 #endif
353  // TODO: the "if" condition below is just a work-around to support the concurrent wait
354  // mode. The cancellation and exception mechanisms are still broken in this mode.
355  // Consider using task group not to re-implement the same functionality.
356  if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
357  my_context->reset(); // consistent with behavior in catch()
358  my_root_task->set_ref_count(1);
359  }
360  }
361  }
362 
365  return my_root_task;
366  }
367 
368  // ITERATORS
369  template<typename C, typename N>
370  friend class graph_iterator;
371 
372  // Graph iterator typedefs
375 
376  // Graph iterator constructors
378  iterator begin();
380  iterator end();
382  const_iterator begin() const;
384  const_iterator end() const;
386  const_iterator cbegin() const;
388  const_iterator cend() const;
389 
391  bool is_cancelled() { return cancelled; }
392  bool exception_thrown() { return caught_exception; }
393 
394  // thread-unsafe state reset.
395  void reset(reset_flags f = rf_reset_protocol);
396 
397 private:
401  bool cancelled;
404  task_list_type my_reset_task_list;
405 
407 
409  void register_node(graph_node *n);
410  void remove_node(graph_node *n);
411 
413 
414 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
416 #endif
417 
418  friend void internal::activate_graph(graph& g);
419  friend void internal::deactivate_graph(graph& g);
420  friend bool internal::is_graph_active(graph& g);
421  friend tbb::task& internal::prioritize_task(graph& g, tbb::task& arena_task);
422  friend void internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
423  friend void internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
425 
427 
428 }; // class graph
429 
432  friend class graph;
433  template<typename C, typename N>
434  friend class graph_iterator;
435 protected:
437  graph_node *next, *prev;
438 public:
439  explicit graph_node(graph& g);
440 
441  virtual ~graph_node();
442 
443 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
444  virtual void set_name(const char *name) = 0;
445 #endif
446 
447 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
448  virtual void extract() = 0;
449 #endif
450 
451 protected:
452  // performs the reset on an individual node.
453  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
454 }; // class graph_node
455 
456 namespace internal {
457 
458 inline void activate_graph(graph& g) {
459  g.my_is_active = true;
460 }
461 
462 inline void deactivate_graph(graph& g) {
463  g.my_is_active = false;
464 }
465 
466 inline bool is_graph_active(graph& g) {
467  return g.my_is_active;
468 }
469 
470 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
472  task* critical_task = &t;
473  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
474  graph_task* gt = static_cast<graph_task*>(&t);
475  if( gt->priority != no_priority ) {
480  critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
481  tbb::internal::make_critical( *critical_task );
482  g.my_priority_queue.push(gt);
483  }
484  return *critical_task;
485 }
486 #else
488  return t;
489 }
490 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
491 
493 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
494  if (is_graph_active(g)) {
495  graph::spawn_functor s_fn(prioritize_task(g, arena_task));
497  g.my_task_arena->execute(s_fn);
498  }
499 }
500 
502 inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
503  if (is_graph_active(g)) {
504  __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
505  task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
506  }
507 }
508 
510  g.my_reset_task_list.push_back(tp);
511 }
512 
513 } // namespace internal
514 
515 } // namespace interface10
516 } // namespace flow
517 } // namespace tbb
518 
519 #endif // __TBB_flow_graph_impl_H
graph_iterator< const graph, const graph_node > const_iterator
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
internal::graph_task_priority_queue_t my_priority_queue
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
bool is_cancelled()
return status of graph execution
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:646
#define __TBB_override
Definition: tbb_stddef.h:240
graph_task(node_priority_t node_priority=no_priority)
unsigned int node_priority_t
graph_iterator & operator=(const graph_iterator &other)
Assignment.
void make_critical(task &t)
Definition: task.h:957
task * execute() __TBB_override
Should be overridden by derived classes.
tbb::task & prioritize_task(graph &g, tbb::task &arena_task)
bool operator==(const graph_iterator &other) const
Equality.
The graph class.
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:821
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
tbb::task * root_task()
Returns the root task of the graph.
static tbb::task *const SUCCESSFULLY_ENQUEUED
graph_iterator< graph, graph_node > iterator
bool operator==(const cache_aligned_allocator< T > &, const cache_aligned_allocator< U > &)
std::list< tbb::task * > task_list_type
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
void enqueue_in_graph_arena(graph &g, tbb::task &arena_task)
Enqueues a task inside graph arena.
graph_iterator & operator++()
Pre-increment.
virtual task * execute()=0
Should be overridden by derived classes.
Base class for tasks generated by graph nodes.
void prepare_task_arena(bool reinit=false)
The base of all graph nodes.
void run(Body body)
Spawns a task that runs a function object.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
Pure virtual template classes that define interfaces for async communication.
graph_iterator(const graph_iterator &other)
Copy constructor.
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
tbb::task_group_context * my_context
static const node_priority_t no_priority
Used to form groups of tasks.
Definition: task.h:332
static void enqueue(task &t)
Enqueue task for starvation-resistant execution.
Definition: task.h:806
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
Definition: tbb_stddef.h:167
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:331
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:343
bool operator!=(const graph_iterator &other) const
Inequality.
#define FLOW_SPAWN(a)
std::forward_iterator_tag iterator_category
Base class for user-defined tasks.
Definition: task.h:589
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:789
run_task(Body &body, node_priority_t node_priority=no_priority)
A lock that occupies a single byte.
Definition: spin_mutex.h:36
bool operator()(const graph_task *left, const graph_task *right)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
graph_iterator operator++(int)
Post-increment.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
priority_task_selector(graph_task_priority_queue_t &priority_queue)
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:236
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls...

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.