1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.local;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22
23 import org.jboss.netty.channel.AbstractChannelSink;
24 import org.jboss.netty.channel.Channel;
25 import org.jboss.netty.channel.ChannelEvent;
26 import org.jboss.netty.channel.ChannelException;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ChannelPipeline;
29 import org.jboss.netty.channel.ChannelState;
30 import org.jboss.netty.channel.ChannelStateEvent;
31 import org.jboss.netty.channel.MessageEvent;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34
35
36
37 final class LocalClientChannelSink extends AbstractChannelSink {
38
39 private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalClientChannelSink.class);
40
41 LocalClientChannelSink() {
42 super();
43 }
44
45 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46 if (e instanceof ChannelStateEvent) {
47 ChannelStateEvent event = (ChannelStateEvent) e;
48
49 DefaultLocalChannel channel =
50 (DefaultLocalChannel) event.getChannel();
51 ChannelFuture future = event.getFuture();
52 ChannelState state = event.getState();
53 Object value = event.getValue();
54 switch (state) {
55 case OPEN:
56 if (Boolean.FALSE.equals(value)) {
57 channel.closeNow(future);
58 }
59 break;
60 case BOUND:
61 if (value != null) {
62 bind(channel, future, (LocalAddress) value);
63 } else {
64 channel.closeNow(future);
65 }
66 break;
67 case CONNECTED:
68 if (value != null) {
69 connect(channel, future, (LocalAddress) value);
70 } else {
71 channel.closeNow(future);
72 }
73 break;
74 case INTEREST_OPS:
75
76 future.setSuccess();
77 break;
78 }
79 } else if (e instanceof MessageEvent) {
80 MessageEvent event = (MessageEvent) e;
81 DefaultLocalChannel channel = (DefaultLocalChannel) event.getChannel();
82 boolean offered = channel.writeBuffer.offer(event);
83 assert offered;
84 channel.flushWriteBuffer();
85 }
86 }
87
88 private static void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
89 try {
90 if (!LocalChannelRegistry.register(localAddress, channel)) {
91 throw new ChannelException("address already in use: " + localAddress);
92 }
93
94 channel.setBound();
95 channel.localAddress = localAddress;
96 future.setSuccess();
97 fireChannelBound(channel, localAddress);
98 } catch (Throwable t) {
99 LocalChannelRegistry.unregister(localAddress);
100 future.setFailure(t);
101 fireExceptionCaught(channel, t);
102 }
103 }
104
105 private void connect(DefaultLocalChannel channel, ChannelFuture future, LocalAddress remoteAddress) {
106 Channel remoteChannel = LocalChannelRegistry.getChannel(remoteAddress);
107 if (!(remoteChannel instanceof DefaultLocalServerChannel)) {
108 future.setFailure(new ConnectException("connection refused"));
109 return;
110 }
111
112 DefaultLocalServerChannel serverChannel = (DefaultLocalServerChannel) remoteChannel;
113 ChannelPipeline pipeline;
114 try {
115 pipeline = serverChannel.getConfig().getPipelineFactory().getPipeline();
116 } catch (Exception e) {
117 future.setFailure(e);
118 fireExceptionCaught(channel, e);
119 if (logger.isWarnEnabled()) {
120 logger.warn(
121 "Failed to initialize an accepted socket.", e);
122 }
123 return;
124 }
125
126 future.setSuccess();
127 DefaultLocalChannel acceptedChannel = new DefaultLocalChannel(
128 serverChannel, serverChannel.getFactory(), pipeline, this, channel);
129 channel.pairedChannel = acceptedChannel;
130
131
132 if (!channel.isBound()) {
133 bind(channel, succeededFuture(channel), new LocalAddress(LocalAddress.EPHEMERAL));
134 }
135 channel.remoteAddress = serverChannel.getLocalAddress();
136 channel.setConnected();
137 fireChannelConnected(channel, serverChannel.getLocalAddress());
138
139 acceptedChannel.localAddress = serverChannel.getLocalAddress();
140 try {
141 acceptedChannel.setBound();
142 } catch (IOException e) {
143 throw new Error(e);
144 }
145 fireChannelBound(acceptedChannel, channel.getRemoteAddress());
146 acceptedChannel.remoteAddress = channel.getLocalAddress();
147 acceptedChannel.setConnected();
148 fireChannelConnected(acceptedChannel, channel.getLocalAddress());
149
150
151 channel.flushWriteBuffer();
152 acceptedChannel.flushWriteBuffer();
153 }
154 }