mt4cpp
Scheduler.hpp
Go to the documentation of this file.
1 /**
2  * \file Scheduler.hpp
3  * \brief The active object scheduler module (Scheduler, CommandQueue and working thread main function)
4  */
5 
6 
7 #ifndef MT4CPP_SHEDULER_HPP
8 #define MT4CPP_SHEDULER_HPP
9 
#include <list>
#include <queue>

#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

#include "Command.hpp"
18 
19 namespace mt4cpp {
20 
21  /* The command queue used by scheduler. Queue methods are thread-safe.*/
22  class CommandQueue : boost::noncopyable {
23  public:
24  /** constructor */
25  CommandQueue() : quitFlag_(false), currentCommandId_(0L) {}
26  ~CommandQueue() {}
27 
28  /** read the command to execute. Suspend the thread if the queue is empty.
29  Returns not empty optional if the queue is destroyed. */
30  PCommand read() {
31  boost::mutex::scoped_lock lock(m_);
32  while(queue_.empty()) {
33  cond.wait(lock); //wait if the empty queue
34 
35  if(quitFlag_) //if resumed because the quitFlag is set
36  return PCommand(); //empty command means finalize queue and finish thread
37  }
38  //here non-empty queue
39  PCommand f = queue_.front();
40  queue_.pop();
41  return f;
42  }
43 
44  /** write the command into queue, returns the new command ID */
45  CommandID write(PCommand cmd) {
46  //unique_lock<boost::mutex> lock(m_);
47  //to samo ale krocej zapisane
48  boost::mutex::scoped_lock lock(m_);
49 
50  queue_.push(cmd);
51  CommandID command_id = ++currentCommandId_;
52  cmd->setId(command_id);
53  cmd->setState(CommandDesc::QUEUED);
54  cond.notify_all();
55  return command_id;
56  }
57 
58  /** accessor */
59  bool getQuitFlag() const {
60  boost::mutex::scoped_lock lock(m_);
61  return quitFlag_;
62  }
63  /** mutator */
64  void setQuitFlag() {
65  boost::mutex::scoped_lock lock(m_);
66  quitFlag_ = true;
67  cond.notify_all(); //notify all waiting thread that the quit flag is set
68  }
69 
70  long generateCommandId() {
71  boost::mutex::scoped_lock lock(m_);
72  return ++currentCommandId_;
73  }
74  private:
75  std::queue<PCommand> queue_;
76  mutable boost::mutex m_;
77  /** the variable to notify if the queue is not empty */
78  boost::condition_variable cond;
79 
80  bool quitFlag_;
81 
82  /** the ID for the next command */
83  CommandID currentCommandId_;
84  };
85 
86  /**
87  * Sheduler, the execution queue with and thread pool.
88  * it executes commands (synchronically or asynchronically)
89  */
90  class Scheduler : boost::noncopyable
91  {
92  public:
93  Scheduler(int); //the constructor
94  ~Scheduler(); //the destructor
95  /**
96  Executes the command in one of threads from threads pool. Do not break execution of the calling thread.
97  \return CommandId of command
98  */
99  CommandID executeAsynchronously(PCommand cmd);
100 
101  /**
102  Executes the command in one of threads from threads pool.
103  Command is inserted into command queue, it waits till free working thread is found, then it is executed.
104  The calling thread waits (on condition variable) till the command is finished.
105  \return CommandId of command.
106  */
107  CommandID executeSynchronously(PCommand cmd);
108 
109  /** wait for ending threads in pool */
110  void join();
111 
112  /** signal to finish all threads after current command and wait for ending threads in pool */
113  void finishAndJoin();
114  private:
115  int num_;
116  /** the activation queue. Contains every commands executed asynchronically */
117  CommandQueue queue_;
118 
119  typedef std::list<boost::thread*> Threads;
120  Threads threads_; //pool of threads
121  mutable boost::mutex m_; //access to threads_
122 
123  Scheduler(const Scheduler&); //noncopyable
124  Scheduler& operator=(const Scheduler&); //noncopyable
125  };
126 } //namespace mt4cpp
127 
128 //-------------------------------------------------------------------------------------------
129 //
130 // implementation
131 //
132 //-------------------------------------------------------------------------------------------
133 
134 namespace mt4cpp {
135 
136  namespace {
137 
138  /** the function which run in thread in scheduler thread pools.
139  The main loops check if there is any command in queue, if so it executes one of them.
140  When quitFlag is set in queue finish the execution
141  */
142  void threadFunction(CommandQueue& queue) {
143  while(!queue.getQuitFlag()) { //check if the thread should finish execution
144  PCommand cmd = queue.read(); //reads command from queue,
145  //it can block the thread until the queue is not empty or until the queue is destroyed
146 
147  if(!cmd) break; //command object is null - finish the thread
148  cmd->execute();
149  }
150  }
151  } //namespace
152 
153  //constructor
154  inline Scheduler::Scheduler(int num_threads) : num_(num_threads) {
155 
156  for(int i = 0; i < num_; ++i) { //starts NUM_THREADS threads
157  boost::lock_guard<boost::mutex> guard(m_);
158  boost::thread* new_thread_ptr = new boost::thread(boost::bind(threadFunction, boost::ref(queue_)));
159  //std::auto_ptr<boost::thread> new_thread());
160  threads_.push_back(new_thread_ptr);
161  //new_thread.release();
162  }
163 
164  }
165 
166  //descructor
167  inline Scheduler::~Scheduler() {
168  finishAndJoin(); //signal to finish all threads and wait (join) fo them
169  boost::lock_guard<boost::mutex> guard(m_);
170  for(Threads::iterator it=threads_.begin(), end=threads_.end(); it!=end; ++it) {
171  delete *it;
172  }
173  }
174 
175  /**
176  asynchronically executes the command. Do not break execution of the calling thread.
177  */
178  inline CommandID Scheduler::executeAsynchronously(PCommand cmd) {
179  return queue_.write(cmd);
180  }
181 
182  namespace {
183 
184  struct FinishObserver : public CommandObserver {
185 
186  FinishObserver() :done(false) {}
187 
188  virtual void notifyProgress(const Command&, double) {}
189  virtual void notifyStep(const Command&) {}
190  virtual void notifyState(const Command&, CommandDesc::State s) {
191  if(s == CommandDesc::DONE || s == CommandDesc::INTERRUPTED || s == CommandDesc::EXCEPTION) {
192  boost::mutex::scoped_lock lock(mut);
193  done = true;
194  cond.notify_one();
195  }
196  }
197  boost::mutex mut;
198  boost::condition_variable cond;
199  bool done;
200  };
201 
202  }
203 
204  /**
205  asynchronically executes the command. Command is inserted into command queue,
206  it waits till free working thread is found, then it is executed.
207  The calling thread waits (on condition variable) untill the command is not finished.
208  */
209  inline CommandID Scheduler::executeSynchronously(PCommand cmd) {
210  FinishObserver* obs = new FinishObserver();
211  cmd->attach( PCommandObserver( obs ) );
212 
213  CommandID command_id = executeAsynchronously(cmd);
214 
215  boost::mutex::scoped_lock lock(obs->mut);
216  while( ! obs->done ) {
217  obs->cond.wait( lock );
218  }
219  return command_id;
220  }
221 
222 
223 
224  /** wait for ending threads in pool */
225  inline void Scheduler::join() {
226  boost::lock_guard<boost::mutex> guard(m_);
227  for(Threads::iterator it=threads_.begin(),end=threads_.end(); it!=end; ++it) {
228  if( (*it)->joinable() )
229  (*it)->join();
230  }
231  }
232 
233  /** signal to finish all threads after current command, and wait for endint the threads in pool */
234  inline void Scheduler::finishAndJoin() {
235  queue_.setQuitFlag();
236  join();
237  }
238 
239 
240 } //namespace mt4cpp
241 
242 #endif //MT4CPP_SHEDULER_HPP
243 
The command module (Command base class, progress, command description).
State
available states of Command NONE - command created, but not put in activation queue QUEUED - command ...
Definition: Command.hpp:37
Definition: Scheduler.hpp:90
Definition: Scheduler.hpp:22
CommandID executeSynchronously(PCommand cmd)
Definition: Scheduler.hpp:209
bool getQuitFlag() const
Definition: Scheduler.hpp:59
CommandID write(PCommand cmd)
Definition: Scheduler.hpp:45
CommandQueue()
Definition: Scheduler.hpp:25
CommandID executeAsynchronously(PCommand cmd)
Definition: Scheduler.hpp:178
void join()
Definition: Scheduler.hpp:225
Definition: Command.hpp:21
PCommand read()
Definition: Scheduler.hpp:30
void finishAndJoin()
Definition: Scheduler.hpp:234
Definition: Command.hpp:51
void setQuitFlag()
Definition: Scheduler.hpp:64