1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.buffer.ChannelBuffers;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.channel.ChannelDownstreamHandler;
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelHandlerContext;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.Channels;
30 import org.jboss.netty.channel.DefaultChannelFuture;
31 import org.jboss.netty.channel.DownstreamMessageEvent;
32 import org.jboss.netty.channel.ExceptionEvent;
33 import org.jboss.netty.channel.MessageEvent;
34 import org.jboss.netty.handler.codec.frame.FrameDecoder;
35 import org.jboss.netty.logging.InternalLogger;
36 import org.jboss.netty.logging.InternalLoggerFactory;
37 import org.jboss.netty.util.Timeout;
38 import org.jboss.netty.util.Timer;
39 import org.jboss.netty.util.TimerTask;
40 import org.jboss.netty.util.internal.DetectionUtil;
41 import org.jboss.netty.util.internal.NonReentrantLock;
42
43 import javax.net.ssl.SSLEngine;
44 import javax.net.ssl.SSLEngineResult;
45 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
46 import javax.net.ssl.SSLEngineResult.Status;
47 import javax.net.ssl.SSLException;
48 import java.io.IOException;
49 import java.nio.ByteBuffer;
50 import java.nio.channels.ClosedChannelException;
51 import java.nio.channels.DatagramChannel;
52 import java.nio.channels.SocketChannel;
53 import java.util.ArrayList;
54 import java.util.LinkedList;
55 import java.util.List;
56 import java.util.Queue;
57 import java.util.concurrent.ConcurrentLinkedQueue;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.Executor;
60 import java.util.concurrent.TimeUnit;
61 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
62 import java.util.regex.Pattern;
63
64 import static org.jboss.netty.channel.Channels.*;
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 public class SslHandler extends FrameDecoder
183 implements ChannelDownstreamHandler {
184
/** Logger used by this handler for debug output. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class);

/** Zero-capacity buffer passed to the engine when wrapping or unwrapping without application data. */
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

// Class names in a stack trace that look like a socket/datagram/SCTP/UDT channel; used by ignoreException().
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
// Messages such as "connection reset" or "broken pipe" (matched case-insensitively); used by ignoreException().
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

/** Shared pool, created lazily by {@link #getDefaultBufferPool()}. */
private static SslBufferPool defaultBufferPool;
195
196
197
198
199
200 public static synchronized SslBufferPool getDefaultBufferPool() {
201 if (defaultBufferPool == null) {
202 defaultBufferPool = new SslBufferPool();
203 }
204 return defaultBufferPool;
205 }
206
// Context of this handler. It is not assigned in the visible part of the file;
// setCloseOnSSLException() treats a non-null value as "already attached to a pipeline".
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
private final SslBufferPool bufferPool;
private final Executor delegatedTaskExecutor;
// When true, handleDownstream() forwards the first ChannelBuffer write without encrypting it.
private final boolean startTls;

private volatile boolean enableRenegotiation = true;

// Monitor held around handshake state changes and around calls into the SSLEngine.
final Object handshakeLock = new Object();
// True while a handshake is in progress.
private boolean handshaking;
// True once a handshake has completed successfully (see setHandshakeSuccess()).
private volatile boolean handshaken;
private volatile ChannelFuture handshakeFuture;

// The three int flags below are paired with the AtomicIntegerFieldUpdaters that follow.
@SuppressWarnings("UnusedDeclaration")
private volatile int sentFirstMessage;
@SuppressWarnings("UnusedDeclaration")
private volatile int sentCloseNotify;
@SuppressWarnings("UnusedDeclaration")
private volatile int closedOutboundAndChannel;

private static final AtomicIntegerFieldUpdater<SslHandler> SENT_FIRST_MESSAGE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentFirstMessage");
private static final AtomicIntegerFieldUpdater<SslHandler> SENT_CLOSE_NOTIFY_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentCloseNotify");
private static final AtomicIntegerFieldUpdater<SslHandler> CLOSED_OUTBOUND_AND_CHANNEL_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "closedOutboundAndChannel");

// Number of ClosedChannelExceptions that exceptionCaught() should swallow; incremented by the
// write listener in wrapNonAppData(). Guarded by ignoreClosedChannelExceptionLock.
int ignoreClosedChannelException;
final Object ignoreClosedChannelExceptionLock = new Object();
// Plaintext writes waiting to be encrypted; accessed under pendingUnencryptedWritesLock.
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
// Encrypted writes waiting to be sent downstream; drained by flushPendingEncryptedWrites().
private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();

private volatile boolean issueHandshake;
// Set by handleDownstream() when a write arrives before the handshake future is done;
// cleared by unwrap(), which then triggers a wrap() of the queued data.
private volatile boolean writeBeforeHandshakeDone;
private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();

// When true, the channel is closed after an SSL failure is reported.
private boolean closeOnSslException;

// Length of a record whose bytes have not all arrived yet; remembered between decode() calls.
private int packetLength;

// Timer and timeout used to fail a handshake that takes too long; the timer may be null
// when handshakeTimeoutInMillis is not positive.
private final Timer timer;
private final long handshakeTimeoutInMillis;
private Timeout handshakeTimeout;
252
253
254
255
256
257
/**
 * Creates a handler that uses the default buffer pool, does not use StartTLS and has no
 * handshake timeout.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), false, null, 0);
}
261
262
263
264
265
266
267
268
/**
 * Creates a handler with an explicit buffer pool, without StartTLS and without a handshake
 * timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool the handler acquires its NIO buffers from
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, false, null, 0);
}
272
273
274
275
276
277
278
279
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first {@link ChannelBuffer} write must be forwarded
 *                 unencrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls);
}
283
284
285
286
287
288
289
290
291
292
/**
 * Creates a handler without a handshake timeout.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool the handler acquires its NIO buffers from
 * @param startTls   {@code true} if the first {@link ChannelBuffer} write must be forwarded
 *                   unencrypted
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, null, 0);
}
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Creates a handler that runs delegated tasks through {@code ImmediateExecutor.INSTANCE}.
 *
 * @param engine                   the {@link SSLEngine} this handler will use
 * @param bufferPool               the pool the handler acquires its NIO buffers from
 * @param startTls                 {@code true} if the first {@link ChannelBuffer} write must be
 *                                 forwarded unencrypted
 * @param timer                    the {@link Timer} used to schedule the handshake timeout; must
 *                                 not be {@code null} when {@code handshakeTimeoutInMillis} is
 *                                 positive
 * @param handshakeTimeoutInMillis the handshake timeout in milliseconds; a timeout is only
 *                                 scheduled when this value is greater than zero
 */
@SuppressWarnings("deprecation")
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls,
                  Timer timer, long handshakeTimeoutInMillis) {
    this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE, timer, handshakeTimeoutInMillis);
}
320
321
322
323
/**
 * Creates a handler with a caller-supplied delegated task executor and the default buffer pool.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated marked deprecated in this class; see the constructors that take no {@link Executor}
 */
@Deprecated
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
}
328
329
330
331
/**
 * Creates a handler with a caller-supplied delegated task executor, without StartTLS.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool the handler acquires its NIO buffers from
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated marked deprecated in this class; see the constructors that take no {@link Executor}
 */
@Deprecated
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, false, delegatedTaskExecutor);
}
336
337
338
339
/**
 * Creates a handler with a caller-supplied delegated task executor and the default buffer pool.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first {@link ChannelBuffer} write must be
 *                              forwarded unencrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated marked deprecated in this class; see the constructors that take no {@link Executor}
 */
@Deprecated
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
}
344
345
346
347
/**
 * Creates a handler with a caller-supplied delegated task executor and no handshake timeout.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool the handler acquires its NIO buffers from
 * @param startTls              {@code true} if the first {@link ChannelBuffer} write must be
 *                              forwarded unencrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @deprecated marked deprecated in this class; see the constructors that take no {@link Executor}
 */
@Deprecated
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, startTls, delegatedTaskExecutor, null, 0);
}
352
353
354
355
356
357 @Deprecated
358 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor,
359 Timer timer, long handshakeTimeoutInMillis) {
360 if (engine == null) {
361 throw new NullPointerException("engine");
362 }
363 if (bufferPool == null) {
364 throw new NullPointerException("bufferPool");
365 }
366 if (delegatedTaskExecutor == null) {
367 throw new NullPointerException("delegatedTaskExecutor");
368 }
369 if (timer == null && handshakeTimeoutInMillis > 0) {
370 throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
371 }
372
373 this.engine = engine;
374 this.bufferPool = bufferPool;
375 this.delegatedTaskExecutor = delegatedTaskExecutor;
376 this.startTls = startTls;
377 this.timer = timer;
378 this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
379 }
380
381
382
383
/**
 * Returns the {@link SSLEngine} this handler was constructed with.
 *
 * @return the engine, never {@code null}
 */
public SSLEngine getEngine() {
    return engine;
}
387
388
389
390
391
392
393
/**
 * Starts an SSL/TLS handshake on the channel of this handler's context.
 *
 * <p>If a handshake is already in progress, the future of that handshake is returned and no new
 * handshake is started. The whole method runs while holding {@code handshakeLock}.</p>
 *
 * @return the future that {@code setHandshakeSuccess} / {@code setHandshakeFailure} complete;
 *         an already failed future if the handshake could not even be started
 * @throws IllegalStateException if a handshake has completed before and renegotiation is disabled
 */
public ChannelFuture handshake() {
    synchronized (handshakeLock) {
        if (handshaken && !isEnableRenegotiation()) {
            throw new IllegalStateException("renegotiation disabled");
        }

        // NOTE(review): this.ctx is dereferenced without a null check - presumably the handler
        // must already be attached to a pipeline when this is called; confirm.
        final ChannelHandlerContext ctx = this.ctx;
        final Channel channel = ctx.getChannel();
        ChannelFuture handshakeFuture;
        Exception exception = null;

        if (handshaking) {
            // A handshake is already running; hand out its future.
            return this.handshakeFuture;
        }

        handshaking = true;
        try {
            engine.beginHandshake();
            runDelegatedTasks();
            handshakeFuture = this.handshakeFuture = future(channel);
            if (handshakeTimeoutInMillis > 0) {
                // Fail the handshake if its future is still not done when the timeout fires.
                handshakeTimeout = timer.newTimeout(new TimerTask() {
                    public void run(Timeout timeout) throws Exception {
                        ChannelFuture future = SslHandler.this.handshakeFuture;
                        if (future != null && future.isDone()) {
                            return;
                        }

                        setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
                                handshakeTimeoutInMillis + "ms"));
                    }
                }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            // Starting the handshake failed; remember the cause and report it below.
            handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
            exception = e;
        }

        if (exception == null) {
            try {
                final ChannelFuture hsFuture = handshakeFuture;
                // Send the first handshake message; a failed write fails the handshake future.
                wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            Throwable cause = future.getCause();
                            hsFuture.setFailure(cause);

                            fireExceptionCaught(ctx, cause);
                            if (closeOnSslException) {
                                Channels.close(ctx, future(channel));
                            }
                        }
                    }
                });
            } catch (SSLException e) {
                handshakeFuture.setFailure(e);

                fireExceptionCaught(ctx, e);
                if (closeOnSslException) {
                    Channels.close(ctx, future(channel));
                }
            }
        } else {
            // beginHandshake() (or the setup after it) threw: notify upstream and optionally close.
            fireExceptionCaught(ctx, exception);
            if (closeOnSslException) {
                Channels.close(ctx, future(channel));
            }
        }
        return handshakeFuture;
    }
}
465
466
467
468
/**
 * Starts a handshake; the {@code channel} argument is ignored.
 *
 * @param channel unused
 * @return the result of {@link #handshake()}
 * @deprecated use {@link #handshake()} instead
 */
