00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020 #ifndef _PASSENGER_STANDARD_APPLICATION_POOL_H_
00021 #define _PASSENGER_STANDARD_APPLICATION_POOL_H_
00022
00023 #include <boost/shared_ptr.hpp>
00024 #include <boost/weak_ptr.hpp>
00025 #include <boost/thread.hpp>
00026 #include <boost/bind.hpp>
00027 #include <boost/date_time/microsec_time_clock.hpp>
00028 #include <boost/date_time/posix_time/posix_time.hpp>
00029
00030 #include <oxt/system_calls.hpp>
00031 #include <oxt/backtrace.hpp>
00032
00033 #include <string>
00034 #include <sstream>
00035 #include <map>
00036 #include <list>
00037
00038 #include <sys/types.h>
00039 #include <sys/stat.h>
00040 #include <stdio.h>
00041 #include <unistd.h>
00042 #include <ctime>
00043 #include <cerrno>
00044 #ifdef TESTING_APPLICATION_POOL
00045 #include <cstdlib>
00046 #endif
00047
00048 #include "ApplicationPool.h"
00049 #include "Logging.h"
00050 #include "FileChecker.h"
00051 #include "CachedFileStat.h"
00052 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00053 #include "DummySpawnManager.h"
00054 #else
00055 #include "SpawnManager.h"
00056 #endif
00057
00058 namespace Passenger {
00059
00060 using namespace std;
00061 using namespace boost;
00062 using namespace oxt;
00063
00064 class ApplicationPoolServer;
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 class StandardApplicationPool: public ApplicationPool {
00098 private:
00099 static const int DEFAULT_MAX_IDLE_TIME = 120;
00100 static const int DEFAULT_MAX_POOL_SIZE = 20;
00101 static const int DEFAULT_MAX_INSTANCES_PER_APP = 0;
00102 static const int CLEANER_THREAD_STACK_SIZE = 1024 * 128;
00103 static const unsigned int MAX_GET_ATTEMPTS = 10;
00104 static const unsigned int GET_TIMEOUT = 5000;
00105
00106 friend class ApplicationPoolServer;
00107 struct Domain;
00108 struct AppContainer;
00109
00110 typedef shared_ptr<Domain> DomainPtr;
00111 typedef shared_ptr<AppContainer> AppContainerPtr;
00112 typedef list<AppContainerPtr> AppContainerList;
00113 typedef map<string, DomainPtr> DomainMap;
00114
00115 struct Domain {
00116 AppContainerList instances;
00117 unsigned int size;
00118 unsigned long maxRequests;
00119 FileChecker restartFileChecker;
00120 CachedFileStat alwaysRestartFileStatter;
00121
00122 Domain(const PoolOptions &options)
00123 : restartFileChecker(determineRestartDir(options) + "/restart.txt"),
00124 alwaysRestartFileStatter(determineRestartDir(options) + "/always_restart.txt")
00125 {
00126 }
00127
00128 private:
00129 static string determineRestartDir(const PoolOptions &options) {
00130 if (options.restartDir.empty()) {
00131 return options.appRoot + "/tmp";
00132 } else if (options.restartDir[0] == '/') {
00133 return options.restartDir;
00134 } else {
00135 return options.appRoot + "/" + options.restartDir;
00136 }
00137 }
00138 };
00139
00140 struct AppContainer {
00141 ApplicationPtr app;
00142 time_t startTime;
00143 time_t lastUsed;
00144 unsigned int sessions;
00145 unsigned int processed;
00146 AppContainerList::iterator iterator;
00147 AppContainerList::iterator ia_iterator;
00148
00149 AppContainer() {
00150 startTime = time(NULL);
00151 processed = 0;
00152 }
00153
00154
00155
00156
00157 string uptime() const {
00158 time_t seconds = time(NULL) - startTime;
00159 stringstream result;
00160
00161 if (seconds >= 60) {
00162 time_t minutes = seconds / 60;
00163 if (minutes >= 60) {
00164 time_t hours = minutes / 60;
00165 minutes = minutes % 60;
00166 result << hours << "h ";
00167 }
00168
00169 seconds = seconds % 60;
00170 result << minutes << "m ";
00171 }
00172 result << seconds << "s";
00173 return result.str();
00174 }
00175 };
00176
00177
00178
00179
00180
00181
00182
00183 struct SharedData {
00184 boost::mutex lock;
00185 condition activeOrMaxChanged;
00186
00187 DomainMap domains;
00188 unsigned int max;
00189 unsigned int count;
00190 unsigned int active;
00191 unsigned int maxPerApp;
00192 AppContainerList inactiveApps;
00193 map<string, unsigned int> appInstanceCount;
00194 };
00195
00196 typedef shared_ptr<SharedData> SharedDataPtr;
00197
00198
00199
00200
00201 struct SessionCloseCallback {
00202 SharedDataPtr data;
00203 weak_ptr<AppContainer> container;
00204
00205 SessionCloseCallback(SharedDataPtr data,
00206 const weak_ptr<AppContainer> &container) {
00207 this->data = data;
00208 this->container = container;
00209 }
00210
00211 void operator()() {
00212 boost::mutex::scoped_lock l(data->lock);
00213 AppContainerPtr container(this->container.lock());
00214
00215 if (container == NULL) {
00216 return;
00217 }
00218
00219 DomainMap::iterator it;
00220 it = data->domains.find(container->app->getAppRoot());
00221 if (it != data->domains.end()) {
00222 Domain *domain = it->second.get();
00223 AppContainerList *instances = &domain->instances;
00224
00225 container->processed++;
00226 if (domain->maxRequests > 0 && container->processed >= domain->maxRequests) {
00227 instances->erase(container->iterator);
00228 domain->size--;
00229 if (instances->empty()) {
00230 data->domains.erase(container->app->getAppRoot());
00231 }
00232 data->count--;
00233 data->active--;
00234 data->activeOrMaxChanged.notify_all();
00235 } else {
00236 container->lastUsed = time(NULL);
00237 container->sessions--;
00238 if (container->sessions == 0) {
00239 instances->erase(container->iterator);
00240 instances->push_front(container);
00241 container->iterator = instances->begin();
00242 data->inactiveApps.push_back(container);
00243 container->ia_iterator = data->inactiveApps.end();
00244 container->ia_iterator--;
00245 data->active--;
00246 data->activeOrMaxChanged.notify_all();
00247 }
00248 }
00249 }
00250 }
00251 };
00252
00253 #ifdef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00254 DummySpawnManager spawnManager;
00255 #else
00256 SpawnManager spawnManager;
00257 #endif
00258 SharedDataPtr data;
00259 boost::thread *cleanerThread;
00260 bool detached;
00261 bool done;
00262 unsigned int maxIdleTime;
00263 unsigned int waitingOnGlobalQueue;
00264 condition cleanerThreadSleeper;
00265
00266
00267 boost::mutex &lock;
00268 condition &activeOrMaxChanged;
00269 DomainMap &domains;
00270 unsigned int &max;
00271 unsigned int &count;
00272 unsigned int &active;
00273 unsigned int &maxPerApp;
00274 AppContainerList &inactiveApps;
00275 map<string, unsigned int> &appInstanceCount;
00276
00277
00278
00279
00280 bool inline verifyState() {
00281 #if PASSENGER_DEBUG
00282
00283 DomainMap::const_iterator it;
00284 unsigned int totalSize = 0;
00285 for (it = domains.begin(); it != domains.end(); it++) {
00286 const string &appRoot = it->first;
00287 Domain *domain = it->second.get();
00288 AppContainerList *instances = &domain->instances;
00289
00290 P_ASSERT(domain->size <= count, false,
00291 "domains['" << appRoot << "'].size (" << domain->size <<
00292 ") <= count (" << count << ")");
00293 totalSize += domain->size;
00294
00295
00296
00297 P_ASSERT(!instances->empty(), false,
00298 "domains['" << appRoot << "'].instances is nonempty.");
00299
00300 AppContainerList::const_iterator prev_lit;
00301 AppContainerList::const_iterator lit;
00302 prev_lit = instances->begin();
00303 lit = prev_lit;
00304 lit++;
00305 for (; lit != instances->end(); lit++) {
00306 if ((*prev_lit)->sessions > 0) {
00307 P_ASSERT((*lit)->sessions > 0, false,
00308 "domains['" << appRoot << "'].instances "
00309 "is sorted from nonactive to active");
00310 }
00311 }
00312 }
00313 P_ASSERT(totalSize == count, false, "(sum of all d.size in domains) == count");
00314
00315 P_ASSERT(active <= count, false,
00316 "active (" << active << ") < count (" << count << ")");
00317 P_ASSERT(inactiveApps.size() == count - active, false,
00318 "inactive_apps.size() == count - active");
00319 #endif
00320 return true;
00321 }
00322
00323 string toStringWithoutLock() const {
00324 stringstream result;
00325
00326 result << "----------- General information -----------" << endl;
00327 result << "max = " << max << endl;
00328 result << "count = " << count << endl;
00329 result << "active = " << active << endl;
00330 result << "inactive = " << inactiveApps.size() << endl;
00331 result << "Waiting on global queue: " << waitingOnGlobalQueue << endl;
00332 result << endl;
00333
00334 result << "----------- Domains -----------" << endl;
00335 DomainMap::const_iterator it;
00336 for (it = domains.begin(); it != domains.end(); it++) {
00337 Domain *domain = it->second.get();
00338 AppContainerList *instances = &domain->instances;
00339 AppContainerList::const_iterator lit;
00340
00341 result << it->first << ": " << endl;
00342 for (lit = instances->begin(); lit != instances->end(); lit++) {
00343 AppContainer *container = lit->get();
00344 char buf[128];
00345
00346 snprintf(buf, sizeof(buf),
00347 "PID: %-5lu Sessions: %-2u Processed: %-5u Uptime: %s",
00348 (unsigned long) container->app->getPid(),
00349 container->sessions,
00350 container->processed,
00351 container->uptime().c_str());
00352 result << " " << buf << endl;
00353 }
00354 result << endl;
00355 }
00356 return result.str();
00357 }
00358
00359 bool needsRestart(const string &appRoot, Domain *domain, const PoolOptions &options) {
00360 return domain->alwaysRestartFileStatter.refresh(options.statThrottleRate) == 0
00361 || domain->restartFileChecker.changed(options.statThrottleRate);
00362 }
00363
00364 void cleanerThreadMainLoop() {
00365 this_thread::disable_syscall_interruption dsi;
00366 unique_lock<boost::mutex> l(lock);
00367 try {
00368 while (!done && !this_thread::interruption_requested()) {
00369 xtime xt;
00370 xtime_get(&xt, TIME_UTC);
00371 xt.sec += maxIdleTime + 1;
00372 if (cleanerThreadSleeper.timed_wait(l, xt)) {
00373
00374 if (done) {
00375
00376 break;
00377 } else {
00378
00379 continue;
00380 }
00381 }
00382
00383 time_t now = syscalls::time(NULL);
00384 AppContainerList::iterator it;
00385 for (it = inactiveApps.begin(); it != inactiveApps.end(); it++) {
00386 AppContainer &container(*it->get());
00387 ApplicationPtr app(container.app);
00388 Domain *domain = domains[app->getAppRoot()].get();
00389 AppContainerList *instances = &domain->instances;
00390
00391 if (maxIdleTime > 0 &&
00392 (now - container.lastUsed > (time_t) maxIdleTime)) {
00393 P_DEBUG("Cleaning idle app " << app->getAppRoot() <<
00394 " (PID " << app->getPid() << ")");
00395 instances->erase(container.iterator);
00396
00397 AppContainerList::iterator prev = it;
00398 prev--;
00399 inactiveApps.erase(it);
00400 it = prev;
00401
00402 domain->size--;
00403
00404 count--;
00405 }
00406 if (instances->empty()) {
00407 domains.erase(app->getAppRoot());
00408 }
00409 }
00410 }
00411 } catch (const exception &e) {
00412 P_ERROR("Uncaught exception: " << e.what());
00413 }
00414 }
00415
00416
00417
00418
00419
00420
00421
00422
00423 pair<AppContainerPtr, Domain *>
00424 spawnOrUseExisting(boost::mutex::scoped_lock &l, const PoolOptions &options) {
00425 beginning_of_function:
00426
00427 TRACE_POINT();
00428 this_thread::disable_interruption di;
00429 this_thread::disable_syscall_interruption dsi;
00430 const string &appRoot(options.appRoot);
00431 AppContainerPtr container;
00432 Domain *domain;
00433 AppContainerList *instances;
00434
00435 try {
00436 DomainMap::iterator it(domains.find(appRoot));
00437
00438 if (it != domains.end() && needsRestart(appRoot, it->second.get(), options)) {
00439 AppContainerList::iterator it2;
00440 instances = &it->second->instances;
00441 for (it2 = instances->begin(); it2 != instances->end(); it2++) {
00442 container = *it2;
00443 if (container->sessions == 0) {
00444 inactiveApps.erase(container->ia_iterator);
00445 } else {
00446 active--;
00447 }
00448 it2--;
00449 instances->erase(container->iterator);
00450 count--;
00451 }
00452 domains.erase(appRoot);
00453 spawnManager.reload(appRoot);
00454 it = domains.end();
00455 activeOrMaxChanged.notify_all();
00456 }
00457
00458 if (it != domains.end()) {
00459 domain = it->second.get();
00460 instances = &domain->instances;
00461
00462 if (instances->front()->sessions == 0) {
00463 container = instances->front();
00464 instances->pop_front();
00465 instances->push_back(container);
00466 container->iterator = instances->end();
00467 container->iterator--;
00468 inactiveApps.erase(container->ia_iterator);
00469 active++;
00470 activeOrMaxChanged.notify_all();
00471 } else if (count >= max || (
00472 maxPerApp != 0 && domain->size >= maxPerApp )
00473 ) {
00474 if (options.useGlobalQueue) {
00475 UPDATE_TRACE_POINT();
00476 waitingOnGlobalQueue++;
00477 activeOrMaxChanged.wait(l);
00478 waitingOnGlobalQueue--;
00479 goto beginning_of_function;
00480 } else {
00481 AppContainerList::iterator it(instances->begin());
00482 AppContainerList::iterator smallest(instances->begin());
00483 it++;
00484 for (; it != instances->end(); it++) {
00485 if ((*it)->sessions < (*smallest)->sessions) {
00486 smallest = it;
00487 }
00488 }
00489 container = *smallest;
00490 instances->erase(smallest);
00491 instances->push_back(container);
00492 container->iterator = instances->end();
00493 container->iterator--;
00494 }
00495 } else {
00496 container = ptr(new AppContainer());
00497 {
00498 this_thread::restore_interruption ri(di);
00499 this_thread::restore_syscall_interruption rsi(dsi);
00500 container->app = spawnManager.spawn(options);
00501 }
00502 container->sessions = 0;
00503 instances->push_back(container);
00504 container->iterator = instances->end();
00505 container->iterator--;
00506 domain->size++;
00507 count++;
00508 active++;
00509 activeOrMaxChanged.notify_all();
00510 }
00511 } else {
00512 if (active >= max) {
00513 UPDATE_TRACE_POINT();
00514 activeOrMaxChanged.wait(l);
00515 goto beginning_of_function;
00516 } else if (count == max) {
00517 container = inactiveApps.front();
00518 inactiveApps.pop_front();
00519 domain = domains[container->app->getAppRoot()].get();
00520 instances = &domain->instances;
00521 instances->erase(container->iterator);
00522 if (instances->empty()) {
00523 domains.erase(container->app->getAppRoot());
00524 } else {
00525 domain->size--;
00526 }
00527 count--;
00528 }
00529
00530 UPDATE_TRACE_POINT();
00531 container = ptr(new AppContainer());
00532 {
00533 this_thread::restore_interruption ri(di);
00534 this_thread::restore_syscall_interruption rsi(dsi);
00535 container->app = spawnManager.spawn(options);
00536 }
00537 container->sessions = 0;
00538 it = domains.find(appRoot);
00539 if (it == domains.end()) {
00540 domain = new Domain(options);
00541 domain->size = 1;
00542 domain->maxRequests = options.maxRequests;
00543 domains[appRoot] = ptr(domain);
00544 } else {
00545 domain = it->second.get();
00546 domain->size++;
00547 }
00548 instances = &domain->instances;
00549 instances->push_back(container);
00550 container->iterator = instances->end();
00551 container->iterator--;
00552 count++;
00553 active++;
00554 activeOrMaxChanged.notify_all();
00555 }
00556 } catch (const SpawnException &e) {
00557 string message("Cannot spawn application '");
00558 message.append(appRoot);
00559 message.append("': ");
00560 message.append(e.what());
00561 if (e.hasErrorPage()) {
00562 throw SpawnException(message, e.getErrorPage());
00563 } else {
00564 throw SpawnException(message);
00565 }
00566 } catch (const exception &e) {
00567 string message("Cannot spawn application '");
00568 message.append(appRoot);
00569 message.append("': ");
00570 message.append(e.what());
00571 throw SpawnException(message);
00572 }
00573
00574 return make_pair(container, domain);
00575 }
00576
00577 public:
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597 StandardApplicationPool(const string &spawnServerCommand,
00598 const string &logFile = "",
00599 const string &rubyCommand = "ruby",
00600 const string &user = "")
00601 :
00602 #ifndef PASSENGER_USE_DUMMY_SPAWN_MANAGER
00603 spawnManager(spawnServerCommand, logFile, rubyCommand, user),
00604 #endif
00605 data(new SharedData()),
00606 lock(data->lock),
00607 activeOrMaxChanged(data->activeOrMaxChanged),
00608 domains(data->domains),
00609 max(data->max),
00610 count(data->count),
00611 active(data->active),
00612 maxPerApp(data->maxPerApp),
00613 inactiveApps(data->inactiveApps),
00614 appInstanceCount(data->appInstanceCount)
00615 {
00616 TRACE_POINT();
00617 detached = false;
00618 done = false;
00619 max = DEFAULT_MAX_POOL_SIZE;
00620 count = 0;
00621 active = 0;
00622 waitingOnGlobalQueue = 0;
00623 maxPerApp = DEFAULT_MAX_INSTANCES_PER_APP;
00624 maxIdleTime = DEFAULT_MAX_IDLE_TIME;
00625 cleanerThread = new boost::thread(
00626 bind(&StandardApplicationPool::cleanerThreadMainLoop, this),
00627 CLEANER_THREAD_STACK_SIZE
00628 );
00629 }
00630
00631 virtual ~StandardApplicationPool() {
00632 if (!detached) {
00633 this_thread::disable_interruption di;
00634 {
00635 boost::mutex::scoped_lock l(lock);
00636 done = true;
00637 cleanerThreadSleeper.notify_one();
00638 }
00639 cleanerThread->join();
00640 }
00641 delete cleanerThread;
00642 }
00643
00644 virtual Application::SessionPtr get(const string &appRoot) {
00645 return ApplicationPool::get(appRoot);
00646 }
00647
00648 virtual Application::SessionPtr get(const PoolOptions &options) {
00649 TRACE_POINT();
00650 using namespace boost::posix_time;
00651 unsigned int attempt = 0;
00652
00653
00654
00655 unique_lock<boost::mutex> l(lock);
00656
00657 while (true) {
00658 attempt++;
00659
00660 pair<AppContainerPtr, Domain *> p(
00661 spawnOrUseExisting(l, options)
00662 );
00663 AppContainerPtr &container = p.first;
00664 Domain *domain = p.second;
00665
00666 container->lastUsed = time(NULL);
00667 container->sessions++;
00668
00669 P_ASSERT(verifyState(), Application::SessionPtr(),
00670 "State is valid:\n" << toString(false));
00671 try {
00672 UPDATE_TRACE_POINT();
00673 return container->app->connect(SessionCloseCallback(data, container));
00674 } catch (const exception &e) {
00675 container->sessions--;
00676
00677 AppContainerList &instances(domain->instances);
00678 instances.erase(container->iterator);
00679 domain->size--;
00680 if (instances.empty()) {
00681 domains.erase(options.appRoot);
00682 }
00683 count--;
00684 active--;
00685 activeOrMaxChanged.notify_all();
00686 P_ASSERT(verifyState(), Application::SessionPtr(),
00687 "State is valid: " << toString(false));
00688 if (attempt == MAX_GET_ATTEMPTS) {
00689 string message("Cannot connect to an existing "
00690 "application instance for '");
00691 message.append(options.appRoot);
00692 message.append("': ");
00693 try {
00694 const SystemException &syse =
00695 dynamic_cast<const SystemException &>(e);
00696 message.append(syse.sys());
00697 } catch (const bad_cast &) {
00698 message.append(e.what());
00699 }
00700 throw IOException(message);
00701 }
00702 }
00703 }
00704
00705 return Application::SessionPtr();
00706 }
00707
00708 virtual void clear() {
00709 boost::mutex::scoped_lock l(lock);
00710 domains.clear();
00711 inactiveApps.clear();
00712 appInstanceCount.clear();
00713 count = 0;
00714 active = 0;
00715 activeOrMaxChanged.notify_all();
00716 }
00717
00718 virtual void setMaxIdleTime(unsigned int seconds) {
00719 boost::mutex::scoped_lock l(lock);
00720 maxIdleTime = seconds;
00721 cleanerThreadSleeper.notify_one();
00722 }
00723
00724 virtual void setMax(unsigned int max) {
00725 boost::mutex::scoped_lock l(lock);
00726 this->max = max;
00727 activeOrMaxChanged.notify_all();
00728 }
00729
00730 virtual unsigned int getActive() const {
00731 return active;
00732 }
00733
00734 virtual unsigned int getCount() const {
00735 return count;
00736 }
00737
00738 virtual void setMaxPerApp(unsigned int maxPerApp) {
00739 boost::mutex::scoped_lock l(lock);
00740 this->maxPerApp = maxPerApp;
00741 activeOrMaxChanged.notify_all();
00742 }
00743
00744 virtual pid_t getSpawnServerPid() const {
00745 return spawnManager.getServerPid();
00746 }
00747
00748
00749
00750
00751
00752 virtual string toString(bool lockMutex = true) const {
00753 if (lockMutex) {
00754 unique_lock<boost::mutex> l(lock);
00755 return toStringWithoutLock();
00756 } else {
00757 return toStringWithoutLock();
00758 }
00759 }
00760
00761
00762
00763
00764
00765 virtual string toXml() const {
00766 unique_lock<boost::mutex> l(lock);
00767 stringstream result;
00768 DomainMap::const_iterator it;
00769
00770 result << "<?xml version=\"1.0\" encoding=\"iso8859-1\" ?>\n";
00771 result << "<info>";
00772
00773 result << "<domains>";
00774 for (it = domains.begin(); it != domains.end(); it++) {
00775 Domain *domain = it->second.get();
00776 AppContainerList *instances = &domain->instances;
00777 AppContainerList::const_iterator lit;
00778
00779 result << "<domain>";
00780 result << "<name>" << escapeForXml(it->first) << "</name>";
00781
00782 result << "<instances>";
00783 for (lit = instances->begin(); lit != instances->end(); lit++) {
00784 AppContainer *container = lit->get();
00785
00786 result << "<instance>";
00787 result << "<pid>" << container->app->getPid() << "</pid>";
00788 result << "<sessions>" << container->sessions << "</sessions>";
00789 result << "<processed>" << container->processed << "</processed>";
00790 result << "<uptime>" << container->uptime() << "</uptime>";
00791 result << "</instance>";
00792 }
00793 result << "</instances>";
00794
00795 result << "</domain>";
00796 }
00797 result << "</domains>";
00798
00799 result << "</info>";
00800 return result.str();
00801 }
00802 };
00803
00804 typedef shared_ptr<StandardApplicationPool> StandardApplicationPoolPtr;
00805
00806 }
00807
00808 #endif
00809