66 #include "rnr/rnrconfig.h" 70 #include "rnr/dlistvoid.h" 84 #define BSPROXY_TH_HASH_MIN 8 85 #define BSPROXY_TH_HASH_MAX 256
86 #define BSPROXY_TH_MAX_QUEUE_SIZE 8
87 #define BSPROXY_TH_SERVER_URI "#SERVER"
101 #define BSPROXY_TH_LOG_ERROR(devuri, ecode, efmt, ...) \ 102 LOGERROR("%s: Service Thread \"%s\": %s(ecode=%d): " efmt, \ 103 ServerHasName(), devuri, \ 104 bsStrError(ecode), (ecode>=0? ecode: -ecode), \ 121 static int ThQReqCmp(
const void *pData1,
const void *pData2)
130 return (
int)uid1 - (int)uid2;
181 DListVoidDelete(pThCtl->
m_queue);
209 LOGSYSERROR(
"Thread global mutex: pthread_mutex_lock()");
223 LOGSYSERROR(
"Thread global mutex: pthread_mutex_unlock()");
248 if( (rc = pthread_mutex_lock(&pThCtl->
m_mutexSync)) != 0 )
251 LOGSYSERROR(
"Service Thread \"%s\": pthread_mutex_lock()",
265 if( (rc = pthread_mutex_unlock(&pThCtl->
m_mutexSync)) != 0 )
268 LOGSYSERROR(
"Service Thread \"%s\": pthread_mutex_unlock()",
282 if( (rc = pthread_cond_signal(&pThCtl->
m_condSync)) != 0 )
285 LOGSYSERROR(
"Service Thread \"%s\": pthread_cond_signal()",
302 LOGSYSERROR(
"Service Thread \"%s\": pthread_cond_wait()",
336 char *sKey = new_strdup(sDevUri);
362 if( (pThCtl != NULL) &&
382 if( (pNode = hash_lookup(
BsThHashTbl, sDevUri)) == NULL )
498 "VConn=%d", hndVConn);
518 LOGDIAG1(
"Service Thread \"%s\" created.", pThCtl->
m_sDevUri);
533 else if( pThReq == NULL )
536 "No request on queue.");
551 LOGDIAG1(
"Service Thread \"%s\" destroyed.", pThCtl->
m_sDevUri);
645 pThReq =
ThReqNew(hndClient, hndVConn, uTid, uMsgId, bufReq, uReqLen);
646 DListVoidQueue(pThCtl->
m_queue, pThReq);
648 LOGDIAG3(
"%s: Service Thread \"%s\": queue depth=%u.",
686 (DListVoidCount(pThCtl->
m_queue) == 0) )
725 hash_set_self_verify(
false);
739 return NewSearchPathCanonicalized(sDevName);
770 LOGDIAG1(
"Service Thread \"%s\" attached (refcnt=%u).",
791 if( (rc =
ThHashAdd(sDevUri, pThCtl)) < 0 )
793 LOGERROR(
"\"%s\": Failed to add thread control to hash table.", sDevUri);
805 LOGSYSERROR(
"pthread_create(\"%s\")", sDevUri);
855 "Server service thread already exists.");
875 if( (rc =
ThHashAdd(sDevUri, pThCtl)) < 0 )
877 LOGERROR(
"\"%s\": Failed to add thread control to hash table.", sDevUri);
889 LOGSYSERROR(
"pthread_create(\"%s\")", sDevUri);
944 pthread_join(pThCtl->
m_thread, NULL);
952 LOGDIAG1(
"Service Thread \"%s\" detached (refcnt=%u).",
uint_t BsMsgId_T
client message id type [0-64k].
static void ThSyncUnlock(BsProxyThCtl_T *pThCtl)
Unlock service thread's mutual exclusion.
static void ThSyncExit(BsProxyThCtl_T *pThCtl)
Command service thread to exit.
static void ThGlobalLock()
Lock global mutual exclusion.
<b><i>BotSense</i></b> bsProxy IP server declarations.
static BsProxyThCtl_T * ThCtlBlkNew(const char *sDevUri)
Allocate new service thread control block.
BotSense client application - bsProxy server-terminated core messages.
uint_t m_uRefCnt
vconn reference count for this thread
static pthread_mutex_t BsThGlobalMutex
bsProxy module operations mutex
void VConnRelease(BsVConnHnd_T hndVConn)
Release the locked virtual client.
static void ThCtlBlkDelete(BsProxyThCtl_T *pThCtl)
Delete service thread control block.
uint_t BsMsgUid_T
client message unique id
byte_t * m_bufReq
packed request message buffer
uint_t BsTid_T
client transaction id type [0-255].
static void ThSyncWait(BsProxyThCtl_T *pThCtl)
Wait on service thread's condition.
static void ThSyncRun(BsProxyThCtl_T *pThCtl)
Command service thread to run.
#define BSPROXY_TH_MAX_QUEUE_SIZE
request queue maximum depth
static int ThQReqCmp(const void *pData1, const void *pData2)
Queue callback to compare two queue entries.
BsProxyThReq_T * ThDequeue(BsProxyThCtl_T *pThCtl)
Dequeue a request for the given service thread.
static void ThHashDeleteData(void *sDevUri, void *pThCtl)
Delete hash node data callback.
static bool_t ThGlobalTryLock()
Try to lock global mutual exclusion.
void ThOneTimeInit()
The service thread one-time global initialization.
#define BSPROXY_TH_LOG_ERROR(devuri, ecode, efmt,...)
Log Proxy Server Thread Error.
INLINE_IN_H const char * ServerHasName()
Get the <b><i>BotSense</i></b> server's official name.
#define BS_OK
not an error, success
pthread_t m_thread
the service thread
char * ThNewDevUri(const char *sDevName)
Convert the device name to a quasi Uniform Resource Id.
static void ThSyncLock(BsProxyThCtl_T *pThCtl)
Lock service thread's mutual exclusion.
static int ThDevRequest(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Device service thread request handler.
int BsProxyClientHnd_T
bsProxy server client handle
size_t m_uReqLen
request buffer length
BsProxyThCtl_T * ThCreateServerThread()
Create the special server service thread.
#define BSPROXY_MAKE_MSGUID(hndVConn, msgid)
Make a MSGUID.
static int ThHashAdd(const char *sDevUri, BsProxyThCtl_T *pThCtl)
Add the module data to the i/f module hash table.
BsProxyClientHnd_T m_hndClient
proxied client handle
#define BS_ECODE_NO_EXEC
cannot execute
void ThReqDelete(BsProxyThReq_T *pThReq)
Delete service thread request.
pthread_mutex_t m_mutexSync
synchronization mutex used by condition
<b><i>BotSense</i></b> client library declarations.
pthread_cond_t m_condSync
synchronization condition
BsProxyVConn_T * VConnAcquire(BsVConnHnd_T hndVConn)
Acquire virtual connection, locking it from other threads.
#define BSPROXY_TH_HASH_MAX
maximum hash table size
#define BSPROXY_SEND_ERROR_RSP(hndClient, hndVConn, uTid, ecode, efmt,...)
Log <b><i>BotSense</i></b> Error and Send Error Response.
static void ThQReqDelete(void *pData)
Queue callback to delete queue entry.
DListVoid_T * m_queue
thread request queue
BsProxyThReq_T * ThReqNew(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Allocate and initalized a new service thread request.
#define BS_ECODE_NO_RSRC
no resource available
thread is not fully initialized
#define BSPROXY_LOG_ERROR(hndClient, ecode, efmt,...)
Log Proxy Server Error.
BsVConnHnd_T m_hndVConn
virtual connection handle
thread is running (nominal state)
static hash_t * BsThHashTbl
proxied device hash table
static void * ThServiceThread(void *pThArg)
The service thread.
#define BSPROXY_TH_HASH_MIN
minimum hash table size
int(* m_fnRequest)(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Service thread request handler.
static void ThHashDelete(BsProxyThCtl_T *pThCtl)
Remove and delete the module data from the i/f module hash table.
void ThDestroyThread(BsProxyThCtl_T *pThCtl)
Destroy service thread.
static void ThGlobalUnlock()
Unlock global mutual exclusion.
#define BS_ECODE_INTERNAL
internal error (bug)
static BsProxyThCtl_T * ThHashGetCtl(const char *sDevUri)
Get the interface module's i/f from the module hash table.
#define BS_ECODE_NO_VCONN
virtual connection not found
BsProxyThState_T m_eState
thread state
int ThQueue(BsProxyThCtl_T *pThCtl, BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Queue a request for the given service thread.
BsProxyThCtl_T * ThCreateDevThread(const char *sDevUri)
Create a device service thread.
BsMsgId_T m_uMsgId
message id
#define BSPROXY_TH_SERVER_URI
server "URI"
static void ThSyncSignal(BsProxyThCtl_T *pThCtl)
Signal service thread's condition.
const char * m_sDevUri
proxied device URI or server
BsProxyModIF_T * m_pModIF
interface module I/F
static void ThSyncWaitForRun(BsProxyThCtl_T *pThCtl)
Wait indefinitely until state change.
BsTid_T m_uTid
request-response transaction id
<b><i>BotSense</i></b> package top-level, unifying header declarations.
int BsVConnHnd_T
virtual connection handle type
int ServerRequest(BsProxyClientHnd_T hndClient, BsVConnHnd_T hndVConn, BsTid_T uTid, BsMsgId_T uMsgId, byte_t bufReq[], size_t uReqLen)
Server service thread request handler.
BsModRequestFunc_P m_fnModRequest
module request