@Deprecated
public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
    return handshake();
}
473
474
475
476
477
478 public ChannelFuture close() {
479 ChannelHandlerContext ctx = this.ctx;
480 Channel channel = ctx.getChannel();
481 try {
482 engine.closeOutbound();
483 return wrapNonAppData(ctx, channel);
484 } catch (SSLException e) {
485 fireExceptionCaught(ctx, e);
486 if (closeOnSslException) {
487 Channels.close(ctx, future(channel));
488 }
489 return failedFuture(channel, e);
490 }
491 }
492
493
494
495
/**
 * Closes the outbound side of the engine; the {@code channel} argument is ignored.
 *
 * @param channel unused
 * @return the result of {@link #close()}
 * @deprecated use {@link #close()} instead
 */
@Deprecated
public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
    return close();
}
500
501
502
503
/**
 * Returns whether renegotiation is allowed. The default is {@code true}.
 *
 * @return {@code true} if renegotiation is enabled
 */
public boolean isEnableRenegotiation() {
    return enableRenegotiation;
}
507
508
509
510
/**
 * Enables or disables renegotiation.
 *
 * @param enableRenegotiation {@code true} to allow renegotiation
 */
public void setEnableRenegotiation(boolean enableRenegotiation) {
    this.enableRenegotiation = enableRenegotiation;
}
514
515
516
517
518
519
/**
 * Sets the {@code issueHandshake} flag.
 * NOTE(review): the code that consumes this flag is outside the visible part of the file -
 * presumably it makes the handler start the handshake automatically once connected; confirm.
 *
 * @param issueHandshake the new flag value
 */
public void setIssueHandshake(boolean issueHandshake) {
    this.issueHandshake = issueHandshake;
}
523
524
525
526
/**
 * Returns the value last passed to {@link #setIssueHandshake(boolean)} ({@code false} by default).
 *
 * @return the {@code issueHandshake} flag
 */
public boolean isIssueHandshake() {
    return issueHandshake;
}
530
531
532
533
534
535
536
537
538
/**
 * Returns the future that {@code unwrap} marks as closed when the {@link SSLEngine} reports
 * a {@code CLOSED} status while unwrapping inbound data.
 *
 * @return the inbound-close future of this handler
 */
public ChannelFuture getSSLEngineInboundCloseFuture() {
    return sslEngineCloseFuture;
}
542
543
544
545
546
/**
 * Returns the handshake timeout this handler was constructed with.
 *
 * @return the timeout in milliseconds; a value that is not positive means no timeout is scheduled
 */
public long getHandshakeTimeout() {
    return handshakeTimeoutInMillis;
}
550
551
552
553
554
555
556
557
558
559
560 public void setCloseOnSSLException(boolean closeOnSslException) {
561 if (ctx != null) {
562 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
563 }
564 this.closeOnSslException = closeOnSslException;
565 }
566
/**
 * Returns whether the channel is closed after an SSL failure has been reported.
 *
 * @return the value set through {@link #setCloseOnSSLException(boolean)}, {@code false} by default
 */
public boolean getCloseOnSSLException() {
    return closeOnSslException;
}
570
571 public void handleDownstream(
572 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
573 if (evt instanceof ChannelStateEvent) {
574 ChannelStateEvent e = (ChannelStateEvent) evt;
575 switch (e.getState()) {
576 case OPEN:
577 case CONNECTED:
578 case BOUND:
579 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
580 closeOutboundAndChannel(context, e);
581 return;
582 }
583 }
584 }
585 if (!(evt instanceof MessageEvent)) {
586 context.sendDownstream(evt);
587 return;
588 }
589
590 MessageEvent e = (MessageEvent) evt;
591 if (!(e.getMessage() instanceof ChannelBuffer)) {
592 context.sendDownstream(evt);
593 return;
594 }
595
596
597
598 if (startTls && SENT_FIRST_MESSAGE_UPDATER.compareAndSet(this, 0, 1)) {
599 context.sendDownstream(evt);
600 return;
601 }
602
603
604 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
605 PendingWrite pendingWrite;
606
607 if (msg.readable()) {
608 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
609 } else {
610 pendingWrite = new PendingWrite(evt.getFuture(), null);
611 }
612
613 pendingUnencryptedWritesLock.lock();
614 try {
615 pendingUnencryptedWrites.add(pendingWrite);
616 } finally {
617 pendingUnencryptedWritesLock.unlock();
618 }
619
620 if (handshakeFuture == null || !handshakeFuture.isDone()) {
621 writeBeforeHandshakeDone = true;
622 }
623 wrap(context, evt.getChannel());
624 }
625
626 private void cancelHandshakeTimeout() {
627 if (handshakeTimeout != null) {
628
629 handshakeTimeout.cancel();
630 }
631 }
632
/**
 * Handles the disconnection of the channel.
 *
 * <p>A handshake that is still in progress is failed with a {@link ClosedChannelException}.
 * After the superclass has processed the event, remaining non-application data is unwrapped
 * and the engine is closed; both happen even if the superclass call throws.</p>
 *
 * @param ctx the context of this handler
 * @param e   the disconnect event
 * @throws Exception if the superclass handler or the final unwrap fails
 */
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

    // Fail a pending handshake first so its listeners learn about the disconnect.
    synchronized (handshakeLock) {
        if (handshaking) {
            cancelHandshakeTimeout();
            handshakeFuture.setFailure(new ClosedChannelException());
        }
    }

    try {
        super.channelDisconnected(ctx, e);
    } finally {
        unwrapNonAppData(ctx, e.getChannel());
        closeEngine();
    }
}
652
653 private void closeEngine() {
654 engine.closeOutbound();
655 if (sentCloseNotify == 0 && handshaken) {
656 try {
657 engine.closeInbound();
658 } catch (SSLException ex) {
659 if (logger.isDebugEnabled()) {
660 logger.debug("Failed to clean up SSLEngine.", ex);
661 }
662 }
663 }
664 }
665
666 @Override
667 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
668 throws Exception {
669
670 Throwable cause = e.getCause();
671 if (cause instanceof IOException) {
672 if (cause instanceof ClosedChannelException) {
673 synchronized (ignoreClosedChannelExceptionLock) {
674 if (ignoreClosedChannelException > 0) {
675 ignoreClosedChannelException --;
676 if (logger.isDebugEnabled()) {
677 logger.debug(
678 "Swallowing an exception raised while " +
679 "writing non-app data", cause);
680 }
681
682 return;
683 }
684 }
685 } else {
686 if (ignoreException(cause)) {
687 return;
688 }
689 }
690 }
691
692 ctx.sendUpstream(e);
693 }
694
695
696
697
698
699
700
701
702
703
704 private boolean ignoreException(Throwable t) {
705 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
706 String message = String.valueOf(t.getMessage()).toLowerCase();
707
708
709
710 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
711 return true;
712 }
713
714
715 StackTraceElement[] elements = t.getStackTrace();
716 for (StackTraceElement element: elements) {
717 String classname = element.getClassName();
718 String methodname = element.getMethodName();
719
720
721 if (classname.startsWith("org.jboss.netty.")) {
722 continue;
723 }
724
725
726 if (!"read".equals(methodname)) {
727 continue;
728 }
729
730
731
732 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
733 return true;
734 }
735
736 try {
737
738
739
740 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
741
742 if (SocketChannel.class.isAssignableFrom(clazz)
743 || DatagramChannel.class.isAssignableFrom(clazz)) {
744 return true;
745 }
746
747
748 if (DetectionUtil.javaVersion() >= 7
749 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
750 return true;
751 }
752 } catch (ClassNotFoundException e) {
753
754 }
755 }
756 }
757
758 return false;
759 }
760
761
762
763
764
765
766
767
768
769
770
771
772
/**
 * Tells whether the data at the reader index of the given buffer looks like the start of an
 * SSL/TLS record.
 *
 * <p>The check reads up to five bytes starting at {@code buffer.readerIndex()} through absolute
 * getters, so the reader index is not modified.</p>
 *
 * @param buffer the buffer to inspect; it must contain at least five bytes from its reader index
 * @return {@code true} if a record header was recognised, {@code false} otherwise
 */
