Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_join_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB__flow_graph_join_impl_H
18 #define __TBB__flow_graph_join_impl_H
19 
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23 
24 namespace internal {
25 
27  forwarding_base(graph &g) : graph_ref(g) {}
28  virtual ~forwarding_base() {}
29  // decrement_port_count may create a forwarding task. If we cannot handle the task
30  // ourselves, ask decrement_port_count to deal with it.
31  virtual task * decrement_port_count(bool handle_task) = 0;
32  virtual void increment_port_count() = 0;
33  // moved here so input ports can queue tasks
34  graph& graph_ref;
35  };
36 
37  // specialization that lets us keep a copy of the current_key for building results.
38  // KeyType can be a reference type.
39  template<typename KeyType>
43  virtual task * increment_key_count(current_key_type const & /*t*/, bool /*handle_task*/) = 0; // {return NULL;}
44  current_key_type current_key; // so ports can refer to FE's desired items
45  };
46 
47  template< int N >
48  struct join_helper {
49 
50  template< typename TupleType, typename PortType >
51  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
52  tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
54  }
55  template< typename TupleType >
56  static inline void consume_reservations( TupleType &my_input ) {
57  tbb::flow::get<N-1>( my_input ).consume();
59  }
60 
61  template< typename TupleType >
62  static inline void release_my_reservation( TupleType &my_input ) {
63  tbb::flow::get<N-1>( my_input ).release();
64  }
65 
66  template <typename TupleType>
67  static inline void release_reservations( TupleType &my_input) {
69  release_my_reservation(my_input);
70  }
71 
72  template< typename InputTuple, typename OutputTuple >
73  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74  if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) ) return false;
75  if ( !join_helper<N-1>::reserve( my_input, out ) ) {
76  release_my_reservation( my_input );
77  return false;
78  }
79  return true;
80  }
81 
82  template<typename InputTuple, typename OutputTuple>
83  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84  bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) ); // may fail
85  return join_helper<N-1>::get_my_item(my_input, out) && res; // do get on other inputs before returning
86  }
87 
88  template<typename InputTuple, typename OutputTuple>
89  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90  return get_my_item(my_input, out);
91  }
92 
93  template<typename InputTuple>
94  static inline void reset_my_port(InputTuple &my_input) {
96  tbb::flow::get<N-1>(my_input).reset_port();
97  }
98 
99  template<typename InputTuple>
100  static inline void reset_ports(InputTuple& my_input) {
101  reset_my_port(my_input);
102  }
103 
104  template<typename InputTuple, typename KeyFuncTuple>
105  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
106  tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107  tbb::flow::get<N-1>(my_key_funcs) = NULL;
108  join_helper<N-1>::set_key_functors(my_input, my_key_funcs);
109  }
110 
111  template< typename KeyFuncTuple>
112  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
113  if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114  tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
115  }
116  join_helper<N-1>::copy_key_functors(my_inputs, other_inputs);
117  }
118 
119  template<typename InputTuple>
120  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
121  join_helper<N-1>::reset_inputs(my_input, f);
122  tbb::flow::get<N-1>(my_input).reset_receiver(f);
123  }
124 
125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
126  template<typename InputTuple>
127  static inline void extract_inputs(InputTuple &my_input) {
129  tbb::flow::get<N-1>(my_input).extract_receiver();
130  }
131 #endif
132  }; // join_helper<N>
133 
134  template< >
135  struct join_helper<1> {
136 
137  template< typename TupleType, typename PortType >
138  static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
139  tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
140  }
141 
142  template< typename TupleType >
143  static inline void consume_reservations( TupleType &my_input ) {
144  tbb::flow::get<0>( my_input ).consume();
145  }
146 
147  template< typename TupleType >
148  static inline void release_my_reservation( TupleType &my_input ) {
149  tbb::flow::get<0>( my_input ).release();
150  }
151 
152  template<typename TupleType>
153  static inline void release_reservations( TupleType &my_input) {
154  release_my_reservation(my_input);
155  }
156 
157  template< typename InputTuple, typename OutputTuple >
158  static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159  return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
160  }
161 
162  template<typename InputTuple, typename OutputTuple>
163  static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164  return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
165  }
166 
167  template<typename InputTuple, typename OutputTuple>
168  static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169  return get_my_item(my_input, out);
170  }
171 
172  template<typename InputTuple>
173  static inline void reset_my_port(InputTuple &my_input) {
174  tbb::flow::get<0>(my_input).reset_port();
175  }
176 
177  template<typename InputTuple>
178  static inline void reset_ports(InputTuple& my_input) {
179  reset_my_port(my_input);
180  }
181 
182  template<typename InputTuple, typename KeyFuncTuple>
183  static inline void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs) {
184  tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185  tbb::flow::get<0>(my_key_funcs) = NULL;
186  }
187 
188  template< typename KeyFuncTuple>
189  static inline void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs) {
190  if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191  tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
192  }
193  }
194  template<typename InputTuple>
195  static inline void reset_inputs(InputTuple &my_input, reset_flags f) {
196  tbb::flow::get<0>(my_input).reset_receiver(f);
197  }
198 
199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
200  template<typename InputTuple>
201  static inline void extract_inputs(InputTuple &my_input) {
202  tbb::flow::get<0>(my_input).extract_receiver();
203  }
204 #endif
205  }; // join_helper<1>
206 
208  template< typename T >
209  class reserving_port : public receiver<T> {
210  public:
211  typedef T input_type;
212  typedef typename receiver<input_type>::predecessor_type predecessor_type;
213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
214  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
216 #endif
217  private:
218  // ----------- Aggregator ------------
219  enum op_type { reg_pred, rem_pred, res_item, rel_res, con_res
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
221  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
222 #endif
223  };
225 
226  class reserving_port_operation : public aggregated_operation<reserving_port_operation> {
227  public:
228  char type;
229  union {
230  T *my_arg;
231  predecessor_type *my_pred;
232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
233  size_t cnt_val;
234  predecessor_list_type *plist;
235 #endif
236  };
238  type(char(t)), my_arg(const_cast<T*>(&e)) {}
239  reserving_port_operation(const predecessor_type &s, op_type t) : type(char(t)),
240  my_pred(const_cast<predecessor_type *>(&s)) {}
241  reserving_port_operation(op_type t) : type(char(t)) {}
242  };
243 
244  typedef internal::aggregating_functor<class_type, reserving_port_operation> handler_type;
245  friend class internal::aggregating_functor<class_type, reserving_port_operation>;
246  aggregator<handler_type, reserving_port_operation> my_aggregator;
247 
249  reserving_port_operation *current;
250  bool no_predecessors;
251  while(op_list) {
252  current = op_list;
253  op_list = op_list->next;
254  switch(current->type) {
255  case reg_pred:
256  no_predecessors = my_predecessors.empty();
257  my_predecessors.add(*(current->my_pred));
258  if ( no_predecessors ) {
259  (void) my_join->decrement_port_count(true); // may try to forward
260  }
261  __TBB_store_with_release(current->status, SUCCEEDED);
262  break;
263  case rem_pred:
264  my_predecessors.remove(*(current->my_pred));
265  if(my_predecessors.empty()) my_join->increment_port_count();
266  __TBB_store_with_release(current->status, SUCCEEDED);
267  break;
268  case res_item:
269  if ( reserved ) {
270  __TBB_store_with_release(current->status, FAILED);
271  }
272  else if ( my_predecessors.try_reserve( *(current->my_arg) ) ) {
273  reserved = true;
274  __TBB_store_with_release(current->status, SUCCEEDED);
275  } else {
276  if ( my_predecessors.empty() ) {
277  my_join->increment_port_count();
278  }
279  __TBB_store_with_release(current->status, FAILED);
280  }
281  break;
282  case rel_res:
283  reserved = false;
284  my_predecessors.try_release( );
285  __TBB_store_with_release(current->status, SUCCEEDED);
286  break;
287  case con_res:
288  reserved = false;
289  my_predecessors.try_consume( );
290  __TBB_store_with_release(current->status, SUCCEEDED);
291  break;
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
293  case add_blt_pred:
294  my_predecessors.internal_add_built_predecessor(*(current->my_pred));
295  __TBB_store_with_release(current->status, SUCCEEDED);
296  break;
297  case del_blt_pred:
298  my_predecessors.internal_delete_built_predecessor(*(current->my_pred));
299  __TBB_store_with_release(current->status, SUCCEEDED);
300  break;
301  case blt_pred_cnt:
302  current->cnt_val = my_predecessors.predecessor_count();
303  __TBB_store_with_release(current->status, SUCCEEDED);
304  break;
305  case blt_pred_cpy:
306  my_predecessors.copy_predecessors(*(current->plist));
307  __TBB_store_with_release(current->status, SUCCEEDED);
308  break;
309 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
310  }
311  }
312  }
313 
314  protected:
315  template< typename R, typename B > friend class run_and_put_task;
316  template<typename X, typename Y> friend class internal::broadcast_cache;
317  template<typename X, typename Y> friend class internal::round_robin_cache;
319  return NULL;
320  }
321 
323  return my_join->graph_ref;
324  }
325 
326  public:
327 
329  reserving_port() : reserved(false) {
330  my_join = NULL;
331  my_predecessors.set_owner( this );
332  my_aggregator.initialize_handler(handler_type(this));
333  }
334 
335  // copy constructor
336  reserving_port(const reserving_port& /* other */) : receiver<T>() {
337  reserved = false;
338  my_join = NULL;
339  my_predecessors.set_owner( this );
340  my_aggregator.initialize_handler(handler_type(this));
341  }
342 
344  my_join = join;
345  }
346 
348  bool register_predecessor( predecessor_type &src ) __TBB_override {
349  reserving_port_operation op_data(src, reg_pred);
350  my_aggregator.execute(&op_data);
351  return op_data.status == SUCCEEDED;
352  }
353 
355  bool remove_predecessor( predecessor_type &src ) __TBB_override {
356  reserving_port_operation op_data(src, rem_pred);
357  my_aggregator.execute(&op_data);
358  return op_data.status == SUCCEEDED;
359  }
360 
362  bool reserve( T &v ) {
363  reserving_port_operation op_data(v, res_item);
364  my_aggregator.execute(&op_data);
365  return op_data.status == SUCCEEDED;
366  }
367 
369  void release( ) {
370  reserving_port_operation op_data(rel_res);
371  my_aggregator.execute(&op_data);
372  }
373 
375  void consume( ) {
376  reserving_port_operation op_data(con_res);
377  my_aggregator.execute(&op_data);
378  }
379 
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381  built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
382  void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
383  reserving_port_operation op_data(src, add_blt_pred);
384  my_aggregator.execute(&op_data);
385  }
386 
387  void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
388  reserving_port_operation op_data(src, del_blt_pred);
389  my_aggregator.execute(&op_data);
390  }
391 
392  size_t predecessor_count() __TBB_override {
393  reserving_port_operation op_data(blt_pred_cnt);
394  my_aggregator.execute(&op_data);
395  return op_data.cnt_val;
396  }
397 
398  void copy_predecessors(predecessor_list_type &l) __TBB_override {
399  reserving_port_operation op_data(blt_pred_cpy);
400  op_data.plist = &l;
401  my_aggregator.execute(&op_data);
402  }
403 
404  void extract_receiver() {
405  my_predecessors.built_predecessors().receiver_extract(*this);
406  }
407 
408 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
409 
411  if(f & rf_clear_edges) my_predecessors.clear();
412  else
413  my_predecessors.reset();
414  reserved = false;
415  __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(), "port edges not removed");
416  }
417 
418  private:
421  bool reserved;
422  }; // reserving_port
423 
425  template<typename T>
426  class queueing_port : public receiver<T>, public item_buffer<T> {
427  public:
428  typedef T input_type;
429  typedef typename receiver<input_type>::predecessor_type predecessor_type;
431 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
432  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
433  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
434 #endif
435 
436  // ----------- Aggregator ------------
437  private:
438  enum op_type { get__item, res_port, try__put_task
439 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
440  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
441 #endif
442  };
443 
444  class queueing_port_operation : public aggregated_operation<queueing_port_operation> {
445  public:
446  char type;
448  T *my_arg;
449 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
450  predecessor_type *pred;
451  size_t cnt_val;
452  predecessor_list_type *plist;
453 #endif
455  // constructor for value parameter
457  type(char(t)), my_val(e)
458  , bypass_t(NULL)
459  {}
460  // constructor for pointer parameter
462  type(char(t)), my_arg(const_cast<T*>(p))
463  , bypass_t(NULL)
464  {}
465  // constructor with no parameter
466  queueing_port_operation(op_type t) : type(char(t))
467  , bypass_t(NULL)
468  {}
469  };
470 
471  typedef internal::aggregating_functor<class_type, queueing_port_operation> handler_type;
472  friend class internal::aggregating_functor<class_type, queueing_port_operation>;
473  aggregator<handler_type, queueing_port_operation> my_aggregator;
474 
476  queueing_port_operation *current;
477  bool was_empty;
478  while(op_list) {
479  current = op_list;
480  op_list = op_list->next;
481  switch(current->type) {
482  case try__put_task: {
483  task *rtask = NULL;
484  was_empty = this->buffer_empty();
485  this->push_back(current->my_val);
486  if (was_empty) rtask = my_join->decrement_port_count(false);
487  else
488  rtask = SUCCESSFULLY_ENQUEUED;
489  current->bypass_t = rtask;
490  __TBB_store_with_release(current->status, SUCCEEDED);
491  }
492  break;
493  case get__item:
494  if(!this->buffer_empty()) {
495  *(current->my_arg) = this->front();
496  __TBB_store_with_release(current->status, SUCCEEDED);
497  }
498  else {
499  __TBB_store_with_release(current->status, FAILED);
500  }
501  break;
502  case res_port:
503  __TBB_ASSERT(this->my_item_valid(this->my_head), "No item to reset");
504  this->destroy_front();
505  if(this->my_item_valid(this->my_head)) {
506  (void)my_join->decrement_port_count(true);
507  }
508  __TBB_store_with_release(current->status, SUCCEEDED);
509  break;
510 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
511  case add_blt_pred:
512  my_built_predecessors.add_edge(*(current->pred));
513  __TBB_store_with_release(current->status, SUCCEEDED);
514  break;
515  case del_blt_pred:
516  my_built_predecessors.delete_edge(*(current->pred));
517  __TBB_store_with_release(current->status, SUCCEEDED);
518  break;
519  case blt_pred_cnt:
520  current->cnt_val = my_built_predecessors.edge_count();
521  __TBB_store_with_release(current->status, SUCCEEDED);
522  break;
523  case blt_pred_cpy:
524  my_built_predecessors.copy_edges(*(current->plist));
525  __TBB_store_with_release(current->status, SUCCEEDED);
526  break;
527 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
528  }
529  }
530  }
531  // ------------ End Aggregator ---------------
532 
533  protected:
534  template< typename R, typename B > friend class run_and_put_task;
535  template<typename X, typename Y> friend class internal::broadcast_cache;
536  template<typename X, typename Y> friend class internal::round_robin_cache;
538  queueing_port_operation op_data(v, try__put_task);
539  my_aggregator.execute(&op_data);
540  __TBB_ASSERT(op_data.status == SUCCEEDED || !op_data.bypass_t, "inconsistent return from aggregator");
541  if(!op_data.bypass_t) return SUCCESSFULLY_ENQUEUED;
542  return op_data.bypass_t;
543  }
544 
546  return my_join->graph_ref;
547  }
548 
549  public:
550 
553  my_join = NULL;
554  my_aggregator.initialize_handler(handler_type(this));
555  }
556 
558  queueing_port(const queueing_port& /* other */) : receiver<T>(), item_buffer<T>() {
559  my_join = NULL;
560  my_aggregator.initialize_handler(handler_type(this));
561  }
562 
565  my_join = join;
566  }
567 
568  bool get_item( T &v ) {
569  queueing_port_operation op_data(&v, get__item);
570  my_aggregator.execute(&op_data);
571  return op_data.status == SUCCEEDED;
572  }
573 
574  // reset_port is called when item is accepted by successor, but
575  // is initiated by join_node.
576  void reset_port() {
577  queueing_port_operation op_data(res_port);
578  my_aggregator.execute(&op_data);
579  return;
580  }
581 
582 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
583  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
584 
585  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
586  queueing_port_operation op_data(add_blt_pred);
587  op_data.pred = &p;
588  my_aggregator.execute(&op_data);
589  }
590 
591  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
592  queueing_port_operation op_data(del_blt_pred);
593  op_data.pred = &p;
594  my_aggregator.execute(&op_data);
595  }
596 
597  size_t predecessor_count() __TBB_override {
598  queueing_port_operation op_data(blt_pred_cnt);
599  my_aggregator.execute(&op_data);
600  return op_data.cnt_val;
601  }
602 
603  void copy_predecessors(predecessor_list_type &l) __TBB_override {
604  queueing_port_operation op_data(blt_pred_cpy);
605  op_data.plist = &l;
606  my_aggregator.execute(&op_data);
607  }
608 
609  void extract_receiver() {
611  my_built_predecessors.receiver_extract(*this);
612  }
613 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
614 
618 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
619  if (f & rf_clear_edges)
620  my_built_predecessors.clear();
621 #endif
622  }
623 
624  private:
626 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
627  edge_container<predecessor_type> my_built_predecessors;
628 #endif
629  }; // queueing_port
630 
632 
633  template<typename K>
634  struct count_element {
636  size_t my_value;
637  };
638 
639  // method to access the key in the counting table
640  // the ref has already been removed from K
641  template< typename K >
644  const K& operator()(const table_item_type& v) { return v.my_key; }
645  };
646 
647  // the ports can have only one template parameter. We wrap the types needed in
648  // a traits type
649  template< class TraitsType >
651  public receiver<typename TraitsType::T>,
652  public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
653  typename TraitsType::KHash > {
654  public:
655  typedef TraitsType traits;
657  typedef typename TraitsType::T input_type;
658  typedef typename TraitsType::K key_type;
660  typedef typename receiver<input_type>::predecessor_type predecessor_type;
661  typedef typename TraitsType::TtoK type_to_key_func_type;
662  typedef typename TraitsType::KHash hash_compare_type;
664 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
665  typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
666  typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
667 #endif
668  private:
669 // ----------- Aggregator ------------
670  private:
671  enum op_type { try__put, get__item, res_port
672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
673  , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
674 #endif
675  };
676 
677  class key_matching_port_operation : public aggregated_operation<key_matching_port_operation> {
678  public:
679  char type;
680  input_type my_val;
681  input_type *my_arg;
682 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
683  predecessor_type *pred;
684  size_t cnt_val;
685  predecessor_list_type *plist;
686 #endif
687  // constructor for value parameter
688  key_matching_port_operation(const input_type& e, op_type t) :
689  type(char(t)), my_val(e) {}
690  // constructor for pointer parameter
691  key_matching_port_operation(const input_type* p, op_type t) :
692  type(char(t)), my_arg(const_cast<input_type*>(p)) {}
693  // constructor with no parameter
694  key_matching_port_operation(op_type t) : type(char(t)) {}
695  };
696 
697  typedef internal::aggregating_functor<class_type, key_matching_port_operation> handler_type;
698  friend class internal::aggregating_functor<class_type, key_matching_port_operation>;
699  aggregator<handler_type, key_matching_port_operation> my_aggregator;
700 
703  while(op_list) {
704  current = op_list;
705  op_list = op_list->next;
706  switch(current->type) {
707  case try__put: {
708  bool was_inserted = this->insert_with_key(current->my_val);
709  // return failure if a duplicate insertion occurs
710  __TBB_store_with_release(current->status, was_inserted ? SUCCEEDED : FAILED);
711  }
712  break;
713  case get__item:
714  // use current_key from FE for item
715  if(!this->find_with_key(my_join->current_key, *(current->my_arg))) {
716  __TBB_ASSERT(false, "Failed to find item corresponding to current_key.");
717  }
718  __TBB_store_with_release(current->status, SUCCEEDED);
719  break;
720  case res_port:
721  // use current_key from FE for item
722  this->delete_with_key(my_join->current_key);
723  __TBB_store_with_release(current->status, SUCCEEDED);
724  break;
725 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
726  case add_blt_pred:
727  my_built_predecessors.add_edge(*(current->pred));
728  __TBB_store_with_release(current->status, SUCCEEDED);
729  break;
730  case del_blt_pred:
731  my_built_predecessors.delete_edge(*(current->pred));
732  __TBB_store_with_release(current->status, SUCCEEDED);
733  break;
734  case blt_pred_cnt:
735  current->cnt_val = my_built_predecessors.edge_count();
736  __TBB_store_with_release(current->status, SUCCEEDED);
737  break;
738  case blt_pred_cpy:
739  my_built_predecessors.copy_edges(*(current->plist));
740  __TBB_store_with_release(current->status, SUCCEEDED);
741  break;
742 #endif
743  }
744  }
745  }
746 // ------------ End Aggregator ---------------
747  protected:
748  template< typename R, typename B > friend class run_and_put_task;
749  template<typename X, typename Y> friend class internal::broadcast_cache;
750  template<typename X, typename Y> friend class internal::round_robin_cache;
751  task *try_put_task(const input_type& v) __TBB_override {
752  key_matching_port_operation op_data(v, try__put);
753  task *rtask = NULL;
754  my_aggregator.execute(&op_data);
755  if(op_data.status == SUCCEEDED) {
756  rtask = my_join->increment_key_count((*(this->get_key_func()))(v), false); // may spawn
757  // rtask has to reflect the return status of the try_put
758  if(!rtask) rtask = SUCCESSFULLY_ENQUEUED;
759  }
760  return rtask;
761  }
762 
764  return my_join->graph_ref;
765  }
766 
767  public:
768 
769  key_matching_port() : receiver<input_type>(), buffer_type() {
770  my_join = NULL;
771  my_aggregator.initialize_handler(handler_type(this));
772  }
773 
774  // copy constructor
775  key_matching_port(const key_matching_port& /*other*/) : receiver<input_type>(), buffer_type() {
776  my_join = NULL;
777  my_aggregator.initialize_handler(handler_type(this));
778  }
779 
781 
783  my_join = dynamic_cast<matching_forwarding_base<key_type>*>(join);
784  }
785 
786  void set_my_key_func(type_to_key_func_type *f) { this->set_key_func(f); }
787 
788  type_to_key_func_type* get_my_key_func() { return this->get_key_func(); }
789 
790  bool get_item( input_type &v ) {
791  // aggregator uses current_key from FE for Key
792  key_matching_port_operation op_data(&v, get__item);
793  my_aggregator.execute(&op_data);
794  return op_data.status == SUCCEEDED;
795  }
796 
797 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
798  built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
799 
800  void internal_add_built_predecessor(predecessor_type &p) __TBB_override {
801  key_matching_port_operation op_data(add_blt_pred);
802  op_data.pred = &p;
803  my_aggregator.execute(&op_data);
804  }
805 
806  void internal_delete_built_predecessor(predecessor_type &p) __TBB_override {
807  key_matching_port_operation op_data(del_blt_pred);
808  op_data.pred = &p;
809  my_aggregator.execute(&op_data);
810  }
811 
812  size_t predecessor_count() __TBB_override {
813  key_matching_port_operation op_data(blt_pred_cnt);
814  my_aggregator.execute(&op_data);
815  return op_data.cnt_val;
816  }
817 
818  void copy_predecessors(predecessor_list_type &l) __TBB_override {
819  key_matching_port_operation op_data(blt_pred_cpy);
820  op_data.plist = &l;
821  my_aggregator.execute(&op_data);
822  }
823 #endif
824 
825  // reset_port is called when item is accepted by successor, but
826  // is initiated by join_node.
827  void reset_port() {
828  key_matching_port_operation op_data(res_port);
829  my_aggregator.execute(&op_data);
830  return;
831  }
832 
833 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
834  void extract_receiver() {
835  buffer_type::reset();
836  my_built_predecessors.receiver_extract(*this);
837  }
838 #endif
841  buffer_type::reset();
842 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
843  if (f & rf_clear_edges)
844  my_built_predecessors.clear();
845 #endif
846  }
847 
848  private:
849  // my_join forwarding base used to count number of inputs that
850  // received key.
852 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
853  edge_container<predecessor_type> my_built_predecessors;
854 #endif
855  }; // key_matching_port
856 
857  using namespace graph_policy_namespace;
858 
859  template<typename JP, typename InputTuple, typename OutputTuple>
861 
863  template<typename JP, typename InputTuple, typename OutputTuple>
865 
866  template<typename InputTuple, typename OutputTuple>
867  class join_node_FE<reserving, InputTuple, OutputTuple> : public forwarding_base {
868  public:
870  typedef OutputTuple output_type;
871  typedef InputTuple input_type;
873 
874  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
875  ports_with_no_inputs = N;
876  join_helper<N>::set_join_node_pointer(my_inputs, this);
877  }
878 
879  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
880  ports_with_no_inputs = N;
881  join_helper<N>::set_join_node_pointer(my_inputs, this);
882  }
883 
884  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
885 
887  ++ports_with_no_inputs;
888  }
889 
890  // if all input_ports have predecessors, spawn forward to try and consume tuples
892  if(ports_with_no_inputs.fetch_and_decrement() == 1) {
894  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
896  if(!handle_task) return rtask;
898  }
899  }
900  return NULL;
901  }
902 
903  input_type &input_ports() { return my_inputs; }
904 
905  protected:
906 
907  void reset( reset_flags f) {
908  // called outside of parallel contexts
909  ports_with_no_inputs = N;
910  join_helper<N>::reset_inputs(my_inputs, f);
911  }
912 
913 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
914  void extract( ) {
915  // called outside of parallel contexts
916  ports_with_no_inputs = N;
918  }
919 #endif
920 
921  // all methods on input ports should be called under mutual exclusion from join_node_base.
922 
924  return !ports_with_no_inputs;
925  }
926 
927  bool try_to_make_tuple(output_type &out) {
928  if(ports_with_no_inputs) return false;
929  return join_helper<N>::reserve(my_inputs, out);
930  }
931 
932  void tuple_accepted() {
934  }
935  void tuple_rejected() {
937  }
938 
939  input_type my_inputs;
940  base_node_type *my_node;
941  atomic<size_t> ports_with_no_inputs;
942  }; // join_node_FE<reserving, ... >
943 
944  template<typename InputTuple, typename OutputTuple>
     // Front end (input-port side) of a join node, specialized for the `queueing` policy.
     // It owns the tuple of input ports and counts how many of them currently have no item.
     // NOTE(review): this listing skips some original source lines (the embedded numbering has
     // gaps, e.g. 947, 950, 965, 970, 1002, 1011, 1015), so the declarations of N and
     // base_node_type and several member-function headers are not visible here.
