Flask中的单点登录解决方案
1.SSO 和 CAS
单点登录(Single Sign On
,SSO
)就是通过用户的一次性鉴别登录。当用户在身份认证服务器上登录一次以后,即可获得访问单点登录系统中其他关联系统和应用软件的权限,同时这种实现是不需要管理员对用户的登录状态或其他信息进行修改的,这意味着在多个应用系统中,用户只需一次登录就可以访问所有相互信任的应用系统。这种方式减少了由登录产生的时间消耗,辅助了用户管理,是比较流行的。
中央认证服务(Central Authentication Service
,CAS
),一种独立开放指令协议,是耶鲁大学发起的一个企业级开源项目,旨在为 Web 应用系统提供一种可靠的 SSO 解决方案。
2.Flask-CAS
那么对于 Flask,我们如何实现单点登录呢?此处提供一个开源的解决方案:Flask-CAS。在实际开发中,直接使用它的 API 可能无法满足我们的需求,这个时候就需要对源码进行一些修改。目前最新的版本是 1.0.2 版本。
__init__.py
:主要是一些配置和基本的函数。
cas_urls.py
:主要是生成一些 CAS 登录相关的 url 以及跳转的 url。
routing.py
:项目的核心所在,主要包含了 登录、登出、验证 三个函数,这三个函数也是我们自定义时候重点修改的地方。
"""
flask_cas.__init__
"""
import flask
from flask import current_app
# Find the stack on which we want to store the database connection.
# Starting with Flask 0.9, the _app_ctx_stack is the correct one,
# before that we need to use the _request_ctx_stack.
try:
from flask import _app_ctx_stack as stack
except ImportError:
from flask import _request_ctx_stack as stack
from . import routing
from functools import wraps
class CAS(object):
"""
Required Configs:
|Key |
|----------------|
|CAS_SERVER |
|CAS_AFTER_LOGIN |
Optional Configs:
|Key | Default |
|---------------------------|-----------------------|
|CAS_TOKEN_SESSION_KEY | _CAS_TOKEN |
|CAS_USERNAME_SESSION_KEY | CAS_USERNAME |
|CAS_ATTRIBUTES_SESSION_KEY | CAS_ATTRIBUTES |
|CAS_LOGIN_ROUTE | '/cas' |
|CAS_LOGOUT_ROUTE | '/cas/logout' |
|CAS_VALIDATE_ROUTE | '/cas/serviceValidate'|
|CAS_AFTER_LOGOUT | None |
"""
def __init__(self, app=None, url_prefix=None):
self._app = app
if app is not None:
self.init_app(app, url_prefix)
def init_app(self, app, url_prefix=None):
# Configuration defaults
app.config.setdefault('CAS_TOKEN_SESSION_KEY', '_CAS_TOKEN')
app.config.setdefault('CAS_USERNAME_SESSION_KEY', 'CAS_USERNAME')
app.config.setdefault('CAS_ATTRIBUTES_SESSION_KEY', 'CAS_ATTRIBUTES')
app.config.setdefault('CAS_LOGIN_ROUTE', '/cas')
app.config.setdefault('CAS_LOGOUT_ROUTE', '/cas/logout')
app.config.setdefault('CAS_VALIDATE_ROUTE', '/cas/serviceValidate')
# Requires CAS 2.0
app.config.setdefault('CAS_AFTER_LOGOUT', None)
# Register Blueprint
app.register_blueprint(routing.blueprint, url_prefix=url_prefix)
# Use the newstyle teardown_appcontext if it's available,
# otherwise fall back to the request context
if hasattr(app, 'teardown_appcontext'):
app.teardown_appcontext(self.teardown)
else:
app.teardown_request(self.teardown)
def teardown(self, exception):
ctx = stack.top
@property
def app(self):
return self._app or current_app
@property
def username(self):
return flask.session.get(
self.app.config['CAS_USERNAME_SESSION_KEY'], None)
@property
def attributes(self):
return flask.session.get(
self.app.config['CAS_ATTRIBUTES_SESSION_KEY'], None)
@property
def token(self):
return flask.session.get(
self.app.config['CAS_TOKEN_SESSION_KEY'], None)
def login():
return flask.redirect(flask.url_for('cas.login', _external=True))
def logout():
return flask.redirect(flask.url_for('cas.logout', _external=True))
def login_required(function):
@wraps(function)
def wrap(*args, **kwargs):
if 'CAS_USERNAME' not in flask.session:
flask.session['CAS_AFTER_LOGIN_SESSION_URL'] = (
flask.request.script_root +
flask.request.full_path
)
return login()
else:
return function(*args, **kwargs)
return wrap
"""
flask_cas.cas_urls
Functions for creating urls to access CAS.
"""
try:
from urllib import quote
from urllib import urlencode
from urlparse import urljoin
except ImportError:
from urllib.parse import quote
from urllib.parse import urljoin
from urllib.parse import urlencode
def create_url(base, path=None, *query):
""" Create a url.
Creates a url by combining base, path, and the query's list of
key/value pairs. Escaping is handled automatically. Any
key/value pair with a value that is None is ignored.
Keyword arguments:
base -- The left most part of the url (ex. http://localhost:5000).
path -- The path after the base (ex. /foo/bar).
query -- A list of key value pairs (ex. [('key', 'value')]).
Example usage:
>>> create_url(
... 'http://localhost:5000',
... 'foo/bar',
... ('key1', 'value'),
... ('key2', None), # Will not include None
... ('url', 'http://example.com'),
... )
'http://localhost:5000/foo/bar?key1=value&url=http%3A%2F%2Fexample.com'
"""
url = base
# Add the path to the url if it's not None.
if path is not None:
url = urljoin(url, quote(path))
# Remove key/value pairs with None values.
query = filter(lambda pair: pair[1] is not None, query)
# Add the query string to the url
url = urljoin(url, '?{0}'.format(urlencode(list(query))))
return url
def create_cas_login_url(cas_url, cas_route, service, renew=None, gateway=None):
""" Create a CAS login URL .
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas)
service -- (ex. http://localhost:5000/login)
renew -- "true" or "false"
gateway -- "true" or "false"
Example usage:
>>> create_cas_login_url(
... 'http://sso.pdx.edu',
... '/cas',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
('service', service),
('renew', renew),
('gateway', gateway),
)
def create_cas_logout_url(cas_url, cas_route, service=None):
""" Create a CAS logout URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/logout)
url -- (ex. http://localhost:5000/login)
Example usage:
>>> create_cas_logout_url(
... 'http://sso.pdx.edu',
... '/cas/logout',
... 'http://localhost:5000',
... )
'http://sso.pdx.edu/cas/logout?service=http%3A%2F%2Flocalhost%3A5000'
"""
return create_url(
cas_url,
cas_route,
('service', service),
)
def create_cas_validate_url(cas_url, cas_route, service, ticket,
renew=None):
""" Create a CAS validate URL.
Keyword arguments:
cas_url -- The url to the CAS (ex. http://sso.pdx.edu)
cas_route -- The route where the CAS lives on server (ex. /cas/serviceValidate)
service -- (ex. http://localhost:5000/login)
ticket -- (ex. 'ST-58274-x839euFek492ou832Eena7ee-cas')
renew -- "true" or "false"
Example usage:
>>> create_cas_validate_url(
... 'http://sso.pdx.edu',
... '/cas/serviceValidate',
... 'http://localhost:5000/login',
... 'ST-58274-x839euFek492ou832Eena7ee-cas'
... )
'http://sso.pdx.edu/cas/serviceValidate?service=http%3A%2F%2Flocalhost%3A5000%2Flogin&ticket=ST-58274-x839euFek492ou832Eena7ee-cas'
"""
return create_url(
cas_url,
cas_route,
('service', service),
('ticket', ticket),
('renew', renew),
)
"""
flask_cas.routing
"""
import flask
from xmltodict import parse
from flask import current_app
from .cas_urls import create_cas_login_url
from .cas_urls import create_cas_logout_url
from .cas_urls import create_cas_validate_url
try:
from urllib import urlopen
except ImportError:
from urllib.request import urlopen
blueprint = flask.Blueprint('cas', __name__)
@blueprint.route('/login/')
def login():
"""
This route has two purposes. First, it is used by the user
to login. Second, it is used by the CAS to respond with the
`ticket` after the user logs in successfully.
When the user accesses this url, they are redirected to the CAS
to login. If the login was successful, the CAS will respond to this
route with the ticket in the url. The ticket is then validated.
If validation was successful the logged in username is saved in
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
the user's attributes are saved under the key
'CAS_USERNAME_ATTRIBUTE_KEY'
"""
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
redirect_url = create_cas_login_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGIN_ROUTE'],
flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True))
if 'ticket' in flask.request.args:
flask.session[cas_token_session_key] = flask.request.args['ticket']
if cas_token_session_key in flask.session:
if validate(flask.session[cas_token_session_key]):
if 'CAS_AFTER_LOGIN_SESSION_URL' in flask.session:
redirect_url = flask.session.pop('CAS_AFTER_LOGIN_SESSION_URL')
elif flask.request.args.get('origin'):
redirect_url = flask.request.args['origin']
else:
redirect_url = flask.url_for(
current_app.config['CAS_AFTER_LOGIN'])
else:
del flask.session[cas_token_session_key]
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return flask.redirect(redirect_url)
@blueprint.route('/logout/')
def logout():
"""
When the user accesses this route they are logged out.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']
if cas_username_session_key in flask.session:
del flask.session[cas_username_session_key]
if cas_attributes_session_key in flask.session:
del flask.session[cas_attributes_session_key]
if(current_app.config['CAS_AFTER_LOGOUT'] is not None):
redirect_url = create_cas_logout_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGOUT_ROUTE'],
current_app.config['CAS_AFTER_LOGOUT'])
else:
redirect_url = create_cas_logout_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_LOGOUT_ROUTE'])
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
return flask.redirect(redirect_url)
def validate(ticket):
"""
Will attempt to validate the ticket. If validation fails, then False
is returned. If validation is successful, then True is returned
and the validated username is saved in the session under the
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
"""
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']
current_app.logger.debug("validating token {0}".format(ticket))
cas_validate_url = create_cas_validate_url(
current_app.config['CAS_SERVER'],
current_app.config['CAS_VALIDATE_ROUTE'],
flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True),
ticket)
current_app.logger.debug("Making GET request to {0}".format(
cas_validate_url))
xml_from_dict = {}
isValid = False
try:
xmldump = urlopen(cas_validate_url).read().strip().decode('utf8', 'ignore')
xml_from_dict = parse(xmldump)
isValid = True if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] else False
except ValueError:
current_app.logger.error("CAS returned unexpected result")
if isValid:
current_app.logger.debug("valid")
xml_from_dict = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"]
username = xml_from_dict["cas:user"]
flask.session[cas_username_session_key] = username
if "cas:attributes" in xml_from_dict:
attributes = xml_from_dict["cas:attributes"]
if "cas:memberOf" in attributes:
if not isinstance(attributes["cas:memberOf"], list):
attributes["cas:memberOf"] = attributes["cas:memberOf"].lstrip('[').rstrip(']').split(',')
for group_number in range(0, len(attributes['cas:memberOf'])):
attributes['cas:memberOf'][group_number] = attributes['cas:memberOf'][group_number].lstrip(' ').rstrip(' ')
else:
for index in range(0, len(attributes['cas:memberOf'])):
attributes["cas:memberOf"][index] = attributes["cas:memberOf"][index].lstrip('[').rstrip(']').split(',')
for group_number in range(0, len(attributes['cas:memberOf'][index])):
attributes['cas:memberOf'][index][group_number] = attributes['cas:memberOf'][index][group_number].lstrip(' ').rstrip(' ')
flask.session[cas_attributes_session_key] = attributes
else:
current_app.logger.debug("invalid")
return isValid
3.Example
首先需要导入包。
from flask_cas import CAS
使用方法一:
app = Flask(__name__)
CAS(app)
使用方法二:
cas = CAS()
app = Flask(__name__)
cas.init_app(app)
CAS 类将添加两条路由 /login/
和 /logout/
。也可以把一个路由前缀作为第二个参数传给 CAS 的构造方法或初始化方法。
/login/
路由会将用户重定向到 CAS_SERVERE
配置值指定的 CAS。如果登录成功,用户将被重定向到由 CAS_AFTER_LOGIN
配置值指定的端点,登录用户的用户名也将被存储到由 CAS_USERNAME_SESSION_KEY
指定的键的 session 中。如果属性可用,它们将被存储在 CAS_ATTRIBUTES_SESSION_KEY
指定的键的 session 中。
/logout/
路由会将用户重定向到 CAS 注销页面,用户名和属性将会从 session 中被删除。
下面给出一个实例:
import flask
from flask import Flask
from flask_cas import CAS
from flask_cas import login_required
app = Flask(__name__)
cas = CAS(app)
# 此处填写server端地址
app.config['CAS_SERVER'] = 'https://server.cas.com:8443/cas'
# 填写cas登录后跳转的路由
app.config['CAS_AFTER_LOGIN'] = 'login'
app.config['SECRET_KEY'] ='dasdasdasdasdada123123423423aASDAS'
# 全局取消证书验证
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
@app.route('/')
@login_required
def login():
return flask.render_template(
'demo.html',
username=cas.username,
)
if __name__ == '__main__':
app.run("app1.cas.com", 9100, debug=True) # 此处填写客户端路由
demo.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>{{ username }}</h1>
</body>
</html>