This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
CMS/4.Test/mqtt接收/main.py

123 lines
4.0 KiB
Python
Raw Permalink Normal View History

2024-11-19 09:19:21 +00:00
import os
import base64
import json
import pylab as pl
from matplotlib.pylab import mpl
import paho.mqtt.client as mqtt
import random
import time
import numpy as np
# Global display setup. NumPy arrays are printed in full (no truncation),
# and Matplotlib is configured so Chinese labels and minus signs render.
np.set_printoptions(threshold=np.inf)
mpl.rcParams.update({
    'font.sans-serif': ['Microsoft YaHei'],  # font used to display Chinese text
    'axes.unicode_minus': False,  # display the minus sign
})
# Plot the time-domain signal and the frequency-domain spectrum.
def draw_figur(data, fft_size, freqs, abs_xfft, sampling_rate, channel):
    """Show one figure with the time-domain samples and the spectrum.

    Args:
        data: Sequence of time-domain samples; only ``data[:fft_size]`` is
            plotted.
        fft_size: Number of leading samples of ``data`` to plot.
        freqs: X values of the spectrum plot.
        abs_xfft: Y values of the spectrum plot, drawn against ``freqs``.
        sampling_rate: Not used by this function; kept so existing callers
            keep working.
        channel: Channel number interpolated into the spectrum title.

    The function ends with ``pl.show()``.
    """
    fig = pl.figure()

    # Zoom the x-axis of the axes under the pointer with the mouse wheel.
    def call_back(event):
        axes = event.inaxes
        # Matplotlib sets ``inaxes`` to None when the pointer is not over
        # any axes; there is nothing to zoom in that case.
        if axes is None:
            return
        x_min, x_max = axes.get_xlim()
        step = (x_max - x_min) / 10
        if event.button == 'up':
            # Shrink the visible range by 10% on each side (zoom in).
            axes.set(xlim=(x_min + step, x_max - step))
            print('up')
        elif event.button == 'down':
            # Grow the visible range by 10% on each side (zoom out).
            axes.set(xlim=(x_min - step, x_max + step))
            print('down')
        fig.canvas.draw_idle()  # redraw so the change shows immediately

    fig.canvas.mpl_connect('scroll_event', call_back)
    fig.canvas.mpl_connect('button_press_event', call_back)

    # Top row of a 3-row layout: time-domain signal.
    pl.subplot(3, 1, 1)
    pl.plot(data[:fft_size], color='r')
    pl.xlabel('点数')
    pl.ylabel('加速度值')
    pl.title('时域信号')

    # Bottom row: spectrum. The middle row is left empty.
    pl.subplot(3, 1, 3)
    pl.title(f"频域信号(通道号:{channel})")
    pl.ylabel("振幅")
    pl.xlabel("频率(KHz)")
    pl.plot(freqs, abs_xfft)

    pl.show()
