botsense  3.2.0
RoadNarrows Client-Server Proxied Services Framework
bsUnitTest.py
Go to the documentation of this file.
1 #!/usr/bin/python
2 ###############################################################################
3 #
4 # File: bsUnitTest.py
5 #
6 
7 """
8 BotSense Unit Test Script
9 """
10 
11 ## \file
12 ##
13 ## $LastChangedDate: 2010-09-13 10:25:05 -0600 (Mon, 13 Sep 2010) $
14 ## $Rev: 581 $
15 ##
16 ## \brief BotSense Unit Tester
17 ##
18 ## \todo Re-write to fit into BotSense 3.0
19 ##
20 ## \sa
21 ## \htmlonly
22 ## <a href="../pydoc/bsUnitTest.html">PyDoc Generated Documentation</a>
23 ## \endhtmlonly
24 ##
25 ## \author Robin Knight (robin.knight@roadnarrows.com)
26 ##
27 ## \copyright
28 ## \h_copy 2007-2017. RoadNarrows LLC.\n
29 ## http://www.roadnarrows.com\n
30 ## All Rights Reserved
31 ##
32 
33 # Permission is hereby granted, without written agreement and without
34 # license or royalty fees, to use, copy, modify, and distribute this
35 # software and its documentation for any purpose, provided that
36 # (1) The above copyright notice and the following two paragraphs
37 # appear in all copies of the source code and (2) redistributions
38 # including binaries reproduces these notices in the supporting
39 # documentation. Substantial modifications to this software may be
40 # copyrighted by their authors and need not follow the licensing terms
41 # described here, provided that the new terms are clearly indicated in
42 # all files where they apply.
43 #
44 # IN NO EVENT SHALL THE AUTHOR, ROADNARROWS LLC, OR ANY MEMBERS/EMPLOYEES
45 # OF ROADNARROWS LLC OR DISTRIBUTORS OF THIS SOFTWARE BE LIABLE TO ANY
46 # PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL
47 # DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION,
48 # EVEN IF THE AUTHORS OR ANY OF THE ABOVE PARTIES HAVE BEEN ADVISED OF
49 # THE POSSIBILITY OF SUCH DAMAGE.
50 #
51 # THE AUTHOR AND ROADNARROWS LLC SPECIFICALLY DISCLAIM ANY WARRANTIES,
52 # INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
53 # FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN
54 # "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO
55 # PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
56 #
57 ###############################################################################
58 
59 
60 import sys
61 import time
62 
63 try:
64  import readline # optional module
65 except ImportError:
66  pass
67 
68 import BotSense.BotSenseTypes as bsTypes
69 
70 #--
def CmdBPFoot(args):
    """Send one BrainPack foot device command and print the decoded reply.

    args[0] selects the foot ('bpfoot_left' or 'bpfoot_right') and is also
    the key under which the foot's handle is looked up in BsClientHandles.
    args[1] is the foot command: 'getids', 'getraw' or 'getcooked'.

    Usage errors (too few arguments, unknown foot, foot without a handle,
    unknown foot command) are written to stderr.  A reply whose '_rc' is not
    'ok' is reported on stdout together with its '_error_msg'.  An empty
    reply from recvRsp() produces no output.  Always returns None.
    """
    cmdName = 'bpfoot'
    msgId = BsProxyMsgIdDevCmd

    if len(args) < 2:
        sys.stderr.write("error: %s missing argument(s)" % cmdName + "\n")
        return
    footName = args[0]
    footCmd = args[1]
    if footName not in ['bpfoot_left', 'bpfoot_right']:
        sys.stderr.write(
            "error: %s %s: whichfoot foot?" % (cmdName, args[0]) + "\n")
        return
    devHandle = BsClientHandles.get(footName)
    if devHandle is None:
        sys.stderr.write(
            "error: %s: proxied device not found" % footName + "\n")
        return

    # The three foot commands share one request/response exchange and differ
    # only in the device command id, the reply layout and the output format.
    # pressKey/pressHdr stay None for the one command that is not a list of
    # pressure readings.
    if footCmd == 'getids':
        devCmdId = BsProxyBPFootCmdIdGetIds
        rspFields = [{'fname':'devId', 'ftype':'u16'},
                     {'fname':'version', 'ftype':'u8'}]
        pressKey = None
        pressHdr = None
    elif footCmd == 'getraw':
        devCmdId = BsProxyBPFootCmdIdGetRaw
        rspFields = [{'fname':'press_raw', 'ftype':'u16', 'fcount':8}]
        pressKey = 'press_raw'
        pressHdr = "%s pressure_raw"
    elif footCmd == 'getcooked':
        devCmdId = BsProxyBPFootCmdIdGetCooked
        rspFields = [{'fname':'press_cal', 'ftype':'u16', 'fcount':8}]
        pressKey = 'press_cal'
        pressHdr = "%s pressure_cal"
    else:
        sys.stderr.write(
            "error: %s: unknown device command" % args[1] + "\n")
        return

    request = BsPacker.PackMsg(msgId,
                               [{'fval':devHandle, 'ftype':'u8'},
                                {'fval':devCmdId, 'ftype':'u8'}])
    sendCmd(BsClientSock, request)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId, rspFields)
    if vals['_rc'] != 'ok':
        print("%s %s" % ("error:", vals['_error_msg']))
        return

    if pressKey is None:
        print("%s %s device_id=0x%04x version=0x%02x" %
              (footName, cmdName, vals['devId'], vals['version']))
    else:
        # One output line: the header followed by the space-separated values.
        words = [pressHdr % (footName)]
        words.extend([str(press) for press in vals[pressKey]])
        print(" ".join(words))
