View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import java.util.ArrayList;
19  import java.util.Collections;
20  import java.util.HashSet;
21  import java.util.List;
22  import java.util.Set;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.ReadWriteLock;
29  import java.util.concurrent.locks.ReentrantReadWriteLock;
30  
31  import org.jboss.netty.channel.ChannelPipelineFactory;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
35  import org.jboss.netty.util.internal.DetectionUtil;
36  import org.jboss.netty.util.internal.ReusableIterator;
37  import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
38  
39  /**
40   * A {@link Timer} optimized for approximated I/O timeout scheduling.
41   *
42   * <h3>Tick Duration</h3>
43   *
44   * As described with 'approximated', this timer does not execute the scheduled
45   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
46   * check if there are any {@link TimerTask}s behind the schedule and execute
47   * them.
48   * <p>
49   * You can increase or decrease the accuracy of the execution timing by
50   * specifying smaller or larger tick duration in the constructor.  In most
51   * network applications, I/O timeout does not need to be accurate.  Therefore,
52   * the default tick duration is 100 milliseconds and you will not need to try
53   * different configurations in most cases.
54   *
55   * <h3>Ticks per Wheel (Wheel Size)</h3>
56   *
57   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
58   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
59   * function is 'dead line of the task'.  The default number of ticks per wheel
60   * (i.e. the size of the wheel) is 512.  You could specify a larger value
61   * if you are going to schedule a lot of timeouts.
62   *
63   * <h3>Do not create many instances.</h3>
64   *
65   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
66   * started.  Therefore, you should make sure to create only one instance and
67   * share it across your application.  One of the common mistakes, that makes
68   * your application unresponsive, is to create a new instance in
69   * {@link ChannelPipelineFactory}, which results in the creation of a new thread
70   * for every connection.
71   *
72   * <h3>Implementation Details</h3>
73   *
74   * {@link HashedWheelTimer} is based on
75   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
76   * Tony Lauck's paper,
77   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
78   * and Hierarchical Timing Wheels: data structures to efficiently implement a
79   * timer facility'</a>.  More comprehensive slides are located
80   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
81   */
82  public class HashedWheelTimer implements Timer {
83  
84      static final InternalLogger logger =
85          InternalLoggerFactory.getInstance(HashedWheelTimer.class);
86      private static final AtomicInteger id = new AtomicInteger();
87  
88      private static final SharedResourceMisuseDetector misuseDetector =
89          new SharedResourceMisuseDetector(HashedWheelTimer.class);
90  
91      private final Worker worker = new Worker();
92      final Thread workerThread;
93      final AtomicBoolean shutdown = new AtomicBoolean();
94  
95      private final long roundDuration;
96      final long tickDuration;
97      final Set<HashedWheelTimeout>[] wheel;
98      final ReusableIterator<HashedWheelTimeout>[] iterators;
99      final int mask;
100     final ReadWriteLock lock = new ReentrantReadWriteLock();
101     volatile int wheelCursor;
102 
103     /**
104      * Creates a new timer with the default thread factory
105      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
106      * default number of ticks per wheel.
107      */
108     public HashedWheelTimer() {
109         this(Executors.defaultThreadFactory());
110     }
111 
112     /**
113      * Creates a new timer with the default thread factory
114      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
115      * per wheel.
116      *
117      * @param tickDuration   the duration between tick
118      * @param unit           the time unit of the {@code tickDuration}
119      */
120     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
121         this(Executors.defaultThreadFactory(), tickDuration, unit);
122     }
123 
124     /**
125      * Creates a new timer with the default thread factory
126      * ({@link Executors#defaultThreadFactory()}).
127      *
128      * @param tickDuration   the duration between tick
129      * @param unit           the time unit of the {@code tickDuration}
130      * @param ticksPerWheel  the size of the wheel
131      */
132     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
133         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
134     }
135 
136     /**
137      * Creates a new timer with the default tick duration and default number of
138      * ticks per wheel.
139      *
140      * @param threadFactory  a {@link ThreadFactory} that creates a
141      *                       background {@link Thread} which is dedicated to
142      *                       {@link TimerTask} execution.
143      */
144     public HashedWheelTimer(ThreadFactory threadFactory) {
145         this(threadFactory, 100, TimeUnit.MILLISECONDS);
146     }
147 
148     /**
149      * Creates a new timer with the default number of ticks per wheel.
150      *
151      * @param threadFactory  a {@link ThreadFactory} that creates a
152      *                       background {@link Thread} which is dedicated to
153      *                       {@link TimerTask} execution.
154      * @param tickDuration   the duration between tick
155      * @param unit           the time unit of the {@code tickDuration}
156      */
157     public HashedWheelTimer(
158             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
159         this(threadFactory, tickDuration, unit, 512);
160     }
161 
162     /**
163      * Creates a new timer.
164      *
165      * @param threadFactory  a {@link ThreadFactory} that creates a
166      *                       background {@link Thread} which is dedicated to
167      *                       {@link TimerTask} execution.
168      * @param tickDuration   the duration between tick
169      * @param unit           the time unit of the {@code tickDuration}
170      * @param ticksPerWheel  the size of the wheel
171      */
172     public HashedWheelTimer(
173             ThreadFactory threadFactory,
174             long tickDuration, TimeUnit unit, int ticksPerWheel) {
175 
176         if (threadFactory == null) {
177             throw new NullPointerException("threadFactory");
178         }
179         if (unit == null) {
180             throw new NullPointerException("unit");
181         }
182         if (tickDuration <= 0) {
183             throw new IllegalArgumentException(
184                     "tickDuration must be greater than 0: " + tickDuration);
185         }
186         if (ticksPerWheel <= 0) {
187             throw new IllegalArgumentException(
188                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
189         }
190 
191         // Normalize ticksPerWheel to power of two and initialize the wheel.
192         wheel = createWheel(ticksPerWheel);
193         iterators = createIterators(wheel);
194         mask = wheel.length - 1;
195 
196         // Convert tickDuration to milliseconds.
197         this.tickDuration = tickDuration = unit.toMillis(tickDuration);
198 
199         // Prevent overflow.
200         if (tickDuration == Long.MAX_VALUE ||
201                 tickDuration >= Long.MAX_VALUE / wheel.length) {
202             throw new IllegalArgumentException(
203                     "tickDuration is too long: " +
204                     tickDuration +  ' ' + unit);
205         }
206 
207         roundDuration = tickDuration * wheel.length;
208 
209         workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
210                         worker, "Hashed wheel timer #" + id.incrementAndGet()));
211 
212         // Misuse check
213         misuseDetector.increase();
214     }
215 
216     @SuppressWarnings("unchecked")
217     private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
218         if (ticksPerWheel <= 0) {
219             throw new IllegalArgumentException(
220                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
221         }
222         if (ticksPerWheel > 1073741824) {
223             throw new IllegalArgumentException(
224                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
225         }
226 
227         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
228         Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
229         for (int i = 0; i < wheel.length; i ++) {
230             wheel[i] = new MapBackedSet<HashedWheelTimeout>(
231                     new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
232         }
233         return wheel;
234     }
235 
236     @SuppressWarnings("unchecked")
237     private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
238         ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
239         for (int i = 0; i < wheel.length; i ++) {
240             iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
241         }
242         return iterators;
243     }
244 
245     private static int normalizeTicksPerWheel(int ticksPerWheel) {
246         int normalizedTicksPerWheel = 1;
247         while (normalizedTicksPerWheel < ticksPerWheel) {
248             normalizedTicksPerWheel <<= 1;
249         }
250         return normalizedTicksPerWheel;
251     }
252 
253     /**
254      * Starts the background thread explicitly.  The background thread will
255      * start automatically on demand even if you did not call this method.
256      *
257      * @throws IllegalStateException if this timer has been
258      *                               {@linkplain #stop() stopped} already
259      */
260     public synchronized void start() {
261         if (shutdown.get()) {
262             throw new IllegalStateException("cannot be started once stopped");
263         }
264 
265         if (!workerThread.isAlive()) {
266             workerThread.start();
267         }
268     }
269 
270     public synchronized Set<Timeout> stop() {
271         if (Thread.currentThread() == workerThread) {
272             throw new IllegalStateException(
273                     HashedWheelTimer.class.getSimpleName() +
274                     ".stop() cannot be called from " +
275                     TimerTask.class.getSimpleName());
276         }
277 
278         if (!shutdown.compareAndSet(false, true)) {
279             return Collections.emptySet();
280         }
281 
282         boolean interrupted = false;
283         while (workerThread.isAlive()) {
284             workerThread.interrupt();
285             try {
286                 workerThread.join(100);
287             } catch (InterruptedException e) {
288                 interrupted = true;
289             }
290         }
291 
292         if (interrupted) {
293             Thread.currentThread().interrupt();
294         }
295 
296         misuseDetector.decrease();
297 
298         Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
299         for (Set<HashedWheelTimeout> bucket: wheel) {
300             unprocessedTimeouts.addAll(bucket);
301             bucket.clear();
302         }
303 
304         return Collections.unmodifiableSet(unprocessedTimeouts);
305     }
306 
307     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
308         final long currentTime = System.currentTimeMillis();
309 
310         if (task == null) {
311             throw new NullPointerException("task");
312         }
313         if (unit == null) {
314             throw new NullPointerException("unit");
315         }
316 
317         if (!workerThread.isAlive()) {
318             start();
319         }
320 
321         delay = unit.toMillis(delay);
322         HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
323         scheduleTimeout(timeout, delay);
324         return timeout;
325     }
326 
327     void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
328         // delay must be equal to or greater than tickDuration so that the
329         // worker thread never misses the timeout.
330         if (delay < tickDuration) {
331             delay = tickDuration;
332         }
333 
334         // Prepare the required parameters to schedule the timeout object.
335         final long lastRoundDelay = delay % roundDuration;
336         final long lastTickDelay = delay % tickDuration;
337         final long relativeIndex =
338             lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
339 
340         final long remainingRounds =
341             delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
342 
343         // Add the timeout to the wheel.
344         lock.readLock().lock();
345         try {
346             int stopIndex = (int) (wheelCursor + relativeIndex & mask);
347             timeout.stopIndex = stopIndex;
348             timeout.remainingRounds = remainingRounds;
349 
350             wheel[stopIndex].add(timeout);
351         } finally {
352             lock.readLock().unlock();
353         }
354     }
355 
356     private final class Worker implements Runnable {
357 
358         private long startTime;
359         private long tick;
360 
361         Worker() {
362             super();
363         }
364 
365         public void run() {
366             List<HashedWheelTimeout> expiredTimeouts =
367                 new ArrayList<HashedWheelTimeout>();
368 
369             startTime = System.currentTimeMillis();
370             tick = 1;
371 
372             while (!shutdown.get()) {
373                 final long deadline = waitForNextTick();
374                 if (deadline > 0) {
375                     fetchExpiredTimeouts(expiredTimeouts, deadline);
376                     notifyExpiredTimeouts(expiredTimeouts);
377                 }
378             }
379         }
380 
381         private void fetchExpiredTimeouts(
382                 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
383 
384             // Find the expired timeouts and decrease the round counter
385             // if necessary.  Note that we don't send the notification
386             // immediately to make sure the listeners are called without
387             // an exclusive lock.
388             lock.writeLock().lock();
389             try {
390                 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
391                 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
392                 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
393             } finally {
394                 lock.writeLock().unlock();
395             }
396         }
397 
398         private void fetchExpiredTimeouts(
399                 List<HashedWheelTimeout> expiredTimeouts,
400                 ReusableIterator<HashedWheelTimeout> i, long deadline) {
401 
402             List<HashedWheelTimeout> slipped = null;
403             i.rewind();
404             while (i.hasNext()) {
405                 HashedWheelTimeout timeout = i.next();
406                 if (timeout.remainingRounds <= 0) {
407                     i.remove();
408                     if (timeout.deadline <= deadline) {
409                         expiredTimeouts.add(timeout);
410                     } else {
411                         // Handle the case where the timeout is put into a wrong
412                         // place, usually one tick earlier.  For now, just add
413                         // it to a temporary list - we will reschedule it in a
414                         // separate loop.
415                         if (slipped == null) {
416                             slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
417                         }
418                         slipped.add(timeout);
419                     }
420                 } else {
421                     timeout.remainingRounds --;
422                 }
423             }
424 
425             // Reschedule the slipped timeouts.
426             if (slipped != null) {
427                 for (HashedWheelTimeout timeout: slipped) {
428                     scheduleTimeout(timeout, timeout.deadline - deadline);
429                 }
430             }
431         }
432 
433         private void notifyExpiredTimeouts(
434                 List<HashedWheelTimeout> expiredTimeouts) {
435             // Notify the expired timeouts.
436             for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
437                 expiredTimeouts.get(i).expire();
438             }
439 
440             // Clean up the temporary list.
441             expiredTimeouts.clear();
442         }
443 
444         private long waitForNextTick() {
445             long deadline = startTime + tickDuration * tick;
446 
447             for (;;) {
448                 final long currentTime = System.currentTimeMillis();
449                 long sleepTime = tickDuration * tick - (currentTime - startTime);
450 
451                 // Check if we run on windows, as if thats the case we will need
452                 // to round the sleepTime as workaround for a bug that only affect
453                 // the JVM if it runs on windows.
454                 //
455                 // See https://github.com/netty/netty/issues/356
456                 if (DetectionUtil.isWindows()) {
457                     sleepTime = sleepTime / 10 * 10;
458                 }
459 
460                 if (sleepTime <= 0) {
461                     break;
462                 }
463                 try {
464                     Thread.sleep(sleepTime);
465                 } catch (InterruptedException e) {
466                     if (shutdown.get()) {
467                         return -1;
468                     }
469                 }
470             }
471 
472             // Increase the tick.
473             tick ++;
474             return deadline;
475         }
476     }
477 
478     private final class HashedWheelTimeout implements Timeout {
479 
480         private static final int ST_INIT = 0;
481         private static final int ST_CANCELLED = 1;
482         private static final int ST_EXPIRED = 2;
483 
484         private final TimerTask task;
485         final long deadline;
486         volatile int stopIndex;
487         volatile long remainingRounds;
488         private final AtomicInteger state = new AtomicInteger(ST_INIT);
489 
490         HashedWheelTimeout(TimerTask task, long deadline) {
491             this.task = task;
492             this.deadline = deadline;
493         }
494 
495         public Timer getTimer() {
496             return HashedWheelTimer.this;
497         }
498 
499         public TimerTask getTask() {
500             return task;
501         }
502 
503         public void cancel() {
504             if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
505                 // TODO return false
506                 return;
507             }
508 
509             wheel[stopIndex].remove(this);
510         }
511 
512         public boolean isCancelled() {
513             return state.get() == ST_CANCELLED;
514         }
515 
516         public boolean isExpired() {
517             return state.get() != ST_INIT;
518         }
519 
520         public void expire() {
521             if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
522                 return;
523             }
524 
525             try {
526                 task.run(this);
527             } catch (Throwable t) {
528                 if (logger.isWarnEnabled()) {
529                     logger.warn(
530                             "An exception was thrown by " +
531                             TimerTask.class.getSimpleName() + ".", t);
532                 }
533 
534             }
535         }
536 
537         @Override
538         public String toString() {
539             long currentTime = System.currentTimeMillis();
540             long remaining = deadline - currentTime;
541 
542             StringBuilder buf = new StringBuilder(192);
543             buf.append(getClass().getSimpleName());
544             buf.append('(');
545 
546             buf.append("deadline: ");
547             if (remaining > 0) {
548                 buf.append(remaining);
549                 buf.append(" ms later, ");
550             } else if (remaining < 0) {
551                 buf.append(-remaining);
552                 buf.append(" ms ago, ");
553             } else {
554                 buf.append("now, ");
555             }
556 
557             if (isCancelled()) {
558                 buf.append(", cancelled");
559             }
560 
561             return buf.append(')').toString();
562         }
563     }
564 }