diff --git a/src/core/comm/SocketComm.cc b/src/core/comm/SocketComm.cc index cb764a2e8cde5aacc2ee4fc64061fcc8c52da8f4..bd74465d805dbfa284255994a749b59ae3258905 100644 --- a/src/core/comm/SocketComm.cc +++ b/src/core/comm/SocketComm.cc @@ -1,6 +1,7 @@ #include <string> #include <errno.h> #include <signal.h> +#include <poll.h> #include "SocketComm.hpp" @@ -63,6 +64,17 @@ char * SocketComm::getBuf(int sockfd, int *size) return buf; } +int SocketComm::timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout) +{ + struct pollfd pfd = {sockfd, POLLIN, 0}; + int ret = poll(&pfd, 1, timeout); + if (ret > 0) { + return accept(sockfd, addr, addrlen); + } + errno = EWOULDBLOCK; + return -1; +} + ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count) { ssize_t ret; diff --git a/src/core/comm/SocketComm.hpp b/src/core/comm/SocketComm.hpp index 74aad301bfa9d6858f7cfec91ca8b853df8fe6d4..56af18de2b91cc499c66749c8b93f783197d2f76 100644 --- a/src/core/comm/SocketComm.hpp +++ b/src/core/comm/SocketComm.hpp @@ -45,6 +45,16 @@ public: */ static bool dropMsg(int sockfd); + /** + * An accept() wrapper that times out (using poll(2)) + * @param sockfd listening socket descriptor to accept connections from + * @param addr same as accept()'s + * @param addrlen same as accept()'s + * @param timeout timeout in milliseconds (see poll(2)) + * \return < 0 on failure, > 0 for a new socket connection + */ + static int timedAccept(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int timeout); + private: static char * getBuf(int sockfd, int *size); static ssize_t safe_write(int fd, const void *buf, size_t count); diff --git a/src/core/config/CMakeLists.txt b/src/core/config/CMakeLists.txt index 4ee279f9568c7a02a80138a808373e6d92681611..dc30d25975511e552eb1056192bda4ac5db80552 100644 --- a/src/core/config/CMakeLists.txt +++ b/src/core/config/CMakeLists.txt @@ -25,13 +25,14 @@ OPTION(CONFIG_FAST_BREAKPOINTS "Enable fast breakpoints (requires break OPTION(CONFIG_FAST_WATCHPOINTS "Enable fast watchpoints (requires memory access events to be enabled)" OFF) SET(SERVER_COMM_HOSTNAME "localhost" CACHE STRING "Job-server hostname or IP") SET(SERVER_COMM_TCP_PORT "1111" CACHE STRING "Job-server TCP port") -SET(SERVER_OUT_QUEUE_SIZE "0" CACHE STRING "Queue size for outbound jobs (0 = unlimited)") +SET(SERVER_OUT_QUEUE_SIZE "10000" CACHE STRING "Queue size for outbound jobs (0 = unlimited)") SET(SERVER_PERF_LOG_PATH "perf.log" CACHE STRING "A file name for storing the server's performance log (CSV)") SET(SERVER_PERF_STEPPING_SEC "1" CACHE STRING "Stepping of performance measurements in seconds") SET(CLIENT_RAND_BACKOFF_TSTART "3" CACHE STRING "Lower limit of client's backoff phase in seconds") SET(CLIENT_RAND_BACKOFF_TEND "8" CACHE STRING "Upper limit of client's backoff phase in seconds") SET(CLIENT_RETRY_COUNT "3" CACHE STRING "Client's number of reconnect retries") SET(CLIENT_JOB_REQUEST_SEC "30" CACHE STRING "Time in seconds a client tries to get work for (to reduce client/server communication frequency)") +SET(CLIENT_JOB_INITIAL "1" CACHE STRING "Initial amount of jobs to request") SET(CLIENT_JOB_LIMIT "1000" CACHE STRING "How many jobs can a client ask for") configure_file(${CMAKE_CURRENT_SOURCE_DIR}/FailConfig.hpp.in diff --git a/src/core/config/FailConfig.hpp.in b/src/core/config/FailConfig.hpp.in index 78fbe3c3fca1252d565f67008587f0c1846d6972..ab64f6cc390a19a7ca236b697b6594e0777a2b6c 100644 --- a/src/core/config/FailConfig.hpp.in +++ b/src/core/config/FailConfig.hpp.in @@ -43,6 +43,7 @@ #define CLIENT_RETRY_COUNT @CLIENT_RETRY_COUNT@ #define CLIENT_JOB_REQUEST_SEC @CLIENT_JOB_REQUEST_SEC@ #define CLIENT_JOB_LIMIT @CLIENT_JOB_LIMIT@ +#define CLIENT_JOB_INITIAL @CLIENT_JOB_INITIAL@ #define PROJECT_VERSION "@PROJECT_VERSION@" #define FAIL_VERSION PROJECT_VERSION diff --git a/src/core/cpn/JobServer.cc b/src/core/cpn/JobServer.cc index b9450e4e152350fef94c80ee3a7b1b2c0c7add0a..250e6e566f6e4a0cd35cc0fa8b79524f4d465c6a 100644 --- a/src/core/cpn/JobServer.cc +++ b/src/core/cpn/JobServer.cc @@ -34,28 +34,33 @@ void JobServer::addParam(ExperimentData* exp) volatile unsigned JobServer::m_DoneCount = 0; #endif -#ifndef __puma -boost::mutex CommThread::m_CommMutex; -#endif - ExperimentData *JobServer::getDone() { #ifndef __puma - if (m_undoneJobs.Size() == 0 - && noMoreExperiments() - && m_runningJobs.Size() == 0 - && m_doneJobs.Size() == 0 - && m_inOutCounter.getValue() == 0) { - return 0; + ExperimentData *exp = m_doneJobs.Dequeue(); + if (exp) { + m_inOutCounter.decrement(); } - - ExperimentData *exp = NULL; - exp = m_doneJobs.Dequeue(); - m_inOutCounter.decrement(); return exp; #endif } +void JobServer::setNoMoreExperiments() +{ +#ifndef __puma + boost::unique_lock<boost::mutex> lock(m_CommMutex); +#endif + // currently not really necessary, as we only non-blockingly dequeue: + m_undoneJobs.setIsFinished(); + + m_noMoreExps = true; + if (m_undoneJobs.Size() == 0 && + noMoreExperiments() && + m_runningJobs.Size() == 0) { + m_doneJobs.setIsFinished(); + } +} + #ifdef SERVER_PERFORMANCE_MEASURE void JobServer::measure() { @@ -156,11 +161,15 @@ void JobServer::run() boost::thread* th; while (!m_finish){ // Accept connection - int cs = accept(s, (struct sockaddr*)&clientaddr, &clen); - if (cs == -1) { - perror("accept"); - // TODO: Log-level? - return; + int cs = SocketComm::timedAccept(s, (struct sockaddr*)&clientaddr, &clen, 100); + if (cs < 0) { + if (errno != EWOULDBLOCK) { + perror("poll/accept"); + // TODO: Log-level? + return; + } else { + continue; + } } // Spawn a thread for further communication, // and add this thread to a list threads @@ -257,10 +266,6 @@ void CommThread::sendPendingExperimentData(Minion& minion) } else { break; } - - if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) { - cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; - } } if (exp.size() != 0) { ctrlmsg.set_job_size(exp.size()); @@ -271,8 +276,22 @@ void CommThread::sendPendingExperimentData(Minion& minion) if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) { for (i = 0; i < ctrlmsg.job_size(); i++) { if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) { + + // delay insertion into m_runningJobs until here, as + // getMessage() won't work anymore if this job is re-sent, + // received, and deleted in the meantime + if (!m_js.m_runningJobs.insert(exp.front()->getWorkloadID(), exp.front())) { + cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl; + } + exp.pop_front(); } else { + // add remaining jobs back to the queue + cout << "!![Server] failed to send scheduled " << exp.size() << " jobs" << endl; + while (exp.size()) { + m_js.m_undoneJobs.Enqueue(exp.front()); + exp.pop_front(); + } break; } @@ -285,7 +304,7 @@ void CommThread::sendPendingExperimentData(Minion& minion) // Prevent receiveExperimentResults from modifying (or indirectly, via // getDone and the campaign, deleting) jobs in the m_runningJobs queue. // (See details in receiveExperimentResults) - boost::unique_lock<boost::mutex> lock(m_CommMutex); + boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex); #endif if ((temp_exp = m_js.m_runningJobs.pickone()) != NULL) { // 2nd priority // (This picks one running job.) @@ -338,7 +357,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct // by the campaign at any time. // Additionally, receiving a result overwrites the job's contents. This // already may cause breakage in sendPendingExperimentData (a). - boost::unique_lock<boost::mutex> lock(m_CommMutex); + boost::unique_lock<boost::mutex> lock(m_js.m_CommMutex); #endif for (i = 0; i < ctrlmsg.workloadid_size(); i++) { if (m_js.m_runningJobs.remove(ctrlmsg.workloadid(i), exp)) { // ExperimentData* found @@ -361,6 +380,12 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct } } + // all results complete? + if (m_js.m_undoneJobs.Size() == 0 && + m_js.noMoreExperiments() && + m_js.m_runningJobs.Size() == 0) { + m_js.m_doneJobs.setIsFinished(); + } } } // end-of-namespace: fail diff --git a/src/core/cpn/JobServer.hpp b/src/core/cpn/JobServer.hpp index 7c1bf5217e317072b6e78753429e60a006542b9a..99255a79df356fcd7bbf87ff826a7a0b2c2b081d 100644 --- a/src/core/cpn/JobServer.hpp +++ b/src/core/cpn/JobServer.hpp @@ -66,6 +66,9 @@ private: SynchronizedQueue<ExperimentData*> m_undoneJobs; //! List of finished experiment results. SynchronizedQueue<ExperimentData*> m_doneJobs; +#ifndef __puma + boost::mutex m_CommMutex; //! to synchronise the communication +#endif // __puma friend class CommThread; //!< CommThread is allowed access the job queues. /** * The actual startup of the Jobserver. @@ -93,10 +96,13 @@ public: } ~JobServer() { + done(); #ifndef __puma // Cleanup of m_serverThread, etc. + m_serverThread->join(); delete m_serverThread; #ifdef SERVER_PERFORMANCE_MEASURE + m_measureThread->join(); delete m_measureThread; #endif #endif // __puma @@ -118,7 +124,7 @@ public: * sets. We need this, as we allow concurrent parameter generation and * distribution. */ - void setNoMoreExperiments() { m_noMoreExps = true; } + void setNoMoreExperiments(); /** * Checks whether there are no more experiment parameter sets. * @return \c true if no more parameter sets available, \c false otherwise @@ -162,9 +168,6 @@ private: */ void receiveExperimentResults(Minion& minion, FailControlMessage& ctrlmsg); public: -#ifndef __puma - static boost::mutex m_CommMutex; //! to synchronise the communication -#endif // __puma CommThread(int sockfd, JobServer& p) : m_sock(sockfd), m_job_size(1), m_js(p) { } /** diff --git a/src/core/efw/JobClient.cc b/src/core/efw/JobClient.cc index 153c863d506b16f244e731c8e3577c9862a7057f..ffb004af5e52a8158b1924094b4cbb54349975bc 100644 --- a/src/core/efw/JobClient.cc +++ b/src/core/efw/JobClient.cc @@ -6,10 +6,14 @@ using namespace std; namespace fail { JobClient::JobClient(const std::string& server, int port) + : m_server(server), m_server_port(port), + m_server_runid(0), // server accepts this for virgin clients + m_job_runtime_total(0), + m_job_throughput(CLIENT_JOB_INITIAL), // will be corrected after measurement + m_job_total(0), + m_connect_failed(false) { SocketComm::init(); - m_server_port = port; - m_server = server; m_server_ent = gethostbyname(m_server.c_str()); cout << "JobServer: " << m_server.c_str() << endl; if(m_server_ent == NULL) { @@ -18,8 +22,6 @@ JobClient::JobClient(const std::string& server, int port) exit(1); } srand(time(NULL)); // needed for random backoff (see connectToServer) - m_server_runid = 0; // server accepts this for virgin clients - m_job_throughput = 1; // client gets only one job at the first request } JobClient::~JobClient() @@ -30,6 +32,11 @@ JobClient::~JobClient() bool JobClient::connectToServer() { + // don't retry server connects to speedup shutdown at campaign end + if (m_connect_failed) { + return false; + } + int retries = CLIENT_RETRY_COUNT; while (true) { // Connect to server @@ -67,6 +74,7 @@ bool JobClient::connectToServer() cout << "[Client] Unable to reconnect (tried " << CLIENT_RETRY_COUNT << " times); " << "I'll give it up!" << endl; // TODO: Log-level? + m_connect_failed = true; return false; // finally: unable to connect, give it up :-( } break; // connected! :-) @@ -79,6 +87,11 @@ bool JobClient::connectToServer() bool JobClient::getParam(ExperimentData& exp) { + // die immediately if a previous connect already failed + if (m_connect_failed) { + return false; + } + while (1) { // Here we try to acquire a parameter set switch (tryToGetExperimentData(exp)) { // Jobserver will sent workload, params are set in \c exp @@ -135,10 +148,16 @@ FailControlMessage_Command JobClient::tryToGetExperimentData(ExperimentData& exp ExperimentData* temp_exp = new ExperimentData(exp.getMessage().New()); if (!SocketComm::rcvMsg(m_sockfd, temp_exp->getMessage())) { - // Failed to receive message? Retry. - close(m_sockfd); + // looks like we won't receive more jobs now, cleanup + delete &temp_exp->getMessage(); delete temp_exp; - return FailControlMessage::COME_AGAIN; + // did a previous loop iteration succeed? + if (m_parameters.size() > 0) { + break; + } else { + // nothing to do now, retry later + return FailControlMessage::COME_AGAIN; + } } temp_exp->setWorkloadID(ctrlmsg.workloadid(i)); //Store workload id of experiment data @@ -188,10 +207,10 @@ bool JobClient::sendResult(ExperimentData& result) m_job_runtime.reset(); m_job_runtime.startTimer(); m_job_total += m_results.size(); - sendResultsToServer(); + // tell caller whether we failed phoning home + return sendResultsToServer(); } - //If there are more jobs for the experiment store result return true; } else { //Stop time measurement and calculate new throughput @@ -219,6 +238,14 @@ bool JobClient::sendResultsToServer() { if (m_results.size() != 0) { if (!connectToServer()) { + // clear results, although we didn't get them to safety; otherwise, + // subsequent calls to sendResult() may and the destructor will + // retry sending them, resulting in a large shutdown time + while (m_results.size()) { + delete &m_results.front()->getMessage(); + delete m_results.front(); + m_results.pop_front(); + } return false; } @@ -240,10 +267,16 @@ bool JobClient::sendResultsToServer() cout << "]"; // TODO: Log-level? - SocketComm::sendMsg(m_sockfd, ctrlmsg); + if (!SocketComm::sendMsg(m_sockfd, ctrlmsg)) { + close(m_sockfd); + return false; + } for (i = 0; i < ctrlmsg.job_size() ; i++) { - SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage()); + if (!SocketComm::sendMsg(m_sockfd, m_results.front()->getMessage())) { + close(m_sockfd); + return false; + } delete &m_results.front()->getMessage(); delete m_results.front(); m_results.pop_front(); diff --git a/src/core/efw/JobClient.hpp b/src/core/efw/JobClient.hpp index de4153c543852bfc1f2ae5f564575fb6db1fcd1b..f0d0315790217b8dc94a826809058ac087f612a2 100644 --- a/src/core/efw/JobClient.hpp +++ b/src/core/efw/JobClient.hpp @@ -37,6 +37,8 @@ private: std::deque<ExperimentData*> m_parameters; std::deque<ExperimentData*> m_results; + bool m_connect_failed; + bool connectToServer(); bool sendResultsToServer(); FailControlMessage_Command tryToGetExperimentData(ExperimentData& exp); diff --git a/src/core/util/SynchronizedQueue.hpp b/src/core/util/SynchronizedQueue.hpp index 5a9a4047f4545c21124eec035a05c4f4052a1f80..45d523303f489283edc07e05f3cbbc330ad41b56 100644 --- a/src/core/util/SynchronizedQueue.hpp +++ b/src/core/util/SynchronizedQueue.hpp @@ -18,14 +18,15 @@ class SynchronizedQueue { // Adapted from: http://www.quantnet.com/cplusplus-mul private: std::queue<T> m_queue; //!< Use STL queue to store data unsigned capacity; + bool finished; #ifndef __puma boost::mutex m_mutex; //!< The mutex to synchronise on boost::condition_variable m_cond; //!< The condition to wait for boost::condition_variable m_cond_capacity; //!< Another condition to wait for #endif public: - SynchronizedQueue() : capacity(0) {} - SynchronizedQueue(unsigned capacity) : capacity(capacity) {} + SynchronizedQueue() : capacity(0), finished(false) {} + SynchronizedQueue(unsigned capacity) : capacity(capacity), finished(false) {} int Size() { #ifndef __puma @@ -69,6 +70,10 @@ public: // again after the wait #ifndef __puma while (m_queue.size() == 0) { + if (finished) { + // default-constructed T, 0 for integral types + return T(); + } m_cond.wait(lock); } #endif @@ -116,6 +121,17 @@ public: return false; } } // Lock is automatically released here + + void setIsFinished(bool value = true) + { +#ifndef __puma + boost::unique_lock<boost::mutex> lock(m_mutex); +#endif + finished = value; +#ifndef __puma + m_cond.notify_all(); +#endif + } }; } // end-of-namespace: fail