142 
143 #--
def CmdDevOpen(args):
    """Open a proxied device on the server and remember its handle.

    args[0] is the device type name: 'i2c', 'bpfoot_left', 'bpfoot_right' or
    'bpimu'.  args[1] is the I2C address, converted with str2int().  args[2],
    when present, is the device name; it defaults to '/dev/null'.

    On an 'ok' reply the returned handle is printed and stored in
    BsClientHandles under the device type name.  Usage errors are written to
    stderr; a reply whose '_rc' is not 'ok' is reported on stdout.  Always
    returns None.
    """
    # Name shown in usage errors; matches the 'devopen' shell command.
    cmdName = 'devopen'
    if len(args) < 2:
        sys.stderr.write("error: %s: missing argument(s)\n" % cmdName)
        return

    devTypeName = args[0]
    if devTypeName == 'i2c':
        devType = BsProxyDevTypeI2C
    elif devTypeName in ('bpfoot_left', 'bpfoot_right'):
        # Both feet are the same server-side device type; only the client
        # keeps them apart, by the name used as the BsClientHandles key.
        devType = BsProxyDevTypeBPFoot
    elif devTypeName == 'bpimu':
        devType = BsProxyDevTypeBPImu
    else:
        # devTypeName is a string, so it must be formatted with %s.
        sys.stderr.write("error: %s: unknown type\n" % devTypeName)
        return

    i2cAddr = str2int(args[1])
    if i2cAddr is None:
        # NOTE(review): str2int is defined outside this view; presumably it
        # reports the bad number itself before returning None - confirm.
        return

    if len(args) > 2:
        devName = args[2]
    else:
        devName = '/dev/null'

    msgId = BsProxyMsgIdDevOpen
    cmd = BsPacker.PackMsg(msgId,
                           [{'fval':devType, 'ftype':'u8'},
                            {'fval':i2cAddr, 'ftype':'u8'},
                            {'fval':devName, 'ftype':'zstr'}])
    sendCmd(BsClientSock, cmd)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId,
                              [{'fname':'handle', 'ftype':'u8'}],
                              BsPacker.GetLastTid())
    if vals['_rc'] == 'ok':
        print("proxied device %s's handle is %d" %
              (devTypeName, vals['handle']))
        BsClientHandles[devTypeName] = vals['handle']
    else:
        print("error: %s" % (vals['_error_msg'],))
183 
184 #--
def CmdDevClose(args):
    """Close a proxied device on the server and forget its handle.

    args[0] is the device handle, converted with str2int().  On an 'ok'
    reply the first BsClientHandles entry holding that handle is removed.
    Usage errors are written to stderr; a reply whose '_rc' is not 'ok' is
    reported on stdout.  Always returns None.
    """
    # Name shown in usage errors; matches the 'devclose' shell command.
    cmdName = 'devclose'
    if len(args) < 1:
        sys.stderr.write("error: %s: missing argument(s)\n" % cmdName)
        return

    handle = str2int(args[0])
    if handle is None:
        # NOTE(review): str2int is defined outside this view; presumably it
        # reports the bad number itself before returning None - confirm.
        return

    msgId = BsProxyMsgIdDevClose
    cmd = BsPacker.PackMsg(msgId, [{'fval':handle, 'ftype':'u8'}])
    sendCmd(BsClientSock, cmd)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'handle', 'ftype':'u8'}])
    if vals['_rc'] != 'ok':
        print("error: %s" % (vals['_error_msg'],))
        return

    # Walk a snapshot of the keys so the dictionary is never modified while
    # it is being iterated.
    for devName in list(BsClientHandles):
        if BsClientHandles[devName] == handle:
            del BsClientHandles[devName]
            break
206 
207 #--
def CmdOpen(args):
    """Connect the client socket to a bsproxy server.

    args[0], when present, is the server IP address (default '127.0.0.1').
    args[1], when present, is the port, converted with str2int() (default
    9195).  On success the table of known device handles is cleared and a
    confirmation is printed; on failure the connect_ex() return code and its
    symbolic errno name are printed.  Always returns None.
    """
    # errno is not imported at file level, so it is imported here; without
    # it the failure report below would itself raise NameError.
    import errno

    if len(args) > 0:
        addr = args[0]
    else:
        addr = '127.0.0.1'

    if len(args) > 1:
        port = str2int(args[1])
        if port is None:
            # NOTE(review): str2int is defined outside this view; presumably
            # it reports the bad number itself before returning None.
            return
    else:
        port = 9195

    rc = BsClientSock.connect_ex((addr, port))
    if rc != 0:
        print("Failed to connect, rc=%d (%s)" %
              (rc, errno.errorcode.get(rc, 'UNKNOWN')))
        return

    # Handles obtained over an earlier connection are dropped on reconnect.
    BsClientHandles.clear()
    print("Connected to bsproxy @ %s:%d" % (addr, port))
227 
228 #--
def CmdClose():
    """Close the client socket and forget every proxied device handle."""
    # BsClientSock is only read here, never rebound, so no 'global'
    # declaration is required to reach the module-level socket.
    BsClientSock.close()
    BsClientHandles.clear()
233 
234 #--
def CmdLoopback(args):
    """Exchange loopback messages with the server and print each echo.

    args[0], when present, is the number of round trips, converted with
    str2int(); without it the loop runs until interrupted.  args[1], when
    present, is text appended to every message (default 'from ut.py').

    Each message has the form "loopback <n> <text>\\n" with n counting from
    0.  Round trips are spaced one second apart.  A reply whose '_rc' is not
    'ok' is reported on stdout.  Always returns None.
    """
    if len(args) > 0:
        limit = str2int(args[0])
        if limit is None:
            # An unparsable count must not silently become "loop forever".
            # NOTE(review): str2int is defined outside this view; presumably
            # it reports the bad number itself before returning None.
            return
    else:
        limit = None

    if len(args) > 1:
        text = args[1]
    else:
        text = 'from ut.py'

    msgId = BsProxyMsgIdLoopback
    cnt = 0
    # Exactly 'limit' round trips are made, as the shell help promises.
    while limit is None or cnt < limit:
        if cnt > 0:
            # Pause between round trips only, never after the last one.
            time.sleep(1)
        loopmsg = "loopback " + str(cnt) + " " + text + "\n"
        cmd = BsPacker.PackMsg(msgId, [{'fval':loopmsg, 'ftype':'zstr'}])
        sendCmd(BsClientSock, cmd)
        rsp = recvRsp(BsClientSock)
        if rsp:
            vals = BsPacker.UnpackMsg(rsp, msgId,
                                      [{'fname':'loopmsg', 'ftype':'zstr'}],
                                      BsPacker.GetLastTid())
            if vals['_rc'] == 'ok':
                # The message sent ends with a newline, so the echo is
                # written as is, with no separator added.
                sys.stdout.write(vals['loopmsg'])
            else:
                print("error: %s" % (vals['_error_msg'],))
        cnt += 1
264 
265 #--
def CmdVersion():
    """Ask the server for its version string and print it.

    A reply whose '_rc' is not 'ok' is reported on stdout with its
    '_error_msg'; an empty reply from recvRsp() produces no output.
    """
    cmdName = 'version'
    msgId = BsProxyMsgIdVersion

    request = BsPacker.PackMsg(msgId, [])
    sendCmd(BsClientSock, request)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId,
                              [{'fname':'version', 'ftype':'zstr'}])
    if vals['_rc'] != 'ok':
        print("%s %s" % ("error:", vals['_error_msg']))
        return
    print('BotSense Server Version: %s' % vals['version'])
278 
279 #--
def CmdInfo():
    """Ask the server for its proxy info and print it as hex bytes.

    The reply is unpacked with an empty field list; vals['blen'] bytes of
    the raw reply, starting at offset vals['_pos'], are then printed on one
    line as 0xNN values after a fixed heading.  A reply whose '_rc' is not
    'ok' is reported on stdout; an empty reply produces no output.
    """
    cmdName = 'info'
    msgId = BsProxyMsgIdProxyInfo

    request = BsPacker.PackMsg(msgId, [])
    sendCmd(BsClientSock, request)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId, [])
    if vals['_rc'] != 'ok':
        print("%s %s" % ("error:", vals['_error_msg']))
        return

    count = vals['blen']
    first = vals['_pos']
    # Heading and bytes go out as a single space-separated line.
    words = ['Server proxied devices:']
    for idx in range(first, first + count):
        words.append('0x%02x' % ord(rsp[idx]))
    print(" ".join(words))
299 
300 #--
def CmdLog(args):
    """Set the server logging level and print the level the server reports.

    args[0] is the level, converted with str2int().  A missing argument is
    reported on stderr.  A reply whose '_rc' is not 'ok' is reported on
    stdout; an empty reply from recvRsp() produces no output.
    """
    cmdName = 'log'
    msgId = BsProxyMsgIdLog

    if len(args) == 0:
        sys.stderr.write("error: %s: missing argument(s)" % cmdName + "\n")
        return
    newLevel = str2int(args[0])
    if newLevel is None:
        return

    request = BsPacker.PackMsg(msgId, [{'fval':newLevel, 'ftype':'u8'}])
    sendCmd(BsClientSock, request)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'level', 'ftype':'u8'}])
    if vals['_rc'] != 'ok':
        print("%s %s" % ("error:", vals['_error_msg']))
        return
    print('Server logging level: %d' % vals['level'])
319 
320 #--
def CmdI2CScan(args):
    """Scan the I2C bus behind a proxied device and print what was found.

    args[0] is the proxied device handle, converted with str2int().  On an
    'ok' reply the number of scanned devices is printed, followed by one
    line listing up to that many bytes of the reply as 0xNN values.  A
    missing argument is reported on stderr; a reply whose '_rc' is not 'ok'
    is reported on stdout.  Always returns None.
    """
    cmdName = 'i2cscan'
    if len(args) < 1:
        sys.stderr.write("error: %s: missing argument(s)\n" % cmdName)
        return

    handle = str2int(args[0])
    if handle is None:
        # NOTE(review): str2int is defined outside this view; presumably it
        # reports the bad number itself before returning None - confirm.
        return

    msgId = BsProxyMsgIdDevScan
    cmd = BsPacker.PackMsg(msgId, [{'fval':handle, 'ftype':'u8'}])
    sendCmd(BsClientSock, cmd)
    rsp = recvRsp(BsClientSock)
    if not rsp:
        return

    vals = BsPacker.UnpackMsg(rsp, msgId,
                              [{'fname':'nScanned', 'ftype':'u8'}])
    if vals['_rc'] != 'ok':
        print("error: %s" % (vals['_error_msg'],))
        return

    nScanned = vals['nScanned']
    print("Scanned %d I2C devices" % nScanned)

    # NOTE(review): the fixed offset 5 is carried over unchanged; presumably
    # it skips the message header and the count byte - confirm against the
    # packer (CmdInfo uses vals['_pos'] for the equivalent offset).
    addrStart = 5
    addrBytes = rsp[addrStart:addrStart + max(nScanned, 0)]
    if addrBytes:
        # End the address list with a newline so the next prompt starts on
        # a fresh line.
        print(" ".join(["0x%02x" % ord(byte) for byte in addrBytes]))
