View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.local;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.ConnectException;
22  
23  import org.jboss.netty.channel.AbstractChannelSink;
24  import org.jboss.netty.channel.Channel;
25  import org.jboss.netty.channel.ChannelEvent;
26  import org.jboss.netty.channel.ChannelException;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ChannelPipeline;
29  import org.jboss.netty.channel.ChannelState;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.MessageEvent;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  
35  /**
36   */
37  final class LocalClientChannelSink extends AbstractChannelSink {
38  
39      private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
40  
41      LocalClientChannelSink() {
42          super();
43      }
44  
45      public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46          if (e instanceof ChannelStateEvent) {
47              ChannelStateEvent event = (ChannelStateEvent) e;
48  
49              DefaultLocalChannel channel =
50                    (DefaultLocalChannel) event.getChannel();
51              ChannelFuture future = event.getFuture();
52              ChannelState state = event.getState();
53              Object value = event.getValue();
54              switch (state) {
55              case OPEN:
56                  if (Boolean.FALSE.equals(value)) {
57                      channel.closeNow(future);
58                  }
59                  break;
60              case BOUND:
61                  if (value != null) {
62                      bind(channel, future, (LocalAddress) value);
63                  } else {
64                      channel.closeNow(future);
65                  }
66                  break;
67              case CONNECTED:
68                  if (value != null) {
69                      connect(channel, future, (LocalAddress) value);
70                  } else {
71                      channel.closeNow(future);
72                  }
73                  break;
74              case INTEREST_OPS:
75                  // Unsupported - discard silently.
76                  future.setSuccess();
77                  break;
78              }
79          } else if (e instanceof MessageEvent) {
80              MessageEvent event = (MessageEvent) e;
81              DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
82              boolean offered = channel.writeBuffer.offer(event);
83              assert offered;
84              channel.flushWriteBuffer();
85          }
86      }
87  
88      private static void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
89          try {
90              if (!LocalChannelRegistry.register(localAddress, channel)) {
91                  throw new ChannelException("address already in use: " + localAddress);
92              }
93  
94              channel.setBound();
95              channel.localAddress = localAddress;
96              future.setSuccess();
97              fireChannelBound(channel, localAddress);
98          } catch (Throwable t) {
99              LocalChannelRegistry.unregister(localAddress);
100             future.setFailure(t);
101             fireExceptionCaught(channel, t);
102         }
103     }
104 
105     private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
106         Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
107         if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
108             future.setFailure(new ConnectException("connection refused"));
109             return;
110         }
111 
112         DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
113         ChannelPipeline pipeline;
114         try {
115             pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
116         } catch (Exception e) {
117             future.setFailure(e);
118             fireExceptionCaught(channel, e);
119             if (logger.isWarnEnabled()) {
120                 logger.warn(
121                         "Failed to initialize an accepted socket.", e);
122             }
123             return;
124         }
125 
126         future.setSuccess();
127         DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
128                 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
129         channel.pairedChannel = acceptedChannel;
130 
131         // check if the channel was bound before. See #276
132         if (!channel.isBound()) {
133             bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
134         }
135         channel.remoteAddress = serverChannel.getLocalAddress();
136         channel.setConnected();
137         fireChannelConnected(channel, serverChannel.getLocalAddress());
138 
139         acceptedChannel.localAddress = serverChannel.getLocalAddress();
140         try {
141             acceptedChannel.setBound();
142         } catch (IOException e) {
143             throw new Error(e);
144         }
145         fireChannelBound(acceptedChannel, channel.getRemoteAddress());
146         acceptedChannel.remoteAddress = channel.getLocalAddress();
147         acceptedChannel.setConnected();
148         fireChannelConnected(acceptedChannel, channel.getLocalAddress());
149 
150         // Flush something that was written in channelBound / channelConnected
151         channel.flushWriteBuffer();
152         acceptedChannel.flushWriteBuffer();
153     }
154 }