Исходный код ssosp.assertion_parser

#coding:utf-8
u"""
Низкоуровневая работа с SAML-утверждениями (Assertion) в xml виде
с использованием lxml.etree
"""

from lxml import etree
import rsa
try:
    import xmldsig
except ImportError:
    xmldsig = None


def _build_node(builder, struct, nsmap):
    u"""
    Рекурсивный построитель элементов дерева из переданной структуры.

    :param builder: экземпляр построителя дерева etree.TreeBuilder
    :type builder: etree.TreeBuilder
    :param struct: структура, из которой надо построить дерево
    :type struct: basestring, dict, list, tuple
    :param dict nsmap: словарь замены namespace в формируемом дереве
    """
    assert isinstance(builder, etree.TreeBuilder)
    assert isinstance(nsmap, dict)
    assert isinstance(struct, (basestring, dict, list, tuple))
    if isinstance(struct, basestring):
        builder.data(struct)
    else:
        if isinstance(struct, dict):
            struct_list = [struct]
        else:
            struct_list = struct
        for item in struct_list:
            tag = item.get('tag', None)
            if tag:
                attrs = item.get('attrs', None)
                current_nsmap = item.get('nsmap', None)
                if current_nsmap:
                    nsmap.update(current_nsmap)
                # заменим неймспейс в тэге
                for key, value in nsmap.iteritems():
                    if tag.startswith('{%s}' % key):
                        tag = tag.replace('{%s}' % key, '{%s}' % value)
                        break
                builder.start(tag=tag, attrs=attrs, nsmap=current_nsmap)
                value = item.get('value', None)
                if value:
                    _build_node(builder, value, nsmap)
                builder.end(tag)


[документация]def build_assertion(assertion_struct): u""" Создание нового утверждения по структуре :param dict assertion_struct: словарь структуры из которого строится утверждение :return: Сформированное утверждение (Assertion, xml) :rtype: etree.ElementTree """ assert isinstance(assertion_struct, dict) builder = etree.TreeBuilder() _build_node(builder, assertion_struct, {}) assertion = builder.close() return assertion
[документация]def xml_to_assertion(xml_string): u""" Переобразование xml-строки в утверждение для дальнейшей работы :param basestring xml_string: Утверждение, представленное строкой :return: Преобразованное утверждение (Assertion, xml) :rtype: etree.ElementTree """ assertion = etree.fromstring(xml_string) return assertion
[документация]def assertion_to_xml(assertion): u""" Переобразование утверждения в xml-строку :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: Утверждение представленное строкой :rtype: basestring """ xml = etree.tostring(assertion, encoding='UTF-8', xml_declaration=True) return xml
[документация]def is_logout_request(assertion): u""" Проверка того, что это запрос (утверждение) на выход из системы :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: True, если запрос на выход :rtype: bool """ logout_resp = assertion.xpath( '//saml2p:LogoutRequest', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol'} ) return len(logout_resp) > 0
[документация]def is_logout_response(assertion): u""" Проверка того, что это ответ (утверждение) на запрос на выход из системы :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: True, если ответ на запрос на выход :rtype: bool """ logout_resp = assertion.xpath( '//saml2p:LogoutResponse', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol'} ) return len(logout_resp) > 0
[документация]def get_session_from_request_assertion(assertion): u""" Получение ID сессии из запроса на выход (утверждения) :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: идентификатор сессии :rtype: basestring """ session_id = None statement = assertion.xpath( '//saml2p:LogoutRequest//saml2p:SessionIndex', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol', 'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'} ) if len(statement) > 0: session_id = statement[0].text return session_id
[документация]def get_session_from_response_assertion(assertion): u""" Получение ID сессии из ответа (утверждения) на запрос :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: идентификатор сессии :rtype: basestring """ session_id = None statement = assertion.xpath( '//saml2p:Response//saml2:Assertion//saml2:AuthnStatement', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol', 'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'} ) if len(statement) > 0 and 'SessionIndex' in statement[0].attrib: session_id = statement[0].attrib['SessionIndex'] return session_id
[документация]def get_attributes_from_assertion(assertion): u""" Получение атрибутов из утверждения :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: список атрибутов :rtype: list """ attributes = {} attrs = assertion.xpath( '//saml2p:Response//saml2:Assertion//saml2:AttributeStatement//saml2:Attribute', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol', 'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'} ) for attr in attrs: name = attr.attrib['Name'] values = attr.xpath( './saml2:AttributeValue', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol', 'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'} ) if len(values) == 0: # игнорируем пустые значения атрибутов continue elif len(values) == 1: attributes[name] = values[0].text else: # множество значений for value in values: attributes.setdefault(name, []).append(value.text) return attributes
[документация]def get_userid_from_assertion(assertion): u""" Получение ID пользователя из ответа (утверждения) на запрос :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :return: идентификатор пользователя :rtype: basestring """ statement = assertion.xpath( '//saml2p:Response//saml2:Assertion//saml2:Subject//saml2:NameID', namespaces={'saml2p': 'urn:oasis:names:tc:SAML:2.0:protocol', 'saml2': 'urn:oasis:names:tc:SAML:2.0:assertion'} ) if len(statement) > 0: userid = statement[0].text else: userid = None return userid
[документация]def verify_assertion(assertion, public_key_str): u""" Проверка цифровой подписи утверждения по публичному ключу :param assertion: Утверждение (Assertion, xml) :type assertion: etree.ElementTree :param basestring public_key_str: публичный ключ подписи представленный в виде строки :return: признак успешной проверки подписи :rtype: bool :raise: XMLSigException - ошибка при проверке подписи """ if not xmldsig is None: with file(public_key_str, 'r') as public_key_file: public_key_data = public_key_file.read() public_key = rsa.key.PublicKey.load_pkcs1_openssl_pem(public_key_data) try: return xmldsig.verify(assertion.getroottree(), public_key) except xmldsig.XMLSigException as err: if err.message == "Is not signed xml!": return True else: raise err else: raise ImportError("Cant import xmldsig module.")
[документация]def sign_request(message, private_key_str): u""" Цифровая подпись SAML-сообщения. Получение сигнатуры по алгоритму SHA1 :param basestring message: Сообщение для подписи :param basestring public_key_str: закрытый ключ для подписи представленный в виде строки :return: строка сигнатуры подписи закодированная в base64 :rtype: basestring """ with file(private_key_str, 'r') as private_key_file: private_key_data = private_key_file.read() private_key = rsa.key.PrivateKey.load_pkcs1(private_key_data) signed = rsa.pkcs1.sign(message, private_key, 'SHA-1') signature = signed.encode('base64').replace('\n', '') return signature