class Mqtt_Subscriber:
    """MQTT client that stores per-channel ``multivalue`` payloads as JSON.

    On construction the client connects to the broker and starts paho's
    background network loop. Each message on
    ``<topic_prefix><channel>/multivalue`` is written to
    ``<topic_prefix>/<channel>_data.json``; channel 0 is also plotted with
    ``draw_figur``.

    Args:
        central_ip: Broker host passed to ``client.connect``.
        port: Broker port passed to ``client.connect``.
        topic_prefix: Prefix of the subscribed topics; also used as the
            output directory for the saved JSON files.
        callback_func: Stored on ``self.callback``; this class never
            invokes it.
        node_name: Base of the MQTT client id.
        anonymous: When true, a random 6-digit number is appended to
            ``node_name``.
        timeout: Passed as the third positional argument of
            ``client.connect`` (the keepalive slot in paho-mqtt's signature).
    """

    # Channels 0 .. CHANNEL_COUNT-1 are subscribed and accepted.
    CHANNEL_COUNT = 12
    # Number of leading time-domain samples handed to draw_figur.
    PLOT_POINTS = 60000 * 10
    # Upper end of the spectrum x-axis before the division by 1000.
    FREQ_AXIS_MAX = 60000

    def __init__(self, central_ip='192.168.123.200', port=1883,
                 topic_prefix='10B035041002230110003775/1/', callback_func=None,
                 node_name='bci_', anonymous=True, timeout=60):
        self.topic_prefix = topic_prefix
        self.callback = callback_func
        self.broker_ip = central_ip
        self.broker_port = port
        self.timeout = timeout
        self.connected = False
        self.node_name = node_name
        if anonymous:
            self.node_name = self.node_name + str(random.randint(100000, 999999))
        self.Start()

    def Start(self):
        """Create the client, connect to the broker and start the loop thread."""
        # Pass the id by keyword so the call does not depend on client_id
        # being the first positional parameter.
        # NOTE(review): paho-mqtt 2.x puts callback_api_version first;
        # confirm against the installed version.
        self.client = mqtt.Client(client_id=self.node_name)
        self.client.on_connect = self.on_connect
        self.client.on_message = self.default_on_message
        self.client.connect(self.broker_ip, self.broker_port, self.timeout)
        # Subscriptions are issued in on_connect so they are renewed after
        # every (re)connection.
        self.client.loop_start()

    @staticmethod
    def _parse_floats(text):
        """Turn a whitespace-separated string of numbers into a list of floats."""
        # split() without an argument tolerates repeated or trailing
        # whitespace, which split(" ") turns into empty tokens.
        return [float(token) for token in text.split()]

    def default_on_message(self, client, userdata, msg):
        """Save the payload of one channel message and plot channel 0.

        The channel number is the second-to-last ``/``-separated part of the
        topic. Messages for channels >= ``CHANNEL_COUNT`` are ignored.
        """
        channel = int(msg.topic.split('/')[-2])
        if channel >= self.CHANNEL_COUNT:
            return

        data = json.loads(msg.payload.decode('utf-8'))
        sample = data['data']['sample_data']
        data_points = self._parse_floats(sample['data'])
        spectrum_data = self._parse_floats(sample['spectrum'])

        # Create the output directory if it does not exist yet.
        os.makedirs(self.topic_prefix, exist_ok=True)
        # Save the decoded payload, overwriting the previous file of this channel.
        filename = os.path.join(self.topic_prefix, f'{channel}_data.json')
        with open(filename, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False)

        if channel == 0:
            # Size the frequency axis from the spectrum itself: plotting
            # needs x and y of equal length. This equals the former fixed
            # 600000-point axis whenever the spectrum has that many points.
            # Assumes the spectrum spans 0..FREQ_AXIS_MAX, as the fixed
            # axis did.
            freqs = np.linspace(0, self.FREQ_AXIS_MAX, len(spectrum_data)) / 1000
            # NOTE(review): this runs inside the MQTT message callback and
            # draw_figur ends with pl.show(); presumably message handling
            # stalls while the window is open. Confirm.
            draw_figur(data_points, self.PLOT_POINTS, freqs,
                       spectrum_data, 1, channel)

    def on_connect(self, client, userdata, flags, rc):
        """Connection callback: subscribe to all channel topics on success.

        Raises:
            Exception: If ``rc`` is not 0.
        """
        if rc != 0:
            raise Exception("连接到mqtt服务器失败。")
        for channel in range(self.CHANNEL_COUNT):
            client.subscribe(f'{self.topic_prefix}{channel}/multivalue')
        self.connected = True
def numpy_decoder(dct):
    """Rebuild a NumPy array from a dict tagged with ``'__ndarray__'``.

    A dict containing the ``'__ndarray__'`` key is decoded: that value is
    base64-decoded into raw bytes, read with the dtype in ``dct['dtype']``
    and reshaped to ``dct['shape']``. Any other dict is returned unchanged.
    """
    if '__ndarray__' not in dct:
        return dct
    raw_bytes = base64.b64decode(dct['__ndarray__'])
    flat = np.frombuffer(raw_bytes, dtype=dct['dtype'])
    return flat.reshape(dct['shape'])
if __name__ == '__main__':
    p = Mqtt_Subscriber(topic_prefix='10B035041002230110003775/1/')
    # Wait for the connection flag, sleeping between checks instead of
    # spinning in a tight loop that keeps a CPU core busy.
    while not p.connected:
        time.sleep(0.1)
    # Keep the main thread alive; messages are handled by the subscriber's
    # callbacks.
    while True:
        time.sleep(1)