1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.jboss.netty.channel.socket.nio;
18
19 import org.jboss.netty.channel.socket.Worker;
20 import org.jboss.netty.logging.InternalLogger;
21 import org.jboss.netty.logging.InternalLoggerFactory;
22 import org.jboss.netty.util.ExternalResourceReleasable;
23 import org.jboss.netty.util.internal.ExecutorUtil;
24
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29
30
31
32
33 public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
34 implements WorkerPool<E>, ExternalResourceReleasable {
35
36
37
38
39 private static final int INITIALIZATION_TIMEOUT = 10;
40
41 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractNioWorkerPool.class);
42
43 private final AbstractNioWorker[] workers;
44 private final AtomicInteger workerIndex = new AtomicInteger();
45 private final Executor workerExecutor;
46 private volatile boolean initialized;
47
48
49
50
51
52
53
54 AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
55 this(workerExecutor, workerCount, true);
56 }
57
58 AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
59 if (workerExecutor == null) {
60 throw new NullPointerException("workerExecutor");
61 }
62 if (workerCount <= 0) {
63 throw new IllegalArgumentException(
64 "workerCount (" + workerCount + ") " + "must be a positive integer.");
65 }
66 workers = new AbstractNioWorker[workerCount];
67 this.workerExecutor = workerExecutor;
68 if (autoInit) {
69 init();
70 }
71 }
72
73 protected void init() {
74 if (initialized) {
75 throw new IllegalStateException("initialized already");
76 }
77
78 initialized = true;
79
80 for (int i = 0; i < workers.length; i++) {
81 workers[i] = newWorker(workerExecutor);
82 }
83
84 waitForWorkerThreads();
85 }
86
87 private void waitForWorkerThreads() {
88 long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(INITIALIZATION_TIMEOUT);
89 boolean warn = false;
90 for (AbstractNioSelector worker: workers) {
91 long waitTime = deadline - System.nanoTime();
92 try {
93 if (waitTime <= 0) {
94 if (worker.thread == null) {
95 warn = true;
96 break;
97 }
98 } else if (!worker.startupLatch.await(waitTime, TimeUnit.NANOSECONDS)) {
99 warn = true;
100 break;
101 }
102 } catch (InterruptedException ignore) {
103
104 Thread.currentThread().interrupt();
105 break;
106 }
107 }
108
109 if (warn) {
110 logger.warn(
111 "Failed to get all worker threads ready within " + INITIALIZATION_TIMEOUT + " second(s). " +
112 "Make sure to specify the executor which has more threads than the requested workerCount. " +
113 "If unsure, use Executors.newCachedThreadPool().");
114 }
115 }
116
117
118
119
120
121
122
123
124
125
126 @Deprecated
127 protected E createWorker(Executor executor) {
128 throw new IllegalStateException("This will be removed. Override this and the newWorker(..) method!");
129 }
130
131
132
133
134
135
136
137
138
139
140
141 @SuppressWarnings("deprecation")
142 protected E newWorker(Executor executor) {
143 return createWorker(executor);
144 }
145
146 @SuppressWarnings("unchecked")
147 public E nextWorker() {
148 return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
149 }
150
151 public void rebuildSelectors() {
152 for (AbstractNioWorker worker: workers) {
153 worker.rebuildSelector();
154 }
155 }
156
157 public void releaseExternalResources() {
158 shutdown();
159 ExecutorUtil.shutdownNow(workerExecutor);
160 }
161
162 public void shutdown() {
163 for (AbstractNioWorker worker: workers) {
164 worker.shutdown();
165 }
166 }
167
168 }