1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ClosedChannelException;
23 import java.nio.channels.DatagramChannel;
24 import java.nio.channels.SocketChannel;
25 import java.util.LinkedList;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.Executor;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.regex.Pattern;
31
32 import javax.net.ssl.SSLEngine;
33 import javax.net.ssl.SSLEngineResult;
34 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
35 import javax.net.ssl.SSLEngineResult.Status;
36 import javax.net.ssl.SSLException;
37
38 import org.jboss.netty.buffer.ChannelBuffer;
39 import org.jboss.netty.buffer.ChannelBuffers;
40 import org.jboss.netty.channel.Channel;
41 import org.jboss.netty.channel.ChannelDownstreamHandler;
42 import org.jboss.netty.channel.ChannelEvent;
43 import org.jboss.netty.channel.ChannelFuture;
44 import org.jboss.netty.channel.ChannelFutureListener;
45 import org.jboss.netty.channel.ChannelHandlerContext;
46 import org.jboss.netty.channel.ChannelPipeline;
47 import org.jboss.netty.channel.ChannelStateEvent;
48 import org.jboss.netty.channel.Channels;
49 import org.jboss.netty.channel.DefaultChannelFuture;
50 import org.jboss.netty.channel.DownstreamMessageEvent;
51 import org.jboss.netty.channel.ExceptionEvent;
52 import org.jboss.netty.channel.MessageEvent;
53 import org.jboss.netty.handler.codec.frame.FrameDecoder;
54 import org.jboss.netty.logging.InternalLogger;
55 import org.jboss.netty.logging.InternalLoggerFactory;
56 import org.jboss.netty.util.internal.DetectionUtil;
57 import org.jboss.netty.util.internal.NonReentrantLock;
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157 public class SslHandler extends FrameDecoder
158 implements ChannelDownstreamHandler {
159
/** Logger used by this handler for debug-level diagnostics. */
private static final InternalLogger logger =
    InternalLoggerFactory.getInstance(SslHandler.class);

/** Zero-length source buffer handed to {@code SSLEngine.wrap} when no application data is wrapped. */
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

/** Class-name pattern checked against stack frames by {@code ignoreException}. */
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(Socket|DatagramChannel|SctpChannel).*$");
/** Case-insensitive message pattern checked by {@code ignoreException}. */
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
        Pattern.CASE_INSENSITIVE);

/** Lazily created shared pool; only accessed through the synchronized {@code getDefaultBufferPool()}. */
private static SslBufferPool defaultBufferPool;
172
173
174
175
176
177 public static synchronized SslBufferPool getDefaultBufferPool() {
178 if (defaultBufferPool == null) {
179 defaultBufferPool = new SslBufferPool();
180 }
181 return defaultBufferPool;
182 }
183
/** Context of this handler; assigned in {@code beforeAdd} and {@code null} until then. */
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
/** Pool from which temporary NIO buffers for wrap/unwrap are acquired and released. */
private final SslBufferPool bufferPool;
/** Executor that runs the tasks returned by {@code SSLEngine.getDelegatedTask()}. */
private final Executor delegatedTaskExecutor;
/** When true, the first {@code ChannelBuffer} write is passed downstream unencrypted. */
private final boolean startTls;

private volatile boolean enableRenegotiation = true;

/** Monitor held around engine wrap/unwrap/getDelegatedTask calls and around accesses to {@code handshaking}. */
final Object handshakeLock = new Object();
/** True while a handshake is in progress; read and written under {@code handshakeLock}. */
private boolean handshaking;
/** True once a handshake has completed successfully; reset by a handshake failure. */
private volatile boolean handshaken;
/** Future of the current or most recent handshake; {@code null} before any handshake has started. */
private volatile ChannelFuture handshakeFuture;

/** Tracks whether the one unencrypted startTls write has already been let through. */
private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
/** Set once {@code closeOutboundAndChannel} has started sending close_notify. */
private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
/**
 * Number of {@code ClosedChannelException}s that {@code exceptionCaught} should still swallow;
 * incremented when a non-application-data write fails because the channel is closed.
 * Guarded by {@code ignoreClosedChannelExceptionLock}.
 */
int ignoreClosedChannelException;
final Object ignoreClosedChannelExceptionLock = new Object();
/** Writes waiting to be encrypted. Not a concurrent queue; most users synchronize on the queue itself. */
private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
/** Encrypted write events waiting to be sent downstream by {@code flushPendingEncryptedWrites}. */
private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
/** Ensures only one thread at a time drains {@code pendingEncryptedWrites}. */
private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();

/** When true, {@code channelConnected} starts the handshake itself. */
private volatile boolean issueHandshake;

/** Completed when {@code SSLEngine.unwrap} reports {@code Status.CLOSED}. */
private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();

/** When true, the channel is closed after SSL failures; may only be changed before {@code ctx} is set. */
private boolean closeOnSSLException;
210
211
212
213
214
215
/**
 * Creates a handler that uses the default buffer pool and {@code ImmediateExecutor.INSTANCE}
 * for delegated tasks.
 *
 * @param engine the {@link SSLEngine} this handler will use
 */
public SslHandler(SSLEngine engine) {
    this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
}
219
220
221
222
223
224
225
226
/**
 * Creates a handler that uses the given buffer pool and {@code ImmediateExecutor.INSTANCE}
 * for delegated tasks.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool temporary buffers are taken from
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
    this(engine, bufferPool, ImmediateExecutor.INSTANCE);
}
230
231
232
233
234
235
236
237
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine   the {@link SSLEngine} this handler will use
 * @param startTls {@code true} if the first buffer write must be passed through unencrypted
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, getDefaultBufferPool(), startTls);
}
241
242
243
244
245
246
247
248
249
250
/**
 * Creates a handler that runs delegated tasks with {@code ImmediateExecutor.INSTANCE}.
 *
 * @param engine     the {@link SSLEngine} this handler will use
 * @param bufferPool the pool temporary buffers are taken from
 * @param startTls   {@code true} if the first buffer write must be passed through unencrypted
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
    this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
}
254
255
256
257
258
259
260
261
262
263
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
}
267
268
269
270
271
272
273
274
275
276
277
278
279
/**
 * Creates a handler with startTls disabled.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool temporary buffers are taken from
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
    this(engine, bufferPool, false, delegatedTaskExecutor);
}
283
284
285
286
287
288
289
290
291
292
293
294
295
/**
 * Creates a handler that uses the default buffer pool.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param startTls              {@code true} if the first buffer write must be passed through unencrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
}
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
/**
 * Creates a fully configured handler. All other constructors delegate here.
 *
 * @param engine                the {@link SSLEngine} this handler will use
 * @param bufferPool            the pool temporary buffers are taken from
 * @param startTls              {@code true} if the first buffer write must be passed through unencrypted
 * @param delegatedTaskExecutor the executor that runs the engine's delegated tasks
 * @throws NullPointerException if {@code engine}, {@code bufferPool} or
 *                              {@code delegatedTaskExecutor} is {@code null}
 */
public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
    // Validate in declaration order so the reported argument name matches the first null one.
    if (engine == null) {
        throw new NullPointerException("engine");
    }
    this.engine = engine;

    if (bufferPool == null) {
        throw new NullPointerException("bufferPool");
    }
    this.bufferPool = bufferPool;

    if (delegatedTaskExecutor == null) {
        throw new NullPointerException("delegatedTaskExecutor");
    }
    this.delegatedTaskExecutor = delegatedTaskExecutor;

    this.startTls = startTls;
}
330
331
332
333
/**
 * Returns the {@link SSLEngine} this handler was constructed with.
 */
public SSLEngine getEngine() {
    return engine;
}
337
338
339
340
341
342
343
344 public ChannelFuture handshake() {
345 if (handshaken && !isEnableRenegotiation()) {
346 throw new IllegalStateException("renegotiation disabled");
347 }
348
349 ChannelHandlerContext ctx = this.ctx;
350 Channel channel = ctx.getChannel();
351 ChannelFuture handshakeFuture;
352 Exception exception = null;
353
354 synchronized (handshakeLock) {
355 if (handshaking) {
356 return this.handshakeFuture;
357 } else {
358 handshaking = true;
359 try {
360 engine.beginHandshake();
361 runDelegatedTasks();
362 handshakeFuture = this.handshakeFuture = future(channel);
363 } catch (Exception e) {
364 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
365 exception = e;
366 }
367 }
368 }
369
370 if (exception == null) {
371 try {
372 wrapNonAppData(ctx, channel);
373 } catch (SSLException e) {
374 handshakeFuture.setFailure(e);
375
376 fireExceptionCaught(ctx, e);
377 if (closeOnSSLException) {
378 Channels.close(ctx, future(channel));
379 }
380 }
381 } else {
382 fireExceptionCaught(ctx, exception);
383 if (closeOnSSLException) {
384 Channels.close(ctx, future(channel));
385 }
386 }
387
388 return handshakeFuture;
389 }
390
391
392
393
/**
 * Starts a handshake; the {@code channel} argument is ignored.
 *
 * @deprecated use {@link #handshake()} instead
 */
@Deprecated
public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
    return handshake();
}
398
399
400
401
402
403 public ChannelFuture close() {
404 ChannelHandlerContext ctx = this.ctx;
405 Channel channel = ctx.getChannel();
406 try {
407 engine.closeOutbound();
408 return wrapNonAppData(ctx, channel);
409 } catch (SSLException e) {
410 fireExceptionCaught(ctx, e);
411 if (closeOnSSLException) {
412 Channels.close(ctx, future(channel));
413 }
414 return failedFuture(channel, e);
415 }
416 }
417
418
419
420
/**
 * Closes the outbound side of the engine; the {@code channel} argument is ignored.
 *
 * @deprecated use {@link #close()} instead
 */
@Deprecated
public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
    return close();
}
425
426
427
428
/**
 * Returns {@code true} if renegotiation is allowed (the default).
 */
public boolean isEnableRenegotiation() {
    return enableRenegotiation;
}
432
433
434
435
/**
 * Enables or disables renegotiation. While disabled, {@link #handshake()} throws once a
 * handshake has completed, and a renegotiation detected on the engine results in an
 * exception event followed by closing the channel.
 */
public void setEnableRenegotiation(boolean enableRenegotiation) {
    this.enableRenegotiation = enableRenegotiation;
}
439
440
441
442
443
444
/**
 * Sets whether this handler starts the handshake itself when the channel gets connected.
 * When enabled, the connected event is forwarded upstream only after the handshake succeeded.
 */
public void setIssueHandshake(boolean issueHandshake) {
    this.issueHandshake = issueHandshake;
}
448
449
450
451
/**
 * Returns {@code true} if this handler starts the handshake itself on channel connect.
 */
public boolean isIssueHandshake() {
    return issueHandshake;
}
455
456
457
458
459
460
461
462
463
/**
 * Returns the future that is completed successfully once {@code SSLEngine.unwrap} has
 * reported {@code Status.CLOSED}. Calling {@code setSuccess()} or {@code setFailure()} on
 * the returned future has no effect.
 */
public ChannelFuture getSSLEngineInboundCloseFuture() {
    return sslEngineCloseFuture;

}
468
469
470
471
472
473
474
475
476
477
478 public void setCloseOnSSLException(boolean closeOnSslException) {
479 if (ctx != null) {
480 throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
481 }
482 this.closeOnSSLException = closeOnSslException;
483 }
484
/**
 * Returns {@code true} if the channel is closed when an SSL failure is detected.
 */
