#coding:utf-8
u"""
Классы SAML-запросов и ответов
"""
import urllib2
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth import logout, login
from django.shortcuts import redirect
from django.utils.importlib import import_module
from ssosp.settings import get_sso_setting
from ssosp.assertion_parser import xml_to_assertion, is_logout_request, \
get_session_from_request_assertion, build_assertion, assertion_to_xml, \
is_logout_response, get_session_from_response_assertion, \
get_userid_from_assertion, get_attributes_from_assertion, \
verify_assertion, sign_request
from ssosp.exceptions import SSOLoginException
from ssosp.utils import decode_base64_and_inflate, deflate_and_base64_encode, \
get_random_id, get_time_string, decode_base64
[документация]class SSOException(Exception):
u"""
Класс исключений работы с SSO
"""
pass
[документация]def get_session_map():
u"""
Получение бэкенда хранения соответствия сессий, указанного в
настройке SSO_CONFIG['session_map']
:return: Экземпляр бэкенда, наследника BaseSSOSessionMap
:rtype: ssosp.backends.base.BaseSSOSessionMap
"""
session_map_engine = get_sso_setting('session_map')
engine = import_module(session_map_engine)
return engine.SSOSessionMap()
[документация]def get_method(method_str):
u"""
Получение функции, представленной строкой с полным путем
:param basesting method_str: полный путь к функции
:return: указатель на функцию или None, если строка пустая
"""
if method_str:
module = '.'.join(method_str.split('.')[:-1])
method = method_str.split('.')[-1]
mod = import_module(module)
return getattr(mod, method)
else:
return None
[документация]class SAMLObject(object):
u"""
Базовый класс SAML-объекта
"""
def __init__(self, request):
u"""
Инициализация
:param request: запрос, в рамках которого создан объект
:type request: django.http.HttpRequest
"""
self.idp_url = get_sso_setting('idp')
self.issuer = get_sso_setting('issuer')
self.service_index = get_sso_setting('index')
self.acs_url = request.build_absolute_uri(get_sso_setting('acs'))
self.signing = get_sso_setting('signing')
self.validate = get_sso_setting('validate')
self.public_key_str = get_sso_setting('public_key')
self.private_key_str = get_sso_setting('private_key')
self.logout_method = get_method(get_sso_setting('logout'))
self.login_method = get_method(get_sso_setting('login'))
self.get_user_method = get_method(get_sso_setting('get_user'))
[документация]class AuthResponse(SAMLObject):
u"""
SAML-ответ на запрос аутентификации
"""
def __init__(self, request):
super(AuthResponse, self).__init__(request)
[документация] def from_assertion(self, assertion):
u"""
Заполнить из утверждения. Загрузить.
Определяется сессия, пользователь, атрибуты пользователя.
:param assertion: Утверждение, из которого надо получить данные
:type assertion: etree.ElementTree
"""
if self.validate and (not self.public_key_str
or not verify_assertion(assertion,
self.public_key_str)):
raise SSOException("Response not valid")
self.session_id = get_session_from_response_assertion(assertion)
self.attributes = get_attributes_from_assertion(assertion)
userid = get_userid_from_assertion(assertion)
if self.get_user_method:
self.user = self.get_user_method(userid, self.attributes)
else:
try:
self.user = User.objects.get(username=userid)
# возьмем первый попавшийся бэкенд
self.user.backend = settings.AUTHENTICATION_BACKENDS[0]
except User.DoesNotExist:
self.user = None
[документация] def do_login(self, request, next_url):
u"""
Выполнить вход в систему.
Атрибуты пользователя сохраняются в сессию.
Сохраняется соответствие SSO-сессии и django-сессии.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
:param basestring next_url: адрес, на который вернуться после входа
:return: ответ на запрос - редирект на адрес возврата
:rtype: django.http.HttpResponseRedirect
"""
if self.login_method:
try:
self.login_method(request, self.user)
except SSOLoginException as ex:
return redirect(u'{}?msg={}'.format(ex.next_url, ex.message))
else:
login(request, self.user)
request.session['attributes'] = self.attributes
# сохраним соответствие SSO-сессии и django-сессии
if self.session_id:
session_map = get_session_map()
session_map.set_session_map(self.session_id,
request.session.session_key)
return redirect(next_url)
[документация]class LogoutResponse(SAMLObject):
u"""
SAML-ответ на запрос выхода из системы
"""
def __init__(self, request):
super(LogoutResponse, self).__init__(request)
[документация] def from_assertion(self, assertion):
u"""
Заполнить из утверждения. Загрузить.
Просто проверим цифровую подпись, если надо.
:param assertion: Утверждение, из которого надо получить данные
:type assertion: etree.ElementTree
"""
if self.validate and (not self.public_key_str
or not verify_assertion(assertion,
self.public_key_str)):
raise SSOException("Response not valid")
[документация] def do_logout(self, request, next_url):
u"""
Выполнить выход из системы.
Удаляется соответствие SSO-сессии и django-сессии.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
:param basestring next_url: адрес, на который вернуться после выхода
:return: ответ на запрос - редирект на адрес возврата
:rtype: django.http.HttpResponseRedirect
"""
session_map = get_session_map()
session_key = request.session.session_key
if self.logout_method:
self.logout_method(request)
else:
logout(request)
session_map.delete_by_django_session(session_key)
return redirect(next_url)
[документация]class AuthRequest(SAMLObject):
u"""
SAML-запрос на вход в систему
"""
def __init__(self, request):
super(AuthRequest, self).__init__(request)
[документация] def get_request(self):
u"""
Получить SAML-запрос.
Формируется утверждение AuthnRequest и преобразовывается в строку.
:return: Утверждение для входа в систему, представленное в виде строки
:rtype: basestring
"""
assertion_struct = {
'tag': '{samlp}AuthnRequest',
'attrs': {
'AssertionConsumerServiceURL': self.acs_url,
'Destination': self.idp_url,
'ID': get_random_id(),
'IssueInstant': get_time_string(),
'ProtocolBinding':
"urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
'Version': "2.0",
},
'nsmap': {'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
'saml': 'urn:oasis:names:tc:SAML:2.0:assertion'},
'value': [
{
'tag': '{saml}Issuer',
'value': self.issuer,
},
{
'tag': '{saml}AttributeQuery',
'value': [
{
'tag': '{saml}Attribute',
'attrs': {'Name': 'role'}
}
]
}
],
}
if self.service_index:
assertion_struct['attrs']['AttributeConsumingServiceIndex'] = \
self.service_index
assertion = build_assertion(assertion_struct)
req = get_str_from_assertion(assertion)
return req
[документация] def get_login(self, next_url):
u"""
Получить GET-запрос на вход в систему.
:param basestring next_url: адрес, на который вернуться после входа
:return: редирект на адрес SSO с SAML-запросом на вход в качестве
параметра
:rtype: django.http.HttpResponseRedirect
"""
request_str = self.get_request()
if self.signing and self.private_key_str:
req = 'SAMLRequest=%s&RelayState=%s&SigAlg=%s' % (
urllib2.quote(request_str),
urllib2.quote(next_url),
urllib2.quote('http://www.w3.org/2000/09/xmldsig#rsa-sha1'),
)
signature = sign_request(req, self.private_key_str)
login_url = '%s?%s&Signature=%s' % (
self.idp_url, req,
urllib2.quote(signature),
)
else:
login_url = '%s?SAMLRequest=%s&RelayState=%s' % (
self.idp_url, urllib2.quote(request_str), urllib2.quote(next_url)
)
return redirect(login_url)
[документация]class LogoutRequest(SAMLObject):
u"""
SAML-запрос на выход из системы
"""
def __init__(self, request):
super(LogoutRequest, self).__init__(request)
session_map = get_session_map()
self.session_id = session_map.get_sso_session_key(
request.session.session_key)
[документация] def from_assertion(self, assertion):
u"""
Заполнить из утверждения. Загрузить.
Просто проверим цифровую подпись, если надо.
И вытащим сессию, если ее передали в утверждении.
:param assertion: Утверждение, из которого надо получить данные
:type assertion: etree.ElementTree
"""
if self.validate and (not self.public_key_str
or not verify_assertion(assertion,
self.public_key_str)):
raise SSOException("Response not valid")
self.session_id = get_session_from_request_assertion(assertion)
[документация] def do_logout_by_session(self, request):
u"""
Осуществить выход из систему по сессии SSO.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
"""
session_map = get_session_map()
# найдем django-сессию, соответствующую SSO-сессии
if session_map.exists_sso_session(self.session_id):
engine = import_module(settings.SESSION_ENGINE)
session_key = session_map.get_django_session_key(self.session_id)
request.session = engine.SessionStore(session_key)
if self.logout_method:
self.logout_method(request)
else:
logout(request)
session_map.delete_by_sso_session(self.session_id)
else:
# на нашли соответствующую сессию
pass
[документация] def get_request(self, username):
u"""
Получить SAML-запрос на выход.
Формируется утверждение LogoutRequest и преобразовывается в строку.
Используется текущая сессия: либо загруженная, либо определенная из
request.
:param basestring username: пользователь, который должен выйти из
системы. (Похоже, что уже не надо использовать)
:return: Утверждение для выхода из системы, представленное в виде строки
:rtype: basestring
"""
assertion_struct = {
'tag': '{samlp}LogoutRequest',
'attrs': {
'AssertionConsumerServiceURL': self.acs_url,
'ID': get_random_id(),
'Destination': self.idp_url,
'IssueInstant': get_time_string(),
'ProtocolBinding': "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect",
'Version': "2.0",
'AttributeConsumingServiceIndex': self.service_index,
},
'nsmap': {'samlp': 'urn:oasis:names:tc:SAML:2.0:protocol',
'saml': 'urn:oasis:names:tc:SAML:2.0:assertion'},
'value': [
{
'tag': '{saml}Issuer',
'value': self.issuer,
},
{
'tag': '{saml}NameID',
'value': username,
},
],
}
if self.session_id:
assertion_struct['value'].append({
'tag': '{samlp}SessionIndex',
'value': self.session_id,
})
assertion = build_assertion(assertion_struct)
req = get_str_from_assertion(assertion)
return req
[документация] def get_logout(self, username, next_url):
u"""
Получить GET-запрос на выход из системы.
:param basestring username: пользователь, который должен выйти из
системы. (Похоже, что уже не надо использовать)
:param basestring next_url: адрес, на который вернуться после входа
:return: редирект на адрес SSO с SAML-запросом на выход в качестве
параметра
:rtype: django.http.HttpResponseRedirect
"""
request_str = self.get_request(username)
if self.signing and self.private_key_str:
req = 'SAMLRequest=%s&RelayState=%s&SigAlg=%s' % (
urllib2.quote(request_str),
urllib2.quote(next_url),
urllib2.quote('http://www.w3.org/2000/09/xmldsig#rsa-sha1'),
)
signature = sign_request(req, self.private_key_str)
logout_url = '%s?%s&Signature=%s' % (
self.idp_url, req,
urllib2.quote(signature),
)
else:
logout_url = '%s?SAMLRequest=%s&RelayState=%s' % (
self.idp_url, urllib2.quote(request_str),
urllib2.quote(next_url)
)
return redirect(logout_url)
[документация] def do_logout(self, request):
u"""
Осуществить выход из систему по текущей сессии django.
Сессия получается из текущего запроса.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
"""
session_map = get_session_map()
session_key = request.session.session_key
if self.logout_method:
self.logout_method(request)
else:
logout(request)
session_map.delete_by_django_session(session_key)
[документация]def get_response_from_data(request, xml_string):
u"""
Получить утверждение-ответ из xml-строки.
Используется для определения объекта в сервисе ACS.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
:param basestring xml_string: xml-строка, содержащая утверждение
:return: объект Response, преобразованный из xml-строки
:rtype: либо LogoutResponse, либо AuthResponse
"""
if get_sso_setting('zipped'):
assertion_str = decode_base64_and_inflate(xml_string)
else:
assertion_str = decode_base64(xml_string)
assertion = xml_to_assertion(assertion_str)
if is_logout_response(assertion):
response = LogoutResponse(request)
response.from_assertion(assertion)
return response
else:
response = AuthResponse(request)
response.from_assertion(assertion)
return response
[документация]def get_request_from_data(request, xml_string):
u"""
Получить утверждение-запрос из xml-строки.
Используется для определения объекта в сервисе ACS.
:param request: запрос, в рамках которого выполняется действие
:type request: django.http.HttpRequest
:param xml_string: xml-строка, содержащая утверждение
:type xml_string: basestring
:return: объект Request, преобразованный из xml-строки
:rtype: LogoutRequest | None
"""
if get_sso_setting('zipped'):
assertion_str = decode_base64_and_inflate(xml_string)
else:
assertion_str = decode_base64(xml_string)
assertion = xml_to_assertion(assertion_str)
if is_logout_request(assertion):
request = LogoutRequest(request)
request.from_assertion(assertion)
return request
else:
return None
[документация]def get_str_from_assertion(assertion):
u"""
Преобразовать утверждение в строку перекодированную и упакованную.
:param assertion: Утверждение (Assertion, xml)
:type assertion: etree.ElementTree
:return: xml-строка, содержащая утверждение
:rtype: basestring
"""
xml_string = assertion_to_xml(assertion)
assertion_str = deflate_and_base64_encode(xml_string)
return assertion_str