101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
![]() |
import usb.core
|
|||
|
import struct
|
|||
|
import usb.util
|
|||
|
import time
|
|||
|
import array
|
|||
|
import numpy as np
|
|||
|
import FFT
|
|||
|
|
|||
|
simpling_rate = 40000
|
|||
|
fft_size = 12800
|
|||
|
|
|||
|
VID = 0x0483
|
|||
|
PID = 0x5740
|
|||
|
|
|||
|
|
|||
|
# VID = 0xABAB
|
|||
|
# PID = 0xCDCD
|
|||
|
|
|||
|
|
|||
|
def Find_usb():
|
|||
|
dev = usb.core.find(idVendor=VID, idProduct=PID)
|
|||
|
|
|||
|
if not dev:
|
|||
|
print("Could not find USB :(")
|
|||
|
exit(1)
|
|||
|
else:
|
|||
|
print("Yeeha! Found a usb device!")
|
|||
|
|
|||
|
dev.reset()
|
|||
|
|
|||
|
if dev.is_kernel_driver_active(0):
|
|||
|
dev.detach_kernel_driver(0)
|
|||
|
|
|||
|
dev.set_configuration()
|
|||
|
|
|||
|
return dev
|
|||
|
|
|||
|
|
|||
|
# 内部缓冲区凑齐80000个字节(即20000个浮点数)后,统一将byte转成浮点,并构造为np.array进行后续运算
|
|||
|
FFT_BUF_LEN = 40960
|
|||
|
|
|||
|
# 一次从USB读取1000(暂定,具体根据单片机调整,同时也没考虑头尾、校验等多出字节)个字节,放到内部缓冲区
|
|||
|
USB_BUF_LEN = 8224
|
|||
|
|
|||
|
# 一个字典,缓存8个通道的数据,格式为未转换的16进制,KEY值可根据USB桢结构更换,以方便存取处理
|
|||
|
fft_buf = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: [], 8: []}
|
|||
|
|
|||
|
|
|||
|
def Read(dev):
|
|||
|
while True:
|
|||
|
for k, v in fft_buf.items():
|
|||
|
# 如果有通道的fft_buf超过了10000,那么就做一次算法(我不太确定unpack()和np.array()的性能,或许可以考虑另开线程来处理)
|
|||
|
if len(v) >= FFT_BUF_LEN:
|
|||
|
data_array = array.array('f', v)
|
|||
|
# 将数组保存到文件
|
|||
|
np.savetxt(f"data_array{k}.txt", data_array)
|
|||
|
# print(data_array)
|
|||
|
freqs, abs_xfft = FFT.FFT(simpling_rate, fft_size, data_array)
|
|||
|
np.savetxt(f"abs_xfft{k}.txt", abs_xfft)
|
|||
|
# FFT.draw_figur(data_array, fft_size, freqs, abs_xfft, k )
|
|||
|
print("通道号:", k)
|
|||
|
# 清空当前通道对应的列表
|
|||
|
fft_buf[k].clear()
|
|||
|
# exit(1)
|
|||
|
num = dev.read(0x81, USB_BUF_LEN, 2000) # 超时时间可能需要调整
|
|||
|
for i in range(0, len(num), 1028):
|
|||
|
num_list = num[i:i + 1028]
|
|||
|
if num_list[0] == num_list[1] == num_list[2] == num_list[3]:
|
|||
|
print("通道号:", num_list[0])
|
|||
|
print("个数:", len(num_list))
|
|||
|
# print("内容:", num_list[4])
|
|||
|
DataProcess(num_list, num_list[0])
|
|||
|
# else:
|
|||
|
# print("帧结构错误")
|
|||
|
# exit(1)
|
|||
|
|
|||
|
|
|||
|
# 将接受到的数据进行整合处理
|
|||
|
def DataProcess(num_list, j):
|
|||
|
usb_buf = num_list[4: len(num_list)]
|
|||
|
float_list = []
|
|||
|
for i in range(0, len(usb_buf), 2):
|
|||
|
# 将4个8位数据合并成一个32位整数
|
|||
|
int_data = (usb_buf[i] << 8) | usb_buf[i + 1]
|
|||
|
# print(int_data)
|
|||
|
if int_data & 0x8000: # 最高位为1,表示负数
|
|||
|
int_data = -((int_data ^ 0xFFFF) + 1) # 取反加一得到负数
|
|||
|
# print(int_data)
|
|||
|
float_data = int_data * 5.0 / 32768.0
|
|||
|
# print(float_data)
|
|||
|
float_list.append(float_data)
|
|||
|
# exit(1)
|
|||
|
fft_buf[j].extend(float_list)
|
|||
|
|
|||
|
|
|||
|
def Write(dev):
|
|||
|
w = 0
|
|||
|
while w == 0:
|
|||
|
w = dev.write(0x01, "hello usb")
|
|||
|
print(w)
|