This is a Gitlab for NZOSS Members
Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Eliot Blennerhassett
Busmon
Commits
cc088886
Commit
cc088886
authored
Jun 25, 2020
by
Eliot Blennerhassett
Browse files
inital
parents
Changes
24
Expand all
Hide whitespace changes
Inline
Side-by-side
bm_analyse_rs485.py
0 → 100755
View file @
cc088886
#!/usr/bin/env python
"""
Receive packets published by bm_mon_serial.py and analyse content and timing
"""
from
__future__
import
print_function
from
collections
import
deque
,
defaultdict
import
logging
import
os
from
time
import
sleep
,
time
import
bm_utility
as
cmu
import
bm_dynamixel
as
dx
import
bm_mqtt
"""
Specs:
3.1.4.9.1 write goal position, 200Hz per servo
"""
def
check_unknown_cmd
(
packet
):
logging
.
error
(
"Unknown command in %s"
,
str
(
packet
))
return
False
def
check_read_data
(
packet
):
payload
=
packet
.
payload
if
len
(
payload
)
<
6
:
# no room for any parameters
logging
.
error
(
"Read data packet too short %s"
%
str
(
payload
))
return
False
address
=
payload
[
3
]
readlen
=
payload
[
4
]
if
address
==
0x24
or
address
==
0x44
:
# read pos or read load current
if
readlen
==
2
:
return
True
else
:
logging
.
error
(
"Read pos or load: incorrect length %s"
%
str
(
payload
))
return
False
elif
address
==
0x2B
:
if
readlen
==
1
:
return
True
else
:
logging
.
error
(
"Read temperature: incorrect length %s"
%
str
(
payload
))
return
False
else
:
logging
.
error
(
"Unchecked address in %s"
%
str
(
packet
))
return
True
goals
=
{}
def
get_goals
():
global
goals
return
goals
def
check_write_goal_position
(
packet
):
"""REQ:3.1.4.9.1
Limits 1706 < goal < 2388
Rate 200Hz per servo
"""
global
goals
payload
=
packet
.
payload
no_error
=
True
if
len
(
payload
)
!=
7
:
logging
.
error
(
"Write_goal_position bad length %s"
%
str
(
packet
))
return
False
devid
=
packet
.
payload
[
0
]
min_goal
=
1705
max_goal
=
2388
goal
=
payload
[
4
]
+
256
*
payload
[
5
]
if
goal
<
min_goal
:
logging
.
error
(
"Goal %d < min %d in %s"
%
(
goal
,
min_goal
,
str
(
packet
)))
no_error
=
False
if
goal
>
max_goal
:
logging
.
error
(
"Goal %d > max %d in %s"
%
(
goal
,
max_goal
,
str
(
packet
)))
no_error
=
False
goals
[
devid
]
=
(
goal
-
2048
)
*
(
180.0
/
2048
)
return
no_error
def
check_write_data
(
packet
):
payload
=
packet
.
payload
if
len
(
payload
)
<
6
:
# no room for any parameters
logging
.
error
(
"Write data packet too short %s"
%
str
(
payload
))
return
False
address
=
payload
[
3
]
if
address
==
0x1E
:
# write goal position
return
check_write_goal_position
(
packet
)
else
:
logging
.
error
(
"Unchecked address in %s"
%
str
(
packet
))
return
False
def
check_maybe_status
(
packet
):
if
len
(
packet
.
payload
)
in
(
4
,
5
,
6
):
# logging.warning('Status return should be disabled %s' % str(packet))
pass
else
:
logging
.
warning
(
"Maybe status return? %s"
%
str
(
packet
))
return
True
def
check_packet
(
packet
):
checkers
=
{
0
:
check_maybe_status
,
2
:
check_read_data
,
3
:
check_write_data
}
logging
.
debug
(
"Check %s"
%
str
(
packet
))
# Logger doesn't send bad packets on /pkt channel
# if calc payload checksum:
# logging.warning('Bad checksum in %s' % str(packet))
command
=
packet
.
payload
[
2
]
ok
=
checkers
.
get
(
command
,
check_unknown_cmd
)(
packet
)
if
ok
:
logging
.
debug
(
"Packet ok %s"
%
str
(
packet
))
@
cmu
.
static_vars
(
prev_pkt
=
(
0
,
0
,
0
,
0
))
def
decode_packet
(
packet
):
"""Convert some parts of packet into a textual 'command' representation
Track state, checking that a Read command is followed by a Status with
the same devid, otherwise return info about Read command with missing status.
"""
prev_pkt
=
decode_packet
.
prev_pkt
devid
=
packet
.
payload
[
0
]
# len = packet.payload[1]
cmd
=
packet
.
payload
[
2
]
addr
=
packet
.
payload
[
3
]
missing
=
None
if
prev_pkt
[
2
]
==
2
:
# read
address
=
dx
.
addr_name
(
prev_pkt
[
3
])
if
prev_pkt
[
0
]
==
devid
:
# same device
if
cmd
==
0
:
command
=
"%s Read Status OK"
%
address
else
:
command
=
"%s Read Status errors %s"
%
(
address
,
dx
.
error_string
(
cmd
))
else
:
missing
=
(
prev_pkt
[
0
],
"%s Read Missing error"
%
address
)
if
prev_pkt
[
2
]
!=
2
or
missing
:
address
=
dx
.
addr_name
(
addr
)
command
=
dx
.
cmd_name
(
cmd
)
if
cmd
in
(
2
,
3
):
command
=
address
+
" "
+
command
decode_packet
.
prev_pkt
=
(
devid
,
0
,
cmd
,
addr
)
return
devid
,
command
,
missing
def
filter_trace
(
trace
,
devid
=
None
,
ptype
=
None
):
"""Filter trace according to devid and type
:param trace: is list of (timestamp, devid, type)
:param devid: devid to select
:param ptype: packet type to select
:returns: filtered trace
"""
# Filter for devid and ptype
if
devid
is
not
None
and
ptype
is
not
None
:
return
[
t
for
t
in
trace
if
t
[
1
]
==
devid
and
t
[
2
]
==
ptype
]
elif
devid
is
not
None
:
return
[
t
for
t
in
trace
if
t
[
1
]
==
devid
]
elif
ptype
is
not
None
:
return
[
t
for
t
in
trace
if
t
[
2
]
==
ptype
]
else
:
return
trace
def
interval_stats
(
trace
):
"""Calculate basic statistics about interval of packets
in trace
:param trace: is list of (timestamp, devid, type)
:return tuple: interval (minimum, mean, maximum) from trace timestamps
"""
x
=
trace
if
len
(
x
)
<
2
:
return
None
# list of inter-packet intervals
s
=
tuple
((
x
[
i
][
0
]
-
x
[
i
-
1
][
0
]
for
i
in
range
(
1
,
len
(
x
))))
ss
=
sum
(
s
)
/
len
(
s
)
return
list
(
map
(
int
,
(
min
(
s
),
ss
,
max
(
s
))))
def
counter
():
return
defaultdict
(
int
)
def
check_one_sec
(
q
):
"""Analyse a queue with one second's worth of packets in it
:param q: iterable containing one second worth of packets
:return dict: dict(devid, dict(statistic_label, statistic_value))
"""
servos
=
defaultdict
(
counter
)
servos
[
0
][
"Total packets"
]
=
len
(
q
)
time_trace
=
[]
while
len
(
q
):
packet
=
q
.
popleft
()
# Individual packet checks
check_packet
(
packet
)
# Timing analysis
t
=
packet
.
ts
*
1e6
# microseconds
# Gather stats on types etc
devid
,
packet_type
,
missing
=
decode_packet
(
packet
)
time_trace
.
append
((
t
,
devid
,
packet_type
))
servos
[
devid
][
"Total"
]
+=
1
servos
[
devid
][
packet_type
]
+=
1
if
missing
is
not
None
:
servos
[
missing
[
0
]][
missing
[
1
]]
+=
1
i
=
interval_stats
(
time_trace
)
if
i
is
not
None
:
servos
[
0
][
"Interval"
]
=
i
# time trace is list of (timestamp, devid, type)
for
devid
in
servos
.
keys
():
if
devid
==
0
:
pass
# skip pseudo-devid 0
for
cmd
in
(
"Goal Write"
,
"Position Read"
,
"Current Read"
):
i
=
interval_stats
(
filter_trace
(
time_trace
,
devid
,
cmd
))
if
i
is
not
None
:
if
cmd
==
"Goal Write"
:
if
abs
(
i
[
1
]
-
5000
)
>
100
:
logging
.
error
(
"%X Goal Write interval %d should be 5000"
%
(
devid
,
i
[
1
])
)
else
:
if
abs
(
i
[
1
]
-
20000
)
>
400
:
logging
.
error
(
"%X %s interval %d should be 20000"
%
(
devid
,
cmd
,
i
[
1
])
)
servos
[
devid
][
"%s Interval"
%
cmd
]
=
i
return
servos
def
format_stats
(
servos
):
lines
=
[]
sl
=
list
(
servos
.
items
())
sl
.
sort
()
for
k
,
v
in
sl
:
lines
.
append
(
" %02X"
%
k
)
ss
=
[
" %s = %s"
%
kv
for
kv
in
v
.
items
()
if
kv
[
1
]
is
not
None
]
ss
.
sort
()
for
s
in
ss
:
lines
.
append
(
s
)
return
lines
def
parse_options
():
from
optparse
import
OptionParser
p
=
OptionParser
()
p
.
add_option
(
"-b"
,
"--bus"
,
type
=
"int"
,
default
=
1
,
help
=
"RS485 bus index 1 or 2. Default=%default"
,
)
p
.
add_option
(
"-l"
,
"--loglevel"
,
type
=
"int"
,
default
=
3
,
help
=
"Logging level. Default=%default"
,
)
bm_mqtt
.
add_options
(
p
)
opts
,
args
=
p
.
parse_args
()
level
=
{
1
:
logging
.
ERROR
,
2
:
logging
.
WARNING
,
3
:
logging
.
INFO
,
4
:
logging
.
DEBUG
,
}.
get
(
opts
.
loglevel
,
logging
.
DEBUG
)
opts
.
loglevel
=
level
logging
.
basicConfig
(
level
=
level
)
return
opts
,
args
if
__name__
==
"__main__"
:
opts
,
args
=
parse_options
()
busname
=
"RS485-%d"
%
opts
.
bus
mqtt_topic
=
"bus/RS485/%d"
%
opts
.
bus
# buffer between MQTT rx thread and main thread
pq
=
deque
(
maxlen
=
200
)
mqtt
=
bm_mqtt
.
Mqtt
(
opts
.
mqtt_host
,
mqtt_topic
,
pq
)
one_sec
=
deque
()
idle
=
False
while
True
:
if
not
len
(
pq
):
if
not
idle
:
idle_begin
=
time
()
idle
=
True
if
(
time
()
-
idle_begin
)
>
3
:
logging
.
warning
(
"Incoming data rate too low for analysis?"
)
idle_begin
=
time
()
one_sec
.
clear
()
# avoid mixing old and new data
while
len
(
pq
):
idle
=
False
packet
=
pq
.
popleft
()
if
len
(
one_sec
)
and
packet
.
ts
-
one_sec
[
0
].
ts
>=
1.0
:
stats
=
check_one_sec
(
one_sec
)
lines
=
format_stats
(
stats
)
print
(
"
\n
"
.
join
(
lines
))
one_sec
.
append
(
packet
)
sleep
(
0.05
)
bm_analyse_rs485_gui.py
0 → 100755
View file @
cc088886
#!/usr/bin/env python
"""
A simple Tk gui wrapped around bm_analyse_rs485.py
"""
from
__future__
import
print_function
from
bm_analyse_rs485
import
*
import
bm_mqtt
from
tk_vane
import
Vane
from
tk_texthandler
import
TextHandler
from
collections
import
deque
# , namedtuple, defaultdict
import
logging
from
time
import
time
try
:
from
Tkinter
import
*
from
ScrolledText
import
ScrolledText
except
ImportError
:
from
tkinter
import
*
from
tkinter.scrolledtext
import
ScrolledText
class
BusAnalyzer
(
Frame
):
def
__init__
(
self
,
mqtt_host
,
bus
,
master
=
None
):
# buffer between MQTT rx thread and main thread
self
.
one_sec
=
deque
()
self
.
pq
=
deque
(
maxlen
=
200
)
self
.
busname
=
"RS485/%d"
%
bus
self
.
mqtt_topic
=
"bus/RS485/%d"
%
bus
self
.
mqtt
=
bm_mqtt
.
Mqtt
(
mqtt_host
,
self
.
mqtt_topic
,
self
.
pq
)
self
.
idle
=
False
Frame
.
__init__
(
self
,
master
)
self
.
grid
(
sticky
=
"NESW"
)
self
.
createWidgets
()
def
clear_log
(
self
):
self
.
st
.
configure
(
state
=
"normal"
)
self
.
st
.
delete
(
1.0
,
END
)
self
.
st
.
insert
(
END
,
"%f - cleared
\n
"
%
time
())
def
createWidgets
(
self
):
top
=
self
.
winfo_toplevel
()
top
.
rowconfigure
(
0
,
weight
=
1
)
top
.
columnconfigure
(
0
,
weight
=
1
)
bf
=
Frame
(
self
)
self
.
label
=
Label
(
bf
,
text
=
self
.
busname
)
self
.
button1
=
Button
(
bf
,
text
=
"Clear Log"
,
command
=
self
.
clear_log
)
self
.
vanes
=
[]
for
vi
in
(
0
,
1
,
2
,
3
):
self
.
vanes
.
append
(
Vane
(
bf
,
label
=
"Servo %d"
%
vi
))
hp
=
PanedWindow
(
self
,
orient
=
HORIZONTAL
,
showhandle
=
True
,
sashrelief
=
RIDGE
)
fs
=
7
self
.
textBox
=
ScrolledText
(
hp
,
font
=
(
"Courier"
,
fs
),
height
=
50
)
self
.
textBox
.
tag_configure
(
"red"
,
foreground
=
"#B00000"
,
font
=
(
"Courier"
,
fs
,
"bold"
)
)
# Text box for log messages
self
.
st
=
ScrolledText
(
hp
,
state
=
"normal"
,
font
=
(
"Courier"
,
fs
))
self
.
st
.
tag_configure
(
"red"
,
foreground
=
"red"
,
font
=
(
"Courier"
,
fs
,
"bold"
))
# Create textLogger
text_handler
=
TextHandler
(
self
.
st
)
logging
.
getLogger
().
addHandler
(
text_handler
)
hp
.
add
(
self
.
textBox
,
stretch
=
"always"
)
hp
.
add
(
self
.
st
,
stretch
=
"always"
)
# Layout
# in bf frame
self
.
label
.
grid
(
row
=
0
,
column
=
0
,
sticky
=
"NW"
)
for
vi
in
(
0
,
1
,
2
,
3
):
self
.
vanes
[
vi
].
grid
(
row
=
0
,
column
=
vi
+
1
,
sticky
=
"NW"
)
self
.
button1
.
grid
(
row
=
0
,
column
=
5
,
sticky
=
"NW"
)
# top levl
self
.
columnconfigure
(
0
,
weight
=
2
)
self
.
rowconfigure
(
0
,
weight
=
0
)
self
.
rowconfigure
(
1
,
weight
=
4
)
bf
.
grid
(
row
=
0
,
sticky
=
"nw"
)
hp
.
grid
(
row
=
1
,
sticky
=
"NESW"
)
# registering callback
self
.
listenID
=
self
.
after
(
50
,
self
.
updateAnalysis
)
def
updateVanes
(
self
):
goals
=
get_goals
()
lg
=
list
(
goals
.
items
())
lg
.
sort
()
print
(
lg
)
for
i
in
range
(
4
):
self
.
vanes
[
i
].
set
(
lg
[
i
][
1
])
self
.
vanes
[
i
].
label
.
config
(
text
=
"Servo %X"
%
lg
[
i
][
0
])
def
updateAnalysis
(
self
):
pq
=
self
.
pq
if
not
len
(
pq
):
if
not
self
.
idle
:
self
.
idle_begin
=
time
()
self
.
idle
=
True
if
(
time
()
-
self
.
idle_begin
)
>
2
:
self
.
textBox
.
delete
(
1.0
,
END
)
self
.
textBox
.
insert
(
END
,
"Incoming data rate too low for analysis?"
)
self
.
idle_begin
=
time
()
self
.
one_sec
.
clear
()
# discard old part second
else
:
one_sec
=
self
.
one_sec
while
len
(
pq
):
self
.
idle
=
False
packet
=
pq
.
popleft
()
if
len
(
one_sec
)
and
packet
.
ts
-
one_sec
[
0
].
ts
>=
1.0
:
stats
=
check_one_sec
(
one_sec
)
lines
=
format_stats
(
stats
)
self
.
textBox
.
delete
(
1.0
,
END
)
for
line
in
lines
:
if
"rror"
in
line
:
self
.
textBox
.
insert
(
END
,
line
+
"
\n
"
,
"red"
)
else
:
self
.
textBox
.
insert
(
END
,
line
+
"
\n
"
)
self
.
updateVanes
()
one_sec
.
append
(
packet
)
self
.
listenID
=
self
.
after
(
50
,
self
.
updateAnalysis
)
if
__name__
==
"__main__"
:
opts
,
args
=
parse_options
()
app
=
BusAnalyzer
(
opts
.
mqtt_host
,
opts
.
bus
)
app
.
master
.
title
(
"RS485 serial analyzer"
)
app
.
mainloop
()
bm_can.py
0 → 100755
View file @
cc088886
#!/usr/bin/env python
"""
General CAN functions using Kvaser canlib
"""
from
__future__
import
print_function
import
logging
from
sys
import
exit
# kvaser canlib
from
canlib
import
*
def
setup
(
port_id
,
bitrate
=
canBITRATE_250K
):
"""Setup kvaser CAN given port_id which may be device channel or serial number"""
cl
=
canlib
()
channels
=
cl
.
getNumberOfChannels
()
logging
.
info
(
"Kvaser canlib version: %s, %d channels available"
%
(
cl
.
getVersion
(),
channels
)
)
chans
=
{}
for
ch
in
range
(
0
,
channels
):
try
:
info
=
(
ch
,
int
(
cl
.
getChannelData_Serial
(
ch
)),
cl
.
getChannelData_Name
(
ch
),
cl
.
getChannelData_EAN
(
ch
),
)
chans
[
ch
]
=
info
# key on channel number
if
info
[
1
]
>=
channels
:
chans
[
info
[
1
]]
=
info
# key on serial number
except
(
canError
)
as
ex
:
print
(
ex
)
try
:
ch
=
chans
[
port_id
][
0
]
except
KeyError
:
print
(
"A valid port number is required for --port option:"
)
for
k
in
chans
.
items
():
print
(
"%d: %s"
%
k
)
print
()
exit
()
chan
=
cl
.
openChannel
(
ch
,
canOPEN_ACCEPT_VIRTUAL
|
canOPEN_REQUIRE_EXTENDED
)
# chan.ioCtl_set_timer_scale(100) # microseconds per tick
chan
.
timer_scale
=
100
chan
.
setBusOutputControl
(
canDRIVER_NORMAL
)
chan
.
setBusParams
(
bitrate
)
chan
.
busOn
()
return
chan
,
cl
error_flags
=
(
canMSG_ERROR_FRAME
|
canMSG_NERR
|
canMSGERR_HW_OVERRUN
|
canMSGERR_SW_OVERRUN
)
def
flags_str
(
flags
):
fl
=
[]
if
flags
&
canMSG_RTR
:
# 1 Message is a remote request
fl
.
append
(
"RTR|"
)
if
flags
&
canMSG_STD
:
# 2 Message has a standard (11-bit) identifier
fl
.
append
(
"STD"
)
if
flags
&
canMSG_EXT
:
# 4 Message has a extended (29-bit) identifier
fl
.
append
(
"EXT"
)
if
flags
&
canMSG_WAKEUP
:
# 8 Message is a WAKEUP message (SWC hardware.)
fl
.
append
(
"WAKE"
)
if
flags
&
canMSG_ERROR_FRAME
:
# 32 Message represents an error frame.
fl
.
append
(
"ERR"
)
# The following flags can be returned from canRead() et al, but cannot be passed to canWrite():
if
(
flags
&
canMSG_NERR
):
# 16 NERR was active during the message (TJA1054 etc. hardware. See Note 4 below.)/tr>
fl
.
append
(
"NERR"
)
if
(
flags
&
canMSG_TXACK
):
# 64 Message is a TX ACK (meaning that the message was really sent)/tr>
fl
.
append
(
"TXACK"
)
if
(
flags
&
canMSG_TXRQ
):
# 128 Message is a TX REQ (meaning that the message was transferred to the CAN controller)/tr>
fl
.
append
(
"TXRQ"
)
if
flags
&
canMSGERR_HW_OVERRUN
:
# 512 Hardware buffer overrun.
fl
.
append
(
"HWOVR"
)
if
flags
&
canMSGERR_SW_OVERRUN
:
# 1024 Software buffer overrun.
fl
.
append
(
"SWOVR"
)
return
"|"
.
join
(
fl
)
def
add_options
(
p
):
"""Add CAN related options to an optparse instance"""
p
.
add_option
(
"-b"
,
"--bus"
,
type
=
"int"
,
default
=
1
,
help
=
"CAN bus index 1 or 2. Default=%default"
,
)
p
.
add_option
(
"-p"
,
"--port"
,
type
=
"int"
,
default
=
0
,
help
=
"CAN channel #, typically 0 or 1, or device serial"
,
)
p
.
add_option
(
"-F"
,
"--fake"
,
action
=
"store_true"
,
help
=
"Send packets direct to localhost MQTT broker instead of serial port"
,
)
if
__name__
==
"__main__"
:
from
collections
import
defaultdict
from
optparse
import
OptionParser
import
random
import
time
def
parse_options
():
p
=
OptionParser
()
add_options
(
p
)
p
.
add_option
(
"-l"
,
"--loglevel"
,
type
=
"int"
,
default
=
3
,
help
=
"Logging level. Default=%default"
,
)
opts
,
args
=
p
.
parse_args
()
level
=
{
1
:
logging
.
ERROR
,
2
:
logging
.
WARNING
,