View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  /*
18   * Written by Doug Lea with assistance from members of JCP JSR-166
19   * Expert Group and released to the public domain, as explained at
20   * http://creativecommons.org/publicdomain/zero/1.0/
21   */
22  
23  package org.jboss.netty.util.internal;
24  
25  import java.util.AbstractQueue;
26  import java.util.Collection;
27  import java.util.Iterator;
28  import java.util.NoSuchElementException;
29  import java.util.Queue;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.locks.LockSupport;
33  
34  /**
35   * This class is a copy of <a href="http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/LinkedTransferQueue.java"> URL revision 1.91 </a>
36   * <br>
37   * The only difference is that it implements {@link BlockingQueue} instead, and every reference to the TransferQueue interface was removed
38   * <br>
39   *
40   * <strong>
41   * Please use {@link QueueFactory} to create a Queue as it will use the "optimal" implementation depending on the JVM
42   * </strong>
43   * <br>
44   * <br>
45   *
46   * An unbounded {@link BlockingQueue} based on linked nodes.
47   * This queue orders elements FIFO (first-in-first-out) with respect
48   * to any given producer.  The <em>head</em> of the queue is that
49   * element that has been on the queue the longest time for some
50   * producer.  The <em>tail</em> of the queue is that element that has
51   * been on the queue the shortest time for some producer.
52   *
53   * <p>Beware that, unlike in most collections, the {@code size} method
54   * is <em>NOT</em> a constant-time operation. Because of the
55   * asynchronous nature of these queues, determining the current number
56   * of elements requires a traversal of the elements, and so may report
57   * inaccurate results if this collection is modified during traversal.
58   * Additionally, the bulk operations {@code addAll},
59   * {@code removeAll}, {@code retainAll}, {@code containsAll},
60   * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
61   * to be performed atomically. For example, an iterator operating
62   * concurrently with an {@code addAll} operation might view only some
63   * of the added elements.
64   *
65   * <p>This class and its iterator implement all of the
66   * <em>optional</em> methods of the {@link Collection} and {@link
67   * Iterator} interfaces.
68   *
69   * <p>Memory consistency effects: As with other concurrent
70   * collections, actions in a thread prior to placing an object into a
71   * {@code LinkedTransferQueue}
72   * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
73   * actions subsequent to the access or removal of that element from
74   * the {@code LinkedTransferQueue} in another thread.
75   *
76   * <p>This class is a member of the
77   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
78   * Java Collections Framework</a>.
79   *
80   * @since 1.7
81   * @param <E> the type of elements held in this collection
82   */
83  @Deprecated
84  public class LinkedTransferQueue<E> extends AbstractQueue<E>
85      implements BlockingQueue<E>, java.io.Serializable {
86      private static final long serialVersionUID = -3223113410248163686L;
87  
88      /*
89       * *** Overview of Dual Queues with Slack ***
90       *
91       * Dual Queues, introduced by Scherer and Scott
92       * (http://www.cs.rice.edu/~wns1/papers/2004-DISC-DDS.pdf) are
93       * (linked) queues in which nodes may represent either data or
94       * requests.  When a thread tries to enqueue a data node, but
95       * encounters a request node, it instead "matches" and removes it;
96       * and vice versa for enqueuing requests. Blocking Dual Queues
97       * arrange that threads enqueuing unmatched requests block until
98       * other threads provide the match. Dual Synchronous Queues (see
99       * Scherer, Lea, & Scott
100      * http://www.cs.rochester.edu/u/scott/papers/2009_Scherer_CACM_SSQ.pdf)
101      * additionally arrange that threads enqueuing unmatched data also
102      * block.  Dual Transfer Queues support all of these modes, as
103      * dictated by callers.
104      *
105      * A FIFO dual queue may be implemented using a variation of the
106      * Michael & Scott (M&S) lock-free queue algorithm
107      * (http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf).
108      * It maintains two pointer fields, "head", pointing to a
109      * (matched) node that in turn points to the first actual
110      * (unmatched) queue node (or null if empty); and "tail" that
111      * points to the last node on the queue (or again null if
112      * empty). For example, here is a possible queue with four data
113      * elements:
114      *
115      *  head                tail
116      *    |                   |
117      *    v                   v
118      *    M -> U -> U -> U -> U
119      *
120      * The M&S queue algorithm is known to be prone to scalability and
121      * overhead limitations when maintaining (via CAS) these head and
122      * tail pointers. This has led to the development of
123      * contention-reducing variants such as elimination arrays (see
124      * Moir et al http://portal.acm.org/citation.cfm?id=1074013) and
125      * optimistic back pointers (see Ladan-Mozes & Shavit
126      * http://people.csail.mit.edu/edya/publications/OptimisticFIFOQueue-journal.pdf).
127      * However, the nature of dual queues enables a simpler tactic for
128      * improving M&S-style implementations when dual-ness is needed.
129      *
130      * In a dual queue, each node must atomically maintain its match
131      * status. While there are other possible variants, we implement
132      * this here as: for a data-mode node, matching entails CASing an
133      * "item" field from a non-null data value to null upon match, and
134      * vice-versa for request nodes, CASing from null to a data
135      * value. (Note that the linearization properties of this style of
136      * queue are easy to verify -- elements are made available by
137      * linking, and unavailable by matching.) Compared to plain M&S
138      * queues, this property of dual queues requires one additional
139      * successful atomic operation per enq/deq pair. But it also
140      * enables lower cost variants of queue maintenance mechanics. (A
141      * variation of this idea applies even for non-dual queues that
142      * support deletion of interior elements, such as
143      * j.u.c.ConcurrentLinkedQueue.)
144      *
145      * Once a node is matched, its match status can never again
146      * change.  We may thus arrange that the linked list of them
147      * contain a prefix of zero or more matched nodes, followed by a
148      * suffix of zero or more unmatched nodes. (Note that we allow
149      * both the prefix and suffix to be zero length, which in turn
150      * means that we do not use a dummy header.)  If we were not
151      * concerned with either time or space efficiency, we could
152      * correctly perform enqueue and dequeue operations by traversing
153      * from a pointer to the initial node; CASing the item of the
154      * first unmatched node on match and CASing the next field of the
155      * trailing node on appends. (Plus some special-casing when
156      * initially empty).  While this would be a terrible idea in
157      * itself, it does have the benefit of not requiring ANY atomic
158      * updates on head/tail fields.
159      *
160      * We introduce here an approach that lies between the extremes of
161      * never versus always updating queue (head and tail) pointers.
162      * This offers a tradeoff between sometimes requiring extra
163      * traversal steps to locate the first and/or last unmatched
164      * nodes, versus the reduced overhead and contention of fewer
165      * updates to queue pointers. For example, a possible snapshot of
166      * a queue is:
167      *
168      *  head           tail
169      *    |              |
170      *    v              v
171      *    M -> M -> U -> U -> U -> U
172      *
173      * The best value for this "slack" (the targeted maximum distance
174      * between the value of "head" and the first unmatched node, and
175      * similarly for "tail") is an empirical matter. We have found
176      * that using very small constants in the range of 1-3 work best
177      * over a range of platforms. Larger values introduce increasing
178      * costs of cache misses and risks of long traversal chains, while
179      * smaller values increase CAS contention and overhead.
180      *
181      * Dual queues with slack differ from plain M&S dual queues by
182      * virtue of only sometimes updating head or tail pointers when
183      * matching, appending, or even traversing nodes; in order to
184      * maintain a targeted slack.  The idea of "sometimes" may be
185      * operationalized in several ways. The simplest is to use a
186      * per-operation counter incremented on each traversal step, and
187      * to try (via CAS) to update the associated queue pointer
188      * whenever the count exceeds a threshold. Another, that requires
189      * more overhead, is to use random number generators to update
190      * with a given probability per traversal step.
191      *
192      * In any strategy along these lines, because CASes updating
193      * fields may fail, the actual slack may exceed targeted
194      * slack. However, they may be retried at any time to maintain
195      * targets.  Even when using very small slack values, this
196      * approach works well for dual queues because it allows all
197      * operations up to the point of matching or appending an item
198      * (hence potentially allowing progress by another thread) to be
199      * read-only, thus not introducing any further contention. As
200      * described below, we implement this by performing slack
201      * maintenance retries only after these points.
202      *
203      * As an accompaniment to such techniques, traversal overhead can
204      * be further reduced without increasing contention of head
205      * pointer updates: Threads may sometimes shortcut the "next" link
206      * path from the current "head" node to be closer to the currently
207      * known first unmatched node, and similarly for tail. Again, this
208      * may be triggered using thresholds or randomization.
209      *
210      * These ideas must be further extended to avoid unbounded amounts
211      * of costly-to-reclaim garbage caused by the sequential "next"
212      * links of nodes starting at old forgotten head nodes: As first
213      * described in detail by Boehm
214      * (http://portal.acm.org/citation.cfm?doid=503272.503282) if a GC
215      * delays noticing that any arbitrarily old node has become
216      * garbage, all newer dead nodes will also be unreclaimed.
217      * (Similar issues arise in non-GC environments.)  To cope with
218      * this in our implementation, upon CASing to advance the head
219      * pointer, we set the "next" link of the previous head to point
220      * only to itself; thus limiting the length of connected dead lists.
221      * (We also take similar care to wipe out possibly garbage
222      * retaining values held in other Node fields.)  However, doing so
223      * adds some further complexity to traversal: If any "next"
224      * pointer links to itself, it indicates that the current thread
225      * has lagged behind a head-update, and so the traversal must
226      * continue from the "head".  Traversals trying to find the
227      * current tail starting from "tail" may also encounter
228      * self-links, in which case they also continue at "head".
229      *
230      * It is tempting in a slack-based scheme to not even use CAS for
231      * updates (similarly to Ladan-Mozes & Shavit). However, this
232      * cannot be done for head updates under the above link-forgetting
233      * mechanics because an update may leave head at a detached node.
234      * And while direct writes are possible for tail updates, they
235      * increase the risk of long retraversals, and hence long garbage
236      * chains, which can be much more costly than is worthwhile
237      * considering that the cost difference of performing a CAS vs
238      * write is smaller when they are not triggered on each operation
239      * (especially considering that writes and CASes equally require
240      * additional GC bookkeeping ("write barriers") that are sometimes
241      * more costly than the writes themselves because of contention).
242      *
243      * *** Overview of implementation ***
244      *
245      * We use a threshold-based approach to updates, with a slack
246      * threshold of two -- that is, we update head/tail when the
247      * current pointer appears to be two or more steps away from the
248      * first/last node. The slack value is hard-wired: a path greater
249      * than one is naturally implemented by checking equality of
250      * traversal pointers except when the list has only one element,
251      * in which case we keep slack threshold at one. Avoiding tracking
252      * explicit counts across method calls slightly simplifies an
253      * already-messy implementation. Using randomization would
254      * probably work better if there were a low-quality dirt-cheap
255      * per-thread one available, but even ThreadLocalRandom is too
256      * heavy for these purposes.
257      *
258      * With such a small slack threshold value, it is not worthwhile
259      * to augment this with path short-circuiting (i.e., unsplicing
260      * interior nodes) except in the case of cancellation/removal (see
261      * below).
262      *
263      * We allow both the head and tail fields to be null before any
264      * nodes are enqueued; initializing upon first append.  This
265      * simplifies some other logic, as well as providing more
266      * efficient explicit control paths instead of letting JVMs insert
267      * implicit NullPointerExceptions when they are null.  While not
268      * currently fully implemented, we also leave open the possibility
269      * of re-nulling these fields when empty (which is complicated to
270      * arrange, for little benefit.)
271      *
272      * All enqueue/dequeue operations are handled by the single method
273      * "xfer" with parameters indicating whether to act as some form
274      * of offer, put, poll, take, or transfer (each possibly with
275      * timeout). The relative complexity of using one monolithic
276      * method outweighs the code bulk and maintenance problems of
277      * using separate methods for each case.
278      *
279      * Operation consists of up to three phases. The first is
280      * implemented within method xfer, the second in tryAppend, and
281      * the third in method awaitMatch.
282      *
283      * 1. Try to match an existing node
284      *
285      *    Starting at head, skip already-matched nodes until finding
286      *    an unmatched node of opposite mode, if one exists, in which
287      *    case matching it and returning, also if necessary updating
288      *    head to one past the matched node (or the node itself if the
289      *    list has no other unmatched nodes). If the CAS misses, then
290      *    a loop retries advancing head by two steps until either
291      *    success or the slack is at most two. By requiring that each
292      *    attempt advances head by two (if applicable), we ensure that
293      *    the slack does not grow without bound. Traversals also check
294      *    if the initial head is now off-list, in which case they
295      *    start at the new head.
296      *
297      *    If no candidates are found and the call was untimed
298      *    poll/offer, (argument "how" is NOW) return.
299      *
300      * 2. Try to append a new node (method tryAppend)
301      *
302      *    Starting at current tail pointer, find the actual last node
303      *    and try to append a new node (or if head was null, establish
304      *    the first node). Nodes can be appended only if their
305      *    predecessors are either already matched or are of the same
306      *    mode. If we detect otherwise, then a new node with opposite
307      *    mode must have been appended during traversal, so we must
308      *    restart at phase 1. The traversal and update steps are
309      *    otherwise similar to phase 1: Retrying upon CAS misses and
310      *    checking for staleness.  In particular, if a self-link is
311      *    encountered, then we can safely jump to a node on the list
312      *    by continuing the traversal at current head.
313      *
314      *    On successful append, if the call was ASYNC, return.
315      *
316      * 3. Await match or cancellation (method awaitMatch)
317      *
318      *    Wait for another thread to match node; instead cancelling if
319      *    the current thread was interrupted or the wait timed out. On
320      *    multiprocessors, we use front-of-queue spinning: If a node
321      *    appears to be the first unmatched node in the queue, it
322      *    spins a bit before blocking. In either case, before blocking
323      *    it tries to unsplice any nodes between the current "head"
324      *    and the first unmatched node.
325      *
326      *    Front-of-queue spinning vastly improves performance of
327      *    heavily contended queues. And so long as it is relatively
328      *    brief and "quiet", spinning does not much impact performance
329      *    of less-contended queues.  During spins threads check their
330      *    interrupt status and generate a thread-local random number
331      *    to decide to occasionally perform a Thread.yield. While
332      *    yield has underdefined specs, we assume that it might help,
333      *    and will not hurt, in limiting impact of spinning on busy
334      *    systems.  We also use smaller (1/2) spins for nodes that are
335      *    not known to be front but whose predecessors have not
336      *    blocked -- these "chained" spins avoid artifacts of
337      *    front-of-queue rules which otherwise lead to alternating
338      *    nodes spinning vs blocking. Further, front threads that
339      *    represent phase changes (from data to request node or vice
340      *    versa) compared to their predecessors receive additional
341      *    chained spins, reflecting longer paths typically required to
342      *    unblock threads during phase changes.
343      * ** Unlinking removed interior nodes **
344      *
345      * In addition to minimizing garbage retention via self-linking
346      * described above, we also unlink removed interior nodes. These
347      * may arise due to timed out or interrupted waits, or calls to
348      * remove(x) or Iterator.remove.  Normally, given a node that was
349      * at one time known to be the predecessor of some node s that is
350      * to be removed, we can unsplice s by CASing the next field of
351      * its predecessor if it still points to s (otherwise s must
352      * already have been removed or is now offlist). But there are two
353      * situations in which we cannot guarantee to make node s
354      * unreachable in this way: (1) If s is the trailing node of list
355      * (i.e., with null next), then it is pinned as the target node
356      * for appends, so can only be removed later after other nodes are
357      * appended. (2) We cannot necessarily unlink s given a
358      * predecessor node that is matched (including the case of being
359      * cancelled): the predecessor may already be unspliced, in which
360      * case some previous reachable node may still point to s.
361      * (For further explanation see Herlihy & Shavit "The Art of
362      * Multiprocessor Programming" chapter 9).  Although, in both
363      * cases, we can rule out the need for further action if either s
364      * or its predecessor are (or can be made to be) at, or fall off
365      * from, the head of list.
366      *
367      * Without taking these into account, it would be possible for an
368      * unbounded number of supposedly removed nodes to remain
369      * reachable.  Situations leading to such buildup are uncommon but
370      * can occur in practice; for example when a series of short timed
371      * calls to poll repeatedly time out but never otherwise fall off
372      * the list because of an untimed call to take at the front of the
373      * queue.
374      *
375      * When these cases arise, rather than always retraversing the
376      * entire list to find an actual predecessor to unlink (which
377      * won't help for case (1) anyway), we record a conservative
378      * estimate of possible unsplice failures (in "sweepVotes").
379      * We trigger a full sweep when the estimate exceeds a threshold
380      * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
381      * removal failures to tolerate before sweeping through, unlinking
382      * cancelled nodes that were not unlinked upon initial removal.
383      * We perform sweeps by the thread hitting threshold (rather than
384      * background threads or by spreading work to other threads)
385      * because in the main contexts in which removal occurs, the
386      * caller is already timed-out, cancelled, or performing a
387      * potentially O(n) operation (e.g. remove(x)), none of which are
388      * time-critical enough to warrant the overhead that alternatives
389      * would impose on other threads.
390      *
391      * Because the sweepVotes estimate is conservative, and because
392      * nodes become unlinked "naturally" as they fall off the head of
393      * the queue, and because we allow votes to accumulate even while
394      * sweeps are in progress, there are typically significantly fewer
395      * such nodes than estimated.  Choice of a threshold value
396      * balances the likelihood of wasted effort and contention, versus
397      * providing a worst-case bound on retention of interior nodes in
398      * quiescent queues. The value defined below was chosen
399      * empirically to balance these under various timeout scenarios.
400      *
401      * Note that we cannot self-link unlinked interior nodes during
402      * sweeps. However, the associated garbage chains terminate when
403      * some successor ultimately falls off the head of the list and is
404      * self-linked.
405      */
406 
407     /**
         * True if on multiprocessor, i.e. the runtime reported more than one
         * available processor when this class was initialized. The value is
         * computed once here and is not refreshed afterwards.
         */
408     private static final boolean MP =
409         Runtime.getRuntime().availableProcessors() > 1;
410 
411     /**
412      * The number of times to spin (with randomly interspersed calls
413      * to Thread.yield) on multiprocessor before blocking when a node
414      * is apparently the first waiter in the queue.  See above for
415      * explanation. Must be a power of two. The value is empirically
416      * derived -- it works pretty well across a variety of processors,
417      * numbers of CPUs, and OSes.
         * The expression below evaluates to 128.
418      */
419     private static final int FRONT_SPINS   = 1 << 7;
420 
421     /**
422      * The number of times to spin before blocking when a node is
423      * preceded by another node that is apparently spinning.  Also
424      * serves as an increment to FRONT_SPINS on phase changes, and as
425      * base average frequency for yielding during spins. Must be a
426      * power of two.
         * Defined as half of FRONT_SPINS, so it evaluates to 64 and stays
         * a power of two as long as FRONT_SPINS is one.
427      */
428     private static final int CHAINED_SPINS = FRONT_SPINS >>> 1;
429 
430     /**
431      * The maximum number of estimated removal failures (sweepVotes)
432      * to tolerate before sweeping through the queue unlinking
433      * cancelled nodes that were not unlinked upon initial
434      * removal. See above for explanation. The value must be at least
435      * two to avoid useless sweeps when removing trailing nodes.
         * Declared without an access modifier, so it is visible to the
         * whole package rather than private like the spin constants.
436      */
437     static final int SWEEP_THRESHOLD = 32;
438 
439     /**
440      * Queue nodes. Uses Object, not E, for items to allow forgetting
441      * them after use.  Relies heavily on Unsafe mechanics to minimize
442      * unnecessary ordering constraints: Writes that are intrinsically
443      * ordered wrt other accesses or CASes use simple relaxed forms.
444      */
445     static final class Node {
            // A single linked-list cell. The mode (data vs. request) is fixed at
            // construction; the match state is encoded entirely in "item".
446         final boolean isData;   // false if this is a request node
447         volatile Object item;   // initially non-null if isData; CASed to match
448         volatile Node next;     // successor; set to this node itself by forgetNext()
449         volatile Thread waiter; // null until waiting
450 
451         // CAS methods for fields
            /**
             * Atomically sets {@code next} to {@code val} if it currently
             * holds {@code cmp}.
             *
             * @return true if the swap happened
             */
452         final boolean casNext(Node cmp, Node val) {
453             return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
454         }
455 
            /**
             * Atomically sets {@code item} to {@code val} if it currently
             * holds {@code cmp}. This is the operation that matches a node.
             *
             * @return true if the swap happened
             */
456         final boolean casItem(Object cmp, Object val) {
457             // assert cmp == null || cmp.getClass() != Node.class;
458             return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
459         }
460 
461         /**
462          * Constructs a new node.  Uses relaxed write because item can
463          * only be seen after publication via casNext.
             *
             * @param item the initial item (null for a request node, per the
             *        field comment on {@code item})
             * @param isData true for a data node, false for a request node
464          */
465         Node(Object item, boolean isData) {
466             UNSAFE.putObject(this, itemOffset, item); // relaxed write
467             this.isData = isData;
468         }
469 
470         /**
471          * Links node to itself to avoid garbage retention.  Called
472          * only after CASing head field, so uses relaxed write.
             * A node whose {@code next} equals itself is what traversals
             * treat as "off-list".
473          */
474         final void forgetNext() {
475             UNSAFE.putObject(this, nextOffset, this);
476         }
477 
478         /**
479          * Sets item to self and waiter to null, to avoid garbage
480          * retention after matching or cancelling. Uses relaxed writes
481          * because order is already constrained in the only calling
482          * contexts: item is forgotten only after volatile/atomic
483          * mechanics that extract items.  Similarly, clearing waiter
484          * follows either CAS or return from park (if ever parked;
485          * else we don't care).
486          */
487         final void forgetContents() {
488             UNSAFE.putObject(this, itemOffset, this);
489             UNSAFE.putObject(this, waiterOffset, null);
490         }
491 
492         /**
493          * Returns true if this node has been matched, including the
494          * case of artificial matches due to cancellation.
495          */
496         final boolean isMatched() {
497             Object x = item;
                // Parsed as (x == this) || ((x == null) == isData): matched when
                // item is the self-marker, or a data node's item has become
                // null, or a request node's item has become non-null.
498             return x == this || x == null == isData;
499         }
500 
501         /**
502          * Returns true if this is an unmatched request node.
503          */
504         final boolean isUnmatchedRequest() {
505             return !isData && item == null;
506         }
507 
508         /**
509          * Returns true if a node with the given mode cannot be
510          * appended to this node because this node is unmatched and
511          * has opposite data mode.
             *
             * @param haveData mode of the node that would be appended
512          */
513         final boolean cannotPrecede(boolean haveData) {
514             boolean d = isData;
515             Object x;
                // Parsed as d != haveData && x != this && ((x != null) == d):
                // opposite mode, and item still in its unmatched state.
516             return d != haveData && (x = item) != this && x != null == d;
517         }
518 
519         /**
520          * Tries to artificially match a data node -- used by remove.
             *
             * @return true if this call changed item from a live value to
             *         null; false if the node was already matched or the
             *         CAS lost a race
521          */
522         final boolean tryMatchData() {
523             // assert isData;
524             Object x = item;
525             if (x != null && x != this && casItem(x, null)) {
                    // waiter may still be null here; LockSupport.unpark is
                    // documented to have no effect for a null thread.
526                 LockSupport.unpark(waiter);
527                 return true;
528             }
529             return false;
530         }
531 
532         // Unsafe mechanics
533         private static final sun.misc.Unsafe UNSAFE;
534         private static final long itemOffset;
535         private static final long nextOffset;
536         private static final long waiterOffset;
537         static {
538             try {
                    // getUnsafe() is not declared in Node; it is expected to be
                    // supplied by the enclosing class (not visible in this
                    // part of the file).
539                 UNSAFE = getUnsafe();
540                 Class<?> k = Node.class;
                    // Field names are looked up reflectively, so renaming
                    // item/next/waiter requires updating these strings too.
541                 itemOffset = UNSAFE.objectFieldOffset
542                     (k.getDeclaredField("item"));
543                 nextOffset = UNSAFE.objectFieldOffset
544                     (k.getDeclaredField("next"));
545                 waiterOffset = UNSAFE.objectFieldOffset
546                     (k.getDeclaredField("waiter"));
547             } catch (Exception e) {
                    // Any failure during lookup aborts class initialization.
548                 throw new Error(e);
549             }
550         }
551     }
552 
553     /**
         * head of the queue; null until first enqueue.
         * Declared without an access modifier (package-visible), unlike
         * tail. Marked transient, so it is not part of default
         * serialization of this class.
         */
554     transient volatile Node head;
555 
556     /** tail of the queue; null until first append */
557     private transient volatile Node tail;
558 
559     /**
         * The number of apparent failures to unsplice removed nodes.
         * Compared against SWEEP_THRESHOLD according to that constant's
         * documentation; the code doing so is outside this part of the file.
         */
560     private transient volatile int sweepVotes;
561 
562     // CAS methods for fields
563     private boolean casTail(Node cmp, Node val) {
            // Compare-and-swap on this queue at tailOffset; returns true if the
            // swap happened. tailOffset is initialized elsewhere in the file --
            // presumably the offset of the "tail" field; confirm there.
564         return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
565     }
566 
567     private boolean casHead(Node cmp, Node val) {
            // Compare-and-swap on this queue at headOffset; returns true if the
            // swap happened. headOffset is initialized elsewhere in the file --
            // presumably the offset of the "head" field; confirm there.
568         return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
569     }
570 
571     private boolean casSweepVotes(int cmp, int val) {
            // Int-valued compare-and-swap on this queue at sweepVotesOffset;
            // returns true if the swap happened. The offset is initialized
            // elsewhere in the file -- presumably for "sweepVotes"; confirm there.
572         return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val);
573     }
574 
575     /*
576      * Possible values for "how" argument in xfer method.
         *
         * As used by xfer: NOW returns without appending a node when no
         * match is found; ASYNC appends a node and returns without waiting;
         * SYNC and TIMED append and then call awaitMatch, with TIMED being
         * the only value that makes the wait a timed one.
577      */
578     private static final int NOW   = 0; // for untimed poll, tryTransfer
579     private static final int ASYNC = 1; // for offer, put, add
580     private static final int SYNC  = 2; // for transfer, take
581     private static final int TIMED = 3; // for timed poll, tryTransfer
582 
        // Unchecked narrowing of a node item to the element type E. Because of
        // erasure no runtime check happens at this cast; the warning is
        // suppressed for this method only.
583     @SuppressWarnings("unchecked")
584     static <E> E cast(Object item) {
585         // assert item == null || item.getClass() != Node.class;
586         return (E) item;
587     }
588 
589     /**
590      * Implements all queuing methods. See above for explanation.
591      *
592      * @param e the item or null for take
593      * @param haveData true if this is a put, else a take
594      * @param how NOW, ASYNC, SYNC, or TIMED
595      * @param nanos timeout in nanosecs, used only if mode is TIMED
596      * @return an item if matched, else e
597      * @throws NullPointerException if haveData mode but e is null
598      */
599     private E xfer(E e, boolean haveData, int how, long nanos) {
            // Reject null elements up front for data-mode calls; request-mode
            // calls may pass e == null.
600         if (haveData && e == null) {
601             throw new NullPointerException();
602         }
603         Node s = null;                        // the node to append, if needed
604 
605         retry:
606         for (;;) {                            // restart on append race
607 
                // Phase 1: walk from head looking for the first unmatched node.
608             for (Node h = head, p = h; p != null;) { // find & match first node
609                 boolean isData = p.isData;
610                 Object item = p.item;
                    // Negation of Node.isMatched() on the values just read:
                    // item is not the self-marker and its null-ness still
                    // agrees with the node's mode.
611                 if (item != p && item != null == isData) { // unmatched
                        // First unmatched node has our own mode: nothing to
                        // match against, so leave the scan and go append.
612                     if (isData == haveData) {
613                         break;
614                     }
                        // Opposite mode: swapping item for e is the match. If
                        // the CAS loses a race, fall through and keep scanning.
615                     if (p.casItem(item, e)) { // match
                            // Head maintenance; skipped entirely when p == h.
616                         for (Node q = p; q != h;) {
617                             Node n = q.next;  // update by 2 unless singleton
618                             if (head == h && casHead(h, n == null ? q : n)) {
619                                 h.forgetNext();
620                                 break;
621                             }                 // advance and retry
622                             if ((h = head)   == null ||
623                                 (q = h.next) == null || !q.isMatched())
624                              {
625                                 break;        // unless slack < 2
626                             }
627                         }
                            // Wake the thread recorded on the matched node, if
                            // any; unpark of a null thread has no effect.
628                         LockSupport.unpark(p.waiter);
                            // Return the value that was in the node before the
                            // CAS (null if p was a request node).
629                         return LinkedTransferQueue.<E>cast(item);
630                     }
631                 }
632                 Node n = p.next;
                    // Parsed as (p != n) ? n : (h = head): a self-linked node
                    // means p was dropped from the list, so re-read head.
633                 p = p != n ? n : (h = head); // Use head if p offlist
634             }
635 
                // Phase 2: nothing to match. NOW callers give up here.
636             if (how != NOW) {                 // No matches available
                    // Allocate at most once, even if the append is retried.
637                 if (s == null) {
638                     s = new Node(e, haveData);
639                 }
640                 Node pred = tryAppend(s, haveData);
641                 if (pred == null)
642                  {
643                     continue retry;           // lost race vs opposite mode
644                 }
                    // Phase 3: SYNC and TIMED wait for a match; ASYNC does not.
645                 if (how != ASYNC) {
646                     return awaitMatch(s, pred, e, how == TIMED, nanos);
647                 }
648             }
649             return e; // not waiting
650         }
651     }
652 
    /**
     * Tries to append node s as tail.
     *
     * <p>Starts from the current {@code tail} (or {@code head} when the
     * tail is null), walks to the last node and links {@code s} after it
     * with a CAS on {@code next}. The {@code tail} field is only updated
     * when the node appended to was not the tail value read on entry.
     *
     * @param s the node to append
     * @param haveData true if appending in data mode
     * @return null on failure due to losing race with append in
     * different mode, else s's predecessor, or s itself if no
     * predecessor
     */
    private Node tryAppend(Node s, boolean haveData) {
        for (Node t = tail, p = t;;) {        // move p to last node and append
            Node n, u;                        // temps for reads of next & tail
            if (p == null && (p = head) == null) {
                // both tail and head are null: try to install s as the head
                if (casHead(null, s))
                 {
                    return s;                 // initialize
                }
            }
            else if (p.cannotPrecede(haveData)) {
                return null;                  // lost race vs opposite mode
            } else if ((n = p.next) != null) {
                // p is not the last node: jump to a fresher tail if one has been
                // published, otherwise step to next (null restarts when p is self-linked)
                p = p != t && t != (u = tail) ? (t = u) : // stale tail
                    p != n ? n : null;      // restart if off list
            } else if (!p.casNext(null, s)) {
                p = p.next;                   // re-read on CAS failure
            } else {
                // s is now linked after p
                if (p != t) {                 // update if slack now >= 2
                    while ((tail != t || !casTail(t, s)) &&
                           (t = tail)   != null &&
                           (s = t.next) != null && // advance and retry
                           (s = s.next) != null && s != t) {
                        continue;
                    }
                }
                return p;
            }
        }
    }
691 
    /**
     * Spins/yields/blocks until node s is matched or caller gives up.
     *
     * <p>Each pass first checks whether {@code s.item} has changed from
     * {@code e} (matched), then whether the wait should be cancelled
     * (thread interrupted, or timed wait with no time left). Only after
     * those checks does it spin, register the current thread as the
     * node's waiter, or park. The thread's interrupt status is only
     * tested here, not cleared.
     *
     * @param s the waiting node
     * @param pred the predecessor of s, or s itself if it has no
     * predecessor, or null if unknown (the null case does not occur
     * in any current calls but may in possible future extensions)
     * @param e the comparison value for checking match
     * @param timed if true, wait only until timeout elapses
     * @param nanos timeout in nanosecs, used only if timed is true
     * @return matched item, or e if unmatched on interrupt or timeout
     */
    private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0L;
        Thread w = Thread.currentThread();
        int spins = -1; // initialized after first item and cancel checks
        ThreadLocalRandom randomYields = null; // bound if needed

        for (;;) {
            Object item = s.item;
            if (item != e) {                  // matched
                // assert item != s;
                s.forgetContents();           // avoid garbage
                return LinkedTransferQueue.<E>cast(item);
            }
            // Cancel by CASing the item to the node itself; if the CAS fails
            // the item has changed and the next pass re-reads it.
            if ((w.isInterrupted() || timed && nanos <= 0) &&
                    s.casItem(e, s)) {        // cancel
                unsplice(pred, s);
                return e;
            }

            if (spins < 0) {                  // establish spins at/near front
                if ((spins = spinsFor(pred, s.isData)) > 0) {
                    randomYields = ThreadLocalRandom.current();
                }
            }
            else if (spins > 0) {             // spin
                --spins;
                if (randomYields.nextInt(CHAINED_SPINS) == 0)
                 {
                    Thread.yield();           // occasionally yield
                }
            }
            else if (s.waiter == null) {
                s.waiter = w;                 // request unpark then recheck
            }
            else if (timed) {
                // subtract the time elapsed since the previous measurement
                long now = System.nanoTime();
                if ((nanos -= now - lastTime) > 0) {
                    // Uses LockSupport.parkNanos(nanos) instead of the
                    // blocker-taking LockSupport.parkNanos(this, nanos)
                    // to stay compatible with Java 5.
                    LockSupport.parkNanos(nanos);
                }

                lastTime = now;
            }
            else {
                // Uses LockSupport.park() instead of the blocker-taking
                // LockSupport.park(this) to stay compatible with Java 5.
                LockSupport.park();

            }
        }
    }
