This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
CMS/5.MISC/usb读取数据画图/Pyusb.py

101 lines
3.1 KiB
Python
Raw Permalink Normal View History

2024-11-19 09:19:21 +00:00
import usb.core
import struct
import usb.util
import time
import array
import numpy as np
import FFT
simpling_rate = 40000
fft_size = 12800
VID = 0x0483
PID = 0x5740
# VID = 0xABAB
# PID = 0xCDCD
def Find_usb():
dev = usb.core.find(idVendor=VID, idProduct=PID)
if not dev:
print("Could not find USB :(")
exit(1)
else:
print("Yeeha! Found a usb device!")
dev.reset()
if dev.is_kernel_driver_active(0):
dev.detach_kernel_driver(0)
dev.set_configuration()
return dev
# 内部缓冲区凑齐80000个字节即20000个浮点数统一将byte转成浮点并构造为np.array进行后续运算
FFT_BUF_LEN = 40960
# 一次从USB读取1000暂定具体根据单片机调整同时也没考虑头尾、校验等多出字节个字节放到内部缓冲区
USB_BUF_LEN = 8224
# 一个字典缓存8个通道的数据格式为未转换的16进制KEY值可根据USB桢结构更换以方便存取处理
fft_buf = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: [], 8: []}
def Read(dev):
while True:
for k, v in fft_buf.items():
# 如果有通道的fft_buf超过了10000,那么就做一次算法我不太确定unpack()和np.array()的性能,或许可以考虑另开线程来处理)
if len(v) >= FFT_BUF_LEN:
data_array = array.array('f', v)
# 将数组保存到文件
np.savetxt(f"data_array{k}.txt", data_array)
# print(data_array)
freqs, abs_xfft = FFT.FFT(simpling_rate, fft_size, data_array)
np.savetxt(f"abs_xfft{k}.txt", abs_xfft)
# FFT.draw_figur(data_array, fft_size, freqs, abs_xfft, k )
print("通道号:", k)
# 清空当前通道对应的列表
fft_buf[k].clear()
# exit(1)
num = dev.read(0x81, USB_BUF_LEN, 2000) # 超时时间可能需要调整
for i in range(0, len(num), 1028):
num_list = num[i:i + 1028]
if num_list[0] == num_list[1] == num_list[2] == num_list[3]:
print("通道号:", num_list[0])
print("个数:", len(num_list))
# print("内容:", num_list[4])
DataProcess(num_list, num_list[0])
# else:
# print("帧结构错误")
# exit(1)
# 将接受到的数据进行整合处理
def DataProcess(num_list, j):
usb_buf = num_list[4: len(num_list)]
float_list = []
for i in range(0, len(usb_buf), 2):
# 将4个8位数据合并成一个32位整数
int_data = (usb_buf[i] << 8) | usb_buf[i + 1]
# print(int_data)
if int_data & 0x8000: # 最高位为1表示负数
int_data = -((int_data ^ 0xFFFF) + 1) # 取反加一得到负数
# print(int_data)
float_data = int_data * 5.0 / 32768.0
# print(float_data)
float_list.append(float_data)
# exit(1)
fft_buf[j].extend(float_list)
def Write(dev):
w = 0
while w == 0:
w = dev.write(0x01, "hello usb")
print(w)