Skip to content
Snippets Groups Projects
Commit 4a47e9f9 authored by Adrian Böckenkamp's avatar Adrian Böckenkamp
Browse files

Job-resend mechanism for JobServer added.

git-svn-id: https://www4.informatik.uni-erlangen.de/i4svn/danceos/trunk/devel/fail@1060 8c4709b5-6ec9-48aa-a5cd-a96041d1645a
parent 9a31ce2b
No related branches found
No related tags found
No related merge requests found
// Author: Martin Hoffmann, Richard Hellwig
// Author: Martin Hoffmann, Richard Hellwig, Adrian Böckenkamp
// Date: 07.10.11
// <iostream> needs to be included before *.pb.h, otherwise ac++/Puma chokes on the latter
......@@ -26,7 +26,6 @@ using namespace std;
namespace fi {
void JobServer::addParam(ExperimentData* exp){
#ifndef __puma
m_undoneJobs.Enqueue(exp);
......@@ -151,9 +150,10 @@ void JobServer::run(){
#endif
}
/// Communication thread implementation
void CommThread::operator()()
{
// The communication thread implementation:
Minion minion;
FailControlMessage ctrlmsg;
minion.setSocketDescriptor(m_sock);
......@@ -186,52 +186,88 @@ void CommThread::operator()()
close(m_sock);
}
bool CommThread::sendPendingExperimentData(Minion& minion)
#ifndef __puma
boost::mutex CommThread::m_CommMutex;
#endif // __puma
void CommThread::sendPendingExperimentData(Minion& minion)
{
FailControlMessage ctrlmsg;
ctrlmsg.set_build_id(42);
ExperimentData * exp = 0;
if(m_js.m_undoneJobs.Dequeue_nb(exp) == true){
// Got an element from queue, assign ID to workload and send to minion
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
exp->setWorkloadID(workloadID); // store ID for identification when receiving result
if(!m_js.m_runningJobs.insert(workloadID, exp)){
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
cout << ">>[" << workloadID << "] " << flush;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage());
}else if( m_js.noMoreExperiments() == false ){
// Currently we have no workload, but the campaign is not over yet. Minion can try again later
ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN);
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl;
}else{
// No more elements, and campaign is over. Minion can die.
ctrlmsg.set_command(FailControlMessage_Command_DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
}
return true;
}
FailControlMessage ctrlmsg;
ctrlmsg.set_build_id(42);
ExperimentData * exp = 0;
if(m_js.m_undoneJobs.Dequeue_nb(exp) == true) {
// Got an element from queue, assign ID to workload and send to minion
uint32_t workloadID = m_js.m_counter.increment(); // increment workload counter
exp->setWorkloadID(workloadID); // store ID for identification when receiving result
if(!m_js.m_runningJobs.insert(workloadID, exp)) {
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
}
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Sending workload [" << workloadID << "]" << endl;
cout << ">>[" << workloadID << "] " << flush;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage());
return;
}
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif
if((exp = m_js.m_runningJobs.first()) != NULL) { // 2nd priority
// (This simply gets the first running-job.)
// TODO: Improve selection of parameter-set to be resend (the first is not
// necessarily the best...especially when the specific parameter-set
// causes the experiment-client to terminate abnormally -> endless loop!)
// Further ideas: sequential, random, ...? (+ "retry-counter" for each job)
// Implement resend of running-parameter sets to improve campaign speed
// and to prevent result loss due to (unexpected) termination of experiment
// clients.
// (Note: Therefore we need to be aware of receiving multiple results for a
// single parameter-set, @see receiveExperimentResults.)
uint32_t workloadID = exp->getWorkloadID(); // (this ID has been set previously)
// Resend the parameter-set.
ctrlmsg.set_command(FailControlMessage_Command_WORK_FOLLOWS);
ctrlmsg.set_workloadid(workloadID); // set workload id
//cout << ">>[Server] Re-sending workload [" << workloadID << "]" << endl;
cout << ">>R[" << workloadID << "] " << flush;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
SocketComm::send_msg(minion.getSocketDescriptor(), exp->getMessage());
} else if(m_js.noMoreExperiments() == false) {
// Currently we have no workload (even the running-job-queue is empty!), but
// the campaign is not over yet. Minion can try again later.
ctrlmsg.set_command(FailControlMessage_Command_COME_AGAIN);
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
cout << "--[Server] No workload, come again..." << endl;
} else {
// No more elements, and campaign is over. Minion can die.
ctrlmsg.set_command(FailControlMessage_Command_DIE);
cout << "--[Server] No workload, and no campaign, please die." << endl;
SocketComm::send_msg(minion.getSocketDescriptor(), ctrlmsg);
}
}
bool CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
void CommThread::receiveExperimentResults(Minion& minion, uint32_t workloadID)
{
ExperimentData * exp; // Get exp* from running jobs
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
cout << "<<[" << workloadID << "] " << flush;
if( m_js.m_runningJobs.remove(workloadID, exp) ){ /// ExperimentData* found
SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); /// deserialize results.
m_js.m_doneJobs.Enqueue(exp); /// Put results in done queue..
return true;
}else{
cout << "!![Server] workload id not found in running jobs map :( [" << workloadID << "]" << endl;
return false;
}
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_CommMutex);
#endif
ExperimentData * exp; // Get exp* from running jobs
//cout << "<<[Server] Received result for workload id [" << workloadID << "]" << endl;
cout << "<<[" << workloadID << "] " << flush;
if(m_js.m_runningJobs.remove(workloadID, exp)) { // ExperimentData* found
SocketComm::rcv_msg(minion.getSocketDescriptor(), exp->getMessage() ); // deserialize results.
m_js.m_doneJobs.Enqueue(exp); // Put results in done queue..
} else {
// We can receive several results for the same workload id because
// we (may) distribute the (running) jobs to a *few* experiment-clients.
cout << "[Server] Received another result for workload id ["
<< workloadID << "] -- ignored." << endl;
// TODO: Any need for error-handling here?
}
}
};
/**
* \brief The JobServer supplies the Minions with ExperimentData's and receives the result data.
* \brief The JobServer supplies the Minions with ExperimentData's
* and receives the result data.
*
* \author Martin Hoffmann, Richard Hellwig
* \author Martin Hoffmann, Richard Hellwig, Adrian Böckenkamp
*/
......@@ -65,7 +66,13 @@ public:
m_serverThread = new boost::thread(&JobServer::run, this); // run operator()() in a thread.
#endif
};
~JobServer() {}
~JobServer()
{
#ifndef __puma
// Cleanup of m_serverThread, etc.
delete m_serverThread;
#endif // __puma
};
private:
......@@ -75,7 +82,7 @@ public:
* and listen for connections.
*/
void run();
void sendWork(int sockfd);
public:
......@@ -126,32 +133,32 @@ public:
class CommThread {
int m_sock; //! Socket descriptor of the connection
JobServer& m_js; //! Calling jobserver
#ifndef __puma
static boost::mutex m_CommMutex; //! to synchronise the communication
#endif // __puma
public:
CommThread(int sockfd, JobServer& p) : m_sock(sockfd), m_js(p) {};
/**
* The thread's entry point
* The thread's entry point.
*/
void operator() ();
private:
/// FIXME concerns are not really separated yet ;)
/// FIXME concerns are not really separated yet ;)
/**
* Called after minion calls for work.
* Tries to deque a parameter set non blocking, and
* sends it back to the requesting minion.
* @param minion The minion asking for input
* @return FIXME return value not evaluated yet.
*/
bool sendPendingExperimentData(Minion& minion);
void sendPendingExperimentData(Minion& minion);
/**
* Called after minion offers a result message.
* Evaluates the Workload ID and puts the corresponding
* job result into the result queue.
* @param minion The minion offering results
* @param workloadID The workload id of the result message
* @return \c true if Worload ID could be mapped, \c false if not
*/
bool receiveExperimentResults(Minion& minion, uint32_t workloadID);
void receiveExperimentResults(Minion& minion, uint32_t workloadID);
};
};
......
......@@ -27,6 +27,20 @@ private:
#endif
return m_map.size();
}
/**
* Retrieves the first element in the map.
* @return a pointer to the first element, or \c NULL if empty
*/
Tvalue first()
{
#ifndef __puma
boost::unique_lock<boost::mutex> lock(m_mutex);
#endif
if(m_map.size() > 0)
return m_map.begin()->second;
else
return NULL;
} // Lock is automatically released here
/**
* Add data to the map, return false if already present
* @param key Map key
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment