Commit 6c655274 authored by Davis King's avatar Davis King

Changed the thread_pool and future classes so that any mixture of

destruction orders between the two is legal.

--HG--
extra : convert_revision : svn%3Afdd8eb12-d10e-0410-9acb-85c331704f74/trunk%403138
parent af20cb9f
......@@ -67,12 +67,13 @@ namespace
void perform_test (
)
{
add_functor f;
for (int num_threads= 0; num_threads < 4; ++num_threads)
{
future<int> a, b, c, res;
thread_pool tp(num_threads);
print_spinner();
future<int> a, b, c, res;
future<some_struct> obj;
......@@ -205,12 +206,12 @@ namespace
a = 1;
b = 2;
res = 0;
add_functor f;
tp.add_task(f, a, b, res);
DLIB_TEST(a == 1);
DLIB_TEST(b == 2);
DLIB_TEST(res == 3);
global_var = 0;
DLIB_TEST(global_var == 0);
id = tp.add_task(&set_global_var);
......@@ -239,6 +240,9 @@ namespace
}
// add this task just to to perterb the thread pool before it goes out of scope
tp.add_task(f, a, b, res);
}
}
......
......@@ -10,8 +10,8 @@ namespace dlib
// ----------------------------------------------------------------------------------------
thread_pool::
thread_pool (
thread_pool_implementation::
thread_pool_implementation (
unsigned long num_threads
) :
task_done_signaler(m),
......@@ -21,7 +21,7 @@ namespace dlib
tasks.resize(num_threads);
for (unsigned long i = 0; i < num_threads; ++i)
{
register_thread(*this, &thread_pool::thread);
register_thread(*this, &thread_pool_implementation::thread);
}
start();
......@@ -29,10 +29,33 @@ namespace dlib
// ----------------------------------------------------------------------------------------
thread_pool::
~thread_pool()
void thread_pool_implementation::
shutdown_pool (
)
{
{
auto_mutex M(m);
// first wait for all pending tasks to finish
bool found_task = true;
while (found_task)
{
{auto_mutex M(m);
found_task = false;
for (unsigned long i = 0; i < tasks.size(); ++i)
{
// If task bucket i has a task that is currently supposed to be processed
if (tasks[i].is_empty() == false)
{
found_task = true;
break;
}
}
if (found_task)
task_done_signaler.wait();
}
// now tell the threads to kill themselves
we_are_destructing = true;
task_ready_signaler.broadcast();
}
......@@ -42,7 +65,15 @@ namespace dlib
// ----------------------------------------------------------------------------------------
unsigned long thread_pool::
thread_pool_implementation::
~thread_pool_implementation()
{
shutdown_pool();
}
// ----------------------------------------------------------------------------------------
unsigned long thread_pool_implementation::
num_threads_in_pool (
) const
{
......@@ -52,7 +83,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
void thread_pool::
void thread_pool_implementation::
wait_for_task (
uint64 task_id
) const
......@@ -68,7 +99,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
void thread_pool::
void thread_pool_implementation::
wait_for_all_tasks (
) const
{
......@@ -97,7 +128,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
bool thread_pool::
bool thread_pool_implementation::
is_worker_thread (
const thread_id_type id
) const
......@@ -118,7 +149,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
void thread_pool::
void thread_pool_implementation::
thread (
)
{
......@@ -175,7 +206,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
long thread_pool::
long thread_pool_implementation::
find_empty_task_slot (
) const
{
......@@ -190,7 +221,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
long thread_pool::
long thread_pool_implementation::
find_ready_task (
) const
{
......@@ -205,7 +236,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
uint64 thread_pool::
uint64 thread_pool_implementation::
make_next_task_id (
long idx
)
......@@ -217,7 +248,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
unsigned long thread_pool::
unsigned long thread_pool_implementation::
task_id_to_index (
uint64 id
) const
......@@ -227,7 +258,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
uint64 thread_pool::
uint64 thread_pool_implementation::
add_task_internal (
const bfp_type& bfp
)
......@@ -270,7 +301,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
bool thread_pool::
bool thread_pool_implementation::
is_task_thread (
) const
{
......
......@@ -11,13 +11,14 @@
#include "multithreaded_object_extension.h"
#include "../uintn.h"
#include "../array.h"
#include "../smart_pointers_thread_safe.h"
namespace dlib
{
// ----------------------------------------------------------------------------------------
class thread_pool;
class thread_pool_implementation;
template <
typename T
......@@ -27,29 +28,29 @@ namespace dlib
/*!
INITIAL VALUE
- task_id == 0
- tp == 0
- tp.get() == 0
CONVENTION
- is_ready() == (tp == 0)
- is_ready() == (tp.get() == 0)
- get() == var
- if (tp != 0)
- tp == a pointer to the thread_pool that is using this future object
- if (tp.get() != 0)
- tp == a pointer to the thread_pool_implementation that is using this future object
- task_id == the task id of the task in the thread pool tp that is using
this future object.
!*/
public:
future (
) : task_id(0), tp(0) {}
) : task_id(0) {}
future (
const T& item
) : task_id(0), tp(0), var(item) {}
) : task_id(0), var(item) {}
future (
const future& item
) :task_id(0), tp(0), var(item.get()) {}
) :task_id(0), var(item.get()) {}
~future (
) { wait(); }
......@@ -75,7 +76,7 @@ namespace dlib
) const { wait(); return var; }
bool is_ready (
) const { return tp == 0; }
) const { return tp.get() == 0; }
private:
......@@ -84,7 +85,7 @@ namespace dlib
inline void wait () const;
mutable uint64 task_id;
mutable thread_pool* tp;
mutable shared_ptr_thread_safe<thread_pool_implementation> tp;
T var;
};
......@@ -123,7 +124,7 @@ namespace dlib
// ----------------------------------------------------------------------------------------
class thread_pool : private multithreaded_object
class thread_pool_implementation : private multithreaded_object
{
/*!
CONVENTION
......@@ -141,12 +142,13 @@ namespace dlib
!*/
typedef bound_function_pointer::kernel_1a_c bfp_type;
public:
explicit thread_pool (
friend class thread_pool;
explicit thread_pool_implementation (
unsigned long num_threads
);
~thread_pool(
public:
~thread_pool_implementation(
);
void wait_for_task (
......@@ -294,6 +296,221 @@ namespace dlib
return tasks[idx].task_id;
}
uint64 add_task_internal (
const bfp_type& bfp
);
/*!
ensures
- adds a task to call the given bfp object.
- returns the task id for this new task
!*/
void shutdown_pool (
);
/*!
ensures
- causes all threads to terminate and blocks the
caller until this happens.
!*/
private:
bool is_worker_thread (
const thread_id_type id
) const;
/*!
requires
- m is locked
ensures
- if (thread with given id is one of the thread pool's worker threads or num_threads_in_pool() == 0) then
- returns true
- else
- returns false
!*/
void thread (
);
/*!
this is the function that executes the threads in the thread pool
!*/
long find_empty_task_slot (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a empty task slot) then
- returns the index of that task slot in tasks
- there is a task slot
- else
- returns -1
!*/
long find_ready_task (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a task to do) then
- returns the index of that task in tasks
- else
- returns -1
!*/
uint64 make_next_task_id (
long idx
);
/*!
requires
- m is locked
- 0 <= idx < tasks.size()
ensures
- returns the next index to be used for tasks that are placed in
tasks[idx]
!*/
unsigned long task_id_to_index (
uint64 id
) const;
/*!
requires
- m is locked
- num_threads_in_pool() != 0
ensures
- returns the index in tasks corresponding to the given id
!*/
struct task_state_type
{
task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0) {}
bool is_ready () const
/*!
ensures
- if (is_empty() == false && no thread is currently processing this task) then
- returns true
- else
- returns false
!*/
{
return !is_being_processed && !is_empty();
}
bool is_empty () const
/*!
ensures
- if (this task state is empty. i.e. it doesn't contain a task to be processed) then
- returns true
- else
- returns false
!*/
{
return task_id == 0;
}
bool is_being_processed; // true when a thread is working on this task
uint64 task_id; // the id of this task. 0 means this task is empty
thread_id_type thread_id; // the id of the thread that requested this task
uint64 next_task_id;
long arg1;
long arg2;
member_function_pointer<>::kernel_1a_c mfp0;
member_function_pointer<long>::kernel_1a_c mfp1;
member_function_pointer<long,long>::kernel_1a_c mfp2;
bfp_type bfp;
};
array<task_state_type>::expand_1d_c tasks;
array<thread_id_type>::expand_1d_c worker_thread_ids;
mutex m;
signaler task_done_signaler;
signaler task_ready_signaler;
bool we_are_destructing;
// restricted functions
thread_pool_implementation(thread_pool_implementation&); // copy constructor
thread_pool_implementation& operator=(thread_pool_implementation&); // assignment operator
};
// ----------------------------------------------------------------------------------------
class thread_pool
{
/*!
This object is just a shell that holds a shared_ptr_thread_safe
to the real thread_pool_implementation object. The reason for doing
it this way is so that we can allow any mixture of destruction orders
between thread_pool objects and futures. Whoever gets destroyed
last cleans up the thread_pool_implementation resources.
!*/
typedef bound_function_pointer::kernel_1a_c bfp_type;
public:
explicit thread_pool (
unsigned long num_threads
)
{
impl.reset(new thread_pool_implementation(num_threads));
}
~thread_pool (
)
{
impl->shutdown_pool();
}
void wait_for_task (
uint64 task_id
) const { impl->wait_for_task(task_id); }
unsigned long num_threads_in_pool (
) const { return impl->num_threads_in_pool(); }
void wait_for_all_tasks (
) const { impl->wait_for_all_tasks(); }
bool is_task_thread (
) const { return impl->is_task_thread(); }
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)()
)
{
return impl->add_task(obj, funct);
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long),
long arg1
)
{
return impl->add_task(obj, funct, arg1);
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long,long),
long arg1,
long arg2
)
{
return impl->add_task(obj, funct, arg1, arg2);
}
// --------------------
template <typename F>
......@@ -306,7 +523,7 @@ namespace dlib
bfp_type temp;
temp.set(function_object);
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
return id;
}
......@@ -319,7 +536,7 @@ namespace dlib
{
bfp_type temp;
temp.set(obj,funct);
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
return id;
}
......@@ -330,7 +547,7 @@ namespace dlib
{
bfp_type temp;
temp.set(funct);
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
return id;
}
......@@ -348,11 +565,11 @@ namespace dlib
bfp_type temp;
temp.set(function_object,arg1.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
return id;
}
......@@ -365,11 +582,11 @@ namespace dlib
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
return id;
}
......@@ -382,11 +599,11 @@ namespace dlib
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
return id;
}
......@@ -398,11 +615,11 @@ namespace dlib
{
bfp_type temp;
temp.set(funct,arg1.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
return id;
}
......@@ -420,13 +637,13 @@ namespace dlib
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
return id;
}
......@@ -441,13 +658,13 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
return id;
}
......@@ -462,13 +679,13 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
return id;
}
......@@ -482,13 +699,13 @@ namespace dlib
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
return id;
}
......@@ -507,15 +724,15 @@ namespace dlib
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
return id;
}
......@@ -532,15 +749,15 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
return id;
}
......@@ -557,15 +774,15 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
return id;
}
......@@ -581,15 +798,15 @@ namespace dlib
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
return id;
}
......@@ -609,17 +826,17 @@ namespace dlib
bfp_type temp;
temp.set(function_object, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = this;
arg4.tp = impl;
return id;
}
......@@ -638,17 +855,17 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = this;
arg4.tp = impl;
return id;
}
......@@ -667,17 +884,17 @@ namespace dlib
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = this;
arg4.tp = impl;
return id;
}
......@@ -695,151 +912,23 @@ namespace dlib
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task_internal(temp);
uint64 id = impl->add_task_internal(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg1.tp = impl;
arg2.task_id = id;
arg2.tp = this;
arg2.tp = impl;
arg3.task_id = id;
arg3.tp = this;
arg3.tp = impl;
arg4.task_id = id;
arg4.tp = this;
arg4.tp = impl;
return id;
}
// --------------------
private:
uint64 add_task_internal (
const bfp_type& bfp
);
/*!
ensures
- adds a task to call the given bfp object.
- returns the task id for this new task
!*/
bool is_worker_thread (
const thread_id_type id
) const;
/*!
requires
- m is locked
ensures
- if (thread with given id is one of the thread pool's worker threads or num_threads_in_pool() == 0) then
- returns true
- else
- returns false
!*/
void thread (
);
/*!
this is the function that executes the threads in the thread pool
!*/
long find_empty_task_slot (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a empty task slot) then
- returns the index of that task slot in tasks
- there is a task slot
- else
- returns -1
!*/
long find_ready_task (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a task to do) then
- returns the index of that task in tasks
- else
- returns -1
!*/
uint64 make_next_task_id (
long idx
);
/*!
requires
- m is locked
- 0 <= idx < tasks.size()
ensures
- returns the next index to be used for tasks that are placed in
tasks[idx]
!*/
unsigned long task_id_to_index (
uint64 id
) const;
/*!
requires
- m is locked
- num_threads_in_pool() != 0
ensures
- returns the index in tasks corresponding to the given id
!*/
struct task_state_type
{
task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0) {}
bool is_ready () const
/*!
ensures
- if (is_empty() == false && no thread is currently processing this task) then
- returns true
- else
- returns false
!*/
{
return !is_being_processed && !is_empty();
}
bool is_empty () const
/*!
ensures
- if (this task state is empty. i.e. it doesn't contain a task to be processed) then
- returns true
- else
- returns false
!*/
{
return task_id == 0;
}
bool is_being_processed; // true when a thread is working on this task
uint64 task_id; // the id of this task. 0 means this task is empty
thread_id_type thread_id; // the id of the thread that requested this task
uint64 next_task_id;
long arg1;
long arg2;
member_function_pointer<>::kernel_1a_c mfp0;
member_function_pointer<long>::kernel_1a_c mfp1;
member_function_pointer<long,long>::kernel_1a_c mfp2;
bfp_type bfp;
};
array<task_state_type>::expand_1d_c tasks;
array<thread_id_type>::expand_1d_c worker_thread_ids;
mutex m;
signaler task_done_signaler;
signaler task_ready_signaler;
bool we_are_destructing;
shared_ptr_thread_safe<thread_pool_implementation> impl;
// restricted functions
thread_pool(thread_pool&); // copy constructor
......@@ -858,7 +947,7 @@ namespace dlib
if (tp)
{
tp->wait_for_task(task_id);
tp = 0;
tp.reset();
task_id = 0;
}
}
......
......@@ -76,11 +76,6 @@ namespace dlib
~future (
);
/*!
requires
- if (item.is_ready() == false) then
- The thread_pool that this future was passed to should still exist
(i.e. You can't pass a future to a thread_pool and then destruct the
thread_pool before you destruct the future).
ensures
- if (item.is_ready() == false) then
- the call to this function blocks until the thread processing the task related
......@@ -223,6 +218,12 @@ namespace dlib
mode any thread that calls add_task() is considered to be
a thread_pool thread capable of executing tasks.
Also note that all function objects are passed to the tasks
by reference. This means you should ensure that your function
objects are not destroyed while tasks are still using them.
(e.g. Don't let them go out of scope right after a call to
add_task())
EXCEPTIONS
Note that if an exception is thrown inside a task thread and
is not caught then the normal rule for uncaught exceptions in
......@@ -248,7 +249,7 @@ namespace dlib
);
/*!
ensures
- all resources allocated by *this have been freed.
- blocks until all tasks in the pool have finished.
!*/
bool is_task_thread (
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment