View Javadoc

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package org.jboss.netty.handler.codec.embedder;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.lang.reflect.Array;
21  import java.util.ConcurrentModificationException;
22  import java.util.LinkedList;
23  import java.util.Queue;
24  
25  import org.jboss.netty.buffer.ChannelBufferFactory;
26  import org.jboss.netty.channel.Channel;
27  import org.jboss.netty.channel.ChannelEvent;
28  import org.jboss.netty.channel.ChannelFuture;
29  import org.jboss.netty.channel.ChannelHandler;
30  import org.jboss.netty.channel.ChannelHandlerContext;
31  import org.jboss.netty.channel.ChannelPipeline;
32  import org.jboss.netty.channel.ChannelPipelineException;
33  import org.jboss.netty.channel.ChannelSink;
34  import org.jboss.netty.channel.ChannelUpstreamHandler;
35  import org.jboss.netty.channel.Channels;
36  import org.jboss.netty.channel.DefaultChannelPipeline;
37  import org.jboss.netty.channel.ExceptionEvent;
38  import org.jboss.netty.channel.MessageEvent;
39  
40  /**
41   * A skeletal {@link CodecEmbedder} implementation.
42   */
43  abstract class AbstractCodecEmbedder<E> implements CodecEmbedder<E> {
44  
45      private final Channel channel;
46      private final ChannelPipeline pipeline;
47      private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
48  
49      final Queue<Object> productQueue = new LinkedList<Object>();
50  
51      /**
52       * Creates a new embedder whose pipeline is composed of the specified
53       * handlers.
54       */
55      protected AbstractCodecEmbedder(ChannelHandler... handlers) {
56          pipeline = new EmbeddedChannelPipeline();
57          configurePipeline(handlers);
58          channel = new EmbeddedChannel(pipeline, sink);
59          fireInitialEvents();
60      }
61  
62      /**
63       * Creates a new embedder whose pipeline is composed of the specified
64       * handlers.
65       *
66       * @param bufferFactory the {@link ChannelBufferFactory} to be used when
67       *                      creating a new buffer.
68       */
69      protected AbstractCodecEmbedder(ChannelBufferFactory bufferFactory, ChannelHandler... handlers) {
70          this(handlers);
71          getChannel().getConfig().setBufferFactory(bufferFactory);
72      }
73  
74      private void fireInitialEvents() {
75          // Fire the typical initial events.
76          fireChannelOpen(channel);
77          fireChannelBound(channel, channel.getLocalAddress());
78          fireChannelConnected(channel, channel.getRemoteAddress());
79      }
80  
81      private void configurePipeline(ChannelHandler... handlers) {
82          if (handlers == null) {
83              throw new NullPointerException("handlers");
84          }
85  
86          if (handlers.length == 0) {
87              throw new IllegalArgumentException(
88                      "handlers should contain at least one " +
89                      ChannelHandler.class.getSimpleName() + '.');
90          }
91  
92          for (int i = 0; i < handlers.length; i ++) {
93              ChannelHandler h = handlers[i];
94              if (h == null) {
95                  throw new NullPointerException("handlers[" + i + "]");
96              }
97              pipeline.addLast(String.valueOf(i), handlers[i]);
98          }
99          pipeline.addLast("SINK", sink);
100     }
101 
102     public boolean finish() {
103         close(channel);
104         fireChannelDisconnected(channel);
105         fireChannelUnbound(channel);
106         fireChannelClosed(channel);
107         return !productQueue.isEmpty();
108     }
109 
110     /**
111      * Returns the virtual {@link Channel} which will be used as a mock
112      * during encoding and decoding.
113      */
114     protected final Channel getChannel() {
115         return channel;
116     }
117 
118     /**
119      * Returns {@code true} if and only if the produce queue is empty and
120      * therefore {@link #poll()} will return {@code null}.
121      */
122     protected final boolean isEmpty() {
123         return productQueue.isEmpty();
124     }
125 
126     public final E poll() {
127         return (E) productQueue.poll();
128     }
129 
130     public final E peek() {
131         return (E) productQueue.peek();
132     }
133 
134     public final Object[] pollAll() {
135         final int size = size();
136         Object[] a = new Object[size];
137         for (int i = 0; i < size; i ++) {
138             E product = poll();
139             if (product == null) {
140                 throw new ConcurrentModificationException();
141             }
142             a[i] = product;
143         }
144         return a;
145     }
146 
147     @SuppressWarnings("unchecked")
148     public final <T> T[] pollAll(T[] a) {
149         if (a == null) {
150             throw new NullPointerException("a");
151         }
152 
153         final int size = size();
154 
155         // Create a new array if the specified one is too small.
156         if (a.length < size) {
157             a = (T[]) Array.newInstance(a.getClass().getComponentType(), size);
158         }
159 
160         for (int i = 0;; i ++) {
161             T product = (T) poll();
162             if (product == null) {
163                 break;
164             }
165             a[i] = product;
166         }
167 
168         // Put the terminator if necessary.
169         if (a.length > size) {
170             a[size] = null;
171         }
172 
173         return a;
174     }
175 
176     public final int size() {
177         return productQueue.size();
178     }
179 
180     public ChannelPipeline getPipeline() {
181         return pipeline;
182     }
183 
184     private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
185         EmbeddedChannelSink() {
186             super();
187         }
188 
189         public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
190             handleEvent(e);
191         }
192 
193         public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
194             handleEvent(e);
195         }
196 
197         private void handleEvent(ChannelEvent e) {
198             if (e instanceof MessageEvent) {
199                 boolean offered = productQueue.offer(((MessageEvent) e).getMessage());
200                 assert offered;
201             } else if (e instanceof ExceptionEvent) {
202                 throw new CodecEmbedderException(((ExceptionEvent) e).getCause());
203             }
204 
205             // Swallow otherwise.
206         }
207 
208         public void exceptionCaught(
209                 ChannelPipeline pipeline, ChannelEvent e,
210                 ChannelPipelineException cause) throws Exception {
211             Throwable actualCause = cause.getCause();
212             if (actualCause == null) {
213                 actualCause = cause;
214             }
215 
216             throw new CodecEmbedderException(actualCause);
217         }
218 
219         public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
220             try {
221                 task.run();
222                 return Channels.succeededFuture(pipeline.getChannel());
223             } catch (Throwable t) {
224                 return Channels.failedFuture(pipeline.getChannel(), t);
225             }
226         }
227     }
228 
229     private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
230 
231         EmbeddedChannelPipeline() {
232             super();
233         }
234 
235         @Override
236         protected void notifyHandlerException(ChannelEvent e, Throwable t) {
237             while (t instanceof ChannelPipelineException && t.getCause() != null) {
238                 t = t.getCause();
239             }
240             if (t instanceof CodecEmbedderException) {
241                 throw (CodecEmbedderException) t;
242             } else {
243                 throw new CodecEmbedderException(t);
244             }
245         }
246     }
247 }