public boolean getCloseOnSSLException() {
    return closeOnSSLException;
}
488
489 public void handleDownstream(
490 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
491 if (evt instanceof ChannelStateEvent) {
492 ChannelStateEvent e = (ChannelStateEvent) evt;
493 switch (e.getState()) {
494 case OPEN:
495 case CONNECTED:
496 case BOUND:
497 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
498 closeOutboundAndChannel(context, e);
499 return;
500 }
501 }
502 }
503 if (!(evt instanceof MessageEvent)) {
504 context.sendDownstream(evt);
505 return;
506 }
507
508 MessageEvent e = (MessageEvent) evt;
509 if (!(e.getMessage() instanceof ChannelBuffer)) {
510 context.sendDownstream(evt);
511 return;
512 }
513
514
515
516 if (startTls && sentFirstMessage.compareAndSet(false, true)) {
517 context.sendDownstream(evt);
518 return;
519 }
520
521
522 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
523 PendingWrite pendingWrite;
524
525 if (msg.readable()) {
526 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
527 } else {
528 pendingWrite = new PendingWrite(evt.getFuture(), null);
529 }
530 synchronized (pendingUnencryptedWrites) {
531 boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
532 assert offered;
533 }
534
535 wrap(context, evt.getChannel());
536 }
537
538 @Override
539 public void channelDisconnected(ChannelHandlerContext ctx,
540 ChannelStateEvent e) throws Exception {
541
542
543
544 synchronized (handshakeLock) {
545 if (handshaking) {
546 handshakeFuture.setFailure(new ClosedChannelException());
547 }
548 }
549
550 try {
551 super.channelDisconnected(ctx, e);
552 } finally {
553 unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
554 engine.closeOutbound();
555 if (!sentCloseNotify.get() && handshaken) {
556 try {
557 engine.closeInbound();
558 } catch (SSLException ex) {
559 if (logger.isDebugEnabled()) {
560 logger.debug("Failed to clean up SSLEngine.", ex);
561 }
562 }
563 }
564 }
565 }
566
567 @Override
568 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
569 throws Exception {
570
571 Throwable cause = e.getCause();
572 if (cause instanceof IOException) {
573 if (cause instanceof ClosedChannelException) {
574 synchronized (ignoreClosedChannelExceptionLock) {
575 if (ignoreClosedChannelException > 0) {
576 ignoreClosedChannelException --;
577 if (logger.isDebugEnabled()) {
578 logger.debug(
579 "Swallowing an exception raised while " +
580 "writing non-app data", cause);
581 }
582
583 return;
584 }
585 }
586 } else {
587 if (ignoreException(cause)) {
588 return;
589 }
590 }
591 }
592
593 ctx.sendUpstream(e);
594 }
595
596
597
598
599
600
601
602
603
604
605 private boolean ignoreException(Throwable t) {
606 if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
607 String message = String.valueOf(t.getMessage()).toLowerCase();
608
609
610
611 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
612 return true;
613 }
614
615
616 StackTraceElement[] elements = t.getStackTrace();
617 for (StackTraceElement element: elements) {
618 String classname = element.getClassName();
619 String methodname = element.getMethodName();
620
621
622 if (classname.startsWith("org.jboss.netty.")) {
623 continue;
624 }
625
626
627 if (!methodname.equals("read")) {
628 continue;
629 }
630
631
632
633 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
634 return true;
635 }
636
637 try {
638
639
640
641 Class<?> clazz = getClass().getClassLoader().loadClass(classname);
642
643 if (SocketChannel.class.isAssignableFrom(clazz)
644 || DatagramChannel.class.isAssignableFrom(clazz)) {
645 return true;
646 }
647
648
649 if (DetectionUtil.javaVersion() >= 7
650 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
651 return true;
652 }
653 } catch (ClassNotFoundException e) {
654
655 }
656
657 }
658 }
659
660 return false;
661 }
662
663 @Override
664 protected Object decode(
665 final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
666
667 if (buffer.readableBytes() < 5) {
668 return null;
669 }
670
671 int packetLength = 0;
672
673
674 boolean tls;
675 switch (buffer.getUnsignedByte(buffer.readerIndex())) {
676 case 20:
677 case 21:
678 case 22:
679 case 23:
680 tls = true;
681 break;
682 default:
683
684 tls = false;
685 }
686
687 if (tls) {
688
689 int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
690 if (majorVersion == 3) {
691
692 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
693 if (packetLength <= 5) {
694
695 tls = false;
696 }
697 } else {
698
699 tls = false;
700 }
701 }
702
703 if (!tls) {
704
705 boolean sslv2 = true;
706 int headerLength = (buffer.getUnsignedByte(
707 buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
708 int majorVersion = buffer.getUnsignedByte(
709 buffer.readerIndex() + headerLength + 1);
710 if (majorVersion == 2 || majorVersion == 3) {
711
712 if (headerLength == 2) {
713 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
714 } else {
715 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
716 }
717 if (packetLength <= headerLength) {
718 sslv2 = false;
719 }
720 } else {
721 sslv2 = false;
722 }
723
724 if (!sslv2) {
725
726 NotSslRecordException e = new NotSslRecordException(
727 "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
728 buffer.skipBytes(buffer.readableBytes());
729 if (closeOnSSLException) {
730
731 fireExceptionCaught(ctx, e);
732 Channels.close(ctx, future(channel));
733
734
735
736 return null;
737 } else {
738 throw e;
739 }
740 }
741 }
742
743 assert packetLength > 0;
744
745 if (buffer.readableBytes() < packetLength) {
746 return null;
747 }
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763 final int packetOffset = buffer.readerIndex();
764 buffer.skipBytes(packetLength);
765 return unwrap(ctx, channel, buffer, packetOffset, packetLength);
766 }
767
768
769
770
771
772 private static short getShort(ChannelBuffer buf, int offset) {
773 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
774 }
775
/**
 * Encrypts the queued application writes and sends the resulting records downstream.
 *
 * <p>The head of {@code pendingUnencryptedWrites} is wrapped repeatedly into a pooled
 * network buffer; produced bytes are copied into a new {@link ChannelBuffer} and queued as
 * an encrypted write. The loop stops when the queue is empty, when the engine reports
 * {@code CLOSED} without producing bytes, or when the handshake status asks for something
 * other than more wrapping.
 *
 * @param context the context used to flush encrypted writes and to unwrap if required
 * @param channel the channel the encrypted write events are created for
 * @return the future attached to the last encrypted record produced from application data,
 *         or an already succeeded future if no such record was produced
 * @throws SSLException if the engine fails to wrap; the handshake is marked as failed first
 */
private ChannelFuture wrap(ChannelHandlerContext context, Channel channel)
        throws SSLException {

    ChannelFuture future = null;
    ChannelBuffer msg;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();
    boolean success = true;
    boolean offered = false;
    boolean needsUnwrap = false;
    try {
        loop:
        for (;;) {
            // The queue is a plain LinkedList, so peek/remove happen under its monitor.
            // The engine is invoked while holding it as well.
            synchronized (pendingUnencryptedWrites) {
                PendingWrite pendingWrite = pendingUnencryptedWrites.peek();
                if (pendingWrite == null) {
                    break;
                }

                ByteBuffer outAppBuf = pendingWrite.outAppBuf;
                if (outAppBuf == null) {
                    // An empty write: forward an empty buffer carrying the original future.
                    pendingUnencryptedWrites.remove();
                    offerEncryptedWriteRequest(
                            new DownstreamMessageEvent(
                                    channel, pendingWrite.future,
                                    ChannelBuffers.EMPTY_BUFFER,
                                    channel.getRemoteAddress()));
                    offered = true;
                } else {
                    SSLEngineResult result = null;
                    try {
                        synchronized (handshakeLock) {
                            result = engine.wrap(outAppBuf, outNetBuf);
                        }
                    } finally {
                        // Drop the entry once fully consumed, even if wrap() threw.
                        if (!outAppBuf.hasRemaining()) {
                            pendingUnencryptedWrites.remove();
                        }
                    }

                    if (result.bytesProduced() > 0) {
                        outNetBuf.flip();
                        int remaining = outNetBuf.remaining();
                        msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);

                        // Copy out of the pooled buffer so it can be reused for the next record.
                        msg.writeBytes(outNetBuf);
                        outNetBuf.clear();

                        if (pendingWrite.outAppBuf.hasRemaining()) {
                            // More records will follow for this write; only the last one
                            // carries the caller's future.
                            future = succeededFuture(channel);
                        } else {
                            future = pendingWrite.future;
                        }

                        MessageEvent encryptedWrite = new DownstreamMessageEvent(
                                channel, future, msg, channel.getRemoteAddress());
                        offerEncryptedWriteRequest(encryptedWrite);
                        offered = true;
                    } else if (result.getStatus() == Status.CLOSED) {
                        // The engine is closed and produced nothing: give up and fail the
                        // remaining pending writes in the finally block below.
                        success = false;
                        break;
                    } else {
                        final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
                        handleRenegotiation(handshakeStatus);
                        switch (handshakeStatus) {
                        case NEED_WRAP:
                            // Keep wrapping only while application data is left.
                            if (outAppBuf.hasRemaining()) {
                                break;
                            } else {
                                break loop;
                            }
                        case NEED_UNWRAP:
                            needsUnwrap = true;
                            break loop;
                        case NEED_TASK:
                            runDelegatedTasks();
                            break;
                        case FINISHED:
                        case NOT_HANDSHAKING:
                            if (handshakeStatus == HandshakeStatus.FINISHED) {
                                setHandshakeSuccess(channel);
                            }
                            // NOTE(review): this check cannot be true here, because the
                            // CLOSED status is already handled by the branch above.
                            if (result.getStatus() == Status.CLOSED) {
                                success = false;
                            }
                            break loop;
                        default:
                            throw new IllegalStateException(
                                    "Unknown handshake status: " +
                                    handshakeStatus);
                        }
                    }
                }
            }
        }
    } catch (SSLException e) {
        success = false;
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);

        if (offered) {
            flushPendingEncryptedWrites(context);
        }

        if (!success) {
            IllegalStateException cause =
                new IllegalStateException("SSLEngine already closed");

            // Fail every write that is still queued. The futures are notified outside
            // of the queue's monitor.
            for (;;) {
                PendingWrite pendingWrite;
                synchronized (pendingUnencryptedWrites) {
                    pendingWrite = pendingUnencryptedWrites.poll();
                    if (pendingWrite == null) {
                        break;
                    }
                }

                pendingWrite.future.setFailure(cause);
            }
        }
    }

    if (needsUnwrap) {
        // Let the engine consume any handshake data with an empty input.
        unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
    }

    if (future == null) {
        future = succeededFuture(channel);
    }
    return future;
}
923
924 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
925 final boolean locked = pendingEncryptedWritesLock.tryLock();
926 try {
927 pendingEncryptedWrites.offer(encryptedWrite);
928 } finally {
929 if (locked) {
930 pendingEncryptedWritesLock.unlock();
931 }
932 }
933 }
934
935 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
936
937
938
939 if (!pendingEncryptedWritesLock.tryLock()) {
940 return;
941 }
942
943 try {
944 MessageEvent e;
945 while ((e = pendingEncryptedWrites.poll()) != null) {
946 ctx.sendDownstream(e);
947 }
948 } finally {
949 pendingEncryptedWritesLock.unlock();
950 }
951 }
952
/**
 * Lets the engine produce non-application data (handshake or close messages) by wrapping
 * an empty source buffer, and writes every produced record downstream.
 *
 * <p>The loop continues as long as a call to {@code wrap} produces bytes.
 *
 * @param ctx     the context used for writing and, if required, unwrapping
 * @param channel the channel the futures are created for
 * @return the future of the last record written, or an already succeeded future if the
 *         engine produced nothing
 * @throws SSLException if the engine fails to wrap; the handshake is marked as failed first
 */
