17 #ifndef __TBB__flow_graph_join_impl_H 18 #define __TBB__flow_graph_join_impl_H 20 #ifndef __TBB_flow_graph_H 21 #error Do not #include this internal file directly; use public TBB headers instead. 39 template<
typename KeyType>
43 virtual task * increment_key_count(current_key_type
const & ,
bool ) = 0;
50 template<
typename TupleType,
typename PortType >
52 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
55 template<
typename TupleType >
57 tbb::flow::get<N-1>( my_input ).consume();
61 template<
typename TupleType >
63 tbb::flow::get<N-1>( my_input ).
release();
66 template <
typename TupleType>
69 release_my_reservation(my_input);
72 template<
typename InputTuple,
typename OutputTuple >
73 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
74 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) )
return false;
76 release_my_reservation( my_input );
82 template<
typename InputTuple,
typename OutputTuple>
83 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
84 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) );
88 template<
typename InputTuple,
typename OutputTuple>
89 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
90 return get_my_item(my_input, out);
93 template<
typename InputTuple>
96 tbb::flow::get<N-1>(my_input).reset_port();
99 template<
typename InputTuple>
101 reset_my_port(my_input);
104 template<
typename InputTuple,
typename KeyFuncTuple>
106 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
107 tbb::flow::get<N-1>(my_key_funcs) = NULL;
111 template<
typename KeyFuncTuple>
113 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
114 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
119 template<
typename InputTuple>
122 tbb::flow::get<N-1>(my_input).reset_receiver(f);
125 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 126 template<
typename InputTuple>
127 static inline void extract_inputs(InputTuple &my_input) {
129 tbb::flow::get<N-1>(my_input).extract_receiver();
137 template<
typename TupleType,
typename PortType >
139 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
142 template<
typename TupleType >
144 tbb::flow::get<0>( my_input ).consume();
147 template<
typename TupleType >
149 tbb::flow::get<0>( my_input ).
release();
152 template<
typename TupleType>
154 release_my_reservation(my_input);
157 template<
typename InputTuple,
typename OutputTuple >
158 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
159 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
162 template<
typename InputTuple,
typename OutputTuple>
163 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
164 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
167 template<
typename InputTuple,
typename OutputTuple>
168 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
169 return get_my_item(my_input, out);
172 template<
typename InputTuple>
174 tbb::flow::get<0>(my_input).reset_port();
177 template<
typename InputTuple>
179 reset_my_port(my_input);
182 template<
typename InputTuple,
typename KeyFuncTuple>
184 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
185 tbb::flow::get<0>(my_key_funcs) = NULL;
188 template<
typename KeyFuncTuple>
190 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
191 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
194 template<
typename InputTuple>
196 tbb::flow::get<0>(my_input).reset_receiver(f);
199 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 200 template<
typename InputTuple>
201 static inline void extract_inputs(InputTuple &my_input) {
202 tbb::flow::get<0>(my_input).extract_receiver();
208 template<
typename T >
213 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 214 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
215 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
220 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 221 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 234 predecessor_list_type *plist;
238 type(char(t)), my_arg(const_cast<T*>(&e)) {}
240 my_pred(const_cast<predecessor_type *>(&s)) {}
244 typedef internal::aggregating_functor<class_type, reserving_port_operation>
handler_type;
250 bool no_predecessors;
253 op_list = op_list->next;
254 switch(current->
type) {
256 no_predecessors = my_predecessors.empty();
257 my_predecessors.add(*(current->
my_pred));
258 if ( no_predecessors ) {
259 (
void) my_join->decrement_port_count(
true);
264 my_predecessors.remove(*(current->
my_pred));
265 if(my_predecessors.empty()) my_join->increment_port_count();
272 else if ( my_predecessors.try_reserve( *(current->
my_arg) ) ) {
276 if ( my_predecessors.empty() ) {
277 my_join->increment_port_count();
284 my_predecessors.try_release( );
289 my_predecessors.try_consume( );
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 294 my_predecessors.internal_add_built_predecessor(*(current->
my_pred));
298 my_predecessors.internal_delete_built_predecessor(*(current->
my_pred));
302 current->cnt_val = my_predecessors.predecessor_count();
306 my_predecessors.copy_predecessors(*(current->plist));
315 template<
typename R,
typename B >
friend class run_and_put_task;
323 return my_join->graph_ref;
331 my_predecessors.set_owner(
this );
332 my_aggregator.initialize_handler(handler_type(
this));
339 my_predecessors.set_owner(
this );
340 my_aggregator.initialize_handler(handler_type(
this));
350 my_aggregator.execute(&op_data);
357 my_aggregator.execute(&op_data);
364 my_aggregator.execute(&op_data);
371 my_aggregator.execute(&op_data);
377 my_aggregator.execute(&op_data);
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 381 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predecessors.built_predecessors(); }
382 void internal_add_built_predecessor(predecessor_type &src)
__TBB_override {
384 my_aggregator.execute(&op_data);
387 void internal_delete_built_predecessor(predecessor_type &src)
__TBB_override {
389 my_aggregator.execute(&op_data);
394 my_aggregator.execute(&op_data);
395 return op_data.cnt_val;
401 my_aggregator.execute(&op_data);
404 void extract_receiver() {
405 my_predecessors.built_predecessors().receiver_extract(*
this);
413 my_predecessors.reset();
415 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(),
"port edges not removed");
431 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 432 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
433 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
438 enum op_type { get__item, res_port, try__put_task
439 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 440 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
449 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 450 predecessor_type *pred;
452 predecessor_list_type *plist;
457 type(char(t)), my_val(e)
462 type(char(t)), my_arg(const_cast<T*>(p))
471 typedef internal::aggregating_functor<class_type, queueing_port_operation>
handler_type;
480 op_list = op_list->next;
481 switch(current->
type) {
482 case try__put_task: {
484 was_empty = this->buffer_empty();
485 this->push_back(current->
my_val);
486 if (was_empty) rtask = my_join->decrement_port_count(
false);
494 if(!this->buffer_empty()) {
495 *(current->
my_arg) = this->front();
503 __TBB_ASSERT(this->my_item_valid(this->my_head),
"No item to reset");
504 this->destroy_front();
505 if(this->my_item_valid(this->my_head)) {
506 (
void)my_join->decrement_port_count(
true);
510 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 512 my_built_predecessors.add_edge(*(current->pred));
516 my_built_predecessors.delete_edge(*(current->pred));
520 current->cnt_val = my_built_predecessors.edge_count();
524 my_built_predecessors.copy_edges(*(current->plist));
534 template<
typename R,
typename B >
friend class run_and_put_task;
539 my_aggregator.execute(&op_data);
546 return my_join->graph_ref;
554 my_aggregator.initialize_handler(handler_type(
this));
560 my_aggregator.initialize_handler(handler_type(
this));
570 my_aggregator.execute(&op_data);
578 my_aggregator.execute(&op_data);
582 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 583 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
585 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
588 my_aggregator.execute(&op_data);
591 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
594 my_aggregator.execute(&op_data);
599 my_aggregator.execute(&op_data);
600 return op_data.cnt_val;
606 my_aggregator.execute(&op_data);
609 void extract_receiver() {
611 my_built_predecessors.receiver_extract(*
this);
618 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 620 my_built_predecessors.clear();
626 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 627 edge_container<predecessor_type> my_built_predecessors;
641 template<
typename K >
649 template<
class TraitsType >
651 public receiver<typename TraitsType::T>,
652 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
653 typename TraitsType::KHash > {
664 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 665 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
666 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
672 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 673 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
682 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 683 predecessor_type *pred;
685 predecessor_list_type *plist;
689 type(char(t)), my_val(e) {}
692 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
697 typedef internal::aggregating_functor<class_type, key_matching_port_operation>
handler_type;
705 op_list = op_list->next;
706 switch(current->
type) {
708 bool was_inserted = this->insert_with_key(current->
my_val);
715 if(!this->find_with_key(my_join->current_key, *(current->
my_arg))) {
716 __TBB_ASSERT(
false,
"Failed to find item corresponding to current_key.");
722 this->delete_with_key(my_join->current_key);
725 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 727 my_built_predecessors.add_edge(*(current->pred));
731 my_built_predecessors.delete_edge(*(current->pred));
735 current->cnt_val = my_built_predecessors.edge_count();
739 my_built_predecessors.copy_edges(*(current->plist));
748 template<
typename R,
typename B >
friend class run_and_put_task;
754 my_aggregator.execute(&op_data);
756 rtask = my_join->increment_key_count((*(this->get_key_func()))(v),
false);
764 return my_join->graph_ref;
771 my_aggregator.initialize_handler(handler_type(
this));
777 my_aggregator.initialize_handler(handler_type(
this));
793 my_aggregator.execute(&op_data);
797 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 798 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
800 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
803 my_aggregator.execute(&op_data);
806 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
809 my_aggregator.execute(&op_data);
814 my_aggregator.execute(&op_data);
815 return op_data.cnt_val;
821 my_aggregator.execute(&op_data);
829 my_aggregator.execute(&op_data);
833 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 834 void extract_receiver() {
835 buffer_type::reset();
836 my_built_predecessors.receiver_extract(*
this);
841 buffer_type::reset();
842 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 844 my_built_predecessors.clear();
852 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 853 edge_container<predecessor_type> my_built_predecessors;
857 using namespace graph_policy_namespace;
859 template<
typename JP,
typename InputTuple,
typename OutputTuple>
863 template<
typename JP,
typename InputTuple,
typename OutputTuple>
866 template<
typename InputTuple,
typename OutputTuple>
875 ports_with_no_inputs = N;
880 ports_with_no_inputs = N;
884 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
887 ++ports_with_no_inputs;
892 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
894 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
896 if(!handle_task)
return rtask;
909 ports_with_no_inputs = N;
913 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 916 ports_with_no_inputs = N;
924 return !ports_with_no_inputs;
928 if(ports_with_no_inputs)
return false;
944 template<
typename InputTuple,
typename OutputTuple>
953 ports_with_no_items = N;
958 ports_with_no_items = N;
963 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
966 ports_with_no_items = N;
972 if(ports_with_no_items.fetch_and_decrement() == 1) {
974 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
976 if(!handle_task)
return rtask;
994 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1003 return !ports_with_no_items;
1007 if(ports_with_no_items)
return false;
1025 template<
typename InputTuple,
typename OutputTuple,
typename K,
typename KHash>
1029 typename tbb::internal::strip<K>::type&,
1030 count_element<typename tbb::internal::strip<K>::type>,
1031 internal::type_to_key_function_body<
1032 count_element<typename tbb::internal::strip<K>::type>,
1033 typename tbb::internal::strip<K>::type& >,
1062 enum op_type { res_count, inc_count, may_succeed, try_make };
1065 class key_matching_FE_operation :
public aggregated_operation<key_matching_FE_operation> {
1074 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1076 enqueue_task(true) {}
1081 typedef internal::aggregating_functor<class_type, key_matching_FE_operation>
handler_type;
1082 friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1092 this->current_key = t;
1093 this->delete_with_key(this->current_key);
1095 this->push_back(l_out);
1097 rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
1109 __TBB_ASSERT(
false,
"should have had something to push");
1115 key_matching_FE_operation *current;
1118 op_list = op_list->next;
1119 switch(current->type) {
1122 this->destroy_front();
1127 count_element_type *
p = 0;
1128 unref_key_type &t = current->my_val;
1129 bool do_enqueue = current->enqueue_task;
1130 if(!(this->find_ref_with_key(t,p))) {
1131 count_element_type ev;
1134 this->insert_with_key(ev);
1135 if(!(this->find_ref_with_key(t,p))) {
1136 __TBB_ASSERT(
false,
"should find key after inserting it");
1140 task *rtask = fill_output_buffer(t,
true, do_enqueue);
1141 __TBB_ASSERT(!rtask || !do_enqueue,
"task should not be returned");
1142 current->bypass_t = rtask;
1151 if(this->buffer_empty()) {
1155 *(current->my_output) = this->front();
1165 template<
typename FunctionTuple>
1166 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1169 my_aggregator.initialize_handler(handler_type(
this));
1170 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1171 this->set_key_func(cfb);
1175 output_buffer_type() {
1179 my_aggregator.initialize_handler(handler_type(
this));
1180 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1181 this->set_key_func(cfb);
1185 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1188 key_matching_FE_operation op_data(res_count);
1189 my_aggregator.execute(&op_data);
1196 key_matching_FE_operation op_data(t, handle_task, inc_count);
1197 my_aggregator.execute(&op_data);
1198 return op_data.bypass_t;
1213 key_to_count_buffer_type::reset();
1214 output_buffer_type::reset();
1217 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1221 key_to_count_buffer_type::reset();
1222 output_buffer_type::reset();
1229 key_matching_FE_operation op_data(may_succeed);
1230 my_aggregator.execute(&op_data);
1237 key_matching_FE_operation op_data(&out,try_make);
1238 my_aggregator.execute(&op_data);
1255 template<
typename JP,
typename InputTuple,
typename OutputTuple>
1257 public sender<OutputTuple> {
1259 using graph_node::my_graph;
1265 using input_ports_type::tuple_build_may_succeed;
1266 using input_ports_type::try_to_make_tuple;
1267 using input_ports_type::tuple_accepted;
1268 using input_ports_type::tuple_rejected;
1269 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1270 typedef typename sender<output_type>::built_successors_type built_successors_type;
1271 typedef typename sender<output_type>::successor_list_type successor_list_type;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1278 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1291 successor_list_type *slist;
1296 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1298 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1302 typedef internal::aggregating_functor<class_type, join_node_base_operation>
handler_type;
1311 op_list = op_list->next;
1312 switch(current->
type) {
1314 my_successors.register_successor(*(current->
my_succ));
1316 task *rtask =
new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1320 forwarder_busy =
true;
1326 my_successors.remove_successor(*(current->
my_succ));
1330 if(tuple_build_may_succeed()) {
1331 if(try_to_make_tuple(*(current->
my_arg))) {
1339 case do_fwrd_bypass: {
1340 bool build_succeeded;
1341 task *last_task = NULL;
1343 if(tuple_build_may_succeed()) {
1345 build_succeeded = try_to_make_tuple(out);
1346 if(build_succeeded) {
1347 task *new_task = my_successors.try_put_task(out);
1354 build_succeeded =
false;
1357 }
while(build_succeeded);
1361 forwarder_busy =
false;
1364 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1366 my_successors.internal_add_built_successor(*(current->
my_succ));
1370 my_successors.internal_delete_built_successor(*(current->
my_succ));
1374 current->cnt_val = my_successors.successor_count();
1378 my_successors.copy_successors(*(current->slist));
1387 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1388 my_successors.set_owner(
this);
1389 input_ports_type::set_my_node(
this);
1390 my_aggregator.initialize_handler(handler_type(
this));
1394 graph_node(other.graph_node::my_graph), input_ports_type(other),
1395 sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1396 my_successors.set_owner(
this);
1397 input_ports_type::set_my_node(
this);
1398 my_aggregator.initialize_handler(handler_type(
this));
1401 template<
typename FunctionTuple>
1402 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1403 my_successors.set_owner(
this);
1404 input_ports_type::set_my_node(
this);
1405 my_aggregator.initialize_handler(handler_type(
this));
1410 my_aggregator.execute(&op_data);
1416 my_aggregator.execute(&op_data);
1422 my_aggregator.execute(&op_data);
1426 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1427 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1429 void internal_add_built_successor( successor_type &r)
__TBB_override {
1431 my_aggregator.execute(&op_data);
1434 void internal_delete_built_successor( successor_type &r)
__TBB_override {
1436 my_aggregator.execute(&op_data);
1441 my_aggregator.execute(&op_data);
1442 return op_data.cnt_val;
1448 my_aggregator.execute(&op_data);
1452 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1454 input_ports_type::extract();
1455 my_successors.built_successors().sender_extract(*
this);
1462 input_ports_type::reset(f);
1472 my_aggregator.execute(&op_data);
1479 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1484 template<
int N,
typename OutputTuple,
typename K,
typename KHash>
1499 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1511 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1512 template <
typename K,
typename T>
1513 struct key_from_message_body {
1514 K operator()(
const T& t)
const {
1516 return key_from_message<K>(t);
1520 template <
typename K,
typename T>
1521 struct key_from_message_body<K&,T> {
1522 const K& operator()(
const T& t)
const {
1524 return key_from_message<const K&>(t);
1531 template<
typename OutputTuple,
typename K,
typename KHash>
1533 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1545 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1547 func_initializer_type(
1553 template<
typename Body0,
typename Body1>
1555 func_initializer_type(
1564 template<
typename OutputTuple,
typename K,
typename KHash>
1566 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1580 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1582 func_initializer_type(
1589 template<
typename Body0,
typename Body1,
typename Body2>
1591 func_initializer_type(
1601 template<
typename OutputTuple,
typename K,
typename KHash>
1603 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1619 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1621 func_initializer_type(
1629 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3>
1631 func_initializer_type(
1642 template<
typename OutputTuple,
typename K,
typename KHash>
1644 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1662 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1664 func_initializer_type(
1673 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4>
1674 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1675 func_initializer_type(
1687 #if __TBB_VARIADIC_MAX >= 6 1688 template<
typename OutputTuple,
typename K,
typename KHash>
1689 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1690 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1710 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1712 func_initializer_type(
1722 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
typename Body5>
1723 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1724 : base_type(g, func_initializer_type(
1738 #if __TBB_VARIADIC_MAX >= 7 1739 template<
typename OutputTuple,
typename K,
typename KHash>
1740 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1741 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1761 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p >
func_initializer_type;
1763 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1765 func_initializer_type(
1776 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1777 typename Body5,
typename Body6>
1778 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1779 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1790 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1794 #if __TBB_VARIADIC_MAX >= 8 1795 template<
typename OutputTuple,
typename K,
typename KHash>
1796 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1797 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1819 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p >
func_initializer_type;
1821 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1823 func_initializer_type(
1835 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1836 typename Body5,
typename Body6,
typename Body7>
1837 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1838 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1850 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1854 #if __TBB_VARIADIC_MAX >= 9 1855 template<
typename OutputTuple,
typename K,
typename KHash>
1856 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1857 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1881 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p >
func_initializer_type;
1883 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1885 func_initializer_type(
1898 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1899 typename Body5,
typename Body6,
typename Body7,
typename Body8>
1900 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1901 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1914 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1918 #if __TBB_VARIADIC_MAX >= 10 1919 template<
typename OutputTuple,
typename K,
typename KHash>
1920 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1921 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1947 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p >
func_initializer_type;
1949 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1951 func_initializer_type(
1965 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1966 typename Body5,
typename Body6,
typename Body7,
typename Body8,
typename Body9>
1967 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1968 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1982 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1987 template<
size_t N,
typename JNT>
1989 return tbb::flow::get<N>(jn.input_ports());
1993 #endif // __TBB__flow_graph_join_impl_H static void set_join_node_pointer(TupleType &my_input, PortType *port)
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
aggregator< handler_type, queueing_port_operation > my_aggregator
void set_my_node(base_node_type *new_my_node)
internal::type_to_key_function_body< T1, K > * f1_p
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
queueing_port_operation(const T *p, op_type t)
join_node_FE(graph &g, FunctionTuple &TtoK_funcs)
key_matching_FE_operation(op_type t)
join_node_base(graph &g, FunctionTuple f)
join_node_base< JP, InputTuple, OutputTuple > class_type
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
graph & graph_reference() __TBB_override
current_key_type current_key
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
task * try_put_task(const T &) __TBB_override
key_matching_port< traits > class_type
tbb::flow::tuple_element< 0, OutputTuple >::type T0
receiver< input_type >::predecessor_type predecessor_type
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
void reset(reset_flags f)
static bool reserve(InputTuple &my_input, OutputTuple &out)
tbb::flow::tuple_element< 1, OutputTuple >::type T1
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
unfolded_join_node(const unfolded_join_node &other)
aggregator< handler_type, key_matching_port_operation > my_aggregator
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
unfolded_join_node(graph &g, Body0 body0, Body1 body1)
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
tbb::flow::tuple_element< 1, OutputTuple >::type T1
tbb::flow::tuple< f0_p, f1_p > func_initializer_type
aggregator< handler_type, reserving_port_operation > my_aggregator
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
static void consume_reservations(TupleType &my_input)
static void reset_my_port(InputTuple &my_input)
static void set_join_node_pointer(TupleType &my_input, PortType *port)
internal::type_to_key_function_body< T4, K > * f4_p
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
predecessor_type * my_pred
matching_forwarding_base< key_type > * my_join
tbb::flow::tuple_element< 2, OutputTuple >::type T2
key_matching_port_operation(const input_type &e, op_type t)
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
static void reset_inputs(InputTuple &my_input, reset_flags f)
tbb::flow::tuple_element< 2, OutputTuple >::type T2
reserving_port()
Constructor.
key_matching_FE_operation(output_type *p, op_type t)
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type
graph & graph_reference() __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync s
reserving_port(const reserving_port &)
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
void reset(reset_flags f)
input_type & input_ports()
tbb::flow::tuple_element< 2, OutputTuple >::type T2
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
void set_join_node_pointer(forwarding_base *join)
static void reset_inputs(InputTuple &my_input, reset_flags f)
void set_my_node(base_node_type *new_my_node)
receiver< input_type >::predecessor_type predecessor_type
static void release_reservations(TupleType &my_input)
forwarding_base * my_join
task * try_put_task(const T &v) __TBB_override
forwarding_base * my_join
internal::type_to_key_function_body< T0, K > * f0_p
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
static bool get_items(InputTuple &my_input, OutputTuple &out)
internal::type_to_key_function_body< T0, K > * f0_p
reserving_port_operation(const T &e, op_type t)
reserving_port_operation(op_type t)
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
join_node_base(const join_node_base &other)
bool try_to_make_tuple(output_type &out)
static void reset_my_port(InputTuple &my_input)
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
key_to_count_functor< unref_key_type > key_to_count_func
K key_from_message(const T &t)
queueing_port_operation(const T &e, op_type t)
internal::type_to_key_function_body< T1, K > * f1_p
void reset_receiver(reset_flags f) __TBB_override
internal::type_to_key_function_body< T0, K > * f0_p
queueing_port(const queueing_port &)
copy constructor
unfolded_join_node(graph &g)
type_to_key_func_type * get_my_key_func()
key_matching_FE_operation(const unref_key_type &e, bool q_task, op_type t)
#define __TBB_STATIC_ASSERT(condition, msg)
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
unfolded_join_node(const unfolded_join_node &other)
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
broadcast_cache< output_type, null_rw_mutex > my_successors
A cache of successors that are broadcast to.
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
bool try_to_make_tuple(output_type &out)
static tbb::task *const SUCCESSFULLY_ENQUEUED
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
virtual task * decrement_port_count(bool handle_task)=0
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
void handle_operations(key_matching_FE_operation *op_list)
virtual void increment_port_count()=0
reservable_predecessor_cache< T, null_mutex > my_predecessors
unfolded_join_node(const unfolded_join_node &other)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
bool is_graph_active(graph &g)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
static bool reserve(InputTuple &my_input, OutputTuple &out)
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
tbb::flow::tuple_element< 3, OutputTuple >::type T3
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
internal::type_to_key_function_body< T3, K > * f3_p
task * decrement_port_count(bool handle_task) __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
void set_join_node_pointer(forwarding_base *join)
tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
internal::type_to_key_function_body< T2, K > * f2_p
tbb::internal::strip< key_type >::type unref_key_type
aggregator< handler_type, key_matching_FE_operation > my_aggregator
TraitsType::TtoK type_to_key_func_type
void release()
Release the port.
bool tuple_build_may_succeed()
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
aggregator< handler_type, join_node_base_operation > my_aggregator
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
count_element< K > table_item_type
unfolded_join_node(const unfolded_join_node &other)
static void release_reservations(TupleType &my_input)
task * decrement_port_count(bool handle_task) __TBB_override
reserving_port< T > class_type
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2)
join_node_base_operation(const output_type &e, op_type t)
internal::type_to_key_function_body< T0, K > * f0_p
internal::type_to_key_function_body< T2, K > * f2_p
bool get_item(input_type &v)
tbb::internal::strip< key_type >::type noref_key_type
void handle_operations(join_node_base_operation *op_list)
atomic< size_t > ports_with_no_items
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type
unfolded_join_node(const unfolded_join_node &other)
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
static void release_my_reservation(TupleType &my_input)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
void reset(reset_flags f)
void handle_operations(queueing_port_operation *op_list)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
A task that calls a node's forward_task function.
queueing_port()
Constructor.
void __TBB_store_with_release(volatile T &location, V value)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3)
sender< output_type >::successor_type successor_type
void increment_port_count() __TBB_override
task * decrement_port_count(bool) __TBB_override
static void consume_reservations(TupleType &my_input)
tbb::flow::tuple_element< 1, OutputTuple >::type T1
bool reserve(T &v)
Reserve an item from the port.
reserving_port_operation(const predecessor_type &s, op_type t)
queueing_port< T > class_type
join_node_base_operation(op_type t)
key_matching_port_operation(const input_type *p, op_type t)
matching_forwarding_base(graph &g)
field of type K being used for matching.
void handle_operations(reserving_port_operation *op_list)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
join_node_FE(const join_node_FE &other)
static void reset_ports(InputTuple &my_input)
graph & graph_reference() __TBB_override
tbb::flow::tuple_element< 3, OutputTuple >::type T3
key_matching_port_operation(op_type t)
void consume()
Complete use of the port.
virtual ~forwarding_base()
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
join_node_base< JP, input_ports_type, output_type > base_type
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
internal::type_to_key_function_body< T1, K > * f1_p
join_node_FE : implements input port policy
void set_my_key_func(type_to_key_func_type *f)
join_node_FE(const join_node_FE &other)
const K & operator()(const table_item_type &v)
internal::type_to_key_function_body< T3, K > * f3_p
static bool get_items(InputTuple &my_input, OutputTuple &out)
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
atomic< size_t > ports_with_no_inputs
join_node_FE(const join_node_FE &other)
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
A cache of successors that are put in a round-robin fashion.
void handle_operations(key_matching_port_operation *op_list)
internal::type_to_key_function_body< T1, K > * f1_p
void reset_node(reset_flags f) __TBB_override
void set_my_node(base_node_type *new_my_node)
forwarding_base(graph &g)
void increment_port_count() __TBB_override
bool tuple_build_may_succeed()
Base class for types that should not be assigned.
void const char const char int ITT_FORMAT __itt_group_sync p
receiver< input_type >::predecessor_type predecessor_type
static void release_my_reservation(TupleType &my_input)
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
queueing_port_operation(op_type t)
join_node_base_operation(const successor_type &s, op_type t)
count_element< unref_key_type > count_element_type
TraitsType::KHash hash_compare_type
void reset_receiver(reset_flags f) __TBB_override
static void reset_ports(InputTuple &my_input)
item_buffer< output_type > output_buffer_type
bool tuple_build_may_succeed()
tbb::flow::tuple_element< 4, OutputTuple >::type T4
bool try_to_make_tuple(output_type &out)
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void increment_port_count() __TBB_override
internal::type_to_key_function_body< T2, K > * f2_p
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
void reset_receiver(reset_flags f) __TBB_override
tbb::flow::tuple_element< 1, OutputTuple >::type T1
input_type & input_ports()
tbb::internal::strip< KeyType >::type current_key_type
input_type & input_ports()
matching_forwarding_base< key_type > forwarding_base_type
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
key_matching_port(const key_matching_port &)
key_matching< K, KHash > key_traits_type