Omezení přístupu do AppEngine aplikace

gaegoogle app engine

Poměrně často narážíme na požadavek omezení přístupu uživatelů do aplikace na AppEngine.

Modelový příklad: Zákazník si přeje omezit přístup do aplikace tak, aby do ní mohli pouze uživatelé z domény xxx.cz, yyy.cz a fragaria.cz.

Google nabízí pouze omezení přístupu buď na libovolnou (ale pouze jednu) Google Apps doménu nebo na jakéhokoliv uživatele z Google Apps domény bez omezení.

První možnost tedy použít nemůžeme, druhou používáme k vynucení přihlášení uživatele – omezení na Google Apps domény je pro nás OK.

Postup, který jsme používali dosud je podobný jako například v tomto článku. Připravíme si anotaci @user_required, která zkontroluje uživatele a tu "nalepíme" na všechny metody příslušných HTTP handlerů. To ale znamená, že máme anotace rozeseté všude a je snadné jí někde zapomenout.

Dneska +Robin Gottfried přišel s mnohem elegantnějším řešením. Využívá metodu __call__() z webapp2, přes kterou procházejí všechny requesty v aplikaci a kontrolu dělá tam.

Asi takhle:

from utils import SecureApplication

routes = (
    ('/route1', 'app.views.view1'),
)

handler = SecureApplication(routes=routes, debug=settings['DEBUG'])

Samozřejmě je nutné místo standardní WSGIApplication použít vlastní podtřídu:

import logging
from google.appengine.api import users
from webapp2 import WSGIApplication
from webob import exc

class SecureApplication(WSGIApplication):
    """ WSGIApplication which checks if user belongs to one of allowed domains. """
    def __call__(self, environ, start_response):
        logging.debug('calling request as %s' % users.get_current_user())
        with self.request_context_class(self, environ) as (request, response):
            try:
                if authorize_user():
                    return super(SecureApplication, self).__call__(environ, start_response)
                else:
                    raise exc.HTTPForbidden(detail="User not allowed to access application!")
            except Exception, e:
                try:
                    rv = self.handle_exception(request, response, e)
                    if rv is not None:
                        response = rv
                except exc.HTTPException, e:
                    response = e
                except Exception, e:
                    response = self._internal_error(e)

            try:
                return response(environ, start_response)
            except Exception, e:
                return self._internal_error(e)(environ, start_response)


def authorize_user(allowed_domains):
    user = users.get_current_user()
    email_info = user.email().split('@')

    if len(email_info) != 2:
        logging.warning('Cannot resolve domain for current user: %r' % user.email())
        return False

    domain = email_info[1]

    if domain not in allowed_domains:
        return False

    return True