private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
    ChannelFuture future = null;
    ByteBuffer outNetBuf = bufferPool.acquireBuffer();

    SSLEngineResult result;
    try {
        for (;;) {
            synchronized (handshakeLock) {
                result = engine.wrap(EMPTY_BUFFER, outNetBuf);
            }

            if (result.bytesProduced() > 0) {
                outNetBuf.flip();
                ChannelBuffer msg =
                    ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());

                // Copy out of the pooled buffer so it can be reused on the next iteration.
                msg.writeBytes(outNetBuf);
                outNetBuf.clear();

                future = future(channel);
                future.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture future)
                            throws Exception {
                        // A close of the channel while this record was in flight is
                        // expected; remember it so exceptionCaught() can swallow the
                        // matching exception event.
                        if (future.getCause() instanceof ClosedChannelException) {
                            synchronized (ignoreClosedChannelExceptionLock) {
                                ignoreClosedChannelException ++;
                            }
                        }
                    }
                });

                write(ctx, future, msg);
            }

            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            handleRenegotiation(handshakeStatus);
            switch (handshakeStatus) {
            case FINISHED:
                setHandshakeSuccess(channel);
                runDelegatedTasks();
                break;
            case NEED_TASK:
                runDelegatedTasks();
                break;
            case NEED_UNWRAP:
                // unwrap() is skipped when this thread already holds handshakeLock
                // (presumably to avoid re-entering the engine from handshake() -- confirm).
                if (!Thread.holdsLock(handshakeLock)) {
                    unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
                }
                break;
            case NOT_HANDSHAKING:
            case NEED_WRAP:
                break;
            default:
                throw new IllegalStateException(
                        "Unexpected handshake status: " + handshakeStatus);
            }

            // Nothing was produced this round, so there is nothing more to send.
            if (result.bytesProduced() == 0) {
                break;
            }
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outNetBuf);
    }

    if (future == null) {
        future = succeededFuture(channel);
    }

    return future;
}
1035
/**
 * Feeds a slice of received bytes to the engine and returns the decrypted application data.
 *
 * <p>When called with an empty slice, the method only drives the engine's handshake state
 * machine. In server mode the first call also starts the handshake if none has been
 * started or completed yet.
 *
 * @param ctx     the context used for wrapping and for allocating the result buffer
 * @param channel the channel handshake results are reported for
 * @param buffer  the buffer containing the encrypted record
 * @param offset  absolute index of the first byte of the record in {@code buffer}
 * @param length  number of bytes of the record
 * @return a new buffer with the decrypted bytes, or {@code null} if none were produced
 * @throws SSLException if the engine fails to unwrap; the handshake is marked as failed first
 */
