1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.net.SocketAddress;
22 import java.util.concurrent.Executor;
23
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelFuture;
26 import org.jboss.netty.channel.ChannelFutureListener;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelState;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.MessageEvent;
31
32
33
34
35
36 class NioDatagramPipelineSink extends AbstractNioChannelSink {
37
38 private final WorkerPool<NioDatagramWorker> workerPool;
39
40
41
42
43
44
45
46
47
48
49
50
51 NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
52 this.workerPool = workerPool;
53 }
54
55
56
57
58
59
60
61
62 public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
63 throws Exception {
64 final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
65 final ChannelFuture future = e.getFuture();
66 if (e instanceof ChannelStateEvent) {
67 final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
68 final ChannelState state = stateEvent.getState();
69 final Object value = stateEvent.getValue();
70 switch (state) {
71 case OPEN:
72 if (Boolean.FALSE.equals(value)) {
73 channel.worker.close(channel, future);
74 }
75 break;
76 case BOUND:
77 if (value != null) {
78 bind(channel, future, (InetSocketAddress) value);
79 } else {
80 channel.worker.close(channel, future);
81 }
82 break;
83 case CONNECTED:
84 if (value != null) {
85 connect(channel, future, (InetSocketAddress) value);
86 } else {
87 NioDatagramWorker.disconnect(channel, future);
88 }
89 break;
90 case INTEREST_OPS:
91 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
92 break;
93 }
94 } else if (e instanceof MessageEvent) {
95 final MessageEvent event = (MessageEvent) e;
96 final boolean offered = channel.writeBufferQueue.offer(event);
97 assert offered;
98 channel.worker.writeFromUserCode(channel);
99 }
100 }
101
102 private static void close(NioDatagramChannel channel, ChannelFuture future) {
103 try {
104 channel.getDatagramChannel().socket().close();
105 if (channel.setClosed()) {
106 future.setSuccess();
107 if (channel.isBound()) {
108 fireChannelUnbound(channel);
109 }
110 fireChannelClosed(channel);
111 } else {
112 future.setSuccess();
113 }
114 } catch (final Throwable t) {
115 future.setFailure(t);
116 fireExceptionCaught(channel, t);
117 }
118 }
119
120
121
122
123
124 private static void bind(final NioDatagramChannel channel,
125 final ChannelFuture future, final InetSocketAddress address) {
126 boolean bound = false;
127 boolean started = false;
128 try {
129
130 channel.getDatagramChannel().socket().bind(address);
131 bound = true;
132
133 future.setSuccess();
134 fireChannelBound(channel, address);
135
136 channel.worker.register(channel, null);
137 started = true;
138 } catch (final Throwable t) {
139 future.setFailure(t);
140 fireExceptionCaught(channel, t);
141 } finally {
142 if (!started && bound) {
143 close(channel, future);
144 }
145 }
146 }
147
148 private static void connect(
149 NioDatagramChannel channel, ChannelFuture future,
150 SocketAddress remoteAddress) {
151
152 boolean bound = channel.isBound();
153 boolean connected = false;
154 boolean workerStarted = false;
155
156 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
157
158
159
160 channel.remoteAddress = null;
161
162 try {
163 channel.getDatagramChannel().connect(remoteAddress);
164 connected = true;
165
166
167 future.setSuccess();
168 if (!bound) {
169 fireChannelBound(channel, channel.getLocalAddress());
170 }
171 fireChannelConnected(channel, channel.getRemoteAddress());
172
173 if (!bound) {
174 channel.worker.register(channel, future);
175 }
176
177 workerStarted = true;
178 } catch (Throwable t) {
179 future.setFailure(t);
180 fireExceptionCaught(channel, t);
181 } finally {
182 if (connected && !workerStarted) {
183 channel.worker.close(channel, future);
184 }
185 }
186 }
187
188 NioDatagramWorker nextWorker() {
189 return workerPool.nextWorker();
190 }
191
192 }