public static boolean isEncrypted(ChannelBuffer buffer) {
    return getEncryptedPacketLength(buffer, buffer.readerIndex()) != -1;
}
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790 private static int getEncryptedPacketLength(ChannelBuffer buffer, int offset) {
791 int packetLength = 0;
792
793
794 boolean tls;
795 switch (buffer.getUnsignedByte(offset)) {
796 case 20:
797 case 21:
798 case 22:
799 case 23:
800 tls = true;
801 break;
802 default:
803
804 tls = false;
805 }
806
807 if (tls) {
808
809 int majorVersion = buffer.getUnsignedByte(offset + 1);
810 if (majorVersion == 3) {
811
812 packetLength = (getShort(buffer, offset + 3) & 0xFFFF) + 5;
813 if (packetLength <= 5) {
814
815 tls = false;
816 }
817 } else {
818
819 tls = false;
820 }
821 }
822
823 if (!tls) {
824
825 boolean sslv2 = true;
826 int headerLength = (buffer.getUnsignedByte(offset) & 0x80) != 0 ? 2 : 3;
827 int majorVersion = buffer.getUnsignedByte(offset + headerLength + 1);
828 if (majorVersion == 2 || majorVersion == 3) {
829
830 if (headerLength == 2) {
831 packetLength = (getShort(buffer, offset) & 0x7FFF) + 2;
832 } else {
833 packetLength = (getShort(buffer, offset) & 0x3FFF) + 3;
834 }
835 if (packetLength <= headerLength) {
836 sslv2 = false;
837 }
838 } else {
839 sslv2 = false;
840 }
841
842 if (!sslv2) {
843 return -1;
844 }
845 }
846 return packetLength;
847 }
848
/**
 * Collects as many complete SSL/TLS records as are available in {@code in} (up to
 * {@code OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH} bytes in total) and unwraps them in one go.
 *
 * @param ctx     the context of this handler
 * @param channel the channel the data was read from
 * @param in      the cumulative buffer of received bytes
 * @return the decrypted application data, or {@code null} if nothing was produced
 * @throws Exception a {@code NotSslRecordException} if non-SSL data is found and
 *                   {@code closeOnSslException} is not set, or whatever {@code unwrap} throws
 */
