import base64
import json
import os
import random
import time

import numpy as np
import paho.mqtt.client as mqtt
import pylab as pl
from matplotlib.pylab import mpl

mpl.rcParams['font.sans-serif'] = ['Microsoft YaHei']  # font used to render the Chinese labels
mpl.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
np.set_printoptions(threshold=np.inf)


def draw_figur(data, fft_size, freqs, abs_xfft, sampling_rate, channel):
    """Plot the time-domain signal and its spectrum, then show the figure.

    Args:
        data: Sequence of time-domain samples; only the first ``fft_size``
            values are plotted.
        fft_size: Number of leading samples of ``data`` to draw.
        freqs: X values for the spectrum plot (labelled as KHz).
        abs_xfft: Spectrum amplitudes; must have the same length as ``freqs``.
        sampling_rate: Accepted for interface compatibility; not used here.
        channel: Channel number shown in the spectrum title.
    """
    fig = pl.figure()

    def call_back(event):
        """Zoom the x-axis of the axes under the cursor on mouse scroll."""
        axtemp = event.inaxes
        # The event fires anywhere on the canvas; outside of any axes
        # ``inaxes`` is None and there is nothing to zoom.
        if axtemp is None:
            return
        x_min, x_max = axtemp.get_xlim()
        fanwei = (x_max - x_min) / 10  # zoom step: 10% of the visible range
        if event.button == 'up':
            axtemp.set(xlim=(x_min + fanwei, x_max - fanwei))
            print('up')
        elif event.button == 'down':
            axtemp.set(xlim=(x_min - fanwei, x_max + fanwei))
            print('down')
        fig.canvas.draw_idle()  # refresh so the new limits become visible

    fig.canvas.mpl_connect('scroll_event', call_back)
    fig.canvas.mpl_connect('button_press_event', call_back)

    # Top row: time-domain samples.
    pl.subplot(3, 1, 1)
    pl.plot(data[:fft_size], color='r')
    pl.xlabel('点数')
    pl.ylabel('加速度值')
    pl.title('时域信号')

    # Bottom row: spectrum (the middle row is left empty as spacing).
    pl.subplot(3, 1, 3)
    pl.title(f"频域信号(通道号:{channel})")
    pl.ylabel("振幅")
    pl.xlabel("频率(KHz)")
    pl.plot(freqs, abs_xfft)

    pl.show()


class Mqtt_Subscriber:
    """Subscribe to per-channel ``multivalue`` topics, store and plot payloads.

    For every channel ``i`` in ``range(CHANNEL_COUNT)`` the topic
    ``f'{topic_prefix}{i}/multivalue'`` is subscribed.  Each received message
    is saved as JSON under a directory named after ``topic_prefix``; messages
    of channel 0 are additionally plotted.
    """

    CHANNEL_COUNT = 12  # channels 0..11 are subscribed and processed
    FFT_SIZE = 60000 * 10  # number of time-domain samples drawn
    SPECTRUM_MAX = 60000  # upper end of the frequency axis before the /1000 scaling

    def __init__(self, central_ip='192.168.123.200', port=1883,
                 topic_prefix='10B035041002230110003775/1/',
                 callback_func=None, node_name='bci_', anonymous=True,
                 timeout=60):
        """Store the connection settings and immediately connect.

        Args:
            central_ip: Broker host.
            port: Broker port.
            topic_prefix: Prefix of the subscribed topics; also used as the
                output directory for the saved JSON files.
            callback_func: Stored on ``self.callback``.
                NOTE(review): never invoked anywhere in this file.
            node_name: Base MQTT client id.
            anonymous: When true, a random 6-digit suffix is appended to
                ``node_name`` so several instances get distinct client ids.
            timeout: Passed to ``Client.connect`` as its third argument
                (the keepalive interval in paho-mqtt).
        """
        self.topic_prefix = topic_prefix
        self.callback = callback_func
        self.broker_ip = central_ip
        self.broker_port = port
        self.timeout = timeout
        self.connected = False
        self.node_name = node_name
        if anonymous:
            self.node_name = self.node_name + str(random.randint(100000, 999999))
        self.Start()

    def Start(self):
        """Create the client, connect to the broker and start the network loop."""
        self.client = mqtt.Client(client_id=self.node_name)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.default_on_message
        self.client.connect(self.broker_ip, self.broker_port, self.timeout)
        # Subscriptions are issued from on_connect so that they are renewed
        # after every (re)connect.
        self.client.loop_start()

    def default_on_message(self, client, userdata, msg):
        """Persist the payload of one channel and plot channel 0.

        The channel number is taken from the second-to-last segment of the
        topic.  Messages for channels outside ``range(CHANNEL_COUNT)`` on the
        upper side are ignored.
        """
        channel = int(msg.topic.split('/')[-2])
        if channel >= self.CHANNEL_COUNT:
            return

        data = json.loads(msg.payload.decode('utf-8'))
        sample = data['data']['sample_data']
        # split() without an argument tolerates repeated/trailing whitespace,
        # which would otherwise yield '' tokens that float() rejects.
        data_points = [float(tok) for tok in sample['data'].split()]
        spectrum_data = [float(tok) for tok in sample['spectrum'].split()]

        # Create the output directory if it does not exist yet.
        os.makedirs(self.topic_prefix, exist_ok=True)

        # Overwrite the per-channel file with the latest message.
        filename = os.path.join(self.topic_prefix, f'{channel}_data.json')
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False)

        if channel == 0:
            # Size the frequency axis from the received spectrum so that the
            # x and y arrays always match in length (plot() raises otherwise).
            # NOTE(review): assumes the spectrum spans 0..SPECTRUM_MAX evenly
            # regardless of its length - confirm against the device format.
            freqs = np.linspace(0, self.SPECTRUM_MAX, len(spectrum_data)) / 1000
            # NOTE(review): this runs in the MQTT network-loop thread and
            # pl.show() is a GUI call; consider handing the data over to the
            # main thread for plotting.
            draw_figur(data_points, self.FFT_SIZE, freqs, spectrum_data, 1, channel)

    def on_connect(self, client, userdata, flags, rc):
        """Mark the connection as established and subscribe to all channels.

        Raises:
            Exception: If the broker returned a non-zero result code.
        """
        if rc != 0:
            raise Exception("连接到mqtt服务器失败。")
        self.connected = True
        for i in range(self.CHANNEL_COUNT):
            topic_name = f'{self.topic_prefix}{i}/multivalue'
            client.subscribe(topic_name)


def numpy_decoder(dct):
    """JSON ``object_hook`` that rebuilds arrays from ``__ndarray__`` dicts.

    A dict containing ``'__ndarray__'`` is expected to also carry ``'dtype'``
    and ``'shape'``; its base64 payload is decoded into an array of that dtype
    and shape.  Any other dict is returned unchanged.
    """
    if '__ndarray__' in dct:
        data = base64.b64decode(dct['__ndarray__'])
        return np.frombuffer(data, dtype=dct['dtype']).reshape(dct['shape'])
    return dct


if __name__ == '__main__':
    p = Mqtt_Subscriber(topic_prefix='10B035041002230110003775/1/')
    # Wait for the connection without spinning a CPU core.
    while not p.connected:
        time.sleep(0.1)
    # Keep the main thread alive; messages are handled by the network loop.
    while True:
        time.sleep(1)