View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import java.util.concurrent.Executor;
19  
20  import org.jboss.netty.bootstrap.ServerBootstrap;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelDownstreamHandler;
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelHandler;
25  import org.jboss.netty.channel.ChannelHandler.Sharable;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelPipelineFactory;
29  import org.jboss.netty.channel.ChannelState;
30  import org.jboss.netty.channel.ChannelStateEvent;
31  import org.jboss.netty.channel.ChannelUpstreamHandler;
32  import org.jboss.netty.channel.Channels;
33  import org.jboss.netty.util.ExternalResourceReleasable;
34  import org.jboss.netty.util.internal.ExecutorUtil;
35  
36  /**
37   * Forwards an upstream {@link ChannelEvent} to an {@link Executor}.
38   * <p>
39   * {@link ExecutionHandler} is often used when your {@link ChannelHandler}
40   * performs a blocking operation that takes long time or accesses a resource
41   * which is not CPU-bound business logic such as DB access.  Running such
42   * operations in a pipeline without an {@link ExecutionHandler} will result in
43   * unwanted hiccup during I/O because an I/O thread cannot perform I/O until
44   * your handler returns the control to the I/O thread.
45   * <p>
46   * In most cases, an {@link ExecutionHandler} is coupled with an
47   * {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the
48   * correct event execution order and prevents an {@link OutOfMemoryError}
49   * under load:
50   * <pre>
51   * public class DatabaseGatewayPipelineFactory implements {@link ChannelPipelineFactory} {
52   *
53   *     <b>private final {@link ExecutionHandler} executionHandler;</b>
54   *
55   *     public DatabaseGatewayPipelineFactory({@link ExecutionHandler} executionHandler) {
56   *         this.executionHandler = executionHandler;
57   *     }
58   *
59   *     public {@link ChannelPipeline} getPipeline() {
60   *         return {@link Channels}.pipeline(
61   *                 new DatabaseGatewayProtocolEncoder(),
62   *                 new DatabaseGatewayProtocolDecoder(),
63   *                 <b>executionHandler, // Must be shared</b>
64   *                 new DatabaseQueryingHandler());
65   *     }
66   * }
67   * ...
68   *
69   * public static void main(String[] args) {
70   *     {@link ServerBootstrap} bootstrap = ...;
71   *     ...
72   *     <b>{@link ExecutionHandler} executionHandler = new {@link ExecutionHandler}(
73   *             new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576))
74   *     bootstrap.setPipelineFactory(
75   *             new DatabaseGatewayPipelineFactory(executionHandler));</b>
76   *     ...
77   *     bootstrap.bind(...);
78   *     ...
79   *
80   *     while (!isServerReadyToShutDown()) {
81   *         // ... wait ...
82   *     }
83   *
84   *     bootstrap.releaseExternalResources();
85   *     <b>executionHandler.releaseExternalResources();</b>
86   * }
87   * </pre>
88   *
89   * Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the
90   * detailed information about how the event order is guaranteed.
91   *
92   * <h3>SEDA (Staged Event-Driven Architecture)</h3>
93   * You can implement an alternative thread model such as
94   * <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
95   * by adding more than one {@link ExecutionHandler} to the pipeline.
96   *
97   * <h3>Using other {@link Executor} implementation</h3>
98   *
99   * Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor},
100  * you can use other {@link Executor} implementations.  However, you must note
101  * that other {@link Executor} implementation might break your application
102  * because they often do not maintain event execution order nor interact with
103  * I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
104  *
105  * @apiviz.landmark
106  * @apiviz.has java.util.concurrent.ThreadPoolExecutor
107  */
108 @Sharable
109 public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
110 
111     private final Executor executor;
112     private final boolean handleDownstream;
113     private final boolean handleUpstream;
114 
115     /**
116      * Creates a new instance with the specified {@link Executor}.
117      * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
118      */
119     public ExecutionHandler(Executor executor) {
120         this(executor, false, true);
121     }
122 
123 
124     /**
125      * Use {@link #ExecutionHandler(Executor, boolean, boolean)}
126      *
127      * {@link Deprecated}
128      */
129     @Deprecated
130     public ExecutionHandler(Executor executor, boolean handleDownstream) {
131         this(executor, handleDownstream, true);
132     }
133 
134     /**
135      * Creates a new instance with the specified {@link Executor}.
136      * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
137      */
138     public ExecutionHandler(Executor executor, boolean handleDownstream, boolean handleUpstream) {
139         if (executor == null) {
140             throw new NullPointerException("executor");
141         }
142         if (!handleDownstream && !handleUpstream) {
143             throw new IllegalArgumentException("You must handle at least handle one event type");
144         }
145         this.executor = executor;
146         this.handleDownstream = handleDownstream;
147         this.handleUpstream = handleUpstream;
148     }
149 
150     /**
151      * Returns the {@link Executor} which was specified with the constructor.
152      */
153     public Executor getExecutor() {
154         return executor;
155     }
156 
157     /**
158      * Shuts down the {@link Executor} which was specified with the constructor
159      * and wait for its termination.
160      */
161     public void releaseExternalResources() {
162         Executor executor = getExecutor();
163         ExecutorUtil.terminate(ChannelEventRunnable.PARENT, executor);
164         if (executor instanceof ExternalResourceReleasable) {
165             ((ExternalResourceReleasable) executor).releaseExternalResources();
166         }
167     }
168 
169     public void handleUpstream(
170             ChannelHandlerContext context, ChannelEvent e) throws Exception {
171         if (handleUpstream) {
172             executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
173         } else {
174             context.sendUpstream(e);
175         }
176     }
177 
178     public void handleDownstream(
179             ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
180         // check if the read was suspend
181         if (!handleReadSuspend(ctx, e)) {
182             if (handleDownstream) {
183                 executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor));
184             } else {
185                 ctx.sendDownstream(e);
186             }
187         }
188     }
189 
190     /**
191      * Handle suspended reads
192      */
193     protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
194         if (e instanceof ChannelStateEvent) {
195             ChannelStateEvent cse = (ChannelStateEvent) e;
196             if (cse.getState() == ChannelState.INTEREST_OPS &&
197                 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
198 
199                 // setReadable(true) requested
200                 boolean readSuspended = ctx.getAttachment() != null;
201                 if (readSuspended) {
202                     // Drop the request silently if MemoryAwareThreadPool has
203                     // set the flag.
204                     e.getFuture().setSuccess();
205                     return true;
206                 }
207             }
208         }
209 
210         return false;
211     }
212 }