945  class join_node_FE<queueing, InputTuple, OutputTuple> : public forwarding_base {
946  public:
948  typedef OutputTuple output_type;
949  typedef InputTuple input_type;
951 
     // Starts with all N ports counted as empty and hands every input port a pointer back
     // to this front end. my_node stays NULL until set_my_node() is called.
952  join_node_FE(graph &g) : forwarding_base(g), my_node(NULL) {
953  ports_with_no_items = N;
954  join_helper<N>::set_join_node_pointer(my_inputs, this);
955  }
956 
     // Copy construction takes only the graph reference from `other`; the counter, the port
     // back-pointers and my_node are initialized exactly as in the constructor above.
957  join_node_FE(const join_node_FE& other) : forwarding_base((other.forwarding_base::graph_ref)), my_node(NULL) {
958  ports_with_no_items = N;
959  join_helper<N>::set_join_node_pointer(my_inputs, this);
960  }
961 
962  // needed for forwarding
963  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
964 
     // NOTE(review): function header (listing line 965) is not shown; presumably this is
     // reset_port_count(), which the members below call. It marks all N ports as empty again.
966  ports_with_no_items = N;
967  }
968 
969  // if all input_ports have items, spawn forward to try and consume tuples
     // NOTE(review): the function header (listing line 970) is not shown; the use of
     // `handle_task` and the returned task pointer match
     // forwarding_base::decrement_port_count(bool handle_task). Listing lines 973, 975 and
     // 977 inside the body are not shown either.
971  {
     // fetch_and_decrement() yields the value held before the decrement (tbb::atomic), so a
     // result of 1 means this call took the counter to zero: no port is empty any more.
972  if(ports_with_no_items.fetch_and_decrement() == 1) {
974  task *rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
976  if(!handle_task) return rtask; // caller wants the task returned rather than handled here
978  }
979  }
     // no task was handed back to the caller
980  return NULL;
981  }
982 
983  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
984 
     // Gives access to the tuple of input ports.
985  input_type &input_ports() { return my_inputs; }
986 
987  protected:
988 
     // Resets the empty-port counter, then resets every input port with the given flags.
989  void reset( reset_flags f) {
990  reset_port_count();
991  join_helper<N>::reset_inputs(my_inputs, f );
992  }
993 
994 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
     // Resets the empty-port counter (listing line 997 of this body is not shown).
995  void extract() {
996  reset_port_count();
998  }
999 #endif
1000  // all methods on input ports should be called under mutual exclusion from join_node_base.
1001 
      // NOTE(review): header (listing line 1002) is not shown; presumably
      // tuple_build_may_succeed(). True only when no port is counted as empty.
1003  return !ports_with_no_items;
1004  }
1005 
      // Fails immediately while any port is still counted as empty; otherwise asks the ports
      // for their items via join_helper<N>::get_items and returns its result.
1006  bool try_to_make_tuple(output_type &out) {
1007  if(ports_with_no_items) return false;
1008  return join_helper<N>::get_items(my_inputs, out);
1009  }
1010 
      // NOTE(review): header (listing line 1011) is not shown; presumably tuple_accepted().
      // Marks all ports empty again and calls reset_ports on the inputs.
1012  reset_port_count();
1013  join_helper<N>::reset_ports(my_inputs);
1014  }
      // NOTE(review): header (listing line 1015) is not shown; presumably tuple_rejected().
1016  // nothing to do.
1017  }
1018 
1019  input_type my_inputs; // tuple of input ports
1020  base_node_type *my_node; // set through set_my_node(); NULL until then
1021  atomic<size_t> ports_with_no_items; // number of ports currently counted as empty
1022  }; // join_node_FE<queueing, ...>
1023 
1024  // key_matching join front-end.
1025  template<typename InputTuple, typename OutputTuple, typename K, typename KHash>
      // Front end of a join node for the key_matching policy. Besides the forwarding base it
      // inherits two buffers: a hash table that counts, per key, how many items have arrived,
      // and a buffer of completed output tuples. All access to both buffers goes through
      // my_aggregator, which serializes the operations.
      // NOTE(review): this listing skips some original source lines (gaps in the embedded
      // numbering, e.g. 1038, 1042, 1045, 1047-1049, 1052-1054, 1056, 1063, 1070-1071), so
      // several typedefs (N, unref_key_type, class_type, forwarding_base_type, the buffer
      // typedefs) and the bypass_t / enqueue_task declarations are not visible here.
1026  class join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple> : public matching_forwarding_base<K>,
1027  // buffer of key value counts
1028  public hash_buffer< // typedefed below to key_to_count_buffer_type
1029  typename tbb::internal::strip<K>::type&, // force ref type on K
1030  count_element<typename tbb::internal::strip<K>::type>,
1031  internal::type_to_key_function_body<
1032  count_element<typename tbb::internal::strip<K>::type>,
1033  typename tbb::internal::strip<K>::type& >,
1034  KHash >,
1035  // buffer of output items
1036  public item_buffer<OutputTuple> {
1037  public:
1039  typedef OutputTuple output_type;
1040  typedef InputTuple input_type;
1041  typedef K key_type;
1043  typedef KHash key_hash_compare;
1044  // must use K without ref.
1046  // method that lets us refer to the key of this type.
1050  // this is the type of the special table that keeps track of the number of discrete
1051  // elements corresponding to each key that we've seen.
1055  typedef join_node_base<key_matching<key_type,key_hash_compare>, InputTuple, OutputTuple> base_node_type; // for forwarding
1057 
1058 // ----------- Aggregator ------------
1059  // the aggregator is only needed to serialize the access to the hash table.
1060  // and the output_buffer_type base class
1061  private:
      // Operation codes understood by handle_operations():
      //   res_count   - drop the front tuple of the output buffer
      //   inc_count   - count one more item for a key, building a tuple when the count hits N
      //   may_succeed - report whether the output buffer holds a tuple
      //   try_make    - copy the front tuple of the output buffer to the caller
1062  enum op_type { res_count, inc_count, may_succeed, try_make };
1064 
      // One request passed to the aggregator. Which fields are meaningful depends on `type`.
1065  class key_matching_FE_operation : public aggregated_operation<key_matching_FE_operation> {
1066  public:
1067  char type; // an op_type value stored as char
1068  unref_key_type my_val; // key of the arriving item (inc_count)
1069  output_type* my_output; // where to copy the tuple (try_make)
1072  // constructor for value parameter
1073  key_matching_FE_operation(const unref_key_type& e , bool q_task , op_type t) : type(char(t)), my_val(e),
1074  my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
      // constructor for output-pointer parameter; my_val is left default-initialized
1075  key_matching_FE_operation(output_type *p, op_type t) : type(char(t)), my_output(p), bypass_t(NULL),
1076  enqueue_task(true) {}
1077  // constructor with no parameter
1078  key_matching_FE_operation(op_type t) : type(char(t)), my_output(NULL), bypass_t(NULL), enqueue_task(true) {}
1079  };
1080 
1081  typedef internal::aggregating_functor<class_type, key_matching_FE_operation> handler_type;
1082  friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1083  aggregator<handler_type, key_matching_FE_operation> my_aggregator;
1084 
1085  // called from aggregator, so serialized
1086  // returns a task pointer if a task would have been enqueued but we asked that
1087  // it be returned. Otherwise returns NULL.
      // t              - key whose items are now complete on every port
      // should_enqueue - whether a forwarding task may be created at all
      // handle_task    - when true the task is dealt with here and NULL is returned
      //                  (the statement doing so, listing line 1100, is not shown)
1088  task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task) {
1089  output_type l_out;
1090  task *rtask = NULL;
      // decide before pushing: a task is only created when the output buffer was empty
      // and the graph is active
1091  bool do_fwd = should_enqueue && this->buffer_empty() && internal::is_graph_active(this->graph_ref);
      // publish the key so the ports know which items to hand over during the call back
1092  this->current_key = t;
1093  this->delete_with_key(this->current_key); // remove the key
1094  if(join_helper<N>::get_items(my_inputs, l_out)) { // <== call back
1095  this->push_back(l_out);
1096  if(do_fwd) { // we enqueue if receiving an item from predecessor, not if successor asks for item
      // (listing line 1098, the remainder of this statement, is not shown)
1097  rtask = new ( task::allocate_additional_child_of( *(this->graph_ref.root_task()) ) )
1099  if(handle_task) {
1101  rtask = NULL;
1102  }
1103  do_fwd = false;
1104  }
1105  // retire the input values
1106  join_helper<N>::reset_ports(my_inputs); // <== call back
1107  }
1108  else {
1109  __TBB_ASSERT(false, "should have had something to push");
1110  }
1111  return rtask;
1112  }
1113 
      // Aggregator handler: walks the list of pending operations and executes each one.
      // An operation's status is stored with release semantics once its results are written.
1114  void handle_operations(key_matching_FE_operation* op_list) {
1115  key_matching_FE_operation *current;
1116  while(op_list) {
1117  current = op_list;
      // advance before executing the current operation
1118  op_list = op_list->next;
1119  switch(current->type) {
1120  case res_count: // called from BE
1121  {
      // discard the tuple at the front of the output buffer
1122  this->destroy_front();
1123  __TBB_store_with_release(current->status, SUCCEEDED);
1124  }
1125  break;
1126  case inc_count: { // called from input ports
1127  count_element_type *p = 0;
1128  unref_key_type &t = current->my_val;
1129  bool do_enqueue = current->enqueue_task;
      // first item seen for this key: insert a counter starting at zero, then look it up
1130  if(!(this->find_ref_with_key(t,p))) {
1131  count_element_type ev;
1132  ev.my_key = t;
1133  ev.my_value = 0;
1134  this->insert_with_key(ev);
1135  if(!(this->find_ref_with_key(t,p))) {
1136  __TBB_ASSERT(false, "should find key after inserting it");
1137  }
1138  }
      // count this item; when the count reaches N a tuple for the key is built
1139  if(++(p->my_value) == size_t(N)) {
1140  task *rtask = fill_output_buffer(t, true, do_enqueue);
1141  __TBB_ASSERT(!rtask || !do_enqueue, "task should not be returned");
1142  current->bypass_t = rtask;
1143  }
1144  }
1145  __TBB_store_with_release(current->status, SUCCEEDED);
1146  break;
1147  case may_succeed: // called from BE
1148  __TBB_store_with_release(current->status, this->buffer_empty() ? FAILED : SUCCEEDED);
1149  break;
1150  case try_make: // called from BE
1151  if(this->buffer_empty()) {
1152  __TBB_store_with_release(current->status, FAILED);
1153  }
1154  else {
      // copy the front tuple out; it stays in the buffer until a res_count operation
1155  *(current->my_output) = this->front();
1156  __TBB_store_with_release(current->status, SUCCEEDED);
1157  }
1158  break;
1159  }
1160  }
1161  }
1162 // ------------ End Aggregator ---------------
1163 
1164  public:
      // Wires the ports to this front end, installs the per-port key functors from
      // TtoK_funcs, registers the aggregator handler and sets the key function of the
      // count table to a newly allocated body.
1165  template<typename FunctionTuple>
1166  join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1167  join_helper<N>::set_join_node_pointer(my_inputs, this);
1168  join_helper<N>::set_key_functors(my_inputs, TtoK_funcs);
1169  my_aggregator.initialize_handler(handler_type(this));
1170  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1171  this->set_key_func(cfb);
1172  }
1173 
      // Copy construction: takes the graph reference from `other`, starts with empty buffers,
      // and copies the key functors from other's ports. The const_cast is needed because
      // copy_key_functors receives other's inputs through a non-const reference.
1174  join_node_FE(const join_node_FE& other) : forwarding_base_type((other.forwarding_base_type::graph_ref)), key_to_count_buffer_type(),
1175  output_buffer_type() {
1176  my_node = NULL;
1177  join_helper<N>::set_join_node_pointer(my_inputs, this);
1178  join_helper<N>::copy_key_functors(my_inputs, const_cast<input_type &>(other.my_inputs));
1179  my_aggregator.initialize_handler(handler_type(this));
1180  TtoK_function_body_type *cfb = new TtoK_function_body_leaf_type(key_to_count_func());
1181  this->set_key_func(cfb);
1182  }
1183 
1184  // needed for forwarding
1185  void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1186 
      // Submits a res_count operation, which drops the front tuple of the output buffer.
1187  void reset_port_count() { // called from BE
1188  key_matching_FE_operation op_data(res_count);
1189  my_aggregator.execute(&op_data);
1190  return;
1191  }
1192 
1193  // if all input_ports have items, spawn forward to try and consume tuples
1194  // return a task if we are asked and did create one.
1195  task *increment_key_count(unref_key_type const & t, bool handle_task) __TBB_override { // called from input_ports
1196  key_matching_FE_operation op_data(t, handle_task, inc_count);
1197  my_aggregator.execute(&op_data);
1198  return op_data.bypass_t;
1199  }
1200 
      // Not used by this policy: asserts if called.
1201  task *decrement_port_count(bool /*handle_task*/) __TBB_override { __TBB_ASSERT(false, NULL); return NULL; }
1202 
1203  void increment_port_count() __TBB_override { __TBB_ASSERT(false, NULL); } // should never be called
1204 
      // Gives access to the tuple of input ports.
1205  input_type &input_ports() { return my_inputs; }
1206 
1207  protected:
1208 
      // Resets the input ports with the given flags, then both inherited buffers.
1209  void reset( reset_flags f ) {
1210  // called outside of parallel contexts
1211  join_helper<N>::reset_inputs(my_inputs, f);
1212 
1213  key_to_count_buffer_type::reset();
1214  output_buffer_type::reset();
1215  }
1216 
1217 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1218  void extract() {
1219  // called outside of parallel contexts
1220  join_helper<N>::extract_inputs(my_inputs);
1221  key_to_count_buffer_type::reset(); // have to reset the tag counts
1222  output_buffer_type::reset(); // also the queue of outputs
1223  // my_node->current_tag = NO_TAG;
1224  }
1225 #endif
1226  // all methods on input ports should be called under mutual exclusion from join_node_base.
1227 
      // True when the output buffer holds at least one completed tuple.
1228  bool tuple_build_may_succeed() { // called from back-end
1229  key_matching_FE_operation op_data(may_succeed);
1230  my_aggregator.execute(&op_data);
1231  return op_data.status == SUCCEEDED;
1232  }
1233 
1234  // cannot lock while calling back to input_ports. current_key will only be set
1235  // and reset under the aggregator, so it will remain consistent.
      // Copies the front tuple of the output buffer into `out`; returns false if the
      // buffer is empty.
1236  bool try_to_make_tuple(output_type &out) {
1237  key_matching_FE_operation op_data(&out,try_make);
1238  my_aggregator.execute(&op_data);
1239  return op_data.status == SUCCEEDED;
1240  }
1241 
      // NOTE(review): header (listing line 1242) is not shown; presumably tuple_accepted().
1243  reset_port_count(); // reset current_key after ports reset.
1244  }
1245 
      // NOTE(review): header (listing line 1246) is not shown; presumably tuple_rejected().
1247  // nothing to do.
1248  }
1249 
1250  input_type my_inputs; // input ports
1251  base_node_type *my_node; // set through set_my_node(); NULL until then
1252  }; // join_node_FE<key_matching<K,KHash>, InputTuple, OutputTuple>
1253 
1255  template<typename JP, typename InputTuple, typename OutputTuple>
      // Common part of every join node: a graph_node that combines the policy-specific front
      // end (join_node_FE<JP,...>, which owns the input ports) with a sender<OutputTuple>
      // interface for successors. Successor bookkeeping and tuple forwarding are funnelled
      // through my_aggregator and executed in the handler below.
      // NOTE(review): this listing skips some original source lines (gaps in the embedded
      // numbering, e.g. 1264, 1281, 1294, 1304, 1307, 1317-1318, 1393, 1461, 1467, 1470), so
      // the input_ports_type / class_type typedefs, the bypass_t, forwarder_busy and
      // my_successors declarations and some function headers are not visible here.
1256  class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
1257  public sender<OutputTuple> {
1258  protected:
1259  using graph_node::my_graph;
1260  public:
1261  typedef OutputTuple output_type;
1262 
1263  typedef typename sender<output_type>::successor_type successor_type;
1265  using input_ports_type::tuple_build_may_succeed;
1266  using input_ports_type::try_to_make_tuple;
1267  using input_ports_type::tuple_accepted;
1268  using input_ports_type::tuple_rejected;
1269 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1270  typedef typename sender<output_type>::built_successors_type built_successors_type;
1271  typedef typename sender<output_type>::successor_list_type successor_list_type;
1272 #endif
1273 
1274  private:
1275  // ----------- Aggregator ------------
      // Operation codes for the handler. Note that the switch shown below has no case
      // for do_fwrd.
1276  enum op_type { reg_succ, rem_succ, try__get, do_fwrd, do_fwrd_bypass
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1278  , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1279 #endif
1280  };
1282 
      // One request passed to the aggregator. The union member in use depends on `type`.
1283  class join_node_base_operation : public aggregated_operation<join_node_base_operation> {
1284  public:
1285  char type; // an op_type value stored as char
1286  union {
1287  output_type *my_arg; // destination tuple (try__get)
1288  successor_type *my_succ; // successor to register / remove
1289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1290  size_t cnt_val; // result of blt_succ_cnt
1291  successor_list_type *slist; // destination list of blt_succ_cpy
1292 #endif
1293  };
      // The const_casts below only store the address; the handler writes through my_arg
      // for try__get.
1295  join_node_base_operation(const output_type& e, op_type t) : type(char(t)),
1296  my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1297  join_node_base_operation(const successor_type &s, op_type t) : type(char(t)),
1298  my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
      // no argument: the union is left uninitialized
1299  join_node_base_operation(op_type t) : type(char(t)), bypass_t(NULL) {}
1300  };
1301 
1302  typedef internal::aggregating_functor<class_type, join_node_base_operation> handler_type;
1303  friend class internal::aggregating_functor<class_type, join_node_base_operation>;
1305  aggregator<handler_type, join_node_base_operation> my_aggregator;
1306 
      // NOTE(review): the handler's header (listing line 1307) is not shown; the body walks
      // `op_list` and executes each pending operation, storing its status with release
      // semantics when done.
1308  join_node_base_operation *current;
1309  while(op_list) {
1310  current = op_list;
      // advance before executing the current operation
1311  op_list = op_list->next;
1312  switch(current->type) {
1313  case reg_succ: {
1314  my_successors.register_successor(*(current->my_succ));
      // a tuple may already be available: start a forwarding task unless one is already
      // pending (forwarder_busy) or the graph is not active
1315  if(tuple_build_may_succeed() && !forwarder_busy && internal::is_graph_active(my_graph)) {
      // (listing lines 1317-1318, the remainder of this statement, are not shown)
1316  task *rtask = new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1319  internal::spawn_in_graph_arena(my_graph, *rtask);
1320  forwarder_busy = true;
1321  }
1322  __TBB_store_with_release(current->status, SUCCEEDED);
1323  }
1324  break;
1325  case rem_succ:
1326  my_successors.remove_successor(*(current->my_succ));
1327  __TBB_store_with_release(current->status, SUCCEEDED);
1328  break;
1329  case try__get:
      // succeed only if a tuple could be built into the caller's object; the front end
      // is then told the tuple was accepted
1330  if(tuple_build_may_succeed()) {
1331  if(try_to_make_tuple(*(current->my_arg))) {
1332  tuple_accepted();
1333  __TBB_store_with_release(current->status, SUCCEEDED);
1334  }
1335  else __TBB_store_with_release(current->status, FAILED);
1336  }
1337  else __TBB_store_with_release(current->status, FAILED);
1338  break;
1339  case do_fwrd_bypass: {
1340  bool build_succeeded;
1341  task *last_task = NULL;
1342  output_type out;
1343  if(tuple_build_may_succeed()) { // checks output queue of FE
      // keep forwarding tuples until none can be built or the successors refuse one
1344  do {
1345  build_succeeded = try_to_make_tuple(out); // fetch front_end of queue
1346  if(build_succeeded) {
1347  task *new_task = my_successors.try_put_task(out);
1348  last_task = combine_tasks(my_graph, last_task, new_task);
      // a non-NULL task means the put was accepted
1349  if(new_task) {
1350  tuple_accepted();
1351  }
1352  else {
1353  tuple_rejected();
1354  build_succeeded = false;
1355  }
1356  }
1357  } while(build_succeeded);
1358  }
      // hand the combined task back to the submitter through the operation record
1359  current->bypass_t = last_task;
1360  __TBB_store_with_release(current->status, SUCCEEDED);
1361  forwarder_busy = false;
1362  }
1363  break;
1364 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1365  case add_blt_succ:
1366  my_successors.internal_add_built_successor(*(current->my_succ));
1367  __TBB_store_with_release(current->status, SUCCEEDED);
1368  break;
1369  case del_blt_succ:
1370  my_successors.internal_delete_built_successor(*(current->my_succ));
1371  __TBB_store_with_release(current->status, SUCCEEDED);
1372  break;
1373  case blt_succ_cnt:
1374  current->cnt_val = my_successors.successor_count();
1375  __TBB_store_with_release(current->status, SUCCEEDED);
1376  break;
1377  case blt_succ_cpy:
1378  my_successors.copy_successors(*(current->slist));
1379  __TBB_store_with_release(current->status, SUCCEEDED);
1380  break;
1381 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1382  }
1383  }
1384  }
1385  // ---------- end aggregator -----------
1386  public:
      // Each constructor makes this node the owner of the successor cache, tells the front
      // end which node it belongs to, and registers the aggregator handler.
1387  join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1388  my_successors.set_owner(this);
1389  input_ports_type::set_my_node(this);
1390  my_aggregator.initialize_handler(handler_type(this));
1391  }
1392 
      // NOTE(review): the header line of this constructor (listing line 1393) is not shown;
      // the initializers build it from `other`'s graph and front end, with a
      // default-constructed (empty) my_successors.
1394  graph_node(other.graph_node::my_graph), input_ports_type(other),
1395  sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1396  my_successors.set_owner(this);
1397  input_ports_type::set_my_node(this);
1398  my_aggregator.initialize_handler(handler_type(this));
1399  }
1400 
      // Variant that forwards a tuple of key-function bodies `f` to the front end.
1401  template<typename FunctionTuple>
1402  join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1403  my_successors.set_owner(this);
1404  input_ports_type::set_my_node(this);
1405  my_aggregator.initialize_handler(handler_type(this));
1406  }
1407 
      // The sender operations below each submit one request to the aggregator and report
      // the status recorded by the handler.
1408  bool register_successor(successor_type &r) __TBB_override {
1409  join_node_base_operation op_data(r, reg_succ);
1410  my_aggregator.execute(&op_data);
1411  return op_data.status == SUCCEEDED;
1412  }
1413 
1414  bool remove_successor( successor_type &r) __TBB_override {
1415  join_node_base_operation op_data(r, rem_succ);
1416  my_aggregator.execute(&op_data);
1417  return op_data.status == SUCCEEDED;
1418  }
1419 
      // Returns true and fills `v` when a tuple could be built; false otherwise.
1420  bool try_get( output_type &v) __TBB_override {
1421  join_node_base_operation op_data(v, try__get);
1422  my_aggregator.execute(&op_data);
1423  return op_data.status == SUCCEEDED;
1424  }
1425 
1426 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1427  built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
1428 
1429  void internal_add_built_successor( successor_type &r) __TBB_override {
1430  join_node_base_operation op_data(r, add_blt_succ);
1431  my_aggregator.execute(&op_data);
1432  }
1433 
1434  void internal_delete_built_successor( successor_type &r) __TBB_override {
1435  join_node_base_operation op_data(r, del_blt_succ);
1436  my_aggregator.execute(&op_data);
1437  }
1438 
1439  size_t successor_count() __TBB_override {
1440  join_node_base_operation op_data(blt_succ_cnt);
1441  my_aggregator.execute(&op_data);
1442  return op_data.cnt_val;
1443  }
1444 
1445  void copy_successors(successor_list_type &l) __TBB_override {
1446  join_node_base_operation op_data(blt_succ_cpy);
      // the list pointer is set after construction because no constructor takes it
1447  op_data.slist = &l;
1448  my_aggregator.execute(&op_data);
1449  }
1450 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
1451 
1452 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1453  void extract() __TBB_override {
1454  input_ports_type::extract();
1455  my_successors.built_successors().sender_extract(*this);
1456  }
1457 #endif
1458 
1459  protected:
1460 
      // NOTE(review): header (listing line 1461) is not shown; the body resets the front end
      // with flags `f` and clears the successors when rf_clear_edges is set.
1462  input_ports_type::reset(f);
1463  if(f & rf_clear_edges) my_successors.clear();
1464  }
1465 
1466  private:
1468 
1469  friend class forward_task_bypass< join_node_base<JP, InputTuple, OutputTuple> >;
      // NOTE(review): header (listing line 1470) is not shown. The body submits a
      // do_fwrd_bypass operation and returns the task the handler recorded (may be NULL).
1471  join_node_base_operation op_data(do_fwrd_bypass);
1472  my_aggregator.execute(&op_data);
1473  return op_data.bypass_t;
1474  }
1475 
1476  }; // join_node_base
1477 
1478  // join base class type generator
1479  template<int N, template<class> class PT, typename OutputTuple, typename JP>
      // Generic base-class generator for join nodes; unfolded_join_node derives from
      // join_base<...>::type.
      // NOTE(review): the member typedef `type` (listing line 1481) is not shown here.
1480  struct join_base {
1482  };
1483 
1484  template<int N, typename OutputTuple, typename K, typename KHash>
      // Base-class generator specialized for key_matching ports: `type` is a join_node_base
      // instantiated with the key-matching traits.
      // NOTE(review): listing lines 1486 (presumably the key_traits_type typedef) and 1491
      // (the ports-type template argument) are not shown here.
1485  struct join_base<N, key_matching_port, OutputTuple, key_matching<K,KHash> > {
1487  typedef K key_type;
1488  typedef KHash key_hash_compare;
1489  typedef typename internal::join_node_base< key_traits_type,
1490  // ports type
1492  OutputTuple > type;
1493  };
1494 
1496  // using tuple_element. The class PT is the port type (reserving_port, queueing_port, key_matching_port)
1497  // and should match the typename.
1498 
1499  template<int N, template<class> class PT, typename OutputTuple, typename JP>
      // Generic join node layer: inherits the base chosen by join_base<N,PT,OutputTuple,JP>
      // and only forwards construction to it.
      // NOTE(review): listing lines 1502 and 1505 (presumably the input_ports_type and
      // base_type typedefs) are not shown here.
1500  class unfolded_join_node : public join_base<N,PT,OutputTuple,JP>::type {
1501  public:
1503  typedef OutputTuple output_type;
1504  private:
1506  public:
      // Both constructors simply delegate to base_type.
1507  unfolded_join_node(graph &g) : base_type(g) {}
1508  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1509  };
1510 
1511 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
1512  template <typename K, typename T>
      // Function object that obtains the key of a message by calling key_from_message<K>
      // on it; used as the default key body when none is supplied.
      // NOTE(review): listing line 1515 inside operator() is not shown.
1513  struct key_from_message_body {
1514  K operator()(const T& t) const {
1516  return key_from_message<K>(t);
1517  }
1518  };
1519  // Adds const to reference type
1520  template <typename K, typename T>
      // Specialization for reference key types: the key is requested and returned as
      // `const K&` instead of `K&`.
      // NOTE(review): listing line 1523 inside operator() is not shown.
1521  struct key_from_message_body<K&,T> {
1522  const K& operator()(const T& t) const {
1524  return key_from_message<const K&>(t);
1525  }
1526  };
1527 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
1528  // key_matching unfolded_join_node. This must be a separate specialization because the constructors
1529  // differ.
1530 
1531  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 2 input ports. The constructors wrap one key-extraction
      // body per port in a heap-allocated type_to_key_function_body_leaf and pass the
      // resulting pointers, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1534-1535, 1537 and 1541-1542 (presumably the T0/T1,
      // input_ports_type and f0_p/f1_p typedefs) are not shown here. Who deletes the
      // allocated bodies is not visible in this class - confirm against the port code.
