Commit f70ea49a authored by Davis King's avatar Davis King

Added code to allow different processing nodes to terminate at different

times.
parent c3f09874
...@@ -123,6 +123,7 @@ namespace dlib ...@@ -123,6 +123,7 @@ namespace dlib
const static char ALL_NODES_WAITING = 3; const static char ALL_NODES_WAITING = 3;
const static char SENT_MESSAGE = 4; const static char SENT_MESSAGE = 4;
const static char GOT_MESSAGE = 5; const static char GOT_MESSAGE = 5;
const static char NODE_TERMINATE = 6;
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
...@@ -202,6 +203,31 @@ namespace dlib ...@@ -202,6 +203,31 @@ namespace dlib
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
// IMPLEMENTATION OF bsp OBJECT MEMBERS // IMPLEMENTATION OF bsp OBJECT MEMBERS
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
void bsp::
close_all_connections_gracefully(
)
{
if (_node_id == 0)
{
// Wait for all the other nodes to terminate before we do anything since
// we are the controller node.
receive();
}
_cons.reset();
while (_cons.move_next())
{
// tell the other end that we are intentionally dropping the connection
serialize(impl::NODE_TERMINATE,_cons.element().value()->stream);
_cons.element().value()->stream.flush();
_cons.element().value()->con->shutdown();
}
check_for_errors();
}
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
bsp:: bsp::
...@@ -225,7 +251,7 @@ namespace dlib ...@@ -225,7 +251,7 @@ namespace dlib
unsigned long node_id_, unsigned long node_id_,
impl::map_id_to_con& cons_ impl::map_id_to_con& cons_
) : ) :
read_thread_terminated(false), read_thread_terminated_improperly(false),
outstanding_messages(0), outstanding_messages(0),
num_waiting_nodes(0), num_waiting_nodes(0),
buf_not_empty(class_mutex), buf_not_empty(class_mutex),
...@@ -264,11 +290,11 @@ namespace dlib ...@@ -264,11 +290,11 @@ namespace dlib
if (msg_buffer.size() == 0) if (msg_buffer.size() == 0)
{ {
send_to_master_node(WAITING_ON_RECEIVE); send_to_master_node(WAITING_ON_RECEIVE);
while (msg_buffer.size() == 0 && !read_thread_terminated) while (msg_buffer.size() == 0 && !read_thread_terminated_improperly)
{ {
buf_not_empty.wait(); buf_not_empty.wait();
} }
if (read_thread_terminated) if (read_thread_terminated_improperly)
{ {
throw dlib::socket_error("A connection between processing nodes has been lost."); throw dlib::socket_error("A connection between processing nodes has been lost.");
} }
...@@ -354,12 +380,17 @@ namespace dlib ...@@ -354,12 +380,17 @@ namespace dlib
while (_cons.move_next()) while (_cons.move_next())
{ {
try try
{
// Skip connections to nodes that have already terminated their
// execution.
if (_cons.element().value()->terminated == false)
{ {
serialize(ALL_NODES_WAITING, _cons.element().value()->stream); serialize(ALL_NODES_WAITING, _cons.element().value()->stream);
_cons.element().value()->stream.flush(); _cons.element().value()->stream.flush();
if (!_cons.element().value()->stream) if (!_cons.element().value()->stream)
throw dlib::error("Error writing data to TCP connection"); throw dlib::error("Error writing data to TCP connection");
} }
}
catch (std::exception& e) catch (std::exception& e)
{ {
const connection* const con = _cons.element().value()->con.get(); const connection* const con = _cons.element().value()->con.get();
...@@ -438,6 +469,18 @@ namespace dlib ...@@ -438,6 +469,18 @@ namespace dlib
auto_mutex lock(class_mutex); auto_mutex lock(class_mutex);
--outstanding_messages; --outstanding_messages;
} break; } break;
case NODE_TERMINATE: {
auto_mutex lock(class_mutex);
if (_node_id == 0)
{
// a terminating node is basically the same as a node that waits forever.
_cons[sender_id]->terminated = true;
++num_waiting_nodes;
notify_everyone_if_all_blocked();
}
return;
} break;
} }
} }
} }
...@@ -453,7 +496,7 @@ namespace dlib ...@@ -453,7 +496,7 @@ namespace dlib
} }
auto_mutex lock(class_mutex); auto_mutex lock(class_mutex);
read_thread_terminated = true; read_thread_terminated_improperly = true;
buf_not_empty.signal(); buf_not_empty.signal();
} }
...@@ -476,6 +519,9 @@ namespace dlib ...@@ -476,6 +519,9 @@ namespace dlib
) )
{ {
using namespace impl; using namespace impl;
if (_cons[target_node_id]->terminated)
throw socket_error("Attempt to send a message to a node that has terminated.");
serialize(MESSAGE_HEADER, _cons[target_node_id]->stream); serialize(MESSAGE_HEADER, _cons[target_node_id]->stream);
serialize(item, _cons[target_node_id]->stream); serialize(item, _cons[target_node_id]->stream);
_cons[target_node_id]->stream.flush(); _cons[target_node_id]->stream.flush();
......
...@@ -27,14 +27,16 @@ namespace dlib ...@@ -27,14 +27,16 @@ namespace dlib
) : ) :
con(connect(dest.first,dest.second)), con(connect(dest.first,dest.second)),
buf(con), buf(con),
stream(&buf) stream(&buf),
terminated(false)
{} {}
bsp_con( bsp_con(
scoped_ptr<connection>& conptr scoped_ptr<connection>& conptr
) : ) :
buf(conptr), buf(conptr),
stream(&buf) stream(&buf),
terminated(false)
{ {
// make sure we own the connection // make sure we own the connection
conptr.swap(con); conptr.swap(con);
...@@ -43,6 +45,7 @@ namespace dlib ...@@ -43,6 +45,7 @@ namespace dlib
scoped_ptr<connection> con; scoped_ptr<connection> con;
sockstreambuf::kernel_2a buf; sockstreambuf::kernel_2a buf;
std::iostream stream; std::iostream stream;
bool terminated;
}; };
typedef dlib::map<unsigned long, scoped_ptr<bsp_con> >::kernel_1a_c map_id_to_con; typedef dlib::map<unsigned long, scoped_ptr<bsp_con> >::kernel_1a_c map_id_to_con;
...@@ -135,6 +138,20 @@ namespace dlib ...@@ -135,6 +138,20 @@ namespace dlib
BSP computation. BSP computation.
!*/ !*/
void receive (
)
/*!
ensures
- simply waits for all other nodes to become blocked
on calls to receive() or to terminate (i.e. waits for
other nodes to be in a state that can't send messages).
!*/
{
int junk;
if (receive(junk))
throw dlib::socket_error("call to receive got an unexpected message");
}
template <typename T> template <typename T>
bool receive ( bool receive (
T& item T& item
...@@ -194,6 +211,14 @@ namespace dlib ...@@ -194,6 +211,14 @@ namespace dlib
impl::map_id_to_con& cons_ impl::map_id_to_con& cons_
); );
void close_all_connections_gracefully();
/*!
ensures
- closes all the connections to other nodes and lets them know that
we are terminating normally rather than as the result of some kind
of error.
!*/
bool receive_data ( bool receive_data (
shared_ptr<std::string>& item, shared_ptr<std::string>& item,
unsigned long& sending_node_id unsigned long& sending_node_id
...@@ -237,7 +262,7 @@ namespace dlib ...@@ -237,7 +262,7 @@ namespace dlib
rmutex class_mutex; // used to lock any class members touched from more than one thread. rmutex class_mutex; // used to lock any class members touched from more than one thread.
std::string error_message; std::string error_message;
bool read_thread_terminated; // true if any of our connections goes down. bool read_thread_terminated_improperly; // true if any of our connections goes down.
unsigned long outstanding_messages; unsigned long outstanding_messages;
unsigned long num_waiting_nodes; unsigned long num_waiting_nodes;
rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty
...@@ -362,7 +387,7 @@ namespace dlib ...@@ -362,7 +387,7 @@ namespace dlib
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj); funct(obj);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -383,7 +408,7 @@ namespace dlib ...@@ -383,7 +408,7 @@ namespace dlib
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1); funct(obj,arg1);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -406,7 +431,7 @@ namespace dlib ...@@ -406,7 +431,7 @@ namespace dlib
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1,arg2); funct(obj,arg1,arg2);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -431,7 +456,7 @@ namespace dlib ...@@ -431,7 +456,7 @@ namespace dlib
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1,arg2,arg3); funct(obj,arg1,arg2,arg3);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -451,7 +476,7 @@ namespace dlib ...@@ -451,7 +476,7 @@ namespace dlib
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj); funct(obj);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -471,7 +496,7 @@ namespace dlib ...@@ -471,7 +496,7 @@ namespace dlib
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1); funct(obj,arg1);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -493,7 +518,7 @@ namespace dlib ...@@ -493,7 +518,7 @@ namespace dlib
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1,arg2); funct(obj,arg1,arg2);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -517,7 +542,7 @@ namespace dlib ...@@ -517,7 +542,7 @@ namespace dlib
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp obj(node_id, cons); bsp obj(node_id, cons);
funct(obj,arg1,arg2,arg3); funct(obj,arg1,arg2,arg3);
obj.check_for_errors(); obj.close_all_connections_gracefully();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment