Files
docker-zabbix-server-pimped/tr064tool

269 lines
14 KiB
Plaintext
Raw Normal View History

2022-11-05 19:18:53 +01:00
#!/usr/bin/env python3
PK<00>e<EFBFBD>P<EFBFBD><50>6P<36><00>tr064queries.py#!/usr/bin/env python3
# coding=utf-8
import json
from urllib import request
from urllib.error import URLError
from xml.etree import ElementTree
from hashlib import md5
import ssl
BASE_URL = ''
USER = ''
PASSWORD = ''
def authentication(user: str, password: str, realm: str, nonce: str) -> str:
    """Compute the SOAP digest-authentication response token.

    The token is ``md5(md5("user:realm:password") + ":" + nonce)``, with both
    digests rendered as lowercase hexadecimal strings.

    :param user: account name sent in the request header
    :param password: clear-text password of that account
    :param realm: realm string taken from the server's challenge
    :param nonce: nonce string taken from the server's challenge
    :return: the hexadecimal digest to send back to the server
    """
    credentials = b':'.join(part.encode() for part in (user, realm, password))
    secret = md5(credentials).hexdigest().lower()
    challenge_reply = b':'.join((secret.encode(), nonce.encode()))
    return md5(challenge_reply).hexdigest().lower()
def soap(service: str, action: str, user: str = None, password: str = None, realm: str = None,
         nonce: str = None) -> bytes:
    """Build the UTF-8 encoded SOAP envelope for one action call.

    Three envelope shapes are produced:

    * ``user`` is None: a body-only envelope without any header.
    * ``user`` given, ``password`` is None: an ``InitChallenge`` header that
      only carries the user id.
    * ``user`` and ``password`` given: a ``ClientAuth`` header carrying the
      nonce, realm, user id and the digest computed by ``authentication``.

    Text and attribute values (``service``, ``user``, ``nonce``, ``realm``)
    are XML-escaped so characters such as ``&`` or ``<`` cannot break the
    envelope. ``action`` is used as an element name and is inserted verbatim;
    it must already be a valid XML name.

    :param service: service type URN, used as the namespace of the action
    :param action: name of the action element placed in the body
    :param user: optional account name
    :param password: optional password; requires ``realm`` and ``nonce``
    :param realm: realm from the server challenge
    :param nonce: nonce from the server challenge
    :return: the envelope encoded as UTF-8 bytes
    """
    import re
    from xml.sax.saxutils import escape

    auth = ''
    if user is None:
        template = """<?xml version='1.0' encoding='utf-8'?>
<s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/' xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>
<s:Body>
<u:%(action) xmlns:u="%(service)" />
</s:Body>
</s:Envelope>"""
    elif password is None:
        template = """<?xml version='1.0' encoding='utf-8'?>
<s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/' xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>
<s:Header>
<h:InitChallenge xmlns:h="http://soap-authentication.org/digest/2001/10/" s:mustUnderstand="1">
<UserID>%(user)</UserID>
</h:InitChallenge >
</s:Header>
<s:Body>
<u:%(action) xmlns:u="%(service)" />
</s:Body>
</s:Envelope>"""
    else:
        auth = authentication(user, password, realm, nonce)
        template = """<?xml version='1.0' encoding='utf-8'?>
<s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/' xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>
<s:Header>
<h:ClientAuth xmlns:h="http://soap-authentication.org/digest/2001/10/" s:mustUnderstand="1">
<Nonce>%(nonce)</Nonce>
<Auth>%(auth)</Auth>
<UserID>%(user)</UserID>
<Realm>%(realm)</Realm>
</h:ClientAuth>
</s:Header>
<s:Body>
<u:%(action) xmlns:u="%(service)" />
</s:Body>
</s:Envelope>"""

    values = {
        'action': action,
        # The service URN sits inside a double-quoted attribute value.
        'service': escape(service, {'"': '&quot;'}),
        'user': escape(user if user is not None else ''),
        'nonce': escape(nonce if nonce is not None else ''),
        'auth': auth,
        'realm': escape(realm if realm is not None else ''),
    }
    # Substitute in a single pass so that placeholder-looking text inside a
    # value (e.g. a user named "%(nonce)") is never substituted a second time.
    filled = re.sub(r'%\((action|service|user|nonce|auth|realm)\)',
                    lambda match: values[match.group(1)], template)
    return filled.encode('utf8')
def setup(user, password, base_url):
global USER, PASSWORD, BASE_URL
USER = user
PASSWORD = password
BASE_URL = base_url
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
# Legacy Python that doesn't verify HTTPS certificates by default
pass
else:
# Handle target environment that doesn't support HTTPS verification
ssl._create_default_https_context = _create_unverified_https_context
password_mgr = request.HTTPPasswordMgrWithDefaultRealm()
password_mgr.add_password(None, BASE_URL, USER, PASSWORD)
handler = request.HTTPDigestAuthHandler(password_mgr)
opener = request.build_opener(handler)
request.install_opener(opener)
def _soap_call(url, action, service, body):
    """POST one SOAP envelope to ``BASE_URL + url`` and return the parsed reply."""
    soap_request = request.Request(url=BASE_URL + url, data=body)
    soap_request.add_header('Content-Type', 'text/xml; charset=UTF-8')
    soap_request.add_header('SOAPAction', '"{0}#{1}"'.format(service, action))
    # The context manager closes the HTTP response even if parsing fails.
    with request.urlopen(url=soap_request) as response:
        # Feed raw bytes to the parser so it honours the declared encoding.
        return ElementTree.fromstring(response.read())


def query(url, action, service, queries) -> 'str | None':
    """Call one action and return the requested output values as JSON.

    A first request announces only the user id. If the reply carries a
    challenge with status ``Unauthenticated``, the request is repeated with
    a digest built from the challenge's nonce and realm; otherwise the first
    reply is used directly.

    :param url: control URL path, appended to the module-level ``BASE_URL``
    :param action: action name, e.g. ``GetInfo``
    :param service: service type URN used as the action namespace
    :param queries: names of the output arguments to extract; each is looked
        up as the element ``New<name>`` inside ``<action>Response``
    :return: a JSON object string mapping every query name to the element
        text (``''`` for an empty element, ``null`` when the element is
        absent), or ``None`` after printing a message when the server still
        reports ``Unauthenticated`` after the digest was sent
    """
    auth_ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
               'h': 'http://soap-authentication.org/digest/2001/10/'}
    body_ns = {'s': 'http://schemas.xmlsoap.org/soap/envelope/',
               'u': service}

    reply = _soap_call(url, action, service, soap(service, action, USER, None, None))

    challenge_status = reply.find('./s:Header/h:Challenge/Status', auth_ns)
    if challenge_status is not None and challenge_status.text == 'Unauthenticated':
        nonce = reply.findtext('./s:Header/h:Challenge/Nonce', namespaces=auth_ns)
        realm = reply.findtext('./s:Header/h:Challenge/Realm', namespaces=auth_ns)
        reply = _soap_call(url, action, service,
                           soap(service, action, USER, PASSWORD, realm, nonce))

    state = reply.find('./s:Header/h:NextChallenge/Status', auth_ns)
    if state is not None and state.text == 'Unauthenticated':
        print('UNKNOWN - Authentication failed')
        return None

    result = {}
    for name in queries:
        path = './s:Body/u:{0}Response/New{1}'.format(action, name)
        try:
            element = reply.find(path, body_ns)
        except SyntaxError:
            # ElementPath rejects malformed paths with SyntaxError; treat
            # such a name like an argument the device did not return.
            element = None
        if element is None:
            result[name] = None
        else:
            result[name] = element.text if element.text is not None else ''
    return json.dumps(result)