758 
759     /**
760      * Returns spin/yield value for a node with given predecessor and
761      * data mode. See above for explanation.
762      */
763     private static int spinsFor(Node pred, boolean haveData) {
764         if (MP && pred != null) {
765             if (pred.isData != haveData) {
766                 return FRONT_SPINS + CHAINED_SPINS;
767             }
768             if (pred.isMatched()) {
769                 return FRONT_SPINS;
770             }
771             if (pred.waiter == null) {
772                 return CHAINED_SPINS;
773             }
774         }
775         return 0;
776     }
777 
778     /* -------------- Traversal methods -------------- */
779 
780     /**
781      * Returns the successor of p, or the head node if p.next has been
782      * linked to self, which will only be true if traversing with a
783      * stale pointer that is now off the list.
784      */
785     final Node succ(Node p) {
786         Node next = p.next;
787         return p == next ? head : next;
788     }
789 
790     /**
791      * Returns the first unmatched node of the given mode, or null if
792      * none.  Used by methods isEmpty, hasWaitingConsumer.
793      */
794     private Node firstOfMode(boolean isData) {
795         for (Node p = head; p != null; p = succ(p)) {
796             if (!p.isMatched()) {
797                 return p.isData == isData ? p : null;
798             }
799         }
800         return null;
801     }
802 
803     /**
804      * Returns the item in the first unmatched node with isData; or
805      * null if none.  Used by peek.
806      */
807     private E firstDataItem() {
808         for (Node p = head; p != null; p = succ(p)) {
809             Object item = p.item;
810             if (p.isData) {
811                 if (item != null && item != p) {
812                     return LinkedTransferQueue.<E>cast(item);
813                 }
814             }
815             else if (item == null) {
816                 return null;
817             }
818         }
819         return null;
820     }
821 
822     /**
823      * Traverses and counts unmatched nodes of the given mode.
824      * Used by methods size and getWaitingConsumerCount.
825      */
826     private int countOfMode(boolean data) {
827         int count = 0;
828         for (Node p = head; p != null; ) {
829             if (!p.isMatched()) {
830                 if (p.isData != data) {
831                     return 0;
832                 }
833                 if (++count == Integer.MAX_VALUE) {
834                     break;
835                 }
836             }
837             Node n = p.next;
838             if (n != p) {
839                 p = n;
840             } else {
841                 count = 0;
842                 p = head;
843             }
844         }
845         return count;
846     }
847 
    /**
     * Iterator over the data items of the enclosing queue. The item to
     * hand out next is fetched ahead of time by {@code advance}, so
     * {@code hasNext} is a plain null check on {@code nextNode}.
     */
    final class Itr implements Iterator<E> {
        private Node nextNode;   // next node to return item for
        private E nextItem;      // the corresponding item
        private Node lastRet;    // last returned node, to support remove
        private Node lastPred;   // predecessor to unlink lastRet

        /**
         * Moves to next node after prev, or first node if prev null.
         * Also records prev as {@code lastRet} and refreshes
         * {@code lastPred}. Sets {@code nextNode} and {@code nextItem}
         * to null when no further data item is found.
         */
        private void advance(Node prev) {
            /*
             * To track and avoid buildup of deleted nodes in the face
             * of calls to both Queue.remove and Itr.remove, we must
             * include variants of unsplice and sweep upon each
             * advance: Upon Itr.remove, we may need to catch up links
             * from lastPred, and upon other removes, we might need to
             * skip ahead from stale nodes and unsplice deleted ones
             * found while advancing.
             */

            Node r, b; // reset lastPred upon possible deletion of lastRet
            if ((r = lastRet) != null && !r.isMatched()) {
                lastPred = r;    // next lastPred is old lastRet
            } else if ((b = lastPred) == null || b.isMatched()) {
                lastPred = null; // at start of list
            } else {
                Node s, n;       // help with removal of lastPred.next
                // unlink matched nodes that directly follow lastPred
                while ((s = b.next) != null &&
                       s != b && s.isMatched() &&
                       (n = s.next) != null && n != s) {
                    b.casNext(s, n);
                }
            }

            this.lastRet = prev;

            for (Node p = prev, s, n;;) {
                // s is the candidate: head when p is null, else p's successor
                s = p == null ? head : p.next;
                if (s == null) {
                    break;
                } else if (s == p) {
                    // p is self-linked (off list): restart from head
                    p = null;
                    continue;
                }
                Object item = s.item;
                if (s.isData) {
                    if (item != null && item != s) {
                        // unmatched data node: this is the next element
                        nextItem = LinkedTransferQueue.<E>cast(item);
                        nextNode = s;
                        return;
                    }
                }
                else if (item == null) {
                    // unmatched request node: stop, nothing more to return
                    break;
                }
                // assert s.isMatched();
                if (p == null) {
                    p = s;
                } else if ((n = s.next) == null) {
                    break;
                } else if (s == n) {
                    p = null;
                } else {
                    // unlink the matched node s from behind p
                    p.casNext(s, n);
                }
            }
            nextNode = null;
            nextItem = null;
        }

        /** Positions the iterator on the first data item, if any. */
        Itr() {
            advance(null);
        }

        /** Returns true if a data item was found by the last advance. */
        public final boolean hasNext() {
            return nextNode != null;
        }

        /**
         * Returns the prefetched item and advances to the following one.
         *
         * @throws NoSuchElementException if no prefetched item remains
         */
        public final E next() {
            Node p = nextNode;
            if (p == null) {
                throw new NoSuchElementException();
            }
            E e = nextItem;
            advance(p);
            return e;
        }

        /**
         * Removes the item most recently returned by {@code next}. If
         * {@code tryMatchData} on that node fails, nothing is unspliced
         * and the call returns normally.
         *
         * @throws IllegalStateException if there is no last-returned
         * node (next not yet called, or remove already called for it)
         */
        public final void remove() {
            final Node lastRet = this.lastRet;
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            this.lastRet = null;
            if (lastRet.tryMatchData()) {
                unsplice(lastPred, lastRet);
            }
        }
    }
947 
948     /* -------------- Removal methods -------------- */
949 
    /**
     * Unsplices (now or later) the given deleted/cancelled node with
     * the given predecessor.
     *
     * <p>Nothing beyond clearing s's contents is done when pred is null,
     * is s itself, or no longer points to s.
     *
     * @param pred a node that was at one time known to be the
     * predecessor of s, or null or s itself if s is/was at head
     * @param s the node to be unspliced
     */
    final void unsplice(Node pred, Node s) {
        s.forgetContents(); // forget unneeded fields
        /*
         * See above for rationale. Briefly: if pred still points to
         * s, try to unlink s.  If s cannot be unlinked, because it is
         * trailing node or pred might be unlinked, and neither pred
         * nor s are head or offlist, add to sweepVotes, and if enough
         * votes have accumulated, sweep.
         */
        if (pred != null && pred != s && pred.next == s) {
            Node n = s.next;
            // Continue when s is the trailing node (n == null), or when s was
            // unlinked by the CAS but pred itself is already matched.
            if (n == null ||
                n != s && pred.casNext(s, n) && pred.isMatched()) {
                for (;;) {               // check if at, or could be, head
                    Node h = head;
                    if (h == pred || h == s || h == null)
                     {
                        return;          // at head or list empty
                    }
                    if (!h.isMatched()) {
                        break;
                    }
                    Node hn = h.next;
                    if (hn == null)
                     {
                        return;          // now empty
                    }
                    if (hn != h && casHead(h, hn))
                     {
                        h.forgetNext();  // advance head
                    }
                }
                if (pred.next != pred && s.next != s) { // recheck if offlist
                    for (;;) {           // sweep now if enough votes
                        int v = sweepVotes;
                        if (v < SWEEP_THRESHOLD) {
                            // below threshold: record one more vote
                            if (casSweepVotes(v, v + 1)) {
                                break;
                            }
                        }
                        // threshold reached: the thread that resets the counter sweeps
                        else if (casSweepVotes(v, 0)) {
                            sweep();
                            break;
                        }
                    }
                }
            }
        }
    }
