View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.net.InetSocketAddress;
21  import java.net.SocketAddress;
22  import java.util.concurrent.Executor;
23  
24  import org.jboss.netty.channel.ChannelEvent;
25  import org.jboss.netty.channel.ChannelFuture;
26  import org.jboss.netty.channel.ChannelFutureListener;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelState;
29  import org.jboss.netty.channel.ChannelStateEvent;
30  import org.jboss.netty.channel.MessageEvent;
31  
32  /**
33   * Receives downstream events from a {@link ChannelPipeline}.  It contains
34   * an array of I/O workers.
35   */
36  class NioDatagramPipelineSink extends AbstractNioChannelSink {
37  
38      private final WorkerPool<NioDatagramWorker> workerPool;
39  
40      /**
41       * Creates a new {@link NioDatagramPipelineSink} with a the number of {@link NioDatagramWorker}s
42       * specified in workerCount.  The {@link NioDatagramWorker}s take care of reading and writing
43       * for the {@link NioDatagramChannel}.
44       *
45       * @param workerExecutor
46       *        the {@link Executor} that will run the {@link NioDatagramWorker}s
47       *        for this sink
48       * @param workerCount
49       *        the number of {@link NioDatagramWorker}s for this sink
50       */
51      NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
52          this.workerPool = workerPool;
53      }
54  
55      /**
56       * Handle downstream event.
57       *
58       * @param pipeline the {@link ChannelPipeline} that passes down the
59       *                 downstream event.
60       * @param e The downstream event.
61       */
62      public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
63              throws Exception {
64          final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
65          final ChannelFuture future = e.getFuture();
66          if (e instanceof ChannelStateEvent) {
67              final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
68              final ChannelState state = stateEvent.getState();
69              final Object value = stateEvent.getValue();
70              switch (state) {
71              case OPEN:
72                  if (Boolean.FALSE.equals(value)) {
73                      channel.worker.close(channel, future);
74                  }
75                  break;
76              case BOUND:
77                  if (value != null) {
78                      bind(channel, future, (InetSocketAddress) value);
79                  } else {
80                      channel.worker.close(channel, future);
81                  }
82                  break;
83              case CONNECTED:
84                  if (value != null) {
85                      connect(channel, future, (InetSocketAddress) value);
86                  } else {
87                      NioDatagramWorker.disconnect(channel, future);
88                  }
89                  break;
90              case INTEREST_OPS:
91                  channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
92                  break;
93              }
94          } else if (e instanceof MessageEvent) {
95              final MessageEvent event = (MessageEvent) e;
96              final boolean offered = channel.writeBufferQueue.offer(event);
97              assert offered;
98              channel.worker.writeFromUserCode(channel);
99          }
100     }
101 
102     private static void close(NioDatagramChannel channel, ChannelFuture future) {
103         try {
104             channel.getDatagramChannel().socket().close();
105             if (channel.setClosed()) {
106                 future.setSuccess();
107                 if (channel.isBound()) {
108                     fireChannelUnbound(channel);
109                 }
110                 fireChannelClosed(channel);
111             } else {
112                 future.setSuccess();
113             }
114         } catch (final Throwable t) {
115             future.setFailure(t);
116             fireExceptionCaught(channel, t);
117         }
118     }
119 
120     /**
121      * Will bind the DatagramSocket to the passed-in address.
122      * Every call bind will spawn a new thread using the that basically in turn
123      */
124     private static void bind(final NioDatagramChannel channel,
125             final ChannelFuture future, final InetSocketAddress address) {
126         boolean bound = false;
127         boolean started = false;
128         try {
129             // First bind the DatagramSocket the specified port.
130             channel.getDatagramChannel().socket().bind(address);
131             bound = true;
132 
133             future.setSuccess();
134             fireChannelBound(channel, address);
135 
136             channel.worker.register(channel, null);
137             started = true;
138         } catch (final Throwable t) {
139             future.setFailure(t);
140             fireExceptionCaught(channel, t);
141         } finally {
142             if (!started && bound) {
143                 close(channel, future);
144             }
145         }
146     }
147 
148     private static void connect(
149             NioDatagramChannel channel, ChannelFuture future,
150             SocketAddress remoteAddress) {
151 
152         boolean bound = channel.isBound();
153         boolean connected = false;
154         boolean workerStarted = false;
155 
156         future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
157 
158         // Clear the cached address so that the next getRemoteAddress() call
159         // updates the cache.
160         channel.remoteAddress = null;
161 
162         try {
163             channel.getDatagramChannel().connect(remoteAddress);
164             connected = true;
165 
166             // Fire events.
167             future.setSuccess();
168             if (!bound) {
169                 fireChannelBound(channel, channel.getLocalAddress());
170             }
171             fireChannelConnected(channel, channel.getRemoteAddress());
172 
173             if (!bound) {
174                 channel.worker.register(channel, future);
175             }
176 
177             workerStarted = true;
178         } catch (Throwable t) {
179             future.setFailure(t);
180             fireExceptionCaught(channel, t);
181         } finally {
182             if (connected && !workerStarted) {
183                 channel.worker.close(channel, future);
184             }
185         }
186     }
187 
188     NioDatagramWorker nextWorker() {
189         return workerPool.nextWorker();
190     }
191 
192 }