View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.spdy;
17  
18  import java.util.Comparator;
19  import java.util.Map;
20  import java.util.Set;
21  import java.util.TreeSet;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.jboss.netty.channel.MessageEvent;
27  
28  final class SpdySession {
29  
30      private static final SpdyProtocolException STREAM_CLOSED = new SpdyProtocolException("Stream closed");
31  
32      private final Map<Integer, StreamState> activeStreams =
33          new ConcurrentHashMap<Integer, StreamState>();
34  
35      SpdySession() {
36      }
37  
38      int numActiveStreams() {
39          return activeStreams.size();
40      }
41  
42      boolean noActiveStreams() {
43          return activeStreams.isEmpty();
44      }
45  
46      boolean isActiveStream(int streamID) {
47          return activeStreams.containsKey(streamID);
48      }
49  
50      // Stream-IDs should be iterated in priority order
51      Set<Integer> getActiveStreams() {
52          TreeSet<Integer> StreamIDs = new TreeSet<Integer>(new PriorityComparator());
53          StreamIDs.addAll(activeStreams.keySet());
54          return StreamIDs;
55      }
56  
57      void acceptStream(
58              int streamID, byte priority, boolean remoteSideClosed, boolean localSideClosed,
59              int sendWindowSize, int receiveWindowSize) {
60          if (!remoteSideClosed || !localSideClosed) {
61              activeStreams.put(
62                      streamID,
63                      new StreamState(priority, remoteSideClosed, localSideClosed, sendWindowSize, receiveWindowSize));
64          }
65      }
66  
67      void removeStream(int streamID) {
68          Integer StreamID = streamID;
69          StreamState state = activeStreams.get(StreamID);
70          activeStreams.remove(StreamID);
71          if (state != null) {
72              MessageEvent e = state.removePendingWrite();
73              while (e != null) {
74                  e.getFuture().setFailure(STREAM_CLOSED);
75                  e = state.removePendingWrite();
76              }
77          }
78      }
79  
80      boolean isRemoteSideClosed(int streamID) {
81          StreamState state = activeStreams.get(streamID);
82          return state == null || state.isRemoteSideClosed();
83      }
84  
85      void closeRemoteSide(int streamID) {
86          Integer StreamID = streamID;
87          StreamState state = activeStreams.get(StreamID);
88          if (state != null) {
89              state.closeRemoteSide();
90              if (state.isLocalSideClosed()) {
91                  activeStreams.remove(StreamID);
92              }
93          }
94      }
95  
96      boolean isLocalSideClosed(int streamID) {
97          StreamState state = activeStreams.get(streamID);
98          return state == null || state.isLocalSideClosed();
99      }
100 
101     void closeLocalSide(int streamID) {
102         Integer StreamID = streamID;
103         StreamState state = activeStreams.get(StreamID);
104         if (state != null) {
105             state.closeLocalSide();
106             if (state.isRemoteSideClosed()) {
107                 activeStreams.remove(StreamID);
108             }
109         }
110     }
111 
112     /*
113      * hasReceivedReply and receivedReply are only called from messageReceived
114      * no need to synchronize access to the StreamState
115      */
116 
117     boolean hasReceivedReply(int streamID) {
118         StreamState state = activeStreams.get(streamID);
119         return state != null && state.hasReceivedReply();
120     }
121 
122     void receivedReply(int streamID) {
123         StreamState state = activeStreams.get(streamID);
124         if (state != null) {
125             state.receivedReply();
126         }
127     }
128 
129     int getSendWindowSize(int streamID) {
130         StreamState state = activeStreams.get(streamID);
131         return state != null ? state.getSendWindowSize() : -1;
132     }
133 
134     int updateSendWindowSize(int streamID, int deltaWindowSize) {
135         StreamState state = activeStreams.get(streamID);
136         return state != null ? state.updateSendWindowSize(deltaWindowSize) : -1;
137     }
138 
139     int updateReceiveWindowSize(int streamID, int deltaWindowSize) {
140         StreamState state = activeStreams.get(streamID);
141         if (deltaWindowSize > 0) {
142             state.setReceiveWindowSizeLowerBound(0);
143         }
144         return state != null ? state.updateReceiveWindowSize(deltaWindowSize) : -1;
145     }
146 
147     int getReceiveWindowSizeLowerBound(int streamID) {
148         StreamState state = activeStreams.get(streamID);
149         return state != null ? state.getReceiveWindowSizeLowerBound() : 0;
150     }
151 
152     void updateAllReceiveWindowSizes(int deltaWindowSize) {
153         for (StreamState state: activeStreams.values()) {
154             state.updateReceiveWindowSize(deltaWindowSize);
155             if (deltaWindowSize < 0) {
156                 state.setReceiveWindowSizeLowerBound(deltaWindowSize);
157             }
158         }
159     }
160 
161     boolean putPendingWrite(int streamID, MessageEvent evt) {
162         StreamState state = activeStreams.get(streamID);
163         return state != null && state.putPendingWrite(evt);
164     }
165 
166     MessageEvent getPendingWrite(int streamID) {
167         StreamState state = activeStreams.get(streamID);
168         return state != null ? state.getPendingWrite() : null;
169     }
170 
171     MessageEvent removePendingWrite(int streamID) {
172         StreamState state = activeStreams.get(streamID);
173         return state != null ? state.removePendingWrite() : null;
174     }
175 
176     private static final class StreamState {
177 
178         private final byte priority;
179         private volatile boolean remoteSideClosed;
180         private volatile boolean localSideClosed;
181         private boolean receivedReply;
182         private final AtomicInteger sendWindowSize;
183         private final AtomicInteger receiveWindowSize;
184         private volatile int receiveWindowSizeLowerBound;
185         private final ConcurrentLinkedQueue<MessageEvent> pendingWriteQueue =
186                 new ConcurrentLinkedQueue<MessageEvent>();
187 
188         StreamState(
189                 byte priority, boolean remoteSideClosed, boolean localSideClosed,
190                 int sendWindowSize, int receiveWindowSize) {
191             this.priority = priority;
192             this.remoteSideClosed = remoteSideClosed;
193             this.localSideClosed = localSideClosed;
194             this.sendWindowSize = new AtomicInteger(sendWindowSize);
195             this.receiveWindowSize = new AtomicInteger(receiveWindowSize);
196         }
197 
198         byte getPriority() {
199             return priority;
200         }
201 
202         boolean isRemoteSideClosed() {
203             return remoteSideClosed;
204         }
205 
206         void closeRemoteSide() {
207             remoteSideClosed = true;
208         }
209 
210         boolean isLocalSideClosed() {
211             return localSideClosed;
212         }
213 
214         void closeLocalSide() {
215             localSideClosed = true;
216         }
217 
218         boolean hasReceivedReply() {
219             return receivedReply;
220         }
221 
222         void receivedReply() {
223             receivedReply = true;
224         }
225 
226         int getSendWindowSize() {
227             return sendWindowSize.get();
228         }
229 
230         int updateSendWindowSize(int deltaWindowSize) {
231             return sendWindowSize.addAndGet(deltaWindowSize);
232         }
233 
234         int updateReceiveWindowSize(int deltaWindowSize) {
235             return receiveWindowSize.addAndGet(deltaWindowSize);
236         }
237 
238         int getReceiveWindowSizeLowerBound() {
239             return receiveWindowSizeLowerBound;
240         }
241 
242         void setReceiveWindowSizeLowerBound(int receiveWindowSizeLowerBound) {
243             this.receiveWindowSizeLowerBound = receiveWindowSizeLowerBound;
244         }
245 
246         boolean putPendingWrite(MessageEvent evt) {
247             return pendingWriteQueue.offer(evt);
248         }
249 
250         MessageEvent getPendingWrite() {
251             return pendingWriteQueue.peek();
252         }
253 
254         MessageEvent removePendingWrite() {
255             return pendingWriteQueue.poll();
256         }
257     }
258 
259     private final class PriorityComparator implements Comparator<Integer> {
260 
261         PriorityComparator() {
262             super();
263         }
264 
265         public int compare(Integer id1, Integer id2) {
266             StreamState state1 = activeStreams.get(id1);
267             StreamState state2 = activeStreams.get(id2);
268             return state1.getPriority() - state2.getPriority();
269         }
270     }
271 }