appkit  1.5.1
RoadNarrows Robotics Application Kit
Thread.cxx
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////////////
2 //
3 // Package: RoadNarrows Robotics Application Tool Kit
4 //
5 // Library: librnr_appkit
6 //
7 // File: Thread.cxx
8 //
9 /*! \file
10  *
11  * $LastChangedDate: 2015-11-09 17:38:34 -0700 (Mon, 09 Nov 2015) $
12  * $Rev: 4195 $
13  *
14  * \brief Thread base class implementation.
15  *
16  * \par Copyright
17  * \h_copy 2015-2017. RoadNarrows LLC.\n
18  * http://www.roadnarrows.com\n
19  * All Rights Reserved
20  */
21 /*
22  * @EulaBegin@
23  * @EulaEnd@
24  */
25 ////////////////////////////////////////////////////////////////////////////////
26 
27 #include <sys/time.h>
28 #include <time.h>
29 #include <limits.h>
30 #include <unistd.h>
31 #include <stdio.h>
32 #include <pthread.h>
33 #include <sched.h>
34 #include <math.h>
35 
36 #include "rnr/rnrconfig.h"
37 #include "rnr/log.h"
38 
39 #include "rnr/appkit/Time.h"
40 #include "rnr/appkit/Thread.h"
41 
42 using namespace std;
43 using namespace rnr;
44 using namespace rnr::chronos;
45 
46 
47 // ---------------------------------------------------------------------------
48 // Local
49 // ---------------------------------------------------------------------------
50 
/*!
 * \brief Print out timespec value to stderr.
 *
 * Debug helper. The value is written as "<what> = <sec>.<nsec>" followed by a
 * newline, with the nanoseconds field zero-padded to nine digits.
 *
 * \param what  Label identifying which timespec is being printed.
 * \param ts    Timespec value to print.
 */
static void prts(const std::string &what, const struct timespec &ts)
{
  // The underlying types of tv_sec/tv_nsec are platform dependent; cast so the
  // arguments always match the %ld conversions.
  fprintf(stderr, "%s = %ld.%09ld\n",
          what.c_str(),
          static_cast<long>(ts.tv_sec),
          static_cast<long>(ts.tv_nsec));
}
61 
62 
63 // ---------------------------------------------------------------------------
64 // Thread Class
65 // ---------------------------------------------------------------------------
66 
// Lowest run rate the thread accepts; setHz() raises any slower requested rate
// to this value.
67 const double Thread::ThreadMinHz = 0.001; ///< once every 1000 seconds
68 
// Constructor.
//
// Records the thread's identifying name and initializes the synchronization
// mutex and condition variable with default (NULL) attributes.
//
// strThreadName: name used to identify this thread in log messages.
//
// NOTE(review): original lines 72-73 are absent from this listing, so any
// member initialization performed there is not visible here.
69 Thread::Thread(const string &strThreadName) :
70  m_strThreadName(strThreadName)
71 {
74 
75  pthread_mutex_init(&m_mutexSync, NULL);
76  pthread_cond_init(&m_condSync, NULL);
77 }
78 
// Body of the Thread destructor (the listing index places ~Thread() at
// original line 79; that signature line and line 81 are absent here).
//
// Visible work: destroys the condition variable, then the mutex, that the
// constructor initialized.
80 {
82 
83  pthread_cond_destroy(&m_condSync);
84  pthread_mutex_destroy(&m_mutexSync);
85 }
86 
// Create the thread.
//
// Spawns a pthread that runs Thread::thread() with this object as its
// argument. When nPriority equals ThreadPriorityDft the thread is created with
// default attributes; otherwise the attributes prepared by setPriority() are
// used. After a successful create, the caller blocks on the condition variable
// until m_eState is no longer ThreadStateUninit.
//
// nPriority: requested thread priority, or ThreadPriorityDft.
// Returns OK on success, RC_ERROR if pthread_create() fails.
//
// NOTE(review): original line 113 (inside the failure branch) is absent from
// this listing.
// NOTE(review): attr is initialized inside setPriority(), but no
// pthread_attr_destroy() call is visible in this function.
// NOTE(review): pthread_create() reports failure through its return value, not
// errno - confirm LOGSYSERROR() logs the intended error.
87 int Thread::createThread(int nPriority)
88 {
89  pthread_attr_t attr;
90  int rc;
91 
92  //
93  // Create thread with default attributes.
94  //
95  if( nPriority == ThreadPriorityDft )
96  {
97  rc = pthread_create(&m_thread, NULL, Thread::thread, (void *)this);
98  }
99 
100  //
101  // Create thread with a real-time priority.
102  //
103  else
104  {
105  setPriority(nPriority, attr);
106  rc = pthread_create(&m_thread, &attr, Thread::thread, (void *)this);
107  }
108 
109  // failure to create
110  if( rc != 0 )
111  {
112  LOGSYSERROR("pthread_create()");
114  return RC_ERROR;
115  }
116 
117  //
118  // Wait for thread to initialize.
119  //
120  lock();
121 
122  while( m_eState == ThreadStateUninit )
123  {
124  pthread_cond_wait(&m_condSync, &m_mutexSync);
125  }
126 
127  unlock();
128 
129  return OK;
130 }
131 
132 void Thread::setPriority(int nPriority, pthread_attr_t &attr)
133 {
134  struct sched_param parm;
135  int prioMin, prioMax;
136  double f;
137 
138  // Initialize thread attributes.
139  pthread_attr_init(&attr);
140 
141  //
142  // Set thread scheduling policy.
143  // SCHED_FIFO:
144  // + preemptive priority scheduling
145  // + highest priority thread runs until it choses to block/sleep
146  // + when it unblock/wakes it goes to the end of the queue for its priority
147  //
148  pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
149 
150  //
151  // Get the system's minimum and maximum scheduling priorities for the given
152  // scheduling policy.
153  //
154  prioMin = sched_get_priority_min(SCHED_FIFO);
155  prioMax = sched_get_priority_max(SCHED_FIFO);
156 
157  //
158  // Map target priority between system min, max.
159  //
160  if( nPriority < ThreadPriorityMin )
161  {
162  nPriority = ThreadPriorityMin;
163  }
164  else if( nPriority > ThreadPriorityMax )
165  {
166  nPriority = ThreadPriorityMax;
167  }
168 
169  f = (double)nPriority/(double)(ThreadPriorityMax - ThreadPriorityMin + 1);
170 
171  nPriority = (int)(f * (double)(prioMax - prioMin + 1) + 0.5);
172 
173  if( nPriority < prioMin )
174  {
175  nPriority = prioMin;
176  }
177  else if( nPriority > prioMax )
178  {
179  nPriority = prioMax;
180  }
181 
182  //
183  // Set the priority for this thread.
184  //
185  parm.sched_priority = m_nPriority;
186  pthread_attr_setschedparam(&attr, &parm);
187 }
188 
// Run the thread.
//
// Applies the requested run rate through setHz(), then dispatches on the
// current thread state.
//
// fHz: requested run rate in Hertz.
// Returns OK from the Ready state; RC_ERROR (after logging) from the Exit
// state or any unrecognized state.
//
// NOTE(review): original line 198 (inside the ThreadStateReady case) is absent
// from this listing, so the Ready-state action is not fully visible here.
// NOTE(review): in the ThreadStateStart/ThreadStateRunning cases rc is never
// assigned, so an uninitialized value is returned on that path.
189 int Thread::runThread(double fHz)
190 {
191  int rc; // return code
192 
193  setHz(fHz);
194 
195  switch( m_eState )
196  {
197  case ThreadStateReady:
199  rc = OK;
200  break;
201 
202  case ThreadStateStart:
203  case ThreadStateRunning:
204  break;
205 
206  case ThreadStateExit:
207  default:
208  rc = RC_ERROR;
209  LOGERROR("%s thread in invalid state %d to run.",
210  m_strThreadName.c_str(), m_eState);
211  break;
212  }
213 
214  return rc;
215 }
216 
// Body of Thread::terminateThread() (the listing index places its definition
// at original line 217; that signature line and lines 221 and 223 are absent
// here, so the full condition and the first statement of the branch are not
// visible).
//
// Visible work: when the state test passes (Ready, Start, or whatever the
// missing clause adds), joins the pthread. Always returns OK.
218 {
219  if( (m_eState == ThreadStateReady) ||
220  (m_eState == ThreadStateStart) ||
222  {
224  pthread_join(m_thread, NULL);
225  }
226  return OK;
227 }
228 
// Calculate the thread's new run rate.
//
// Between lock() and unlock(): stores the requested rate (raised to
// ThreadMinHz if it is lower), derives the execution period in seconds as
// 1 / rate, and resets the slip error count.
//
// fHz: requested run rate in Hertz.
//
// NOTE(review): original line 242 is absent from this listing; the statement
// between the period calculation and the slip-count reset is not visible here.
229 void Thread::setHz(const double fHz)
230 {
231  lock();
232 
233  m_fHz = fHz;
234 
235  if( m_fHz < ThreadMinHz )
236  {
237  m_fHz = ThreadMinHz;
238  }
239 
240  m_fTExec = 1.0 / m_fHz;
241 
243 
244  m_nSlipErrCnt = 0;
245 
246  //fprintf(stderr, "hz=%.2lf, m_fTExec=%lf\n", m_fHz, m_fTExec);
247  //fprintf(stderr, "ExecPeriod=%lf\n", m_tExecPeriod.t());
248 
249  unlock();
250 }
251 
// Body of Thread::changeState(ThreadState eNewState) (the listing index places
// its definition at original line 252; the signature line is absent here).
//
// Stores the new state and signals the synchronization condition variable so
// that a waiter can re-check the state.
//
// NOTE(review): no lock()/unlock() appears in this body - presumably callers
// are expected to handle locking; confirm against the call sites.
253 {
254  m_eState = eNewState;
255  pthread_cond_signal(&m_condSync);
256 }
257 
258 void Thread::timedWait(const struct timespec &tsTimeout)
259 {
260  lock();
261 
262  pthread_cond_timedwait(&m_condSync, &m_mutexSync, &tsTimeout);
263 
264  unlock();
265 }
266 
// Body of Thread::readyBlock() (the listing index places its definition at
// original line 267; the signature line is absent here).
//
// Between lock() and unlock(), waits on the condition variable for as long as
// the state remains ThreadStateReady. The loop re-tests the state after every
// wake-up, so it returns only once the state has changed.
268 {
269  lock();
270 
271  while( m_eState == ThreadStateReady )
272  {
273  pthread_cond_wait(&m_condSync, &m_mutexSync);
274  }
275 
276  unlock();
277 }
278 
// Body of Thread::schedBlock() (the listing index places its definition at
// original line 279; the signature line is absent here).
//
// Compares the current time with the scheduled time m_tSched and handles three
// cases:
//  - now is before m_tSched: timed-wait until m_tSched, then schedule the next
//    cycle one execution period after the new current time;
//  - now equals m_tSched: schedule the next cycle one period from now;
//  - now is after m_tSched (slipped): if the slip is shorter than one period,
//    shorten the next interval by the slip; otherwise restart from now, count
//    the slip, and log a warning.
//
// m_nSlipErrCnt is decremented (not below 0) on every on-time cycle and
// incremented (capped at 1000) on every large slip. The warning is logged only
// while the count is below 5.
//
// NOTE(review): the in-body comment says "less than two execution cycles", but
// the test is tSlip < m_tExecPeriod, i.e. less than one period - confirm which
// is intended.
280 {
281  Time tNow; // now
282  Time tSlip; // any slippage
283 
284  // mark now
285  tNow.markNow();
286 
287  //
288  // Next scheduled execution cycle is in the future. Block wait for the next
289  // cycle to begin.
290  //
291  if( tNow < m_tSched )
292  {
293  timedWait(m_tSched.ts()); // block
294  tNow.markNow(); // new now
295  m_tSched = tNow + m_tExecPeriod; // next scheduled time
296 
297  // leak
298  if( m_nSlipErrCnt > 0 )
299  {
300  --m_nSlipErrCnt;
301  }
302  }
303 
304  //
305  // Scheduled execution cycle is now.
306  //
307  else if( tNow == m_tSched )
308  {
309  m_tSched = tNow + m_tExecPeriod; // next scheduled time
310 
311  if( m_nSlipErrCnt > 0 )
312  {
313  --m_nSlipErrCnt;
314  }
315  }
316 
317  //
318  // Scheduled execution cycle slipped.
319  //
320  else
321  {
322  tSlip = tNow - m_tSched;
323 
324  //
325  // Slipped by less than two execution cycles. Try to make up the time.
326  //
327  if( tSlip < m_tExecPeriod )
328  {
329  m_tSched = tNow + m_tExecPeriod - tSlip; // next scheduled time
330  }
331 
332  //
333  // Slipped a bunch.
334  //
335  else
336  {
337  m_tSched = tNow + m_tExecPeriod;
338 
339  if( m_nSlipErrCnt < 1000 )
340  {
341  ++m_nSlipErrCnt;
342  }
343 
344  // log moderated slippage
345  if( m_nSlipErrCnt < 5 )
346  {
347  LOGWARN("%s thread: "
348  "Execution slipped by %ld.%09ld seconds.",
349  m_strThreadName.c_str(), tSlip.ts().tv_sec, tSlip.ts().tv_nsec);
350  }
351  }
352  }
353 }
354 
// Body of Thread::transToReady() (the listing index places its definition at
// original line 355; the signature line is absent here).
// Intentionally empty: the base class does nothing on this transition.
// Thread::thread() calls it once before entering its state loop.
356 {
357 }
358 
// Body of Thread::transToRunning() (the listing index places its definition at
// original line 359; the signature line is absent here).
// Intentionally empty: the base class does nothing on this transition.
// Thread::thread() calls it while handling the ThreadStateStart state.
360 {
361 }
362 
// Body of Thread::exec() (the listing index places its definition at original
// line 363; the signature line is absent here).
//
// Base-class task: prints the thread name and a time delta, formatted as
// seconds.nanoseconds, to stdout and flushes the stream.
//
// NOTE(review): original line 367 is absent from this listing. As shown,
// tDelta is only default-constructed before being printed; presumably the
// missing line assigns it - confirm against the original source.
364 {
365  Time tDelta;
366 
368 
369  //fprintf(stderr, "delta=%lf, this=%lf, last=%lf\n",
370  // tDelta.t(), m_tExecThisTimeStamp.t(), m_tExecLastTimeStamp.t());
371 
372  printf("Thread %s: [%ld.%09ld] delta seconds.\n",
373  m_strThreadName.c_str(), tDelta.ts().tv_sec, tDelta.ts().tv_nsec);
374 
375  fflush(stdout);
376 }
377 
// Body of Thread::transToExit() (the listing index places its definition at
// original line 378; the signature line is absent here).
// Intentionally empty: the base class does nothing on this transition.
// Thread::thread() calls it once after its state loop ends.
379 {
380 }
381 
// The thread entry point handed to pthread_create().
//
// pArg is the owning Thread object. After enabling asynchronous cancellation
// and calling transToReady(), the function loops on the object's state until
// it becomes ThreadStateExit:
//  - Ready:   block in readyBlock() until the state changes;
//  - Start:   stamp the schedule and execution times with the current time so
//             the first task runs immediately, then call transToRunning();
//  - Running: block in schedBlock() until the next cycle, then, if still
//             running, stamp the time and call exec() between lock() and
//             unlock();
//  - any unexpected state: log an error and force ThreadStateExit.
// On leaving the loop it calls transToExit() and returns NULL.
//
// NOTE(review): original lines 392, 427 and 440 are absent from this listing.
// The state changes announced by the nearby comments (signalling the creator,
// moving from Start to Running) are therefore not visible here.
// NOTE(review): m_eState is read in the loop condition and switch without
// lock() - confirm this is intended.
382 void *Thread::thread(void *pArg)
383 {
384  Thread *pThis = (Thread *)pArg;
385  int oldstate;
386  int rc;
387 
388  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
389  pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldstate);
390 
391  // Change state and signal calling thread that created this thread.
393 
394  // derived thread specific Ready state transition function
395  pThis->transToReady();
396 
397  LOGDIAG3("%s thread created at priority %d - ready to run.",
398  pThis->m_strThreadName.c_str(), pThis->m_nPriority);
399 
400  //
401  // Loop forever until exit
402  //
403  while( (pThis->m_eState != ThreadStateExit) )
404  {
405  switch( pThis->m_eState )
406  {
407  //
408  // Blocked on Ready "queue"
409  //
410  case ThreadStateReady:
411  pThis->readyBlock();
412  break;
413 
414  //
415  // Start running thread to execute task.
416  //
417  case ThreadStateStart:
418  LOGDIAG3("%s thread started - running at %.3lf Hz.",
419  pThis->m_strThreadName.c_str(), pThis->m_fHz);
420 
421  // force immediately execution of first task
422  pThis->m_tSched.markNow();
423 
424  pThis->m_tExecThisTimeStamp = pThis->m_tSched;
425  pThis->m_tExecLastTimeStamp = pThis->m_tSched;
426 
428 
429  // derived thread specific Running state transition function
430  pThis->transToRunning();
431  break;
432 
433  //
434  // Run wait,execute subcycle
435  //
436  case ThreadStateRunning:
437  pThis->schedBlock();
438  if( pThis->m_eState == ThreadStateRunning )
439  {
441  pThis->m_tExecThisTimeStamp.markNow();
442 
443  pThis->lock();
444  pThis->exec();
445  pThis->unlock();
446  }
447  break;
448 
449  //
450  // Exit
451  //
452  case ThreadStateExit:
453  break;
454 
455  default:
456  LOGERROR("%d: Unexpected thread state.", pThis->m_eState);
457  pThis->m_eState = ThreadStateExit;
458  break;
459  }
460  }
461 
462  pThis->transToExit();
463 
464  LOGDIAG3("%s thread terminated.", pThis->m_strThreadName.c_str());
465 
466  return NULL;
467 }
int m_nPriority
thread OS scheduling priority
Definition: Thread.h:186
thread created and blocked, ready to start
Definition: Thread.h:79
std::string m_strThreadName
thread identifying name
Definition: Thread.h:179
static const double ThreadMinHz
minimum Hertz rate
Definition: Thread.h:71
double m_fHz
thread cycle run rate (Hertz)
Definition: Thread.h:187
void lock()
Lock the I2C bus.
Definition: Thread.h:203
virtual void transToExit()
Any to Exit state transition function.
Definition: Thread.cxx:378
static const int ThreadPriorityDft
default thread attributes
Definition: Thread.h:67
ThreadState
Kinematics thread states.
Definition: Thread.h:76
timespec ts()
Get this object's value in timespec format.
Definition: Time.h:244
Time functions and class interfaces.
double m_fTExec
task execution cycle period (sec)
Definition: Thread.h:188
Chronos - God of Time.
Definition: Time.h:63
Time class.
Definition: Time.h:192
virtual int runThread(const double fHz)
Run the thread.
Definition: Thread.cxx:189
virtual void readyBlock()
Block indefinitely while in the ready state.
Definition: Thread.cxx:267
chronos::Time m_tExecLastTimeStamp
start of last execution time stamp
Definition: Thread.h:191
void setPriority(int nPriority, pthread_attr_t &attr)
Set real-time priority attributes of the thread to be created.
Definition: Thread.cxx:132
chronos::Time m_tExecPeriod
task execution period (converted)
Definition: Thread.h:189
pthread_t m_thread
pthread identifier
Definition: Thread.h:183
virtual void setHz(const double fHz)
Calculate thread new full cycle run rate.
Definition: Thread.cxx:229
thread exiting/exited
Definition: Thread.h:82
Thread base class interface.
pthread_mutex_t m_mutexSync
synchonization mutex
Definition: Thread.h:181
virtual void transToReady()
Uninitialized to Ready state transition function.
Definition: Thread.cxx:355
virtual void exec()
Execute task(s) within scheduled [sub]cycle.
Definition: Thread.cxx:363
virtual int createThread(int nPriority)
Create the thread.
Definition: Thread.cxx:87
double markNow()
Mark the current time, identified by CLOCK_REALTIME, since the last Epoch.
Definition: Time.cxx:215
static void * thread(void *pArg)
The thread.
Definition: Thread.cxx:382
void unlock()
Unlock the I2C bus.
Definition: Thread.h:217
pthread_cond_t m_condSync
synchonization condition
Definition: Thread.h:182
int m_nSlipErrCnt
slipped error count leaky bucket
Definition: Thread.h:193
void timedWait(const struct timespec &tsTimeout)
Timed wait until state change or time out.
Definition: Thread.cxx:258
thread created but uninitialized
Definition: Thread.h:78
virtual void transToRunning()
Ready to Running state transition function.
Definition: Thread.cxx:359
static void prts(string what, const struct timespec &ts)
Print out timespec value to stderr.
Definition: Thread.cxx:57
chronos::Time m_tExecThisTimeStamp
start of this execution time stamp
Definition: Thread.h:192
virtual int terminateThread()
Terminate the thread.
Definition: Thread.cxx:217
static const int ThreadPriorityMax
maximum scheduling priority
Definition: Thread.h:69
static const int ThreadPriorityMin
minimum scheduling priority
Definition: Thread.h:68
chronos::Time m_tSched
working scheduler time
Definition: Thread.h:190
virtual ~Thread()
Destructor.
Definition: Thread.cxx:79
RoadNarrows Robotics.
Definition: Camera.h:74
virtual void schedBlock()
Block the thread until the next subcycle task is to be run.
Definition: Thread.cxx:279
void changeState(ThreadState eNewState)
Change the thread state.
Definition: Thread.cxx:252
ThreadState m_eState
thread state
Definition: Thread.h:180