Commit 482d82db authored by Davis King

Added multi_device_tensor_averager

parent d3f12d83
@@ -35,6 +35,64 @@ namespace dlib
return num_devices;
}
bool can_access_peer (int device_id, int peer_device_id)
{
int can_access;
CHECK_CUDA(cudaDeviceCanAccessPeer(&can_access, device_id, peer_device_id));
return can_access;
}
bool can_access_peer (const tensor& device, const tensor& peer_device)
{
return can_access_peer(device.device_id(), peer_device.device_id());
}
void device_synchronize (int dev)
{
raii_set_device set_dev(dev);
CHECK_CUDA(cudaDeviceSynchronize());
}
void device_synchronize (const tensor& dev) { device_synchronize(dev.device_id()); }
enable_peer_access::
enable_peer_access(
int device_id,
int peer_device_id
) : call_disable(false), device_id(device_id), peer_device_id(peer_device_id)
{
raii_set_device set_dev(device_id);
auto err = cudaDeviceEnablePeerAccess(peer_device_id, 0);
if (err == cudaSuccess)
{
call_disable = true;
}
else if (err == cudaErrorPeerAccessAlreadyEnabled)
{
// call cudaGetLastError() to dispose of this error since we don't
// care.
auto err2 = cudaGetLastError();
if (err2 != cudaErrorPeerAccessAlreadyEnabled)
CHECK_CUDA(err2);
}
else
{
CHECK_CUDA(err);
}
}
enable_peer_access::
~enable_peer_access() noexcept(false)
{
if (call_disable)
{
raii_set_device set_dev(device_id);
CHECK_CUDA(cudaDeviceDisablePeerAccess(peer_device_id));
}
}
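// ------------------------------------------------------------------------------------
// Illustrative usage sketch (not part of this commit): shows how the helpers above
// fit together.  The device ids 0 and 1 are assumptions for illustration.
inline void example_enable_peer_access()
{
    if (get_num_devices() >= 2 && can_access_peer(0, 1))
    {
        // RAII: peer access from device 0 to device 1 is enabled here and disabled
        // again when epa goes out of scope.
        enable_peer_access epa(0, 1);
        // ... launch kernels on device 0 that read memory owned by device 1 ...
        // Wait for device 0 to finish before peer access is torn down.
        device_synchronize(0);
    }
}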
// -----------------------------------------------------------------------------------
// -----------------------------------------------------------------------------------
// -----------------------------------------------------------------------------------
__global__ void _cuda_multiply1(float* d, const float* s1, const float* s2, size_t n)
@@ -11,8 +11,6 @@ namespace dlib
namespace cuda
{
#ifdef DLIB_USE_CUDA
// ----------------------------------------------------------------------------------------
void set_device (
@@ -25,6 +23,82 @@ namespace dlib
int get_num_devices (
);
bool can_access_peer (int device_id, int peer_device_id);
bool can_access_peer (const tensor& device, const tensor& peer_device);
void device_synchronize (int dev);
void device_synchronize (const tensor& dev);
class raii_set_device
{
public:
raii_set_device() = delete;
raii_set_device(const raii_set_device&) = delete;
raii_set_device& operator=(const raii_set_device&) = delete;
raii_set_device(int dev)
{
prev_dev = get_device();
set_device(dev);
}
raii_set_device(const tensor& dev)
{
prev_dev = get_device();
set_device(dev.device_id());
}
void operator() (int dev)
{
set_device(dev);
}
void operator() (const tensor& dev)
{
set_device(dev.device_id());
}
~raii_set_device() noexcept(false)
{
set_device(prev_dev);
}
private:
int prev_dev;
};
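// Illustrative usage sketch (not part of this commit): raii_set_device temporarily
// switches the current CUDA device and restores whatever device was selected when it
// was constructed.  The device ids are assumptions for illustration.
inline void example_raii_set_device()
{
    raii_set_device set_dev(1);   // switch to device 1
    // ... allocate tensors or launch kernels on device 1 ...
    set_dev(0);                   // operator() switches to device 0
    // ... do work on device 0 ...
}   // destructor restores the device that was current before set_dev was constructed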
#ifdef DLIB_USE_CUDA
class enable_peer_access
{
public:
enable_peer_access() = delete;
enable_peer_access(const enable_peer_access&) = delete;
enable_peer_access& operator=(const enable_peer_access&) = delete;
enable_peer_access(
int device_id,
int peer_device_id
);
enable_peer_access(
const tensor& device,
const tensor& peer_device
) : enable_peer_access(device.device_id(), peer_device.device_id())
{}
~enable_peer_access() noexcept(false);
private:
bool call_disable;
int device_id;
int peer_device_id;
};
// -----------------------------------------------------------------------------------
void multiply (
@@ -188,6 +262,24 @@ namespace dlib
inline int get_num_devices (
) { return 1; }
inline bool can_access_peer (int device_id, int peer_device_id)
{ return false; }
inline bool can_access_peer (const tensor& device, const tensor& peer_device)
{ return false; }
inline void device_synchronize (int ){}
inline void device_synchronize (const tensor& ){}
class enable_peer_access
{
public:
enable_peer_access() = delete;
enable_peer_access(const enable_peer_access&) = delete;
enable_peer_access& operator=(const enable_peer_access&) = delete;
enable_peer_access( int, int ){}
enable_peer_access( const tensor&, const tensor& ) {}
};
#endif // DLIB_USE_CUDA
}
@@ -10,6 +10,7 @@
#include "cpu_dlib.h"
#include "cuda_dlib.h"
#include "../rand.h"
#include <memory>
namespace dlib
{
@@ -1008,6 +1009,139 @@ namespace dlib { namespace tt
is_same_object(grad, gradient_input)==true
!*/
// ----------------------------------------------------------------------------------------
class multi_device_tensor_averager
{
/*!
WHAT THIS OBJECT REPRESENTS
This object is a tool for very quickly averaging a bunch of tensors
together, even when they reside on different GPU devices.
!*/
public:
multi_device_tensor_averager(const multi_device_tensor_averager&) = delete;
multi_device_tensor_averager& operator=(const multi_device_tensor_averager&) = delete;
multi_device_tensor_averager() = default;
void set(
std::vector<tensor*> items
)
/*!
requires
- All the tensors in items are the same size
ensures
- When you call average() we will average the tensors in items.
!*/
{
using namespace ::dlib::cuda;
accessible_groups.clear();
epa.clear();
if (items.size() < 1)
return;
scale = 1.0/items.size();
// split items into groups of peer-accessible devices
std::vector<tensor*> group, unused;
while(items.size() > 0)
{
group.push_back(items[0]);
for(size_t i = 1; i < items.size(); ++i)
{
if (can_access_peer(*items[0], *items[i]))
group.push_back(items[i]);
else
unused.push_back(items[i]);
}
accessible_groups.push_back(group);
unused.swap(items);
unused.clear();
group.clear();
}
for (auto&& g : accessible_groups)
{
for (size_t i = 1; i < g.size(); ++i)
{
epa.emplace_back(new enable_peer_access(*g[0], *g[i]));
}
}
// If there are multiple groups then we need to use the accum_buffer space
// when talking across groups. So allocate that buffer now.
if (accessible_groups.size() > 1)
{
raii_set_device set_dev(*accessible_groups[0][0]);
accum_buffer.copy_size(*accessible_groups[0][0]);
}
}
void average()
/*!
requires
- All the devices have stopped writing to the tensors given to set(). So
you should probably call cudaDeviceSynchronize() on each of the relevant
devices before calling average().
ensures
- Computes the average of all the tensors given to set() and then sets them
all equal to the average.
!*/
{
using namespace ::dlib::cuda;
// First we average things within each group
for (auto&& g : accessible_groups)
{
raii_set_device set_dev(*g[0]);
if (g.size() == 1)
tt::affine_transform(*g[0], *g[0], scale);
else
tt::affine_transform(*g[0], *g[0], *g[1], scale, scale);
for (size_t i = 2; i < g.size(); ++i)
tt::affine_transform(*g[0], *g[0], *g[i], 1, scale);
}
if (accessible_groups.size() > 1)
{
tensor& total_avg = *accessible_groups[0][0];
raii_set_device set_dev(total_avg);
// now we need to average things across groups
for (size_t i = 1; i < accessible_groups.size(); ++i)
{
memcpy(accum_buffer, *accessible_groups[i][0]);
tt::add(total_avg, total_avg, accum_buffer);
}
// Now total_avg has the final average in it. So we need to send
// copies of it back to each of the groups.
for (size_t i = 1; i < accessible_groups.size(); ++i)
{
memcpy(*accessible_groups[i][0], total_avg);
}
}
// Now propagate the average back out to each element using point-to-point
// copies within each group.
for (auto&& g : accessible_groups)
{
raii_set_device set_dev(*g[0]);
for (size_t i = 1; i < g.size(); ++i)
memcpy(*g[i], *g[0]);
}
}
private:
std::vector<std::unique_ptr<::dlib::cuda::enable_peer_access>> epa;
std::vector<std::vector<tensor*>> accessible_groups;
float scale;
resizable_tensor accum_buffer;
};
// ----------------------------------------------------------------------------------------
}}
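A minimal usage sketch of the new averager (the include path, gradient tensors, and per-device setup are assumptions for illustration):

#include <dlib/dnn.h>
#include <vector>

// One gradient tensor per GPU, all with identical dimensions.
void average_gradients(std::vector<dlib::resizable_tensor>& grads)
{
    dlib::tt::multi_device_tensor_averager averager;

    std::vector<dlib::tensor*> items;
    for (auto& g : grads)
        items.push_back(&g);
    averager.set(items);

    // average() requires that every device has stopped writing to its tensor.
    for (auto& g : grads)
        dlib::cuda::device_synchronize(g);

    // Replaces each tensor in grads with the average of all of them.
    averager.average();
}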