This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
CMS/4.Test/mqtt接收/main.py
2024-11-19 17:19:21 +08:00

123 lines
4.0 KiB
Python

import os
import base64
import json
import pylab as pl
from matplotlib.pylab import mpl
import paho.mqtt.client as mqtt
import random
import time
import numpy as np
# Global plotting/printing configuration, applied once at import time.
mpl.rcParams.update({
    # Font used for sans-serif text so the Chinese labels can be displayed.
    'font.sans-serif': ['Microsoft YaHei'],
    # Keep minus signs displayable with the font selected above.
    'axes.unicode_minus': False,
})
# Print NumPy arrays in full instead of summarising them with "...".
np.set_printoptions(threshold=np.inf)
# Plot the time-domain signal and the frequency-domain spectrum.
def draw_figur(data, fft_size, freqs, abs_xfft, sampling_rate, channel):
    """Show one figure with the time-domain samples and the spectrum.

    The figure uses the first and third rows of a 3x1 subplot grid.  Mouse
    wheel scrolling over an axes zooms its x-range in or out by 10% of the
    current span on each side.  The call blocks in ``pl.show()``.

    Args:
        data: Sequence of samples; only ``data[:fft_size]`` is plotted.
        fft_size: Number of leading samples of ``data`` to plot.
        freqs: X values of the spectrum plot (labelled as KHz).
        abs_xfft: Y values of the spectrum plot, plotted against ``freqs``.
        sampling_rate: Not used by this function; kept so existing callers
            keep working.
        channel: Channel number shown in the spectrum title.
    """
    fig = pl.figure()

    def call_back(event):
        """Zoom the x-axis of the axes under the pointer."""
        axtemp = event.inaxes
        # ``inaxes`` is None when the pointer is outside every axes (figure
        # margin, gap between subplots); there is nothing to zoom then.
        if axtemp is None:
            return
        x_min, x_max = axtemp.get_xlim()
        fanwei = (x_max - x_min) / 10  # zoom step: a tenth of the visible span
        if event.button == 'up':
            axtemp.set(xlim=(x_min + fanwei, x_max - fanwei))
            print('up')
        elif event.button == 'down':
            axtemp.set(xlim=(x_min - fanwei, x_max + fanwei))
            print('down')
        fig.canvas.draw_idle()  # redraw so the new limits show immediately

    fig.canvas.mpl_connect('scroll_event', call_back)
    fig.canvas.mpl_connect('button_press_event', call_back)

    # Top row: time-domain signal.
    pl.subplot(3, 1, 1)
    pl.plot(data[:fft_size], color='r')
    pl.xlabel('点数')
    pl.ylabel('加速度值')
    pl.title('时域信号')

    # Bottom row: frequency-domain signal (the middle row is left empty).
    pl.subplot(3, 1, 3)
    pl.title(f"频域信号(通道号:{channel})")
    pl.ylabel("振幅")
    pl.xlabel("频率(KHz)")
    pl.plot(freqs, abs_xfft)

    pl.show()
class Mqtt_Subscriber:
    """Subscribe to per-channel ``multivalue`` topics and store each message.

    On construction the client connects to the broker and starts the paho
    network loop in a background thread.  Every accepted message is written
    to ``<topic_prefix>/<channel>_data.json``; messages on channel 0 are
    additionally plotted with ``draw_figur``.

    Args:
        central_ip: Broker host name or IP address.
        port: Broker TCP port.
        topic_prefix: Prefix placed before ``<channel>/multivalue``; also
            used as the output directory path.
        callback_func: Stored on ``self.callback``.
            NOTE(review): nothing in this class invokes it -- confirm
            whether it was meant to replace ``default_on_message``.
        node_name: Base MQTT client id.
        anonymous: When true, a random 6-digit suffix is appended to
            ``node_name``.
        timeout: Passed as the third argument of ``client.connect`` (the
            keepalive interval in paho-mqtt).
    """

    # Channels 0 .. CHANNEL_COUNT-1 are subscribed to and accepted.
    CHANNEL_COUNT = 12

    def __init__(self, central_ip='192.168.123.200', port=1883,
                 topic_prefix='10B035041002230110003775/1/', callback_func=None,
                 node_name='bci_', anonymous=True, timeout=60):
        self.topic_prefix = topic_prefix
        self.callback = callback_func
        self.broker_ip = central_ip
        self.broker_port = port
        self.timeout = timeout
        self.connected = False
        self.node_name = node_name
        if anonymous:
            self.node_name = self.node_name + str(random.randint(100000, 999999))
        self.Start()

    def Start(self):
        """Create the client, connect and start the background network loop."""
        # paho-mqtt 2.x takes the callback API version as first positional
        # argument, so the client id is passed by keyword.  VERSION1 keeps
        # the (client, userdata, flags, rc) on_connect signature used below.
        api_version = getattr(mqtt, 'CallbackAPIVersion', None)
        if api_version is None:
            self.client = mqtt.Client(client_id=self.node_name)
        else:
            self.client = mqtt.Client(api_version.VERSION1,
                                      client_id=self.node_name)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.default_on_message
        self.client.connect(self.broker_ip, self.broker_port, self.timeout)
        # Subscriptions are issued from on_connect so they are restored
        # after every reconnect, not just the first connection.
        self.client.loop_start()

    def default_on_message(self, client, userdata, msg):
        """Save the message payload as JSON and plot channel 0.

        The channel number is the second-to-last ``/``-separated segment of
        the topic.  The payload is expected to be UTF-8 JSON carrying
        space-separated numbers under ``data.sample_data.data`` and
        ``data.sample_data.spectrum``.
        """
        channel = int(msg.topic.split('/')[-2])
        if channel >= self.CHANNEL_COUNT:
            return

        data = json.loads(msg.payload.decode('utf-8'))
        sample = data['data']['sample_data']
        data_points = list(map(float, sample['data'].split(" ")))
        spectrum_data = list(map(float, sample['spectrum'].split(" ")))

        # Create the output directory if it does not exist yet.
        os.makedirs(self.topic_prefix, exist_ok=True)
        # Save the full decoded message, overwriting the previous one.
        filename = os.path.join(self.topic_prefix, f'{channel}_data.json')
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False)

        if channel == 0:
            # NOTE(review): this runs in paho's network thread and
            # draw_figur blocks in pl.show(), so no further messages are
            # processed until the window is closed.
            draw_figur(data_points, 60000*10,
                       np.linspace(0, int(60000), int(60000*10)) / 1000,
                       spectrum_data, 1, channel)

    # Connection-established callback.
    def on_connect(self, client, userdata, flags, rc):
        """Mark the connection as up and subscribe to every channel topic.

        Raises:
            Exception: If the broker refused the connection (``rc != 0``).
        """
        if rc != 0:
            raise Exception("连接到mqtt服务器失败。")
        self.connected = True
        for i in range(self.CHANNEL_COUNT):
            client.subscribe(f'{self.topic_prefix}{i}/multivalue')
def numpy_decoder(dct):
    """Turn a dict tagged with ``'__ndarray__'`` back into a NumPy array.

    The dict is expected to hold the base64-encoded buffer under
    ``'__ndarray__'`` together with ``'dtype'`` and ``'shape'`` entries.
    Any dict without the ``'__ndarray__'`` key is returned unchanged, which
    is the shape of function ``json.loads(..., object_hook=...)`` accepts.
    """
    if '__ndarray__' not in dct:
        return dct
    raw = base64.b64decode(dct['__ndarray__'])
    flat = np.frombuffer(raw, dtype=dct['dtype'])
    return flat.reshape(dct['shape'])
if __name__ == '__main__':
    p = Mqtt_Subscriber(topic_prefix='10B035041002230110003775/1/')
    # Poll for the connection flag with a short sleep instead of spinning,
    # so the wait does not pin a CPU core.
    while not p.connected:
        time.sleep(0.1)
    # Keep the main thread alive; messages are handled by the background
    # network loop started in Mqtt_Subscriber.Start.
    while True:
        time.sleep(1)