345 
346 
# Proxied device handles known to this client, keyed by device type name.
BsClientHandles = {}

# Text printed by the 'help' shell command.
_BsUnitTestHelp = """\
BotSense Server UnitTest Help
-----------------------------

*BrainPack Foot Commands
  Two BP Feet (left and right) are supported. Use command 'devopen' to add each
  foot to the bsproxy proxied devices.

  General Foot Command Format:
    bpfoot_left footcmd [byte...]
    bpfoot_right footcmd [byte...]
      footcmd - Foot-specific command.
      byte... - Foot-specific command bytes, if any.

  Specific Foot Commands:
    ... getids    - Get BrainPack foot identifiers.
    ... getraw    - Get foot raw sensor data.
    ... getcooked - Get foot cooked sensor data.


*BotSense Proxy Connection Commands
  Opens, closes, and tests bsproxy connection.

  Commands:
    open [addr [port]]   - Open connection on IP address and port.
                             addr - IP address. Default: 127.0.0.1
                             port - IP port. Default: 9195
    close                - Close connection.
    loopback [cnt [msg]] - Loopback cnt times.
                             cnt - Number of loops. Default is forever.
                             msg - Text appended to each loopback message.
    version              - Server version.
    info                 - List of server supported proxied device types.
    log level            - Set server log level.
    i2cscan handle       - Scan the I2C Bus for available I2C attached devices.
                             handle - allocated proxied device handle


*Proxied Device General Commands
  Opens, closes, and lists client's proxied devices.

  Commands:
    devopen type i2caddr [devname]
                         - Open proxied device.
                             type    - One of: i2c bpfoot_left bpfoot_right bpimu.
                             i2caddr - I2C address [0x01 - 0x7f].
                             devname - device name. Default: /dev/null
    devclose handle      - Close proxied device.
                             handle - allocated proxied device handle


*BotSense UnitTest Commands
  help - Print this help.
  quit - Quit UnitTest shell.
"""

while True:
    # The interactive shell is switched off while the script is rewritten
    # for BotSense 3.0: the notice below is printed and the process exits
    # with status 8 before any command is read.  Everything after the
    # sys.exit() call is unreachable until that guard is removed.
    print("""
The bsUnitTest python application is being rewritten to fit into BotSense 3.0.
Goodbye.
""")
    sys.exit(8)

    cmd = raw_input("bsclient$ ")
    args = cmd.split()
    if not args:
        pass
    elif args[0] == 'quit':
        break
    elif args[0] == 'open':
        CmdOpen(args[1:])
    elif args[0] == 'close':
        CmdClose()
    elif args[0] == 'loopback':
        CmdLoopback(args[1:])
    elif args[0] == 'version':
        CmdVersion()
    elif args[0] == 'info':
        CmdInfo()
    elif args[0] == 'log':
        CmdLog(args[1:])
    elif args[0] == 'i2cscan':
        CmdI2CScan(args[1:])
    elif args[0] == 'devopen':
        # CmdDevOpen records the new handle in BsClientHandles itself.
        CmdDevOpen(args[1:])
    elif args[0] == 'devclose':
        CmdDevClose(args[1:])
    elif args[0] == 'bpfoot_left' or args[0] == 'bpfoot_right':
        # The foot name is passed along: CmdBPFoot reads it from args[0].
        CmdBPFoot(args)
    elif args[0] == 'help':
        sys.stderr.write(_BsUnitTestHelp + "\n")
    else:
        sys.stderr.write("Error: %s ; unknown command\n" % (args[0],))
438 
BotSense Types.
Definition: BotSenseTypes.py:1