python - MQTT 连接在 Screen session 中不起作用

标签 python python-3.x google-cloud-platform raspberry-pi mqtt

我有一个简单的传感器记录器,它连接到 Google Cloud Project,然后每分钟发送一条消息,其中包含更新的传感器读数。

该脚本在直接运行时运行良好,但在屏幕 session 中运行时无法连接。

我使用/etc/rc.local在启动时加载shell启动脚本:

cd /home/pi/Desktop/Lily/
su pi -c "./lily_startup.sh" &
exit 0

lily_startup.sh:

#!/usr/bin/env bash

echo Waiting 10 seconds before launching Lily Software.
sleep 10

echo Starting lily_light.py
screen -D -RR "lily_light" -X quit || true
screen -S "lily_light" -d -m
screen -r "lily_light" -X stuff $'cd /home/pi/Desktop/Lily/\n'
screen -r "lily_light" -X stuff $'python3 lily_light.py\n'

echo Starting lily_telemetry.py
screen -D -RR "lily_telemetry" -X quit || true
screen -S "lily_telemetry" -d -m
screen -r "lily_telemetry" -X stuff $'cd /home/pi/Desktop/Lily/\n'
screen -r "lily_telemetry" -X stuff $'python3 lily_telemetry.py --project_id=$project --registry_id=$registry --device_id=$device --private_key_file=rsa_private.pem --algorithm=RS256 --ca_certs=roots.pem\n'

两个屏幕 session 都会启动,我可以附加到两个屏幕 session 。

lily_light.py 文件只是根据时间打开和关闭灯,并且加载完全正常并按预期运行。它不通过 MQTT 连接。

记录到 Google Cloud Project Pub/Sub 的 lily_telemetry.py 导致了问题。

lily_telemetry.py:

#!/usr/bin/env python3

import os
import sys
import psutil
import logging

import argparse
import json
import datetime
import time
import ssl
import subprocess

import board
import busio
import adafruit_mcp9808
import adafruit_tsl2591
import digitalio

import jwt
import paho.mqtt.client as mqtt
from socket import gaierror

# Update and publish temperature readings at a rate of sensor_delay per second.
sensor_delay=60

# Initialise I2C Master Bus
i2c_bus = busio.I2C(board.SCL, board.SDA)

# Set temperature sensor inputs MCP9808 board
mcp1 = adafruit_mcp9808.MCP9808(i2c_bus)
mcp2 = adafruit_mcp9808.MCP9808(i2c_bus,0x1C)

# Set light sensor inputs TSL2591 board
tsl = adafruit_tsl2591.TSL2591(i2c_bus)

def restartlily():
  """Restarts the current program, with file objects and descriptors cleanup."""
  print('')
  print(sys.executable, ['python3'] + sys.argv)
  os.execv(sys.executable, ['python3'] + sys.argv)
  return

def create_jwt(project_id, private_key_file, algorithm):
  """Create a JWT (https://jwt.io) to establish an MQTT connection."""
  token = {
      'iat': datetime.datetime.utcnow(),
      'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=60),
      'aud': project_id
  }
  with open(private_key_file, 'r') as f:
    private_key = f.read()
  print('Creating JWT using {} from private key file {}' .format(algorithm, private_key_file))
  return jwt.encode(token, private_key, algorithm=algorithm)

def error_str(rc):
  """Convert a Paho error to a human readable string."""
  return '{}: {}'.format(rc, mqtt.error_string(rc))

class Device(object):
  """Represents the state of a single device."""
  def __init__(self):
    self.temperature = 0
    self.connected = False

  def update_sensor_data(self):
    # CPU Temp
    rawdata = subprocess.check_output(["sudo", "/opt/vc/bin/vcgencmd", "measure_temp"]).decode().split('=', 1)[-1].rstrip()
    sendtemp = '"temp":"{}"' .format(rawdata.replace("'", "-").split("-", 1)[0])
    self.temperature = sendtemp
    # Temperature Sensor 1 Data
    self.sensortemperature_1 = '"sensor_temp_1":"{}"' .format(mcp1.temperature)
    # Temperature Sensor 2 Data
    self.sensortemperature_2 = '"sensor_temp_2":"{}"' .format(mcp2.temperature)
    # Light Sensor Data
    lux = tsl.lux
    infrared = tsl.infrared
    visible = tsl.visible
    full_spectrum = tsl.full_spectrum
    self.lightsensor_lux = '"light_sensor_lux":"{}"' .format(round(lux,4))
    self.lightsensor_infrared = '"light_sensor_infrared":"{}"' .format(infrared)
    self.lightsensor_visible = '"light_sensor_visible":"{}"' .format(visible)
    self.lightsensor_full_spectrum = '"light_sensor_full_spectrum":"{}"' .format(full_spectrum)

  def wait_for_connection(self, timeout):
    """Wait for the device to become connected."""
    total_time = 0
    while not self.connected and total_time < timeout:
      time.sleep(1)
      total_time += 1

    if not self.connected:
      raise RuntimeError('Could not connect to MQTT bridge.')

  def on_connect(self, unused_client, unused_userdata, unused_flags, rc):
    """Callback for when a device connects."""
    print('Status:', error_str(rc))
    self.connected = True

  def on_disconnect(self, unused_client, unused_userdata, rc):
    """Callback for when a device disconnects."""
    print('Disconnected:', error_str(rc))
    print('Waiting {} seconds before restarting Main'.format(sensor_delay))
    self.connected = False

  def on_publish(self, unused_client, unused_userdata, unused_mid):
    """Callback when the device receives a PUBACK from the MQTT bridge."""
    print('Published Payload.')
    print("Waiting {} seconds before next Payload." .format(sensor_delay))

  def on_subscribe(self, unused_client, unused_userdata, unused_mid,
                   granted_qos):
    """Callback when the device receives a SUBACK from the MQTT bridge."""
    print('Subscribed: ', granted_qos)
    if granted_qos[0] == 128:
      print('Subscription failed.')

def parse_command_line_args():
  """Parse command line arguments."""
  parser = argparse.ArgumentParser(
      description='Lily Grow Logging using MQTT')
  parser.add_argument(
      '--project_id', required=True, help='GCP cloud project name')
  parser.add_argument(
      '--registry_id', required=True, help='Cloud IoT registry id')
  parser.add_argument('--device_id', required=True, help='Cloud IoT device id')
  parser.add_argument(
      '--private_key_file', required=True, help='Path to private key file.')
  parser.add_argument(
      '--algorithm',
      choices=('RS256', 'ES256'),
      required=True,
      help='Which encryption algorithm to use to generate the JWT.')
  parser.add_argument(
      '--cloud_region', default='us-central1', help='GCP cloud region')
  parser.add_argument(
      '--ca_certs',
      default='roots.pem',
      help='CA root certificate. Get from https://pki.google.com/roots.pem')
  parser.add_argument(
      '--num_messages',
      type=int,
      default=60,
      help='Number of messages to publish.')
  parser.add_argument(
      '--mqtt_bridge_hostname',
      default='mqtt.googleapis.com',
      help='MQTT bridge hostname.')
  parser.add_argument(
      '--mqtt_bridge_port', default=8883, help='MQTT bridge port.')

  return parser.parse_args()

def main():
  args = parse_command_line_args()

  # Create our MQTT client and connect to Cloud IoT.
  client = mqtt.Client(
      client_id='projects/{}/locations/{}/registries/{}/devices/{}'.format(
          args.project_id, args.cloud_region, args.registry_id, args.device_id))
  client.username_pw_set(
      username='unused',
      password=create_jwt(args.project_id, args.private_key_file,
                          args.algorithm))
  client.tls_set(ca_certs=args.ca_certs, tls_version=ssl.PROTOCOL_TLSv1_2)

  device = Device()

  client.on_connect = device.on_connect
  client.on_publish = device.on_publish
  client.on_disconnect = device.on_disconnect
  client.on_subscribe = device.on_subscribe

  try:
        client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port)
  except gaierror as e:
        print('Gaierror {}.'.format(e))
        print('Waiting {} seconds before restarting Main'.format(sensor_delay))
        time.sleep(sensor_delay)
        restartlily()

  client.loop_start()

  # This is the topic that the device will publish telemetry events (temperature
  # data) to.
  mqtt_telemetry_topic = '/devices/{}/events'.format(args.device_id)

  # Wait up to 10 seconds for the device to connect.
  device.wait_for_connection(10)

  # Update and publish temperature readings at a rate of sensor_delay per second.
  for _ in range(args.num_messages):
    device.update_sensor_data()
    # Time
    sendtime = '"datetime":"{}"'.format(datetime.datetime.now().strftime("%Y-%m-%d"" ""%H:%M:%S.%f"))  
    # Payload
    payload = '{{''{},{},{},{},{},{},{},{}''}}'.format(device.temperature, device.sensortemperature_1,device.sensortemperature_2,device.lightsensor_lux,device.lightsensor_infrared,device.lightsensor_visible,device.lightsensor_full_spectrum,sendtime)
    # Message
    print('')
    print('+-----**------+')
    print('')
    print('Payload: {}'.format(payload))
    print('')
    # Publish, Sleep
    client.publish(mqtt_telemetry_topic, payload, qos=1)
    time.sleep(sensor_delay)

  restartlily()
  client.disconnect()
  client.loop_stop()
  print('Loop Stopped.')
  print('Waiting {} seconds before restarting Main'.format(sensor_delay))
  time.sleep(sensor_delay)

if __name__ == '__main__':
  try:
    print('')
    print('Restarting with main() from 239')
    print('')
    main()
  except:
    print('')
    print('Restarting with restartlily() from 244')
    print('')
    restartlily()

生成的 MQTT 错误:

4: Connection refused – bad username or password
5: Connection refused – not authorised

在屏幕 session 中是否会更改/未正确使用任何用户凭据、证书路径?

预先感谢您提供任何建议、提示或想法。

基督教

最佳答案

通过使用export,我了解到我的shell变量没有在脚本之前加载。在命令行中查看加载的内容以及加载时间。

我通过删除调用 shell 变量( screen -r "lily_telemetry" -X stuff $'python3 lily_telemetry.py --project_id=$project --registry_id=$registry --device_id=$device --private_key_file=rsa_private.pem --algorithm=RS256 --ca_certs=roots.pem\n' 等)的命令行参数(如 $project 所示)解决了这个问题,而是使用 configparser在 python 代码中从 .ini 文件加载这些变量

关于python - MQTT 连接在 Screen session 中不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53604959/

相关文章:

python - 使用 Python 灵活环境在 virtualenv 中包含 Google Cloud SDK

python - 距离度量的组合优化

python - Pygame 侧滚轴街机

python-3.x - 在 Docker Alpine Python 中安装 gevent 时出错

mongodb - 为什么我不能直接分配 ListField 的值?

google-cloud-platform - 云任务条件执行

python - lmdb.BadRslotError : mdb_txn_begin: MDB_BAD_RSLOT: Invalid reuse of reader locktable slot?

python - 在 PyQt 中获取 QScrollArea 中视口(viewport)的大小

python-3.x - 为什么我的合并排序算法不起作用?

google-cloud-platform - OAuth 同意屏幕 - 删除应用程序 Logo 的能力