View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.ssl;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ClosedChannelException;
23  import java.nio.channels.DatagramChannel;
24  import java.nio.channels.SocketChannel;
25  import java.util.LinkedList;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.regex.Pattern;
31  
32  import javax.net.ssl.SSLEngine;
33  import javax.net.ssl.SSLEngineResult;
34  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
35  import javax.net.ssl.SSLEngineResult.Status;
36  import javax.net.ssl.SSLException;
37  
38  import org.jboss.netty.buffer.ChannelBuffer;
39  import org.jboss.netty.buffer.ChannelBuffers;
40  import org.jboss.netty.channel.Channel;
41  import org.jboss.netty.channel.ChannelDownstreamHandler;
42  import org.jboss.netty.channel.ChannelEvent;
43  import org.jboss.netty.channel.ChannelFuture;
44  import org.jboss.netty.channel.ChannelFutureListener;
45  import org.jboss.netty.channel.ChannelHandlerContext;
46  import org.jboss.netty.channel.ChannelPipeline;
47  import org.jboss.netty.channel.ChannelStateEvent;
48  import org.jboss.netty.channel.Channels;
49  import org.jboss.netty.channel.DefaultChannelFuture;
50  import org.jboss.netty.channel.DownstreamMessageEvent;
51  import org.jboss.netty.channel.ExceptionEvent;
52  import org.jboss.netty.channel.MessageEvent;
53  import org.jboss.netty.handler.codec.frame.FrameDecoder;
54  import org.jboss.netty.logging.InternalLogger;
55  import org.jboss.netty.logging.InternalLoggerFactory;
56  import org.jboss.netty.util.internal.DetectionUtil;
57  import org.jboss.netty.util.internal.NonReentrantLock;
58  
59  /**
60   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
61   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
62   * to the <strong>"SecureChat"</strong> example in the distribution or the web
63   * site for the detailed usage.
64   *
65   * <h3>Beginning the handshake</h3>
66   * <p>
67   * You must make sure not to write a message while the
68   * {@linkplain #handshake() handshake} is in progress unless you are
69   * renegotiating.  You will be notified by the {@link ChannelFuture} which is
70   * returned by the {@link #handshake()} method when the handshake
71   * process succeeds or fails.
72   *
73   * <h3>Handshake</h3>
74   * <p>
75   * If {@link #isIssueHandshake()} is {@code false}
76   * (default) you will need to take care of calling {@link #handshake()} on your own. In most
77   * situations where {@link SslHandler} is used in 'client mode' you want to issue a handshake once
78   * the connection is established. If {@link #setIssueHandshake(boolean)} is set to <code>true</code>
79   * you don't need to worry about this as the {@link SslHandler} will take care of it.
80   * <p>
81   *
82   * <h3>Renegotiation</h3>
83   * <p>
84   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code true}
85   * (default) and the initial handshake has been done successfully, you can call
86   * {@link #handshake()} to trigger the renegotiation.
87   * <p>
88   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code false},
89   * an attempt to trigger renegotiation will result in the connection closure.
90   * <p>
91   * Please note that TLS renegotiation had a security issue before.  If your
92   * runtime environment did not fix it, please make sure to disable TLS
93   * renegotiation by calling {@link #setEnableRenegotiation(boolean)} with
94   * {@code false}.  For more information, please refer to the following documents:
95   * <ul>
96   *   <li><a href="http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2009-3555">CVE-2009-3555</a></li>
97   *   <li><a href="http://www.ietf.org/rfc/rfc5746.txt">RFC5746</a></li>
98   *   <li><a href="http://www.oracle.com/technetwork/java/javase/documentation/tlsreadme2-176330.html">Phased
99   *       Approach to Fixing the TLS Renegotiation Issue</a></li>
100  * </ul>
101  *
102  * <h3>Closing the session</h3>
103  * <p>
104  * To close the SSL session, the {@link #close()} method should be
105  * called to send the {@code close_notify} message to the remote peer.  One
106  * exception is when you close the {@link Channel} - {@link SslHandler}
107  * intercepts the close request and sends the {@code close_notify} message
108  * before the channel closure automatically.  Once the SSL session is closed,
109  * it is not reusable, and consequently you should create a new
110  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
111  * following section.
112  *
113  * <h3>Restarting the session</h3>
114  * <p>
115  * To restart the SSL session, you must remove the existing closed
116  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
117  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
118  * and start the handshake process as described in the first section.
119  *
120  * <h3>Implementing StartTLS</h3>
121  * <p>
122  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
123  * communication pattern that secures the wire in the middle of the plaintext
124  * connection.  Please note that it is different from SSL &middot; TLS, that
125  * secures the wire from the beginning of the connection.  Typically, StartTLS
126  * is composed of three steps:
127  * <ol>
128  * <li>Client sends a StartTLS request to server.</li>
129  * <li>Server sends a StartTLS response to client.</li>
130  * <li>Client begins SSL handshake.</li>
131  * </ol>
132  * If you implement a server, you need to:
133  * <ol>
134  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
135  *     to {@code true},</li>
136  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
137  * <li>write a StartTLS response.</li>
138  * </ol>
139  * Please note that you must insert {@link SslHandler} <em>before</em> sending
140  * the StartTLS response.  Otherwise the client can begin the SSL handshake
141  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
142  * data corruption.
143  * <p>
144  * The client-side implementation is much simpler.
145  * <ol>
146  * <li>Write a StartTLS request,</li>
147  * <li>wait for the StartTLS response,</li>
148  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
149  *     to {@code false},</li>
150  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
151  * <li>Initiate SSL handshake by calling {@link SslHandler#handshake()}.</li>
152  * </ol>
153  *
154  * @apiviz.landmark
155  * @apiviz.uses org.jboss.netty.handler.ssl.SslBufferPool
156  */
157 public class SslHandler extends FrameDecoder
158                         implements ChannelDownstreamHandler {
159 
160     private static final InternalLogger logger =
161         InternalLoggerFactory.getInstance(SslHandler.class);
162 
163     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
164 
165     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
166             "^.*(Socket|DatagramChannel|SctpChannel).*$");
167     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
168             "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
169             Pattern.CASE_INSENSITIVE);
170 
171     private static SslBufferPool defaultBufferPool;
172 
173     /**
174      * Returns the default {@link SslBufferPool} used when no pool is
175      * specified in the constructor.
176      */
177     public static synchronized SslBufferPool getDefaultBufferPool() {
178         if (defaultBufferPool == null) {
179             defaultBufferPool = new SslBufferPool();
180         }
181         return defaultBufferPool;
182     }
183 
184     private volatile ChannelHandlerContext ctx;
185     private final SSLEngine engine;
186     private final SslBufferPool bufferPool;
187     private final Executor delegatedTaskExecutor;
188     private final boolean startTls;
189 
190     private volatile boolean enableRenegotiation = true;
191 
192     final Object handshakeLock = new Object();
193     private boolean handshaking;
194     private volatile boolean handshaken;
195     private volatile ChannelFuture handshakeFuture;
196 
197     private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
198     private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
199     int ignoreClosedChannelException;
200     final Object ignoreClosedChannelExceptionLock = new Object();
201     private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
202     private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
203     private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
204 
205     private volatile boolean issueHandshake;
206 
207     private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
208 
209     private boolean closeOnSSLException;
210 
/**
 * Creates a new instance which uses the {@linkplain #getDefaultBufferPool()
 * default buffer pool} and {@link ImmediateExecutor#INSTANCE} for delegated
 * tasks.
 *
 * @param engine  the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
}
219 
/**
 * Creates a new instance which runs delegated tasks with
 * {@link ImmediateExecutor#INSTANCE}.
 *
 * @param engine      the {@link SSLEngine} this handler will use
 * @param bufferPool  the {@link SslBufferPool} from which this handler
 *                    acquires the buffers required by the {@link SSLEngine}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, ImmediateExecutor.INSTANCE);
}
230 
/**
 * Creates a new instance which uses the {@linkplain #getDefaultBufferPool()
 * default buffer pool}.
 *
 * @param engine    the {@link SSLEngine} this handler will use
 * @param startTls  {@code true} if the first write request must be sent
 *                  without being encrypted by the {@link SSLEngine}
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls);
}
241 
/**
 * Creates a new instance which runs delegated tasks with
 * {@link ImmediateExecutor#INSTANCE}.
 *
 * @param engine      the {@link SSLEngine} this handler will use
 * @param bufferPool  the {@link SslBufferPool} from which this handler
 *                    acquires the buffers required by the {@link SSLEngine}
 * @param startTls    {@code true} if the first write request must be sent
 *                    without being encrypted by the {@link SSLEngine}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
}
254 
/**
 * Creates a new instance which uses the {@linkplain #getDefaultBufferPool()
 * default buffer pool}.
 *
 * @param engine
 *        the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor
 *        the {@link Executor} which runs the tasks returned by
 *        {@link SSLEngine#getDelegatedTask()}
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
}
267 
/**
 * Creates a new instance with the {@code startTls} flag turned off.
 *
 * @param engine
 *        the {@link SSLEngine} this handler will use
 * @param bufferPool
 *        the {@link SslBufferPool} from which this handler acquires
 *        the buffers required by the {@link SSLEngine}
 * @param delegatedTaskExecutor
 *        the {@link Executor} which runs the tasks returned by
 *        {@link SSLEngine#getDelegatedTask()}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, false, delegatedTaskExecutor);
}
283 
/**
 * Creates a new instance which uses the {@linkplain #getDefaultBufferPool()
 * default buffer pool}.
 *
 * @param engine
 *        the {@link SSLEngine} this handler will use
 * @param startTls
 *        {@code true} if the first write request must be sent without
 *        being encrypted by the {@link SSLEngine}
 * @param delegatedTaskExecutor
 *        the {@link Executor} which runs the tasks returned by
 *        {@link SSLEngine#getDelegatedTask()}
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
}
299 
/**
 * Creates a new instance.  All other constructors delegate to this one.
 *
 * @param engine
 *        the {@link SSLEngine} this handler will use
 * @param bufferPool
 *        the {@link SslBufferPool} from which this handler acquires
 *        the buffers required by the {@link SSLEngine}
 * @param startTls
 *        {@code true} if the first write request must be sent without
 *        being encrypted by the {@link SSLEngine}
 * @param delegatedTaskExecutor
 *        the {@link Executor} which runs the tasks returned by
 *        {@link SSLEngine#getDelegatedTask()}
 *
 * @throws NullPointerException
 *         if {@code engine}, {@code bufferPool} or
 *         {@code delegatedTaskExecutor} is {@code null}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
    // Arguments are validated in declaration order so that the reported
    // name is always the first offending one.
    if (engine == null) {
        throw new NullPointerException("engine");
    }
    this.engine = engine;

    if (bufferPool == null) {
        throw new NullPointerException("bufferPool");
    }
    this.bufferPool = bufferPool;

    this.startTls = startTls;

    if (delegatedTaskExecutor == null) {
        throw new NullPointerException("delegatedTaskExecutor");
    }
    this.delegatedTaskExecutor = delegatedTaskExecutor;
}
330 
331     /**
332      * Returns the {@link SSLEngine} which is used by this handler.
333      */
334     public SSLEngine getEngine() {
335         return engine;
336     }
337 
338     /**
339      * Starts an SSL / TLS handshake for the specified channel.
340      *
341      * @return a {@link ChannelFuture} which is notified when the handshake
342      *         succeeds or fails.
343      */
344     public ChannelFuture handshake() {
345         if (handshaken && !isEnableRenegotiation()) {
346             throw new IllegalStateException("renegotiation disabled");
347         }
348 
349         ChannelHandlerContext ctx = this.ctx;
350         Channel channel = ctx.getChannel();
351         ChannelFuture handshakeFuture;
352         Exception exception = null;
353 
354         synchronized (handshakeLock) {
355             if (handshaking) {
356                 return this.handshakeFuture;
357             } else {
358                 handshaking = true;
359                 try {
360                     engine.beginHandshake();
361                     runDelegatedTasks();
362                     handshakeFuture = this.handshakeFuture = future(channel);
363                 } catch (Exception e) {
364                     handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
365                     exception = e;
366                 }
367             }
368         }
369 
370         if (exception == null) { // Began handshake successfully.
371             try {
372                 wrapNonAppData(ctx, channel);
373             } catch (SSLException e) {
374                 handshakeFuture.setFailure(e);
375 
376                 fireExceptionCaught(ctx, e);
377                 if (closeOnSSLException) {
378                     Channels.close(ctx, future(channel));
379                 }
380             }
381         } else { // Failed to initiate handshake.
382             fireExceptionCaught(ctx, exception);
383             if (closeOnSSLException) {
384                 Channels.close(ctx, future(channel));
385             }
386         }
387 
388         return handshakeFuture;
389     }
390 
391     /**
392      * @deprecated Use {@link #handshake()} instead.
393      */
394     @Deprecated
395     public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
396         return handshake();
397     }
398 
399     /**
400      * Sends an SSL {@code close_notify} message to the specified channel and
401      * destroys the underlying {@link SSLEngine}.
402      */
403     public ChannelFuture close() {
404         ChannelHandlerContext ctx = this.ctx;
405         Channel channel = ctx.getChannel();
406         try {
407             engine.closeOutbound();
408             return wrapNonAppData(ctx, channel);
409         } catch (SSLException e) {
410             fireExceptionCaught(ctx, e);
411             if (closeOnSSLException) {
412                 Channels.close(ctx, future(channel));
413             }
414             return failedFuture(channel, e);
415         }
416     }
417 
418     /**
419      * @deprecated Use {@link #close()} instead.
420      */
421     @Deprecated
422     public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
423         return close();
424     }
425 
426     /**
427      * Returns {@code true} if and only if TLS renegotiation is enabled.
428      */
429     public boolean isEnableRenegotiation() {
430         return enableRenegotiation;
431     }
432 
433     /**
434      * Enables or disables TLS renegotiation.
435      */
436     public void setEnableRenegotiation(boolean enableRenegotiation) {
437         this.enableRenegotiation = enableRenegotiation;
438     }
439 
440     /**
441      * Enables or disables the automatic handshake once the {@link Channel} is
442      * connected. The value will only have affect if its set before the
443      * {@link Channel} is connected.
444      */
445     public void setIssueHandshake(boolean issueHandshake) {
446         this.issueHandshake = issueHandshake;
447     }
448 
449     /**
450      * Returns <code>true</code> if the automatic handshake is enabled
451      */
452     public boolean isIssueHandshake() {
453         return issueHandshake;
454     }
455 
456     /**
457      * Return the {@link ChannelFuture} that will get notified if the inbound of the {@link SSLEngine} will get closed.
458      *
459      * This method will return the same {@link ChannelFuture} all the time.
460      *
461      * For more informations see the apidocs of {@link SSLEngine}
462      *
463      */
464     public ChannelFuture getSSLEngineInboundCloseFuture() {
465         return sslEngineCloseFuture;
466 
467     }
468 
469     /**
470      * If set to <code>true</code>, the {@link Channel} will automatically get closed
471      * one a {@link SSLException} was caught. This is most times what you want, as after this
472      * its almost impossible to recover.
473      *
474      * Anyway the default is <code>false</code> to not break compatibility with older releases. This
475      * will be changed to <code>true</code> in the next major release.
476      *
477      */
478     public void setCloseOnSSLException(boolean closeOnSslException) {
479         if (ctx != null) {
480             throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
481         }
482         this.closeOnSSLException = closeOnSslException;
483     }
484 
485     public boolean getCloseOnSSLException() {
486         return closeOnSSLException;
487     }
488 
489     public void handleDownstream(
490             final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
491         if (evt instanceof ChannelStateEvent) {
492             ChannelStateEvent e = (ChannelStateEvent) evt;
493             switch (e.getState()) {
494             case OPEN:
495             case CONNECTED:
496             case BOUND:
497                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
498                     closeOutboundAndChannel(context, e);
499                     return;
500                 }
501             }
502         }
503         if (!(evt instanceof MessageEvent)) {
504             context.sendDownstream(evt);
505             return;
506         }
507 
508         MessageEvent e = (MessageEvent) evt;
509         if (!(e.getMessage() instanceof ChannelBuffer)) {
510             context.sendDownstream(evt);
511             return;
512         }
513 
514         // Do not encrypt the first write request if this handler is
515         // created with startTLS flag turned on.
516         if (startTls && sentFirstMessage.compareAndSet(false, true)) {
517             context.sendDownstream(evt);
518             return;
519         }
520 
521         // Otherwise, all messages are encrypted.
522         ChannelBuffer msg = (ChannelBuffer) e.getMessage();
523         PendingWrite pendingWrite;
524 
525         if (msg.readable()) {
526             pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
527         } else {
528             pendingWrite = new PendingWrite(evt.getFuture(), null);
529         }
530         synchronized (pendingUnencryptedWrites) {
531             boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
532             assert offered;
533         }
534 
535         wrap(context, evt.getChannel());
536     }
537 
538     @Override
539     public void channelDisconnected(ChannelHandlerContext ctx,
540             ChannelStateEvent e) throws Exception {
541 
542         // Make sure the handshake future is notified when a connection has
543         // been closed during handshake.
544         synchronized (handshakeLock) {
545             if (handshaking) {
546                 handshakeFuture.setFailure(new ClosedChannelException());
547             }
548         }
549 
550         try {
551             super.channelDisconnected(ctx, e);
552         } finally {
553             unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
554             engine.closeOutbound();
555             if (!sentCloseNotify.get() && handshaken) {
556                 try {
557                     engine.closeInbound();
558                 } catch (SSLException ex) {
559                     if (logger.isDebugEnabled()) {
560                         logger.debug("Failed to clean up SSLEngine.", ex);
561                     }
562                 }
563             }
564         }
565     }
566 
567     @Override
568     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
569             throws Exception {
570 
571         Throwable cause = e.getCause();
572         if (cause instanceof IOException) {
573             if (cause instanceof ClosedChannelException) {
574                 synchronized (ignoreClosedChannelExceptionLock) {
575                     if (ignoreClosedChannelException > 0) {
576                         ignoreClosedChannelException --;
577                         if (logger.isDebugEnabled()) {
578                             logger.debug(
579                                     "Swallowing an exception raised while " +
580                                     "writing non-app data", cause);
581                         }
582 
583                         return;
584                     }
585                 }
586             } else {
587                 if (ignoreException(cause)) {
588                     return;
589                 }
590             }
591         }
592 
593         ctx.sendUpstream(e);
594     }
595 
596     /**
597      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
598      *
599      * When an ssl connection is closed a close_notify message is sent.
600      * After that the peer also sends close_notify however, it's not mandatory to receive
601      * the close_notify. The party who sent the initial close_notify can close the connection immediately
602      * then the peer will get connection reset error.
603      *
604      */
605     private boolean ignoreException(Throwable t) {
606         if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
607             String message = String.valueOf(t.getMessage()).toLowerCase();
608 
609             // first try to match connection reset / broke peer based on the regex. This is the fastest way
610             // but may fail on different jdk impls or OS's
611             if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
612                 return true;
613             }
614 
615             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
616             StackTraceElement[] elements = t.getStackTrace();
617             for (StackTraceElement element: elements) {
618                 String classname = element.getClassName();
619                 String methodname = element.getMethodName();
620 
621                 // skip all classes that belong to the io.netty package
622                 if (classname.startsWith("org.jboss.netty.")) {
623                     continue;
624                 }
625 
626                 // check if the method name is read if not skip it
627                 if (!methodname.equals("read")) {
628                     continue;
629                 }
630 
631                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
632                 // also others
633                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
634                     return true;
635                 }
636 
637                 try {
638                     // No match by now.. Try to load the class via classloader and inspect it.
639                     // This is mainly done as other JDK implementations may differ in name of
640                     // the impl.
641                     Class<?> clazz = getClass().getClassLoader().loadClass(classname);
642 
643                     if (SocketChannel.class.isAssignableFrom(clazz)
644                             || DatagramChannel.class.isAssignableFrom(clazz)) {
645                         return true;
646                     }
647 
648                     // also match against SctpChannel via String matching as it may not present.
649                     if (DetectionUtil.javaVersion() >= 7
650                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
651                         return true;
652                     }
653                 } catch (ClassNotFoundException e) {
654                     // This should not happen just ignore
655                 }
656 
657             }
658         }
659 
660         return false;
661     }
662 
663     @Override
664     protected Object decode(
665             final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
666 
667         if (buffer.readableBytes() < 5) {
668             return null;
669         }
670 
671         int packetLength = 0;
672 
673         // SSLv3 or TLS - Check ContentType
674         boolean tls;
675         switch (buffer.getUnsignedByte(buffer.readerIndex())) {
676         case 20:  // change_cipher_spec
677         case 21:  // alert
678         case 22:  // handshake
679         case 23:  // application_data
680             tls = true;
681             break;
682         default:
683             // SSLv2 or bad data
684             tls = false;
685         }
686 
687         if (tls) {
688             // SSLv3 or TLS - Check ProtocolVersion
689             int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
690             if (majorVersion == 3) {
691                 // SSLv3 or TLS
692                 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
693                 if (packetLength <= 5) {
694                     // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
695                     tls = false;
696                 }
697             } else {
698                 // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
699                 tls = false;
700             }
701         }
702 
703         if (!tls) {
704             // SSLv2 or bad data - Check the version
705             boolean sslv2 = true;
706             int headerLength = (buffer.getUnsignedByte(
707                     buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
708             int majorVersion = buffer.getUnsignedByte(
709                     buffer.readerIndex() + headerLength + 1);
710             if (majorVersion == 2 || majorVersion == 3) {
711                 // SSLv2
712                 if (headerLength == 2) {
713                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
714                 } else {
715                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
716                 }
717                 if (packetLength <= headerLength) {
718                     sslv2 = false;
719                 }
720             } else {
721                 sslv2 = false;
722             }
723 
724             if (!sslv2) {
725                 // Bad data - discard the buffer and raise an exception.
726                 NotSslRecordException e = new NotSslRecordException(
727                         "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
728                 buffer.skipBytes(buffer.readableBytes());
729                 if (closeOnSSLException) {
730                     // first trigger the exception and then close the channel
731                     fireExceptionCaught(ctx, e);
732                     Channels.close(ctx, future(channel));
733 
734                     // just return null as we closed the channel before, that
735                     // will take care of cleanup etc
736                     return null;
737                 } else {
738                     throw e;
739                 }
740             }
741         }
742 
743         assert packetLength > 0;
744 
745         if (buffer.readableBytes() < packetLength) {
746             return null;
747         }
748 
749         // We advance the buffer's readerIndex before calling unwrap() because
750         // unwrap() can trigger FrameDecoder call decode(), this method, recursively.
751         // The recursive call results in decoding the same packet twice if
752         // the readerIndex is advanced *after* decode().
753         //
754         // Here's an example:
755         // 1) An SSL packet is received from the wire.
756         // 2) SslHandler.decode() deciphers the packet and calls the user code.
757         // 3) The user closes the channel in the same thread.
758         // 4) The same thread triggers a channelDisconnected() event.
759         // 5) FrameDecoder.cleanup() is called, and it calls SslHandler.decode().
760         // 6) SslHandler.decode() will feed the same packet with what was
761         //    deciphered at the step 2 again if the readerIndex was not advanced
762         //    before calling the user code.
763         final int packetOffset = buffer.readerIndex();
764         buffer.skipBytes(packetLength);
765         return unwrap(ctx, channel, buffer, packetOffset, packetLength);
766     }
767 
768     /**
769      * Reads a big-endian short integer from the buffer.  Please note that we do not use
770      * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer.
771      */
772     private static short getShort(ChannelBuffer buf, int offset) {
773         return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
774     }
775 
776     private ChannelFuture wrap(ChannelHandlerContext context, Channel channel)
777             throws SSLException {
778 
779         ChannelFuture future = null;
780         ChannelBuffer msg;
781         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
782         boolean success = true;
783         boolean offered = false;
784         boolean needsUnwrap = false;
785         try {
786             loop:
787             for (;;) {
788                 // Acquire a lock to make sure unencrypted data is polled
789                 // in order and their encrypted counterpart is offered in
790                 // order.
791                 synchronized (pendingUnencryptedWrites) {
792                     PendingWrite pendingWrite = pendingUnencryptedWrites.peek();
793                     if (pendingWrite == null) {
794                         break;
795                     }
796 
797                     ByteBuffer outAppBuf = pendingWrite.outAppBuf;
798                     if (outAppBuf == null) {
799                         // A write request with an empty buffer
800                         pendingUnencryptedWrites.remove();
801                         offerEncryptedWriteRequest(
802                                 new DownstreamMessageEvent(
803                                         channel, pendingWrite.future,
804                                         ChannelBuffers.EMPTY_BUFFER,
805                                         channel.getRemoteAddress()));
806                         offered = true;
807                     } else {
808                         SSLEngineResult result = null;
809                         try {
810                             synchronized (handshakeLock) {
811                                 result = engine.wrap(outAppBuf, outNetBuf);
812                             }
813                         } finally {
814                             if (!outAppBuf.hasRemaining()) {
815                                 pendingUnencryptedWrites.remove();
816                             }
817                         }
818 
819                         if (result.bytesProduced() > 0) {
820                             outNetBuf.flip();
821                             int remaining = outNetBuf.remaining();
822                             msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
823 
824                             // Transfer the bytes to the new ChannelBuffer using some safe method that will also
825                             // work with "non" heap buffers
826                             //
827                             // See https://github.com/netty/netty/issues/329
828                             msg.writeBytes(outNetBuf);
829                             outNetBuf.clear();
830 
831                             if (pendingWrite.outAppBuf.hasRemaining()) {
832                                 // pendingWrite's future shouldn't be notified if
833                                 // only partial data is written.
834                                 future = succeededFuture(channel);
835                             } else {
836                                 future = pendingWrite.future;
837                             }
838 
839                             MessageEvent encryptedWrite = new DownstreamMessageEvent(
840                                     channel, future, msg, channel.getRemoteAddress());
841                             offerEncryptedWriteRequest(encryptedWrite);
842                             offered = true;
843                         } else if (result.getStatus() == Status.CLOSED) {
844                             // SSLEngine has been closed already.
845                             // Any further write attempts should be denied.
846                             success = false;
847                             break;
848                         } else {
849                             final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
850                             handleRenegotiation(handshakeStatus);
851                             switch (handshakeStatus) {
852                             case NEED_WRAP:
853                                 if (outAppBuf.hasRemaining()) {
854                                     break;
855                                 } else {
856                                     break loop;
857                                 }
858                             case NEED_UNWRAP:
859                                 needsUnwrap = true;
860                                 break loop;
861                             case NEED_TASK:
862                                 runDelegatedTasks();
863                                 break;
864                             case FINISHED:
865                             case NOT_HANDSHAKING:
866                                 if (handshakeStatus == HandshakeStatus.FINISHED) {
867                                     setHandshakeSuccess(channel);
868                                 }
869                                 if (result.getStatus() == Status.CLOSED) {
870                                     success = false;
871                                 }
872                                 break loop;
873                             default:
874                                 throw new IllegalStateException(
875                                         "Unknown handshake status: " +
876                                         handshakeStatus);
877                             }
878                         }
879                     }
880                 }
881             }
882         } catch (SSLException e) {
883             success = false;
884             setHandshakeFailure(channel, e);
885             throw e;
886         } finally {
887             bufferPool.releaseBuffer(outNetBuf);
888 
889             if (offered) {
890                 flushPendingEncryptedWrites(context);
891             }
892 
893             if (!success) {
894                 IllegalStateException cause =
895                     new IllegalStateException("SSLEngine already closed");
896                 // Mark all remaining pending writes as failure if anything
897                 // wrong happened before the write requests are wrapped.
898                 // Please note that we do not call setFailure while a lock is
899                 // acquired, to avoid a potential dead lock.
900                 for (;;) {
901                     PendingWrite pendingWrite;
902                     synchronized (pendingUnencryptedWrites) {
903                         pendingWrite = pendingUnencryptedWrites.poll();
904                         if (pendingWrite == null) {
905                             break;
906                         }
907                     }
908 
909                     pendingWrite.future.setFailure(cause);
910                 }
911             }
912         }
913 
914         if (needsUnwrap) {
915             unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
916         }
917 
918         if (future == null) {
919             future = succeededFuture(channel);
920         }
921         return future;
922     }
923 
924     private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
925         final boolean locked = pendingEncryptedWritesLock.tryLock();
926         try {
927             pendingEncryptedWrites.offer(encryptedWrite);
928         } finally {
929             if (locked) {
930                 pendingEncryptedWritesLock.unlock();
931             }
932         }
933     }
934 
935     private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
936         // Avoid possible dead lock and data integrity issue
937         // which is caused by cross communication between more than one channel
938         // in the same VM.
939         if (!pendingEncryptedWritesLock.tryLock()) {
940             return;
941         }
942 
943         try {
944             MessageEvent e;
945             while ((e = pendingEncryptedWrites.poll()) != null) {
946                 ctx.sendDownstream(e);
947             }
948         } finally {
949             pendingEncryptedWritesLock.unlock();
950         }
951     }
952 
953     private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
954         ChannelFuture future = null;
955         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
956 
957         SSLEngineResult result;
958         try {
959             for (;;) {
960                 synchronized (handshakeLock) {
961                     result = engine.wrap(EMPTY_BUFFER, outNetBuf);
962                 }
963 
964                 if (result.bytesProduced() > 0) {
965                     outNetBuf.flip();
966                     ChannelBuffer msg =
967                             ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
968 
969 
970                     // Transfer the bytes to the new ChannelBuffer using some safe method that will also
971                     // work with "non" heap buffers
972                     //
973                     // See https://github.com/netty/netty/issues/329
974                     msg.writeBytes(outNetBuf);
975                     outNetBuf.clear();
976 
977                     future = future(channel);
978                     future.addListener(new ChannelFutureListener() {
979                         public void operationComplete(ChannelFuture future)
980                                 throws Exception {
981                             if (future.getCause() instanceof ClosedChannelException) {
982                                 synchronized (ignoreClosedChannelExceptionLock) {
983                                     ignoreClosedChannelException ++;
984                                 }
985                             }
986                         }
987                     });
988 
989                     write(ctx, future, msg);
990                 }
991 
992                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
993                 handleRenegotiation(handshakeStatus);
994                 switch (handshakeStatus) {
995                 case FINISHED:
996                     setHandshakeSuccess(channel);
997                     runDelegatedTasks();
998                     break;
999                 case NEED_TASK:
1000                     runDelegatedTasks();
1001                     break;
1002                 case NEED_UNWRAP:
1003                     if (!Thread.holdsLock(handshakeLock)) {
1004                         // unwrap shouldn't be called when this method was
1005                         // called by unwrap - unwrap will keep running after
1006                         // this method returns.
1007                         unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
1008                     }
1009                     break;
1010                 case NOT_HANDSHAKING:
1011                 case NEED_WRAP:
1012                     break;
1013                 default:
1014                     throw new IllegalStateException(
1015                             "Unexpected handshake status: " + handshakeStatus);
1016                 }
1017 
1018                 if (result.bytesProduced() == 0) {
1019                     break;
1020                 }
1021             }
1022         } catch (SSLException e) {
1023             setHandshakeFailure(channel, e);
1024             throw e;
1025         } finally {
1026             bufferPool.releaseBuffer(outNetBuf);
1027         }
1028 
1029         if (future == null) {
1030             future = succeededFuture(channel);
1031         }
1032 
1033         return future;
1034     }
1035 
1036     private ChannelBuffer unwrap(
1037             ChannelHandlerContext ctx, Channel channel,
1038             ChannelBuffer buffer, int offset, int length) throws SSLException {
1039         ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
1040         ByteBuffer outAppBuf = bufferPool.acquireBuffer();
1041 
1042         try {
1043             boolean needsWrap = false;
1044             loop:
1045             for (;;) {
1046                 SSLEngineResult result;
1047                 boolean needsHandshake = false;
1048                 synchronized (handshakeLock) {
1049                     if (!handshaken && !handshaking &&
1050                         !engine.getUseClientMode() &&
1051                         !engine.isInboundDone() && !engine.isOutboundDone()) {
1052                         needsHandshake = true;
1053 
1054                     }
1055                 }
1056                 if (needsHandshake) {
1057                     handshake();
1058                 }
1059 
1060                 synchronized (handshakeLock) {
1061                     result = engine.unwrap(inNetBuf, outAppBuf);
1062                 }
1063 
1064                 // notify about the CLOSED state of the SSLEngine. See #137
1065                 if (result.getStatus() == Status.CLOSED) {
1066                     sslEngineCloseFuture.setClosed();
1067                 }
1068 
1069 
1070                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1071                 handleRenegotiation(handshakeStatus);
1072                 switch (handshakeStatus) {
1073                 case NEED_UNWRAP:
1074                     if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
1075                         break;
1076                     } else {
1077                         break loop;
1078                     }
1079                 case NEED_WRAP:
1080                     wrapNonAppData(ctx, channel);
1081                     break;
1082                 case NEED_TASK:
1083                     runDelegatedTasks();
1084                     break;
1085                 case FINISHED:
1086                     setHandshakeSuccess(channel);
1087                     needsWrap = true;
1088                     break loop;
1089                 case NOT_HANDSHAKING:
1090                     needsWrap = true;
1091                     break loop;
1092                 default:
1093                     throw new IllegalStateException(
1094                             "Unknown handshake status: " + handshakeStatus);
1095                 }
1096 
1097             }
1098 
1099             if (needsWrap) {
1100                 // wrap() acquires pendingUnencryptedWrites first and then
1101                 // handshakeLock.  If handshakeLock is already hold by the
1102                 // current thread, calling wrap() will lead to a dead lock
1103                 // i.e. pendingUnencryptedWrites -> handshakeLock vs.
1104                 //      handshakeLock -> pendingUnencryptedLock -> handshakeLock
1105                 //
1106                 // There is also a same issue between pendingEncryptedWrites
1107                 // and pendingUnencryptedWrites.
1108                 if (!Thread.holdsLock(handshakeLock) &&
1109                     !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1110                     wrap(ctx, channel);
1111                 }
1112             }
1113 
1114             outAppBuf.flip();
1115 
1116             if (outAppBuf.hasRemaining()) {
1117                 ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());
1118                 // Transfer the bytes to the new ChannelBuffer using some safe method that will also
1119                 // work with "non" heap buffers
1120                 //
1121                 // See https://github.com/netty/netty/issues/329
1122                 frame.writeBytes(outAppBuf);
1123 
1124                 return frame;
1125             } else {
1126                 return null;
1127             }
1128         } catch (SSLException e) {
1129             setHandshakeFailure(channel, e);
1130             throw e;
1131         } finally {
1132             bufferPool.releaseBuffer(outAppBuf);
1133         }
1134     }
1135 
1136     private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1137         if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1138             handshakeStatus == HandshakeStatus.FINISHED) {
1139             // Not handshaking
1140             return;
1141         }
1142 
1143         if (!handshaken) {
1144             // Not renegotiation
1145             return;
1146         }
1147 
1148         final boolean renegotiate;
1149         synchronized (handshakeLock) {
1150             if (handshaking) {
1151                 // Renegotiation in progress or failed already.
1152                 // i.e. Renegotiation check has been done already below.
1153                 return;
1154             }
1155 
1156             if (engine.isInboundDone() || engine.isOutboundDone()) {
1157                 // Not handshaking but closing.
1158                 return;
1159             }
1160 
1161             if (isEnableRenegotiation()) {
1162                 // Continue renegotiation.
1163                 renegotiate = true;
1164             } else {
1165                 // Do not renegotiate.
1166                 renegotiate = false;
1167                 // Prevent reentrance of this method.
1168                 handshaking = true;
1169             }
1170         }
1171 
1172         if (renegotiate) {
1173             // Renegotiate.
1174             handshake();
1175         } else {
1176             // Raise an exception.
1177             fireExceptionCaught(
1178                     ctx, new SSLException(
1179                             "renegotiation attempted by peer; " +
1180                             "closing the connection"));
1181 
1182             // Close the connection to stop renegotiation.
1183             Channels.close(ctx, succeededFuture(ctx.getChannel()));
1184         }
1185     }
1186 
1187     private void runDelegatedTasks() {
1188         for (;;) {
1189             final Runnable task;
1190             synchronized (handshakeLock) {
1191                 task = engine.getDelegatedTask();
1192             }
1193 
1194             if (task == null) {
1195                 break;
1196             }
1197 
1198             delegatedTaskExecutor.execute(new Runnable() {
1199                 public void run() {
1200                     synchronized (handshakeLock) {
1201                         task.run();
1202                     }
1203                 }
1204             });
1205         }
1206     }
1207 
1208     private void setHandshakeSuccess(Channel channel) {
1209         synchronized (handshakeLock) {
1210             handshaking = false;
1211             handshaken = true;
1212 
1213             if (handshakeFuture == null) {
1214                 handshakeFuture = future(channel);
1215             }
1216         }
1217 
1218         handshakeFuture.setSuccess();
1219     }
1220 
1221     private void setHandshakeFailure(Channel channel, SSLException cause) {
1222         synchronized (handshakeLock) {
1223             if (!handshaking) {
1224                 return;
1225             }
1226             handshaking = false;
1227             handshaken = false;
1228 
1229             if (handshakeFuture == null) {
1230                 handshakeFuture = future(channel);
1231             }
1232 
1233             // Release all resources such as internal buffers that SSLEngine
1234             // is managing.
1235 
1236             engine.closeOutbound();
1237 
1238             try {
1239                 engine.closeInbound();
1240             } catch (SSLException e) {
1241                 if (logger.isDebugEnabled()) {
1242                     logger.debug(
1243                             "SSLEngine.closeInbound() raised an exception after " +
1244                             "a handshake failure.", e);
1245                 }
1246 
1247             }
1248         }
1249 
1250         handshakeFuture.setFailure(cause);
1251         if (closeOnSSLException) {
1252             Channels.close(ctx, future(channel));
1253         }
1254     }
1255 
1256     private void closeOutboundAndChannel(
1257             final ChannelHandlerContext context, final ChannelStateEvent e) {
1258         if (!e.getChannel().isConnected()) {
1259             context.sendDownstream(e);
1260             return;
1261         }
1262 
1263         boolean success = false;
1264         try {
1265             try {
1266                 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1267             } catch (SSLException ex) {
1268                 if (logger.isDebugEnabled()) {
1269                     logger.debug("Failed to unwrap before sending a close_notify message", ex);
1270                 }
1271             }
1272 
1273             if (!engine.isInboundDone()) {
1274                 if (sentCloseNotify.compareAndSet(false, true)) {
1275                     engine.closeOutbound();
1276                     try {
1277                         ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1278                         closeNotifyFuture.addListener(
1279                                 new ClosingChannelFutureListener(context, e));
1280                         success = true;
1281                     } catch (SSLException ex) {
1282                         if (logger.isDebugEnabled()) {
1283                             logger.debug("Failed to encode a close_notify message", ex);
1284                         }
1285                     }
1286                 }
1287             } else {
1288                 success = true;
1289             }
1290         } finally {
1291             if (!success) {
1292                 context.sendDownstream(e);
1293             }
1294         }
1295     }
1296 
1297     private static final class PendingWrite {
1298         final ChannelFuture future;
1299         final ByteBuffer outAppBuf;
1300 
1301         PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1302             this.future = future;
1303             this.outAppBuf = outAppBuf;
1304         }
1305     }
1306 
1307     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1308 
1309         private final ChannelHandlerContext context;
1310         private final ChannelStateEvent e;
1311 
1312         ClosingChannelFutureListener(
1313                 ChannelHandlerContext context, ChannelStateEvent e) {
1314             this.context = context;
1315             this.e = e;
1316         }
1317 
1318         public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1319             if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1320                 Channels.close(context, e.getFuture());
1321             } else {
1322                 e.getFuture().setSuccess();
1323             }
1324         }
1325     }
1326 
1327     @Override
1328     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1329         super.beforeAdd(ctx);
1330         this.ctx = ctx;
1331     }
1332 
1333     /**
1334      * Fail all pending writes which we were not able to flush out
1335      */
1336     @Override
1337     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1338 
1339         // there is no need for synchronization here as we do not receive downstream events anymore
1340         Throwable cause = null;
1341         for (;;) {
1342             PendingWrite pw = pendingUnencryptedWrites.poll();
1343             if (pw == null) {
1344                 break;
1345             }
1346             if (cause == null) {
1347                 cause = new IOException("Unable to write data");
1348             }
1349             pw.future.setFailure(cause);
1350 
1351         }
1352 
1353         for (;;) {
1354             MessageEvent ev = pendingEncryptedWrites.poll();
1355             if (ev == null) {
1356                 break;
1357             }
1358             if (cause == null) {
1359                 cause = new IOException("Unable to write data");
1360             }
1361             ev.getFuture().setFailure(cause);
1362 
1363         }
1364 
1365         if (cause != null) {
1366             fireExceptionCaughtLater(ctx, cause);
1367         }
1368     }
1369 
1370 
1371     /**
1372      * Calls {@link #handshake()} once the {@link Channel} is connected
1373      */
1374     @Override
1375     public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1376         if (issueHandshake) {
1377             // issue and handshake and add a listener to it which will fire an exception event if
1378             // an exception was thrown while doing the handshake
1379             handshake().addListener(new ChannelFutureListener() {
1380 
1381                 public void operationComplete(ChannelFuture future) throws Exception {
1382                     if (!future.isSuccess()) {
1383                         Channels.fireExceptionCaught(future.getChannel(), future.getCause());
1384                     } else {
1385                         // Send the event upstream after the handshake was completed without an error.
1386                         //
1387                         // See https://github.com/netty/netty/issues/358
1388                         ctx.sendUpstream(e);
1389                     }
1390 
1391                 }
1392             });
1393         } else {
1394             super.channelConnected(ctx, e);
1395         }
1396     }
1397 
1398     /**
1399      * Loop over all the pending writes and fail them.
1400      *
1401      * See <a href="https://github.com/netty/netty/issues/305">#305</a> for more details.
1402      */
1403     @Override
1404     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1405         Throwable cause = null;
1406         synchronized (pendingUnencryptedWrites) {
1407             for (;;) {
1408                 PendingWrite pw = pendingUnencryptedWrites.poll();
1409                 if (pw == null) {
1410                     break;
1411                 }
1412                 if (cause == null) {
1413                     cause = new ClosedChannelException();
1414                 }
1415                 pw.future.setFailure(cause);
1416 
1417             }
1418 
1419 
1420             for (;;) {
1421                 MessageEvent ev = pendingEncryptedWrites.poll();
1422                 if (ev == null) {
1423                     break;
1424                 }
1425                 if (cause == null) {
1426                     cause = new ClosedChannelException();
1427                 }
1428                 ev.getFuture().setFailure(cause);
1429 
1430             }
1431         }
1432 
1433         if (cause != null) {
1434             fireExceptionCaught(ctx, cause);
1435         }
1436 
1437         super.channelClosed(ctx, e);
1438     }
1439 
1440     private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1441         public SSLEngineInboundCloseFuture() {
1442             super(null, true);
1443         }
1444 
1445         void setClosed() {
1446             super.setSuccess();
1447         }
1448 
1449         @Override
1450         public Channel getChannel() {
1451             if (ctx == null) {
1452                 // Maybe we should better throw an IllegalStateException() ?
1453                 return null;
1454             } else {
1455                 return ctx.getChannel();
1456             }
1457         }
1458 
1459         @Override
1460         public boolean setSuccess() {
1461             return false;
1462         }
1463 
1464         @Override
1465         public boolean setFailure(Throwable cause) {
1466             return false;
1467         }
1468     }
1469 }