private ChannelBuffer unwrap(
        ChannelHandlerContext ctx, Channel channel,
        ChannelBuffer buffer, int offset, int length) throws SSLException {
    ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
    ByteBuffer outAppBuf = bufferPool.acquireBuffer();

    try {
        boolean needsWrap = false;
        loop:
        for (;;) {
            SSLEngineResult result;
            boolean needsHandshake = false;
            synchronized (handshakeLock) {
                // Server side, no handshake started or completed yet, engine still open.
                if (!handshaken && !handshaking &&
                    !engine.getUseClientMode() &&
                    !engine.isInboundDone() && !engine.isOutboundDone()) {
                    needsHandshake = true;

                }
            }
            // handshake() is called outside of the lock.
            if (needsHandshake) {
                handshake();
            }

            synchronized (handshakeLock) {
                result = engine.unwrap(inNetBuf, outAppBuf);
            }

            // Notify listeners of getSSLEngineInboundCloseFuture() that the engine was closed.
            if (result.getStatus() == Status.CLOSED) {
                sslEngineCloseFuture.setClosed();
            }


            final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
            handleRenegotiation(handshakeStatus);
            switch (handshakeStatus) {
            case NEED_UNWRAP:
                // Continue only while there is input left and the inbound side is open.
                if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
                    break;
                } else {
                    break loop;
                }
            case NEED_WRAP:
                wrapNonAppData(ctx, channel);
                break;
            case NEED_TASK:
                runDelegatedTasks();
                break;
            case FINISHED:
                setHandshakeSuccess(channel);
                needsWrap = true;
                break loop;
            case NOT_HANDSHAKING:
                needsWrap = true;
                break loop;
            default:
                throw new IllegalStateException(
                        "Unknown handshake status: " + handshakeStatus);
            }

        }

        if (needsWrap) {
            // Give queued application writes a chance to go out now. This is skipped when
            // the current thread holds handshakeLock or the encrypted-writes lock, i.e.
            // when this call is nested inside a wrap or flush on the same thread.
            if (!Thread.holdsLock(handshakeLock) &&
                !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
                wrap(ctx, channel);
            }
        }

        outAppBuf.flip();

        if (outAppBuf.hasRemaining()) {
            ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());

            // Copy out of the pooled buffer, which is released in the finally block.
            frame.writeBytes(outAppBuf);

            return frame;
        } else {
            return null;
        }
    } catch (SSLException e) {
        setHandshakeFailure(channel, e);
        throw e;
    } finally {
        bufferPool.releaseBuffer(outAppBuf);
    }
}
1135
1136 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1137 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1138 handshakeStatus == HandshakeStatus.FINISHED) {
1139
1140 return;
1141 }
1142
1143 if (!handshaken) {
1144
1145 return;
1146 }
1147
1148 final boolean renegotiate;
1149 synchronized (handshakeLock) {
1150 if (handshaking) {
1151
1152
1153 return;
1154 }
1155
1156 if (engine.isInboundDone() || engine.isOutboundDone()) {
1157
1158 return;
1159 }
1160
1161 if (isEnableRenegotiation()) {
1162
1163 renegotiate = true;
1164 } else {
1165
1166 renegotiate = false;
1167
1168 handshaking = true;
1169 }
1170 }
1171
1172 if (renegotiate) {
1173
1174 handshake();
1175 } else {
1176
1177 fireExceptionCaught(
1178 ctx, new SSLException(
1179 "renegotiation attempted by peer; " +
1180 "closing the connection"));
1181
1182
1183 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1184 }
1185 }
1186
1187 private void runDelegatedTasks() {
1188 for (;;) {
1189 final Runnable task;
1190 synchronized (handshakeLock) {
1191 task = engine.getDelegatedTask();
1192 }
1193
1194 if (task == null) {
1195 break;
1196 }
1197
1198 delegatedTaskExecutor.execute(new Runnable() {
1199 public void run() {
1200 synchronized (handshakeLock) {
1201 task.run();
1202 }
1203 }
1204 });
1205 }
1206 }
1207
/**
 * Records a successfully completed handshake and notifies the handshake future.
 *
 * @param channel the channel used to create the handshake future if none exists yet
 */
