This repository has been archived on 2025-04-28. You can view files and clone it, but cannot push or open issues or pull requests.
CMS/5.MISC/CMS程序替换教程/gateway/main.py
2024-11-19 17:19:21 +08:00

105 lines
3.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import daemon
import time
import logging
import logging.handlers
import multiprocessing
from multiprocessing import Event
from src.usb import Usb
from src.mqtt import Mqtt
from src.payload import USBdata
from src.config import Config
import signal
from daemon.pidfile import PIDLockFile
class App():
    """Gateway application that connects a USB device to an MQTT broker.

    ``run`` starts four worker processes (USB reader, USB keep-alive,
    payload processing, MQTT sender) that share two queues and one
    shutdown event, then blocks until all of them have finished.
    """

    # Seconds to wait between two MQTT connection attempts.
    _MQTT_RETRY_DELAY = 10

    def __init__(self):
        # Load the configuration file.
        self.cfg = Config()

    def _build_logger(self):
        """Return the 'gateway' logger with a console and a rotating-file handler."""
        log = logging.getLogger('gateway')
        log.setLevel(logging.DEBUG)  # INFO is usually enough in production

        # Handler that echoes log records to the terminal.
        console = logging.StreamHandler()
        console.setLevel(logging.DEBUG)

        # Handler that writes log records to a rotating file; adjust the
        # path, size and backup count as needed when deploying.
        fh = logging.handlers.RotatingFileHandler(
            self.cfg.Sys.logpath, maxBytes=1000000, backupCount=1)
        fh.setLevel(logging.DEBUG)

        # Both handlers share one record format.
        formatter = logging.Formatter(u'%(asctime)s [%(levelname)s] %(message)s')
        for handler in (console, fh):
            handler.setFormatter(formatter)
            log.addHandler(handler)
        return log

    def _connect_mqtt(self, mqtt, log):
        """Connect ``mqtt``, retrying until it succeeds or shutdown is requested.

        Returns True once ``mqtt.Connect()`` reports 0, or False if the
        shutdown event was set while waiting between attempts.
        """
        err = mqtt.Connect()
        while err != 0:
            log.error("正在重连...")
            # Wait *before* retrying, so a failed attempt is not retried
            # immediately and a successful one is not followed by a sleep.
            time.sleep(self._MQTT_RETRY_DELAY)
            # A termination signal only sets the event; without this check
            # the loop could never be stopped while the broker is down.
            if self.event.is_set():
                return False
            err = mqtt.Connect()
        return True

    def run(self):
        """Set up logging, connect MQTT and USB, then run the workers to completion.

        Raises:
            SystemExit: with the USB error code when ``usb.Connect()`` fails.
        """
        # Shared shutdown event. Created before the handlers are installed
        # so that ``exit`` never runs without ``self.event`` being defined.
        self.event = Event()
        # SIGKILL cannot be caught, so it is deliberately not registered.
        for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
            signal.signal(signum, self.exit)

        log = self._build_logger()

        # Shared queues: q1 is handed to the USB reader and to USBdata,
        # q2 (bounded to 50 items) to USBdata and to the MQTT worker.
        q1 = multiprocessing.Queue()
        q2 = multiprocessing.Queue(maxsize=50)

        # Processing side: parameter calculation and MQTT sending.
        mqtt = Mqtt(self.cfg.Mqtt, log)
        if not self._connect_mqtt(mqtt, log):
            log.info("program exit!")
            return
        USBdata_thread = multiprocessing.Process(target=USBdata, args=(q1, q2, log))
        mqtt_thread = multiprocessing.Process(
            target=mqtt.Start, args=(q2, self.event, self.cfg.Mqtt))

        # USB side.
        usb = Usb(self.cfg.Usb, log)
        err = usb.Connect()
        if err != 0:
            log.error("USB connect failed with code %s", err)
            # Release the MQTT connection that was already established.
            mqtt.Disconnect()
            # Same effect as the ``exit`` builtin, without relying on ``site``.
            raise SystemExit(err)
        USBGetInfo = multiprocessing.Process(target=usb.WriteDataWork, args=(q1, self.event))
        USBKeepAliveWork = multiprocessing.Process(target=usb.KeepAliveWork)

        workers = (USBGetInfo, USBKeepAliveWork, mqtt_thread, USBdata_thread)
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        log.info("program exit!")
        usb.Disconnect()
        mqtt.Disconnect()

    def exit(self, signum, frame):
        """Signal handler: request shutdown by setting the shared event."""
        self.event.set()
# Start-up code is guarded so that multiprocessing children, which re-import
# the main module under the "spawn" and "forkserver" start methods, do not
# start the whole application again.
if __name__ == "__main__":
    app = App()
    pidfile = PIDLockFile(app.cfg.Sys.pidpath)
    # If the PID file is currently locked, break the lock.
    if pidfile.is_locked():
        pidfile.break_lock()
    # Daemon mode is currently disabled; the application runs in the foreground.
    # with daemon.DaemonContext(pidfile=pidfile):
    app.run()