1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.LinkedHashMap;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.NoSuchElementException;
24 import java.util.concurrent.RejectedExecutionException;
25
26 import org.jboss.netty.logging.InternalLogger;
27 import org.jboss.netty.logging.InternalLoggerFactory;
28
29
30
31
32
33
34 public class DefaultChannelPipeline implements ChannelPipeline {
35
36 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
37 static final ChannelSink discardingSink = new DiscardingChannelSink();
38
39 private volatile Channel channel;
40 private volatile ChannelSink sink;
41 private volatile DefaultChannelHandlerContext head;
42 private volatile DefaultChannelHandlerContext tail;
43 private final Map<String, DefaultChannelHandlerContext> name2ctx =
44 new HashMap<String, DefaultChannelHandlerContext>(4);
45
46
47
48
49 public DefaultChannelPipeline() {
50 super();
51 }
52
53 public Channel getChannel() {
54 return channel;
55 }
56
57 public ChannelSink getSink() {
58 ChannelSink sink = this.sink;
59 if (sink == null) {
60 return discardingSink;
61 }
62 return sink;
63 }
64
65 public void attach(Channel channel, ChannelSink sink) {
66 if (channel == null) {
67 throw new NullPointerException("channel");
68 }
69 if (sink == null) {
70 throw new NullPointerException("sink");
71 }
72 if (this.channel != null || this.sink != null) {
73 throw new IllegalStateException("attached already");
74 }
75 this.channel = channel;
76 this.sink = sink;
77 }
78
79 public boolean isAttached() {
80 return sink != null;
81 }
82
83 public synchronized void addFirst(String name, ChannelHandler handler) {
84 if (name2ctx.isEmpty()) {
85 init(name, handler);
86 } else {
87 checkDuplicateName(name);
88 DefaultChannelHandlerContext oldHead = head;
89 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
90
91 callBeforeAdd(newHead);
92
93 oldHead.prev = newHead;
94 head = newHead;
95 name2ctx.put(name, newHead);
96
97 callAfterAdd(newHead);
98 }
99 }
100
101 public synchronized void addLast(String name, ChannelHandler handler) {
102 if (name2ctx.isEmpty()) {
103 init(name, handler);
104 } else {
105 checkDuplicateName(name);
106 DefaultChannelHandlerContext oldTail = tail;
107 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
108
109 callBeforeAdd(newTail);
110
111 oldTail.next = newTail;
112 tail = newTail;
113 name2ctx.put(name, newTail);
114
115 callAfterAdd(newTail);
116 }
117 }
118
119 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
120 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
121 if (ctx == head) {
122 addFirst(name, handler);
123 } else {
124 checkDuplicateName(name);
125 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
126
127 callBeforeAdd(newCtx);
128
129 ctx.prev.next = newCtx;
130 ctx.prev = newCtx;
131 name2ctx.put(name, newCtx);
132
133 callAfterAdd(newCtx);
134 }
135 }
136
137 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
138 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
139 if (ctx == tail) {
140 addLast(name, handler);
141 } else {
142 checkDuplicateName(name);
143 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
144
145 callBeforeAdd(newCtx);
146
147 ctx.next.prev = newCtx;
148 ctx.next = newCtx;
149 name2ctx.put(name, newCtx);
150
151 callAfterAdd(newCtx);
152 }
153 }
154
155 public synchronized void remove(ChannelHandler handler) {
156 remove(getContextOrDie(handler));
157 }
158
159 public synchronized ChannelHandler remove(String name) {
160 return remove(getContextOrDie(name)).getHandler();
161 }
162
163 @SuppressWarnings("unchecked")
164 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
165 return (T) remove(getContextOrDie(handlerType)).getHandler();
166 }
167
168 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
169 if (head == tail) {
170 head = tail = null;
171 name2ctx.clear();
172 } else if (ctx == head) {
173 removeFirst();
174 } else if (ctx == tail) {
175 removeLast();
176 } else {
177 callBeforeRemove(ctx);
178
179 DefaultChannelHandlerContext prev = ctx.prev;
180 DefaultChannelHandlerContext next = ctx.next;
181 prev.next = next;
182 next.prev = prev;
183 name2ctx.remove(ctx.getName());
184
185 callAfterRemove(ctx);
186 }
187 return ctx;
188 }
189
190 public synchronized ChannelHandler removeFirst() {
191 if (name2ctx.isEmpty()) {
192 throw new NoSuchElementException();
193 }
194
195 DefaultChannelHandlerContext oldHead = head;
196 if (oldHead == null) {
197 throw new NoSuchElementException();
198 }
199
200 callBeforeRemove(oldHead);
201
202 if (oldHead.next == null) {
203 head = tail = null;
204 name2ctx.clear();
205 } else {
206 oldHead.next.prev = null;
207 head = oldHead.next;
208 name2ctx.remove(oldHead.getName());
209 }
210
211 callAfterRemove(oldHead);
212
213 return oldHead.getHandler();
214 }
215
216 public synchronized ChannelHandler removeLast() {
217 if (name2ctx.isEmpty()) {
218 throw new NoSuchElementException();
219 }
220
221 DefaultChannelHandlerContext oldTail = tail;
222 if (oldTail == null) {
223 throw new NoSuchElementException();
224 }
225
226 callBeforeRemove(oldTail);
227
228 if (oldTail.prev == null) {
229 head = tail = null;
230 name2ctx.clear();
231 } else {
232 oldTail.prev.next = null;
233 tail = oldTail.prev;
234 name2ctx.remove(oldTail.getName());
235 }
236
237 callBeforeRemove(oldTail);
238
239 return oldTail.getHandler();
240 }
241
242 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
243 replace(getContextOrDie(oldHandler), newName, newHandler);
244 }
245
246 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
247 return replace(getContextOrDie(oldName), newName, newHandler);
248 }
249
250 @SuppressWarnings("unchecked")
251 public synchronized <T extends ChannelHandler> T replace(
252 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
253 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
254 }
255
256 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
257 if (ctx == head) {
258 removeFirst();
259 addFirst(newName, newHandler);
260 } else if (ctx == tail) {
261 removeLast();
262 addLast(newName, newHandler);
263 } else {
264 boolean sameName = ctx.getName().equals(newName);
265 if (!sameName) {
266 checkDuplicateName(newName);
267 }
268
269 DefaultChannelHandlerContext prev = ctx.prev;
270 DefaultChannelHandlerContext next = ctx.next;
271 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
272
273 callBeforeRemove(ctx);
274 callBeforeAdd(newCtx);
275
276 prev.next = newCtx;
277 next.prev = newCtx;
278
279 if (!sameName) {
280 name2ctx.remove(ctx.getName());
281 }
282 name2ctx.put(newName, newCtx);
283
284 ChannelHandlerLifeCycleException removeException = null;
285 ChannelHandlerLifeCycleException addException = null;
286 boolean removed = false;
287 try {
288 callAfterRemove(ctx);
289 removed = true;
290 } catch (ChannelHandlerLifeCycleException e) {
291 removeException = e;
292 }
293
294 boolean added = false;
295 try {
296 callAfterAdd(newCtx);
297 added = true;
298 } catch (ChannelHandlerLifeCycleException e) {
299 addException = e;
300 }
301
302 if (!removed && !added) {
303 logger.warn(removeException.getMessage(), removeException);
304 logger.warn(addException.getMessage(), addException);
305 throw new ChannelHandlerLifeCycleException(
306 "Both " + ctx.getHandler().getClass().getName() +
307 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
308 ".afterAdd() failed; see logs.");
309 } else if (!removed) {
310 throw removeException;
311 } else if (!added) {
312 throw addException;
313 }
314 }
315
316 return ctx.getHandler();
317 }
318
319 private static void callBeforeAdd(ChannelHandlerContext ctx) {
320 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
321 return;
322 }
323
324 LifeCycleAwareChannelHandler h =
325 (LifeCycleAwareChannelHandler) ctx.getHandler();
326
327 try {
328 h.beforeAdd(ctx);
329 } catch (Throwable t) {
330 throw new ChannelHandlerLifeCycleException(
331 h.getClass().getName() +
332 ".beforeAdd() has thrown an exception; not adding.", t);
333 }
334 }
335
336 private void callAfterAdd(ChannelHandlerContext ctx) {
337 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
338 return;
339 }
340
341 LifeCycleAwareChannelHandler h =
342 (LifeCycleAwareChannelHandler) ctx.getHandler();
343
344 try {
345 h.afterAdd(ctx);
346 } catch (Throwable t) {
347 boolean removed = false;
348 try {
349 remove((DefaultChannelHandlerContext) ctx);
350 removed = true;
351 } catch (Throwable t2) {
352 if (logger.isWarnEnabled()) {
353 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
354 }
355 }
356
357 if (removed) {
358 throw new ChannelHandlerLifeCycleException(
359 h.getClass().getName() +
360 ".afterAdd() has thrown an exception; removed.", t);
361 } else {
362 throw new ChannelHandlerLifeCycleException(
363 h.getClass().getName() +
364 ".afterAdd() has thrown an exception; also failed to remove.", t);
365 }
366 }
367 }
368
369 private static void callBeforeRemove(ChannelHandlerContext ctx) {
370 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
371 return;
372 }
373
374 LifeCycleAwareChannelHandler h =
375 (LifeCycleAwareChannelHandler) ctx.getHandler();
376
377 try {
378 h.beforeRemove(ctx);
379 } catch (Throwable t) {
380 throw new ChannelHandlerLifeCycleException(
381 h.getClass().getName() +
382 ".beforeRemove() has thrown an exception; not removing.", t);
383 }
384 }
385
386 private static void callAfterRemove(ChannelHandlerContext ctx) {
387 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
388 return;
389 }
390
391 LifeCycleAwareChannelHandler h =
392 (LifeCycleAwareChannelHandler) ctx.getHandler();
393
394 try {
395 h.afterRemove(ctx);
396 } catch (Throwable t) {
397 throw new ChannelHandlerLifeCycleException(
398 h.getClass().getName() +
399 ".afterRemove() has thrown an exception.", t);
400 }
401 }
402
403 public synchronized ChannelHandler getFirst() {
404 DefaultChannelHandlerContext head = this.head;
405 if (head == null) {
406 return null;
407 }
408 return head.getHandler();
409 }
410
411 public synchronized ChannelHandler getLast() {
412 DefaultChannelHandlerContext tail = this.tail;
413 if (tail == null) {
414 return null;
415 }
416 return tail.getHandler();
417 }
418
419 public synchronized ChannelHandler get(String name) {
420 DefaultChannelHandlerContext ctx = name2ctx.get(name);
421 if (ctx == null) {
422 return null;
423 } else {
424 return ctx.getHandler();
425 }
426 }
427
428 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
429 ChannelHandlerContext ctx = getContext(handlerType);
430 if (ctx == null) {
431 return null;
432 } else {
433 return (T) ctx.getHandler();
434 }
435 }
436
437 public synchronized ChannelHandlerContext getContext(String name) {
438 if (name == null) {
439 throw new NullPointerException("name");
440 }
441 return name2ctx.get(name);
442 }
443
444 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
445 if (handler == null) {
446 throw new NullPointerException("handler");
447 }
448 if (name2ctx.isEmpty()) {
449 return null;
450 }
451 DefaultChannelHandlerContext ctx = head;
452 for (;;) {
453 if (ctx.getHandler() == handler) {
454 return ctx;
455 }
456
457 ctx = ctx.next;
458 if (ctx == null) {
459 break;
460 }
461 }
462 return null;
463 }
464
465 public synchronized ChannelHandlerContext getContext(
466 Class<? extends ChannelHandler> handlerType) {
467 if (handlerType == null) {
468 throw new NullPointerException("handlerType");
469 }
470
471 if (name2ctx.isEmpty()) {
472 return null;
473 }
474 DefaultChannelHandlerContext ctx = head;
475 for (;;) {
476 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
477 return ctx;
478 }
479
480 ctx = ctx.next;
481 if (ctx == null) {
482 break;
483 }
484 }
485 return null;
486 }
487
488 public List<String> getNames() {
489 List<String> list = new ArrayList<String>();
490 if (name2ctx.isEmpty()) {
491 return list;
492 }
493
494 DefaultChannelHandlerContext ctx = head;
495 for (;;) {
496 list.add(ctx.getName());
497 ctx = ctx.next;
498 if (ctx == null) {
499 break;
500 }
501 }
502 return list;
503 }
504
505 public Map<String, ChannelHandler> toMap() {
506 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
507 if (name2ctx.isEmpty()) {
508 return map;
509 }
510
511 DefaultChannelHandlerContext ctx = head;
512 for (;;) {
513 map.put(ctx.getName(), ctx.getHandler());
514 ctx = ctx.next;
515 if (ctx == null) {
516 break;
517 }
518 }
519 return map;
520 }
521
522
523
524
525 @Override
526 public String toString() {
527 StringBuilder buf = new StringBuilder();
528 buf.append(getClass().getSimpleName());
529 buf.append('{');
530 DefaultChannelHandlerContext ctx = head;
531 if (ctx != null) {
532 for (;;) {
533 buf.append('(');
534 buf.append(ctx.getName());
535 buf.append(" = ");
536 buf.append(ctx.getHandler().getClass().getName());
537 buf.append(')');
538 ctx = ctx.next;
539 if (ctx == null) {
540 break;
541 }
542 buf.append(", ");
543 }
544 }
545 buf.append('}');
546 return buf.toString();
547 }
548
549 public void sendUpstream(ChannelEvent e) {
550 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
551 if (head == null) {
552 if (logger.isWarnEnabled()) {
553 logger.warn(
554 "The pipeline contains no upstream handlers; discarding: " + e);
555 }
556
557 return;
558 }
559
560 sendUpstream(head, e);
561 }
562
563 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
564 try {
565 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
566 } catch (Throwable t) {
567 notifyHandlerException(e, t);
568 }
569 }
570
571 public void sendDownstream(ChannelEvent e) {
572 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
573 if (tail == null) {
574 try {
575 getSink().eventSunk(this, e);
576 return;
577 } catch (Throwable t) {
578 notifyHandlerException(e, t);
579 return;
580 }
581 }
582
583 sendDownstream(tail, e);
584 }
585
586 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
587 if (e instanceof UpstreamMessageEvent) {
588 throw new IllegalArgumentException("cannot send an upstream event to downstream");
589 }
590
591 try {
592 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
593 } catch (Throwable t) {
594
595
596
597
598
599 e.getFuture().setFailure(t);
600 notifyHandlerException(e, t);
601 }
602 }
603
604 DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
605 if (ctx == null) {
606 return null;
607 }
608
609 DefaultChannelHandlerContext realCtx = ctx;
610 while (!realCtx.canHandleUpstream()) {
611 realCtx = realCtx.next;
612 if (realCtx == null) {
613 return null;
614 }
615 }
616
617 return realCtx;
618 }
619
620 DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
621 if (ctx == null) {
622 return null;
623 }
624
625 DefaultChannelHandlerContext realCtx = ctx;
626 while (!realCtx.canHandleDownstream()) {
627 realCtx = realCtx.prev;
628 if (realCtx == null) {
629 return null;
630 }
631 }
632
633 return realCtx;
634 }
635
636 public ChannelFuture execute(Runnable task) {
637 return getSink().execute(this, task);
638 }
639
640 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
641 if (e instanceof ExceptionEvent) {
642 if (logger.isWarnEnabled()) {
643 logger.warn(
644 "An exception was thrown by a user handler " +
645 "while handling an exception event (" + e + ")", t);
646 }
647
648 return;
649 }
650
651 ChannelPipelineException pe;
652 if (t instanceof ChannelPipelineException) {
653 pe = (ChannelPipelineException) t;
654 } else {
655 pe = new ChannelPipelineException(t);
656 }
657
658 try {
659 sink.exceptionCaught(this, e, pe);
660 } catch (Exception e1) {
661 if (logger.isWarnEnabled()) {
662 logger.warn("An exception was thrown by an exception handler.", e1);
663 }
664 }
665 }
666
667 private void init(String name, ChannelHandler handler) {
668 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
669 callBeforeAdd(ctx);
670 head = tail = ctx;
671 name2ctx.clear();
672 name2ctx.put(name, ctx);
673 callAfterAdd(ctx);
674 }
675
676 private void checkDuplicateName(String name) {
677 if (name2ctx.containsKey(name)) {
678 throw new IllegalArgumentException("Duplicate handler name: " + name);
679 }
680 }
681
682 private DefaultChannelHandlerContext getContextOrDie(String name) {
683 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
684 if (ctx == null) {
685 throw new NoSuchElementException(name);
686 } else {
687 return ctx;
688 }
689 }
690
691 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
692 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
693 if (ctx == null) {
694 throw new NoSuchElementException(handler.getClass().getName());
695 } else {
696 return ctx;
697 }
698 }
699
700 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
701 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
702 if (ctx == null) {
703 throw new NoSuchElementException(handlerType.getName());
704 } else {
705 return ctx;
706 }
707 }
708
709 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
710 volatile DefaultChannelHandlerContext next;
711 volatile DefaultChannelHandlerContext prev;
712 private final String name;
713 private final ChannelHandler handler;
714 private final boolean canHandleUpstream;
715 private final boolean canHandleDownstream;
716 private volatile Object attachment;
717
718 DefaultChannelHandlerContext(
719 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
720 String name, ChannelHandler handler) {
721
722 if (name == null) {
723 throw new NullPointerException("name");
724 }
725 if (handler == null) {
726 throw new NullPointerException("handler");
727 }
728 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
729 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
730
731
732 if (!canHandleUpstream && !canHandleDownstream) {
733 throw new IllegalArgumentException(
734 "handler must be either " +
735 ChannelUpstreamHandler.class.getName() + " or " +
736 ChannelDownstreamHandler.class.getName() + '.');
737 }
738
739 this.prev = prev;
740 this.next = next;
741 this.name = name;
742 this.handler = handler;
743 }
744
745 public Channel getChannel() {
746 return getPipeline().getChannel();
747 }
748
749 public ChannelPipeline getPipeline() {
750 return DefaultChannelPipeline.this;
751 }
752
753 public boolean canHandleDownstream() {
754 return canHandleDownstream;
755 }
756
757 public boolean canHandleUpstream() {
758 return canHandleUpstream;
759 }
760
761 public ChannelHandler getHandler() {
762 return handler;
763 }
764
765 public String getName() {
766 return name;
767 }
768
769 public Object getAttachment() {
770 return attachment;
771 }
772
773 public void setAttachment(Object attachment) {
774 this.attachment = attachment;
775 }
776
777 public void sendDownstream(ChannelEvent e) {
778 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
779 if (prev == null) {
780 try {
781 getSink().eventSunk(DefaultChannelPipeline.this, e);
782 } catch (Throwable t) {
783 notifyHandlerException(e, t);
784 }
785 } else {
786 DefaultChannelPipeline.this.sendDownstream(prev, e);
787 }
788 }
789
790 public void sendUpstream(ChannelEvent e) {
791 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
792 if (next != null) {
793 DefaultChannelPipeline.this.sendUpstream(next, e);
794 }
795 }
796 }
797
798 private static final class DiscardingChannelSink implements ChannelSink {
799 DiscardingChannelSink() {
800 super();
801 }
802
803 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
804 if (logger.isWarnEnabled()) {
805 logger.warn("Not attached yet; discarding: " + e);
806 }
807
808 }
809
810 public void exceptionCaught(ChannelPipeline pipeline,
811 ChannelEvent e, ChannelPipelineException cause) throws Exception {
812 throw cause;
813 }
814
815
816 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
817 if (logger.isWarnEnabled()) {
818 logger.warn("Not attached yet; rejecting: " + task);
819 }
820 return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
821 }
822 }
823 }