PK<00>e<EFBFBD>P<EFBFBD>0<EFBFBD><30>ZZ __main__.py#!/usr/bin/env python3
# coding=utf-8
"""TR-064 client"""
from sys import exit, argv
import tr064queries
# Supported modes, keyed by the name given on the command line. Every entry
# holds the control URL path, the action name, the service type URN and the
# output argument names (without their 'New' prefix) to report.
MODES = {
    'DeviceInfo.GetInfo': {
        'url': '/upnp/control/deviceinfo',
        'action': 'GetInfo',
        'service': 'urn:dslforum-org:service:DeviceInfo:1',
        'queries': [
            'ManufacturerName', 'ManufacturerOUI', 'ModelName',
            'Description', 'ProductClass', 'SerialNumber',
            'SoftwareVersion', 'HardwareVersion', 'SpecVersion',
            'ProvisioningCode', 'UpTime', 'DeviceLog',
        ],
    },
    'LANEthernetInterfaceConfig.GetStatistics': {
        'url': '/upnp/control/lanethernetifcfg',
        'action': 'GetStatistics',
        'service': 'urn:dslforum-org:service:LANEthernetInterfaceConfig:1',
        'queries': ['BytesSent', 'BytesReceived', 'PacketsSent', 'PacketsReceived'],
    },
    'WANDSLLinkConfig.GetStatistics': {
        'url': '/upnp/control/wandsllinkconfig1',
        'action': 'GetStatistics',
        'service': 'urn:dslforum-org:service:WANDSLLinkConfig:1',
        'queries': [
            'ATMTransmittedBlocks', 'ATMReceivedBlocks', 'AAL5CRCErrors',
            'ATMCRCErrors',
        ],
    },
    'WANDSLInterfaceConfig.GetInfo': {
        'url': '/upnp/control/wandslifconfig1',
        'action': 'GetInfo',
        'service': 'urn:dslforum-org:service:WANDSLInterfaceConfig:1',
        'queries': [
            'UpstreamCurrRate', 'DownstreamCurrRate', 'UpstreamMaxRate',
            'DownstreamMaxRate',
            'UpstreamNoiseMargin', 'DownstreamNoiseMargin',
            'UpstreamAttenuation', 'DownstreamAttenuation', 'DownstreamPower',
            'UpstreamPower',
        ],
    },
    'WANDSLInterfaceConfig.GetStatisticsTotal': {
        'url': '/upnp/control/wandslifconfig1',
        'action': 'GetStatisticsTotal',
        'service': 'urn:dslforum-org:service:WANDSLInterfaceConfig:1',
        'queries': [
            'ReceiveBlocks', 'TransmitBlocks', 'CellDelin',
            'LinkRetrain', 'InitErrors', 'InitTimeouts',
            'LossOfFraming', 'ErroredSecs', 'SeverelyErroredSecs',
            'FECErrors', 'ATUCFECErrors', 'HECErrors',
            'ATUCHECErrors', 'ATUCCRCErrors', 'CRCErrors',
        ],
    },
    'WANPPPConnection.GetInfo': {
        'url': '/upnp/control/wanpppconn1',
        'action': 'GetInfo',
        'service': 'urn:dslforum-org:service:WANPPPConnection:1',
        'queries': [
            'Enable', 'ConnectionStatus', 'PossibleConnectionTypes', 'ConnectionType', 'Name',
            'Uptime', 'DownstreamMaxBitRate', 'UpstreamMaxBitRate', 'LastConnectionError',
            'IdleDisconnectTime', 'RSIPAvailable', 'UserName', 'NATEnabled', 'DNSServers',
            'MACAddress', 'ConnectionTrigger', 'LastAuthErrorInfo', 'MaxCharsUsername',
            'MinCharsUsername', 'AllowedCharsUsername', 'MaxCharsPassword', 'MinCharsPassword',
            'AllowedCharsPassword', 'TransportType', 'RouteProtocolRx', 'PPPoEServiceName',
            'RemoteIPAddress', 'PPPoEACName', 'DNSEnabled', 'DNSOverrideAllowed',
        ],
    },
    'WANCommonInterfaceConfig.GetCommonLinkProperties': {
        'url': '/upnp/control/wancommonifconfig1',
        'action': 'GetCommonLinkProperties',
        'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
        'queries': [
            'WANAccessType', 'Layer1UpstreamMaxBitRate', 'Layer1DownstreamMaxBitRate',
            'PhysicalLinkStatus',
        ],
    },
    'WANCommonInterfaceConfig.GetTotalBytesSent': {
        'url': '/upnp/control/wancommonifconfig1',
        'action': 'GetTotalBytesSent',
        'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
        'queries': ['TotalBytesSent'],
    },
    'WANCommonInterfaceConfig.GetTotalBytesReceived': {
        'url': '/upnp/control/wancommonifconfig1',
        'action': 'GetTotalBytesReceived',
        'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
        'queries': ['TotalBytesReceived'],
    },
    'WANIPConnection.GetExternalIPAddress': {
        'url': '/upnp/control/wanipconnection1',
        'action': 'GetExternalIPAddress',
        'service': 'urn:dslforum-org:service:WANIPConnection:1',
        'queries': ['ExternalIPAddress'],
    },
    'UserInterface.GetInfo': {
        'url': '/upnp/control/userif',
        'action': 'GetInfo',
        'service': 'urn:dslforum-org:service:UserInterface:1',
        'queries': [
            'UpgradeAvailable', 'X_AVM-DE_Version', 'X_AVM-DE_DownloadURL',
            'X_AVM-DE_InfoURL', 'X_AVM-DE_UpdateState', 'X_AVM-DE_LaborVersion',
        ],
    },
    'UserInterface.X_AVM-DE_GetInfo': {
        'url': '/upnp/control/userif',
        'action': 'X_AVM-DE_GetInfo',
        'service': 'urn:dslforum-org:service:UserInterface:1',
        'queries': [
            'X_AVM-DE_AutoUpdateMode', 'X_AVM-DE_UpdateTime', 'X_AVM-DE_LastFwVersion',
            'X_AVM-DE_LastInfoUrl', 'X_AVM-DE_CurrentFwVersion', 'X_AVM-DE_UpdateSuccessful',
        ],
    },
    'WANCommonInterfaceConfig.GetAddonInfos': {
        'url': '/igdupnp/control/WANCommonIFC1',
        'action': 'GetAddonInfos',
        'service': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
        'queries': [
            'X_AVM_DE_TotalBytesSent64', 'X_AVM_DE_TotalBytesReceived64',
            'X_AVM_DE_WANAccessType',
        ],
    },
    # Template for a further UserInterface mode:
    # 'UserInterface.': {'url': '/upnp/control/userif', 'action': '',
    #                    'service': 'urn:dslforum-org:service:UserInterface:1',
    #                    'queries': ['']},
}
def show_help() -> None:
    """Print the usage text followed by the name of every entry in MODES."""
    usage_lines = [
        'Usage:',
        argv[0] + ' <Mode> <Base URL> <User> <Password>\n',
        'Where <Base URL> is something like https://192.168.178.1:49433,'
        ' http://192.168.178.1:49000 or https://remote-address:port/tr064',
        'and where <Mode> is one of',
    ]
    # Mode names are listed in the order in which MODES defines them.
    usage_lines.extend(' ' + mode_name for mode_name in MODES)
    for line in usage_lines:
        print(line)