1532  class unfolded_join_node<2,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1533  join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1536  public:
1538  typedef OutputTuple output_type;
1539  private:
1540  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1543  typedef typename tbb::flow::tuple< f0_p, f1_p > func_initializer_type;
1544  public:
1545 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1546  unfolded_join_node(graph &g) : base_type(g,
1547  func_initializer_type(
1548  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1549  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>())
1550  ) ) {
1551  }
1552 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1553  template<typename Body0, typename Body1>
1554  unfolded_join_node(graph &g, Body0 body0, Body1 body1) : base_type(g,
1555  func_initializer_type(
1556  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1557  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1)
1558  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1559  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 2, "wrong number of body initializers");
1560  }
1561  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1562  };
1563 
1564  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 3 input ports. The constructors wrap one key-extraction
      // body per port in a heap-allocated type_to_key_function_body_leaf and pass the
      // resulting pointers, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1567-1569, 1571 and 1575-1577 (presumably the T0..T2,
      // input_ports_type and f0_p..f2_p typedefs) are not shown here. Who deletes the
      // allocated bodies is not visible in this class - confirm against the port code.
1565  class unfolded_join_node<3,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1566  join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1570  public:
1572  typedef OutputTuple output_type;
1573  private:
1574  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1578  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type;
1579  public:
1580 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1581  unfolded_join_node(graph &g) : base_type(g,
1582  func_initializer_type(
1583  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1584  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1585  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>())
1586  ) ) {
1587  }
1588 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1589  template<typename Body0, typename Body1, typename Body2>
1590  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2) : base_type(g,
1591  func_initializer_type(
1592  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1593  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1594  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2)
1595  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1596  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 3, "wrong number of body initializers");
1597  }
1598  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1599  };
1600 
1601  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 4 input ports. The constructors wrap one key-extraction
      // body per port in a heap-allocated type_to_key_function_body_leaf and pass the
      // resulting pointers, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1604-1607, 1609 and 1613-1616 (presumably the T0..T3,
      // input_ports_type and f0_p..f3_p typedefs) are not shown here. Who deletes the
      // allocated bodies is not visible in this class - confirm against the port code.
1602  class unfolded_join_node<4,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1603  join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1608  public:
1610  typedef OutputTuple output_type;
1611  private:
1612  typedef join_node_base<key_matching<K,KHash>, input_ports_type, output_type > base_type;
1617  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type;
1618  public:
1619 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1620  unfolded_join_node(graph &g) : base_type(g,
1621  func_initializer_type(
1622  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1623  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1624  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1625  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>())
1626  ) ) {
1627  }
1628 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1629  template<typename Body0, typename Body1, typename Body2, typename Body3>
1630  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3) : base_type(g,
1631  func_initializer_type(
1632  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1633  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1634  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1635  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3)
1636  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1637  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 4, "wrong number of body initializers");
1638  }
1639  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1640  };
1641 
1642  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 5 input ports. The constructors wrap one key-extraction
      // body per port in a heap-allocated type_to_key_function_body_leaf and pass the
      // resulting pointers, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1645-1649, 1651 and 1655-1659 (presumably the T0..T4,
      // input_ports_type and f0_p..f4_p typedefs) are not shown here. Who deletes the
      // allocated bodies is not visible in this class - confirm against the port code.
1643  class unfolded_join_node<5,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1644  join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1650  public:
1652  typedef OutputTuple output_type;
1653  private:
1654  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1660  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type;
1661  public:
1662 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1663  unfolded_join_node(graph &g) : base_type(g,
1664  func_initializer_type(
1665  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1666  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1667  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1668  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1669  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>())
1670  ) ) {
1671  }
1672 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1673  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4>
1674  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1675  func_initializer_type(
1676  new internal::type_to_key_function_body_leaf<T0, K, Body0>(body0),
1677  new internal::type_to_key_function_body_leaf<T1, K, Body1>(body1),
1678  new internal::type_to_key_function_body_leaf<T2, K, Body2>(body2),
1679  new internal::type_to_key_function_body_leaf<T3, K, Body3>(body3),
1680  new internal::type_to_key_function_body_leaf<T4, K, Body4>(body4)
1681  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1682  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 5, "wrong number of body initializers");
1683  }
1684  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1685  };
1686 
1687 #if __TBB_VARIADIC_MAX >= 6
1688  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 6 input ports (compiled only when
      // __TBB_VARIADIC_MAX >= 6). The constructors pass one key-function body pointer per
      // port, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1691-1696, 1698 and 1702-1706 (presumably the T0..T5,
      // input_ports_type and f0_p..f4_p typedefs) and 1725-1730 (the initializer arguments
      // of the body-taking constructor) are not shown here.
1689  class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1690  join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1697  public:
1699  typedef OutputTuple output_type;
1700  private:
1701  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1707  typedef typename internal::type_to_key_function_body<T5, K> *f5_p; // pointer to the key body of port 5
1708  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p > func_initializer_type;
1709  public:
1710 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1711  unfolded_join_node(graph &g) : base_type(g,
1712  func_initializer_type(
1713  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1714  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1715  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1716  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1717  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1718  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>())
1719  ) ) {
1720  }
1721 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1722  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4, typename Body5>
1723  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1724  : base_type(g, func_initializer_type(
1731  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1732  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 6, "wrong number of body initializers");
1733  }
1734  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1735  };
1736 #endif
1737 
1738 #if __TBB_VARIADIC_MAX >= 7
1739  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 7 input ports (compiled only when
      // __TBB_VARIADIC_MAX >= 7). The constructors pass one key-function body pointer per
      // port, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1742-1748, 1750 and 1754-1758 (presumably the T0..T6,
      // input_ports_type and f0_p..f4_p typedefs) and 1780-1786 (the initializer arguments
      // of the body-taking constructor) are not shown here.
1740  class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1741  join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1749  public:
1751  typedef OutputTuple output_type;
1752  private:
1753  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1759  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1760  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1761  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p > func_initializer_type;
1762  public:
1763 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1764  unfolded_join_node(graph &g) : base_type(g,
1765  func_initializer_type(
1766  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1767  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1768  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1769  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1770  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1771  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1772  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>())
1773  ) ) {
1774  }
1775 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1776  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1777  typename Body5, typename Body6>
1778  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1779  Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1787  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1788  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 7, "wrong number of body initializers");
1789  }
1790  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1791  };
1792 #endif
1793 
1794 #if __TBB_VARIADIC_MAX >= 8
1795  template<typename OutputTuple, typename K, typename KHash>
      // key_matching join node with 8 input ports (compiled only when
      // __TBB_VARIADIC_MAX >= 8). The constructors pass one key-function body pointer per
      // port, packed in a func_initializer_type tuple, to base_type.
      // NOTE(review): listing lines 1798-1805, 1807 and 1811-1815 (presumably the T0..T7,
      // input_ports_type and f0_p..f4_p typedefs) and 1839-1846 (the initializer arguments
      // of the body-taking constructor) are not shown here.
1796  class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1797  join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1806  public:
1808  typedef OutputTuple output_type;
1809  private:
1810  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
1816  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1817  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1818  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1819  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p > func_initializer_type;
1820  public:
1821 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
      // No bodies supplied: every port uses key_from_message_body to obtain the key.
1822  unfolded_join_node(graph &g) : base_type(g,
1823  func_initializer_type(
1824  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1825  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1826  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1827  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1828  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1829  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1830  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1831  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>())
1832  ) ) {
1833  }
1834 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
      // One user-supplied body per port, in port order.
