1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.queue;
17
18 import java.io.IOException;
19 import java.nio.channels.ClosedChannelException;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.jboss.netty.buffer.ChannelBuffer;
28 import org.jboss.netty.buffer.ChannelBuffers;
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelConfig;
31 import org.jboss.netty.channel.ChannelFuture;
32 import org.jboss.netty.channel.ChannelFutureListener;
33 import org.jboss.netty.channel.ChannelHandlerContext;
34 import org.jboss.netty.channel.ChannelStateEvent;
35 import org.jboss.netty.channel.Channels;
36 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
37 import org.jboss.netty.channel.MessageEvent;
38 import org.jboss.netty.channel.SimpleChannelHandler;
39 import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
40 import org.jboss.netty.util.HashedWheelTimer;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165
166 private final Queue<MessageEvent> queue;
167 private final boolean consolidateOnFlush;
168 private volatile ChannelHandlerContext ctx;
169 private final AtomicBoolean flush = new AtomicBoolean(false);
170
171
172
173
174
175 public BufferedWriteHandler() {
176 this(false);
177 }
178
179
180
181
182
183
184
185 public BufferedWriteHandler(Queue<MessageEvent> queue) {
186 this(queue, false);
187 }
188
189
190
191
192
193
194
195
196 public BufferedWriteHandler(boolean consolidateOnFlush) {
197 this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198 }
199
200
201
202
203
204
205
206
207
208
209 public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210 if (queue == null) {
211 throw new NullPointerException("queue");
212 }
213 this.queue = queue;
214 this.consolidateOnFlush = consolidateOnFlush;
215 }
216
217 public boolean isConsolidateOnFlush() {
218 return consolidateOnFlush;
219 }
220
221
222
223
224
225 protected Queue<MessageEvent> getQueue() {
226 return queue;
227 }
228
229
230
231
232 public void flush() {
233 flush(consolidateOnFlush);
234 }
235
236
237
238
239
240
241
242
243 public void flush(boolean consolidateOnFlush) {
244 final ChannelHandlerContext ctx = this.ctx;
245 if (ctx == null) {
246
247 return;
248 }
249 Channel channel = ctx.getChannel();
250 boolean acquired;
251
252
253 if (acquired = flush.compareAndSet(false, true)) {
254 final Queue<MessageEvent> queue = getQueue();
255 if (consolidateOnFlush) {
256 if (queue.isEmpty()) {
257 return;
258 }
259
260 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
261 for (;;) {
262 MessageEvent e = queue.poll();
263 if (e == null) {
264 break;
265 }
266 if (!(e.getMessage() instanceof ChannelBuffer)) {
267 if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
268 pendingWrites = new ArrayList<MessageEvent>();
269 }
270 ctx.sendDownstream(e);
271 } else {
272 pendingWrites.add(e);
273 }
274 }
275 consolidatedWrite(pendingWrites);
276
277 } else {
278 for (;;) {
279 MessageEvent e = queue.poll();
280 if (e == null) {
281 break;
282 }
283 ctx.sendDownstream(e);
284 }
285 }
286 }
287
288 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
289 flush(consolidateOnFlush);
290 }
291 }
292
293 private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
294 final int size = pendingWrites.size();
295 if (size == 1) {
296 ctx.sendDownstream(pendingWrites.remove(0));
297 return pendingWrites;
298 } else if (size == 0) {
299 return pendingWrites;
300 }
301
302 ChannelBuffer[] data = new ChannelBuffer[size];
303 for (int i = 0; i < data.length; i ++) {
304 data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
305 }
306
307 ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
308 ChannelFuture future = Channels.future(ctx.getChannel());
309 future.addListener(new ChannelFutureListener() {
310 public void operationComplete(ChannelFuture future)
311 throws Exception {
312 if (future.isSuccess()) {
313 for (MessageEvent e: pendingWrites) {
314 e.getFuture().setSuccess();
315 }
316 } else {
317 Throwable cause = future.getCause();
318 for (MessageEvent e: pendingWrites) {
319 e.getFuture().setFailure(cause);
320 }
321 }
322 }
323 });
324
325 Channels.write(ctx, future, composite);
326 return null;
327 }
328
329
330
331
332
333 @Override
334 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
335 throws Exception {
336 if (this.ctx == null) {
337 this.ctx = ctx;
338 } else {
339 assert this.ctx == ctx;
340 }
341
342 getQueue().add(e);
343 }
344
345 @Override
346 public void disconnectRequested(ChannelHandlerContext ctx,
347 ChannelStateEvent e) throws Exception {
348 try {
349 flush(consolidateOnFlush);
350 } finally {
351 ctx.sendDownstream(e);
352 }
353 }
354
355 @Override
356 public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
357 throws Exception {
358 try {
359 flush(consolidateOnFlush);
360 } finally {
361 ctx.sendDownstream(e);
362 }
363 }
364
365
366
367
368
369 @Override
370 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
371 Throwable cause = null;
372 for (;;) {
373 MessageEvent ev = queue.poll();
374
375 if (ev == null) {
376 break;
377 }
378
379 if (cause == null) {
380 cause = new ClosedChannelException();
381 }
382 ev.getFuture().setFailure(cause);
383
384 }
385 if (cause != null) {
386 Channels.fireExceptionCaught(ctx.getChannel(), cause);
387 }
388
389 super.channelClosed(ctx, e);
390 }
391
392 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
393
394
395 }
396
397 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
398
399
400 }
401
402 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
403
404 flush(consolidateOnFlush);
405 }
406
407
408
409
410
411 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
412 Throwable cause = null;
413 for (;;) {
414 MessageEvent ev = queue.poll();
415
416 if (ev == null) {
417 break;
418 }
419
420 if (cause == null) {
421 cause = new IOException("Unable to flush message");
422 }
423 ev.getFuture().setFailure(cause);
424
425 }
426 if (cause != null) {
427 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
428 }
429 }
430
431
432 }