#!/usr/bin/env python3
PK     eP6P       tr064queries.py#!/usr/bin/env python3
# coding=utf-8

import json
import ssl
from hashlib import md5
from typing import Optional
from urllib import request
from urllib.error import URLError
from xml.etree import ElementTree
from xml.sax.saxutils import escape

BASE_URL = ''
USER = ''
PASSWORD = ''


def authentication(user: str, password: str, realm: str, nonce: str) -> str:
    """Compute the digest response used for SOAP authentication.

    The result is the lowercase hex MD5 of ``<secret>:<nonce>``, where
    ``secret`` is itself the lowercase hex MD5 of ``<user>:<realm>:<password>``.
    """
    credentials = b':'.join(part.encode() for part in (user, realm, password))
    secret = md5(credentials).hexdigest().lower()

    challenge = b':'.join((secret.encode(), nonce.encode()))
    return md5(challenge).hexdigest().lower()


def soap(service: str, action: str, user: str = None, password: str = None, realm: str = None,
         nonce: str = None) -> bytes:
    """Build the UTF-8 encoded SOAP envelope for a single action call.

    Depending on the credentials supplied, one of three envelopes is produced:

    * ``user`` is None: body only, no SOAP header.
    * ``user`` given, ``password`` None: header carries an ``InitChallenge``
      element with the user id.
    * ``user`` and ``password`` given: header carries a ``ClientAuth`` element
      with nonce, digest response, user id and realm. ``realm`` and ``nonce``
      must be strings in this case because the digest is computed from them.

    Text values (user, nonce, realm) and the service URN are XML-escaped so
    that characters such as ``&``, ``<`` or ``"`` cannot produce a malformed
    envelope. The envelope is assembled in a single pass, so a value that
    happens to contain placeholder-like text is never substituted again.

    :param service: service type URN, used as the namespace of the action element
    :param action: name of the SOAP action, used as the element name in the body
    :param user: user id, or None for an unauthenticated envelope
    :param password: password, or None to request an authentication challenge
    :param realm: realm taken from the server's challenge
    :param nonce: nonce taken from the server's challenge
    :return: the envelope encoded as UTF-8 bytes
    """
    header = []
    if user is not None:
        if password is None:
            header = [
                '<s:Header>',
                '<h:InitChallenge xmlns:h="http://soap-authentication.org/digest/2001/10/" s:mustUnderstand="1">',
                '<UserID>' + escape(user) + '</UserID>',
                '</h:InitChallenge >',
                '</s:Header>',
            ]
        else:
            # The digest is computed from the raw values; escaping only applies
            # to the XML representation.
            auth = authentication(user, password, realm, nonce)
            header = [
                '<s:Header>',
                '<h:ClientAuth xmlns:h="http://soap-authentication.org/digest/2001/10/" s:mustUnderstand="1">',
                '<Nonce>' + escape(nonce) + '</Nonce>',
                '<Auth>' + auth + '</Auth>',
                '<UserID>' + escape(user) + '</UserID>',
                '<Realm>' + escape(realm) + '</Realm>',
                '</h:ClientAuth>',
                '</s:Header>',
            ]

    # The service URN lands inside a double-quoted attribute value.
    service_attribute = escape(service, {'"': '&quot;'})

    lines = [
        "<?xml version='1.0' encoding='utf-8'?>",
        "<s:Envelope s:encodingStyle='http://schemas.xmlsoap.org/soap/encoding/' "
        "xmlns:s='http://schemas.xmlsoap.org/soap/envelope/'>",
    ]
    lines.extend(header)
    lines.extend([
        '<s:Body>',
        '<u:' + action + ' xmlns:u="' + service_attribute + '" />',
        '</s:Body>',
        '</s:Envelope>',
    ])

    return '\n'.join(lines).encode('utf8')


def setup(user, password, base_url):
    global USER, PASSWORD, BASE_URL

    USER = user
    PASSWORD = password
    BASE_URL = base_url

    try:
        _create_unverified_https_context = ssl._create_unverified_context
    except AttributeError:
        # Legacy Python that doesn't verify HTTPS certificates by default
        pass
    else:
        # Handle target environment that doesn't support HTTPS verification
        ssl._create_default_https_context = _create_unverified_https_context

    password_mgr = request.HTTPPasswordMgrWithDefaultRealm()
    password_mgr.add_password(None, BASE_URL, USER, PASSWORD)
    handler = request.HTTPDigestAuthHandler(password_mgr)
    opener = request.build_opener(handler)
    request.install_opener(opener)


_SOAP_ENVELOPE_NS = 'http://schemas.xmlsoap.org/soap/envelope/'
_SOAP_DIGEST_NS = 'http://soap-authentication.org/digest/2001/10/'
# Prefix map for the authentication elements in the SOAP header.
_AUTH_NAMESPACES = {'s': _SOAP_ENVELOPE_NS, 'h': _SOAP_DIGEST_NS}


def _post_soap(url, action, service, body: bytes) -> bytes:
    """Send one SOAP envelope to BASE_URL + url and return the raw response body."""
    soap_request = request.Request(url=BASE_URL + url)
    soap_request.add_header('Content-Type', 'text/xml; charset=UTF-8')
    soap_request.add_header('SOAPAction', '"{0}#{1}"'.format(service, action))
    soap_request.data = body

    # Close the connection deterministically instead of leaving it to the GC.
    with request.urlopen(url=soap_request) as response:
        return response.read()


def query(url, action, service, queries) -> Optional[str]:
    """Call a SOAP action and return the requested output values as JSON.

    A first request carrying only the user id is sent. If the response header
    contains a challenge with status ``Unauthenticated``, a second request is
    sent with the digest computed from the challenge's nonce and realm;
    otherwise the first response is used directly.

    :param url: control URL path, appended to BASE_URL
    :param action: name of the SOAP action to invoke
    :param service: service type URN the action belongs to
    :param queries: names of the output arguments to extract, without the
        ``New`` prefix the response elements carry
    :return: a JSON object mapping every name in ``queries`` to its text
        (``''`` for an empty element, ``null`` when the element is missing),
        or None after printing a message when authentication failed
    :raises urllib.error.URLError: propagated from ``urlopen``
    :raises xml.etree.ElementTree.ParseError: when a response is not valid XML
    """
    initial_raw = _post_soap(url, action, service, soap(service, action, USER, None, None))
    # Parse the raw bytes so the parser honours the declared encoding; both
    # code paths below now hand the same type to the parser.
    response_et = ElementTree.fromstring(initial_raw)

    challenge_status = response_et.findtext('./s:Header/h:Challenge/Status',
                                            default=None, namespaces=_AUTH_NAMESPACES)
    if challenge_status == 'Unauthenticated':
        nonce = response_et.findtext('./s:Header/h:Challenge/Nonce',
                                     default=None, namespaces=_AUTH_NAMESPACES)
        realm = response_et.findtext('./s:Header/h:Challenge/Realm',
                                     default=None, namespaces=_AUTH_NAMESPACES)

        final_raw = _post_soap(url, action, service,
                               soap(service, action, USER, PASSWORD, realm, nonce))
        response_et = ElementTree.fromstring(final_raw)

    next_status = response_et.findtext('./s:Header/h:NextChallenge/Status',
                                       default=None, namespaces=_AUTH_NAMESPACES)
    if next_status == 'Unauthenticated':
        print('UNKNOWN - Authentication failed')
        return None

    response_namespaces = {'s': _SOAP_ENVELOPE_NS, 'u': service}
    result = {}
    for q in queries:
        path = './s:Body/u:' + action + 'Response/New' + q
        try:
            # findtext yields '' for an empty element and the default for a
            # missing one, which is exactly the mapping wanted here.
            result[q] = response_et.findtext(path, default=None, namespaces=response_namespaces)
        except SyntaxError:
            # ElementPath rejects names it cannot parse as a path; treat such
            # a query like a missing value instead of aborting the whole call.
            result[q] = None

    return json.dumps(result)
PK     eP0Z  Z     __main__.py#!/usr/bin/env python3
# coding=utf-8

"""TR-064 client"""

from sys import exit, argv
import tr064queries

# Table of the supported modes, keyed by the <Mode> command line argument.
# Each entry supplies the four arguments handed to tr064queries.query():
#   'url'     - control URL path, appended to the base URL
#   'action'  - name of the SOAP action to invoke
#   'service' - service type URN the action belongs to
#   'queries' - output argument names to extract, without the 'New' prefix
#               that query() prepends when looking them up in the response
MODES = {'DeviceInfo.GetInfo': {'url': '/upnp/control/deviceinfo', 'action': 'GetInfo',
                           'service': 'urn:dslforum-org:service:DeviceInfo:1',
                           'queries': ['ManufacturerName', 'ManufacturerOUI', 'ModelName',
                                       'Description', 'ProductClass', 'SerialNumber',
                                       'SoftwareVersion', 'HardwareVersion', 'SpecVersion',
                                       'ProvisioningCode', 'UpTime', 'DeviceLog']},
         'LANEthernetInterfaceConfig.GetStatistics': {'url': '/upnp/control/lanethernetifcfg', 'action': 'GetStatistics',
                              'service': 'urn:dslforum-org:service:LANEthernetInterfaceConfig:1',
                              'queries': ['BytesSent', 'BytesReceived', 'PacketsSent', 'PacketsReceived']},
         'WANDSLLinkConfig.GetStatistics': {'url': '/upnp/control/wandsllinkconfig1', 'action': 'GetStatistics',
                                   'service': 'urn:dslforum-org:service:WANDSLLinkConfig:1',
                                   'queries': ['ATMTransmittedBlocks', 'ATMReceivedBlocks', 'AAL5CRCErrors',
                                               'ATMCRCErrors']},
         'WANDSLInterfaceConfig.GetInfo': {'url': '/upnp/control/wandslifconfig1', 'action': 'GetInfo',
                        'service': 'urn:dslforum-org:service:WANDSLInterfaceConfig:1',
                        'queries': ['UpstreamCurrRate', 'DownstreamCurrRate', 'UpstreamMaxRate',
                                    'DownstreamMaxRate',
                                    'UpstreamNoiseMargin', 'DownstreamNoiseMargin',
                                    'UpstreamAttenuation', 'DownstreamAttenuation', 'DownstreamPower',
                                    'UpstreamPower']},
         'WANDSLInterfaceConfig.GetStatisticsTotal': {'url': '/upnp/control/wandslifconfig1', 'action': 'GetStatisticsTotal',
                                   'service': 'urn:dslforum-org:service:WANDSLInterfaceConfig:1',
                                   'queries': ['ReceiveBlocks', 'TransmitBlocks', 'CellDelin',
                                               'LinkRetrain', 'InitErrors', 'InitTimeouts',
                                               'LossOfFraming', 'ErroredSecs', 'SeverelyErroredSecs',
                                               'FECErrors', 'ATUCFECErrors', 'HECErrors',
                                               'ATUCHECErrors', 'ATUCCRCErrors', 'CRCErrors']},
         'WANPPPConnection.GetInfo': {'url': '/upnp/control/wanpppconn1', 'action': 'GetInfo',
                  'service': 'urn:dslforum-org:service:WANPPPConnection:1',
                  'queries': ['Enable', 'ConnectionStatus', 'PossibleConnectionTypes', 'ConnectionType', 'Name',
                              'Uptime', 'DownstreamMaxBitRate', 'UpstreamMaxBitRate', 'LastConnectionError',
                              'IdleDisconnectTime', 'RSIPAvailable', 'UserName', 'NATEnabled', 'DNSServers',
                              'MACAddress', 'ConnectionTrigger', 'LastAuthErrorInfo', 'MaxCharsUsername',
                              'MinCharsUsername', 'AllowedCharsUsername', 'MaxCharsPassword', 'MinCharsPassword',
                              'AllowedCharsPassword', 'TransportType', 'RouteProtocolRx', 'PPPoEServiceName',
                              'RemoteIPAddress', 'PPPoEACName', 'DNSEnabled', 'DNSOverrideAllowed']},
         'WANCommonInterfaceConfig.GetCommonLinkProperties': {'url': '/upnp/control/wancommonifconfig1', 'action': 'GetCommonLinkProperties',
                  'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
                  'queries': ['WANAccessType', 'Layer1UpstreamMaxBitRate', 'Layer1DownstreamMaxBitRate', 'PhysicalLinkStatus']},
         'WANCommonInterfaceConfig.GetTotalBytesSent': {'url': '/upnp/control/wancommonifconfig1',
                                                              'action': 'GetTotalBytesSent',
                                                              'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
                                                              'queries': ['TotalBytesSent']},
         'WANCommonInterfaceConfig.GetTotalBytesReceived': {'url': '/upnp/control/wancommonifconfig1',
                                                        'action': 'GetTotalBytesReceived',
                                                        'service': 'urn:dslforum-org:service:WANCommonInterfaceConfig:1',
                                                        'queries': ['TotalBytesReceived']},
         'WANIPConnection.GetExternalIPAddress': {'url': '/upnp/control/wanipconnection1', 'action': 'GetExternalIPAddress',
                                      'service': 'urn:dslforum-org:service:WANIPConnection:1',
                                      'queries': ['ExternalIPAddress']},
         'UserInterface.GetInfo': {'url': '/upnp/control/userif', 'action': 'GetInfo',
                                      'service': 'urn:dslforum-org:service:UserInterface:1',
                                      'queries': ['UpgradeAvailable', 'X_AVM-DE_Version', 'X_AVM-DE_DownloadURL', 'X_AVM-DE_InfoURL', 'X_AVM-DE_UpdateState', 'X_AVM-DE_LaborVersion']},
         'UserInterface.X_AVM-DE_GetInfo': {'url': '/upnp/control/userif', 'action': 'X_AVM-DE_GetInfo',
                                      'service': 'urn:dslforum-org:service:UserInterface:1',
                                      'queries': ['X_AVM-DE_AutoUpdateMode', 'X_AVM-DE_UpdateTime', 'X_AVM-DE_LastFwVersion', 'X_AVM-DE_LastInfoUrl', 'X_AVM-DE_CurrentFwVersion', 'X_AVM-DE_UpdateSuccessful']},
         'WANCommonInterfaceConfig.GetAddonInfos': {'url': '/igdupnp/control/WANCommonIFC1', 'action': 'GetAddonInfos',
                                      'service': 'urn:schemas-upnp-org:service:WANCommonInterfaceConfig:1',
                                      'queries': ['X_AVM_DE_TotalBytesSent64', 'X_AVM_DE_TotalBytesReceived64', 'X_AVM_DE_WANAccessType']},
         # Template for adding a further UserInterface mode:
#         'UserInterface.': {'url': '/upnp/control/userif', 'action': '',
#                                      'service': 'urn:dslforum-org:service:UserInterface:1',
#                                      'queries': ['']},
         }