1835  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1836  typename Body5, typename Body6, typename Body7>
1837  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1838  Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1847  ) ) {
      // compile-time check that the output tuple has as many elements as bodies given
1848  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 8, "wrong number of body initializers");
1849  }
1850  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1851  };
1852 #endif
1853 
1854 #if __TBB_VARIADIC_MAX >= 9
// Partial specialization of unfolded_join_node for a 9-port join that uses
// key_matching_port inputs and the key_matching<K,KHash> policy. It derives from
// join_base<9,...>::type and forwards all construction to base_type.
// NOTE(review): this listing skips several original lines (the embedded line
// numbers jump), so T0..T8, input_ports_type and f0_p..f4_p are presumably
// declared on lines not shown here -- confirm against the real header.
1855  template<typename OutputTuple, typename K, typename KHash>
1856  class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1857  join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1867  public:
1869  typedef OutputTuple output_type;
1870  private:
1871  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the abstract functor that maps a message of type TN to a key of type K.
1877  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1878  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1879  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1880  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
// Tuple holding one key-functor pointer per input port; passed to base_type on construction.
1881  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p > func_initializer_type;
1882  public:
1883 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Constructor without user bodies (preview feature only): each port gets a
// heap-allocated type_to_key_function_body_leaf wrapping key_from_message_body<K,Ti>.
// NOTE(review): the pointers are handed to base_type; ownership/deletion is not
// visible in this block -- presumably the base or the ports free them; confirm.
1884  unfolded_join_node(graph &g) : base_type(g,
1885  func_initializer_type(
1886  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1887  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1888  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1889  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1890  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1891  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1892  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1893  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1894  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>())
1895  ) ) {
1896  }
1897 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Constructor taking one user-supplied body per port (body0..body8), passed by value.
// The arguments of func_initializer_type(...) are on listing lines not shown here.
// The static assert rejects an OutputTuple whose size is not 9.
1898  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1899  typename Body5, typename Body6, typename Body7, typename Body8>
1900  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1901  Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1911  ) ) {
1912  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 9, "wrong number of body initializers");
1913  }
// Copy constructor: delegates entirely to the base_type copy constructor.
1914  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1915  };
1916 #endif
1917 
1918 #if __TBB_VARIADIC_MAX >= 10
// Partial specialization of unfolded_join_node for a 10-port join that uses
// key_matching_port inputs and the key_matching<K,KHash> policy. It derives from
// join_base<10,...>::type and forwards all construction to base_type.
// NOTE(review): this listing skips several original lines (the embedded line
// numbers jump), so T0..T9, input_ports_type and f0_p..f4_p are presumably
// declared on lines not shown here -- confirm against the real header.
1919  template<typename OutputTuple, typename K, typename KHash>
1920  class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > : public
1921  join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >::type {
1932  public:
1934  typedef OutputTuple output_type;
1935  private:
1936  typedef join_node_base<key_matching<K,KHash> , input_ports_type, output_type > base_type;
// fN_p: pointer to the abstract functor that maps a message of type TN to a key of type K.
1942  typedef typename internal::type_to_key_function_body<T5, K> *f5_p;
1943  typedef typename internal::type_to_key_function_body<T6, K> *f6_p;
1944  typedef typename internal::type_to_key_function_body<T7, K> *f7_p;
1945  typedef typename internal::type_to_key_function_body<T8, K> *f8_p;
1946  typedef typename internal::type_to_key_function_body<T9, K> *f9_p;
// Tuple holding one key-functor pointer per input port; passed to base_type on construction.
1947  typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p > func_initializer_type;
1948  public:
1949 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Constructor without user bodies (preview feature only): each port gets a
// heap-allocated type_to_key_function_body_leaf wrapping key_from_message_body<K,Ti>.
// NOTE(review): the pointers are handed to base_type; ownership/deletion is not
// visible in this block -- presumably the base or the ports free them; confirm.
1950  unfolded_join_node(graph &g) : base_type(g,
1951  func_initializer_type(
1952  new internal::type_to_key_function_body_leaf<T0, K, key_from_message_body<K,T0> >(key_from_message_body<K,T0>()),
1953  new internal::type_to_key_function_body_leaf<T1, K, key_from_message_body<K,T1> >(key_from_message_body<K,T1>()),
1954  new internal::type_to_key_function_body_leaf<T2, K, key_from_message_body<K,T2> >(key_from_message_body<K,T2>()),
1955  new internal::type_to_key_function_body_leaf<T3, K, key_from_message_body<K,T3> >(key_from_message_body<K,T3>()),
1956  new internal::type_to_key_function_body_leaf<T4, K, key_from_message_body<K,T4> >(key_from_message_body<K,T4>()),
1957  new internal::type_to_key_function_body_leaf<T5, K, key_from_message_body<K,T5> >(key_from_message_body<K,T5>()),
1958  new internal::type_to_key_function_body_leaf<T6, K, key_from_message_body<K,T6> >(key_from_message_body<K,T6>()),
1959  new internal::type_to_key_function_body_leaf<T7, K, key_from_message_body<K,T7> >(key_from_message_body<K,T7>()),
1960  new internal::type_to_key_function_body_leaf<T8, K, key_from_message_body<K,T8> >(key_from_message_body<K,T8>()),
1961  new internal::type_to_key_function_body_leaf<T9, K, key_from_message_body<K,T9> >(key_from_message_body<K,T9>())
1962  ) ) {
1963  }
1964 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
// Constructor taking one user-supplied body per port (body0..body9), passed by value.
// The arguments of func_initializer_type(...) are on listing lines not shown here.
// The static assert rejects an OutputTuple whose size is not 10.
1965  template<typename Body0, typename Body1, typename Body2, typename Body3, typename Body4,
1966  typename Body5, typename Body6, typename Body7, typename Body8, typename Body9>
1967  unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1968  Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1979  ) ) {
1980  __TBB_STATIC_ASSERT(tbb::flow::tuple_size<OutputTuple>::value == 10, "wrong number of body initializers");
1981  }
// Copy constructor: delegates entirely to the base_type copy constructor.
1982  unfolded_join_node(const unfolded_join_node &other) : base_type(other) {}
1983  };
1984 #endif
1985 
// Templated accessor for the N-th input port of a join node: returns
// tbb::flow::get<N> applied to jn.input_ports().
// NOTE(review): the signature line (listing line 1988) is missing from this
// listing; only the template header and the body are visible here.
1987  template<size_t N, typename JNT>
1989  return tbb::flow::get<N>(jn.input_ports());
1990  }
1991 
1992 }
1993 #endif // __TBB__flow_graph_join_impl_H
1994 
static void set_join_node_pointer(TupleType &my_input, PortType *port)
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
aggregator< handler_type, queueing_port_operation > my_aggregator
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
join_node_base(graph &g, FunctionTuple f)
join_node_base< JP, InputTuple, OutputTuple > class_type
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
graph & graph_reference() __TBB_override
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
task * try_put_task(const T &) __TBB_override
key_matching_port< traits > class_type
receiver< input_type >::predecessor_type predecessor_type
#define __TBB_override
Definition: tbb_stddef.h:240
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
static bool reserve(InputTuple &my_input, OutputTuple &out)
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
aggregator< handler_type, key_matching_port_operation > my_aggregator
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
aggregator< handler_type, reserving_port_operation > my_aggregator
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
static void consume_reservations(TupleType &my_input)
static void reset_my_port(InputTuple &my_input)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
matching_forwarding_base< key_type > * my_join
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
static void reset_inputs(InputTuple &my_input, reset_flags f)
graph & graph_reference() __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync s
reserving_port(const reserving_port &)
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
void set_join_node_pointer(forwarding_base *join)
static void reset_inputs(InputTuple &my_input, reset_flags f)
receiver< input_type >::predecessor_type predecessor_type
static void release_reservations(TupleType &my_input)
task * try_put_task(const T &v) __TBB_override
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
static bool get_items(InputTuple &my_input, OutputTuple &out)
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
join_node_base(const join_node_base &other)
static void reset_my_port(InputTuple &my_input)
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
K key_from_message(const T &t)
Definition: flow_graph.h:693
void reset_receiver(reset_flags f) __TBB_override
queueing_port(const queueing_port &)
copy constructor
type_to_key_func_type * get_my_key_func()
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:532
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
Release.
Definition: atomic.h:45
broadcast_cache< output_type, null_rw_mutex > my_successors
A cache of successors that are broadcast to.
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
static tbb::task *const SUCCESSFULLY_ENQUEUED
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
virtual task * decrement_port_count(bool handle_task)=0
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
Definition: tbb_stddef.h:377
virtual void increment_port_count()=0
reservable_predecessor_cache< T, null_mutex > my_predecessors
unfolded_join_node(const unfolded_join_node &other)
static bool reserve(InputTuple &my_input, OutputTuple &out)
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
void set_join_node_pointer(forwarding_base *join)
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
aggregator< handler_type, key_matching_FE_operation > my_aggregator
The two-phase join port.
void release()
Release the port.
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
aggregator< handler_type, join_node_base_operation > my_aggregator
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
static void release_reservations(TupleType &my_input)
task * decrement_port_count(bool handle_task) __TBB_override
reserving_port< T > class_type
tbb::internal::strip< key_type >::type noref_key_type
void handle_operations(join_node_base_operation *op_list)
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
static void release_my_reservation(TupleType &my_input)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
void handle_operations(queueing_port_operation *op_list)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
A task that calls a node's forward_task function.
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:716
sender< output_type >::successor_type successor_type
static void consume_reservations(TupleType &my_input)
bool reserve(T &v)
Reserve an item from the port.
reserving_port_operation(const predecessor_type &s, op_type t)
field of type K being used for matching.
void handle_operations(reserving_port_operation *op_list)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
static void reset_ports(InputTuple &my_input)
graph & graph_reference() __TBB_override
void consume()
Complete use of the port.
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
join_node_base< JP, input_ports_type, output_type > base_type
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
Definition: flow_graph.h:171
join_node_FE : implements input port policy
void set_my_key_func(type_to_key_func_type *f)
const K & operator()(const table_item_type &v)
static bool get_items(InputTuple &my_input, OutputTuple &out)
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
A cache of successors that are put in a round-robin fashion.
void handle_operations(key_matching_port_operation *op_list)
void reset_node(reset_flags f) __TBB_override
Base class for types that should not be assigned.
Definition: tbb_stddef.h:320
void const char const char int ITT_FORMAT __itt_group_sync p
receiver< input_type >::predecessor_type predecessor_type
static void release_my_reservation(TupleType &my_input)
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
join_node_base_operation(const successor_type &s, op_type t)
void reset_receiver(reset_flags f) __TBB_override
static void reset_ports(InputTuple &my_input)
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
void reset_receiver(reset_flags f) __TBB_override
tbb::internal::strip< KeyType >::type current_key_type
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
key_matching_port(const key_matching_port &)

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.