101 lines
3.1 KiB
Python
101 lines
3.1 KiB
Python
import usb.core
|
||
import struct
|
||
import usb.util
|
||
import time
|
||
import array
|
||
import numpy as np
|
||
import FFT
|
||
|
||
simpling_rate = 40000
|
||
fft_size = 12800
|
||
|
||
VID = 0x0483
|
||
PID = 0x5740
|
||
|
||
|
||
# VID = 0xABAB
|
||
# PID = 0xCDCD
|
||
|
||
|
||
def Find_usb():
|
||
dev = usb.core.find(idVendor=VID, idProduct=PID)
|
||
|
||
if not dev:
|
||
print("Could not find USB :(")
|
||
exit(1)
|
||
else:
|
||
print("Yeeha! Found a usb device!")
|
||
|
||
dev.reset()
|
||
|
||
if dev.is_kernel_driver_active(0):
|
||
dev.detach_kernel_driver(0)
|
||
|
||
dev.set_configuration()
|
||
|
||
return dev
|
||
|
||
|
||
# 内部缓冲区凑齐80000个字节(即20000个浮点数)后,统一将byte转成浮点,并构造为np.array进行后续运算
|
||
FFT_BUF_LEN = 40960
|
||
|
||
# 一次从USB读取1000(暂定,具体根据单片机调整,同时也没考虑头尾、校验等多出字节)个字节,放到内部缓冲区
|
||
USB_BUF_LEN = 8224
|
||
|
||
# 一个字典,缓存8个通道的数据,格式为未转换的16进制,KEY值可根据USB桢结构更换,以方便存取处理
|
||
fft_buf = {1: [], 2: [], 3: [], 4: [], 5: [], 6: [], 7: [], 8: []}
|
||
|
||
|
||
def Read(dev):
|
||
while True:
|
||
for k, v in fft_buf.items():
|
||
# 如果有通道的fft_buf超过了10000,那么就做一次算法(我不太确定unpack()和np.array()的性能,或许可以考虑另开线程来处理)
|
||
if len(v) >= FFT_BUF_LEN:
|
||
data_array = array.array('f', v)
|
||
# 将数组保存到文件
|
||
np.savetxt(f"data_array{k}.txt", data_array)
|
||
# print(data_array)
|
||
freqs, abs_xfft = FFT.FFT(simpling_rate, fft_size, data_array)
|
||
np.savetxt(f"abs_xfft{k}.txt", abs_xfft)
|
||
# FFT.draw_figur(data_array, fft_size, freqs, abs_xfft, k )
|
||
print("通道号:", k)
|
||
# 清空当前通道对应的列表
|
||
fft_buf[k].clear()
|
||
# exit(1)
|
||
num = dev.read(0x81, USB_BUF_LEN, 2000) # 超时时间可能需要调整
|
||
for i in range(0, len(num), 1028):
|
||
num_list = num[i:i + 1028]
|
||
if num_list[0] == num_list[1] == num_list[2] == num_list[3]:
|
||
print("通道号:", num_list[0])
|
||
print("个数:", len(num_list))
|
||
# print("内容:", num_list[4])
|
||
DataProcess(num_list, num_list[0])
|
||
# else:
|
||
# print("帧结构错误")
|
||
# exit(1)
|
||
|
||
|
||
# 将接受到的数据进行整合处理
|
||
def DataProcess(num_list, j):
|
||
usb_buf = num_list[4: len(num_list)]
|
||
float_list = []
|
||
for i in range(0, len(usb_buf), 2):
|
||
# 将4个8位数据合并成一个32位整数
|
||
int_data = (usb_buf[i] << 8) | usb_buf[i + 1]
|
||
# print(int_data)
|
||
if int_data & 0x8000: # 最高位为1,表示负数
|
||
int_data = -((int_data ^ 0xFFFF) + 1) # 取反加一得到负数
|
||
# print(int_data)
|
||
float_data = int_data * 5.0 / 32768.0
|
||
# print(float_data)
|
||
float_list.append(float_data)
|
||
# exit(1)
|
||
fft_buf[j].extend(float_list)
|
||
|
||
|
||
def Write(dev):
|
||
w = 0
|
||
while w == 0:
|
||
w = dev.write(0x01, "hello usb")
|
||
print(w)
|