View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import java.io.IOException;
19  import java.lang.ref.SoftReference;
20  import java.net.SocketAddress;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.DatagramChannel;
23  import java.nio.channels.GatheringByteChannel;
24  import java.nio.channels.WritableByteChannel;
25  
26  import org.jboss.netty.buffer.ChannelBuffer;
27  import org.jboss.netty.buffer.CompositeChannelBuffer;
28  import org.jboss.netty.channel.DefaultFileRegion;
29  import org.jboss.netty.channel.FileRegion;
30  
31  final class SocketSendBufferPool {
32  
33      private static final SendBuffer EMPTY_BUFFER = new EmptySendBuffer();
34  
35      private static final int DEFAULT_PREALLOCATION_SIZE = 65536;
36      private static final int ALIGN_SHIFT = 4;
37      private static final int ALIGN_MASK = 15;
38  
39      PreallocationRef poolHead;
40      Preallocation current = new Preallocation(DEFAULT_PREALLOCATION_SIZE);
41  
42      SocketSendBufferPool() {
43          super();
44      }
45  
46      SendBuffer acquire(Object message) {
47          if (message instanceof ChannelBuffer) {
48              return acquire((ChannelBuffer) message);
49          } else if (message instanceof FileRegion) {
50              return acquire((FileRegion) message);
51          }
52  
53          throw new IllegalArgumentException(
54                  "unsupported message type: " + message.getClass());
55      }
56  
57      private SendBuffer acquire(FileRegion src) {
58          if (src.getCount() == 0) {
59              return EMPTY_BUFFER;
60          }
61          return new FileSendBuffer(src);
62      }
63  
64      private SendBuffer acquire(ChannelBuffer src) {
65          final int size = src.readableBytes();
66          if (size == 0) {
67              return EMPTY_BUFFER;
68          }
69  
70  
71          if (src instanceof CompositeChannelBuffer && ((CompositeChannelBuffer) src).useGathering()) {
72              return new GatheringSendBuffer(src.toByteBuffers());
73          }
74  
75          if (src.isDirect()) {
76              return new UnpooledSendBuffer(src.toByteBuffer());
77          }
78          if (src.readableBytes() > DEFAULT_PREALLOCATION_SIZE) {
79              return new UnpooledSendBuffer(src.toByteBuffer());
80          }
81  
82          Preallocation current = this.current;
83          ByteBuffer buffer = current.buffer;
84          int remaining = buffer.remaining();
85          PooledSendBuffer dst;
86  
87          if (size < remaining) {
88              int nextPos = buffer.position() + size;
89              ByteBuffer slice = buffer.duplicate();
90              buffer.position(align(nextPos));
91              slice.limit(nextPos);
92              current.refCnt ++;
93              dst = new PooledSendBuffer(current, slice);
94          } else if (size > remaining) {
95              this.current = current = getPreallocation();
96              buffer = current.buffer;
97              ByteBuffer slice = buffer.duplicate();
98              buffer.position(align(size));
99              slice.limit(size);
100             current.refCnt ++;
101             dst = new PooledSendBuffer(current, slice);
102         } else { // size == remaining
103             current.refCnt ++;
104             this.current = getPreallocation0();
105             dst = new PooledSendBuffer(current, current.buffer);
106         }
107 
108         ByteBuffer dstbuf = dst.buffer;
109         dstbuf.mark();
110         src.getBytes(src.readerIndex(), dstbuf);
111         dstbuf.reset();
112         return dst;
113     }
114 
115     private Preallocation getPreallocation() {
116         Preallocation current = this.current;
117         if (current.refCnt == 0) {
118             current.buffer.clear();
119             return current;
120         }
121 
122         return getPreallocation0();
123     }
124 
125     private Preallocation getPreallocation0() {
126         PreallocationRef ref = poolHead;
127         if (ref != null) {
128             do {
129                 Preallocation p = ref.get();
130                 ref = ref.next;
131 
132                 if (p != null) {
133                     poolHead = ref;
134                     return p;
135                 }
136             } while (ref != null);
137 
138             poolHead = ref;
139         }
140 
141         return new Preallocation(DEFAULT_PREALLOCATION_SIZE);
142     }
143 
144     private static int align(int pos) {
145         int q = pos >>> ALIGN_SHIFT;
146         int r = pos & ALIGN_MASK;
147         if (r != 0) {
148             q ++;
149         }
150         return q << ALIGN_SHIFT;
151     }
152 
153     private final class Preallocation {
154         final ByteBuffer buffer;
155         int refCnt;
156 
157         Preallocation(int capacity) {
158             buffer = ByteBuffer.allocateDirect(capacity);
159         }
160     }
161 
162     private final class PreallocationRef extends SoftReference<Preallocation> {
163         final PreallocationRef next;
164 
165         PreallocationRef(Preallocation prealloation, PreallocationRef next) {
166             super(prealloation);
167             this.next = next;
168         }
169     }
170 
171     interface SendBuffer {
172         boolean finished();
173         long writtenBytes();
174         long totalBytes();
175 
176         long transferTo(WritableByteChannel ch) throws IOException;
177         long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException;
178 
179         void release();
180     }
181 
182     class UnpooledSendBuffer implements SendBuffer {
183 
184         final ByteBuffer buffer;
185         final int initialPos;
186 
187         UnpooledSendBuffer(ByteBuffer buffer) {
188             this.buffer = buffer;
189             initialPos = buffer.position();
190         }
191 
192         public final boolean finished() {
193             return !buffer.hasRemaining();
194         }
195 
196         public final long writtenBytes() {
197             return buffer.position() - initialPos;
198         }
199 
200         public final long totalBytes() {
201             return buffer.limit() - initialPos;
202         }
203 
204         public final long transferTo(WritableByteChannel ch) throws IOException {
205             return ch.write(buffer);
206         }
207 
208         public final long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
209             return ch.send(buffer, raddr);
210         }
211 
212         public void release() {
213             // Unpooled.
214         }
215     }
216 
217     final class PooledSendBuffer extends UnpooledSendBuffer {
218 
219         private final Preallocation parent;
220 
221         PooledSendBuffer(Preallocation parent, ByteBuffer buffer) {
222             super(buffer);
223             this.parent = parent;
224         }
225 
226         @Override
227         public void release() {
228             final Preallocation parent = this.parent;
229             if (-- parent.refCnt == 0) {
230                 parent.buffer.clear();
231                 if (parent != current) {
232                     poolHead = new PreallocationRef(parent, poolHead);
233                 }
234             }
235         }
236     }
237 
238     class GatheringSendBuffer implements SendBuffer {
239 
240         private final ByteBuffer[] buffers;
241         private final int last;
242         private long written;
243         private final int total;
244 
245         GatheringSendBuffer(ByteBuffer[] buffers) {
246             this.buffers = buffers;
247             last = buffers.length - 1;
248             int total = 0;
249             for (ByteBuffer buf: buffers) {
250                 total += buf.remaining();
251             }
252             this.total = total;
253         }
254 
255         public boolean finished() {
256             return !buffers[last].hasRemaining();
257         }
258 
259         public long writtenBytes() {
260             return written;
261         }
262 
263         public long totalBytes() {
264             return total;
265         }
266 
267         public long transferTo(WritableByteChannel ch) throws IOException {
268             if (ch instanceof GatheringByteChannel) {
269                  long w = ((GatheringByteChannel) ch).write(buffers);
270                  written += w;
271                  return w;
272             } else {
273                 int send = 0;
274                 for (ByteBuffer buf: buffers) {
275                     if (buf.hasRemaining()) {
276                         int w = ch.write(buf);
277                         if (w == 0) {
278                             break;
279                         } else {
280                             send += w;
281                         }
282                     }
283                 }
284                 written += send;
285                 return send;
286             }
287         }
288 
289         public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
290             int send = 0;
291             for (ByteBuffer buf: buffers) {
292                 if (buf.hasRemaining()) {
293                     int w = ch.send(buf, raddr);
294                     if (w == 0) {
295                         break;
296                     } else {
297                         send += w;
298                     }
299                 }
300             }
301             written += send;
302 
303             return send;
304         }
305 
306         public void release() {
307             // nothing todo
308         }
309 
310     }
311 
312     final class FileSendBuffer implements SendBuffer {
313 
314         private final FileRegion file;
315         private long writtenBytes;
316 
317 
318         FileSendBuffer(FileRegion file) {
319             this.file = file;
320         }
321 
322         public boolean finished() {
323             return writtenBytes >= file.getCount();
324         }
325 
326         public long writtenBytes() {
327             return writtenBytes;
328         }
329 
330         public long totalBytes() {
331             return file.getCount();
332         }
333 
334         public long transferTo(WritableByteChannel ch) throws IOException {
335             long localWrittenBytes = file.transferTo(ch, writtenBytes);
336             writtenBytes += localWrittenBytes;
337             return localWrittenBytes;
338         }
339 
340         public long transferTo(DatagramChannel ch, SocketAddress raddr)
341                 throws IOException {
342             throw new UnsupportedOperationException();
343         }
344 
345         public void release() {
346             if (file instanceof DefaultFileRegion) {
347                 if (((DefaultFileRegion) file).releaseAfterTransfer()) {
348                     // Make sure the FileRegion resource are released otherwise it may cause a FD
349                     // leak or something similar
350                     file.releaseExternalResources();
351                 }
352             }
353         }
354     }
355 
356     static final class EmptySendBuffer implements SendBuffer {
357 
358         EmptySendBuffer() {
359             super();
360         }
361 
362         public boolean finished() {
363             return true;
364         }
365 
366         public long writtenBytes() {
367             return 0;
368         }
369 
370         public long totalBytes() {
371             return 0;
372         }
373 
374         public long transferTo(WritableByteChannel ch) throws IOException {
375             return 0;
376         }
377 
378         public long transferTo(DatagramChannel ch, SocketAddress raddr) throws IOException {
379             return 0;
380         }
381 
382         public void release() {
383             // Unpooled.
384         }
385     }
386 }