private void setHandshakeSuccess(Channel channel) {
    synchronized (handshakeLock) {
        handshaking = false;
        handshaken = true;

        // No future exists if handshake() was never called on this handler.
        if (handshakeFuture == null) {
            handshakeFuture = future(channel);
        }
    }

    // Listeners are notified outside of the lock.
    handshakeFuture.setSuccess();
}
1220
/**
 * Records a failed handshake, shuts the engine down and notifies the handshake future.
 * Does nothing if no handshake is currently in progress.
 *
 * @param channel the channel used to create futures
 * @param cause   the exception that made the handshake fail
 */
private void setHandshakeFailure(Channel channel, SSLException cause) {
    synchronized (handshakeLock) {
        if (!handshaking) {
            return;
        }
        handshaking = false;
        handshaken = false;

        if (handshakeFuture == null) {
            handshakeFuture = future(channel);
        }

        // Close both directions of the engine; a failure of closeInbound() is only
        // logged at debug level.
        engine.closeOutbound();

        try {
            engine.closeInbound();
        } catch (SSLException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                        "SSLEngine.closeInbound() raised an exception after " +
                        "a handshake failure.", e);
            }

        }
    }

    // Listeners are notified outside of the lock.
    handshakeFuture.setFailure(cause);
    if (closeOnSSLException) {
        Channels.close(ctx, future(channel));
    }
}
1255
1256 private void closeOutboundAndChannel(
1257 final ChannelHandlerContext context, final ChannelStateEvent e) {
1258 if (!e.getChannel().isConnected()) {
1259 context.sendDownstream(e);
1260 return;
1261 }
1262
1263 boolean success = false;
1264 try {
1265 try {
1266 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1267 } catch (SSLException ex) {
1268 if (logger.isDebugEnabled()) {
1269 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1270 }
1271 }
1272
1273 if (!engine.isInboundDone()) {
1274 if (sentCloseNotify.compareAndSet(false, true)) {
1275 engine.closeOutbound();
1276 try {
1277 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1278 closeNotifyFuture.addListener(
1279 new ClosingChannelFutureListener(context, e));
1280 success = true;
1281 } catch (SSLException ex) {
1282 if (logger.isDebugEnabled()) {
1283 logger.debug("Failed to encode a close_notify message", ex);
1284 }
1285 }
1286 }
1287 } else {
1288 success = true;
1289 }
1290 } finally {
1291 if (!success) {
1292 context.sendDownstream(e);
1293 }
1294 }
1295 }
1296
/**
 * A write request waiting to be encrypted: the caller's future plus the application data.
 */