1007 
1008     /**
1009      * Unlinks matched (typically cancelled) nodes encountered in a
1010      * traversal from head.
1011      */
1012     private void sweep() {
1013         for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
1014             if (!s.isMatched()) {
1015                 // Unmatched nodes are never self-linked
1016                 p = s;
1017             } else if ((n = s.next) == null) {
1018                 break;
1019             } else if (s == n) {
1020                 // No need to also check for p == s, since that implies s == n
1021                 p = head;
1022             } else {
1023                 p.casNext(s, n);
1024             }
1025         }
1026     }
1027 
1028     /**
1029      * Main implementation of remove(Object)
1030      */
1031     private boolean findAndRemove(Object e) {
1032         if (e != null) {
1033             for (Node pred = null, p = head; p != null; ) {
1034                 Object item = p.item;
1035                 if (p.isData) {
1036                     if (item != null && item != p && e.equals(item) &&
1037                         p.tryMatchData()) {
1038                         unsplice(pred, p);
1039                         return true;
1040                     }
1041                 }
1042                 else if (item == null) {
1043                     break;
1044                 }
1045                 pred = p;
1046                 if ((p = p.next) == pred) { // stale
1047                     pred = null;
1048                     p = head;
1049                 }
1050             }
1051         }
1052         return false;
1053     }
1054 
1055 
    /**
     * Creates an initially empty {@code LinkedTransferQueue}.
     * The constructor body does no work of its own.
     */
    public LinkedTransferQueue() {
    }
1061 
    /**
     * Creates a {@code LinkedTransferQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     * The elements are inserted by calling {@code addAll(c)} on the
     * newly constructed, empty queue.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedTransferQueue(Collection<? extends E> c) {
        this();
        addAll(c);
    }
1075 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block.
     * Implemented as a call to {@code xfer} in {@code ASYNC} mode.
     *
     * @param e the element to insert
     * @throws NullPointerException if the specified element is null
     */
    public void put(E e) {
        xfer(e, true, ASYNC, 0);
    }
1085 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never block or
     * return {@code false}. The {@code timeout} and {@code unit}
     * arguments are not used.
     *
     * @param e the element to insert
     * @param timeout ignored
     * @param unit ignored
     * @return {@code true} (as specified by
     *  {@link java.util.concurrent.BlockingQueue#offer(Object,long,TimeUnit)
     *  BlockingQueue.offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
1100 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     * Implemented as a call to {@code xfer} in {@code ASYNC} mode.
     *
     * @param e the element to insert
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
1112 
    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     * Implemented as a call to {@code xfer} in {@code ASYNC} mode.
     *
     * @param e the element to insert
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        xfer(e, true, ASYNC, 0);
        return true;
    }
1126 
1127     /**
1128      * Transfers the element to a waiting consumer immediately, if possible.
1129      *
1130      * <p>More precisely, transfers the specified element immediately
1131      * if there exists a consumer already waiting to receive it (in
1132      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1133      * otherwise returning {@code false} without enqueuing the element.
1134      *
1135      * @throws NullPointerException if the specified element is null
1136      */
1137     public boolean tryTransfer(E e) {
1138         return xfer(e, true, NOW, 0) == null;
1139     }
1140 
1141     /**
1142      * Transfers the element to a consumer, waiting if necessary to do so.
1143      *
1144      * <p>More precisely, transfers the specified element immediately
1145      * if there exists a consumer already waiting to receive it (in
1146      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1147      * else inserts the specified element at the tail of this queue
1148      * and waits until the element is received by a consumer.
1149      *
1150      * @throws NullPointerException if the specified element is null
1151      */
1152     public void transfer(E e) throws InterruptedException {
1153         if (xfer(e, true, SYNC, 0) != null) {
1154             Thread.interrupted(); // failure possible only due to interrupt
1155             throw new InterruptedException();
1156         }
1157     }
1158 
1159     /**
1160      * Transfers the element to a consumer if it is possible to do so
1161      * before the timeout elapses.
1162      *
1163      * <p>More precisely, transfers the specified element immediately
1164      * if there exists a consumer already waiting to receive it (in
1165      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
1166      * else inserts the specified element at the tail of this queue
1167      * and waits until the element is received by a consumer,
1168      * returning {@code false} if the specified wait time elapses
1169      * before the element can be transferred.
1170      *
1171      * @throws NullPointerException if the specified element is null
1172      */
1173     public boolean tryTransfer(E e, long timeout, TimeUnit unit)
1174         throws InterruptedException {
1175         if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null) {
1176             return true;
1177         }
1178         if (!Thread.interrupted()) {
1179             return false;
1180         }
1181         throw new InterruptedException();
1182     }
1183 
1184     public E take() throws InterruptedException {
1185         E e = xfer(null, false, SYNC, 0);
1186         if (e != null) {
1187             return e;
1188         }
1189         Thread.interrupted();
1190         throw new InterruptedException();
1191     }
1192 
1193     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
1194         E e = xfer(null, false, TIMED, unit.toNanos(timeout));
1195         if (e != null || !Thread.interrupted()) {
1196             return e;
1197         }
1198         throw new InterruptedException();
1199     }
1200 
    /**
     * Retrieves and removes the head element without waiting.
     * Implemented as a call to {@code xfer} in {@code NOW} mode, which
     * neither appends a node nor blocks.
     *
     * @return the head element, or null if none could be matched
     */
    public E poll() {
        return xfer(null, false, NOW, 0);
    }
