Commit a80e573c authored by Davis King's avatar Davis King

made code a little more robust.

parent cb901d75
...@@ -209,20 +209,20 @@ namespace dlib ...@@ -209,20 +209,20 @@ namespace dlib
close_all_connections_gracefully( close_all_connections_gracefully(
) )
{ {
if (_node_id == 0)
{
// Wait for all the other nodes to terminate before we do anything since
// we are the controller node.
receive();
}
_cons.reset(); _cons.reset();
while (_cons.move_next()) while (_cons.move_next())
{ {
// tell the other end that we are intentionally dropping the connection // tell the other end that we are intentionally dropping the connection
serialize(impl::NODE_TERMINATE,_cons.element().value()->stream); serialize(impl::NODE_TERMINATE,_cons.element().value()->stream);
_cons.element().value()->stream.flush(); _cons.element().value()->stream.flush();
_cons.element().value()->con->shutdown(); _cons.element().value()->con->shutdown_outgoing();
}
{
// now wait for all the other nodes to terminate
auto_mutex lock(class_mutex);
while (num_terminated_nodes < _cons.size())
terminated_signal.wait();
} }
check_for_errors(); check_for_errors();
...@@ -255,6 +255,7 @@ namespace dlib ...@@ -255,6 +255,7 @@ namespace dlib
outstanding_messages(0), outstanding_messages(0),
num_waiting_nodes(0), num_waiting_nodes(0),
buf_not_empty(class_mutex), buf_not_empty(class_mutex),
terminated_signal(class_mutex),
_cons(cons_), _cons(cons_),
_node_id(node_id_) _node_id(node_id_)
{ {
...@@ -422,7 +423,7 @@ namespace dlib ...@@ -422,7 +423,7 @@ namespace dlib
try try
{ {
using namespace impl; using namespace impl;
while (con->stream.peek() != EOF) while(true)
{ {
char header; char header;
deserialize(header, con->stream); deserialize(header, con->stream);
...@@ -472,6 +473,10 @@ namespace dlib ...@@ -472,6 +473,10 @@ namespace dlib
case NODE_TERMINATE: { case NODE_TERMINATE: {
auto_mutex lock(class_mutex); auto_mutex lock(class_mutex);
++num_terminated_nodes;
terminated_signal.signal();
if (_node_id == 0) if (_node_id == 0)
{ {
// a terminating node is basically the same as a node that waits forever. // a terminating node is basically the same as a node that waits forever.
...@@ -494,10 +499,21 @@ namespace dlib ...@@ -494,10 +499,21 @@ namespace dlib
auto_mutex lock(class_mutex); auto_mutex lock(class_mutex);
error_message = sout.str(); error_message = sout.str();
} }
catch (...)
{
std::ostringstream sout;
sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
auto_mutex lock(class_mutex);
error_message = sout.str();
}
auto_mutex lock(class_mutex); auto_mutex lock(class_mutex);
read_thread_terminated_improperly = true; read_thread_terminated_improperly = true;
buf_not_empty.signal(); buf_not_empty.signal();
++num_terminated_nodes;
terminated_signal.signal();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
......
...@@ -215,7 +215,9 @@ namespace dlib ...@@ -215,7 +215,9 @@ namespace dlib
bool read_thread_terminated_improperly; // true if any of our connections goes down. bool read_thread_terminated_improperly; // true if any of our connections goes down.
unsigned long outstanding_messages; unsigned long outstanding_messages;
unsigned long num_waiting_nodes; unsigned long num_waiting_nodes;
unsigned long num_terminated_nodes;
rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty
rsignaler terminated_signal;
std::deque<shared_ptr<std::string> > msg_buffer; std::deque<shared_ptr<std::string> > msg_buffer;
std::deque<unsigned long> msg_sender_id; std::deque<unsigned long> msg_sender_id;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment