Commit b6ae2a6d authored by Davis King's avatar Davis King

Improved the thread_pool example program.

parent cf8e3d6f
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "dlib/threads.h" #include "dlib/threads.h"
#include "dlib/misc_api.h" // for dlib::sleep #include "dlib/misc_api.h" // for dlib::sleep
#include "dlib/logger.h" #include "dlib/logger.h"
#include <vector>
using namespace dlib; using namespace dlib;
...@@ -33,65 +34,81 @@ thread_pool tp(3); ...@@ -33,65 +34,81 @@ thread_pool tp(3);
class test class test
{ {
/* /*
The thread_pool accepts "tasks" from the user and schedules them The thread_pool accepts "tasks" from the user and schedules them for
for execution in one of its threads when one becomes available. Each execution in one of its threads when one becomes available. Each task
task is just a request to call a member function on a particular object is just a request to call a function. So here we create a class called
(or if you use futures you may make tasks that call global functions). test with a few member functions which we will have the thread pool call
So here we create a class called test with a few member functions which as tasks.
we will have the thread pool call as tasks.
*/ */
public: public:
void task_0() void task()
{ {
dlog << LINFO << "task_0 start"; dlog << LINFO << "task start";
// Here we ask the thread pool to call this->subtask() three different times future<int> var;
// with different arguments. Note that calls to add_task() will return
// immediately if there is an available thread to hand the task off to. However, var = 1;
// if there isn't a thread ready then add_task blocks until there is such a thread.
// Also note that since task_0() is executed within the thread pool (see main() below) // Here we ask the thread pool to call this->subtask() and this->subtask2().
// Note that calls to add_task() will return immediately if there is an
// available thread to hand the task off to. However, if there isn't a
// thread ready then add_task() blocks until there is such a thread.
// Also note that since task() is executed within the thread pool (see main() below)
// calls to add_task() will execute the requested task within the calling thread // calls to add_task() will execute the requested task within the calling thread
// in cases where the thread pool is full. This means it is safe to have // in cases where the thread pool is full. This means it is always safe to
// tasks running in the thread pool spawn sub tasks which is what we are doing here. // spawn subtasks from within another task, which is what we are doing here.
tp.add_task(*this,&test::subtask,1); // schedule call to this->subtask(1) tp.add_task(*this,&test::subtask,var); // schedule call to this->subtask(var)
tp.add_task(*this,&test::subtask,2); // schedule call to this->subtask(2) tp.add_task(*this,&test::subtask2); // schedule call to this->subtask2()
tp.add_task(*this,&test::subtask,3); // schedule call to this->subtask(3)
// Since var is a future, this line will wait for the test::subtask task to
// wait_for_all_tasks() is a function that blocks until all tasks // finish and before allowing us to access the contents of var. var will
// submitted to the thread pool by the thread calling wait_for_all_tasks() // return the integer it contains. In this case result will be assigned
// finish. So this call blocks until the 3 tasks above are done. // the value of 2 since var was incremented by subtask().
int result = var;
// print out the result
dlog << LINFO << "var = " << result;
// Wait for all the tasks we have started to finish. Note that
// wait_for_all_tasks() only waits for tasks which were started
// by the calling thread. So you don't have to worry about other
// unrelated parts of your application interfering. In this case
// it just waits for subtask2() to finish.
tp.wait_for_all_tasks(); tp.wait_for_all_tasks();
dlog << LINFO << "task_0 end" ; dlog << LINFO << "task end" ;
} }
void subtask(long a) void subtask(int& a)
{ {
dlib::sleep(200); dlib::sleep(200);
dlog << LINFO << "subtask end " << a; a = a + 1;
dlog << LINFO << "subtask end ";
} }
void task_1(long a, long b) void subtask2()
{ {
dlog << LINFO << "task_1 start: " << a << ", " << b; dlib::sleep(300);
dlib::sleep(700); dlog << LINFO << "subtask2 end ";
dlog << LINFO << "task_1 end: " << a << ", " << b;
} }
}; };
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
void add ( class add_value
long a,
long b,
long& result
)
{ {
dlib::sleep(400); public:
result = a + b; add_value(int value):val(value) { }
}
void operator()( int& a )
{
a += val;
}
private:
int val;
};
// ---------------------------------------------------------------------------------------- // ----------------------------------------------------------------------------------------
...@@ -100,56 +117,58 @@ int main() ...@@ -100,56 +117,58 @@ int main()
// tell the logger to print out everything // tell the logger to print out everything
dlog.set_level(LALL); dlog.set_level(LALL);
test a;
dlog << LINFO << "schedule a few tasks"; dlog << LINFO << "schedule a few tasks";
// schedule a call to a.task_1(10,11) test mytask;
tp.add_task(a, &test::task_1, 10, 11); // Schedule the thread pool to call mytask.task(). Note that all forms of add_task()
// pass in the task object by reference. This means you must make sure, in this case,
// schedule the thread pool to call a.task_0(). // that mytask isn't destructed until after the task has finished executing.
uint64 id = tp.add_task(a, &test::task_0); tp.add_task(mytask, &test::task);
// schedule a call to a.task_1(12,13) // You can also pass task objects to a thread pool by value. So in this case we don't
tp.add_task(a, &test::task_1, 12, 13); // have to worry about keeping our own instance of the task. Here we construct a temporary
// add_value object and pass it right in and everything works like it should.
future<int> num = 3;
tp.add_task_by_value(add_value(7), num); // adds 7 to num
int result = num.get();
dlog << LINFO << "result = " << result; // prints result = 10
dlog << LINFO << "wait for a.task_0() to finish";
// now wait for our a.task_0() task to finish. To do this we use the id
// returned by add_task to reference the task we want to wait for.
tp.wait_for_task(id);
dlog << LINFO << "a.task_0() finished, now start another task_1() call";
// schedule a call to a.task_1(14,15)
tp.add_task(a, &test::task_1, 14, 15);
dlog << LINFO << "wait for all tasks to finish";
// here we wait for all tasks which were requested by the main thread
// to complete.
tp.wait_for_all_tasks();
dlog << LINFO << "all tasks finished";
// uncomment this line if your compiler supports the new C++0x lambda functions
//#define COMPILER_SUPPORTS_CPP0X_LAMBDA_FUNCTIONS
#ifdef COMPILER_SUPPORTS_CPP0X_LAMBDA_FUNCTIONS
// The thread pool also allows you to use futures to pass arbitrary objects into the tasks. // In the above examples we had to explicitly create task objects which is
// For example: // inconvenient. If you have a compiler which supports C++0x lambda functions
future<long> n1, n2, result; // then you can use the following simpler method.
n1 = 3;
n2 = 4;
// add a task that is supposed to go call add(n1, n2, result);
tp.add_task(add, n1, n2, result);
// This line will wait for the task in the thread pool to finish and when it does
// result will return the integer it contains. In this case r will be assigned a value of 7.
long r = result;
// print out the result
dlog << LINFO << "result = " << r;
// We can also use futures with member functions like so: // make a task which will just log a message
tp.add_task(a, &test::task_1, n1, n2); tp.add_task_by_value([](){
dlog << LINFO << "A message from a lambda function running in another thread.";
});
// and we can still wait for tasks like so: // Here we make 10 different tasks, each assigns a different value into
// the elements of the vector vect.
std::vector<int> vect(10);
for (unsigned long i = 0; i < vect.size(); ++i)
{
// Make a lambda function which takes vect by reference and i by value. So what
// will happen is each assignment statement will run in a thread in the thread_pool.
tp.add_task_by_value([&vect,i](){
vect[i] = i;
});
}
// Wait for all tasks which were requested by the main thread to complete.
tp.wait_for_all_tasks(); tp.wait_for_all_tasks();
dlog << LINFO << "all tasks using futures finished"; for (unsigned long i = 0; i < vect.size(); ++i)
{
dlog << LINFO << "vect["<<i<<"]: " << vect[i];
}
#endif
...@@ -157,26 +176,24 @@ int main() ...@@ -157,26 +176,24 @@ int main()
the time the log message occurred and the value in [] is the thread id for the thread the time the log message occurred and the value in [] is the thread id for the thread
that generated the log message): that generated the log message):
0 INFO [0] main: schedule a few tasks 1 INFO [0] main: schedule a few tasks
0 INFO [1] main: task_1 start: 10, 11 1 INFO [1] main: task start
0 INFO [2] main: task_0 start 1 INFO [0] main: result = 10
200 INFO [2] main: subtask end 2 201 INFO [2] main: subtask end
200 INFO [3] main: subtask end 1 201 INFO [1] main: var = 2
200 INFO [3] main: task_1 start: 12, 13 201 INFO [2] main: A message from a lambda function running in another thread.
201 INFO [0] main: wait for a.task_0() to finish 301 INFO [3] main: subtask2 end
400 INFO [2] main: subtask end 3 301 INFO [1] main: task end
400 INFO [2] main: task_0 end 301 INFO [0] main: vect[0]: 0
400 INFO [0] main: a.task_0() finished, now start another task_1() call 301 INFO [0] main: vect[1]: 1
401 INFO [2] main: task_1 start: 14, 15 301 INFO [0] main: vect[2]: 2
401 INFO [0] main: wait for all tasks to finish 301 INFO [0] main: vect[3]: 3
700 INFO [1] main: task_1 end: 10, 11 301 INFO [0] main: vect[4]: 4
901 INFO [3] main: task_1 end: 12, 13 301 INFO [0] main: vect[5]: 5
1101 INFO [2] main: task_1 end: 14, 15 301 INFO [0] main: vect[6]: 6
1101 INFO [0] main: all tasks finished 301 INFO [0] main: vect[7]: 7
1503 INFO [0] main: result = 7 301 INFO [0] main: vect[8]: 8
1503 INFO [3] main: task_1 start: 3, 4 301 INFO [0] main: vect[9]: 9
2203 INFO [3] main: task_1 end: 3, 4
2203 INFO [0] main: all tasks using futures finished
*/ */
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment