Skip to content
Snippets Groups Projects
Commit 9c984b97 authored by Michael Lenz's avatar Michael Lenz
Browse files

fail/cpn: (Database)Campaign no longer loses jobs

Up until now the JobServer was silently losing jobs and only claiming to be
finished - a workaround for this was to restart the campaign until all jobs
were finished according to the database and the campaign's output.
This change fixes the underlying problem, so a single campaign run suffices
and no longer loses any jobs.
Debugging this was awful and took us quite some time...

Change-Id: Ie6c982cc3b2ce11128941f1f13be563bae22565c
parent abd9decf
No related branches found
No related tags found
No related merge requests found
...@@ -27,19 +27,40 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg) ...@@ -27,19 +27,40 @@ bool SocketComm::sendMsg(int sockfd, google::protobuf::Message& msg)
bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg) bool SocketComm::rcvMsg(int sockfd, google::protobuf::Message& msg)
{ {
int size; char *buf;
if (safe_read(sockfd, &size, sizeof(size)) == -1) { int bufsiz;
return false; if ((buf = getBuf(sockfd, &bufsiz))) {
std::string st(buf, bufsiz);
delete [] buf;
return msg.ParseFromString(st);
} }
size = ntohl(size); return false;
char *buf = new char[size]; }
if (safe_read(sockfd, buf, size) == -1) {
bool SocketComm::dropMsg(int sockfd)
{
char *buf;
int bufsiz;
if ((buf = getBuf(sockfd, &bufsiz))) {
delete [] buf; delete [] buf;
return false; return true;
}
return false;
}
char * SocketComm::getBuf(int sockfd, int *size)
{
char *buf;
if (safe_read(sockfd, size, sizeof(int)) == -1) {
return 0;
}
*size = ntohl(*size);
buf = new char[*size];
if (safe_read(sockfd, buf, *size) == -1) {
delete [] buf;
return 0;
} }
std::string st(buf, size); return buf;
delete [] buf;
return msg.ParseFromString(st);
} }
ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count) ssize_t SocketComm::safe_write(int fd, const void *buf, size_t count)
......
...@@ -38,7 +38,15 @@ public: ...@@ -38,7 +38,15 @@ public:
*/ */
static bool rcvMsg(int sockfd, google::protobuf::Message& msg); static bool rcvMsg(int sockfd, google::protobuf::Message& msg);
/**
* Receive Protobuf-generated message and just drop it
* @param sockfd open socket descriptor to read from
* \return false if message reception failed
*/
static bool dropMsg(int sockfd);
private: private:
static char * getBuf(int sockfd, int *size);
static ssize_t safe_write(int fd, const void *buf, size_t count); static ssize_t safe_write(int fd, const void *buf, size_t count);
static ssize_t safe_read(int fd, void *buf, size_t count); static ssize_t safe_read(int fd, void *buf, size_t count);
}; };
......
...@@ -357,7 +357,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct ...@@ -357,7 +357,7 @@ void CommThread::receiveExperimentResults(Minion& minion, FailControlMessage& ct
cout << "[Server] Received another result for workload id [" cout << "[Server] Received another result for workload id ["
<< ctrlmsg.workloadid(i) << "] -- ignored." << endl; << ctrlmsg.workloadid(i) << "] -- ignored." << endl;
// TODO: Any need for error-handling here? SocketComm::dropMsg(minion.getSocketDescriptor());
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment