c++-gtk-utils
async_queue.h
Go to the documentation of this file.
1 /* Copyright (C) 2006 to 2013 Chris Vine
2 
3 The library comprised in this file or of which this file is part is
4 distributed by Chris Vine under the GNU Lesser General Public
5 License as follows:
6 
7  This library is free software; you can redistribute it and/or
8  modify it under the terms of the GNU Lesser General Public License
9  as published by the Free Software Foundation; either version 2.1 of
10  the License, or (at your option) any later version.
11 
12  This library is distributed in the hope that it will be useful, but
13  WITHOUT ANY WARRANTY; without even the implied warranty of
14  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15  Lesser General Public License, version 2.1, for more details.
16 
17  You should have received a copy of the GNU Lesser General Public
18  License, version 2.1, along with this library (see the file LGPL.TXT
19  which came with this source code package in the c++-gtk-utils
20  sub-directory); if not, write to the Free Software Foundation, Inc.,
21  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 
23 However, it is not intended that the object code of a program whose
24 source code instantiates a template from this file or uses macros or
25 inline functions (of any length) should by reason only of that
26 instantiation or use be subject to the restrictions of use in the GNU
27 Lesser General Public License. With that in mind, the words "and
28 macros, inline functions and instantiations of templates (of any
29 length)" shall be treated as substituted for the words "and small
30 macros and small inline functions (ten lines or less in length)" in
31 the fourth paragraph of section 5 of that licence. This does not
32 affect any other reason why object code may be subject to the
33 restrictions in that licence (nor for the avoidance of doubt does it
34 affect the application of section 2 of that licence to modifications
35 of the source code in this file).
36 
37 */
38 
39 /**
40  * @file async_queue.h
41  * @brief This file provides thread-safe asynchronous queue classes.
42  *
43  * AsyncQueue is a class which provides some of the functionality of a
44  * std::queue object (but note that the AsyncQueue::pop(value_type&
45  * obj) and AsyncQueue::move_pop(value_type& obj) methods provide the
46  * popped element by reference - see the comments on those methods for
47  * the reason), except that it has mutex locking of the data container
48  * so as to permit pushing and popping from different threads. It is
49  * therefore useful for passing data between threads, perhaps in
50  * response to a signal being emitted from a Notifier object. Data
51  * can be pushed or pulled by move constructor/move assignment
52  * operator or by means of a std::unique_ptr object, SharedLockPtr
53  * object or an IntrusivePtr object referencing data derived from
54  * IntrusiveLockCounter.
55  *
56  * AsyncQueueDispatch is a class which has blocking pop_dispatch() and
57  * move_pop_dispatch() methods, which allows it to be waited on by a dedicated
58  * event/message dispatching thread for incoming work (represented by
59  * the data pushed onto the queue). In the same way, it can be used
60  * to implement thread pools, by having threads in the pool waiting on
61  * the queue.
62  *
63  * By default the queues use a std::list object as their container
64  * because when adding an item to the queue all allocation can take
65  * place outside the queue object's mutex. However, for types which
66  * have low overhead copy or move constructors, this can be changed
67  * to, say, a std::deque object by specifying it as the second
68  * template parameter.
69  */
70 
71 #ifndef CGU_ASYNC_QUEUE_H
72 #define CGU_ASYNC_QUEUE_H
73 
74 #include <queue>
75 #include <list>
76 #include <exception>
77 #include <utility> // for std::move and std::forward
78 #include <algorithm> // for std::swap
79 #include <time.h>
80 
81 #include <c++-gtk-utils/mutex.h>
82 #include <c++-gtk-utils/thread.h>
84 
85 #ifdef CGU_USE_SCHED_YIELD
86 #include <sched.h>
87 #else
88 #include <unistd.h>
89 #endif
90 
91 namespace Cgu {
92 
93 /**
94  * @class AsyncQueuePopError async_queue.h c++-gtk-utils/async_queue.h
95  * @brief An exception thrown if calling pop() on an AsyncQueue or
96  * AsyncQueueDispatch object fails because the queue is empty.
97  * @sa AsyncQueue AsyncQueueDispatch
98  */
99 
struct AsyncQueuePopError: public std::exception {
  /**
   * @return A pointer to a static, null-terminated message describing
   * the error.  The string has static storage duration, so the pointer
   * remains valid after the exception object is destroyed.  This
   * method does not throw.
   */
  // 'noexcept' replaces the deprecated dynamic exception
  // specification 'throw()', and 'override' has the compiler check
  // that this really overrides std::exception::what()
  const char* what() const noexcept override {
    return "AsyncQueuePopError: popping from empty AsyncQueue object\n";
  }
};
103 
104 
105 /**
106  * @class AsyncQueue async_queue.h c++-gtk-utils/async_queue.h
107  * @brief A thread-safe asynchronous queue.
108  * @sa AsyncQueueDispatch AsyncResult
109  *
110  * AsyncQueue is a class which provides some of the functionality of a
111  * std::queue object (but note that the AsyncQueue::pop(value_type&
112  * obj) and AsyncQueue::move_pop(value_type& obj) methods provide the
113  * popped element by reference - see the comments on those methods for
114  * the reason), except that it has mutex locking of the data container
115  * so as to permit pushing and popping from different threads. It is
116  * therefore useful for passing data between threads, perhaps in
117  * response to a signal being emitted from a Notifier object. Data
118  * can be pushed or pulled by move constructor/move assignment
119  * operator or by means of a std::unique_ptr object, SharedLockPtr
120  * object or an IntrusivePtr object referencing data derived from
121  * IntrusiveLockCounter.
122  *
123  * By default the queue uses a std::list object as its container
124  * because when adding an item to the queue all allocation can take
125  * place outside the queue object's mutex. However, for types which
126  * have low overhead copy or move constructors, this can be changed
127  * to, say, a std::deque object by specifying it as the second
128  * template parameter.
129  *
130  * If the library is installed using the
131  * \--with-glib-memory-slices-compat or
132  * \--with-glib-memory-slices-no-compat configuration options, any
133  * AsyncQueue objects constructed on free store will be constructed in
134  * glib memory slices. This does not affect the queue container
135  * itself: to change the allocator of the C++ container, a custom
136  * allocator type can be provided when the AsyncQueue object is
137  * instantiated offering the standard allocator interface. If glib
138  * memory slices are not used or no AsyncQueue objects are constructed
139  * on free store, it is not necessary to call g_thread_init() before
140  * manipulating or using an AsyncQueue object in multiple threads, but
141  * prior to glib version 2.32 glib itself (and thus glib memory
142  * slices) are not thread safe unless that function has been called.
143  */
144 
145 template <class T, class Container = std::list<T> > class AsyncQueue {
146 public:
147  typedef typename Container::value_type value_type;
148  typedef typename Container::size_type size_type;
149  typedef Container container_type;
150 private:
151  mutable Thread::Mutex mutex;
152  std::queue<T, Container> q;
153 
154 // TODO: at the next ABI break make this method explicitly static
155 // this method won't throw: it is for the user to ensure the arguments
156 // do not refer to the same mutex object
157  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
158  m1.lock();
159  for(;;) {
160  if (!m2.trylock()) {
161  return;
162  }
163  m1.unlock();
164  // spin nicely
165 #ifdef CGU_USE_SCHED_YIELD
166  sched_yield();
167 #else
168  usleep(10);
169 #endif
170  m1.lock();
171  }
172  }
173 public:
174 /**
175  * Pushes an item onto the queue. This method has strong exception
176  * safety if the container is a std::list or std::deque container (the
177  * default is std::list), except that if std::deque is used as the
178  * container and the copy constructor, move constructor, copy
179  * assignment operator or move assignment operator of the queue item
180  * throws, it only gives the basic exception guarantee (and the basic
181  * guarantee is not given by std::deque if the queue item's move
182  * constructor throws and it uses a non-default allocator which does
183  * not provide for it to be CopyInsertable). It is thread safe.
184  * @param obj The item to be pushed onto the queue.
185  * @exception std::bad_alloc The method might throw std::bad_alloc if
186  * memory is exhausted and the system throws in that case. It might
187  * also throw if the copy constructor, move constructor, assignment
188  * operator or move assignment operator of the queue item might throw.
189  */
190  void push(const value_type& obj) {
191  Thread::Mutex::Lock lock{mutex};
192  q.push(obj);
193  }
194 
195 /**
196  * Pushes an item onto the queue. This method has strong exception
197  * safety if the container is a std::list or std::deque container (the
198  * default is std::list), except that if std::deque is used as the
199  * container and the copy constructor, move constructor, copy
200  * assignment operator or move assignment operator of the queue item
201  * throws, it only gives the basic exception guarantee (and the basic
202  * guarantee is not given by std::deque if the queue item's move
203  * constructor throws and it uses a non-default allocator which does
204  * not provide for it to be CopyInsertable). It is thread safe.
205  * @param obj The item to be pushed onto the queue.
206  * @exception std::bad_alloc The method might throw std::bad_alloc if
207  * memory is exhausted and the system throws in that case. It might
208  * also throw if the copy constructor, move constructor, assignment
209  * operator or move assignment operator of the queue item might throw.
210  *
211  * Since 2.0.0-rc5
212  */
213  void push(value_type&& obj) {
214  Thread::Mutex::Lock lock{mutex};
215  q.push(std::move(obj));
216  }
217 
218 /**
219  * Pushes an item onto the queue by constructing it in place: that is,
220  * by passing to this method the item's constructor's arguments,
221  * rather than the item itself. This method has strong exception
222  * safety if the container is a std::list or std::deque container (the
223  * default is std::list). (Technically, for a std::deque container,
224  * emplace() only offers the same exception guarantees as does push(),
225  * namely only the basic guarantee where a copy or move of the queue
226  * item throws during the call, but the purpose of emplace is to
227  * construct in place and any reasonable implementation will not copy
228  * or move the queue item.) It is thread safe.
229  * @param args The constructor arguments for the item to be pushed
230  * onto the queue.
231  * @exception std::bad_alloc The method might throw std::bad_alloc if
232  * memory is exhausted and the system throws in that case. It might
233  * also throw if the item's constructor (including any of its
234  * constructor arguments) might throw when constructing the item.
235  * @note The constructor of the item pushed onto the queue must not
236  * access any of the methods of the same queue object, or a deadlock
237  * might occur.
238  *
239  * Since 2.0.0-rc5
240  */
241  template<class... Args>
242  void emplace(Args&&... args) {
243  Thread::Mutex::Lock lock{mutex};
244  q.emplace(std::forward<Args>(args)...);
245  }
246 
247 /**
248  * Pops an item from the queue. This method has strong exception
249  * safety if the container is a std::deque or std::list container (the
250  * default is std::list), provided the destructor of a contained item
251  * does not throw. It is thread safe.
252  * @param obj A value type reference to which the item at the front of
253  * the queue will be assigned.
254  * @exception AsyncQueuePopError If the queue is empty when a pop is
255  * attempted, this method will throw AsyncQueuePopError. It might
256  * also throw if the copy assignment operator of the queue item might
257  * throw. In order to complete pop operations atomically under a
258  * single lock and to retain strong exception safety, the object into
259  * which the popped data is to be placed is passed as an argument by
260  * reference (this avoids a copy from a temporary object after the
261  * data has been extracted from the queue, which would occur if the
262  * item extracted were returned by value). It might also throw if the
263  * destructor of the queue item might throw (but that should never
264  * happen), or if the empty() method of the container type throws
265  * (which would not happen on any sane implementation).
266  */
267  void pop(value_type& obj) {
268  Thread::Mutex::Lock lock{mutex};
269  if (q.empty()) throw AsyncQueuePopError();
270  obj = q.front();
271  q.pop();
272  }
273 
274 /**
275  * Pops an item from the queue using the contained type's move
276  * assignment operator, if it has one. This method is identical to
277  * the pop() method if that type has no move assignment operator.
278  * This method has strong exception safety if the container is a
279  * std::deque or std::list container (the default is std::list),
280  * provided the destructor of a contained item does not throw and the
281  * move assignment operator of a contained item has strong exception
282  * safety. It is thread safe. Use this method in preference to the
283  * pop() method if it is known that the contained items' move
284  * assignment operator does not throw or is strongly exception safe,
285  * or if the use case does not require strong exception safety. This
286  * method must be used in place of the pop() method if the contained
287  * type has a move assignment operator but no copy assignment operator
288  * (such as a std::unique_ptr object).
289  * @param obj A value type reference to which the item at the front of
290  * the queue will be move assigned.
291  * @exception AsyncQueuePopError If the queue is empty when a pop is
292  * attempted, this method will throw AsyncQueuePopError. It might
293  * also throw if the move assignment operator of the queue item might
294  * throw, or if it has no move assignment operator and its copy
295  * assignment operator throws. In order to complete pop operations
296  * atomically under a single lock and to retain strong exception
297  * safety, the object into which the popped data is to be placed is
298  * passed as an argument by reference (this avoids a move from a
299  * temporary object after the data has been extracted from the queue,
300  * which would occur if the item extracted were returned by value).
301  * It might also throw if the destructor of the queue item might throw
302  * (but that should never happen), or if the empty() method of the
303  * container type throws (which would not happen on any sane
304  * implementation).
305  *
306  * Since 2.0.11
307  */
308  void move_pop(value_type& obj) {
309  Thread::Mutex::Lock lock{mutex};
310  if (q.empty()) throw AsyncQueuePopError();
311  obj = std::move(q.front());
312  q.pop();
313  }
314 
315 /**
316  * Discards the item at the front of the queue. This method has
317  * strong exception safety if the container is a std::deque or
318  * std::list container (the default is std::list), provided the
319  * destructor of a contained item does not throw. It is thread safe.
320  * @exception AsyncQueuePopError If the queue is empty when a pop is
321  * attempted, this method will throw AsyncQueuePopError. It might
322  * also throw if the destructor of the queue item might throw (but
323  * that should never happen), or if the empty() method of the
324  * container type throws (which would not happen on any sane
325  * implementation).
326  */
327  void pop() {
328  Thread::Mutex::Lock lock{mutex};
329  if (q.empty()) throw AsyncQueuePopError();
330  q.pop();
331  }
332 
333 /**
334  * @return Whether the queue is empty. It will not throw assuming
335  * that the empty() method of the container type does not throw, as it
336  * will not on any sane implementation.
337  * @note This method is thread safe, but the return value may not be
338  * valid if another thread has pushed to or popped from the queue
339  * before the value returned by the method is acted on. It is
340  * provided as a utility, but may not be meaningful, depending on the
341  * intended usage.
342  */
343  bool empty() const {
344  Thread::Mutex::Lock lock{mutex};
345  return q.empty();
346  }
347 
348 /**
349  * @return The number of items currently in the queue. It will not
350  * throw assuming that the size() method of the container type does
351  * not throw, as it will not on any sane implementation.
352  * @note This method is thread safe, but the return value may not be
353  * valid if another thread has pushed to or popped from the queue
354  * before the value returned by the method is acted on. It is
355  * provided as a utility, but may not be meaningful, depending on the
356  * intended usage.
357  *
358  * Since 2.0.8
359  */
360  size_type size() const {
361  Thread::Mutex::Lock lock{mutex};
362  return q.size();
363  }
364 
365 /**
366  * Swaps the contents of 'this' and 'other'. It will not throw
367  * assuming that the swap method of the container type does not throw
368  * (which the C++11 standard requires not to happen with the standard
369  * sequence containers). It is thread safe and the swap is
370  * thread-wise atomic. A non-member function
371  * Cgu::swap(Cgu::AsyncQueue&, Cgu::AsyncQueue&) is also provided,
372  * which calls this method.
373  * @param other The object to be swapped with this one.
374  *
375  * Since 2.0.8
376  */
377  void swap(AsyncQueue& other) {
378  if (this != &other) {
379  lock2(mutex, other.mutex); // doesn't throw
381  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
382  q.swap(other.q);
383  }
384  }
385 
386 /**
387  * The copy assignment operator is strongly exception safe with the
388  * standard sequence containers (it uses copy and swap). It is also
389  * thread safe, as it safely locks both the assignor's and assignee's
390  * mutex to provide a thread-wise atomic assignment.
391  * @param rhs The assignor.
392  * @return The AsyncQueue object after assignment.
393  * @exception std::bad_alloc The copy constructor of the queue's
394  * container type, and so this assignment operator, might throw
395  * std::bad_alloc if memory is exhausted and the system throws in that
396  * case. This assignment operator will also throw if the copy
397  * constructor of the queue's container type throws any other
398  * exceptions, including if any copy or move constructor or copy or
399  * move assignment operator of a contained item throws.
400  * @exception Thread::MutexError This assignment operator might throw
401  * Thread::MutexError if initialization of a transitional object's
402  * contained mutex fails. (It is often not worth checking for this,
403  * as it means either memory is exhausted or pthread has run out of
404  * other resources to create new mutexes.)
405  *
406  * Since 2.0.8
407  */
409  if (this != &rhs) {
410  lock2(mutex, rhs.mutex); // doesn't throw
412  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
413  std::queue<T, Container> temp{rhs.q};
414  q.swap(temp);
415  }
416  return *this;
417  }
418 
419 /**
420  * This move assignment operator is thread safe as regards the
421  * assignee (the object moved to), but no synchronization is carried
422  * out with respect to the rvalue assignor/movant. This is because
423  * temporaries are only visible and accessible in the thread carrying
424  * out the move operation and synchronization for them would represent
425  * pointless overhead. In a case where the user uses std::move to
426  * force a move from a named object, and that named object's lifetime
427  * is managed by (or the object is otherwise accessed by) a different
428  * thread than the one making the move, the user must carry out her
429  * own synchronization with respect to that different thread, both to
430  * ensure that a consistent view of the named object is obtained
431  * and because that object will be mutated by the move. This method
432  * invokes std::queue's move assignment operator, and therefore has
433  * the same exception safety as the standard library's implementation
434  * of that operator. It will not normally throw unless a custom
435  * allocator is used which throws on move assignment, or the
436  * destructor of a contained item throws.
437  * @param rhs The assignor/movant.
438  * @return The AsyncQueue object after move assignment.
439  *
440  * Since 2.0.8
441  */
443  Thread::Mutex::Lock lock{mutex};
444  q = std::move(rhs.q);
445  return *this;
446  }
447 
448 /**
449  * @exception std::bad_alloc The default constructor might throw
450  * std::bad_alloc if memory is exhausted and the system throws in that
451  * case.
452  * @exception Thread::MutexError The default constructor might throw
453  * Thread::MutexError if initialization of the contained mutex fails.
454  * (It is often not worth checking for this, as it means either memory
455  * is exhausted or pthread has run out of other resources to create
456  * new mutexes.)
457  */
458  AsyncQueue() = default; // default-constructs the mutex and an empty std::queue
459 
460 /**
461  * As regards thread safety, the move constructor does not synchronize
462  * with respect to the initializing rvalue. This is because
463  * temporaries are only visible and accessible in the thread carrying
464  * out the move operation and synchronization for them would represent
465  * pointless overhead. In a case where a user uses std::move to force
466  * a move from a named object, and that named object's lifetime is
467  * managed by (or the object is otherwise accessed by) a different
468  * thread than the one making the move, the user must carry out her
469  * own synchronization with respect to that different thread, both to
470  * ensure that a consistent view of the named object is obtained
471  * and because that object will be mutated by the move.
472  * @param rhs The AsyncQueue object to be moved.
473  * @exception Thread::MutexError The move constructor might throw
474  * Thread::MutexError if initialization of the contained mutex fails.
475  * If it does so this move constructor is strongly exception safe (if
476  * it is thrown, the object passed as an argument will be unchanged).
477  * (It is often not worth checking for this, as it means either memory
478  * is exhausted or pthread has run out of other resources to create
479  * new mutexes.) It might also throw if the queue's container type's
480  * move constructor might throw, but it should not do that unless a
481  * custom allocator is in use.
482  *
483  * Since 2.0.8
484  */
485  AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
486 
487 /**
488  * The copy constructor is thread safe, as it locks the initializing
489  * object's mutex to obtain a consistent view of it.
490  * @param rhs The AsyncQueue object to be copied.
491  * @exception std::bad_alloc The copy constructor of the queue's
492  * container type, and so this constructor, might throw std::bad_alloc
493  * if memory is exhausted and the system throws in that case. It will
494  * also throw if the copy constructor of the queue's container type
495  * throws any other exceptions, including if any copy or move
496  * constructor or copy or move assignment operator of a contained item
497  * throws.
498  * @exception Thread::MutexError The copy constructor might throw
499  * Thread::MutexError if initialization of the contained mutex fails.
500  * (It is often not worth checking for this, as it means either memory
501  * is exhausted or pthread has run out of other resources to create
502  * new mutexes.)
503  *
504  * Since 2.0.8
505  */
506  // we use the comma operator here to lock the mutex and call the
507  // copy constructor: the lock will be retained until the end of the
508  // full expression in which it is lexically situated, namely until
509  // the end of q's constructor - see C++11 1.9/10 and 12.2/3
510  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
511 
512 /**
513  * The destructor does not throw unless the destructor of a contained
514  * item throws. It is thread safe (any thread may delete the
515  * AsyncQueue object).
516  */
518  // lock and unlock the mutex in the destructor so that we have an
519  // acquire operation to ensure that when the std::queue object is
520  // destroyed memory is synchronised, so any thread may destroy the
521  // AsyncQueue object
522  Thread::Mutex::Lock lock{mutex};
523  }
524 
525 /* Only has effect if --with-glib-memory-slices-compat or
526  * --with-glib-memory-slices-no-compat option picked */
528 };
529 
530 /**
531  * @class AsyncQueueDispatch async_queue.h c++-gtk-utils/async_queue.h
532  * @brief A thread-safe asynchronous queue with blocking
533  * pop_dispatch() and move_pop_dispatch() methods.
534  * @sa AsyncQueue AsyncResult
535  *
536  * AsyncQueueDispatch is a class which has blocking pop_dispatch()
537  * and move_pop_dispatch() methods, which allows it to be waited on by a dedicated
538  * event/message dispatching thread for incoming work (represented by
539  * the data pushed onto the queue). In the same way, it can be used
540  * to implement thread pools, by having threads in the pool waiting on
541  * the queue. The AsyncResult class can be useful for passing results
542  * between threads in conjunction with AsyncQueueDispatch (the
543  * documentation on AsyncResult gives an example).
544  *
545  * By default the queue uses a std::list object as its container
546  * because when adding an item to the queue all allocation can take
547  * place outside the queue object's mutex. However, for types which
548  * have low overhead copy or move constructors, this can be changed
549  * to, say, a std::deque object by specifying it as the second
550  * template parameter.
551  *
552  * If the library is installed using the
553  * \--with-glib-memory-slices-compat or
554  * \--with-glib-memory-slices-no-compat configuration options, any
555  * AsyncQueueDispatch objects constructed on free store will be
556  * constructed in glib memory slices. This does not affect the queue
557  * container itself: to change the allocator of the C++ container, a
558  * custom allocator type can be provided when the AsyncQueueDispatch
559  * object is instantiated offering the standard allocator interface.
560  * If glib memory slices are not used or no AsyncQueueDispatch objects
561  * are constructed on free store, it is not necessary to call
562  * g_thread_init() before manipulating or using an AsyncQueueDispatch
563  * object in multiple threads, but prior to glib version 2.32 glib
564  * itself (and thus glib memory slices) are not thread safe unless
565  * that function has been called.
566  */
567 
568 template <class T, class Container = std::list<T> > class AsyncQueueDispatch {
569 public:
570  typedef typename Container::value_type value_type;
571  typedef typename Container::size_type size_type;
572  typedef Container container_type;
573 private:
574  mutable Thread::Mutex mutex;
575  Thread::Cond cond;
576  std::queue<T, Container> q;
577 
578 // TODO: at the next ABI break make this method explicitly static
579 // this method won't throw: it is for the user to ensure the arguments
580 // do not refer to the same mutex object
581  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
582  m1.lock();
583  for(;;) {
584  if (!m2.trylock()) {
585  return;
586  }
587  m1.unlock();
588  // spin nicely
589 #ifdef CGU_USE_SCHED_YIELD
590  sched_yield();
591 #else
592  usleep(10);
593 #endif
594  m1.lock();
595  }
596  }
597 public:
598 /**
599  * Pushes an item onto the queue. This method has strong exception
600  * safety if the container is a std::list or std::deque container (the
601  * default is std::list), except that if std::deque is used as the
602  * container and the copy constructor, move constructor, copy
603  * assignment operator or move assignment operator of the queue item
604  * throws, it only gives the basic exception guarantee (and the basic
605  * guarantee is not given by std::deque if the queue item's move
606  * constructor throws and it uses a non-default allocator which does
607  * not provide for it to be CopyInsertable). It is thread safe.
608  * @param obj The item to be pushed onto the queue.
609  * @exception std::bad_alloc The method might throw std::bad_alloc if
610  * memory is exhausted and the system throws in that case. It might
611  * also throw if the copy constructor, move constructor, assignment
612  * operator or move assignment operator of the queue item might throw.
613  */
614  void push(const value_type& obj) {
615  Thread::Mutex::Lock lock{mutex};
616  q.push(obj);
617  cond.signal();
618  }
619 
620 /**
621  * Pushes an item onto the queue. This method has strong exception
622  * safety if the container is a std::list or std::deque container (the
623  * default is std::list), except that if std::deque is used as the
624  * container and the copy constructor, move constructor, copy
625  * assignment operator or move assignment operator of the queue item
626  * throws, it only gives the basic exception guarantee (and the basic
627  * guarantee is not given by std::deque if the queue item's move
628  * constructor throws and it uses a non-default allocator which does
629  * not provide for it to be CopyInsertable). It is thread safe.
630  * @param obj The item to be pushed onto the queue.
631  * @exception std::bad_alloc The method might throw std::bad_alloc if
632  * memory is exhausted and the system throws in that case. It might
633  * also throw if the copy constructor, move constructor, assignment
634  * operator or move assignment operator of the queue item might throw.
635  *
636  * Since 2.0.0-rc5
637  */
638  void push(value_type&& obj) {
639  Thread::Mutex::Lock lock{mutex};
640  q.push(std::move(obj));
641  cond.signal();
642  }
643 
644 /**
645  * Pushes an item onto the queue by constructing it in place: that is,
646  * by passing to this method the item's constructor's arguments,
647  * rather than the item itself. This method has strong exception
648  * safety if the container is a std::list or std::deque container (the
649  * default is std::list). (Technically, for a std::deque container,
650  * emplace() only offers the same exception guarantees as does push(),
651  * namely only the basic guarantee where a copy or move of the queue
652  * item throws during the call, but the purpose of emplace is to
653  * construct in place and any reasonable implementation will not copy
654  * or move the queue item.) It is thread safe.
655  * @param args The constructor arguments for the item to be pushed
656  * onto the queue.
657  * @exception std::bad_alloc The method might throw std::bad_alloc if
658  * memory is exhausted and the system throws in that case. It might
659  * also throw if the item's constructor (including any of its
660  * constructor arguments) might throw when constructing the item.
661  * @note The constructor of the item pushed onto the queue must not
662  * access any of the methods of the same queue object, or a deadlock
663  * might occur.
664  *
665  * Since 2.0.0-rc5
666  */
667  template<class... Args>
668  void emplace(Args&&... args) {
669  Thread::Mutex::Lock lock{mutex};
670  q.emplace(std::forward<Args>(args)...);
671  cond.signal();
672  }
673 
674 /**
675  * Pops an item from the queue. This method has strong exception
676  * safety if the container is a std::deque or std::list container (the
677  * default is std::list), provided the destructor of a contained item
678  * does not throw. It is thread safe.
679  * @param obj A value type reference to which the item at the front of
680  * the queue will be assigned.
681  * @exception AsyncQueuePopError If the queue is empty when a pop is
682  * attempted, this method will throw AsyncQueuePopError. It might
683  * also throw if the copy assignment operator of the queue item might
684  * throw. In order to complete pop operations atomically under a
685  * single lock and to retain strong exception safety, the object into
686  * which the popped data is to be placed is passed as an argument by
687  * reference (this avoids a copy from a temporary object after the
688  * data has been extracted from the queue, which would occur if the
689  * item extracted were returned by value). It might also throw if the
690  * destructor of the queue item might throw (but that should never
691  * happen), or if the empty() method of the container type throws
692  * (which would not happen on any sane implementation).
693  */
694  void pop(value_type& obj) {
695  Thread::Mutex::Lock lock{mutex};
696  if (q.empty()) throw AsyncQueuePopError();
697  obj = q.front();
698  q.pop();
699  }
700 
701 /**
702  * Pops an item from the queue using the contained type's move
703  * assignment operator, if it has one. This method is identical to
704  * the pop() method if that type has no move assignment operator.
705  * This method has strong exception safety if the container is a
706  * std::deque or std::list container (the default is std::list),
707  * provided the destructor of a contained item does not throw and the
708  * move assignment operator of a contained item has strong exception
709  * safety. It is thread safe. Use this method in preference to the
710  * pop() method if it is known that the contained items' move
711  * assignment operator does not throw or is strongly exception safe,
712  * or if the use case does not require strong exception safety. This
713  * method must be used in place of the pop() method if the contained
714  * type has a move assignment operator but no copy assignment operator
715  * (such as a std::unique_ptr object).
716  * @param obj A value type reference to which the item at the front of
717  * the queue will be move assigned.
718  * @exception AsyncQueuePopError If the queue is empty when a pop is
719  * attempted, this method will throw AsyncQueuePopError. It might
720  * also throw if the move assignment operator of the queue item might
721  * throw, or if it has no move assignment operator and its copy
722  * assignment operator throws. In order to complete pop operations
723  * atomically under a single lock and to retain strong exception
724  * safety, the object into which the popped data is to be placed is
725  * passed as an argument by reference (this avoids a move from a
726  * temporary object after the data has been extracted from the queue,
727  * which would occur if the item extracted were returned by value).
728  * It might also throw if the destructor of the queue item might throw
729  * (but that should never happen), or if the empty() method of the
730  * container type throws (which would not happen on any sane
731  * implementation).
732  *
733  * Since 2.0.11
734  */
735  void move_pop(value_type& obj) {
736  Thread::Mutex::Lock lock{mutex};
737  if (q.empty()) throw AsyncQueuePopError();
738  obj = std::move(q.front());
739  q.pop();
740  }
741 
742 /**
743  * Pops an item from the queue. If the queue is empty, it will block
744  * until an item becomes available. If it blocks, the wait comprises
745  * a cancellation point. This method is cancellation safe if the
746  * stack unwinds on cancellation, as cancellation is blocked while the
747  * queue is being operated on after coming out of a wait. This method
748  * has strong exception safety if the container is a std::deque or
749  * std::list container (the default is std::list), provided the
750  * destructor of a contained item does not throw. It is thread safe.
751  * @param obj A value type reference to which the item at the front of
752  * the queue will be assigned. This method might throw if the copy
753  * assignment operator of the queue item might throw. In order to
754  * complete pop operations atomically under a single lock and to
755  * retain strong exception safety, the object into which the popped
756  * data is to be placed is passed as an argument by reference (this
757  * avoids a copy from a temporary object after the data has been
758  * extracted from the queue, which would occur if the item extracted
759  * were returned by value). It might also throw if the destructor of
760  * the queue item might throw (but that should never happen), or if
761  * the empty() method of the container type throws (which would not
762  * happen on any sane implementation).
763  */
765  Thread::Mutex::Lock lock{mutex};
766  while (q.empty()) cond.wait(mutex);
768  obj = q.front();
769  q.pop();
770  }
771 
772 /**
773  * Pops an item from the queue using the contained type's move
774  * assignment operator, if it has one (this method is identical to the
775  * pop_dispatch() method if that type has no move assignment
776  * operator). If the queue is empty, it will block until an item
777  * becomes available. If it blocks, the wait comprises a cancellation
778  * point. This method is cancellation safe if the stack unwinds on
779  * cancellation, as cancellation is blocked while the queue is being
780  * operated on after coming out of a wait. This method has strong
781  * exception safety if the container is a std::deque or std::list
782  * container (the default is std::list), provided the destructor of a
783  * contained item does not throw and the move assignment operator of a
784  * contained item has strong exception safety. It is thread safe.
785  * Use this method in preference to the pop_dispatch() method if it is
786  * known that the contained items' move assignment operator does not
787  * throw or is strongly exception safe, or if the use case does not
788  * require strong exception safety. This method must be used in place
789  * of the pop_dispatch() method if the contained type has a move
790  * assignment operator but no copy assignment operator (such as a
791  * std::unique_ptr object).
792  * @param obj A value type reference to which the item at the front of
793  * the queue will be move assigned. This method might throw if the
794  * move assignment operator of the queue item might throw, or if it
795  * has no move assignment operator and its copy assignment operator
796  * throws. In order to complete pop operations atomically under a
797  * single lock and to retain strong exception safety, the object into
798  * which the popped data is to be placed is passed as an argument by
799  * reference (this avoids a move from a temporary object after the
800  * data has been extracted from the queue, which would occur if the
801  * item extracted were returned by value). It might also throw if the
802  * destructor of the queue item might throw (but that should never
803  * happen), or if the empty() method of the container type throws
804  * (which would not happen on any sane implementation).
805  *
806  * Since 2.0.11
807  */
809  Thread::Mutex::Lock lock{mutex};
810  while (q.empty()) cond.wait(mutex);
812  obj = std::move(q.front());
813  q.pop();
814  }
815 
816 /**
817  * Pops an item from the queue. If the queue is empty, it will block
818  * until an item becomes available or until the timeout expires. If
819  * it blocks, the wait comprises a cancellation point. This method is
820  * cancellation safe if the stack unwinds on cancellation, as
821  * cancellation is blocked while the queue is being operated on after
822  * coming out of a wait. This method has strong exception safety if
823  * the container is a std::deque or std::list container (the default
824  * is std::list), provided the destructor of a contained item does not
825  * throw. It is thread safe.
826  * @param obj A value type reference to which the item at the front of
827  * the queue will be assigned. This method might throw if the copy
828  * assignment operator of the queue item might throw. In order to
829  * complete pop operations atomically under a single lock and to
830  * retain strong exception safety, the object into which the popped
831  * data is to be placed is passed as an argument by reference (this
832  * avoids a copy from a temporary object after the data has been
833  * extracted from the queue, which would occur if the item extracted
834  * were returned by value). It might also throw if the destructor of
835  * the queue item might throw (but that should never happen), or if
836  * the empty() method of the container type throws (which would not
837  * happen on any sane implementation).
838  * @param millisec The timeout interval, in milliseconds.
839  * @return If the timeout expires without an item becoming available,
840  * the method will return true. If an item from the queue is
841  * extracted, it returns false.
842  */
843  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
844  timespec ts;
845  Thread::Cond::get_abs_time(ts, millisec);
846  Thread::Mutex::Lock lock{mutex};
847  while (q.empty()) {
848  if (cond.timed_wait(mutex, ts)) return true;
849  }
851  obj = q.front();
852  q.pop();
853  return false;
854  }
855 
856 /**
857  * Pops an item from the queue using the contained type's move
858  * assignment operator, if it has one (this method is identical to the
859  * pop_timed_dispatch() method if that type has no move assignment
860  * operator). If the queue is empty, it will block until an item
861  * becomes available or until the timeout expires. If it blocks, the
862  * wait comprises a cancellation point. This method is cancellation
863  * safe if the stack unwinds on cancellation, as cancellation is
864  * blocked while the queue is being operated on after coming out of a
865  * wait. This method has strong exception safety if the container is
866  * a std::deque or std::list container (the default is std::list),
867  * provided the destructor of a contained item does not throw and the
868  * move assignment operator of a contained item has strong exception
869  * safety. It is thread safe. Use this method in preference to the
870  * pop_timed_dispatch() method if it is known that the contained
871  * items' move assignment operator does not throw or is strongly
872  * exception safe, or if the use case does not require strong
873  * exception safety. This method must be used in place of the
874  * pop_timed_dispatch() method if the contained type has a move
875  * assignment operator but no copy assignment operator (such as a
876  * std::unique_ptr object).
877  * @param obj A value type reference to which the item at the front of
878  * the queue will be move assigned. This method might throw if the
879  * move assignment operator of the queue item might throw, or if it
880  * has no move assignment operator and its copy assignment operator
881  * throws. In order to complete pop operations atomically under a
882  * single lock and to retain strong exception safety, the object into
883  * which the popped data is to be placed is passed as an argument by
884  * reference (this avoids a move from a temporary object after the
885  * data has been extracted from the queue, which would occur if the
886  * item extracted were returned by value). It might also throw if the
887  * destructor of the queue item might throw (but that should never
888  * happen), or if the empty() method of the container type throws
889  * (which would not happen on any sane implementation).
890  * @param millisec The timeout interval, in milliseconds.
891  * @return If the timeout expires without an item becoming available,
892  * the method will return true. If an item from the queue is
893  * extracted, it returns false.
894  *
895  * Since 2.0.11
896  */
897  bool move_pop_timed_dispatch(value_type& obj, unsigned int millisec) {
898  timespec ts;
899  Thread::Cond::get_abs_time(ts, millisec);
900  Thread::Mutex::Lock lock{mutex};
901  while (q.empty()) {
902  if (cond.timed_wait(mutex, ts)) return true;
903  }
905  obj = std::move(q.front());
906  q.pop();
907  return false;
908  }
909 
910 /**
911  * Discards the item at the front of the queue. This method has
912  * strong exception safety if the container is a std::deque or
913  * std::list container (the default is std::list), provided the
914  * destructor of a contained item does not throw. It is thread safe.
915  * @exception AsyncQueuePopError If the queue is empty when a pop is
916  * attempted, this method will throw AsyncQueuePopError. It might
917  * also throw if the destructor of the queue item might throw (but
918  * that should never happen), or if the empty() method of the
919  * container type throws (which would not happen on any sane
920  * implementation).
921  */
922  void pop() {
923  Thread::Mutex::Lock lock{mutex};
924  if (q.empty()) throw AsyncQueuePopError();
925  q.pop();
926  }
927 
928 /**
929  * @return Whether the queue is empty. It will not throw assuming
930  * that the empty() method of the container type does not throw, as it
931  * will not on any sane implementation.
932  * @note This method is thread safe, but the return value may not be
933  * valid if another thread has pushed to or popped from the queue
934  * before the value returned by the method is acted on. It is
935  * provided as a utility, but may not be meaningful, depending on the
936  * intended usage.
937  */
938  bool empty() const {
939  Thread::Mutex::Lock lock{mutex};
940  return q.empty();
941  }
942 
943 /**
944  * @return The number of items currently in the queue. It will not
945  * throw assuming that the size() method of the container type does
946  * not throw, as it will not on any sane implementation.
947  * @note This method is thread safe, but the return value may not be
948  * valid if another thread has pushed to or popped from the queue
949  * before the value returned by the method is acted on. It is
950  * provided as a utility, but may not be meaningful, depending on the
951  * intended usage.
952  *
953  * Since 2.0.8
954  */
955  size_type size() const {
956  Thread::Mutex::Lock lock{mutex};
957  return q.size();
958  }
959 
960 /**
961  * Swaps the contents of 'this' and 'other'. It will not throw
962  * assuming that the swap method of the container type does not throw
963  * (which the C++11 standard requires not to happen with the standard
964  * sequence containers). It is thread safe and the swap is
965  * thread-wise atomic. A non-member function
966  * Cgu::swap(Cgu::AsyncQueueDispatch&, Cgu::AsyncQueueDispatch&) is
967  * also provided which will call this method.
968  * @param other The object to be swapped with this one.
969  * @note An object swapped does not, by virtue of the swap, inherit
970  * any threads waiting on the other one. However if threads were
971  * waiting on a swapped object prior to the swap, and it acquires
972  * items by virtue of the swap, the waiting threads will unblock and
973  * extract those items.
974  *
975  * Since 2.0.8
976  */
977  void swap(AsyncQueueDispatch& other) {
978  if (this != &other) {
979  lock2(mutex, other.mutex); // doesn't throw
981  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
982  q.swap(other.q);
983  if (!q.empty()) cond.broadcast();
984  if (!other.q.empty()) other.cond.broadcast();
985  }
986  }
987 
988 /**
989  * The copy assignment operator is strongly exception safe with the
990  * standard sequence containers (it uses copy and swap). It is also
991  * thread safe, as it safely locks both the assignor's and assignee's
992  * mutex to provide a thread-wise atomic assignment.
993  * @param rhs The assignor.
994  * @return The AsyncQueueDispatch object after assignment.
995  * @exception std::bad_alloc The copy constructor of the queue's
996  * container type, and so this assignment operator, might throw
997  * std::bad_alloc if memory is exhausted and the system throws in that
998  * case. This assignment operator will also throw if the copy
999  * constructor of the queue's container type throws any other
1000  * exceptions, including if any copy or move constructor or copy or
1001  * move assignment operator of a contained item throws.
1002  * @exception Thread::MutexError This assignment operator might
1003  * throw Thread::MutexError if initialization of a transitional
1004  * object's contained mutex fails. (It is often not worth checking
1005  * for this, as it means either memory is exhausted or pthread has run
1006  * out of other resources to create new mutexes.)
1007  * @exception Thread::CondError This assignment operator might throw
1008  * Thread::CondError if initialisation of a transitional object's
1009  * contained condition variable fails. (It is often not worth
1010  * checking for this, as it means either memory is exhausted or
1011  * pthread has run out of other resources to create new condition
1012  * variables.)
1013  * @note The assignee does not, by virtue of the assignment, inherit
1014  * any threads waiting on the assignor. However, if prior to the
1015  * assignment threads were waiting on the assignee and the assignee
1016  * acquires items from the assignor as a result of the assignment, the
1017  * waiting threads will unblock and extract those items.
1018  *
1019  * Since 2.0.8
1020  */
1022  if (this != &rhs) {
1023  lock2(mutex, rhs.mutex); // doesn't throw
1025  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1026  std::queue<T, Container> temp{rhs.q};
1027  q.swap(temp);
1028  if (!q.empty()) cond.broadcast();
1029  }
1030  return *this;
1031  }
1032 
1033 /**
1034  * This move assignment operator is thread safe as regards the
1035  * assignee (the object moved to), but no synchronization is carried
1036  * out with respect to the rvalue assignor/movant. This is because
1037  * temporaries are only visible and accessible in the thread carrying
1038  * out the move operation and synchronization for them would represent
1039  * pointless overhead. In a case where the user uses std::move to
1040  * force a move from a named object, and that named object's lifetime
1041  * is managed by (or the object is otherwise accessed by) a different
1042  * thread than the one making the move, the user must carry out her
1043  * own synchronization with respect to that different thread, both to
1044  * ensure that a consistent view of the named object is obtained
1045  * and because that object will be mutated by the move. This method
1046  * invokes std::queue's move assignment operator, and therefore has
1047  * the same exception safety as the standard library's implementation
1048  * of that operator. It will not normally throw unless a custom
1049  * allocator is used which throws on move assignment, or the
1050  * destructor of a contained item throws.
1051  * @param rhs The assignor/movant.
1052  * @return The AsyncQueueDispatch object after move assignment.
1053  * @note The assignee does not, by virtue of the move, inherit any
1054  * threads waiting on the assignor/movant. However, if prior to the
1055  * move threads were waiting on the assignee and the assignee acquires
1056  * items from the assignor/movant as a result of the move, from
1057  * version 2.0.9 the waiting threads will unblock and extract those
1058  * items (such unblocking on move assignment did not happen with
1059  * version 2.0.8, which was a bug).
1060  *
1061  * Since 2.0.8
1062  */
1064  Thread::Mutex::Lock lock{mutex};
1065  q = std::move(rhs.q);
1066  if (!q.empty()) cond.broadcast();
1067  return *this;
1068  }
1069 
1070 /**
1071  * @exception std::bad_alloc The default constructor might throw this
1072  * exception if memory is exhausted and the system throws in that
1073  * case.
1074  * @exception Thread::MutexError The default constructor might throw
1075  * this exception if initialisation of the contained mutex fails. (It
1076  * is often not worth checking for this, as it means either memory is
1077  * exhausted or pthread has run out of other resources to create new
1078  * mutexes.)
1079  * @exception Thread::CondError The default constructor might throw
1080  * this exception if initialisation of the contained condition
1081  * variable fails. (It is often not worth checking for this, as it
1082  * means either memory is exhausted or pthread has run out of other
1083  * resources to create new condition variables.)
1084  */
// Compiler-generated default constructor: each data member is
// default-constructed, which is where the exceptions listed in the
// comment block above can originate.
1085  AsyncQueueDispatch() = default;
1086 
1087 /**
1088  * As regards thread safety, the move constructor does not synchronize
1089  * with respect to the initializing rvalue. This is because
1090  * temporaries are only visible and accessible in the thread carrying
1091  * out the move operation and synchronization for them would represent
1092  * pointless overhead. In a case where a user uses std::move to force
1093  * a move from a named object, and that named object's lifetime is
1094  * managed by (or the object is otherwise accessed by) a different
1095  * thread than the one making the move, the user must carry out her
1096  * own synchronization with respect to that different thread, both to
1097  * ensure that a consistent view of the named object is obtained
1098  * and because that object will be mutated by the move.
1099  * @param rhs The AsyncQueueDispatch object to be moved.
1100  * @exception Thread::MutexError The move constructor might throw
1101  * Thread::MutexError if initialization of the contained mutex fails.
1102  * If it does so this move constructor is strongly exception safe (if
1103  * it is thrown, the object passed as an argument will be unchanged).
1104  * (It is often not worth checking for this, as it means either memory
1105  * is exhausted or pthread has run out of other resources to create
1106  * new mutexes.)
1107  * @exception Thread::CondError The move constructor might throw
1108  * Thread::CondError exception if initialisation of the contained
1109  * condition variable fails. If it does so this move constructor is
1110  * strongly exception safe (if it is thrown, the object passed as an
1111  * argument will be unchanged). (It is often not worth checking for
1112  * this, as it means either memory is exhausted or pthread has run out
1113  * of other resources to create new condition variables.)
1114  * @note The move constructor might also throw if the queue's
1115  * container type's move constructor might throw, but it should not do
1116  * that unless a custom allocator is in use.
1117  *
1118  * Since 2.0.8
1119  */
1120  AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {}
1121 
1122 /**
1123  * The copy constructor is thread safe, as it locks the initializing
1124  * object's mutex to obtain a consistent view of it.
1125  * @param rhs The AsyncQueueDispatch object to be copied.
1126  * @exception std::bad_alloc The copy constructor of the queue's
1127  * container type, and so this constructor, might throw std::bad_alloc
1128  * if memory is exhausted and the system throws in that case. It will
1129  * also throw if the copy constructor of the queue's container type
1130  * throws any other exceptions, including if any copy or move
1131  * constructor or copy or move assignment operator of a contained item
1132  * throws.
1133  * @exception Thread::MutexError The copy constructor might throw
1134  * Thread::MutexError if initialization of the contained mutex fails.
1135  * (It is often not worth checking for this, as it means either memory
1136  * is exhausted or pthread has run out of other resources to create
1137  * new mutexes.)
1138  * @exception Thread::CondError The copy constructor might throw this
1139  * exception if initialisation of the contained condition variable
1140  * fails. (It is often not worth checking for this, as it means
1141  * either memory is exhausted or pthread has run out of other
1142  * resources to create new condition variables.)
1143  *
1144  * Since 2.0.8
1145  */
1146  // we use the comma operator here to lock the mutex and call the
1147  // copy constructor: the lock will be retained until the end of the
1148  // full expression in which it is lexically situated, namely until
1149  // the end of q's constructor - see C++11 1.9/10 and 12.2/3
1151  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1152 
1153 /**
1154  * The destructor does not throw unless the destructor of a contained
1155  * item throws. It is thread safe (any thread may delete the
1156  * AsyncQueueDispatch object). Destroying an AsyncQueueDispatch
1157  * object on which another thread is currently blocked results in
1158  * undefined behavior.
1159  */
1161  // lock and unlock the mutex in the destructor so that we have an
1162  // acquire operation to ensure that when the std::queue object is
1163  // destroyed memory is synchronised, so any thread may destroy the
1164  // AsyncQueueDispatch object
1165  Thread::Mutex::Lock lock{mutex};
1166  }
1167 
1168 /* Only has effect if --with-glib-memory-slices-compat or
1169  * --with-glib-memory-slices-no-compat option picked */
1171 };
1172 
1173 /**
1174  * Swaps the contents of two AsyncQueue objects. It will not throw
1175  * assuming that the swap method of the container type does not throw
1176  * (which the C++11 standard requires not to happen with the standard
1177  * sequence containers). It is thread safe and the swap is
1178  * thread-wise atomic.
1179  * @param q1 An object to be swapped with the other.
1180  * @param q2 An object to be swapped with the other.
1181  * @note Calling std::swap on AsyncQueue objects is thread safe but
1182  * does not provide a thread-wise atomic swap (the swapped objects may
1183  * not be mirror images if during the execution of std::swap's default
1184  * algorithm one of them has been modified), although in many cases
1185  * that doesn't matter. If swap() is called without a namespace
1186  * qualifier, argument dependent look-up will pick this one correctly.
1187  *
1188  * Since 2.0.8
1189  */
1190 template <class T, class Container>
1193  q1.swap(q2);
1194 }
1195 
1196 /**
1197  * Swaps the contents of two AsyncQueueDispatch objects. It will not
1198  * throw assuming that the swap method of the container type does not
1199  * throw (which the C++11 standard requires not to happen with the
1200  * standard sequence containers). It is thread safe and the swap is
1201  * thread-wise atomic.
1202  * @param q1 An object to be swapped with the other.
1203  * @param q2 An object to be swapped with the other.
1204  * @note 1. An object swapped does not, by virtue of the swap, inherit
1205  * any threads waiting on the other one. However if threads were
1206  * waiting on a swapped object prior to the swap, and it acquires
1207  * items by virtue of the swap, the waiting threads will unblock and
1208  * extract those items.
1209  * @note 2. Calling std::swap on AsyncQueueDispatch objects is thread
1210  * safe but does not provide a thread-wise atomic swap (the swapped
1211  * objects may not be mirror images if during the execution of
1212  * std::swap's default algorithm one of them has been modified),
1213  * although in many cases that doesn't matter. If swap() is called
1214  * without a namespace qualifier, argument dependent look-up will pick
1215  * this one correctly.
1216  *
1217  * Since 2.0.8
1218  */
1219 template <class T, class Container>
1222  q1.swap(q2);
1223 }
1224 
1225 #if defined(CGU_USE_INHERITABLE_QUEUE) && !defined(DOXYGEN_PARSING)
1226 
1227 /* This is a specialization of AsyncQueue for std::list objects, which
1228  uses std::list::splice() to push or emplace a new item on the
1229  queue. This means that allocation for the push or emplacement
1230  occurs outside the mutex, so reducing contention (a tip from a talk
1231  by Sean Parent of Adobe). This is first available in version
1232  2.0.20/2.2.3.
1233  */
1234 template <class T, class Allocator>
1235 class AsyncQueue<T, std::list<T, Allocator> > {
1236 public:
1237  typedef std::list<T, Allocator> Container;
1238  typedef typename Container::value_type value_type;
1239  typedef typename Container::size_type size_type;
1240  typedef Container container_type;
1241 private:
1242  mutable Thread::Mutex mutex;
1243  // 23.6.3.1 of C++11 requires std::queue to have a protected
1244  // container member called 'c' for the purposes of derivation. This
1245  // specialisation will have the same binary layout as the
1246  // unspecialized version on any practical implementation: all we do
1247  // is add a splice_end() member
1248  class Q: public std::queue<T, Container> {
1249  public:
1250  void splice_end(Container&& lst) {
1251  this->c.splice(this->c.end(), std::move(lst));
1252  }
1253  } q;
1254 
1255 // TODO: at the next ABI break make this method explicitly static
1256  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1257  m1.lock();
1258  for(;;) {
1259  if (!m2.trylock()) {
1260  return;
1261  }
1262  m1.unlock();
1263  // spin nicely
1264 #ifdef CGU_USE_SCHED_YIELD
1265  sched_yield();
1266 #else
1267  usleep(10);
1268 #endif
1269  m1.lock();
1270  }
1271  }
1272 public:
1273  void push(const value_type& obj) {
1274  Container temp{obj};
1275  Thread::Mutex::Lock lock{mutex};
1276  // splice_end doesn't throw
1277  q.splice_end(std::move(temp));
1278  }
1279 
1280  void push(value_type&& obj) {
1281  // although move intialisation of a std::list object via an
1282  // initializer list with a single element is almost certain to be
1283  // strongly exception safe, it is not mandated by the standard so
1284  // use push_back() (which is required to be strongly exception
1285  // safe)
1286  Container temp;
1287  temp.push_back(std::move(obj));
1288  Thread::Mutex::Lock lock{mutex};
1289  // splice_end doesn't throw
1290  q.splice_end(std::move(temp));
1291  }
1292 
1293  template<class... Args>
1294  void emplace(Args&&... args) {
1295  Container temp;
1296  temp.emplace_back(std::forward<Args>(args)...);
1297  Thread::Mutex::Lock lock{mutex};
1298  // splice_end doesn't throw
1299  q.splice_end(std::move(temp));
1300  }
1301 
1302  void pop(value_type& obj) {
1303  Thread::Mutex::Lock lock{mutex};
1304  if (q.empty()) throw AsyncQueuePopError();
1305  obj = q.front();
1306  q.pop();
1307  }
1308 
1309  void move_pop(value_type& obj) {
1310  Thread::Mutex::Lock lock{mutex};
1311  if (q.empty()) throw AsyncQueuePopError();
1312  obj = std::move(q.front());
1313  q.pop();
1314  }
1315 
1316  void pop() {
1317  Thread::Mutex::Lock lock{mutex};
1318  if (q.empty()) throw AsyncQueuePopError();
1319  q.pop();
1320  }
1321 
1322  bool empty() const {
1323  Thread::Mutex::Lock lock{mutex};
1324  return q.empty();
1325  }
1326 
1327  size_type size() const {
1328  Thread::Mutex::Lock lock{mutex};
1329  return q.size();
1330  }
1331 
1332  void swap(AsyncQueue& other) {
1333  if (this != &other) {
1334  lock2(mutex, other.mutex); // doesn't throw
1335  Thread::Mutex::Lock l1{mutex, Thread::locked};
1336  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
1337  q.swap(other.q);
1338  }
1339  }
1340 
1341  AsyncQueue& operator=(const AsyncQueue& rhs) {
1342  if (this != &rhs) {
1343  lock2(mutex, rhs.mutex); // doesn't throw
1344  Thread::Mutex::Lock l1{mutex, Thread::locked};
1345  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1346  Q temp{rhs.q};
1347  q.swap(temp);
1348  }
1349  return *this;
1350  }
1351 
1352  AsyncQueue& operator=(AsyncQueue&& rhs) {
1353  Thread::Mutex::Lock lock{mutex};
1354  q = std::move(rhs.q);
1355  return *this;
1356  }
1357 
1358  AsyncQueue() = default;
1359 
1360  AsyncQueue(AsyncQueue&& rhs): q(std::move(rhs.q)) {}
1361 
1362  AsyncQueue(const AsyncQueue& rhs): q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1363 
1364  ~AsyncQueue() {
1365  Thread::Mutex::Lock lock{mutex};
1366  }
1367 
1369 };
1370 
1371 /* This is a specialization of AsyncQueueDispatch for std::list
1372  objects, which uses std::list::splice() to push or emplace a new
1373  item on the queue. This means that allocation for the push or
1374  emplacement occurs outside the mutex, so reducing contention (a tip
1375  from a talk by Sean Parent of Adobe). This is first available in
1376  version 2.0.20/2.2.3.
1377  */
1378 template <class T, class Allocator>
1379 class AsyncQueueDispatch<T, std::list<T, Allocator> > {
1380 public:
1381  typedef std::list<T, Allocator> Container;
1382  typedef typename Container::value_type value_type;
1383  typedef typename Container::size_type size_type;
1384  typedef Container container_type;
1385 private:
1386  mutable Thread::Mutex mutex;
1387  Thread::Cond cond;
1388  // 23.6.3.1 of C++11 requires std::queue to have a protected
1389  // container member called 'c' for the purposes of derivation. This
1390  // specialisation will have the same binary layout as the
1391  // unspecialized version on any practical implementation: all we do
1392  // is add a splice_end() member
1393  class Q: public std::queue<T, Container> {
1394  public:
1395  void splice_end(Container&& lst) {
1396  this->c.splice(this->c.end(), std::move(lst));
1397  }
1398  } q;
1399 
1400 // TODO: at the next ABI break make this method explicitly static
1401  void lock2(Thread::Mutex& m1, Thread::Mutex& m2) {
1402  m1.lock();
1403  for(;;) {
1404  if (!m2.trylock()) {
1405  return;
1406  }
1407  m1.unlock();
1408  // spin nicely
1409 #ifdef CGU_USE_SCHED_YIELD
1410  sched_yield();
1411 #else
1412  usleep(10);
1413 #endif
1414  m1.lock();
1415  }
1416  }
1417 public:
1418  void push(const value_type& obj) {
1419  Container temp{obj};
1420  Thread::Mutex::Lock lock{mutex};
1421  // splice_end doesn't throw
1422  q.splice_end(std::move(temp));
1423  cond.signal();
1424  }
1425 
1426  void push(value_type&& obj) {
1427  // although move intialisation of a std::list object via an
1428  // initializer list with a single element is almost certain to be
1429  // strongly exception safe, it is not mandated by the standard so
1430  // use push_back() (which is required to be strongly exception
1431  // safe)
1432  Container temp;
1433  temp.push_back(std::move(obj));
1434  Thread::Mutex::Lock lock{mutex};
1435  // splice_end doesn't throw
1436  q.splice_end(std::move(temp));
1437  cond.signal();
1438  }
1439 
1440  template<class... Args>
1441  void emplace(Args&&... args) {
1442  Container temp;
1443  temp.emplace_back(std::forward<Args>(args)...);
1444  Thread::Mutex::Lock lock{mutex};
1445  // splice_end doesn't throw
1446  q.splice_end(std::move(temp));
1447  cond.signal();
1448  }
1449 
1450  void pop(value_type& obj) {
1451  Thread::Mutex::Lock lock{mutex};
1452  if (q.empty()) throw AsyncQueuePopError();
1453  obj = q.front();
1454  q.pop();
1455  }
1456 
1457  void move_pop(value_type& obj) {
1458  Thread::Mutex::Lock lock{mutex};
1459  if (q.empty()) throw AsyncQueuePopError();
1460  obj = std::move(q.front());
1461  q.pop();
1462  }
1463 
1464  void pop_dispatch(value_type& obj) {
1465  Thread::Mutex::Lock lock{mutex};
1466  while (q.empty()) cond.wait(mutex);
1467  Thread::CancelBlock b;
1468  obj = q.front();
1469  q.pop();
1470  }
1471 
1472  void move_pop_dispatch(value_type& obj) {
1473  Thread::Mutex::Lock lock{mutex};
1474  while (q.empty()) cond.wait(mutex);
1475  Thread::CancelBlock b;
1476  obj = std::move(q.front());
1477  q.pop();
1478  }
1479 
1480  bool pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1481  timespec ts;
1482  Thread::Cond::get_abs_time(ts, millisec);
1483  Thread::Mutex::Lock lock{mutex};
1484  while (q.empty()) {
1485  if (cond.timed_wait(mutex, ts)) return true;
1486  }
1487  Thread::CancelBlock b;
1488  obj = q.front();
1489  q.pop();
1490  return false;
1491  }
1492 
1493  bool move_pop_timed_dispatch(value_type& obj, unsigned int millisec) {
1494  timespec ts;
1495  Thread::Cond::get_abs_time(ts, millisec);
1496  Thread::Mutex::Lock lock{mutex};
1497  while (q.empty()) {
1498  if (cond.timed_wait(mutex, ts)) return true;
1499  }
1500  Thread::CancelBlock b;
1501  obj = std::move(q.front());
1502  q.pop();
1503  return false;
1504  }
1505 
1506  void pop() {
1507  Thread::Mutex::Lock lock{mutex};
1508  if (q.empty()) throw AsyncQueuePopError();
1509  q.pop();
1510  }
1511 
1512  bool empty() const {
1513  Thread::Mutex::Lock lock{mutex};
1514  return q.empty();
1515  }
1516 
1517  size_type size() const {
1518  Thread::Mutex::Lock lock{mutex};
1519  return q.size();
1520  }
1521 
1522  void swap(AsyncQueueDispatch& other) {
1523  if (this != &other) {
1524  lock2(mutex, other.mutex); // doesn't throw
1525  Thread::Mutex::Lock l1{mutex, Thread::locked};
1526  Thread::Mutex::Lock l2{other.mutex, Thread::locked};
1527  q.swap(other.q);
1528  if (!q.empty()) cond.broadcast();
1529  if (!other.q.empty()) other.cond.broadcast();
1530  }
1531  }
1532 
1534  if (this != &rhs) {
1535  lock2(mutex, rhs.mutex); // doesn't throw
1536  Thread::Mutex::Lock l1{mutex, Thread::locked};
1537  Thread::Mutex::Lock l2{rhs.mutex, Thread::locked};
1538  Q temp{rhs.q};
1539  q.swap(temp);
1540  if (!q.empty()) cond.broadcast();
1541  }
1542  return *this;
1543  }
1544 
1546  Thread::Mutex::Lock lock{mutex};
1547  q = std::move(rhs.q);
1548  if (!q.empty()) cond.broadcast();
1549  return *this;
1550  }
1551 
1552  AsyncQueueDispatch() = default;
1553 
1554  AsyncQueueDispatch(AsyncQueueDispatch&& rhs): q(std::move(rhs.q)) {}
1555 
1557  q((Thread::Mutex::Lock(rhs.mutex), rhs.q)) {}
1558 
1560  Thread::Mutex::Lock lock{mutex};
1561  }
1562 
1564 };
1565 
1566 #endif // CGU_USE_INHERITABLE_QUEUE
1567 
1568 } // namespace Cgu
1569 
1570 #endif