105 lines
3.2 KiB
Python
105 lines
3.2 KiB
Python
import daemon
|
||
import time
|
||
import logging
|
||
import logging.handlers
|
||
import multiprocessing
|
||
from multiprocessing import Event
|
||
from src.usb import Usb
|
||
from src.mqtt import Mqtt
|
||
from src.payload import USBdata
|
||
from src.config import Config
|
||
import signal
|
||
from daemon.pidfile import PIDLockFile
|
||
|
||
|
||
|
||
class App():
    """Gateway application that wires the USB side to the MQTT side.

    ``run`` builds the logger, the shared queues and the shared shutdown
    event, connects to MQTT and USB, and then runs four worker processes
    until they all finish.  ``exit`` is the signal handler that requests
    shutdown.
    """

    def __init__(self):
        # Load the configuration file.
        self.cfg = Config()
        # Plain in-process mirror of the shutdown request.  The retry loop in
        # the main process polls this flag instead of the multiprocessing
        # Event, so the signal handler and the interrupted code never contend
        # for the Event's internal lock.
        self._stop_requested = False

    def run(self):
        """Start the gateway and block until every worker process has ended.

        Raises:
            SystemExit: with the USB error code when ``usb.Connect()`` does
                not return 0.
        """
        # Shared event.  It is created before the handlers are installed so
        # that a signal arriving early cannot reach ``exit`` while
        # ``self.event`` is still missing.
        self.event = Event()

        signal.signal(signal.SIGINT, self.exit)
        signal.signal(signal.SIGTERM, self.exit)
        # SIGKILL cannot be caught, so no handler is registered for it.
        signal.signal(signal.SIGHUP, self.exit)

        # Initialise logging.
        log = self._setup_logging()

        # Shared queues: q1 is handed to the USB reader and to USBdata,
        # q2 (bounded to 50 items) to USBdata and to the MQTT sender.
        q1 = multiprocessing.Queue()
        q2 = multiprocessing.Queue(maxsize=50)

        # Processing workers: parameter calculation and MQTT sending.
        mqtt = Mqtt(self.cfg.Mqtt, log)
        if not self._connect_mqtt(mqtt, log):
            # Shutdown was requested while the broker was unreachable.
            log.info("program exit!")
            return
        data_proc = multiprocessing.Process(target=USBdata, args=(q1, q2, log))
        mqtt_proc = multiprocessing.Process(
            target=mqtt.Start, args=(q2, self.event, self.cfg.Mqtt))

        # USB workers.
        usb = Usb(self.cfg.Usb, log)
        err = usb.Connect()
        if err != 0:
            # Close the MQTT connection opened above before giving up, the
            # same way the normal shutdown path does.
            mqtt.Disconnect()
            # Raise SystemExit directly: a bare ``exit(...)`` is the
            # site-provided builtin and is easily confused with ``self.exit``.
            raise SystemExit(err)

        usb_reader = multiprocessing.Process(
            target=usb.WriteDataWork, args=(q1, self.event))
        usb_keepalive = multiprocessing.Process(target=usb.KeepAliveWork)

        workers = (usb_reader, usb_keepalive, mqtt_proc, data_proc)
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        log.info("program exit!")
        usb.Disconnect()
        mqtt.Disconnect()

    def _setup_logging(self):
        """Configure and return the 'gateway' logger (console + rotating file)."""
        log = logging.getLogger('gateway')
        log.setLevel(logging.DEBUG)  # production would normally use INFO

        # StreamHandler: writes log records to the terminal.
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)  # show DEBUG and above on the terminal

        # RotatingFileHandler: writes log records to the configured file.
        # Adjust the file location, size and backup count per deployment.
        fh = logging.handlers.RotatingFileHandler(
            self.cfg.Sys.logpath, maxBytes=1000000, backupCount=1)
        fh.setLevel(logging.DEBUG)

        # Both handlers share one record format.
        formatter = logging.Formatter(u'%(asctime)s [%(levelname)s] %(message)s')
        console.setFormatter(formatter)
        fh.setFormatter(formatter)

        # Attach both handlers to the logger.
        log.addHandler(console)
        log.addHandler(fh)
        return log

    def _connect_mqtt(self, mqtt, log, retry_delay=10):
        """Connect to MQTT, retrying until it succeeds or shutdown is requested.

        Args:
            mqtt: object whose ``Connect()`` returns 0 on success.
            log: logger used to report each retry.
            retry_delay: seconds to wait before each reconnect attempt.

        Returns:
            True once ``Connect()`` returned 0, False if a shutdown request
            was seen while waiting to retry.
        """
        err = mqtt.Connect()
        while err != 0:
            log.error("正在重连...")
            # Wait first, then reconnect, so a successful attempt is never
            # followed by a pointless extra delay.
            time.sleep(retry_delay)
            if self._stop_requested:
                return False
            err = mqtt.Connect()
        return True

    def exit(self, signum, frame):
        """Signal handler: record the shutdown request and set the shared event."""
        self._stop_requested = True
        self.event.set()
|
||
|
||
|
||
# Module-level objects are kept so that the names `app` and `pidfile` remain
# available to anything that imports this module.
app = App()

pidfile = PIDLockFile(app.cfg.Sys.pidpath)

# Only launch when executed as the main program.  The application starts
# multiprocessing.Process children; under the spawn/forkserver start methods
# each child re-imports the main module, and without this guard it would
# break the PID lock and call app.run() again.
if __name__ == "__main__":
    # Check whether the PID file exists or is locked, and clear it if so.
    if pidfile.is_locked():
        pidfile.break_lock()

    # Daemonisation is currently disabled; to re-enable it, wrap the call in
    # `with daemon.DaemonContext(pidfile=pidfile):`.
    app.run()
|