private static final class PendingWrite {
    /** Future of the original write request. */
    final ChannelFuture future;
    /** Data still to be encrypted, or {@code null} for a write of a non-readable buffer. */
    final ByteBuffer outAppBuf;

    PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
        this.future = future;
        this.outAppBuf = outAppBuf;
    }
}
1306
1307 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1308
1309 private final ChannelHandlerContext context;
1310 private final ChannelStateEvent e;
1311
1312 ClosingChannelFutureListener(
1313 ChannelHandlerContext context, ChannelStateEvent e) {
1314 this.context = context;
1315 this.e = e;
1316 }
1317
1318 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1319 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1320 Channels.close(context, e.getFuture());
1321 } else {
1322 e.getFuture().setSuccess();
1323 }
1324 }
1325 }
1326
/**
 * Remembers the handler context after the superclass has processed the callback.
 * From this point on {@code setCloseOnSSLException} refuses further changes.
 */
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
    super.beforeAdd(ctx);
    this.ctx = ctx;
}
1332
1333
1334
1335
1336 @Override
1337 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1338
1339
1340 Throwable cause = null;
1341 for (;;) {
1342 PendingWrite pw = pendingUnencryptedWrites.poll();
1343 if (pw == null) {
1344 break;
1345 }
1346 if (cause == null) {
1347 cause = new IOException("Unable to write data");
1348 }
1349 pw.future.setFailure(cause);
1350
1351 }
1352
1353 for (;;) {
1354 MessageEvent ev = pendingEncryptedWrites.poll();
1355 if (ev == null) {
1356 break;
1357 }
1358 if (cause == null) {
1359 cause = new IOException("Unable to write data");
1360 }
1361 ev.getFuture().setFailure(cause);
1362
1363 }
1364
1365 if (cause != null) {
1366 fireExceptionCaughtLater(ctx, cause);
1367 }
1368 }
1369
1370
1371
1372
1373
1374 @Override
1375 public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1376 if (issueHandshake) {
1377
1378
1379 handshake().addListener(new ChannelFutureListener() {
1380
1381 public void operationComplete(ChannelFuture future) throws Exception {
1382 if (!future.isSuccess()) {
1383 Channels.fireExceptionCaught(future.getChannel(), future.getCause());
1384 } else {
1385
1386
1387
1388 ctx.sendUpstream(e);
1389 }
1390
1391 }
1392 });
1393 } else {
1394 super.channelConnected(ctx, e);
1395 }
1396 }
1397
1398
1399
1400
1401
1402
1403 @Override
1404 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1405 Throwable cause = null;
1406 synchronized (pendingUnencryptedWrites) {
1407 for (;;) {
1408 PendingWrite pw = pendingUnencryptedWrites.poll();
1409 if (pw == null) {
1410 break;
1411 }
1412 if (cause == null) {
1413 cause = new ClosedChannelException();
1414 }
1415 pw.future.setFailure(cause);
1416
1417 }
1418
1419
1420 for (;;) {
1421 MessageEvent ev = pendingEncryptedWrites.poll();
1422 if (ev == null) {
1423 break;
1424 }
1425 if (cause == null) {
1426 cause = new ClosedChannelException();
1427 }
1428 ev.getFuture().setFailure(cause);
1429
1430 }
1431 }
1432
1433 if (cause != null) {
1434 fireExceptionCaught(ctx, cause);
1435 }
1436
1437 super.channelClosed(ctx, e);
1438 }
1439
1440 private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1441 public SSLEngineInboundCloseFuture() {
1442 super(null, true);
1443 }
1444
1445 void setClosed() {
1446 super.setSuccess();
1447 }
1448
1449 @Override
1450 public Channel getChannel() {
1451 if (ctx == null) {
1452
1453 return null;
1454 } else {
1455 return ctx.getChannel();
1456 }
1457 }
1458
1459 @Override
1460 public boolean setSuccess() {
1461 return false;
1462 }
1463
1464 @Override
1465 public boolean setFailure(Throwable cause) {
1466 return false;
1467 }
1468 }
1469 }