@Override
protected Object decode(
        final ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception {

    final int startOffset = in.readerIndex();
    final int endOffset = in.writerIndex();
    int offset = startOffset;
    int totalLength = 0;

    // A previous call remembered the length of a record that had not fully arrived yet.
    if (packetLength > 0) {
        if (endOffset - startOffset < packetLength) {
            // Still incomplete; wait for more data.
            return null;
        } else {
            offset += packetLength;
            totalLength = packetLength;
            packetLength = 0;
        }
    }

    boolean nonSslRecord = false;

    while (totalLength < OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
        final int readableBytes = endOffset - offset;
        if (readableBytes < 5) {
            // Not enough bytes for a record header.
            break;
        }

        final int packetLength = getEncryptedPacketLength(in, offset);
        if (packetLength == -1) {
            nonSslRecord = true;
            break;
        }

        assert packetLength > 0;

        if (packetLength > readableBytes) {
            // The record is incomplete; remember its length for the next call.
            this.packetLength = packetLength;
            break;
        }

        int newTotalLength = totalLength + packetLength;
        if (newTotalLength > OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
            // Adding this record would exceed the cap; leave it for the next call.
            break;
        }

        // A whole record is available; move on to the next one.
        offset += packetLength;
        totalLength = newTotalLength;
    }

    ChannelBuffer unwrapped = null;
    if (totalLength > 0) {
        // Hand exactly the collected records to the engine. unwrap() advances the reader index
        // of 'in' according to what the engine consumed.
        final ByteBuffer inNetBuf = in.toByteBuffer(in.readerIndex(), totalLength);
        unwrapped = unwrap(ctx, channel, in, inNetBuf, totalLength);
        assert !inNetBuf.hasRemaining() || engine.isInboundDone();
    }

    if (nonSslRecord) {
        // The remaining bytes are not an SSL/TLS record: discard them and report the problem.
        NotSslRecordException e = new NotSslRecordException(
                "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
        in.skipBytes(in.readableBytes());
        if (closeOnSslException) {
            // Fire the exception first, then close the channel.
            fireExceptionCaught(ctx, e);
            Channels.close(ctx, future(channel));

            // The channel is being closed, so nothing is returned (any data unwrapped above
            // is dropped).
            return null;
        } else {
            throw e;
        }
    }

    return unwrapped;
}
941
942
943
944
945
946 private static short getShort(ChannelBuffer buf, int offset) {
947 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
948 }
949
/**
 * Encrypts the queued plaintext writes and offers the resulting encrypted writes for sending.
 *
 * <p>The loop stops when the queue is empty, when the engine reports {@code CLOSED} without
 * producing data, or when the handshake status requires something other than wrapping. If the
 * engine is closed or an {@link SSLException} occurs, the futures of all queued writes are
 * failed.</p>
 *
 * @param context the context used to flush the encrypted writes downstream
 * @param channel the channel the writes belong to
 * @throws SSLException if the engine fails to wrap; the handshake is marked as failed first
 */
private void wrap(ChannelHandlerContext context, Channel channel) throws SSLException {
    ChannelBuffer msg;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();
    boolean success = true;
    boolean offered = false;
    boolean needsUnwrap = false;
    PendingWrite pendingWrite = null;

    try {
        loop:
        for (;;) {
            // The lock keeps plaintext writes and their encrypted counterparts in the same order.
            pendingUnencryptedWritesLock.lock();
            try {
                pendingWrite = pendingUnencryptedWrites.peek();
                if (pendingWrite == null) {
                    break;
                }

                ByteBuffer outAppBuf = pendingWrite.outAppBuf;
                if (outAppBuf == null) {
                    // A write request without payload: forward an empty buffer with its future.
                    pendingUnencryptedWrites.remove();
                    offerEncryptedWriteRequest(
                            new DownstreamMessageEvent(
                                    channel, pendingWrite.future,
                                    ChannelBuffers.EMPTY_BUFFER,
                                    channel.getRemoteAddress()));
                    offered = true;
                } else {
                    synchronized (handshakeLock) {
                        SSLEngineResult result = null;
                        try {
                            result = engine.wrap(outAppBuf, outNetBuf);
                        } finally {
                            // Fully consumed writes leave the queue even if wrap() threw.
                            if (!outAppBuf.hasRemaining()) {
                                pendingUnencryptedWrites.remove();
                            }
                        }

                        if (result.bytesProduced() > 0) {
                            outNetBuf.flip();
                            int remaining = outNetBuf.remaining();
                            // NOTE(review): uses the ctx field here, not the 'context' parameter.
                            msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);

                            // Copy the encrypted bytes so the pooled buffer can be reused.
                            msg.writeBytes(outNetBuf);
                            outNetBuf.clear();

                            ChannelFuture future;
                            if (pendingWrite.outAppBuf.hasRemaining()) {
                                // Only part of the write was encrypted; its own future must not
                                // be notified yet, so a pre-completed future is used.
                                future = succeededFuture(channel);
                            } else {
                                future = pendingWrite.future;
                            }

                            MessageEvent encryptedWrite = new DownstreamMessageEvent(
                                    channel, future, msg, channel.getRemoteAddress());
                            offerEncryptedWriteRequest(encryptedWrite);
                            offered = true;
                        } else if (result.getStatus() == Status.CLOSED) {
                            // The engine is closed and produced nothing; remaining writes are
                            // failed in the finally block below.
                            success = false;
                            break;
                        } else {
                            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                            handleRenegotiation(handshakeStatus);
                            switch (handshakeStatus) {
                            case NEED_WRAP:
                                if (outAppBuf.hasRemaining()) {
                                    break;
                                } else {
                                    break loop;
                                }
                            case NEED_UNWRAP:
                                needsUnwrap = true;
                                break loop;
                            case NEED_TASK:
                                runDelegatedTasks();
                                break;
                            case FINISHED:
                                setHandshakeSuccess(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            case NOT_HANDSHAKING:
                                setHandshakeSuccessIfStillHandshaking(channel);
                                if (result.getStatus() == Status.CLOSED) {
                                    success = false;
                                }
                                break loop;
                            default:
                                throw new IllegalStateException(
                                        "Unknown handshake status: " +
                                        handshakeStatus);
                            }
                        }
                    }
                }
            } finally {
                pendingUnencryptedWritesLock.unlock();
            }
        }
    } catch (SSLException e) {
        success = false;
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);

        if (offered) {
            flushPendingEncryptedWrites(context);
        }

        if (!success) {
            IllegalStateException cause =
                    new IllegalStateException("SSLEngine already closed");

            // Fail the write that was being processed when the failure happened.
            if (pendingWrite != null) {
                pendingWrite.future.setFailure(cause);
            }

            // Fail every write that is still queued. The future is notified outside the lock.
            for (;;) {
                pendingUnencryptedWritesLock.lock();
                try {
                    pendingWrite = pendingUnencryptedWrites.poll();
                    if (pendingWrite == null) {
                        break;
                    }
                } finally {
                    pendingUnencryptedWritesLock.unlock();
                }

                pendingWrite.future.setFailure(cause);
            }
        }
    }

    if (needsUnwrap) {
        unwrapNonAppData(ctx, channel);
    }
}
1107
/**
 * Queues an encrypted write for {@code flushPendingEncryptedWrites}.
 *
 * <p>The lock is only tried, never waited for: the write is added to the queue whether or not
 * the lock could be taken, and the lock is released only if this call acquired it.</p>
 *
 * @param encryptedWrite the event carrying the encrypted bytes
 */
private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
    final boolean locked = pendingEncryptedWritesLock.tryLock();
    try {
        pendingEncryptedWrites.add(encryptedWrite);
    } finally {
        if (locked) {
            pendingEncryptedWritesLock.unlock();
        }
    }
}
1118
1119 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
1120 while (!pendingEncryptedWrites.isEmpty()) {
1121
1122
1123
1124 if (!pendingEncryptedWritesLock.tryLock()) {
1125 return;
1126 }
1127
1128 try {
1129 MessageEvent e;
1130 while ((e = pendingEncryptedWrites.poll()) != null) {
1131 ctx.sendDownstream(e);
1132 }
1133 } finally {
1134 pendingEncryptedWritesLock.unlock();
1135 }
1136
1137
1138 }
1139 }
1140
/**
 * Lets the engine produce non-application data (handshake or close messages) and writes it to
 * the channel, repeating until the engine produces no more bytes.
 *
 * @param ctx     the context used for writing
 * @param channel the channel the data is written to
 * @return the future of the last write that was issued, or an already succeeded future if the
 *         engine produced nothing
 * @throws SSLException if the engine fails to wrap; the handshake is marked as failed first
 */
