1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.timeout;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.util.concurrent.TimeUnit;
21
22 import org.jboss.netty.bootstrap.ServerBootstrap;
23 import org.jboss.netty.channel.Channel;
24 import org.jboss.netty.channel.ChannelHandler;
25 import org.jboss.netty.channel.ChannelHandler.Sharable;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
34 import org.jboss.netty.channel.WriteCompletionEvent;
35 import org.jboss.netty.util.ExternalResourceReleasable;
36 import org.jboss.netty.util.HashedWheelTimer;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 @Sharable
123 public class IdleStateHandler extends SimpleChannelUpstreamHandler
124 implements LifeCycleAwareChannelHandler,
125 ExternalResourceReleasable {
126
127 final Timer timer;
128
129 final long readerIdleTimeMillis;
130 final long writerIdleTimeMillis;
131 final long allIdleTimeMillis;
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 public IdleStateHandler(
153 Timer timer,
154 int readerIdleTimeSeconds,
155 int writerIdleTimeSeconds,
156 int allIdleTimeSeconds) {
157
158 this(timer,
159 readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
160 TimeUnit.SECONDS);
161 }
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public IdleStateHandler(
186 Timer timer,
187 long readerIdleTime, long writerIdleTime, long allIdleTime,
188 TimeUnit unit) {
189
190 if (timer == null) {
191 throw new NullPointerException("timer");
192 }
193 if (unit == null) {
194 throw new NullPointerException("unit");
195 }
196
197 this.timer = timer;
198 if (readerIdleTime <= 0) {
199 readerIdleTimeMillis = 0;
200 } else {
201 readerIdleTimeMillis = Math.max(unit.toMillis(readerIdleTime), 1);
202 }
203 if (writerIdleTime <= 0) {
204 writerIdleTimeMillis = 0;
205 } else {
206 writerIdleTimeMillis = Math.max(unit.toMillis(writerIdleTime), 1);
207 }
208 if (allIdleTime <= 0) {
209 allIdleTimeMillis = 0;
210 } else {
211 allIdleTimeMillis = Math.max(unit.toMillis(allIdleTime), 1);
212 }
213 }
214
215
216
217
218
219 public long getReaderIdleTimeInMillis() {
220 return readerIdleTimeMillis;
221 }
222
223
224
225
226
227 public long getWriterIdleTimeInMillis() {
228 return writerIdleTimeMillis;
229 }
230
231
232
233
234
235 public long getAllIdleTimeInMillis() {
236 return allIdleTimeMillis;
237 }
238
239
240
241
242
243
244 public void releaseExternalResources() {
245 timer.stop();
246 }
247
248 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
249 if (ctx.getPipeline().isAttached()) {
250
251
252
253 initialize(ctx);
254 } else {
255
256
257 }
258 }
259
260 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
261
262 }
263
264 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
265 destroy(ctx);
266 }
267
268 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
269
270 }
271
272 @Override
273 public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
274 throws Exception {
275
276
277
278 initialize(ctx);
279 ctx.sendUpstream(e);
280 }
281
282 @Override
283 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
284 throws Exception {
285 destroy(ctx);
286 ctx.sendUpstream(e);
287 }
288
289 @Override
290 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
291 throws Exception {
292 State state = (State) ctx.getAttachment();
293 state.lastReadTime = System.currentTimeMillis();
294 ctx.sendUpstream(e);
295 }
296
297 @Override
298 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e)
299 throws Exception {
300 if (e.getWrittenAmount() > 0) {
301 State state = (State) ctx.getAttachment();
302 state.lastWriteTime = System.currentTimeMillis();
303 }
304 ctx.sendUpstream(e);
305 }
306
307 private void initialize(ChannelHandlerContext ctx) {
308 State state = state(ctx);
309
310
311
312 synchronized (state) {
313 switch (state.state) {
314 case 1:
315 case 2:
316 return;
317 }
318 state.state = 1;
319 }
320
321 state.lastReadTime = state.lastWriteTime = System.currentTimeMillis();
322 if (readerIdleTimeMillis > 0) {
323 state.readerIdleTimeout = timer.newTimeout(
324 new ReaderIdleTimeoutTask(ctx),
325 readerIdleTimeMillis, TimeUnit.MILLISECONDS);
326 }
327 if (writerIdleTimeMillis > 0) {
328 state.writerIdleTimeout = timer.newTimeout(
329 new WriterIdleTimeoutTask(ctx),
330 writerIdleTimeMillis, TimeUnit.MILLISECONDS);
331 }
332 if (allIdleTimeMillis > 0) {
333 state.allIdleTimeout = timer.newTimeout(
334 new AllIdleTimeoutTask(ctx),
335 allIdleTimeMillis, TimeUnit.MILLISECONDS);
336 }
337 }
338
339 private static void destroy(ChannelHandlerContext ctx) {
340 State state = state(ctx);
341 synchronized (state) {
342 if (state.state != 1) {
343 return;
344 }
345 state.state = 2;
346 }
347
348 if (state.readerIdleTimeout != null) {
349 state.readerIdleTimeout.cancel();
350 state.readerIdleTimeout = null;
351 }
352 if (state.writerIdleTimeout != null) {
353 state.writerIdleTimeout.cancel();
354 state.writerIdleTimeout = null;
355 }
356 if (state.allIdleTimeout != null) {
357 state.allIdleTimeout.cancel();
358 state.allIdleTimeout = null;
359 }
360 }
361
362 private static State state(ChannelHandlerContext ctx) {
363 State state;
364 synchronized (ctx) {
365
366 state = (State) ctx.getAttachment();
367 if (state != null) {
368 return state;
369 }
370 state = new State();
371 ctx.setAttachment(state);
372 }
373 return state;
374 }
375
376 protected void channelIdle(
377 ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
378 ctx.sendUpstream(new DefaultIdleStateEvent(ctx.getChannel(), state, lastActivityTimeMillis));
379 }
380
381 private final class ReaderIdleTimeoutTask implements TimerTask {
382
383 private final ChannelHandlerContext ctx;
384
385 ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
386 this.ctx = ctx;
387 }
388
389 public void run(Timeout timeout) throws Exception {
390 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
391 return;
392 }
393
394 State state = (State) ctx.getAttachment();
395 long currentTime = System.currentTimeMillis();
396 long lastReadTime = state.lastReadTime;
397 long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
398 if (nextDelay <= 0) {
399
400 state.readerIdleTimeout =
401 timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
402 try {
403 channelIdle(ctx, IdleState.READER_IDLE, lastReadTime);
404 } catch (Throwable t) {
405 fireExceptionCaught(ctx, t);
406 }
407 } else {
408
409 state.readerIdleTimeout =
410 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
411 }
412 }
413
414 }
415
416 private final class WriterIdleTimeoutTask implements TimerTask {
417
418 private final ChannelHandlerContext ctx;
419
420 WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
421 this.ctx = ctx;
422 }
423
424 public void run(Timeout timeout) throws Exception {
425 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
426 return;
427 }
428
429 State state = (State) ctx.getAttachment();
430 long currentTime = System.currentTimeMillis();
431 long lastWriteTime = state.lastWriteTime;
432 long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
433 if (nextDelay <= 0) {
434
435 state.writerIdleTimeout =
436 timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
437 try {
438 channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime);
439 } catch (Throwable t) {
440 fireExceptionCaught(ctx, t);
441 }
442 } else {
443
444 state.writerIdleTimeout =
445 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
446 }
447 }
448 }
449
450 private final class AllIdleTimeoutTask implements TimerTask {
451
452 private final ChannelHandlerContext ctx;
453
454 AllIdleTimeoutTask(ChannelHandlerContext ctx) {
455 this.ctx = ctx;
456 }
457
458 public void run(Timeout timeout) throws Exception {
459 if (timeout.isCancelled() || !ctx.getChannel().isOpen()) {
460 return;
461 }
462
463 State state = (State) ctx.getAttachment();
464 long currentTime = System.currentTimeMillis();
465 long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime);
466 long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
467 if (nextDelay <= 0) {
468
469
470 state.allIdleTimeout =
471 timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
472 try {
473 channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime);
474 } catch (Throwable t) {
475 fireExceptionCaught(ctx, t);
476 }
477 } else {
478
479
480 state.allIdleTimeout =
481 timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
482 }
483 }
484 }
485
486 private static final class State {
487
488 int state;
489
490 volatile Timeout readerIdleTimeout;
491 volatile long lastReadTime;
492
493 volatile Timeout writerIdleTimeout;
494 volatile long lastWriteTime;
495
496 volatile Timeout allIdleTimeout;
497
498 State() {
499 super();
500 }
501 }
502 }