This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
CMS/5.MISC/usb读取数据画图/Pyusb.py
2024-11-19 17:19:21 +08:00

101 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import usb.core
import struct
import usb.util
import time
import array
import numpy as np
import FFT
simpling_rate = 40000
fft_size = 12800
VID = 0x0483
PID = 0x5740
# VID = 0xABAB
# PID = 0xCDCD
def Find_usb():
dev = usb.core.find(idVendor=VID, idProduct=PID)
if not dev:
print("Could not find USB :(")
exit(1)
else:
print("Yeeha! Found a usb device!")
dev.reset()
if dev.is_kernel_driver_active(0):
dev.detach_kernel_driver(0)
dev.set_configuration()
return dev
# 内部缓冲区凑齐80000个字节即20000个浮点数统一将byte转成浮点并构造为np.array进行后续运算
FFT_BUF_LEN = 40960
# 一次从USB读取1000暂定具体根据单片机调整同时也没考虑头尾、校验等多出字节个字节放到内部缓冲区
USB_BUF_LEN = 8224
# 一个字典缓存8个通道的数据格式为未转换的16进制KEY值可根据USB桢结构更换以方便存取处理
fft_buf = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: [], 8: []}
def Read(dev):
while True:
for k, v in fft_buf.items():
# 如果有通道的fft_buf超过了10000,那么就做一次算法我不太确定unpack()和np.array()的性能,或许可以考虑另开线程来处理)
if len(v) >= FFT_BUF_LEN:
data_array = array.array('f', v)
# 将数组保存到文件
np.savetxt(f"data_array{k}.txt", data_array)
# print(data_array)
freqs, abs_xfft = FFT.FFT(simpling_rate, fft_size, data_array)
np.savetxt(f"abs_xfft{k}.txt", abs_xfft)
# FFT.draw_figur(data_array, fft_size, freqs, abs_xfft, k )
print("通道号:", k)
# 清空当前通道对应的列表
fft_buf[k].clear()
# exit(1)
num = dev.read(0x81, USB_BUF_LEN, 2000) # 超时时间可能需要调整
for i in range(0, len(num), 1028):
num_list = num[i:i + 1028]
if num_list[0] == num_list[1] == num_list[2] == num_list[3]:
print("通道号:", num_list[0])
print("个数:", len(num_list))
# print("内容:", num_list[4])
DataProcess(num_list, num_list[0])
# else:
# print("帧结构错误")
# exit(1)
# 将接受到的数据进行整合处理
def DataProcess(num_list, j):
usb_buf = num_list[4: len(num_list)]
float_list = []
for i in range(0, len(usb_buf), 2):
# 将4个8位数据合并成一个32位整数
int_data = (usb_buf[i] << 8) | usb_buf[i + 1]
# print(int_data)
if int_data & 0x8000: # 最高位为1表示负数
int_data = -((int_data ^ 0xFFFF) + 1) # 取反加一得到负数
# print(int_data)
float_data = int_data * 5.0 / 32768.0
# print(float_data)
float_list.append(float_data)
# exit(1)
fft_buf[j].extend(float_list)
def Write(dev):
w = 0
while w == 0:
w = dev.write(0x01, "hello usb")
print(w)