1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.net.NetworkInterface;
24 import java.net.SocketAddress;
25 import java.net.SocketException;
26 import java.nio.channels.DatagramChannel;
27 import java.nio.channels.MembershipKey;
28 import java.util.ArrayList;
29 import java.util.HashMap;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33
34 import org.jboss.netty.channel.ChannelException;
35 import org.jboss.netty.channel.ChannelFactory;
36 import org.jboss.netty.channel.ChannelFuture;
37 import org.jboss.netty.channel.ChannelPipeline;
38 import org.jboss.netty.channel.ChannelSink;
39 import org.jboss.netty.channel.Channels;
40 import org.jboss.netty.channel.socket.DatagramChannelConfig;
41 import org.jboss.netty.channel.socket.InternetProtocolFamily;
42 import org.jboss.netty.util.internal.DetectionUtil;
43
44
45
46
47 public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
48 implements org.jboss.netty.channel.socket.DatagramChannel {
49
50
51
52
53 private final NioDatagramChannelConfig config;
54 private Map<InetAddress, List<MembershipKey>> memberships;
55
56 NioDatagramChannel(final ChannelFactory factory,
57 final ChannelPipeline pipeline, final ChannelSink sink,
58 final NioDatagramWorker worker, InternetProtocolFamily family) {
59 super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family));
60 config = new DefaultNioDatagramChannelConfig(channel);
61
62 fireChannelOpen(this);
63
64 }
65
66 private static DatagramChannel openNonBlockingChannel(InternetProtocolFamily family) {
67 try {
68 final DatagramChannel channel;
69
70
71 if (DetectionUtil.javaVersion() < 7 || family == null) {
72 channel = DatagramChannel.open();
73 } else {
74
75
76
77
78
79 switch (family) {
80 case IPv4:
81 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
82 break;
83
84 case IPv6:
85 channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
86 break;
87
88 default:
89 throw new IllegalArgumentException();
90 }
91 }
92
93 channel.configureBlocking(false);
94 return channel;
95 } catch (final IOException e) {
96 throw new ChannelException("Failed to open a DatagramChannel.", e);
97 }
98 }
99
100
101
102 @Override
103 public NioDatagramWorker getWorker() {
104 return (NioDatagramWorker) super.getWorker();
105 }
106
107 public boolean isBound() {
108 return isOpen() && channel.socket().isBound();
109 }
110
111 public boolean isConnected() {
112 return channel.isConnected();
113 }
114
115 @Override
116 protected boolean setClosed() {
117 return super.setClosed();
118 }
119
120 @Override
121 public NioDatagramChannelConfig getConfig() {
122 return config;
123 }
124
125 DatagramChannel getDatagramChannel() {
126 return channel;
127 }
128
129
130
131 public ChannelFuture joinGroup(InetAddress multicastAddress) {
132 try {
133 return joinGroup(
134 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
135 } catch (SocketException e) {
136 return Channels.failedFuture(this, e);
137 }
138 }
139
140
141 public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
142 return joinGroup(multicastAddress.getAddress(), networkInterface, null);
143 }
144
145
146
147
148 public ChannelFuture joinGroup(
149 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
150 if (DetectionUtil.javaVersion() < 7) {
151 throw new UnsupportedOperationException();
152 } else {
153 if (multicastAddress == null) {
154 throw new NullPointerException("multicastAddress");
155 }
156
157 if (networkInterface == null) {
158 throw new NullPointerException("networkInterface");
159 }
160
161 try {
162 MembershipKey key;
163 if (source == null) {
164 key = channel.join(multicastAddress, networkInterface);
165 } else {
166 key = channel.join(multicastAddress, networkInterface, source);
167 }
168
169 synchronized (this) {
170 if (memberships == null) {
171 memberships = new HashMap<InetAddress, List<MembershipKey>>();
172
173 }
174 List<MembershipKey> keys = memberships.get(multicastAddress);
175 if (keys == null) {
176 keys = new ArrayList<MembershipKey>();
177 memberships.put(multicastAddress, keys);
178 }
179 keys.add(key);
180 }
181 } catch (Throwable e) {
182 return Channels.failedFuture(this, e);
183 }
184 }
185 return Channels.succeededFuture(this);
186 }
187
188 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189 try {
190 return leaveGroup(
191 multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
192 } catch (SocketException e) {
193 return Channels.failedFuture(this, e);
194 }
195
196 }
197
198 public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
199 NetworkInterface networkInterface) {
200 return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
201 }
202
203
204
205
206 public ChannelFuture leaveGroup(InetAddress multicastAddress,
207 NetworkInterface networkInterface, InetAddress source) {
208 if (DetectionUtil.javaVersion() < 7) {
209 throw new UnsupportedOperationException();
210 } else {
211 if (multicastAddress == null) {
212 throw new NullPointerException("multicastAddress");
213 }
214
215 if (networkInterface == null) {
216 throw new NullPointerException("networkInterface");
217 }
218
219 synchronized (this) {
220 if (memberships != null) {
221 List<MembershipKey> keys = memberships.get(multicastAddress);
222 if (keys != null) {
223 Iterator<MembershipKey> keyIt = keys.iterator();
224
225 while (keyIt.hasNext()) {
226 MembershipKey key = keyIt.next();
227 if (networkInterface.equals(key.networkInterface())) {
228 if (source == null && key.sourceAddress() == null ||
229 source != null && source.equals(key.sourceAddress())) {
230 key.drop();
231 keyIt.remove();
232 }
233
234 }
235 }
236 if (keys.isEmpty()) {
237 memberships.remove(multicastAddress);
238 }
239 }
240 }
241 }
242 return Channels.succeededFuture(this);
243 }
244 }
245
246
247
248
249
250 public ChannelFuture block(InetAddress multicastAddress,
251 NetworkInterface networkInterface, InetAddress sourceToBlock) {
252 if (DetectionUtil.javaVersion() < 7) {
253 throw new UnsupportedOperationException();
254 } else {
255 if (multicastAddress == null) {
256 throw new NullPointerException("multicastAddress");
257 }
258 if (sourceToBlock == null) {
259 throw new NullPointerException("sourceToBlock");
260 }
261
262 if (networkInterface == null) {
263 throw new NullPointerException("networkInterface");
264 }
265 synchronized (this) {
266 if (memberships != null) {
267 List<MembershipKey> keys = memberships.get(multicastAddress);
268 for (MembershipKey key: keys) {
269 if (networkInterface.equals(key.networkInterface())) {
270 try {
271 key.block(sourceToBlock);
272 } catch (IOException e) {
273 return Channels.failedFuture(this, e);
274 }
275 }
276 }
277 }
278 }
279 return Channels.succeededFuture(this);
280
281
282 }
283 }
284
285
286
287
288
289 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
290 try {
291 block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
292 } catch (SocketException e) {
293 return Channels.failedFuture(this, e);
294 }
295 return Channels.succeededFuture(this);
296
297 }
298 @Override
299 InetSocketAddress getLocalSocketAddress() throws Exception {
300 return (InetSocketAddress) channel.socket().getLocalSocketAddress();
301 }
302
303 @Override
304 InetSocketAddress getRemoteSocketAddress() throws Exception {
305 return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
306 }
307
308 @Override
309 public ChannelFuture write(Object message, SocketAddress remoteAddress) {
310 if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
311 return super.write(message, null);
312 } else {
313 return super.write(message, remoteAddress);
314 }
315
316 }
317 }