Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 */
16 
17 #ifndef __TBB_concurrent_queue_H
18 #define __TBB_concurrent_queue_H
19 
22 
23 namespace tbb {
24 
25 namespace strict_ppl {
26 
28 
31 template<typename T, typename A = cache_aligned_allocator<T> >
32 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
33  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
34 
37  page_allocator_type my_allocator;
38 
40  virtual void *allocate_block( size_t n ) __TBB_override {
41  void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
42  if( !b )
44  return b;
45  }
46 
48  virtual void deallocate_block( void *b, size_t n ) __TBB_override {
49  my_allocator.deallocate( reinterpret_cast<char*>(b), n );
50  }
51 
52  static void copy_construct_item(T* location, const void* src){
53  new (location) T(*static_cast<const T*>(src));
54  }
55 
56 #if __TBB_CPP11_RVALUE_REF_PRESENT
57  static void move_construct_item(T* location, const void* src) {
58  new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
59  }
60 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
61 public:
63  typedef T value_type;
64 
66  typedef T& reference;
67 
69  typedef const T& const_reference;
70 
72  typedef size_t size_type;
73 
75  typedef ptrdiff_t difference_type;
76 
78  typedef A allocator_type;
79 
81  explicit concurrent_queue(const allocator_type& a = allocator_type()) :
82  my_allocator( a )
83  {
84  }
85 
87  template<typename InputIterator>
88  concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
89  my_allocator( a )
90  {
91  for( ; begin != end; ++begin )
92  this->push(*begin);
93  }
94 
96  concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
97  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
98  {
99  this->assign( src, copy_construct_item );
100  }
101 
102 #if __TBB_CPP11_RVALUE_REF_PRESENT
105  internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
106  {
107  this->internal_swap( src );
108  }
109 
110  concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
111  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
112  {
113  // checking that memory allocated by one instance of allocator can be deallocated
114  // with another
115  if( my_allocator == src.my_allocator) {
116  this->internal_swap( src );
117  } else {
118  // allocators are different => performing per-element move
119  this->assign( src, move_construct_item );
120  src.clear();
121  }
122  }
123 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
124 
127 
129  void push( const T& source ) {
130  this->internal_push( &source, copy_construct_item );
131  }
132 
133 #if __TBB_CPP11_RVALUE_REF_PRESENT
134  void push( T&& source ) {
135  this->internal_push( &source, move_construct_item );
136  }
137 
138 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
139  template<typename... Arguments>
140  void emplace( Arguments&&... args ) {
141  push( T(std::forward<Arguments>( args )...) );
142  }
143 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
144 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
145 
147 
149  bool try_pop( T& result ) {
150  return this->internal_try_pop( &result );
151  }
152 
154  size_type unsafe_size() const {return this->internal_size();}
155 
157  bool empty() const {return this->internal_empty();}
158 
160  void clear() ;
161 
163  allocator_type get_allocator() const { return this->my_allocator; }
164 
165  typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
166  typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
167 
168  //------------------------------------------------------------------------
169  // The iterators are intended only for debugging. They are slow and not thread safe.
170  //------------------------------------------------------------------------
171  iterator unsafe_begin() {return iterator(*this);}
172  iterator unsafe_end() {return iterator();}
173  const_iterator unsafe_begin() const {return const_iterator(*this);}
174  const_iterator unsafe_end() const {return const_iterator();}
175 } ;
176 
177 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
178 // Deduction guide for the constructor from two iterators
179 template<typename InputIterator,
180  typename T = typename std::iterator_traits<InputIterator>::value_type,
181  typename A = cache_aligned_allocator<T>
182 > concurrent_queue(InputIterator, InputIterator, const A& = A())
184 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
185 
186 template<typename T, class A>
188  clear();
189  this->internal_finish_clear();
190 }
191 
192 template<typename T, class A>
194  T value;
195  while( !empty() ) try_pop(value);
196 }
197 
198 } // namespace strict_ppl
199 
201 
206 template<typename T, class A = cache_aligned_allocator<T> >
207 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
208  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
210 
212  page_allocator_type my_allocator;
213 
216 
218  class destroyer: internal::no_copy {
220  public:
221  destroyer( T& value ) : my_value(value) {}
222  ~destroyer() {my_value.~T();}
223  };
224 
225  T& get_ref( page& p, size_t index ) {
226  __TBB_ASSERT( index<items_per_page, NULL );
227  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
228  }
229 
230  virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
231  new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
232  }
233 
234 #if __TBB_CPP11_RVALUE_REF_PRESENT
235  virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
236  new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
237  }
238 #else
239  virtual void move_item( page&, size_t, const void* ) __TBB_override {
240  __TBB_ASSERT( false, "Unreachable code" );
241  }
242 #endif
243 
244  virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
245  new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
246  }
247 
248 #if __TBB_CPP11_RVALUE_REF_PRESENT
249  virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
250  new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
251  }
252 #else
253  virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
254  __TBB_ASSERT( false, "Unreachable code" );
255  }
256 #endif
257 
258  virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
259  T& from = get_ref(src,index);
260  destroyer d(from);
261  *static_cast<T*>(dst) = tbb::internal::move( from );
262  }
263 
265  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
266  page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
267  if( !p )
269  return p;
270  }
271 
273  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
274  my_allocator.deallocate( reinterpret_cast<char*>(p), n );
275  }
276 
277 public:
279  typedef T value_type;
280 
282  typedef A allocator_type;
283 
285  typedef T& reference;
286 
288  typedef const T& const_reference;
289 
291 
293  typedef std::ptrdiff_t size_type;
294 
296  typedef std::ptrdiff_t difference_type;
297 
299  explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
300  concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
301  {
302  }
303 
305  concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
306  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
307  {
308  assign( src );
309  }
310 
311 #if __TBB_CPP11_RVALUE_REF_PRESENT
315  {
316  internal_swap( src );
317  }
318 
319  concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
320  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
321  {
322  // checking that memory allocated by one instance of allocator can be deallocated
323  // with another
324  if( my_allocator == src.my_allocator) {
325  this->internal_swap( src );
326  } else {
327  // allocators are different => performing per-element move
328  this->move_content( src );
329  src.clear();
330  }
331  }
332 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
333 
335  template<typename InputIterator>
336  concurrent_bounded_queue( InputIterator begin, InputIterator end,
337  const allocator_type& a = allocator_type())
338  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
339  {
340  for( ; begin != end; ++begin )
341  internal_push_if_not_full(&*begin);
342  }
343 
346 
348  void push( const T& source ) {
349  internal_push( &source );
350  }
351 
352 #if __TBB_CPP11_RVALUE_REF_PRESENT
353  void push( T&& source ) {
355  internal_push_move( &source );
356  }
357 
358 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
359  template<typename... Arguments>
360  void emplace( Arguments&&... args ) {
361  push( T(std::forward<Arguments>( args )...) );
362  }
363 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
364 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
365 
367 
368  void pop( T& destination ) {
369  internal_pop( &destination );
370  }
371 
372 #if TBB_USE_EXCEPTIONS
373  void abort() {
375  internal_abort();
376  }
377 #endif
378 
380 
382  bool try_push( const T& source ) {
383  return internal_push_if_not_full( &source );
384  }
385 
386 #if __TBB_CPP11_RVALUE_REF_PRESENT
387 
390  bool try_push( T&& source ) {
391  return internal_push_move_if_not_full( &source );
392  }
393 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
394  template<typename... Arguments>
395  bool try_emplace( Arguments&&... args ) {
396  return try_push( T(std::forward<Arguments>( args )...) );
397  }
398 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
399 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
400 
402 
404  bool try_pop( T& destination ) {
405  return internal_pop_if_present( &destination );
406  }
407 
409 
412  size_type size() const {return internal_size();}
413 
415  bool empty() const {return internal_empty();}
416 
418  size_type capacity() const {
419  return my_capacity;
420  }
421 
423 
425  void set_capacity( size_type new_capacity ) {
426  internal_set_capacity( new_capacity, sizeof(T) );
427  }
428 
430  allocator_type get_allocator() const { return this->my_allocator; }
431 
433  void clear() ;
434 
435  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
436  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
437 
438  //------------------------------------------------------------------------
439  // The iterators are intended only for debugging. They are slow and not thread safe.
440  //------------------------------------------------------------------------
441  iterator unsafe_begin() {return iterator(*this);}
442  iterator unsafe_end() {return iterator();}
443  const_iterator unsafe_begin() const {return const_iterator(*this);}
444  const_iterator unsafe_end() const {return const_iterator();}
445 
446 };
447 
448 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
449 // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
450 template<typename InputIterator,
451  typename T = typename std::iterator_traits<InputIterator>::value_type,
452  typename A = cache_aligned_allocator<T>
453 > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
455 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
456 
457 template<typename T, class A>
459  clear();
461 }
462 
463 template<typename T, class A>
465  T value;
466  while( try_pop(value) ) /*noop*/;
467 }
468 
470 
471 } // namespace tbb
472 
473 #endif /* __TBB_concurrent_queue_H */
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
concurrent_queue_base_v3::copy_specifics copy_specifics
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
internal::concurrent_queue_iterator< concurrent_bounded_queue, const T > const_iterator
std::ptrdiff_t size_type
Integral type for representing size of the queue.
#define __TBB_override
Definition: tbb_stddef.h:240
allocator_type get_allocator() const
Return allocator object.
auto last(Container &c) -> decltype(begin(c))
bool empty() const
Equivalent to size()<=0.
virtual page * allocate_page() __TBB_override
custom allocator
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
const_iterator unsafe_end() const
friend class internal::concurrent_queue_iterator
bool empty() const
Equivalent to size()==0.
bool try_pop(T &destination)
Attempt to dequeue an item from head of queue.
virtual void move_item(page &dst, size_t index, const void *src) __TBB_override
const_iterator unsafe_begin() const
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
allocator_traits< Alloc >::template rebind_alloc< T >::other type
const_iterator unsafe_begin() const
virtual void copy_item(page &dst, size_t index, const void *src) __TBB_override
~concurrent_bounded_queue()
Destroy queue.
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
The graph class.
bool try_pop(T &result)
Attempt to dequeue an item from head of queue.
bool try_push(T &&source)
Move an item at tail of queue if queue is not already full.
void emplace(Arguments &&... args)
void push(const T &source)
Enqueue an item at tail of queue.
const T & const_reference
Const reference type.
virtual void deallocate_page(page *p) __TBB_override
custom de-allocator
bool internal_empty() const
check if the queue is empty; thread safe
Class used to ensure exception-safety of method "pop".
virtual void deallocate_block(void *b, size_t n) __TBB_override
Deallocates block created by allocate_block.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
concurrent_queue(concurrent_queue &&src, const allocator_type &a)
const T & const_reference
Const reference type.
concurrent_queue(const concurrent_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:165
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void clear()
clear the queue. not thread-safe.
T value_type
Element type in the queue.
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:305
size_t size_type
Integral type for representing size of the queue.
page_allocator_type my_allocator
Allocator type.
void push(const T &source)
Enqueue an item at tail of queue.
virtual void * allocate_block(size_t n) __TBB_override
Allocates a block of size n (bytes)
size_type size() const
Return number of pushes minus number of pops.
concurrent_bounded_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
Allocator type.
concurrent_bounded_queue(concurrent_bounded_queue &&src, const allocator_type &a)
concurrent_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
internal::concurrent_queue_iterator< concurrent_bounded_queue, T > iterator
bool try_push(const T &source)
Enqueue an item at tail of queue if queue is not already full.
void clear()
Clear the queue. not thread-safe.
A high-performance thread-safe blocking concurrent bounded queue.
bool try_emplace(Arguments &&... args)
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
concurrent_queue_base_v3::padded_page< T > padded_page
concurrent_bounded_queue(const allocator_type &a=allocator_type())
Construct empty queue.
static void move_construct_item(T *location, const void *src)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
virtual void assign_and_destroy_item(void *dst, page &src, size_t index) __TBB_override
internal::concurrent_queue_iterator< concurrent_queue, T > iterator
static void copy_construct_item(T *location, const void *src)
T value_type
Element type in the queue.
void set_capacity(size_type new_capacity)
Set the capacity.
size_type capacity() const
Maximum number of allowed elements.
void pop(T &destination)
Dequeue item from head of queue.
concurrent_queue(const allocator_type &a=allocator_type())
Construct empty queue.
std::ptrdiff_t difference_type
Difference type for iterator.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
void const char const char int ITT_FORMAT __itt_group_sync p
const_iterator unsafe_end() const
size_type unsafe_size() const
Return the number of items in the queue; thread unsafe.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
A high-performance thread-safe non-blocking concurrent queue.
concurrent_bounded_queue(const concurrent_bounded_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
ptrdiff_t difference_type
Difference type for iterator.
allocator_type get_allocator() const
return allocator object
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
T & get_ref(page &p, size_t index)
internal::concurrent_queue_iterator< concurrent_queue, const T > const_iterator
void emplace(Arguments &&... args)

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.