#ifndef _ThreadPool_H #define _ThreadPool_H // // Copyright (c) 2002 by Ted T. Yuan. // // Permission is granted to use this code without restriction as long as this copyright notice appears in all source files. // /* // usage: class DerivedRunnable { void operator()() { // do what you want to do // then delete self, which is on heap delete this; } }; example { RunPool pool(poolSize); ThreadPool thpool(pool, poolSize > 256 ? 256 : poolSize); boost::thread thrd(thpool); while(true) { // create an object of the derived class (DerivedRunnable) from Runnable that implements // virtual void operator()() // for real application logic DerivedRunnable drun = new DerivedRunnable(...); thpool.execute(drun); } thrd.join(); } */ #define HAS_PUTTABLETAKABLE #include #ifndef SPACE_YIN #define SPACE_YIN yin #endif namespace SPACE_YIN { // _Runnable is boost::function0 or its derivation, or any class that implements void operator()() #define RunnablePtr _Runnable * template < typename _Runnable > struct RunPool : SPACE_YIN::Pool { RunPool(size_t limit = (size_t)-1) : SPACE_YIN::Pool(limit) {} }; template < typename _Runnable > struct Runner; #define RunnerBase SPACE_YIN::Consumer > template < typename _Runnable > struct Runner : public RunnerBase { typedef SPACE_YIN::Takable > TakableRunPool; Runner(TakableRunPool& runners, SPACE_YIN::Latch& lh) : RunnerBase(runners, lh) {} Runner(RunPool<_Runnable>& runners, SPACE_YIN::Latch& lh) : RunnerBase(runners, lh) {} protected: void consume(RunnablePtr runner) { try { runner->operator()(); } catch (...) { Logger::log("ThreadPool::consume exception"); } } bool cancel() { return !this->channel_.channel_.size(); } void starting() { } void started() { } void done() { } }; #define ThreadPoolBase SPACE_YIN::Consuming, Runner<_Runnable> > template < typename _Runnable > struct ThreadPool : public ThreadPoolBase { bool bStoppableThreadPool; // in most server pool cases bStoppableThreadPool is default to false, meaning it runs forever... // however, in a finite pre-defined task queue case, one would set the flag to make sure proper consumer stop ThreadPool(RunPool<_Runnable>& channel, size_t nThreads = 1, bool bCanStop = false) : ThreadPoolBase (channel, nThreads, true, true), bStoppableThreadPool(bCanStop) {} void consumerModelCreated(Runner<_Runnable>& consumer) { consumer.mayStop(bStoppableThreadPool); //false); // see above comment } void execute(_Runnable*& runObj) { this->channel_.offer(runObj); } }; } #endif