Commit 10cccc04 authored by Davis King's avatar Davis King

Added the thread_pool object

--HG--
extra : convert_revision : svn%3Afdd8eb12-d10e-0410-9acb-85c331704f74/trunk%402633
parent f29f7e29
...@@ -41,6 +41,7 @@ ...@@ -41,6 +41,7 @@
#include "../threads/threads_kernel_1.cpp" #include "../threads/threads_kernel_1.cpp"
#include "../threads/threads_kernel_2.cpp" #include "../threads/threads_kernel_2.cpp"
#include "../threads/threads_kernel_shared.cpp" #include "../threads/threads_kernel_shared.cpp"
#include "../threads/thread_pool_extension.cpp"
#include "../timer/timer_kernel_2.cpp" #include "../timer/timer_kernel_2.cpp"
#include "../stack_trace.cpp" #include "../stack_trace.cpp"
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#include "threads/threaded_object_extension.h" #include "threads/threaded_object_extension.h"
#include "threads/thread_specific_data_extension.h" #include "threads/thread_specific_data_extension.h"
#include "threads/thread_function_extension.h" #include "threads/thread_function_extension.h"
#include "threads/thread_pool_extension.h"
#endif // DLIB_THREADs_ #endif // DLIB_THREADs_
// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net)
// License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_THREAD_POOl_CPP__
#define DLIB_THREAD_POOl_CPP__
#include "thread_pool_extension.h"
namespace dlib
{
// ----------------------------------------------------------------------------------------
thread_pool::
thread_pool (
unsigned long num_threads
) :
task_done_signaler(m),
task_ready_signaler(m),
we_are_destructing(false)
{
tasks.expand(num_threads);
for (unsigned long i = 0; i < num_threads; ++i)
{
register_thread(*this, &thread_pool::thread);
}
start();
}
// ----------------------------------------------------------------------------------------
thread_pool::
~thread_pool()
{
{auto_mutex M(m);
we_are_destructing = true;
task_ready_signaler.broadcast();
}
wait();
}
// ----------------------------------------------------------------------------------------
unsigned long thread_pool::
num_threads_in_pool (
) const
{
return tasks.size();
}
// ----------------------------------------------------------------------------------------
void thread_pool::
wait_for_task (
uint64 task_id
) const
{
auto_mutex M(m);
const unsigned long idx = task_id_to_index(task_id);
while (tasks[idx].task_id == task_id)
task_done_signaler.wait();
}
// ----------------------------------------------------------------------------------------
void thread_pool::
wait_for_all_tasks (
) const
{
const thread_id_type thread_id = get_thread_id();
auto_mutex M(m);
bool found_task = true;
while (found_task)
{
found_task = false;
for (unsigned long i = 0; i < tasks.size(); ++i)
{
// If task bucket i has a task that is currently supposed to be processed
// and it originated from the calling thread
if (tasks[i].is_empty() == false && tasks[i].thread_id == thread_id)
{
found_task = true;
break;
}
}
if (found_task)
task_done_signaler.wait();
}
}
// ----------------------------------------------------------------------------------------
bool thread_pool::
is_worker_thread (
const thread_id_type id
) const
{
for (unsigned long i = 0; i < worker_thread_ids.size(); ++i)
{
if (worker_thread_ids[i] == id)
return true;
}
return false;
}
// ----------------------------------------------------------------------------------------
void thread_pool::
thread (
)
{
{
// save the id of this worker thread into worker_thread_ids
auto_mutex M(m);
thread_id_type id = get_thread_id();
worker_thread_ids.push_back(id);
}
task_state_type task;
while (we_are_destructing == false)
{
long idx = 0;
// wait for a task to do
{ auto_mutex M(m);
while ( (idx = find_ready_task()) == -1 && we_are_destructing == false)
task_ready_signaler.wait();
if (we_are_destructing)
break;
tasks[idx].is_being_processed = true;
task = tasks[idx];
}
// now do the task
if (task.mfp0)
task.mfp0();
else if (task.mfp1)
task.mfp1(task.arg1);
else if (task.mfp2)
task.mfp2(task.arg1, task.arg2);
// Now let others know that we finished the task. We do this
// by clearing out the state of this task
{ auto_mutex M(m);
tasks[idx].is_being_processed = false;
tasks[idx].task_id = 0;
tasks[idx].mfp0.clear();
tasks[idx].mfp1.clear();
tasks[idx].mfp2.clear();
tasks[idx].arg1 = 0;
tasks[idx].arg2 = 0;
task_done_signaler.broadcast();
}
}
}
// ----------------------------------------------------------------------------------------
long thread_pool::
find_empty_task_slot (
) const
{
for (unsigned long i = 0; i < tasks.size(); ++i)
{
if (tasks[i].is_empty())
return i;
}
return -1;
}
// ----------------------------------------------------------------------------------------
long thread_pool::
find_ready_task (
) const
{
for (unsigned long i = 0; i < tasks.size(); ++i)
{
if (tasks[i].is_ready())
return i;
}
return -1;
}
// ----------------------------------------------------------------------------------------
uint64 thread_pool::
make_next_task_id (
long idx
)
{
uint64 id = tasks[idx].next_task_id * tasks.size() + idx;
tasks[idx].next_task_id += 1;
return id;
}
// ----------------------------------------------------------------------------------------
unsigned long thread_pool::
task_id_to_index (
uint64 id
) const
{
return id%tasks.size();
}
// ----------------------------------------------------------------------------------------
}
#endif // DLIB_THREAD_POOl_CPP__
// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net)
// License: Boost Software License See LICENSE.txt for the full license.
#ifndef DLIB_THREAD_POOl_H__
#define DLIB_THREAD_POOl_H__
#include "thread_pool_extension_abstract.h"
#include "dlib/member_function_pointer.h"
#include "threads_kernel.h"
#include "auto_mutex_extension.h"
#include "multithreaded_object_extension.h"
#include "../uintn.h"
#include "dlib/array.h"
namespace dlib
{
// ----------------------------------------------------------------------------------------
class thread_pool : private multithreaded_object
{
/*!
CONVENTION
- num_threads_in_pool() == tasks.size()
- if (the destructor has been called) then
- we_are_destructing == true
- else
- we_are_destructing == false
- m == the mutex used to protect everything in this object
- worker_thread_ids == an array that contains the thread ids for
all the threads in the thread pool
!*/
public:
explicit thread_pool (
unsigned long num_threads
);
~thread_pool(
);
void wait_for_task (
uint64 task_id
) const;
unsigned long num_threads_in_pool (
) const;
void wait_for_all_tasks (
) const;
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)()
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
m.unlock();
(obj.*funct)();
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp0.set(obj,funct);
task_ready_signaler.signal();
return tasks[idx].task_id;
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long),
long arg1
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
m.unlock();
(obj.*funct)(arg1);
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp1.set(obj,funct);
tasks[idx].arg1 = arg1;
task_ready_signaler.signal();
return tasks[idx].task_id;
}
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long,long),
long arg1,
long arg2
)
{
auto_mutex M(m);
const thread_id_type my_thread_id = get_thread_id();
// find a thread that isn't doing anything
long idx = find_empty_task_slot();
if (idx == -1 && is_worker_thread(my_thread_id))
{
// this function is being called from within a worker thread and there
// aren't any other worker threads free so just perform the task right
// here
m.unlock();
(obj.*funct)(arg1, arg2);
// return a task id that is both non-zero and also one
// that is never normally returned. This way calls
// to wait_for_task() will never block given this id.
return 1;
}
// wait until there is a thread that isn't doing anything
while (idx == -1)
{
task_done_signaler.wait();
idx = find_empty_task_slot();
}
tasks[idx].thread_id = my_thread_id;
tasks[idx].task_id = make_next_task_id(idx);
tasks[idx].mfp2.set(obj,funct);
tasks[idx].arg1 = arg1;
tasks[idx].arg2 = arg2;
task_ready_signaler.signal();
return tasks[idx].task_id;
}
private:
bool is_worker_thread (
const thread_id_type id
) const;
/*!
requires
- m is locked
ensures
- if (thread with given id is one of the thread pool's worker threads) then
- returns true
- else
- returns false
!*/
void thread (
);
/*!
this is the function that executes the threads in the thread pool
!*/
long find_empty_task_slot (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a empty task slot) then
- returns the index of that task slot in tasks
- there is a task slot
- else
- returns -1
!*/
long find_ready_task (
) const;
/*!
requires
- m is locked
ensures
- if (there is currently a task to do) then
- returns the index of that task in tasks
- else
- returns -1
!*/
uint64 make_next_task_id (
long idx
);
/*!
requires
- m is locked
- 0 <= idx < tasks.size()
ensures
- returns the next index to be used for tasks that are placed in
tasks[idx]
!*/
unsigned long task_id_to_index (
uint64 id
) const;
/*!
requires
- m is locked
ensures
- returns the index in tasks corresponding to the given id
!*/
struct task_state_type
{
task_state_type() : is_being_processed(false), task_id(0), next_task_id(2), arg1(0), arg2(0) {}
bool is_ready () const
/*!
ensures
- if (is_empty() == false && no thread is currently processing this task) then
- returns true
- else
- returns false
!*/
{
return !is_being_processed && !is_empty();
}
bool is_empty () const
/*!
ensures
- if (this task state is empty. i.e. it doesn't contain a task to be processed) then
- returns true
- else
- returns false
!*/
{
return task_id == 0;
}
bool is_being_processed; // true when a thread is working on this task
uint64 task_id; // the id of this task. 0 means this task is empty
thread_id_type thread_id; // the id of the thread that requested this task
uint64 next_task_id;
long arg1;
long arg2;
member_function_pointer<>::kernel_1a_c mfp0;
member_function_pointer<long>::kernel_1a_c mfp1;
member_function_pointer<long,long>::kernel_1a_c mfp2;
};
array<task_state_type>::expand_1c_c tasks;
array<thread_id_type>::expand_1c_c worker_thread_ids;
mutex m;
signaler task_done_signaler;
signaler task_ready_signaler;
bool we_are_destructing;
// restricted functions
thread_pool(thread_pool&); // copy constructor
thread_pool& operator=(thread_pool&); // assignment operator
};
}
// ----------------------------------------------------------------------------------------
#ifdef NO_MAKEFILE
#include "thread_pool_extension.cpp"
#endif
#endif // DLIB_THREAD_POOl_H__
// Copyright (C) 2008 Davis E. King (davisking@users.sourceforge.net)
// License: Boost Software License See LICENSE.txt for the full license.
#undef DLIB_THREAD_POOl_ABSTRACT_H__
#ifdef DLIB_THREAD_POOl_ABSTRACT_H__
#include "threads_kernel_abstract.h"
#include "../uintn.h"
namespace dlib
{
// ----------------------------------------------------------------------------------------
class thread_pool
{
/*!
WHAT THIS OBJECT REPRESENTS
This object represents a group of threads which you can
submit tasks to and then wait for those tasks to be completed.
!*/
public:
explicit thread_pool (
unsigned long num_threads
);
/*!
ensures
- num_threads_in_pool() == num_threads
throws
- std::bad_alloc
- dlib::thread_error
the constructor may throw this exception if there is a problem
gathering resources to create threading objects.
!*/
~thread_pool(
);
/*!
ensures
- all resources allocated by *this have been freed.
!*/
unsigned long num_threads_in_pool (
) const;
/*!
ensures
- returns the number of threads contained in this thread pool. That is, returns
the maximum number of tasks that this object will process concurrently.
!*/
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)()
);
/*!
requires
- funct == a valid member function pointer for class T
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls (obj.*funct)() within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)()
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long),
long arg1
);
/*!
requires
- funct == a valid member function pointer for class T
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls (obj.*funct)(arg1) within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)(arg1)
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
template <typename T>
uint64 add_task (
T& obj,
void (T::*funct)(long,long),
long arg1,
long arg2
);
/*!
requires
- funct == a valid member function pointer for class T
ensures
- if (the thread calling this function is actually one of the threads in the
thread pool and there aren't any free threads available) then
- calls (obj.*funct)(arg1,arg2) within the calling thread and returns
when it finishes
- else
- the call to this function blocks until there is a free thread in the pool
to process this new task. Once a free thread is available the task
is handed off to that thread which then calls (obj.funct)(arg1,arg2)
- returns a task id that can be used by this->wait_for_task() to wait
for the submitted task to finish.
!*/
void wait_for_task (
uint64 task_id
) const;
/*!
ensures
- if (there is currently a task with the given id being executed in the thread pool) then
- the call to this function blocks until the task with the given id is complete
- else
- the call to this function returns immediately
!*/
void wait_for_all_tasks (
) const;
/*!
ensures
- the call to this function blocks until all tasks which were submitted
to the thread pool by the thread that is calling this function have
finished.
!*/
private:
// restricted functions
thread_pool(thread_pool&); // copy constructor
thread_pool& operator=(thread_pool&); // assignment operator
};
}
// ----------------------------------------------------------------------------------------
#endif // DLIB_THREAD_POOl_ABSTRACT_H__
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment