Eclipse SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2004-2019 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials
5 // are made available under the terms of the Eclipse Public License v2.0
6 // which accompanies this distribution, and is available at
7 // http://www.eclipse.org/legal/epl-v20.html
8 // SPDX-License-Identifier: EPL-2.0
9 /****************************************************************************/
14 // A thread class together with a pool and a task for parallelized computation
15 /****************************************************************************/
16 
17 #ifndef FXWorkerThread_h
18 #define FXWorkerThread_h
19 
20 // #define WORKLOAD_PROFILING
21 // at which interval report maximum workload of the threads, needs WORKLOAD_PROFILING
22 // undefine to use summary report only
23 #define WORKLOAD_INTERVAL 100
24 
25 // ===========================================================================
26 // included modules
27 // ===========================================================================
28 #include <config.h>
29 
30 #include <list>
31 #include <vector>
32 #include <fx.h>
33 #ifdef WORKLOAD_PROFILING
34 #include <chrono>
36 #include <utils/common/ToString.h>
37 #endif
39 
40 
41 // ===========================================================================
42 // class definitions
43 // ===========================================================================
48 class FXWorkerThread : public FXThread {
49 
50 public:
55  class Task {
56  public:
58  virtual ~Task() {};
59 
68  virtual void run(FXWorkerThread* context) = 0;
69 
76  void setIndex(const int newIndex) {
77  myIndex = newIndex;
78  }
79  private:
81  int myIndex;
82  };
83 
88  class Pool {
89  public:
96  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myException(nullptr)
97 #ifdef WORKLOAD_PROFILING
98  , myNumBatches(0), myTotalMaxLoad(0.), myTotalSpread(0.)
99 #endif
100  {
101 #ifdef WORKLOAD_PROFILING
102  long long int timeDiff = 0;
103  for (int i = 0; i < 100; i++) {
104  const auto begin = std::chrono::high_resolution_clock::now();
105  const auto end = std::chrono::high_resolution_clock::now();
106  timeDiff += std::chrono::duration_cast<std::chrono::nanoseconds>(end - begin).count();
107  }
108  //std::cout << ("Average cost of a timing call (in ns): " + toString(timeDiff / 100.)) << std::endl;
109 #endif
110  while (numThreads > 0) {
111  new FXWorkerThread(*this);
112  numThreads--;
113  }
114  }
115 
120  virtual ~Pool() {
121  clear();
122  }
123 
126  void clear() {
127  for (FXWorkerThread* const worker : myWorkers) {
128  delete worker;
129  }
130  myWorkers.clear();
131  }
132 
137  void addWorker(FXWorkerThread* const w) {
138  myWorkers.push_back(w);
139  }
140 
147  void add(Task* const t, int index = -1) {
148  if (index < 0) {
149  index = myRunningIndex % myWorkers.size();
150  }
151 #ifdef WORKLOAD_PROFILING
152  if (myRunningIndex == 0) {
153  for (FXWorkerThread* const worker : myWorkers) {
154  worker->startProfile();
155  }
156  myProfileStart = std::chrono::high_resolution_clock::now();
157  }
158 #endif
159  t->setIndex(myRunningIndex++);
160  myWorkers[index]->add(t);
161  }
162 
169  void addFinished(std::list<Task*>& tasks) {
170  myMutex.lock();
171  myFinishedTasks.splice(myFinishedTasks.end(), tasks);
172  myCondition.signal();
173  myMutex.unlock();
174  }
175 
177  myMutex.lock();
178  if (myException == nullptr) {
179  myException = new ProcessError(e);
180  }
181  myMutex.unlock();
182  }
183 
185  void waitAll(const bool deleteFinished = true) {
186  myMutex.lock();
187  while ((int)myFinishedTasks.size() < myRunningIndex) {
188  myCondition.wait(myMutex);
189  }
190 #ifdef WORKLOAD_PROFILING
191  if (myRunningIndex > 0) {
192  const auto end = std::chrono::high_resolution_clock::now();
193  const long long int elapsed = std::chrono::duration_cast<std::chrono::microseconds>(end - myProfileStart).count();
194  double minLoad = std::numeric_limits<double>::max();
195  double maxLoad = 0.;
196  for (FXWorkerThread* const worker : myWorkers) {
197  const double load = worker->endProfile(elapsed);
198  minLoad = MIN2(minLoad, load);
199  maxLoad = MAX2(maxLoad, load);
200  }
201 #ifdef WORKLOAD_INTERVAL
202  myTotalMaxLoad += maxLoad;
203  myTotalSpread += maxLoad / minLoad;
204  myNumBatches++;
205  if (myNumBatches % WORKLOAD_INTERVAL == 0) {
206  WRITE_MESSAGE(toString(myFinishedTasks.size()) + " tasks, average maximum load: " + toString(myTotalMaxLoad / WORKLOAD_INTERVAL) + ", average spread: " + toString(myTotalSpread / WORKLOAD_INTERVAL));
207  myTotalMaxLoad = 0.;
208  myTotalSpread = 0.;
209  }
210 #endif
211  }
212 #endif
213  if (deleteFinished) {
214  for (Task* task : myFinishedTasks) {
215  delete task;
216  }
217  }
218  ProcessError* toRaise = myException;
219  myException = nullptr;
220  myFinishedTasks.clear();
221  myRunningIndex = 0;
222  myMutex.unlock();
223  if (toRaise != nullptr) {
224  throw* toRaise;
225  }
226  }
227 
235  bool isFull() const {
236  return myRunningIndex - (int)myFinishedTasks.size() >= size();
237  }
238 
243  int size() const {
244  return (int)myWorkers.size();
245  }
246 
248  void lock() {
249  myPoolMutex.lock();
250  }
251 
253  void unlock() {
254  myPoolMutex.unlock();
255  }
256 
257  const std::vector<FXWorkerThread*>& getWorkers() {
258  return myWorkers;
259  }
260  private:
262  std::vector<FXWorkerThread*> myWorkers;
264  FXMutex myMutex;
266  FXMutex myPoolMutex;
268  FXCondition myCondition;
270  std::list<Task*> myFinishedTasks;
275 #ifdef WORKLOAD_PROFILING
276  int myNumBatches;
279  double myTotalMaxLoad;
281  double myTotalSpread;
283  std::chrono::high_resolution_clock::time_point myProfileStart;
284 #endif
285  };
286 
287 public:
294  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false)
295 #ifdef WORKLOAD_PROFILING
296  , myCounter(0), myBusyTime(0), myTotalBusyTime(0), myTotalTime(0)
297 #endif
298  {
299  pool.addWorker(this);
300  start();
301  }
302 
307  virtual ~FXWorkerThread() {
308  stop();
309 #ifdef WORKLOAD_PROFILING
310  const double load = 100. * myTotalBusyTime / myTotalTime;
311  WRITE_MESSAGE("Thread " + toString((long long int)this) + " ran " + toString(myCounter) +
312  " tasks and had a load of " + toString(load) + "% (" + toString(myTotalBusyTime) +
313  "us / " + toString(myTotalTime) + "us), " + toString(myTotalBusyTime / (double)myCounter) + " per task.");
314 #endif
315  }
316 
321  void add(Task* t) {
322  myMutex.lock();
323  myTasks.push_back(t);
324  myCondition.signal();
325  myMutex.unlock();
326  }
327 
334  FXint run() {
335  while (!myStopped) {
336  myMutex.lock();
337  while (!myStopped && myTasks.empty()) {
338  myCondition.wait(myMutex);
339  }
340  if (myStopped) {
341  myMutex.unlock();
342  break;
343  }
344  myCurrentTasks.splice(myCurrentTasks.end(), myTasks);
345  myMutex.unlock();
346  try {
347  for (Task* const t : myCurrentTasks) {
348 #ifdef WORKLOAD_PROFILING
349  const auto before = std::chrono::high_resolution_clock::now();
350 #endif
351  t->run(this);
352 #ifdef WORKLOAD_PROFILING
353  const auto after = std::chrono::high_resolution_clock::now();
354  myBusyTime += std::chrono::duration_cast<std::chrono::microseconds>(after - before).count();
355  myCounter++;
356 #endif
357  }
358  } catch (ProcessError& e) {
359  myPool.setException(e);
360  }
362  }
363  return 0;
364  }
365 
370  void stop() {
371  myMutex.lock();
372  myStopped = true;
373  myCondition.signal();
374  myMutex.unlock();
375  join();
376  }
377 
378 #ifdef WORKLOAD_PROFILING
379  void startProfile() {
380  myBusyTime = 0;
381  }
382 
383  double endProfile(const long long int time) {
384  myTotalTime += time;
385  myTotalBusyTime += myBusyTime;
386  return time == 0 ? 100. : 100. * myBusyTime / time;
387  }
388 #endif
389 
390 private:
394  FXMutex myMutex;
396  FXCondition myCondition;
398  std::list<Task*> myTasks;
400  std::list<Task*> myCurrentTasks;
402  bool myStopped;
403 #ifdef WORKLOAD_PROFILING
404  int myCounter;
407  long long int myBusyTime;
409  long long int myTotalBusyTime;
411  long long int myTotalTime;
412 #endif
413 };
414 
415 
416 #endif
FXWorkerThread::myCurrentTasks
std::list< Task * > myCurrentTasks
the list of tasks which are currently calculated
Definition: FXWorkerThread.h:400
FXWorkerThread::Pool::myRunningIndex
int myRunningIndex
the running index for the next task
Definition: FXWorkerThread.h:272
ToString.h
MIN2
T MIN2(T a, T b)
Definition: StdDefs.h:73
FXWorkerThread::Pool::myMutex
FXMutex myMutex
the internal mutex for the task list
Definition: FXWorkerThread.h:264
FXWorkerThread::Pool::myFinishedTasks
std::list< Task * > myFinishedTasks
list of finished tasks
Definition: FXWorkerThread.h:270
MsgHandler.h
FXWorkerThread::myPool
Pool & myPool
the pool for this thread
Definition: FXWorkerThread.h:392
FXWorkerThread::Pool::~Pool
virtual ~Pool()
Destructor.
Definition: FXWorkerThread.h:120
FXWorkerThread::stop
void stop()
Stops the thread.
Definition: FXWorkerThread.h:370
FXWorkerThread::Pool::size
int size() const
Returns the number of threads in the pool.
Definition: FXWorkerThread.h:243
MAX2
T MAX2(T a, T b)
Definition: StdDefs.h:79
FXWorkerThread::Pool::waitAll
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
Definition: FXWorkerThread.h:185
FXWorkerThread::Pool::addFinished
void addFinished(std::list< Task * > &tasks)
Adds the given tasks to the list of finished tasks.
Definition: FXWorkerThread.h:169
FXWorkerThread::Pool::isFull
bool isFull() const
Checks whether there are currently more pending tasks than threads.
Definition: FXWorkerThread.h:235
FXWorkerThread::Pool::addWorker
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Definition: FXWorkerThread.h:137
FXWorkerThread::myStopped
bool myStopped
whether we are still running
Definition: FXWorkerThread.h:402
FXWorkerThread::run
FXint run()
Main execution method of this thread.
Definition: FXWorkerThread.h:334
FXWorkerThread::Pool::unlock
void unlock()
unlocks the pool mutex
Definition: FXWorkerThread.h:253
ProcessError
Definition: UtilExceptions.h:39
FXWorkerThread::myCondition
FXCondition myCondition
the semaphore when waiting for new tasks
Definition: FXWorkerThread.h:396
FXWorkerThread::Pool::getWorkers
const std::vector< FXWorkerThread * > & getWorkers()
Definition: FXWorkerThread.h:257
UtilExceptions.h
FXWorkerThread::Pool::lock
void lock()
locks the pool mutex
Definition: FXWorkerThread.h:248
FXWorkerThread::Pool::Pool
Pool(int numThreads=0)
Constructor.
Definition: FXWorkerThread.h:96
FXWorkerThread::~FXWorkerThread
virtual ~FXWorkerThread()
Destructor.
Definition: FXWorkerThread.h:307
FXWorkerThread::Task::run
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
FXWorkerThread::Task::myIndex
int myIndex
the index of the task, valid only after the task has been added to the pool
Definition: FXWorkerThread.h:81
FXWorkerThread::FXWorkerThread
FXWorkerThread(Pool &pool)
Constructor.
Definition: FXWorkerThread.h:294
toString
std::string toString(const T &t, std::streamsize accuracy=gPrecision)
Definition: ToString.h:47
FXWorkerThread::Task::~Task
virtual ~Task()
Desctructor.
Definition: FXWorkerThread.h:58
FXWorkerThread::myMutex
FXMutex myMutex
the mutex for the task list
Definition: FXWorkerThread.h:394
FXWorkerThread::Pool
A pool of worker threads which distributes the tasks and collects the results.
Definition: FXWorkerThread.h:88
FXWorkerThread::Pool::setException
void setException(ProcessError &e)
Definition: FXWorkerThread.h:176
WORKLOAD_INTERVAL
#define WORKLOAD_INTERVAL
Definition: FXWorkerThread.h:23
FXWorkerThread::Pool::add
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index....
Definition: FXWorkerThread.h:147
FXWorkerThread::Pool::myCondition
FXCondition myCondition
the semaphore to wait on for finishing all tasks
Definition: FXWorkerThread.h:268
FXWorkerThread::add
void add(Task *t)
Adds the given task to this thread to be calculated.
Definition: FXWorkerThread.h:321
config.h
FXWorkerThread::Pool::clear
void clear()
Stops and deletes all worker threads.
Definition: FXWorkerThread.h:126
FXWorkerThread::Pool::myWorkers
std::vector< FXWorkerThread * > myWorkers
the current worker threads
Definition: FXWorkerThread.h:262
FXWorkerThread::Task
Abstract superclass of a task to be run with an index to keep track of pending tasks.
Definition: FXWorkerThread.h:55
FXWorkerThread::Pool::myException
ProcessError * myException
the exception from a child thread
Definition: FXWorkerThread.h:274
FXWorkerThread::Pool::myPoolMutex
FXMutex myPoolMutex
the pool mutex for external sync
Definition: FXWorkerThread.h:266
FXWorkerThread::myTasks
std::list< Task * > myTasks
the list of pending tasks
Definition: FXWorkerThread.h:398
FXWorkerThread::Task::setIndex
void setIndex(const int newIndex)
Sets the running index of this task.
Definition: FXWorkerThread.h:76
WRITE_MESSAGE
#define WRITE_MESSAGE(msg)
Definition: MsgHandler.h:277
FXWorkerThread
A thread repeatingly calculating incoming tasks.
Definition: FXWorkerThread.h:48