Commit 491cbe1a authored by Davis King's avatar Davis King

Refactored and greatly simplified the BSP implementation. This has

fixed a few subtle race conditions and now the tool seems to work
robustly.
parent aa230458
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "bsp.h" #include "bsp.h"
#include "../ref.h" #include "../ref.h"
#include <stack>
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -10,7 +11,7 @@ ...@@ -10,7 +11,7 @@
namespace dlib namespace dlib
{ {
namespace impl namespace impl1
{ {
struct hostinfo struct hostinfo
...@@ -51,17 +52,28 @@ namespace dlib ...@@ -51,17 +52,28 @@ namespace dlib
void connect_all_hostinfo ( void connect_all_hostinfo (
map_id_to_con& cons, map_id_to_con& cons,
const std::vector<hostinfo>& hosts, const std::vector<hostinfo>& hosts,
unsigned long node_id unsigned long node_id,
std::string& error_string
) )
{ {
cons.clear(); cons.clear();
for (unsigned long i = 0; i < hosts.size(); ++i) for (unsigned long i = 0; i < hosts.size(); ++i)
{ {
scoped_ptr<bsp_con> con(new bsp_con(make_pair(hosts[i].ip,hosts[i].port))); try
serialize(node_id, con->stream); // tell the other end our node_id {
con->stream.flush(); scoped_ptr<bsp_con> con(new bsp_con(make_pair(hosts[i].ip,hosts[i].port)));
unsigned long id = hosts[i].node_id; serialize(node_id, con->stream); // tell the other end our node_id
cons.add(id, con); con->stream.flush();
unsigned long id = hosts[i].node_id;
cons.add(id, con);
}
catch (std::exception& e)
{
std::ostringstream sout;
sout << "Could not connect to " << hosts[i].ip << ":" << hosts[i].port;
error_string = sout.str();
break;
}
} }
} }
...@@ -114,17 +126,6 @@ namespace dlib ...@@ -114,17 +126,6 @@ namespace dlib
} }
} }
// ------------------------------------------------------------------------------------
// These control bytes are sent before each message nodes send to each other.
const static char MESSAGE_HEADER = 0;
const static char WAITING_ON_RECEIVE = 1;
const static char NOT_WAITING_ON_RECEIVE = 2;
const static char ALL_NODES_WAITING = 3;
const static char SENT_MESSAGE = 4;
const static char GOT_MESSAGE = 5;
const static char NODE_TERMINATE = 6;
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
void listen_and_connect_all( void listen_and_connect_all(
...@@ -166,7 +167,10 @@ namespace dlib ...@@ -166,7 +167,10 @@ namespace dlib
// make a thread that will connect to all the targets // make a thread that will connect to all the targets
map_id_to_con cons2; map_id_to_con cons2;
thread_function thread(impl::connect_all_hostinfo, ref(cons2), ref(targets), node_id); std::string error_string;
thread_function thread(connect_all_hostinfo, ref(cons2), ref(targets), node_id, ref(error_string));
if (error_string.size() != 0)
throw socket_error(error_string);
// accept any incoming connections // accept any incoming connections
for (unsigned long i = 0; i < num_incoming_connections; ++i) for (unsigned long i = 0; i < num_incoming_connections; ++i)
...@@ -197,6 +201,110 @@ namespace dlib ...@@ -197,6 +201,110 @@ namespace dlib
cons.add(id,temp); cons.add(id,temp);
} }
} }
}
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
namespace impl2
{
// These control bytes are sent before each message nodes send to each other.
// denotes a normal content message.
const static char MESSAGE_HEADER = 0;
// sent back to sender, means message was returned by receive().
const static char GOT_MESSAGE = 1;
// broadcast when a node goes into a state where it has no outstanding sent
// messages (i.e. it received GOT_MESSAGE for all its sent messages) and is waiting
// on receive().
const static char IN_WAITING_STATE = 2;
// broadcast when no longer in IN_WAITING_STATE state.
const static char NOT_IN_WAITING_STATE = 3;
// broadcast when a node terminates itself.
const static char NODE_TERMINATE = 4;
// broadcast when a node finds out that all non-terminated nodes are in the
// IN_WAITING_STATE state. sending this message puts a node into the
// SEE_ALL_IN_WAITING_STATE where it will wait until it gets this message from all
// others and then return from receive() once this happens.
const static char SEE_ALL_IN_WAITING_STATE = 5;
const static char READ_ERROR = 6;
// ------------------------------------------------------------------------------------
void read_thread (
impl1::bsp_con* con,
unsigned long node_id,
unsigned long sender_id,
impl1::thread_safe_deque& msg_buffer
)
{
try
{
while(true)
{
impl1::msg_data msg;
deserialize(msg.msg_type, con->stream);
msg.sender_id = sender_id;
if (msg.msg_type == MESSAGE_HEADER)
{
msg.data.reset(new std::string);
deserialize(*msg.data, con->stream);
}
msg_buffer.push_and_consume(msg);
if (msg.msg_type == NODE_TERMINATE)
break;
}
}
catch (std::exception& e)
{
std::ostringstream sout;
sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
sout << " Receiving processing node id: " << node_id << std::endl;
sout << " Error message in the exception: " << e.what() << std::endl;
impl1::msg_data msg;
msg.sender_id = sender_id;
msg.msg_type = READ_ERROR;
msg.data.reset(new std::string);
*msg.data = sout.str();
msg_buffer.push_and_consume(msg);
}
catch (...)
{
std::ostringstream sout;
sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
sout << " Receiving processing node id: " << node_id << std::endl;
impl1::msg_data msg;
msg.sender_id = sender_id;
msg.msg_type = READ_ERROR;
msg.data.reset(new std::string);
*msg.data = sout.str();
msg_buffer.push_and_consume(msg);
}
}
// ------------------------------------------------------------------------------------
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -213,19 +321,33 @@ namespace dlib ...@@ -213,19 +321,33 @@ namespace dlib
while (_cons.move_next()) while (_cons.move_next())
{ {
// tell the other end that we are intentionally dropping the connection // tell the other end that we are intentionally dropping the connection
serialize(impl::NODE_TERMINATE,_cons.element().value()->stream); serialize(impl2::NODE_TERMINATE,_cons.element().value()->stream);
_cons.element().value()->stream.flush(); _cons.element().value()->stream.flush();
_cons.element().value()->con->shutdown_outgoing();
} }
impl1::msg_data msg;
// now wait for all the other nodes to terminate
while (num_terminated_nodes < _cons.size() )
{ {
// now wait for all the other nodes to terminate if (!msg_buffer.pop(msg))
auto_mutex lock(class_mutex); throw dlib::socket_error("Error reading from msg_buffer in dlib::bsp_context.");
while (num_terminated_nodes < _cons.size())
terminated_signal.wait(); if (msg.msg_type == impl2::NODE_TERMINATE)
++num_terminated_nodes;
else if (msg.msg_type == impl2::READ_ERROR)
throw dlib::socket_error(*msg.data);
else if (msg.msg_type == impl2::GOT_MESSAGE)
--outstanding_messages;
} }
check_for_errors(); if (outstanding_messages != 0)
{
std::ostringstream sout;
sout << "A BSP job was allowed to terminate before all sent messages have been received.\n";
sout << "There are at least " << outstanding_messages << " messages still in flight. Make sure all sent messages\n";
sout << "have a corresponding call to receive().";
throw dlib::socket_error(sout.str());
}
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -239,6 +361,7 @@ namespace dlib ...@@ -239,6 +361,7 @@ namespace dlib
_cons.element().value()->con->shutdown(); _cons.element().value()->con->shutdown();
} }
msg_buffer.disable();
// this will wait for all the threads to terminate // this will wait for all the threads to terminate
threads.clear(); threads.clear();
...@@ -249,26 +372,23 @@ namespace dlib ...@@ -249,26 +372,23 @@ namespace dlib
bsp_context:: bsp_context::
bsp_context( bsp_context(
unsigned long node_id_, unsigned long node_id_,
impl::map_id_to_con& cons_ impl1::map_id_to_con& cons_
) : ) :
read_thread_terminated_improperly(false),
outstanding_messages(0), outstanding_messages(0),
num_waiting_nodes(0), num_waiting_nodes(0),
num_terminated_nodes(0), num_terminated_nodes(0),
buf_not_empty(class_mutex),
terminated_signal(class_mutex),
_cons(cons_), _cons(cons_),
_node_id(node_id_) _node_id(node_id_)
{ {
// spawn a bunch of read threads, one for each connection // spawn a bunch of read threads, one for each connection
member_function_pointer<impl::bsp_con*, unsigned long>::kernel_1a_c mfp;
mfp.set(*this, &bsp_context::read_thread);
_cons.reset(); _cons.reset();
while (_cons.move_next()) while (_cons.move_next())
{ {
scoped_ptr<thread_function> ptr(new thread_function(mfp, scoped_ptr<thread_function> ptr(new thread_function(&impl2::read_thread,
_cons.element().value().get(), _cons.element().value().get(),
_cons.element().key())); _node_id,
_cons.element().key(),
ref(msg_buffer)));
threads.push_back(ptr); threads.push_back(ptr);
} }
...@@ -282,259 +402,134 @@ namespace dlib ...@@ -282,259 +402,134 @@ namespace dlib
unsigned long& sending_node_id unsigned long& sending_node_id
) )
{ {
using namespace impl; if (outstanding_messages == 0)
// If there aren't any other nodes then you will never receive anything. broadcast_byte(impl2::IN_WAITING_STATE);
if (_cons.size() == 0)
return false;
unsigned long num_in_see_all_in_waiting_state = 0;
bool sent_see_all_in_waiting_state = false;
std::stack<impl1::msg_data> buf;
while (true)
{ {
auto_mutex lock(class_mutex); // if there aren't any nodes left to give us messages then return right now.
if (msg_buffer.size() == 0) if (num_terminated_nodes == _cons.size())
return false;
// if all running nodes are currently blocking forever on receive_data()
if (outstanding_messages == 0 && num_terminated_nodes + num_waiting_nodes == _cons.size())
{ {
send_to_master_node(WAITING_ON_RECEIVE); num_waiting_nodes = 0;
while (msg_buffer.size() == 0 && !read_thread_terminated_improperly) sent_see_all_in_waiting_state = true;
{ broadcast_byte(impl2::SEE_ALL_IN_WAITING_STATE);
buf_not_empty.wait(); }
}
if (read_thread_terminated_improperly) impl1::msg_data data;
if (!msg_buffer.pop(data))
throw dlib::socket_error("Error reading from msg_buffer in dlib::bsp_context.");
if (sent_see_all_in_waiting_state)
{
// Once we have gotten one SEE_ALL_IN_WAITING_STATE, all we care about is
// getting the rest of them. So the effect of this code is to always move
// any SEE_ALL_IN_WAITING_STATE messages to the front of the message queue.
if (data.msg_type != impl2::SEE_ALL_IN_WAITING_STATE)
{ {
throw dlib::socket_error("A connection between processing nodes has been lost."); buf.push(data);
continue;
} }
send_to_master_node(NOT_WAITING_ON_RECEIVE);
} }
sending_node_id = msg_sender_id.front(); switch(data.msg_type)
msg_sender_id.pop_front(); {
item = msg_buffer.front(); case impl2::MESSAGE_HEADER: {
msg_buffer.pop_front(); item = data.data;
} sending_node_id = data.sender_id;
// if this is a message from another node rather than the // if we would have send the IN_WAITING_STATE message before getting to
// "everyone is blocked on receive() message". // this point then let other nodes know that we aren't waiting anymore.
if (item) if (outstanding_messages == 0)
{ broadcast_byte(impl2::NOT_IN_WAITING_STATE);
send_to_master_node(GOT_MESSAGE);
return true;
}
else
{
return false;
}
}
// ---------------------------------------------------------------------------------------- send_byte(impl2::GOT_MESSAGE, data.sender_id);
return true;
void bsp_context::
send_to_master_node (
char msg
)
{
using namespace impl;
// if we aren't the special controlling node then send the
// controller a message.
if (_cons.is_in_domain(0))
{
serialize(msg, _cons[0]->stream);
_cons[0]->stream.flush();
}
else if (_node_id == 0) // if this is the master node
{
// since we are the master node we will just modify our state directly
auto_mutex lock(class_mutex);
switch(msg)
{
case WAITING_ON_RECEIVE: {
++num_waiting_nodes;
notify_everyone_if_all_blocked();
} break; } break;
case NOT_WAITING_ON_RECEIVE: { case impl2::IN_WAITING_STATE: {
--num_waiting_nodes; ++num_waiting_nodes;
} break; } break;
case SENT_MESSAGE: { case impl2::NOT_IN_WAITING_STATE: {
++outstanding_messages; --num_waiting_nodes;
} break; } break;
case GOT_MESSAGE: { case impl2::GOT_MESSAGE: {
--outstanding_messages; --outstanding_messages;
if (outstanding_messages == 0)
broadcast_byte(impl2::IN_WAITING_STATE);
} break; } break;
default: case impl2::NODE_TERMINATE: {
DLIB_CASSERT(false,"this should not happen"); ++num_terminated_nodes;
} _cons[data.sender_id]->terminated = true;
} if (num_terminated_nodes == _cons.size())
} {
return false;
// ---------------------------------------------------------------------------------------- }
} break;
void bsp_context:: case impl2::SEE_ALL_IN_WAITING_STATE: {
notify_everyone_if_all_blocked( ++num_in_see_all_in_waiting_state;
) if (num_in_see_all_in_waiting_state + num_terminated_nodes == _cons.size())
{
using namespace impl;
// if all the nodes are blocked on receive() and there aren't any
// messages in flight.
if (_node_id == 0 && num_waiting_nodes == number_of_nodes() && outstanding_messages == 0)
{
// send notifications
_cons.reset();
while (_cons.move_next())
{
try
{
// Skip connections to nodes that have already terminated their
// execution.
if (_cons.element().value()->terminated == false)
{ {
serialize(ALL_NODES_WAITING, _cons.element().value()->stream); // put stuff from buf back into msg_buffer
_cons.element().value()->stream.flush(); while (buf.size() != 0)
if (!_cons.element().value()->stream) {
throw dlib::error("Error writing data to TCP connection"); msg_buffer.push_front(buf.top());
buf.pop();
}
return false;
} }
} } break;
catch (std::exception& e)
{
const connection* const con = _cons.element().value()->con.get();
std::ostringstream sout;
sout << "An exception occurred in the controlling node while it was trying to communicate with a listening node.\n";
sout << " Listening processing node address: " << con->get_foreign_ip() << ":" << con->get_foreign_port() << std::endl;
sout << " Controlling processing node address: " << con->get_local_ip() << ":" << con->get_local_port() << std::endl;
sout << " Error message in the exception: " << e.what() << std::endl;
error_message = sout.str();
}
}
// unblock the control node itself case impl2::READ_ERROR: {
shared_ptr<std::string> msg; throw dlib::socket_error(*data.data);
msg_buffer.push_back(msg); } break;
msg_sender_id.push_back(0);
buf_not_empty.signal(); default: {
} throw dlib::socket_error("Unknown message received by dlib::bsp_context");
} break;
} // end switch()
} // end while (true)
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
void bsp_context:: void bsp_context::
read_thread ( send_byte (
impl::bsp_con* con, char val,
unsigned long sender_id unsigned long target_node_id
) )
{ {
try serialize(val, _cons[target_node_id]->stream);
{ _cons[target_node_id]->stream.flush();
using namespace impl;
while(true)
{
char header;
deserialize(header, con->stream);
switch (header)
{
case MESSAGE_HEADER: {
shared_ptr<std::string> msg(new std::string);
deserialize(*msg, con->stream);
auto_mutex lock(class_mutex);
msg_buffer.push_back(msg);
msg_sender_id.push_back(sender_id);
buf_not_empty.signal();
} break;
case WAITING_ON_RECEIVE: {
auto_mutex lock(class_mutex);
++num_waiting_nodes;
notify_everyone_if_all_blocked();
} break;
case NOT_WAITING_ON_RECEIVE: {
auto_mutex lock(class_mutex);
--num_waiting_nodes;
} break;
case ALL_NODES_WAITING: {
// put something into the message buffer that lets
// receive() know to return false. We do this using
// a null msg pointer.
auto_mutex lock(class_mutex);
shared_ptr<std::string> msg;
msg_buffer.push_back(msg);
msg_sender_id.push_back(sender_id);
buf_not_empty.signal();
} break;
case SENT_MESSAGE: {
auto_mutex lock(class_mutex);
++outstanding_messages;
} break;
case GOT_MESSAGE: {
auto_mutex lock(class_mutex);
--outstanding_messages;
} break;
case NODE_TERMINATE: {
auto_mutex lock(class_mutex);
++num_terminated_nodes;
terminated_signal.signal();
if (_node_id == 0)
{
// a terminating node is basically the same as a node that waits forever.
_cons[sender_id]->terminated = true;
++num_waiting_nodes;
notify_everyone_if_all_blocked();
}
return;
} break;
}
}
}
catch (std::exception& e)
{
std::ostringstream sout;
sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
sout << " Receiving processing node id: "<< _node_id << std::endl;
sout << " Error message in the exception: " << e.what() << std::endl;
auto_mutex lock(class_mutex);
error_message = sout.str();
}
catch (...)
{
std::ostringstream sout;
sout << "An exception was thrown while attempting to receive a message from processing node " << sender_id << ".\n";
sout << " Sending processing node address: " << con->con->get_foreign_ip() << ":" << con->con->get_foreign_port() << std::endl;
sout << " Receiving processing node address: " << con->con->get_local_ip() << ":" << con->con->get_local_port() << std::endl;
sout << " Receiving processing node id: "<< _node_id << std::endl;
auto_mutex lock(class_mutex);
error_message = sout.str();
}
auto_mutex lock(class_mutex);
read_thread_terminated_improperly = true;
buf_not_empty.signal();
++num_terminated_nodes;
terminated_signal.signal();
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
void bsp_context:: void bsp_context::
check_for_errors() broadcast_byte (
char val
)
{ {
auto_mutex lock(class_mutex); for (unsigned long i = 0; i < number_of_nodes(); ++i)
if (error_message.size() != 0)
throw dlib::socket_error(error_message);
if (outstanding_messages != 0)
{ {
std::ostringstream sout; // don't send to yourself or to terminated nodes
sout << "A BSP job was allowed to terminate before all sent messages have been received.\n"; if (i == node_id() || _cons[i]->terminated)
sout << "There are at least " << outstanding_messages << " messages still in flight. Make sure all sent messages\n"; continue;
sout << "have a corresponding call to receive().";
throw dlib::socket_error(sout.str()); send_byte(val,i);
} }
} }
...@@ -546,14 +541,15 @@ namespace dlib ...@@ -546,14 +541,15 @@ namespace dlib
unsigned long target_node_id unsigned long target_node_id
) )
{ {
using namespace impl; using namespace impl2;
if (_cons[target_node_id]->terminated) if (_cons[target_node_id]->terminated)
throw socket_error("Attempt to send a message to a node that has terminated."); throw socket_error("Attempt to send a message to a node that has terminated.");
serialize(MESSAGE_HEADER, _cons[target_node_id]->stream); serialize(MESSAGE_HEADER, _cons[target_node_id]->stream);
serialize(item, _cons[target_node_id]->stream); serialize(item, _cons[target_node_id]->stream);
_cons[target_node_id]->stream.flush(); _cons[target_node_id]->stream.flush();
send_to_master_node(SENT_MESSAGE);
++outstanding_messages;
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
......
...@@ -19,7 +19,7 @@ namespace dlib ...@@ -19,7 +19,7 @@ namespace dlib
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
namespace impl namespace impl1
{ {
struct bsp_con struct bsp_con
{ {
...@@ -71,6 +71,82 @@ namespace dlib ...@@ -71,6 +71,82 @@ namespace dlib
map_id_to_con& cons, map_id_to_con& cons,
unsigned short port unsigned short port
); );
struct msg_data
{
shared_ptr<std::string> data;
unsigned long sender_id;
char msg_type;
};
class thread_safe_deque
{
public:
thread_safe_deque() : sig(class_mutex),disabled(false) {}
~thread_safe_deque()
{
disable();
}
void disable()
{
auto_mutex lock(class_mutex);
disabled = true;
sig.broadcast();
}
unsigned long size() const { return data.size(); }
void push_front( const msg_data& item)
{
auto_mutex lock(class_mutex);
data.push_front(item);
sig.signal();
}
void push_and_consume( msg_data& item)
{
auto_mutex lock(class_mutex);
data.push_back(item);
// do this here so that we don't have to worry about different threads touching the shared_ptr.
item.data.reset();
sig.signal();
}
bool pop (
msg_data& item
)
/*!
ensures
- if (this function returns true) then
- #item == the next thing from the queue
- else
- this object is disabled
!*/
{
auto_mutex lock(class_mutex);
while (data.size() == 0 && !disabled)
sig.wait();
if (disabled)
return false;
item = data.front();
data.pop_front();
return true;
}
private:
std::deque<msg_data> data;
dlib::mutex class_mutex;
dlib::signaler sig;
bool disabled;
};
} }
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -159,7 +235,7 @@ namespace dlib ...@@ -159,7 +235,7 @@ namespace dlib
bsp_context( bsp_context(
unsigned long node_id_, unsigned long node_id_,
impl::map_id_to_con& cons_ impl1::map_id_to_con& cons_
); );
void close_all_connections_gracefully(); void close_all_connections_gracefully();
...@@ -175,28 +251,16 @@ namespace dlib ...@@ -175,28 +251,16 @@ namespace dlib
unsigned long& sending_node_id unsigned long& sending_node_id
); );
void send_to_master_node (
char msg
);
void notify_everyone_if_all_blocked( void send_byte (
char val,
unsigned long target_node_id
); );
/*!
requires
- class_mutex is locked
ensures
- sends out notifications to all the nodes if we are all blocked on receive. This
will cause all receive calls to unblock and return false.
!*/
void read_thread ( void broadcast_byte (
impl::bsp_con* con, char val
unsigned long sender_id
); );
void check_for_errors();
void send_data( void send_data(
const std::string& item, const std::string& item,
unsigned long target_node_id unsigned long target_node_id
...@@ -211,18 +275,14 @@ namespace dlib ...@@ -211,18 +275,14 @@ namespace dlib
rmutex class_mutex; // used to lock any class members touched from more than one thread.
std::string error_message;
bool read_thread_terminated_improperly; // true if any of our connections goes down.
unsigned long outstanding_messages; unsigned long outstanding_messages;
unsigned long num_waiting_nodes; unsigned long num_waiting_nodes;
unsigned long num_terminated_nodes; unsigned long num_terminated_nodes;
rsignaler buf_not_empty; // used to signal when msg_buffer isn't empty
rsignaler terminated_signal;
std::deque<shared_ptr<std::string> > msg_buffer;
std::deque<unsigned long> msg_sender_id;
impl::map_id_to_con& _cons; impl1::thread_safe_deque msg_buffer;
impl1::map_id_to_con& _cons;
const unsigned long _node_id; const unsigned long _node_id;
array<scoped_ptr<thread_function> > threads; array<scoped_ptr<thread_function> > threads;
...@@ -366,7 +426,7 @@ namespace dlib ...@@ -366,7 +426,7 @@ namespace dlib
funct_type& funct funct_type& funct
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
const unsigned long node_id = 0; const unsigned long node_id = 0;
connect_all(cons, hosts, node_id); connect_all(cons, hosts, node_id);
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
...@@ -387,7 +447,7 @@ namespace dlib ...@@ -387,7 +447,7 @@ namespace dlib
ARG1 arg1 ARG1 arg1
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
const unsigned long node_id = 0; const unsigned long node_id = 0;
connect_all(cons, hosts, node_id); connect_all(cons, hosts, node_id);
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
...@@ -410,7 +470,7 @@ namespace dlib ...@@ -410,7 +470,7 @@ namespace dlib
ARG2 arg2 ARG2 arg2
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
const unsigned long node_id = 0; const unsigned long node_id = 0;
connect_all(cons, hosts, node_id); connect_all(cons, hosts, node_id);
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
...@@ -435,7 +495,7 @@ namespace dlib ...@@ -435,7 +495,7 @@ namespace dlib
ARG3 arg3 ARG3 arg3
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
const unsigned long node_id = 0; const unsigned long node_id = 0;
connect_all(cons, hosts, node_id); connect_all(cons, hosts, node_id);
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
...@@ -462,7 +522,7 @@ namespace dlib ...@@ -462,7 +522,7 @@ namespace dlib
ARG4 arg4 ARG4 arg4
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
const unsigned long node_id = 0; const unsigned long node_id = 0;
connect_all(cons, hosts, node_id); connect_all(cons, hosts, node_id);
send_out_connection_orders(cons, hosts); send_out_connection_orders(cons, hosts);
...@@ -483,7 +543,7 @@ namespace dlib ...@@ -483,7 +543,7 @@ namespace dlib
funct_type& funct funct_type& funct
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
unsigned long node_id; unsigned long node_id;
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp_context obj(node_id, cons); bsp_context obj(node_id, cons);
...@@ -503,7 +563,7 @@ namespace dlib ...@@ -503,7 +563,7 @@ namespace dlib
ARG1 arg1 ARG1 arg1
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
unsigned long node_id; unsigned long node_id;
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp_context obj(node_id, cons); bsp_context obj(node_id, cons);
...@@ -525,7 +585,7 @@ namespace dlib ...@@ -525,7 +585,7 @@ namespace dlib
ARG2 arg2 ARG2 arg2
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
unsigned long node_id; unsigned long node_id;
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp_context obj(node_id, cons); bsp_context obj(node_id, cons);
...@@ -549,7 +609,7 @@ namespace dlib ...@@ -549,7 +609,7 @@ namespace dlib
ARG3 arg3 ARG3 arg3
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
unsigned long node_id; unsigned long node_id;
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp_context obj(node_id, cons); bsp_context obj(node_id, cons);
...@@ -575,7 +635,7 @@ namespace dlib ...@@ -575,7 +635,7 @@ namespace dlib
ARG4 arg4 ARG4 arg4
) )
{ {
impl::map_id_to_con cons; impl1::map_id_to_con cons;
unsigned long node_id; unsigned long node_id;
listen_and_connect_all(node_id, cons, listening_port); listen_and_connect_all(node_id, cons, listening_port);
bsp_context obj(node_id, cons); bsp_context obj(node_id, cons);
......
...@@ -15,6 +15,11 @@ namespace dlib ...@@ -15,6 +15,11 @@ namespace dlib
{ {
/*! /*!
WHAT THIS OBJECT REPRESENTS WHAT THIS OBJECT REPRESENTS
THREAD SAFETY
This object is not thread-safe. This means you must serialize all access
to it using an appropriate mutex or other synchronization mechanism if it
is to be accessed from multiple threads.
!*/ !*/
public: public:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment