python - Telnetlib Python3 中的类型错误 : 'in <string>' requires string as left operand, 不是字节

标签 python python-3.x

我在 python 3 中使用 telenetlib 时遇到问题。这是我的代码:

import time
import os

originalTime=0
state=0

def cleanString(inText):
    inText = inText.rstrip()
    params, inText = inText.split("=",1)
    inText = str(inText)
    return inText

def notify(title, text, sound):
    os.system("""osascript -e 'display notification "{}" with title "{}" sound name "{}"'
              """.format(text, title, sound))

def shutdown():
    os.system("""osascript -e 'tell application "System Events"
    shut down
    end tell'
    """)



dataFile = open("NUT.conf", "r")
login = cleanString(dataFile.readline())
password = cleanString(dataFile.readline())
host = cleanString(dataFile.readline())
port = cleanString(dataFile.readline())
upsName = cleanString(dataFile.readline())
shutdownTime = cleanString(dataFile.readline())

shutdownTime = shutdownTime*60,000

print(login)
print(password)
print(host)
print(port)
print(upsName)
print(shutdownTime)

ups_handler=PyNUT.PyNUTClient(host=host, port=port, login=login, password=password)
upsInfo = ups_handler.GetUPSVars(upsName)

while True:
    upsInfo = ups_handler.GetUPSVars(upsName)

    if upsInfo['ups.status']=='OB DISCHRG' and state!=1:
        originalTime = time.time()
        state=1

    elif upsInfo['ups.status']=='OB DISCHRG' :
        print("PANIC")
        if(int(originalTime)+int(  shutdownTime))==time.time():
            notify("UPS Alert", "Detected UPS power loss. Shutting down NAS in "+shutdownTime+" minute(s).", "Funk")
            #TODO shutdown computer

    if upsInfo['ups.status']=='OL':
        state=0

这是 pyNUT 库:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

#   Copyright (C) 2008 David Goncalves <david@lestat.st>
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <http://www.gnu.org/licenses/>.

# 2008-01-14 David Goncalves
#            PyNUT is an abstraction class to access NUT (Network UPS Tools) server.
#
# 2008-06-09 David Goncalves
#            Added 'GetRWVars' and 'SetRWVar' commands.
#
# 2009-02-19 David Goncalves
#            Changed class PyNUT to PyNUTClient
#
# 2010-07-23 David Goncalves - Version 1.2
#            Changed GetRWVars function that fails is the UPS is not
#            providing such vars.
#
# 2011-07-05 René Martín Rodríguez <rmrodri@ull.es> - Version 1.2.1
#            Added support for FSD, HELP and VER commands
#
# 2012-02-07 René Martín Rodríguez <rmrodri@ull.es> - Version 1.2.2
#            Added support for LIST CLIENTS command
#
# 2014-06-03 george2 - Version 1.3.0
#            Added custom exception class, fixed minor bug, added Python 3 support.
#

import telnetlib


class PyNUTError(Exception):
    """ Base class for custom exceptions """


class PyNUTClient:
    """ Abstraction class to access NUT (Network UPS Tools) server """

    __debug       = None   # Set class to debug mode (prints everything useful for debugging...)
    __host        = None
    __port        = None
    __login       = None
    __password    = None
    __timeout     = None
    __srv_handler = None

    __version     = "1.3.0"
    __release     = "2014-06-03"

    def __init__( self, host="127.0.0.1", port=3493, login=None, password=None, debug=False, timeout=5 ) :
        """ Class initialization method

host     : Host to connect (default to localhost)
port     : Port where NUT listens for connections (default to 3493)
login    : Login used to connect to NUT server (default to None for no authentication)
password : Password used when using authentication (default to None)
debug    : Boolean, put class in debug mode (prints everything on console, default to False)
timeout  : Timeout used to wait for network response
        """
        self.__debug = debug

        if self.__debug :
            print( "[DEBUG] Class initialization..." )
            print( "[DEBUG]  -> Host  = %s (port %s)" % ( host, port ) )
            print( "[DEBUG]  -> Login = '%s' / '%s'" % ( login, password ) )

        self.__host     = host
        self.__port     = port
        self.__login    = login
        self.__password = password
        self.__timeout  = 5

        self.__connect()

    # Try to disconnect cleanly when class is deleted ;)
    def __del__( self ) :
        """ Class destructor method """
        try :
            self.__srv_handler.write( "LOGOUT\n" )
        except :
            pass

    def __connect( self ) :
        """ Connects to the defined server

If login/pass was specified, the class tries to authenticate. An error is raised
if something goes wrong.
        """
        if self.__debug :
            print( "[DEBUG] Connecting to host" )

        self.__srv_handler = telnetlib.Telnet( self.__host, self.__port )

        if self.__login != None :
            self.__srv_handler.write( "USERNAME %s\n" % self.__login )
            result = self.__srv_handler.read_until( "\n", self.__timeout )
            if result[:2] != "OK" :
                raise PyNUTError( result.replace( "\n", "" ) )

        if self.__password != None :
            self.__srv_handler.write( "PASSWORD %s\n" % self.__password )
            result = self.__srv_handler.read_until( "\n", self.__timeout )
            if result[:2] != "OK" :
                raise PyNUTError( result.replace( "\n", "" ) )

    def GetUPSList( self ) :
        """ Returns the list of available UPS from the NUT server

The result is a dictionary containing 'key->val' pairs of 'UPSName' and 'UPS Description'
        """
        if self.__debug :
            print( "[DEBUG] GetUPSList from server" )

        self.__srv_handler.write( "LIST UPS\n" )
        result = self.__srv_handler.read_until( "\n" )
        if result != "BEGIN LIST UPS\n" :
            raise PyNUTError( result.replace( "\n", "" ) )

        result = self.__srv_handler.read_until( "END LIST UPS\n" )
        ups_list = {}

        for line in result.split( "\n" ) :
            if line[:3] == "UPS" :
                ups, desc = line[4:-1].split( '"' )
                ups_list[ ups.replace( " ", "" ) ] = desc

        return( ups_list )

    def GetUPSVars( self, ups="" ) :
        """ Get all available vars from the specified UPS

The result is a dictionary containing 'key->val' pairs of all
available vars.
        """
        if self.__debug :
            print( "[DEBUG] GetUPSVars called..." )

        self.__srv_handler.write( "LIST VAR %s\n" % ups )
        result = self.__srv_handler.read_until( "\n" )
        if result != "BEGIN LIST VAR %s\n" % ups :
            raise PyNUTError( result.replace( "\n", "" ) )

        ups_vars   = {}
        result     = self.__srv_handler.read_until( "END LIST VAR %s\n" % ups )
        offset     = len( "VAR %s " % ups )
        end_offset = 0 - ( len( "END LIST VAR %s\n" % ups ) + 1 )

        for current in result[:end_offset].split( "\n" ) :
            var  = current[ offset: ].split( '"' )[0].replace( " ", "" )
            data = current[ offset: ].split( '"' )[1]
            ups_vars[ var ] = data

        return( ups_vars )

    def GetUPSCommands( self, ups="" ) :
        """ Get all available commands for the specified UPS

The result is a dict object with command name as key and a description
of the command as value
        """
        if self.__debug :
            print( "[DEBUG] GetUPSCommands called..." )

        self.__srv_handler.write( "LIST CMD %s\n" % ups )
        result = self.__srv_handler.read_until( "\n" )
        if result != "BEGIN LIST CMD %s\n" % ups :
            raise PyNUTError( result.replace( "\n", "" ) )

        ups_cmds   = {}
        result     = self.__srv_handler.read_until( "END LIST CMD %s\n" % ups )
        offset     = len( "CMD %s " % ups )
        end_offset = 0 - ( len( "END LIST CMD %s\n" % ups ) + 1 )

        for current in result[:end_offset].split( "\n" ) :
            var  = current[ offset: ].split( '"' )[0].replace( " ", "" )

            # For each var we try to get the available description
            try :
                self.__srv_handler.write( "GET CMDDESC %s %s\n" % ( ups, var ) )
                temp = self.__srv_handler.read_until( "\n" )
                if temp[:7] != "CMDDESC" :
                    raise PyNUTError
                else :
                    off  = len( "CMDDESC %s %s " % ( ups, var ) )
                    desc = temp[off:-1].split('"')[1]
            except :
                desc = var

            ups_cmds[ var ] = desc

        return( ups_cmds )

    def GetRWVars( self,  ups="" ) :
        """ Get a list of all writable vars from the selected UPS

The result is presented as a dictionary containing 'key->val' pairs
        """
        if self.__debug :
            print( "[DEBUG] GetUPSVars from '%s'..." % ups )

        self.__srv_handler.write( "LIST RW %s\n" % ups )
        result = self.__srv_handler.read_until( "\n" )
        if ( result != "BEGIN LIST RW %s\n" % ups ) :
            raise PyNUTError( result.replace( "\n",  "" ) )

        result     = self.__srv_handler.read_until( "END LIST RW %s\n" % ups )
        offset     = len( "VAR %s" % ups )
        end_offset = 0 - ( len( "END LIST RW %s\n" % ups ) + 1 )
        rw_vars    = {}

        try :
            for current in result[:end_offset].split( "\n" ) :
                var  = current[ offset: ].split( '"' )[0].replace( " ", "" )
                data = current[ offset: ].split( '"' )[1]
                rw_vars[ var ] = data

        except :
            pass

        return( rw_vars )

    def SetRWVar( self, ups="", var="", value="" ):
        """ Set a variable to the specified value on selected UPS

The variable must be a writable value (cf GetRWVars) and you must have the proper
rights to set it (maybe login/password).
        """

        self.__srv_handler.write( "SET VAR %s %s %s\n" % ( ups, var, value ) )
        result = self.__srv_handler.read_until( "\n" )
        if ( result == "OK\n" ) :
            return( "OK" )
        else :
            raise PyNUTError( result )

    def RunUPSCommand( self, ups="", command="" ) :
        """ Send a command to the specified UPS

Returns OK on success or raises an error
        """

        if self.__debug :
            print( "[DEBUG] RunUPSCommand called..." )

        self.__srv_handler.write( "INSTCMD %s %s\n" % ( ups, command ) )
        result = self.__srv_handler.read_until( "\n" )
        if ( result == "OK\n" ) :
            return( "OK" )
        else :
            raise PyNUTError( result.replace( "\n", "" ) )

    def FSD( self, ups="") :
        """ Send FSD command

Returns OK on success or raises an error
        """

        if self.__debug :
            print( "[DEBUG] MASTER called..." )

        self.__srv_handler.write( "MASTER %s\n" % ups )
        result = self.__srv_handler.read_until( "\n" )
        if ( result != "OK MASTER-GRANTED\n" ) :
            raise PyNUTError( ( "Master level function are not available", "" ) )

        if self.__debug :
            print( "[DEBUG] FSD called..." )
        self.__srv_handler.write( "FSD %s\n" % ups )
        result = self.__srv_handler.read_until( "\n" )
        if ( result == "OK FSD-SET\n" ) :
            return( "OK" )
        else :
            raise PyNUTError( result.replace( "\n", "" ) )

    def help(self) :
        """ Send HELP command
        """

        if self.__debug :
            print( "[DEBUG] HELP called..." )

        self.__srv_handler.write( "HELP\n")
        return self.__srv_handler.read_until( "\n" )

    def ver(self) :
        """ Send VER command
        """

        if self.__debug :
            print( "[DEBUG] VER called..." )

        self.__srv_handler.write( "VER\n")
        return self.__srv_handler.read_until( "\n" )

    def ListClients( self, ups = None ) :
        """ Returns the list of connected clients from the NUT server

The result is a dictionary containing 'key->val' pairs of 'UPSName' and a list of clients
        """
        if self.__debug :
            print( "[DEBUG] ListClients from server" )

        if ups and (ups not in self.GetUPSList()):
            raise PyNUTError( "%s is not a valid UPS" % ups )

        if ups:
            self.__srv_handler.write( "LIST CLIENTS %s\n" % ups)
        else:
            self.__srv_handler.write( "LIST CLIENTS\n" )
        result = self.__srv_handler.read_until( "\n" )
        if result != "BEGIN LIST CLIENTS\n" :
            raise PyNUTError( result.replace( "\n", "" ) )

        result = self.__srv_handler.read_until( "END LIST CLIENTS\n" )
        ups_list = {}

        for line in result.split( "\n" ):
            if line[:6] == "CLIENT" :
                host, ups = line[7:].split(' ')
                ups.replace(' ', '')
                if not ups in ups_list:
                    ups_list[ups] = []
                ups_list[ups].append(host)

        return( ups_list )

它向我抛出了这个错误:

Traceback (most recent call last):
  File "/Users/gabrielcsizmadia/Documents/GitHub/nut-shutdown-app/main.py", line 43, in <module>
    ups_handler=PyNUT.PyNUTClient(host=host, port=port, login=login, password=password)
  File "/Users/gabrielcsizmadia/Documents/GitHub/nut-shutdown-app/PyNUT.py", line 86, in __init__
    self.__connect()
  File "/Users/gabrielcsizmadia/Documents/GitHub/nut-shutdown-app/PyNUT.py", line 108, in __connect
    self.__srv_handler.write( "USERNAME %s\n" % self.__login )
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/telnetlib.py", line 287, in write
    if IAC in buffer:
TypeError: 'in <string>' requires string as left operand, not bytes
      File "/Users/gabrielcsizmadia/Desktop/Homelab/NUT Shutdown/MacOS Shutdown/PyNUT.py", line 108, in __connect
        self.__srv_handler.write( "USERNAME %s\n" % self.__login )
      File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/telnetlib.py", line 287, in write
        if IAC in buffer:
    TypeError: 'in <string>' requires string as left operand, not bytes

不知道该去哪里...... 任何帮助将不胜感激!

编辑:我已经更新了错误,我在原始代码中发布了错误的错误...我还在代码中添加了更多内容。

我还注意到它在 Python 2.7 中运行良好,但在 Python 3 中则不然。这可能会对某人有所帮助。

最佳答案

问题在于 pyNUT 是为 Python 2 编写的。 telnetlib期望其大部分输入为字节。在 Python 2 中,str 类型是字节串,但在 Python 3 中它是 unicode。

pyNUT 代码使用字符串文字在内部格式化字符串,如下所示:

self.__srv_handler.write( "USERNAME %s\n" % self.__login ) 

所以传递字节不会有帮助:

>>> 'Hello %s' % 'world'.encode('ascii')
"Hello b'world'"

需要修改 pyNUT 以将字节传递给 telnetlib。例如

self.__srv_handler.write( b"USERNAME %s\n" % self.__login.encode('ascii') ) 

关于python - Telnetlib Python3 中的类型错误 : 'in <string>' requires string as left operand, 不是字节,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55206739/

相关文章:

python - ubuntu 上笔记本电脑的电源状态

Python:将 1,000,000 个整数写入文件

python - Jetson Xavier NX 不可读 pyserial (usb)

python - 由于 ModuleNotFoundError : No module named 'wsgi' ,Flask 无法使用 Docker 启动服务器

python - 除了 ValueError 不触发

python - 计数输出

python - 在 Python 中绘制曲线

django - 如何使用 ChannelNameRouter 在 Worker 和 Websocket(Django 和 Channels2.x)之间进行通信?

python - 读取 .txt 文件时出现无效 JSON 格式错误

python-3.x - Python3 + Tweepy流式传输错误