View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.InetAddress;
22  import java.net.InetSocketAddress;
23  import java.net.NetworkInterface;
24  import java.net.SocketAddress;
25  import java.net.SocketException;
26  import java.nio.channels.DatagramChannel;
27  import java.nio.channels.MembershipKey;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  
34  import org.jboss.netty.channel.ChannelException;
35  import org.jboss.netty.channel.ChannelFactory;
36  import org.jboss.netty.channel.ChannelFuture;
37  import org.jboss.netty.channel.ChannelPipeline;
38  import org.jboss.netty.channel.ChannelSink;
39  import org.jboss.netty.channel.Channels;
40  import org.jboss.netty.channel.socket.DatagramChannelConfig;
41  import org.jboss.netty.channel.socket.InternetProtocolFamily;
42  import org.jboss.netty.util.internal.DetectionUtil;
43  
44  /**
45   * Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
46   */
47  public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
48                                  implements org.jboss.netty.channel.socket.DatagramChannel {
49  
50      /**
51       * The {@link DatagramChannelConfig}.
52       */
53      private final NioDatagramChannelConfig config;
54      private Map<InetAddress, List<MembershipKey>> memberships;
55  
56      NioDatagramChannel(final ChannelFactory factory,
57              final ChannelPipeline pipeline, final ChannelSink sink,
58              final NioDatagramWorker worker, InternetProtocolFamily family) {
59          super(null, factory, pipeline, sink, worker, openNonBlockingChannel(family));
60          config = new DefaultNioDatagramChannelConfig(channel);
61  
62          fireChannelOpen(this);
63  
64      }
65  
66      private static DatagramChannel openNonBlockingChannel(InternetProtocolFamily family) {
67          try {
68              final DatagramChannel channel;
69  
70              // check if we are on java 7 or if the family was not specified
71              if (DetectionUtil.javaVersion() < 7 || family == null) {
72                  channel = DatagramChannel.open();
73              } else {
74                  // This block only works on java7++, but we checked before if we have it.
75                  //
76                  // Use the ProtocolFamilyConvert for conversion to prevent NoClassDefFoundError.
77                  //
78                  // See #368
79                  switch (family) {
80                  case IPv4:
81                      channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
82                      break;
83  
84                  case IPv6:
85                      channel = DatagramChannel.open(ProtocolFamilyConverter.convert(family));
86                      break;
87  
88                  default:
89                      throw new IllegalArgumentException();
90                  }
91              }
92  
93              channel.configureBlocking(false);
94              return channel;
95          } catch (final IOException e) {
96              throw new ChannelException("Failed to open a DatagramChannel.", e);
97          }
98      }
99  
100 
101 
102     @Override
103     public NioDatagramWorker getWorker() {
104         return (NioDatagramWorker) super.getWorker();
105     }
106 
107     public boolean isBound() {
108         return isOpen() && channel.socket().isBound();
109     }
110 
111     public boolean isConnected() {
112         return channel.isConnected();
113     }
114 
115     @Override
116     protected boolean setClosed() {
117         return super.setClosed();
118     }
119 
120     @Override
121     public NioDatagramChannelConfig getConfig() {
122         return config;
123     }
124 
125     DatagramChannel getDatagramChannel() {
126         return channel;
127     }
128 
129 
130 
131     public ChannelFuture joinGroup(InetAddress multicastAddress) {
132        try {
133             return joinGroup(
134                     multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
135         } catch (SocketException e) {
136             return Channels.failedFuture(this, e);
137         }
138     }
139 
140 
141     public ChannelFuture joinGroup(InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
142         return joinGroup(multicastAddress.getAddress(), networkInterface, null);
143     }
144 
145     /**
146      * Joins the specified multicast group at the specified interface using the specified source.
147      */
148     public ChannelFuture joinGroup(
149             InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
150         if (DetectionUtil.javaVersion() < 7) {
151             throw new UnsupportedOperationException();
152         } else {
153             if (multicastAddress == null) {
154                 throw new NullPointerException("multicastAddress");
155             }
156 
157             if (networkInterface == null) {
158                 throw new NullPointerException("networkInterface");
159             }
160 
161             try {
162                 MembershipKey key;
163                 if (source == null) {
164                     key = channel.join(multicastAddress, networkInterface);
165                 } else {
166                     key = channel.join(multicastAddress, networkInterface, source);
167                 }
168 
169                 synchronized (this) {
170                     if (memberships == null) {
171                         memberships = new HashMap<InetAddress, List<MembershipKey>>();
172 
173                     }
174                     List<MembershipKey> keys = memberships.get(multicastAddress);
175                     if (keys == null) {
176                         keys = new ArrayList<MembershipKey>();
177                         memberships.put(multicastAddress, keys);
178                     }
179                     keys.add(key);
180                 }
181             } catch (Throwable e) {
182                 return Channels.failedFuture(this, e);
183             }
184         }
185         return Channels.succeededFuture(this);
186     }
187 
188     public ChannelFuture leaveGroup(InetAddress multicastAddress) {
189         try {
190             return leaveGroup(
191                     multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), null);
192         } catch (SocketException e) {
193             return Channels.failedFuture(this, e);
194         }
195 
196     }
197 
198     public ChannelFuture leaveGroup(InetSocketAddress multicastAddress,
199             NetworkInterface networkInterface) {
200         return leaveGroup(multicastAddress.getAddress(), networkInterface, null);
201     }
202 
203     /**
204      * Leave the specified multicast group at the specified interface using the specified source.
205      */
206     public ChannelFuture leaveGroup(InetAddress multicastAddress,
207             NetworkInterface networkInterface, InetAddress source) {
208         if (DetectionUtil.javaVersion() < 7) {
209             throw new UnsupportedOperationException();
210         } else {
211             if (multicastAddress == null) {
212                 throw new NullPointerException("multicastAddress");
213             }
214 
215             if (networkInterface == null) {
216                 throw new NullPointerException("networkInterface");
217             }
218 
219             synchronized (this) {
220                 if (memberships != null) {
221                     List<MembershipKey> keys = memberships.get(multicastAddress);
222                     if (keys != null) {
223                         Iterator<MembershipKey> keyIt = keys.iterator();
224 
225                         while (keyIt.hasNext()) {
226                             MembershipKey key = keyIt.next();
227                             if (networkInterface.equals(key.networkInterface())) {
228                                if (source == null && key.sourceAddress() == null ||
229                                    source != null && source.equals(key.sourceAddress())) {
230                                    key.drop();
231                                    keyIt.remove();
232                                }
233 
234                             }
235                         }
236                         if (keys.isEmpty()) {
237                             memberships.remove(multicastAddress);
238                         }
239                     }
240                 }
241             }
242             return Channels.succeededFuture(this);
243         }
244     }
245 
246     /**
247      * Block the given sourceToBlock address for the given multicastAddress on the given networkInterface
248      *
249      */
250     public ChannelFuture block(InetAddress multicastAddress,
251             NetworkInterface networkInterface, InetAddress sourceToBlock) {
252         if (DetectionUtil.javaVersion() < 7) {
253             throw new UnsupportedOperationException();
254         } else {
255             if (multicastAddress == null) {
256                 throw new NullPointerException("multicastAddress");
257             }
258             if (sourceToBlock == null) {
259                 throw new NullPointerException("sourceToBlock");
260             }
261 
262             if (networkInterface == null) {
263                 throw new NullPointerException("networkInterface");
264             }
265             synchronized (this) {
266                 if (memberships != null) {
267                     List<MembershipKey> keys = memberships.get(multicastAddress);
268                     for (MembershipKey key: keys) {
269                         if (networkInterface.equals(key.networkInterface())) {
270                             try {
271                                 key.block(sourceToBlock);
272                             } catch (IOException e) {
273                                 return Channels.failedFuture(this, e);
274                             }
275                         }
276                     }
277                 }
278             }
279             return Channels.succeededFuture(this);
280 
281 
282         }
283     }
284 
285     /**
286 * Block the given sourceToBlock address for the given multicastAddress
287 *
288 */
289     public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
290         try {
291             block(multicastAddress, NetworkInterface.getByInetAddress(getLocalAddress().getAddress()), sourceToBlock);
292         } catch (SocketException e) {
293             return Channels.failedFuture(this, e);
294         }
295         return Channels.succeededFuture(this);
296 
297     }
298     @Override
299     InetSocketAddress getLocalSocketAddress() throws Exception {
300         return (InetSocketAddress) channel.socket().getLocalSocketAddress();
301     }
302 
303     @Override
304     InetSocketAddress getRemoteSocketAddress() throws Exception {
305         return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
306     }
307 
308     @Override
309     public ChannelFuture write(Object message, SocketAddress remoteAddress) {
310         if (remoteAddress == null || remoteAddress.equals(getRemoteAddress())) {
311             return super.write(message, null);
312         } else {
313             return super.write(message, remoteAddress);
314         }
315 
316     }
317 }