private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
    ChannelFuture future = null;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();

    SSLEngineResult result;
    try {
        for (;;) {
            synchronized (handshakeLock) {
                result = engine.wrap(EMPTY_BUFFER, outNetBuf);
            }

            if (result.bytesProduced() > 0) {
                outNetBuf.flip();
                ChannelBuffer msg =
                        ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());

                // Copy the produced bytes so the pooled buffer can be reused.
                msg.writeBytes(outNetBuf);
                outNetBuf.clear();

                future = future(channel);
                // Count writes that fail because the channel is closed; exceptionCaught()
                // swallows that many ClosedChannelExceptions.
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future)
                            throws Exception {
                        if (future.getCause() instanceof ClosedChannelException) {
                            synchronized (ignoreClosedChannelExceptionLock) {
                                ignoreClosedChannelException ++;
                            }
                        }
                    }
                });

                write(ctx, future, msg);
            }

            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            handleRenegotiation(handshakeStatus);
            switch (handshakeStatus) {
            case FINISHED:
                setHandshakeSuccess(channel);
                runDelegatedTasks();
                break;
            case NEED_TASK:
                runDelegatedTasks();
                break;
            case NEED_UNWRAP:
                if (!Thread.holdsLock(handshakeLock)) {
                    // Only unwrap when this thread does not already hold the handshake lock,
                    // i.e. when this method was not reached from inside a locked section.
                    unwrapNonAppData(ctx, channel);
                }
                break;
            case NOT_HANDSHAKING:
                if (setHandshakeSuccessIfStillHandshaking(channel)) {
                    runDelegatedTasks();
                }
                break;
            case NEED_WRAP:
                break;
            default:
                throw new IllegalStateException(
                        "Unexpected handshake status: " + handshakeStatus);
            }

            // Nothing was produced in this round; the engine has nothing more to send.
            if (result.bytesProduced() == 0) {
                break;
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);
    }

    if (future == null) {
        future = succeededFuture(channel);
    }

    return future;
}
1226
1227
1228
1229
/**
 * Runs {@code unwrap} with empty input buffers so the engine can make progress without any
 * newly received bytes. The return value of {@code unwrap} is discarded.
 *
 * @param ctx     the context of this handler
 * @param channel the channel being processed
 * @throws SSLException if the engine fails to unwrap
 */
private void unwrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
    unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, EMPTY_BUFFER, -1);
}
1233
1234
1235
1236
/**
 * Feeds encrypted bytes to the engine and collects the decrypted application data.
 *
 * <p>On a server-side engine that has neither started nor finished a handshake, a handshake is
 * started first. The reader index of {@code nettyInNetBuf} is advanced by the number of bytes
 * the engine consumed from {@code nioInNetBuf}.</p>
 *
 * @param ctx                           the context of this handler
 * @param channel                       the channel being processed
 * @param nettyInNetBuf                 the Netty buffer the encrypted bytes come from
 * @param nioInNetBuf                   the NIO view of the encrypted bytes given to the engine
 * @param initialNettyOutAppBufCapacity the capacity requested for the output buffer when it is
 *                                      first created
 * @return the decrypted data, or {@code null} if no readable data was produced
 * @throws SSLException if the engine fails to unwrap; the handshake is marked as failed first
 */
