#!/usr/bin/python3
# Copyright (c) 2017-2022, Mudita Sp. z.o.o. All rights reserved.
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
'''
Tool for plotting realtime data provided through RTT via JLink.
Each data line must contain a JSON object with the timestamp, current and current_filtered keys.
Such lines are detected by searching for SCOPE_KEYWORD. All other log lines are printed to the console.
Usage:
1. Before running the script JLinkServer must be running
2. Max number of displayed samples is specified by MAX_SAMPLES_TO_DISPLAY
3. Plot refresh rate is specified by REFRESH_INTERVAL_MS
4. To log data into CSV file please provide --save_to_csv FILENAME
'''
import pylink
import sys
import time
import json
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
import collections
import argparse
import csv
try:
import thread
except ImportError:
import _thread as thread
# Plot settings
MAX_SAMPLES_TO_DISPLAY = 200  # number of samples kept in every plot queue
REFRESH_INTERVAL_MS = 100  # interval handed to FuncAnimation

# Parser settings
SCOPE_KEYWORD = '[scope]: '  # marker that precedes the JSON payload in a log line

# Sample queues shared by the RTT reader and the plotter: timestamps, raw
# current and filtered current. They start out filled with zeros, and the
# plotter looks for the first nonzero timestamp to find where real data begins.
ts, meas, meas_flt = (
    collections.deque(np.zeros(MAX_SAMPLES_TO_DISPLAY)) for _ in range(3)
)

# Figure and axes redrawn by the animation callback
fig = plt.figure(figsize=(12, 6))
ax = plt.subplot()

# CSV output path; empty means logging to CSV is disabled
csv_filename = ''
def save_to_csv(to_save):
    '''
    Append one row to the CSV file named by the module-level csv_filename.

    Does nothing when csv_filename is empty.

    to_save -- iterable of values written as a single CSV row
    '''
    if not csv_filename:
        return
    # newline='' hands line termination over to the csv module; without it an
    # extra carriage return is written per row on platforms that translate
    # line endings on write.
    with open(csv_filename, 'a', newline='') as file:
        csv.writer(file).writerow(to_save)
def parse_lines(charbuf):
    '''
    Split received text into lines and handle each of them.

    A line containing SCOPE_KEYWORD is expected to carry, after the keyword,
    a JSON object with the timestamp, current and current_filtered keys. Its
    values are converted to int, pushed into the ts, meas and meas_flt queues
    (dropping the oldest entry of each) and passed to save_to_csv().
    Every other line is printed to the console.

    charbuf -- text to process, may contain several lines
    '''
    for line in charbuf.splitlines():
        if SCOPE_KEYWORD not in line:
            print(line) # print other logs in console
            continue

        # Decode the complete sample before touching any queue, so a line with
        # a missing or malformed field cannot shift one queue and leave the
        # timestamps misaligned with the measurements.
        try:
            json_meas = json.loads(line.split(SCOPE_KEYWORD)[1])
            sample = [int(json_meas[key])
                      for key in ('timestamp', 'current', 'current_filtered')]
        except (ValueError, KeyError, TypeError, OverflowError):
            # ValueError covers invalid JSON and non-numeric strings
            print("exception in json parse, corrupted data")
            continue

        for queue, value in zip((ts, meas, meas_flt), sample):
            queue.popleft()
            queue.append(value)

        # A failing CSV write is not a parse error: report it as what it is
        # and keep plotting.
        try:
            save_to_csv(sample)
        except OSError as err:
            print("failed to save sample to csv: {}".format(err))
def read_rtt(jlink):
    '''
    Poll RTT buffer 0 and pass the received text to parse_lines().

    Runs as long as jlink.connected() is true. A read may end in the middle
    of a log line, so text after the last newline is held back and joined
    with the next read; parse_lines() therefore only receives whole lines.
    Whatever is still held back when the loop ends is passed on as it is.

    On any exception the main thread is interrupted and the exception is
    re-raised.

    jlink -- object providing connected() and rtt_read(buffer_index, num_bytes)
    '''
    max_pending_chars = 4096  # flush limit for text that never gets a newline
    pending = ''  # tail of the previous reads not terminated by '\n' yet
    try:
        while jlink.connected():
            terminal_bytes = jlink.rtt_read(0, 1024)
            if terminal_bytes:
                pending += "".join(map(chr, terminal_bytes))
                complete, newline, rest = pending.rpartition('\n')
                if newline:
                    pending = rest
                    parse_lines(complete + newline)
                elif len(pending) > max_pending_chars:
                    # no line end in sight - do not buffer without bound
                    parse_lines(pending)
                    pending = ''
            time.sleep(0.01)
        if pending:
            parse_lines(pending)
    except Exception:
        print("IO read thread exception, exiting...")
        thread.interrupt_main()
        raise
def animation(i):
    '''
    Animation callback: clear the axes and redraw both current traces.

    The x axis is limited to the range from the first nonzero timestamp to
    the newest one. When the timestamp queue holds only zeros a message is
    printed and nothing is drawn.

    Only Exception is handled here, so a KeyboardInterrupt (Ctrl-C, or the
    one requested by the reader thread through thread.interrupt_main())
    is not swallowed by the plotter.

    i -- frame number passed by the animation, unused
    '''
    try:
        ax.cla()
        # work on copies so one redraw uses a fixed view of each queue
        stamps = list(ts)
        filled = np.nonzero(stamps)[0]
        if filled.size == 0:
            print("exception in plotter - no data provided")
            return
        num_of_elems = int(filled[0]) # index of the first nonzero timestamp
        ax.plot(stamps, list(meas), 'b')
        ax.plot(stamps, list(meas_flt), 'g')
        # plot styling
        plt.title("Current measurement - b: inst, g: filtered", loc = 'left')
        plt.xlabel("Timestamp [ms]")
        plt.ylabel("Current [mA]")
        plt.xlim([stamps[num_of_elems], stamps[-1]])
        plt.grid(True)
    except Exception as err:
        # report the real cause instead of blaming missing data
        print("exception in plotter: {}".format(err))
def main():
    '''
    Open the JLink, start RTT, launch the reader thread and show the plot.

    After plt.show() returns, keeps waiting while the JLink reports it is
    connected. Ctrl-C ends the wait. Returns None.
    '''
    link = pylink.JLink()

    print("connecting to JLink...")
    link.open()

    print("starting RTT...")
    link.rtt_start()

    # the animation object is held in a local for the whole run
    plot_updater = FuncAnimation(fig, animation, interval=REFRESH_INTERVAL_MS)

    try:
        thread.start_new_thread(read_rtt, (link,))
        plt.show()
        while link.connected():
            time.sleep(1)
        print("JLink disconnected, exiting...")
    except KeyboardInterrupt:
        print("ctrl-c detected, exiting...")
if __name__ == "__main__":
    # Script entry point: read the optional CSV file name from the command
    # line, store it in the module-level csv_filename and run main().
    cli = argparse.ArgumentParser(description="RTT realtime plotter")
    cli.add_argument(
        "--save_to_csv",
        default='',
        help="Save data to csv file. Provide filename as an argument.",
    )
    csv_filename = cli.parse_args().save_to_csv
    sys.exit(main())