1204 
1205     /**
1206      * @throws NullPointerException     {@inheritDoc}
1207      * @throws IllegalArgumentException {@inheritDoc}
1208      */
1209     public int drainTo(Collection<? super E> c) {
1210         if (c == null) {
1211             throw new NullPointerException();
1212         }
1213         if (c == this) {
1214             throw new IllegalArgumentException();
1215         }
1216         int n = 0;
1217         for (E e; (e = poll()) != null;) {
1218             c.add(e);
1219             ++n;
1220         }
1221         return n;
1222     }
1223 
1224     /**
1225      * @throws NullPointerException     {@inheritDoc}
1226      * @throws IllegalArgumentException {@inheritDoc}
1227      */
1228     public int drainTo(Collection<? super E> c, int maxElements) {
1229         if (c == null) {
1230             throw new NullPointerException();
1231         }
1232         if (c == this) {
1233             throw new IllegalArgumentException();
1234         }
1235         int n = 0;
1236         for (E e; n < maxElements && (e = poll()) != null;) {
1237             c.add(e);
1238             ++n;
1239         }
1240         return n;
1241     }
1242 
    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     * Each call creates a new {@code Itr}.
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }
1260 
    /**
     * Retrieves, but does not remove, the head element.
     *
     * @return the item of the first unmatched data node as found by
     *         {@code firstDataItem}, or null if there is none
     */
    public E peek() {
        return firstDataItem();
    }
1264 
1265     /**
1266      * Returns {@code true} if this queue contains no elements.
1267      *
1268      * @return {@code true} if this queue contains no elements
1269      */
1270     @Override
1271     public boolean isEmpty() {
1272         for (Node p = head; p != null; p = succ(p)) {
1273             if (!p.isMatched()) {
1274                 return !p.isData;
1275             }
1276         }
1277         return true;
1278     }
1279 
    /**
     * Reports whether the first unmatched node is a request (non-data)
     * node, as determined by {@code firstOfMode(false)}.
     *
     * @return {@code true} if such a node exists
     */
    public boolean hasWaitingConsumer() {
        return firstOfMode(false) != null;
    }
1283 
    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}. Computed by {@code countOfMode(true)}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        return countOfMode(true);
    }
1300 
    /**
     * Returns the number of unmatched request (non-data) nodes, as
     * counted by {@code countOfMode(false)}. Like {@code size()}, this
     * traverses the list rather than reading a stored counter.
     *
     * @return the count of unmatched request nodes
     */
    public int getWaitingConsumerCount() {
        return countOfMode(false);
    }
1304 
    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     * A {@code null} argument removes nothing and yields {@code false}.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        return findAndRemove(o);
    }
1320 
1321     /**
1322      * Returns {@code true} if this queue contains the specified element.
1323      * More formally, returns {@code true} if and only if this queue contains
1324      * at least one element {@code e} such that {@code o.equals(e)}.
1325      *
1326      * @param o object to be checked for containment in this queue
1327      * @return {@code true} if this queue contains the specified element
1328      */
1329     @Override
1330     public boolean contains(Object o) {
1331         if (o == null) {
1332             return false;
1333         }
1334         for (Node p = head; p != null; p = succ(p)) {
1335             Object item = p.item;
1336             if (p.isData) {
1337                 if (item != null && item != p && o.equals(item)) {
1338                     return true;
1339                 }
1340             }
1341             else if (item == null) {
1342                 break;
1343             }
1344         }
1345         return false;
1346     }
1347 
    /**
     * Always returns {@code Integer.MAX_VALUE} because a
     * {@code LinkedTransferQueue} is not capacity constrained.
     * The value is a constant and does not depend on the queue's contents.
     *
     * @return {@code Integer.MAX_VALUE} (as specified by
     *         {@link java.util.concurrent.BlockingQueue#remainingCapacity()
     *         BlockingQueue.remainingCapacity})
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }
1359 
1360     /**
1361      * Saves the state to a stream (that is, serializes it).
1362      *
1363      * @serialData All of the elements (each an {@code E}) in
1364      * the proper order, followed by a null
1365      * @param s the stream
1366      */
1367     private void writeObject(java.io.ObjectOutputStream s)
1368         throws java.io.IOException {
1369         s.defaultWriteObject();
1370         for (E e : this) {
1371             s.writeObject(e);
1372         }
1373         // Use trailing null as sentinel
1374         s.writeObject(null);
1375     }
1376 
1377     /**
1378      * Reconstitutes the Queue instance from a stream (that is,
1379      * deserializes it).
1380      *
1381      * @param s the stream
1382      */
1383     private void readObject(java.io.ObjectInputStream s)
1384         throws java.io.IOException, ClassNotFoundException {
1385         s.defaultReadObject();
1386         for (;;) {
1387             E item = (E) s.readObject();
1388             if (item == null) {
1389                 break;
1390             } else {
1391                 offer(item);
1392             }
1393         }
1394     }
1395 
    // Unsafe mechanics

    // Unsafe instance used for the compare-and-swap operations on this class's fields.
    private static final sun.misc.Unsafe UNSAFE;
    // Field offsets of "head", "tail" and "sweepVotes", resolved once at class initialization.
    private static final long headOffset;
    private static final long tailOffset;
    private static final long sweepVotesOffset;
    static {
        try {
            UNSAFE = getUnsafe();
            Class<?> k = LinkedTransferQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
            sweepVotesOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("sweepVotes"));
        } catch (Exception e) {
            // any failure here aborts class initialization
            throw new Error(e);
        }
    }
1416 
1417     /**
1418      * Returns a sun.misc.Unsafe.  Suitable for use in a 3rd party package.
1419      * Replace with a simple call to Unsafe.getUnsafe when integrating
1420      * into a jdk.
1421      *
1422      * @return a sun.misc.Unsafe
1423      */
1424     static sun.misc.Unsafe getUnsafe() {
1425         try {
1426             return sun.misc.Unsafe.getUnsafe();
1427         } catch (SecurityException se) {
1428             try {
1429                 return java.security.AccessController.doPrivileged
1430                     (new java.security
1431                      .PrivilegedExceptionAction<sun.misc.Unsafe>() {
1432                         public sun.misc.Unsafe run() throws Exception {
1433                             java.lang.reflect.Field f = sun.misc
1434                                 .Unsafe.class.getDeclaredField("theUnsafe");
1435                             f.setAccessible(true);
1436                             return (sun.misc.Unsafe) f.get(null);
1437                         }});
1438             } catch (java.security.PrivilegedActionException e) {
1439                 throw new RuntimeException("Could not initialize intrinsics",
1440                                            e.getCause());
1441             }
1442         }
1443     }
1444 
1445 }