private ChannelBuffer unwrap(
        ChannelHandlerContext ctx, Channel channel,
        ChannelBuffer nettyInNetBuf, ByteBuffer nioInNetBuf,
        int initialNettyOutAppBufCapacity) throws SSLException {

    final int nettyInNetBufStartOffset = nettyInNetBuf.readerIndex();
    final int nioInNetBufStartOffset = nioInNetBuf.position();
    final ByteBuffer nioOutAppBuf = bufferPool.acquireBuffer();

    ChannelBuffer nettyOutAppBuf = null;

    try {
        boolean needsWrap = false;
        for (;;) {
            SSLEngineResult result;
            boolean needsHandshake = false;
            synchronized (handshakeLock) {
                // Server mode, no handshake yet and the engine still open: start one.
                if (!handshaken && !handshaking &&
                    !engine.getUseClientMode() &&
                    !engine.isInboundDone() && !engine.isOutboundDone()) {
                    needsHandshake = true;
                }
            }

            if (needsHandshake) {
                handshake();
            }

            synchronized (handshakeLock) {
                // Inner loop: retry the unwrap when the engine reports BUFFER_OVERFLOW. The
                // buffer size is re-read from the session on every attempt.
                for (;;) {
                    final int outAppBufSize = engine.getSession().getApplicationBufferSize();
                    final ByteBuffer outAppBuf;
                    if (nioOutAppBuf.capacity() < outAppBufSize) {
                        // The pooled buffer is smaller than what the session asks for;
                        // allocate a temporary one of the required size.
                        outAppBuf = ByteBuffer.allocate(outAppBufSize);
                    } else {
                        outAppBuf = nioOutAppBuf;
                    }

                    try {
                        result = engine.unwrap(nioInNetBuf, outAppBuf);
                        switch (result.getStatus()) {
                        case CLOSED:
                            // The engine reported CLOSED; signal the inbound-close future.
                            sslEngineCloseFuture.setClosed();
                            break;
                        case BUFFER_OVERFLOW:
                            // Try again; the finally block below first drains what was produced.
                            continue;
                        }

                        break;
                    } finally {
                        outAppBuf.flip();

                        // Mirror the engine's consumption onto the Netty buffer's reader index.
                        nettyInNetBuf.readerIndex(
                                nettyInNetBufStartOffset + nioInNetBuf.position() - nioInNetBufStartOffset);

                        // Move any decrypted bytes into the (lazily created) output buffer.
                        if (outAppBuf.hasRemaining()) {
                            if (nettyOutAppBuf == null) {
                                ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
                                nettyOutAppBuf = factory.getBuffer(initialNettyOutAppBufCapacity);
                            }
                            nettyOutAppBuf.writeBytes(outAppBuf);
                        }
                        outAppBuf.clear();
                    }
                }

                final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                handleRenegotiation(handshakeStatus);
                switch (handshakeStatus) {
                case NEED_UNWRAP:
                    break;
                case NEED_WRAP:
                    wrapNonAppData(ctx, channel);
                    break;
                case NEED_TASK:
                    runDelegatedTasks();
                    break;
                case FINISHED:
                    setHandshakeSuccess(channel);
                    needsWrap = true;
                    continue;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccessIfStillHandshaking(channel)) {
                        needsWrap = true;
                        continue;
                    }
                    if (writeBeforeHandshakeDone) {
                        // Data was queued before the handshake future was done (flag set in
                        // handleDownstream()); request a wrap so it gets encrypted and sent.
                        writeBeforeHandshakeDone = false;
                        needsWrap = true;
                    }
                    break;
                default:
                    throw new IllegalStateException(
                            "Unknown handshake status: " + handshakeStatus);
                }

                // Stop when the engine needs more input or made no progress at all.
                if (result.getStatus() == Status.BUFFER_UNDERFLOW ||
                    result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
                    break;
                }
            }
        }

        if (needsWrap) {
            // wrap() is skipped when this thread already holds the handshake lock or the
            // encrypted-writes lock.
            // NOTE(review): presumably this avoids lock-ordering problems between wrap() and
            // flushPendingEncryptedWrites(); confirm.
            if (!Thread.holdsLock(handshakeLock) && !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
                wrap(ctx, channel);
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(nioOutAppBuf);
    }

    if (nettyOutAppBuf != null && nettyOutAppBuf.readable()) {
        return nettyOutAppBuf;
    } else {
        return null;
    }
}
1380
1381 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1382 synchronized (handshakeLock) {
1383 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1384 handshakeStatus == HandshakeStatus.FINISHED) {
1385
1386 return;
1387 }
1388
1389 if (!handshaken) {
1390
1391 return;
1392 }
1393
1394 final boolean renegotiate;
1395 if (handshaking) {
1396
1397
1398 return;
1399 }
1400
1401 if (engine.isInboundDone() || engine.isOutboundDone()) {
1402
1403 return;
1404 }
1405
1406 if (isEnableRenegotiation()) {
1407
1408 renegotiate = true;
1409 } else {
1410
1411 renegotiate = false;
1412
1413 handshaking = true;
1414 }
1415
1416 if (renegotiate) {
1417
1418 handshake();
1419 } else {
1420
1421 fireExceptionCaught(
1422 ctx, new SSLException(
1423 "renegotiation attempted by peer; " +
1424 "closing the connection"));
1425
1426
1427 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1428 }
1429 }
1430 }
1431
1432
1433
1434
1435
1436
1437
1438 private void runDelegatedTasks() {
1439 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
1440 for (;;) {
1441 final Runnable task;
1442 synchronized (handshakeLock) {
1443 task = engine.getDelegatedTask();
1444 }
1445
1446 if (task == null) {
1447 break;
1448 }
1449
1450 delegatedTaskExecutor.execute(task);
1451 }
1452 } else {
1453 final List<Runnable> tasks = new ArrayList<Runnable>(2);
1454 for (;;) {
1455 final Runnable task;
1456 synchronized (handshakeLock) {
1457 task = engine.getDelegatedTask();
1458 }
1459
1460 if (task == null) {
1461 break;
1462 }
1463
1464 tasks.add(task);
1465 }
1466
1467 if (tasks.isEmpty()) {
1468 return;
1469 }
1470
1471 final CountDownLatch latch = new CountDownLatch(1);
1472 delegatedTaskExecutor.execute(new Runnable() {
1473 public void run() {
1474 try {
1475 for (Runnable task: tasks) {
1476 task.run();
1477 }
1478 } catch (Exception e) {
1479 fireExceptionCaught(ctx, e);
1480 } finally {
1481 latch.countDown();
1482 }
1483 }
1484 });
1485
1486 boolean interrupted = false;
1487 while (latch.getCount() != 0) {
1488 try {
1489 latch.await();
1490 } catch (InterruptedException e) {
1491
1492 interrupted = true;
1493 }
1494 }
1495
1496 if (interrupted) {
1497 Thread.currentThread().interrupt();
1498 }
1499 }
1500 }
1501
1502
1503
1504
1505
1506
1507
1508
1509 private boolean setHandshakeSuccessIfStillHandshaking(Channel channel) {
1510 if (handshaking && !handshakeFuture.isDone()) {
1511 setHandshakeSuccess(channel);
1512 return true;
1513 }
1514 return false;
1515 }
1516
1517 private void setHandshakeSuccess(Channel channel) {
1518 synchronized (handshakeLock) {
1519 handshaking = false;
1520 handshaken = true;
1521
1522 if (handshakeFuture == null) {
1523 handshakeFuture = future(channel);
1524 }
1525 cancelHandshakeTimeout();
1526 }
1527
1528 if (logger.isDebugEnabled()) {
1529 logger.debug(channel + " HANDSHAKEN: " + engine.getSession().getCipherSuite());
1530 }
1531
1532 handshakeFuture.setSuccess();
1533 }
1534
1535 private void setHandshakeFailure(Channel channel, SSLException cause) {
1536 synchronized (handshakeLock) {
1537 if (!handshaking) {
1538 return;
1539 }
1540 handshaking = false;
1541 handshaken = false;
1542
1543 if (handshakeFuture == null) {
1544 handshakeFuture = future(channel);
1545 }
1546
1547
1548 cancelHandshakeTimeout();
1549
1550
1551
1552
1553 engine.closeOutbound();
1554
1555 try {
1556 engine.closeInbound();
1557 } catch (SSLException e) {
1558 if (logger.isDebugEnabled()) {
1559 logger.debug(
1560 "SSLEngine.closeInbound() raised an exception after " +
1561 "a handshake failure.", e);
1562 }
1563 }
1564 }
1565
1566 handshakeFuture.setFailure(cause);
1567 if (closeOnSslException) {
1568 Channels.close(ctx, future(channel));
1569 }
1570 }
1571
1572 private void closeOutboundAndChannel(
1573 final ChannelHandlerContext context, final ChannelStateEvent e) {
1574 if (!e.getChannel().isConnected()) {
1575 context.sendDownstream(e);
1576 return;
1577 }
1578
1579
1580 if (!CLOSED_OUTBOUND_AND_CHANNEL_UPDATER.compareAndSet(this, 0, 1)) {
1581
1582
1583
1584 e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
1585 public void operationComplete(ChannelFuture future) throws Exception {
1586 context.sendDownstream(e);
1587 }
1588 });
1589 return;
1590 }
1591
1592 boolean passthrough = true;
1593 try {
1594 try {
1595 unwrapNonAppData(ctx, e.getChannel());
1596 } catch (SSLException ex) {
1597 if (logger.isDebugEnabled()) {
1598 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1599 }
1600 }
1601
1602 if (!engine.isOutboundDone()) {
1603 if (SENT_CLOSE_NOTIFY_UPDATER.compareAndSet(this, 0, 1)) {
1604 engine.closeOutbound();
1605 try {
1606 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1607 closeNotifyFuture.addListener(
1608 new ClosingChannelFutureListener(context, e));
1609 passthrough = false;
1610 } catch (SSLException ex) {
1611 if (logger.isDebugEnabled()) {
1612 logger.debug("Failed to encode a close_notify message", ex);
1613 }
1614 }
1615 }
1616 }
1617 } finally {
1618 if (passthrough) {
1619 context.sendDownstream(e);
1620 }
1621 }
1622 }
1623
1624 private static final class PendingWrite {
1625 final ChannelFuture future;
1626 final ByteBuffer outAppBuf;
1627
1628 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1629 this.future = future;
1630 this.outAppBuf = outAppBuf;
1631 }
1632 }
1633
1634 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1635
1636 private final ChannelHandlerContext context;
1637 private final ChannelStateEvent e;
1638
1639 ClosingChannelFutureListener(
1640 ChannelHandlerContext context, ChannelStateEvent e) {
1641 this.context = context;
1642 this.e = e;
1643 }
1644
1645 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1646 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1647 Channels.close(context, e.getFuture());
1648 } else {
1649 e.getFuture().setSuccess();
1650 }
1651 }
1652 }
1653
1654 @Override
1655 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1656 super.beforeAdd(ctx);
1657 this.ctx = ctx;
1658 }
1659
1660
1661
1662
1663 @Override
1664 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1665 closeEngine();
1666
1667
1668 Throwable cause = null;
1669 for (;;) {
1670 PendingWrite pw = pendingUnencryptedWrites.poll();
1671 if (pw == null) {
1672 break;
1673 }
1674 if (cause == null) {
1675 cause = new IOException("Unable to write data");
1676 }
1677 pw.future.setFailure(cause);
1678 }
1679
1680 for (;;) {
1681 MessageEvent ev = pendingEncryptedWrites.poll();
1682 if (ev == null) {
1683 break;
1684 }
1685 if (cause == null) {
1686 cause = new IOException("Unable to write data");
1687 }
1688 ev.getFuture().setFailure(cause);
1689 }
1690
1691 if (cause != null) {
1692 fireExceptionCaughtLater(ctx, cause);
1693 }
1694 }
1695
1696
1697
1698
1699 @Override
1700 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1701 if (issueHandshake) {
1702
1703
1704 handshake().addListener(new ChannelFutureListener() {
1705
1706 public void operationComplete(ChannelFuture future) throws Exception {
1707 if (future.isSuccess()) {
1708
1709
1710
1711 ctx.sendUpstream(e);
1712 }
1713 }
1714 });
1715 } else {
1716 super.channelConnected(ctx, e);
1717 }
1718 }
1719
1720
1721
1722
1723
1724
1725 @Override
1726 public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1727
1728
1729 ctx.getPipeline().execute(new Runnable() {
1730 public void run() {
1731 if (!pendingUnencryptedWritesLock.tryLock()) {
1732 return;
1733 }
1734
1735 Throwable cause = null;
1736 try {
1737 for (;;) {
1738 PendingWrite pw = pendingUnencryptedWrites.poll();
1739 if (pw == null) {
1740 break;
1741 }
1742 if (cause == null) {
1743 cause = new ClosedChannelException();
1744 }
1745 pw.future.setFailure(cause);
1746 }
1747
1748 for (;;) {
1749 MessageEvent ev = pendingEncryptedWrites.poll();
1750 if (ev == null) {
1751 break;
1752 }
1753 if (cause == null) {
1754 cause = new ClosedChannelException();
1755 }
1756 ev.getFuture().setFailure(cause);
1757 }
1758 } finally {
1759 pendingUnencryptedWritesLock.unlock();
1760 }
1761
1762 if (cause != null) {
1763 fireExceptionCaught(ctx, cause);
1764 }
1765 }
1766 });
1767
1768 super.channelClosed(ctx, e);
1769 }
1770
1771 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1772 SSLEngineInboundCloseFuture() {
1773 super(null, true);
1774 }
1775
1776 void setClosed() {
1777 super.setSuccess();
1778 }
1779
1780 @Override
1781 public Channel getChannel() {
1782 if (ctx == null) {
1783
1784 return null;
1785 } else {
1786 return ctx.getChannel();
1787 }
1788 }
1789
1790 @Override
1791 public boolean setSuccess() {
1792 return false;
1793 }
1794
1795 @Override
1796 public boolean setFailure(Throwable cause) {
1797 return false;
1798 }
1799 }
1800 }