def main() -> int:
    """Run one query selected by the command-line arguments.

    Expects exactly four arguments: mode, base URL, user and password. The
    JSON result of the query is printed to standard output. Exceptions
    raised while talking to the device are not caught here and propagate
    to the interpreter.

    :return: the process exit status: 0 on success, 255 when the argument
        count is wrong, -1 for an unknown mode or when the query reported
        an authentication failure
    """
    if len(argv) != 5:
        show_help()
        return 255

    mode_parameter, base_url_parameter, user_parameter, password_parameter = argv[1:5]

    # Validate the mode first so that a typo does not install the global
    # opener and TLS settings for nothing.
    mode = MODES.get(mode_parameter)
    if mode is None:
        print('No such mode: ' + mode_parameter)
        show_help()
        return -1

    tr064queries.setup(user=user_parameter, password=password_parameter,
                       base_url=base_url_parameter)
    result = tr064queries.query(mode['url'], mode['action'], mode['service'], mode['queries'])
    if result is None:
        # query() has already printed its 'UNKNOWN - ...' diagnostic; do not
        # follow it with the literal text 'None'.
        return -1
    print(result)
    return 0


if __name__ == '__main__':
    exit(main())
PK<00>e<EFBFBD>P<EFBFBD><50>6P<36><00><00><>tr064queries.pyPK<00>e<EFBFBD>P<EFBFBD>0<EFBFBD><30>ZZ <00><><EFBFBD>__main__.pyPKvq5