Commit 4d47d3d3 authored by Davis King's avatar Davis King

Added futures support to the thread_pool object.

--HG--
extra : convert_revision : svn%3Afdd8eb12-d10e-0410-9acb-85c331704f74/trunk%402673
parent fd6baebf
......@@ -144,7 +144,9 @@ namespace dlib
}
// now do the task
if (task.mfp0)
if (task.bfp)
task.bfp();
else if (task.mfp0)
task.mfp0();
else if (task.mfp1)
task.mfp1(task.arg1);
......@@ -156,6 +158,7 @@ namespace dlib
{ auto_mutex M(m);
tasks[idx].is_being_processed = false;
tasks[idx].task_id = 0;
tasks[idx].bfp.clear();
tasks[idx].mfp0.clear();
tasks[idx].mfp1.clear();
tasks[idx].mfp2.clear();
......@@ -219,6 +222,49 @@ namespace dlib
return static_cast<unsigned long>(id%tasks.size());
}
// ----------------------------------------------------------------------------------------
uint64 thread_pool::
add_task (
const bfp_type& bfp
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
m.unlock();
bfp();
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].bfp = bfp;
task_ready_signaler.signal();
return tasks[idx].task_id;
}
// ----------------------------------------------------------------------------------------
}
......
......@@ -5,6 +5,7 @@
#include "thread_pool_extension_abstract.h"
#include "../member_function_pointer.h"
#include "../bound_function_pointer.h"
#include "threads_kernel.h"
#include "auto_mutex_extension.h"
#include "multithreaded_object_extension.h"
......@@ -14,6 +15,109 @@
namespace dlib
{
// ----------------------------------------------------------------------------------------
class thread_pool;
template <
typename T
>
class future
{
/*!
INITIAL VALUE
- task_id == 0
- tp == 0
CONVENTION
- is_ready() == (tp == 0)
- get() == var
- if (tp != 0)
- tp == a pointer to the thread_pool that is using this future object
- task_id == the task id of the task in the thread pool tp that is using
this future object.
!*/
public:
future (
) : task_id(0), tp(0) {}
future (
const T& item
) : task_id(0), tp(0), var(item) {}
future (
const future& item
) :task_id(0), tp(0), var(item.get()) {}
future& operator=(
const T& item
) { get() = item; return *this; }
future& operator=(
const future& item
) { get() = item.get(); return *this; }
operator T& (
) { return get(); }
operator const T& (
) const { return get(); }
T& get (
) { wait(); return var; }
const T& get (
) const { wait(); return var; }
bool is_ready (
) const { return tp == 0; }
private:
friend class thread_pool;
inline void wait () const;
mutable uint64 task_id;
mutable thread_pool* tp;
T var;
};
// ----------------------------------------------------------------------------------------
template <typename T>
inline void swap (
future<T>& a,
future<T>& b
) { dlib::exchange(a.get(), b.get()); }
// Note that dlib::exchange() just calls std::swap. I'm only using it because
// this works around some bugs in certain compilers.
// ----------------------------------------------------------------------------------------
template <typename T> bool operator== (const future<T>& a, const future<T>& b) { return a.get() == b.get(); }
template <typename T> bool operator!= (const future<T>& a, const future<T>& b) { return a.get() != b.get(); }
template <typename T> bool operator<= (const future<T>& a, const future<T>& b) { return a.get() <= b.get(); }
template <typename T> bool operator>= (const future<T>& a, const future<T>& b) { return a.get() >= b.get(); }
template <typename T> bool operator< (const future<T>& a, const future<T>& b) { return a.get() < b.get(); }
template <typename T> bool operator> (const future<T>& a, const future<T>& b) { return a.get() > b.get(); }
template <typename T> bool operator== (const future<T>& a, const T& b) { return a.get() == b; }
template <typename T> bool operator== (const T& a, const future<T>& b) { return a.get() == b; }
template <typename T> bool operator!= (const future<T>& a, const T& b) { return a.get() != b; }
template <typename T> bool operator!= (const T& a, const future<T>& b) { return a.get() != b; }
template <typename T> bool operator<= (const future<T>& a, const T& b) { return a.get() <= b; }
template <typename T> bool operator<= (const T& a, const future<T>& b) { return a.get() <= b; }
template <typename T> bool operator>= (const future<T>& a, const T& b) { return a.get() >= b; }
template <typename T> bool operator>= (const T& a, const future<T>& b) { return a.get() >= b; }
template <typename T> bool operator< (const future<T>& a, const T& b) { return a.get() < b; }
template <typename T> bool operator< (const T& a, const future<T>& b) { return a.get() < b; }
template <typename T> bool operator> (const future<T>& a, const T& b) { return a.get() > b; }
template <typename T> bool operator> (const T& a, const future<T>& b) { return a.get() > b; }
// ----------------------------------------------------------------------------------------
class thread_pool : private multithreaded_object
......@@ -30,6 +134,7 @@ namespace dlib
- worker_thread_ids == an array that contains the thread ids for
all the threads in the thread pool
!*/
typedef bound_function_pointer::kernel_1a_c bfp_type;
public:
explicit thread_pool (
......@@ -181,8 +286,299 @@ namespace dlib
return tasks[idx].task_id;
}
// --------------------
template <typename T, typename T1, typename A1>
uint64 add_task (
T& obj,
void (T::*funct)(T1),
future<A1>& arg1
)
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = add_task(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
return id;
}
template <typename T, typename T1, typename A1>
uint64 add_task (
const T& obj,
void (T::*funct)(T1) const,
future<A1>& arg1
)
{
bfp_type temp;
temp.set(obj,funct,arg1.get());
uint64 id = add_task(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
return id;
}
template <typename T1, typename A1>
uint64 add_task (
void (*funct)(T1),
future<A1>& arg1
)
{
bfp_type temp;
temp.set(funct,arg1.get());
uint64 id = add_task(temp);
// tie the future to this task
arg1.task_id = id;
arg1.tp = this;
return id;
}
// --------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2) const,
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
void (*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
return id;
}
// --------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
void (*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
return id;
}
// --------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
arg4.task_id = id;
arg4.tp = this;
return id;
}
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3,T4) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(obj, funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
arg4.task_id = id;
arg4.tp = this;
return id;
}
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
void (*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
)
{
bfp_type temp;
temp.set(funct, arg1.get(), arg2.get(), arg3.get(), arg4.get());
uint64 id = add_task(temp);
// tie the futures to this task
arg1.task_id = id;
arg1.tp = this;
arg2.task_id = id;
arg2.tp = this;
arg3.task_id = id;
arg3.tp = this;
arg4.task_id = id;
arg4.tp = this;
return id;
}
// --------------------
private:
uint64 add_task (
const bfp_type& bfp
);
/*!
ensures
- adds a task to call the given bfp object.
- returns the task id for this new task
!*/
bool is_worker_thread (
const thread_id_type id
) const;
......@@ -289,6 +685,7 @@ namespace dlib
member_function_pointer<>::kernel_1a_c mfp0;
member_function_pointer<long>::kernel_1a_c mfp1;
member_function_pointer<long,long>::kernel_1a_c mfp2;
bfp_type bfp;
};
......@@ -306,6 +703,22 @@ namespace dlib
};
// ----------------------------------------------------------------------------------------
template <typename T>
void future<T>::
wait (
) const
{
if (tp)
{
tp->wait_for_task(task_id);
tp = 0;
task_id = 0;
}
}
}
// ----------------------------------------------------------------------------------------
......
......@@ -9,6 +9,189 @@
namespace dlib
{
// ----------------------------------------------------------------------------------------
template <
typename T
>
class future
{
/*!
INITIAL VALUE
- is_ready() == true
WHAT THIS OBJECT REPRESENTS
This object represents a container that allows you to safely pass objects
into the tasks performed by the thread_pool object defined below. An
example will make it clear:
// Suppose you have a global function defined as follows
void add (int a, int b, int& result) { result = a + b; }
// Also suppose you have a thread_pool named tp defined somewhere.
// Then you could do the following.
future<int> a, b, result;
a = 3;
b = 4;
// this function call causes another thread to execute a call to the add() function
// and passes in the int objects contained in a, b, and result
tp.add_task(add,a,b,result);
// This line will wait for the task in the thread pool to finish and then print the
// value in the result integer. So it will print a 7.
cout << result << endl;
!*/
public:
future (
);
/*!
ensures
- The object of type T contained in this future has
an initial value for its type.
- #is_ready() == true
!*/
future (
const T& item
);
/*!
ensures
- #get() == item
- #is_ready() == true
!*/
future (
const future& item
);
/*!
ensures
- if (item.is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to the item future has finished.
- #is_ready() == true
- #item.is_ready() == true
- #get() == item.get()
!*/
bool is_ready (
) const;
/*!
ensures
- if (the value of this future may not yet be ready to be accessed because it is in use by a task in a thread_pool) then
- returns false
- else
- returns true
!*/
future& operator=(
const T& item
);
/*!
ensures
- if (is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to this future has finished.
- #is_ready() == true
- #get() == item
- returns *this
!*/
future& operator=(
const future& item
);
/*!
ensures
- if (is_ready() == false || item.is_ready() == false) then
- the call to this function blocks until the threads processing the tasks related
to this future and the item future have finished.
- #is_ready() == true
- #item.is_ready() == true
- #get() == item.get()
- returns *this
!*/
operator T& (
);
/*!
ensures
- if (is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to this future has finished.
- #is_ready() == true
- returns get()
!*/
operator const T& (
);
/*!
ensures
- if (is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to this future has finished.
- #is_ready() == true
- returns get()
!*/
T& get (
);
/*!
ensures
- if (is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to this future has finished.
- #is_ready() == true
- returns a non-const reference to the object of type T contained inside this future
!*/
const T& get (
);
/*!
ensures
- if (is_ready() == false) then
- the call to this function blocks until the thread processing the task related
to this future has finished.
- #is_ready() == true
- returns a const reference to the object of type T contained inside this future
!*/
};
// ----------------------------------------------------------------------------------------
template <typename T>
inline void swap (
future<T>& a,
future<T>& b
) { std::swap(a.get(), b.get()); }
/*!
provides a global swap function
!*/
// ----------------------------------------------------------------------------------------
// The future object comes with overloads for all the usual comparison operators.
template <typename T> bool operator== (const future<T>& a, const future<T>& b) { return a.get() == b.get(); }
template <typename T> bool operator!= (const future<T>& a, const future<T>& b) { return a.get() != b.get(); }
template <typename T> bool operator<= (const future<T>& a, const future<T>& b) { return a.get() <= b.get(); }
template <typename T> bool operator>= (const future<T>& a, const future<T>& b) { return a.get() >= b.get(); }
template <typename T> bool operator< (const future<T>& a, const future<T>& b) { return a.get() < b.get(); }
template <typename T> bool operator> (const future<T>& a, const future<T>& b) { return a.get() > b.get(); }
template <typename T> bool operator== (const future<T>& a, const T& b) { return a.get() == b; }
template <typename T> bool operator== (const T& a, const future<T>& b) { return a.get() == b; }
template <typename T> bool operator!= (const future<T>& a, const T& b) { return a.get() != b; }
template <typename T> bool operator!= (const T& a, const future<T>& b) { return a.get() != b; }
template <typename T> bool operator<= (const future<T>& a, const T& b) { return a.get() <= b; }
template <typename T> bool operator<= (const T& a, const future<T>& b) { return a.get() <= b; }
template <typename T> bool operator>= (const future<T>& a, const T& b) { return a.get() >= b; }
template <typename T> bool operator>= (const T& a, const future<T>& b) { return a.get() >= b; }
template <typename T> bool operator< (const future<T>& a, const T& b) { return a.get() < b; }
template <typename T> bool operator< (const T& a, const future<T>& b) { return a.get() < b; }
template <typename T> bool operator> (const future<T>& a, const T& b) { return a.get() > b; }
template <typename T> bool operator> (const T& a, const future<T>& b) { return a.get() > b; }
// ----------------------------------------------------------------------------------------
class thread_pool
......@@ -17,6 +200,12 @@ namespace dlib
WHAT THIS OBJECT REPRESENTS
This object represents a fixed size group of threads which you can
submit tasks to and then wait for those tasks to be completed.
EXCEPTIONS
Note that if an exception is thrown inside a task thread and
is not caught then the normal rule for uncaught exceptions in
threads applies. That is, the application will be terminated
and the text of the exception will be printed to standard error.
!*/
public:
......@@ -27,7 +216,7 @@ namespace dlib
requires
- num_threads > 0
ensures
- num_threads_in_pool() == num_threads
- #num_threads_in_pool() == num_threads
throws
- std::bad_alloc
- dlib::thread_error
......@@ -66,7 +255,7 @@ namespace dlib
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)()
is handed off to that thread which then calls (obj.*funct)()
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
......@@ -88,7 +277,7 @@ namespace dlib
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)(arg1)
is handed off to that thread which then calls (obj.*funct)(arg1)
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
......@@ -111,7 +300,7 @@ namespace dlib
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)(arg1,arg2)
is handed off to that thread which then calls (obj.*funct)(arg1,arg2)
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
......@@ -136,6 +325,189 @@ namespace dlib
finished.
!*/
// --------------------
template <typename T, typename T1, typename A1>
uint64 add_task (
T& obj,
void (T::*funct)(T1),
future<A1>& arg1
);
/*!
requires
- funct == a valid member function pointer for class T
- (obj.*funct)(arg1.get()) must be a valid expression.
(i.e. The A1 type stored in the future must be a type that can be passed into the given function)
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls (obj.*funct)(arg1.get()) within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.*funct)(arg1.get()).
- #arg1.is_ready() == false
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
template <typename T, typename T1, typename A1>
uint64 add_task (
const T& obj,
void (T::*funct)(T1) const,
future<A1>& arg1
);
/*!
requires
- funct == a valid member function pointer for class T
- (obj.*funct)(arg1.get()) must be a valid expression.
(i.e. The A1 type stored in the future must be a type that can be passed into the given function)
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls (obj.*funct)(arg1.get()) within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.*funct)(arg1.get()).
- #arg1.is_ready() == false
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
template <typename T1, typename A1>
uint64 add_task (
void (*funct)(T1),
future<A1>& arg1
);
/*!
requires
- funct == a valid function pointer
- (funct)(arg1.get()) must be a valid expression.
(i.e. The A1 type stored in the future must be a type that can be passed into the given function)
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls funct(arg1.get()) within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls funct(arg1.get()).
- #arg1.is_ready() == false
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
// --------------------------------------------------------------------------------
// The remainder of this class just contains overloads for add_task() that take up
// to 4 futures. Their behavior is identical to the above add_task() functions.
// --------------------------------------------------------------------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
);
template <typename T, typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2) const,
future<A1>& arg1,
future<A2>& arg2
);
template <typename T1, typename A1,
typename T2, typename A2>
uint64 add_task (
void (*funct)(T1,T2),
future<A1>& arg1,
future<A2>& arg2
);
// --------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
);
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
);
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3>
uint64 add_task (
void (*funct)(T1,T2,T3),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3
);
// --------------------
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
T& obj,
void (T::*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
);
template <typename T, typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
const T& obj,
void (T::*funct)(T1,T2,T3,T4) const,
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
);
template <typename T1, typename A1,
typename T2, typename A2,
typename T3, typename A3,
typename T4, typename A4>
uint64 add_task (
void (*funct)(T1,T2,T3,T4),
future<A1>& arg1,
future<A2>& arg2,
future<A3>& arg3,
future<A4>& arg4
);
// --------------------
private:
// restricted functions
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment