botsense  3.2.0
RoadNarrows Client-Server Proxied Services Framework
bsProxyThread.c
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////////////
2 //
3 // Package: BotSense
4 //
5 // Program: bsProxy
6 //
7 // File: bsProxyThread.c
8 //
9 /*! \file
10  *
11  * $LastChangedDate: 2010-08-20 11:36:38 -0600 (Fri, 20 Aug 2010) $
12  * $Rev: 568 $
13  *
14  * \brief \h_botsense bsProxy service threads.
15  *
16  * A Service thread provides a separate execution context to service client
17  * request-response exchanges. There are two types of service threads:
18  * \termblock
19  * \term Server Thread
20  * \termdata 1 thread to service server-terminated requests.
21  * \endterm
22  * \term Device Thread
23  * \termdata N device threads, one attached to each URI unique, opened device.
24  * \endterm
25  * \endtermblock
26  *
27  * \author Robin Knight (robin.knight@roadnarrows.com)
28  *
29  * \copyright
30  * \h_copy 2007-2017. RoadNarrows LLC.\n
31  * http://www.roadnarrows.com\n
32  * All Rights Reserved
33  */
34 // Permission is hereby granted, without written agreement and without
35 // license or royalty fees, to use, copy, modify, and distribute this
36 // software and its documentation for any purpose, provided that
37 // (1) The above copyright notice and the following two paragraphs
38 // appear in all copies of the source code and (2) redistributions
39 // including binaries reproduces these notices in the supporting
40 // documentation. Substantial modifications to this software may be
41 // copyrighted by their authors and need not follow the licensing terms
42 // described here, provided that the new terms are clearly indicated in
43 // all files where they apply.
44 //
45 // IN NO EVENT SHALL THE AUTHOR, ROADNARROWS LLC, OR ANY MEMBERS/EMPLOYEES
46 // OF ROADNARROW LLC OR DISTRIBUTORS OF THIS SOFTWARE BE LIABLE TO ANY
47 // PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL
48 // DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
49 // EVEN IF THE AUTHORS OR ANY OF THE ABOVE PARTIES HAVE BEEN ADVISED OF
50 // THE POSSIBILITY OF SUCH DAMAGE.
51 //
52 // THE AUTHOR AND ROADNARROWS LLC SPECIFICALLY DISCLAIM ANY WARRANTIES,
53 // INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
54 // FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN
55 // "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO
56 // PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
57 //
58 ////////////////////////////////////////////////////////////////////////////////
59 
60 #include <stdio.h>
61 #include <stdlib.h>
62 #include <string.h>
63 #include <errno.h>
64 #include <pthread.h>
65 
66 #include "rnr/rnrconfig.h"
67 #include "rnr/log.h"
68 #include "rnr/new.h"
69 #include "rnr/hash.h"
70 #include "rnr/dlistvoid.h"
71 #include "rnr/path.h"
72 
73 #include "botsense/BotSense.h"
74 #include "botsense/libBotSense.h"
75 #include "botsense/bsProxyMsgs.h"
76 
77 #include "bsProxy.h"
78 
79 
80 // ---------------------------------------------------------------------------
81 // Private Interface
82 // ---------------------------------------------------------------------------
83 
84 #define BSPROXY_TH_HASH_MIN 8 ///< minimum hash table size
85 #define BSPROXY_TH_HASH_MAX 256 ///< maximum hash table size
86 #define BSPROXY_TH_MAX_QUEUE_SIZE 8 ///< request queue maximum depth
87 #define BSPROXY_TH_SERVER_URI "#SERVER" ///< server "URI"
88 
89 
90 static pthread_mutex_t BsThGlobalMutex; ///< bsProxy module operations mutex
91 static hash_t *BsThHashTbl; ///< proxied device hash table
92 
/*!
 * \brief Log Proxy Server Thread Error.
 *
 * The error code is printed as a non-negative number regardless of the sign
 * passed in. Macro arguments are parenthesized so that compound expressions
 * (for example <em>-code</em> or <em>a+b</em>) expand correctly.
 *
 * \note \p ecode is evaluated more than once; do not pass an expression with
 * side effects.
 *
 * \param devuri  Device URI string.
 * \param ecode   \h_botsense error code.
 * \param efmt    Error output format string literal.
 * \param ...     Error variable arguments.
 */
#define BSPROXY_TH_LOG_ERROR(devuri, ecode, efmt, ...) \
  LOGERROR("%s: Service Thread \"%s\": %s(ecode=%d): " efmt, \
      ServerHasName(), (devuri), \
      bsStrError(ecode), ((ecode) >= 0? (ecode): -(ecode)), \
      ##__VA_ARGS__)
106 
107 
108 //.............................................................................
109 // Service Thread Control Block Functions
110 //.............................................................................
111 
112 /*!
113  * \brief Queue callback to compare two queue entries.
114  *
115  * \param pData1 Pointer to queue entry 1.
116  * \param pData2 Pointer to queue entry 2.
117  *
118  * \return Returns \h_lt 0, 0, or \h_gt 0 if data 1 is less than, equal to, or
119  * greater than data2, respectively.
120  */
121 static int ThQReqCmp(const void *pData1, const void *pData2)
122 {
123  BsProxyThReq_T *pThReq1 = (BsProxyThReq_T *)pData1;
124  BsProxyThReq_T *pThReq2 = (BsProxyThReq_T *)pData2;
125  BsMsgUid_T uid1 = BSPROXY_MAKE_MSGUID(pThReq1->m_hndVConn,
126  pThReq1->m_uMsgId);
127  BsMsgUid_T uid2 = BSPROXY_MAKE_MSGUID(pThReq2->m_hndVConn,
128  pThReq2->m_uMsgId);
129 
130  return (int)uid1 - (int)uid2;
131 }
132 
133 /*!
134  * \brief Queue callback to delete queue entry.
135  *
136  * \param pData Pointer to queue entry.
137  */
138 static void ThQReqDelete(void *pData)
139 {
140  ThReqDelete((BsProxyThReq_T *)pData);
141 }
142 
143 /*!
144  * \brief Allocate new service thread control block.
145  *
146  * \param sDevUri Device URI.
147  *
148  * \return Pointer to allocated thread control block.
149  */
150 static BsProxyThCtl_T *ThCtlBlkNew(const char *sDevUri)
151 {
152  BsProxyThCtl_T *pThCtl = NEW(BsProxyThCtl_T);
153 
154  pThCtl->m_sDevUri = new_strdup(sDevUri);
155  pThCtl->m_uRefCnt = 0;
156  pThCtl->m_eState = BsProxyThStateUninit;
157  pThCtl->m_queue = DListVoidNew(ThQReqCmp, ThQReqDelete);
158 
159  // create mutex
160  pthread_mutex_init(&pThCtl->m_mutexSync, NULL);
161 
162  // create condition
163  pthread_cond_init(&pThCtl->m_condSync, NULL);
164 
165  return pThCtl;
166 }
167 
168 /*!
169  * \brief Delete service thread control block.
170  *
171  * \param pThCtl Pointer to thread control block.
172  */
173 static void ThCtlBlkDelete(BsProxyThCtl_T *pThCtl)
174 {
175  if( pThCtl != NULL )
176  {
177  // delete uri
178  delete((char *)pThCtl->m_sDevUri);
179 
180  // delete request queue
181  DListVoidDelete(pThCtl->m_queue);
182 
183  // destroy condition
184  pthread_cond_destroy(&pThCtl->m_condSync);
185 
186  // destroy mutex
187  pthread_mutex_destroy(&pThCtl->m_mutexSync);
188 
189  // delete self
190  delete(pThCtl);
191  }
192 }
193 
194 
195 //.............................................................................
196 // Mutual Exclusion Functions
197 //.............................................................................
198 
199 /*!
200  * \brief Lock global mutual exclusion.
201  */
202 static inline void ThGlobalLock()
203 {
204  int rc;
205 
206  if( (rc = pthread_mutex_lock(&BsThGlobalMutex)) != 0 )
207  {
208  errno = rc;
209  LOGSYSERROR("Thread global mutex: pthread_mutex_lock()");
210  }
211 }
212 
213 /*!
214  * \brief Unlock global mutual exclusion.
215  */
216 static inline void ThGlobalUnlock()
217 {
218  int rc;
219 
220  if( (rc = pthread_mutex_unlock(&BsThGlobalMutex)) != 0 )
221  {
222  errno = rc;
223  LOGSYSERROR("Thread global mutex: pthread_mutex_unlock()");
224  }
225 }
226 
227 /*!
228  * \brief Try to lock global mutual exclusion.
229  *
230  * \return
231  * Returns true if lock is acquired. Otherwise returns false if mutex already
232  * locked.
233  */
234 static inline bool_t ThGlobalTryLock()
235 {
236  return pthread_mutex_trylock(&BsThGlobalMutex) == 0? true: false;
237 }
238 
239 /*!
240  * \brief Lock service thread's mutual exclusion.
241  *
242  * \param pThCtl Service thread control.
243  */
244 static inline void ThSyncLock(BsProxyThCtl_T *pThCtl)
245 {
246  int rc;
247 
248  if( (rc = pthread_mutex_lock(&pThCtl->m_mutexSync)) != 0 )
249  {
250  errno = rc;
251  LOGSYSERROR("Service Thread \"%s\": pthread_mutex_lock()",
252  pThCtl->m_sDevUri);
253  }
254 }
255 
256 /*!
257  * \brief Unlock service thread's mutual exclusion.
258  *
259  * \param pThCtl Service thread control.
260  */
261 static inline void ThSyncUnlock(BsProxyThCtl_T *pThCtl)
262 {
263  int rc;
264 
265  if( (rc = pthread_mutex_unlock(&pThCtl->m_mutexSync)) != 0 )
266  {
267  errno = rc;
268  LOGSYSERROR("Service Thread \"%s\": pthread_mutex_unlock()",
269  pThCtl->m_sDevUri);
270  }
271 }
272 
273 /*!
274  * \brief Signal service thread's condition.
275  *
276  * \param pThCtl Service thread control.
277  */
278 static inline void ThSyncSignal(BsProxyThCtl_T *pThCtl)
279 {
280  int rc;
281 
282  if( (rc = pthread_cond_signal(&pThCtl->m_condSync)) != 0 )
283  {
284  errno = rc;
285  LOGSYSERROR("Service Thread \"%s\": pthread_cond_signal()",
286  pThCtl->m_sDevUri);
287  }
288 }
289 
290 /*!
291  * \brief Wait on service thread's condition.
292  *
293  * \param pThCtl Service thread control.
294  */
295 static inline void ThSyncWait(BsProxyThCtl_T *pThCtl)
296 {
297  int rc;
298 
299  if( (rc = pthread_cond_wait(&pThCtl->m_condSync, &pThCtl->m_mutexSync)) != 0 )
300  {
301  errno = rc;
302  LOGSYSERROR("Service Thread \"%s\": pthread_cond_wait()",
303  pThCtl->m_sDevUri);
304  }
305 }
306 
307 
308 //.............................................................................
309 // Proxied Device Hashing Functions
310 //.............................................................................
311 
312 /*!
313  * \brief Delete hash node data callback.
314  *
315  * Both the key and value are dynamically allocated.
316  *
317  * \param sDevUri Proxied device hash table key.
318  * \param pThCtl Service thread control.
319  */
320 static void ThHashDeleteData(void *sDevUri, void *pThCtl)
321 {
322  delete(sDevUri);
323  ThCtlBlkDelete(pThCtl);
324 }
325 
326 /*!
327  * \brief Add the module data to the i/f module hash table.
328  *
329  * \param sDevUri Allocated Proxied device hash table key.
330  * \param pThCtl Allocated service thread control.
331  *
332  * \copydoc doc_return_std
333  */
334 static int ThHashAdd(const char *sDevUri, BsProxyThCtl_T *pThCtl)
335 {
336  char *sKey = new_strdup(sDevUri);
337  int rc;
338 
339  // insert into hash table, automatically allocating hnode_t
340  if( !hash_insert(BsThHashTbl, sKey, pThCtl) )
341  {
342  rc = -BS_ECODE_NO_RSRC;
343  BSPROXY_TH_LOG_ERROR(sDevUri, rc, "hash_insert(%s,...) failed", sKey);
344  ThHashDeleteData(sKey, pThCtl);
345  return rc;
346  }
347 
348  return BS_OK;
349 }
350 
351 /*!
352  * \brief Remove and delete the module data from the i/f module hash table.
353  *
354  * \param pThCtl Allocated service thread control.
355  *
356  * \copydoc doc_return_std
357  */
358 static void ThHashDelete(BsProxyThCtl_T *pThCtl)
359 {
360  hnode_t *pNode;
361 
362  if( (pThCtl != NULL) &&
363  ((pNode = hash_lookup(BsThHashTbl, pThCtl->m_sDevUri)) != NULL) )
364  {
365  hash_node_delete(BsThHashTbl, pNode);
366  }
367 }
368 
369 /*!
370  * \brief Get the interface module's i/f from the module hash table.
371  *
372  * \param sDevUri Proxied device hash table key.
373  *
374  * \return
375  * On success, returns pointer to the associated thread control block.
376  * On failure, returns NULL.
377  */
378 static BsProxyThCtl_T *ThHashGetCtl(const char *sDevUri)
379 {
380  hnode_t *pNode;
381 
382  if( (pNode = hash_lookup(BsThHashTbl, sDevUri)) == NULL )
383  {
384  return NULL;
385  }
386  else
387  {
388  return (BsProxyThCtl_T *)hnode_get(pNode);
389  }
390 }
391 
392 
393 //.............................................................................
394 // Service Threads
395 //.............................................................................
396 
397 /*!
398  * \brief Command service thread to run.
399  *
400  * \param pThCtl Service thread control block.
401  *
402  * \par Execution Context:
403  * Calling thread.
404  */
405 static void ThSyncRun(BsProxyThCtl_T *pThCtl)
406 {
407  ThSyncLock(pThCtl);
409  ThSyncSignal(pThCtl);
410  ThSyncUnlock(pThCtl);
411 }
412 
413 /*!
414  * \brief Command service thread to exit.
415  *
416  * \par Execution Context:
417  * Calling thread.
418  *
419  * \param pThCtl Service thread control block.
420  */
421 static void ThSyncExit(BsProxyThCtl_T *pThCtl)
422 {
423  ThSyncLock(pThCtl);
424  pThCtl->m_eState = BsProxyThStateExit;
425  ThSyncSignal(pThCtl);
426  ThSyncUnlock(pThCtl);
427 }
428 
429 /*!
430  * \brief Wait indefinitely until state change.
431  *
432  * \par Execution Context:
433  * Service thread.
434  *
435  * \param pThCtl Service thread control block.
436  */
437 static void ThSyncWaitForRun(BsProxyThCtl_T *pThCtl)
438 {
439  ThSyncLock(pThCtl);
440  while( pThCtl->m_eState == BsProxyThStateInit )
441  {
442  ThSyncWait(pThCtl);
443  }
444  ThSyncUnlock(pThCtl);
445 }
446 
447 /*!
448  * \brief Device service thread request handler.
449  *
450  * \par Execution Context:
451  * Device service thread.
452  *
453  * \param hndClient \h_botsense client handle.
454  * \param hndVConn Virtual connection handle.
455  * \param uTid Request-response transaction id.
456  * \param uMsgId Request message id.
457  * \param [in] bufReq Packed request message body buffer.
458  * \param uReqLen Length of packed request (number of bytes).
459  *
460  * \copydoc doc_return_std
461  */
462 static int ThDevRequest(BsProxyClientHnd_T hndClient,
463  BsVConnHnd_T hndVConn,
464  BsTid_T uTid,
465  BsMsgId_T uMsgId,
466  byte_t bufReq[],
467  size_t uReqLen)
468 {
469  BsProxyVConn_T *pVConn;
470  int rc;
471 
472  //
473  // Acquire virtual connection.
474  //
475  if( (pVConn = VConnAcquire(hndVConn)) != NULL )
476  {
477  //
478  // Call the interface module' specific request handler.
479  // The module is responsible for sending a response back regardless if the
480  // request is a success or a failure.
481  //
482  rc = pVConn->m_pModIF->m_fnModRequest(hndVConn, uTid, uMsgId, bufReq,
483  uReqLen);
484 
485  // release virtual connection
486  VConnRelease(hndVConn);
487 
488  // error
489  if( rc < 0 )
490  {
491  BSPROXY_LOG_ERROR(hndClient, rc, "Request failed.");
492  }
493  }
494 
495  else
496  {
497  BSPROXY_SEND_ERROR_RSP(hndClient, hndVConn, uTid, BS_ECODE_NO_VCONN,
498  "VConn=%d", hndVConn);
499  rc = -BS_ECODE_NO_VCONN;
500  }
501 
502  return rc;
503 }
504 
505 /*!
506  * \brief The service thread.
507  *
508  * \param pThArg Thread argument.
509  *
510  * \return Returns NULL on thread exit.
511  */
512 static void *ThServiceThread(void *pThArg)
513 {
514  BsProxyThCtl_T *pThCtl = (BsProxyThCtl_T *)pThArg;
515  BsProxyThReq_T *pThReq;
516  int rc;
517 
518  LOGDIAG1("Service Thread \"%s\" created.", pThCtl->m_sDevUri);
519 
520  ThSyncWaitForRun(pThCtl);
521 
522  //
523  // Wait for any queue message or thread state change.
524  //
525  while( (pThReq = ThDequeue(pThCtl)) )
526  {
527  // exit thread
528  if( pThCtl->m_eState == BsProxyThStateExit )
529  {
530  break;
531  }
532  // nothing on queue
533  else if( pThReq == NULL )
534  {
536  "No request on queue.");
537  }
538  // service request then destroy
539  else
540  {
541  rc = pThCtl->m_fnRequest(pThReq->m_hndClient,
542  pThReq->m_hndVConn,
543  pThReq->m_uTid,
544  pThReq->m_uMsgId,
545  pThReq->m_bufReq,
546  pThReq->m_uReqLen);
547  ThReqDelete(pThReq);
548  }
549  }
550 
551  LOGDIAG1("Service Thread \"%s\" destroyed.", pThCtl->m_sDevUri);
552 
553  return NULL;
554 }
555 
556 
557 // ---------------------------------------------------------------------------
558 // Public Interface
559 // ---------------------------------------------------------------------------
560 
561 //.............................................................................
562 // Service Thread Request Functions
563 //.............................................................................
564 
565 /*!
566  * \brief Allocate and initalized a new service thread request.
567  *
568  * \param hndClient Client handle.
569  * \param hndVConn Virtual connection handle.
570  * \param uTid Request-Response transaction id.
571  * \param uMsgId Request message id.
572  * \param bufReq Allocated packed request message body.
573  * \param uReqLen Length of request in buffer (number of bytes).
574  *
575  * \return Pointer to allocated request.
576  */
578  BsVConnHnd_T hndVConn,
579  BsTid_T uTid,
580  BsMsgId_T uMsgId,
581  byte_t bufReq[],
582  size_t uReqLen)
583 {
584  BsProxyThReq_T *pThReq = NEW(BsProxyThReq_T);
585 
586  pThReq->m_hndClient = hndClient;
587  pThReq->m_hndVConn = hndVConn;
588  pThReq->m_uTid = uTid;
589  pThReq->m_uMsgId = uMsgId;
590  pThReq->m_bufReq = bufReq;
591  pThReq->m_uReqLen = uReqLen;
592 
593  return pThReq;
594 }
595 
596 /*!
597  * \brief Delete service thread request.
598  *
599  * \param pThReq Service thread request.
600  */
602 {
603  if( pThReq != NULL )
604  {
605  delete(pThReq->m_bufReq);
606  delete(pThReq);
607  }
608 }
609 
610 /*!
611  * \brief Queue a request for the given service thread.
612  *
613  * \par Execution Context:
614  * Calling thread.
615  *
616  * \param pThCtl Service thread control block.
617  * \param hndClient Client handle.
618  * \param hndVConn Virtual connection handle.
619  * \param uTid Request-Response transaction id.
620  * \param uMsgId Request message id.
621  * \param bufReq Allocated packed request message body.
622  * \param uReqLen Length of request in buffer (number of bytes).
623  *
624  * \copydoc doc_return_std
625  */
627  BsProxyClientHnd_T hndClient,
628  BsVConnHnd_T hndVConn,
629  BsTid_T uTid,
630  BsMsgId_T uMsgId,
631  byte_t bufReq[],
632  size_t uReqLen)
633 {
634  BsProxyThReq_T *pThReq;
635  int rc;
636 
637  // lock thread and queue
638  ThSyncLock(pThCtl);
639 
640  //
641  // Queue request on thread's message queue.
642  //
643  if( DListVoidCount(pThCtl->m_queue) < BSPROXY_TH_MAX_QUEUE_SIZE )
644  {
645  pThReq = ThReqNew(hndClient, hndVConn, uTid, uMsgId, bufReq, uReqLen);
646  DListVoidQueue(pThCtl->m_queue, pThReq);
647  ThSyncSignal(pThCtl);
648  LOGDIAG3("%s: Service Thread \"%s\": queue depth=%u.",
649  ServerHasName(), pThCtl->m_sDevUri, DListVoidCount(pThCtl->m_queue));
650  rc = BS_OK;
651  }
652  else
653  {
654  rc = -BS_ECODE_NO_RSRC;
655  BSPROXY_TH_LOG_ERROR(pThCtl->m_sDevUri, rc, "Queue overflow.");
656  }
657 
658  // unlock thread and queue
659  ThSyncUnlock(pThCtl);
660 
661  return rc;
662 }
663 
664 /*!
665  * \brief Dequeue a request for the given service thread.
666  *
667  * The service thread will block if no queued request is present.
668  *
669  * \par Execution Context:
670  * Service thread.
671  *
672  * \return On success, returns removed request at the front of the queue.\n
673  * On failure, NULL is returned.
674  */
676 {
677  BsProxyThReq_T *pThReq;
678 
679  // lock thread and queue
680  ThSyncLock(pThCtl);
681 
682  //
683  // Block waiting for queued request or state change event.
684  //
685  while( (pThCtl->m_eState == BsProxyThStateRunning) &&
686  (DListVoidCount(pThCtl->m_queue) == 0) )
687  {
688  // Release mutex, block on conditional variable, auto-lock mutex on return.
689  ThSyncWait(pThCtl);
690  }
691 
692  if( pThCtl->m_eState == BsProxyThStateRunning )
693  {
694  pThReq = (BsProxyThReq_T *)DListVoidDequeue(pThCtl->m_queue);
695  }
696  else
697  {
698  pThReq = NULL;
699  }
700 
701  // unlock thread and queue
702  ThSyncUnlock(pThCtl);
703 
704  return pThReq;
705 }
706 
707 /*!
708  * \brief The service thread one-time global initialization.
709  */
711 {
712  // create global mutex
713  pthread_mutex_init(&BsThGlobalMutex, NULL);
714 
715  // create thread hash table keyed by device uri
716  BsThHashTbl = hash_table_create(
717  true, // dynamic table sizing
718  (hashcount_t)BSPROXY_TH_HASH_MIN, // minimum size
719  (hashcount_t)BSPROXY_TH_HASH_MAX, // maximum size
720  NULL, // use default comparator function
721  NULL, // use default hashing function
722  ThHashDeleteData); // hash node data deletion function
723 
724  // turn off most asserts()
725  hash_set_self_verify(false);
726 }
727 
/*!
 * \brief Build the quasi Uniform Resource Id for a device name.
 *
 * The URI is whatever NewSearchPathCanonicalized() produces for the name.
 *
 * \param sDevName  Device path name.
 *
 * \return
 * On success, an allocated, canonical device path name is returned.\n
 * On failure, NULL is returned.
 */
char *ThNewDevUri(const char *sDevName)
{
  char *sCanonical = NewSearchPathCanonicalized(sDevName);

  return sCanonical;
}
741 
742 /*!
743  * \brief Create a device service thread.
744  *
745  * The actual thread is only created if it does not already exists. The internal
746  * reference count is incremented to keep track of the users of this thread.
747  *
748  * \par Execution Context:
749  * Calling thread.
750  *
751  * \param sDevUri Device URI.
752  *
753  * \return On success, returns the service thread control block.
754  * On failure, NULL is returned.
755  */
756 BsProxyThCtl_T *ThCreateDevThread(const char *sDevUri)
757 {
758  BsProxyThCtl_T *pThCtl;
759  int rc;
760 
761  // lock global mutex
762  ThGlobalLock();
763 
764  //
765  // Device thread already exists.
766  //
767  if( (pThCtl = ThHashGetCtl(sDevUri)) != NULL )
768  {
769  pThCtl->m_uRefCnt++;
770  LOGDIAG1("Service Thread \"%s\" attached (refcnt=%u).",
771  pThCtl->m_sDevUri, pThCtl->m_uRefCnt);
772  }
773 
774  //
775  // Create new device thread.
776  //
777  else
778  {
779  //
780  // Create a new thread control block.
781  //
782  pThCtl = ThCtlBlkNew(sDevUri);
783 
784  pThCtl->m_fnRequest = ThDevRequest; // device request handler
785  pThCtl->m_uRefCnt = 1; // thread reference count
786  pThCtl->m_eState = BsProxyThStateInit; // control is (mostly) initialized
787 
788  //
789  // Add thread control to thread hash table
790  //
791  if( (rc = ThHashAdd(sDevUri, pThCtl)) < 0 )
792  {
793  LOGERROR("\"%s\": Failed to add thread control to hash table.", sDevUri);
794  ThCtlBlkDelete(pThCtl);
795  pThCtl = NULL;
796  }
797 
798  //
799  // Start the device service thread. The thread will block until the run
800  // state is set this context.
801  //
802  else if( pthread_create(&pThCtl->m_thread, NULL, ThServiceThread,
803  (void *)pThCtl) )
804  {
805  LOGSYSERROR("pthread_create(\"%s\")", sDevUri);
806  ThHashDelete(pThCtl);
807  pThCtl = NULL;
808  }
809 
810  //
811  // Signal the thread to run.
812  //
813  else
814  {
815  ThSyncRun(pThCtl);
816  }
817  }
818 
819  // unlock thread mutex
820  ThGlobalUnlock();
821 
822  return pThCtl;
823 }
824 
825 /*!
826  * \brief Create the special server service thread.
827  *
828  * There is only one server service thread per server. The server thread processes
829  * all server terminated requests.
830  *
831  * \par Execution Context:
832  * Calling thread.
833  *
834  * \return On success, returns the service thread control block.
835  * On failure, NULL is returned.
836  */
838 {
839  char *sDevUri;
840  BsProxyThCtl_T *pThCtl;
841  int rc;
842 
843  // Server URI
844  sDevUri = new_strdup(BSPROXY_TH_SERVER_URI);
845 
846  // lock global mutex
847  ThGlobalLock();
848 
849  //
850  // There should only be one server thread.
851  //
852  if( (pThCtl = ThHashGetCtl(sDevUri)) != NULL )
853  {
855  "Server service thread already exists.");
856  }
857 
858  //
859  // Create new device thread.
860  //
861  else
862  {
863  //
864  // Create a new thread control block.
865  //
866  pThCtl = ThCtlBlkNew(sDevUri);
867 
868  pThCtl->m_fnRequest = ServerRequest; // server request handler
869  pThCtl->m_uRefCnt = 1; // thread reference count
870  pThCtl->m_eState = BsProxyThStateInit; // control is (mostly) initialized
871 
872  //
873  // Add thread control to thread hash table
874  //
875  if( (rc = ThHashAdd(sDevUri, pThCtl)) < 0 )
876  {
877  LOGERROR("\"%s\": Failed to add thread control to hash table.", sDevUri);
878  ThCtlBlkDelete(pThCtl);
879  pThCtl = NULL;
880  }
881 
882  //
883  // Start the device service thread. The thread will block until the run
884  // state is set this context.
885  //
886  else if( pthread_create(&pThCtl->m_thread, NULL, ThServiceThread,
887  (void *)pThCtl) )
888  {
889  LOGSYSERROR("pthread_create(\"%s\")", sDevUri);
890  ThHashDelete(pThCtl);
891  pThCtl = NULL;
892  }
893 
894  //
895  // Signal the thread to run.
896  //
897  else
898  {
899  ThSyncRun(pThCtl);
900  }
901  }
902 
903  // unlock global mutex
904  ThGlobalUnlock();
905 
906  return pThCtl;
907 }
908 
909 /*!
910  * \brief Destroy service thread.
911  *
912  * The thread's reference count is decremented. If the count is zero, then the
913  * thread is actually destroyed with all queued requests deleted. The calling
914  * thread is suspended until the target service thread terminates.
915  *
916  * \par Execution Context:
917  * Calling thread.
918  *
919  * \param pThCtl Service thread control block.
920  */
922 {
923  if( pThCtl == NULL )
924  {
925  return;
926  }
927 
928  // lock global mutex
929  ThGlobalLock();
930 
931  // do not destroy the thread since there exists users of this thread
932  if( pThCtl->m_uRefCnt > 0 )
933  {
934  pThCtl->m_uRefCnt--;
935  }
936 
937  // thread is not being used anymore - destroy
938  if( pThCtl->m_uRefCnt == 0 )
939  {
940  // signal thread of exit state change
941  ThSyncExit(pThCtl);
942 
943  // wait for thread to terminate
944  pthread_join(pThCtl->m_thread, NULL);
945 
946  // delete the thread hash entry's control block (including its queue)
947  ThHashDelete(pThCtl);
948  }
949 
950  else
951  {
952  LOGDIAG1("Service Thread \"%s\" detached (refcnt=%u).",
953  pThCtl->m_sDevUri, pThCtl->m_uRefCnt);
954  }
955 
956  // unlock global mutex
957  ThGlobalUnlock();
958 }
uint_t BsMsgId_T
client message id type [0-64k].
Definition: BotSense.h:188
static void ThSyncUnlock(BsProxyThCtl_T *pThCtl)
Unlock service thread's mutual exclusion.
static void ThSyncExit(BsProxyThCtl_T *pThCtl)
Command service thread to exit.
static void ThGlobalLock()
Lock global mutual exclusion.
BotSense bsProxy IP server declarations.
static BsProxyThCtl_T * ThCtlBlkNew(const char *sDevUri)
Allocate new service thread control block.
BotSense client application - bsProxy server-terminated core messages.
uint_t m_uRefCnt
vconn reference count for this thread
Definition: bsProxy.h:151
static pthread_mutex_t BsThGlobalMutex
bsProxy module operations mutex
Definition: bsProxyThread.c:90
void VConnRelease(BsVConnHnd_T hndVConn)
Release the locked virtual client.
Definition: bsProxyVConn.c:585
static void ThCtlBlkDelete(BsProxyThCtl_T *pThCtl)
Delete service thread control block.
uint_t BsMsgUid_T
client message unique id
Definition: BotSense.h:245
byte_t * m_bufReq
packed request message buffer
Definition: bsProxy.h:239
uint_t BsTid_T
client transaction id type [0-255].
Definition: BotSense.h:172
thread is exiting
Definition: bsProxy.h:142
static void ThSyncWait(BsProxyThCtl_T *pThCtl)
Wait on service thread's condition.
static void ThSyncRun(BsProxyThCtl_T *pThCtl)
Command service thread to run.
#define BSPROXY_TH_MAX_QUEUE_SIZE
request queue maximum depth
Definition: bsProxyThread.c:86
static int ThQReqCmp(const void *pData1, const void *pData2)
Queue callback to compare two queue entries.
BsProxyThReq_T * ThDequeue(BsProxyThCtl_T *pThCtl)
Dequeue a request for the given service thread.
static void ThHashDeleteData(void *sDevUri, void *pThCtl)
Delete hash node data callback.
static bool_t ThGlobalTryLock()
Try to lock global mutual exclusion.
void ThOneTimeInit()
The service thread one-time global initialization.
#define BSPROXY_TH_LOG_ERROR(devuri, ecode, efmt,...)
Log Proxy Server Thread Error.
INLINE_IN_H const char * ServerHasName()
Get the BotSense server's official name.
Definition: bsProxy.h:485
#define BS_OK
not an error, success
Definition: BotSense.h:66
pthread_t m_thread
the service thread
Definition: bsProxy.h:156
char * ThNewDevUri(const char *sDevName)
Convert the device name to a quasi Uniform Resource Id.
static void ThSyncLock(BsProxyThCtl_T *pThCtl)
Lock service thread's mutual exclusion.
static int ThDevRequest(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Device service thread request handler.
int BsProxyClientHnd_T
bsProxy server client handle
Definition: bsProxy.h:114
size_t m_uReqLen
request buffer length
Definition: bsProxy.h:240
BsProxyThCtl_T * ThCreateServerThread()
Create the special server service thread.
#define BSPROXY_MAKE_MSGUID(hndVConn, msgid)
Make a MSGUID.
Definition: BotSense.h:218
static int ThHashAdd(const char *sDevUri, BsProxyThCtl_T *pThCtl)
Add the module data to the i/f module hash table.
BsProxyClientHnd_T m_hndClient
proxied client handle
Definition: bsProxy.h:235
#define BS_ECODE_NO_EXEC
cannot execute
Definition: BotSense.h:88
void ThReqDelete(BsProxyThReq_T *pThReq)
Delete service thread request.
pthread_mutex_t m_mutexSync
synchronization mutex used by condition
Definition: bsProxy.h:153
BotSense client library declarations.
pthread_cond_t m_condSync
synchronization condition
Definition: bsProxy.h:154
BsProxyVConn_T * VConnAcquire(BsVConnHnd_T hndVConn)
Acquire virtual connection, locking it from other threads.
Definition: bsProxyVConn.c:552
#define BSPROXY_TH_HASH_MAX
maximum hash table size
Definition: bsProxyThread.c:85
#define BSPROXY_SEND_ERROR_RSP(hndClient, hndVConn, uTid, ecode, efmt,...)
Log BotSense Error and Send Error Response.
Definition: bsProxy.h:330
static void ThQReqDelete(void *pData)
Queue callback to delete queue entry.
DListVoid_T * m_queue
thread request queue
Definition: bsProxy.h:155
BsProxyThReq_T * ThReqNew(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Allocate and initalized a new service thread request.
#define BS_ECODE_NO_RSRC
no resource available
Definition: BotSense.h:85
thread is not fully initialized
Definition: bsProxy.h:139
#define BSPROXY_LOG_ERROR(hndClient, ecode, efmt,...)
Log Proxy Server Error.
Definition: bsProxy.h:288
BsVConnHnd_T m_hndVConn
virtual connection handle
Definition: bsProxy.h:236
thread is running (nominal state)
Definition: bsProxy.h:141
thread is initialized
Definition: bsProxy.h:140
static hash_t * BsThHashTbl
proxied device hash table
Definition: bsProxyThread.c:91
static void * ThServiceThread(void *pThArg)
The service thread.
#define BSPROXY_TH_HASH_MIN
minimum hash table size
Definition: bsProxyThread.c:84
int(* m_fnRequest)(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Service thread request handler.
Definition: bsProxy.h:170
static void ThHashDelete(BsProxyThCtl_T *pThCtl)
Remove and delete the module data from the i/f module hash table.
void ThDestroyThread(BsProxyThCtl_T *pThCtl)
Destroy service thread.
static void ThGlobalUnlock()
Unlock global mutual exclusion.
#define BS_ECODE_INTERNAL
internal error (bug)
Definition: BotSense.h:93
static BsProxyThCtl_T * ThHashGetCtl(const char *sDevUri)
Get the interface module's i/f from the module hash table.
#define BS_ECODE_NO_VCONN
virtual connection not found
Definition: BotSense.h:80
BsProxyThState_T m_eState
thread state
Definition: bsProxy.h:152
int ThQueue(BsProxyThCtl_T *pThCtl, BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Queue a request for the given service thread.
BsProxyThCtl_T * ThCreateDevThread(const char *sDevUri)
Create a device service thread.
BsMsgId_T m_uMsgId
message id
Definition: bsProxy.h:238
#define BSPROXY_TH_SERVER_URI
server "URI"
Definition: bsProxyThread.c:87
static void ThSyncSignal(BsProxyThCtl_T *pThCtl)
Signal service thread's condition.
const char * m_sDevUri
proxied device URI or server
Definition: bsProxy.h:150
BsProxyModIF_T * m_pModIF
interface module I/F
Definition: bsProxy.h:226
static void ThSyncWaitForRun(BsProxyThCtl_T *pThCtl)
Wait indefinitely until state change.
BsTid_T m_uTid
request-response transaction id
Definition: bsProxy.h:237
BotSense package top-level, unifying header declarations.
int BsVConnHnd_T
virtual connection handle type
Definition: BotSense.h:151
int ServerRequest(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Server service thread request handler.
BsModRequestFunc_P m_fnModRequest
module request
Definition: bsProxy.h:129