def show_help() -> None:
    """Print the command line usage followed by the list of available modes."""
    usage_line = argv[0] + ' <Mode> <Base URL> <User> <Password>\n'
    base_url_hint = ('Where <Base URL> is something like https://192.168.178.1:49433,'
                     ' http://192.168.178.1:49000 or https://remote-address:port/tr064')

    print('Usage:')
    print(usage_line)
    print(base_url_hint)
    print('and where <Mode> is one of')
    for mode_name in MODES:
        print('    ' + mode_name)


if __name__ == '__main__':
    # Check the argument count only when executed as a script, so that merely
    # importing this module never terminates the interpreter.
    if len(argv) != 5:
        show_help()
        exit(255)

    mode_parameter, base_url_parameter, user_parameter, password_parameter = argv[1:5]

    # Reject an unknown mode before any global HTTP/SSL state is configured.
    mode = MODES.get(mode_parameter)
    if mode is None:
        print('No such mode: ' + mode_parameter)
        show_help()
        exit(-1)

    tr064queries.setup(user=user_parameter, password=password_parameter, base_url=base_url_parameter)

    result = tr064queries.query(mode['url'], mode['action'], mode['service'], mode['queries'])
    if result is None:
        # query() returns None after printing its own message when
        # authentication failed; report that through the exit status instead
        # of printing the literal text "None" and exiting successfully.
        exit(-1)

    print(result)
PK     eP6P                  tr064queries.pyPK     eP0Z  Z               __main__.pyPK      v   q5    