Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not use core Airflow Flask related resources in FAB provider (tests of www) #45472

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@
from airflow.providers.fab.www.security import permissions
from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2
from airflow.providers.fab.www.session import (
AirflowDatabaseSessionInterface,
AirflowDatabaseSessionInterface as FabAirflowDatabaseSessionInterface,
)
from airflow.www.session import AirflowDatabaseSessionInterface

if TYPE_CHECKING:
from airflow.providers.fab.www.security.permissions import RESOURCE_ASSET
Expand Down
44 changes: 29 additions & 15 deletions providers/src/airflow/providers/fab/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,25 @@
# under the License.
from __future__ import annotations

from os.path import isabs

from flask import Flask
from flask_appbuilder import SQLA
from flask_wtf.csrf import CSRFProtect
from sqlalchemy.engine.url import make_url

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
from airflow.logging_config import configure_logging
from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
from airflow.providers.fab.www.extensions.init_jinja_globals import init_jinja_globals
from airflow.providers.fab.www.extensions.init_manifest_files import configure_manifest_files
from airflow.providers.fab.www.extensions.init_security import init_api_auth, init_xframe_protection
from airflow.providers.fab.www.extensions.init_views import init_error_handlers, init_plugins
from airflow.providers.fab.www.extensions.init_views import (
init_api_auth_provider,
init_api_connexion,
init_api_error_handlers,
init_error_handlers,
init_plugins,
)
from airflow.utils.json import AirflowJsonProvider

app: Flask | None = None

Expand All @@ -41,44 +44,55 @@
csrf = CSRFProtect()


def create_app():
def create_app(config=None, testing=False):
"""Create a new instance of Airflow WWW app."""
flask_app = Flask(__name__)
flask_app.secret_key = conf.get("webserver", "SECRET_KEY")
webserver_config = conf.get_mandatory_value("webserver", "config_file")
# Enable customizations in webserver_config.py to be applied via Flask.current_app.
with flask_app.app_context():
flask_app.config.from_pyfile(webserver_config, silent=True)

flask_app.config["TESTING"] = testing
flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN")

url = make_url(flask_app.config["SQLALCHEMY_DATABASE_URI"])
if url.drivername == "sqlite" and url.database and not isabs(url.database):
raise AirflowConfigException(
f'Cannot use relative path: `{conf.get("database", "SQL_ALCHEMY_CONN")}` to connect to sqlite. '
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)
if config:
flask_app.config.from_mapping(config)

if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()

# Configure the JSON encoder used by `|tojson` filter from Flask
flask_app.json_provider_class = AirflowJsonProvider
flask_app.json = AirflowJsonProvider(flask_app)

csrf.init_app(flask_app)

db = SQLA()
db.session = settings.Session
db.init_app(flask_app)

init_api_auth(flask_app)
configure_logging()
configure_manifest_files(flask_app)
init_api_auth(flask_app)

with flask_app.app_context():
init_appbuilder(flask_app)
init_plugins(flask_app)
init_api_auth_provider(flask_app)
init_error_handlers(flask_app)
init_api_connexion(flask_app)
init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first
init_jinja_globals(flask_app)
init_xframe_protection(flask_app)
return flask_app


def cached_app():
def cached_app(config=None, testing=False):
"""Return cached instance of Airflow WWW app."""
global app
if not app:
app = create_app()
app = create_app(config=config, testing=testing)
return app


Expand Down
125 changes: 125 additions & 0 deletions providers/src/airflow/providers/fab/www/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import logging
from functools import wraps
from typing import TYPE_CHECKING, Callable, TypeVar, cast

from flask import flash, redirect, render_template, request, url_for

from airflow.api_fastapi.app import get_auth_manager
from airflow.auth.managers.models.resource_details import (
AccessView,
DagAccessEntity,
DagDetails,
)
from airflow.configuration import conf
from airflow.utils.net import get_hostname

if TYPE_CHECKING:
from airflow.auth.managers.base_auth_manager import ResourceMethod

T = TypeVar("T", bound=Callable)

log = logging.getLogger(__name__)


def get_access_denied_message():
return conf.get("webserver", "access_denied_message")


def _has_access(*, is_authorized: bool, func: Callable, args, kwargs):
"""
Define the behavior whether the user is authorized to access the resource.
:param is_authorized: whether the user is authorized to access the resource
:param func: the function to call if the user is authorized
:param args: the arguments of ``func``
:param kwargs: the keyword arguments ``func``
:meta private:
"""
if is_authorized:
return func(*args, **kwargs)
elif get_auth_manager().is_logged_in() and not get_auth_manager().is_authorized_view(
access_view=AccessView.WEBSITE
):
return (
render_template(
"airflow/no_roles_permissions.html",
hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
logout_url=get_auth_manager().get_url_logout(),
),
403,
)
elif not get_auth_manager().is_logged_in():
return redirect(get_auth_manager().get_url_login(next_url=request.url))
else:
access_denied = get_access_denied_message()
flash(access_denied, "danger")
return redirect(url_for("Airflow.index"))


def has_access_dag(method: ResourceMethod, access_entity: DagAccessEntity | None = None) -> Callable[[T], T]:
def has_access_decorator(func: T):
@wraps(func)
def decorated(*args, **kwargs):
dag_id_kwargs = kwargs.get("dag_id")
dag_id_args = request.args.get("dag_id")
dag_id_form = request.form.get("dag_id")
dag_id_json = request.json.get("dag_id") if request.is_json else None
all_dag_ids = [dag_id_kwargs, dag_id_args, dag_id_form, dag_id_json]
unique_dag_ids = set(dag_id for dag_id in all_dag_ids if dag_id is not None)

if len(unique_dag_ids) > 1:
log.warning(
"There are different dag_ids passed in the request: %s. Returning 403.", unique_dag_ids
)
log.warning(
"kwargs: %s, args: %s, form: %s, json: %s",
dag_id_kwargs,
dag_id_args,
dag_id_form,
dag_id_json,
)
return (
render_template(
"airflow/no_roles_permissions.html",
hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
logout_url=get_auth_manager().get_url_logout(),
),
403,
)
dag_id = unique_dag_ids.pop() if unique_dag_ids else None

is_authorized = get_auth_manager().is_authorized_dag(
method=method,
access_entity=access_entity,
details=None if not dag_id else DagDetails(id=dag_id),
)

return _has_access(
is_authorized=is_authorized,
func=func,
args=args,
kwargs=kwargs,
)

return cast(T, decorated)

return has_access_decorator
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from flask_appbuilder.views import IndexView

from airflow import settings
from airflow.api_fastapi.app import create_auth_manager
from airflow.api_fastapi.app import create_auth_manager, get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.www.security_manager import AirflowSecurityManagerV2

Expand Down Expand Up @@ -283,6 +283,8 @@ def _add_admin_views(self):
self.indexview = self._check_and_init(self.indexview)
self.add_view_no_menu(self.indexview)

get_auth_manager().register_views()

def _add_addon_views(self):
"""Register declared addons."""
for addon in self._addon_managers:
Expand Down
97 changes: 95 additions & 2 deletions providers/src/airflow/providers/fab/www/extensions/init_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,47 @@

import logging
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING

from connexion import Resolver
from connexion import FlaskApi, Resolver
from connexion.decorators.validation import RequestBodyValidator
from connexion.exceptions import BadRequestProblem
from connexion.exceptions import BadRequestProblem, ProblemException
from flask import request

from airflow.api_connexion.exceptions import common_error_handler
from airflow.api_fastapi.app import get_auth_manager
from airflow.configuration import conf
from airflow.providers.fab.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
from airflow.utils.yaml import safe_load

if TYPE_CHECKING:
from flask import Flask

log = logging.getLogger(__name__)

# providers/src/airflow/providers/fab/www/extensions/init_views.py => airflow/
ROOT_APP_DIR = Path(__file__).parents[7].joinpath("airflow").resolve()


def set_cors_headers_on_response(response):
"""Add response headers."""
allow_headers = conf.get("api", "access_control_allow_headers")
allow_methods = conf.get("api", "access_control_allow_methods")
allow_origins = conf.get("api", "access_control_allow_origins")
if allow_headers:
response.headers["Access-Control-Allow-Headers"] = allow_headers
if allow_methods:
response.headers["Access-Control-Allow-Methods"] = allow_methods
if allow_origins == "*":
response.headers["Access-Control-Allow-Origin"] = "*"
elif allow_origins:
allowed_origins = allow_origins.split(" ")
Copy link
Preview

Copilot AI Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The allow_origins variable should be split by commas instead of spaces to handle multiple origins more accurately.

Suggested change
allowed_origins = allow_origins.split(" ")
allowed_origins = allow_origins.split(",")

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options
origin = request.environ.get("HTTP_ORIGIN", allowed_origins[0])
if origin in allowed_origins:
response.headers["Access-Control-Allow-Origin"] = origin
return response


class _LazyResolution:
"""
Expand Down Expand Up @@ -78,6 +108,59 @@ def validate_schema(self, data, url):
return super().validate_schema(data, url)


base_paths: list[str] = [] # contains the list of base paths that have api endpoints


def init_api_error_handlers(app: Flask) -> None:
"""Add error handlers for 404 and 405 errors for existing API paths."""

@app.errorhandler(404)
def _handle_api_not_found(ex):
if any([request.path.startswith(p) for p in base_paths]):
# 404 errors are never handled on the blueprint level
# unless raised from a view func so actual 404 errors,
# i.e. "no route for it" defined, need to be handled
# here on the application level
return common_error_handler(ex)
else:
from airflow.providers.fab.www.views import not_found
Copy link
Preview

Copilot AI Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The not_found function should be imported at the top of the file for clarity and to avoid potential issues with circular imports.

Suggested change
from airflow.providers.fab.www.views import not_found
return not_found(ex)

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

Positive Feedback
Negative Feedback

Provide additional feedback

Please help us improve GitHub Copilot by sharing more details about this comment.

Please select one or more of the options

return not_found(ex)

@app.errorhandler(405)
def _handle_method_not_allowed(ex):
if any([request.path.startswith(p) for p in base_paths]):
return common_error_handler(ex)
else:
from airflow.providers.fab.www.views import method_not_allowed

return method_not_allowed(ex)

app.register_error_handler(ProblemException, common_error_handler)


def init_api_connexion(app: Flask) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are probably at a point we can rip out the old api, to be honest - I think aip-84 is far enough along.

If we want to keep it for now, that's fine, however we might want to leave a breadcrumb that this needs to go before AF3.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Though, removing it make some tests fail. Some tests in FAB provider are using Rest API endpoints (e.g. /api/v1/pools in providers/tests/fab/auth_manager/api_endpoints/test_auth.py). So we need to update these tests as well. I prefer doing it in a separate PR because I think this one is big enough already.

"""Initialize Stable API."""
base_path = "/api/v1"
base_paths.append(base_path)

with ROOT_APP_DIR.joinpath("api_connexion", "openapi", "v1.yaml").open() as f:
specification = safe_load(f)
api_bp = FlaskApi(
specification=specification,
resolver=_LazyResolver(),
base_path=base_path,
options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
strict_validation=True,
validate_responses=True,
validator_map={"body": _CustomErrorRequestBodyValidator},
).blueprint
api_bp.after_request(set_cors_headers_on_response)

app.register_blueprint(api_bp)
app.extensions["csrf"].exempt(api_bp)


def init_plugins(app):
"""Integrate Flask and FAB with plugins."""
from airflow import plugins_manager
Expand Down Expand Up @@ -118,3 +201,13 @@ def init_error_handlers(app: Flask):

app.register_error_handler(500, views.show_traceback)
app.register_error_handler(404, views.not_found)


def init_api_auth_provider(app):
"""Initialize the API offered by the auth manager."""
auth_mgr = get_auth_manager()
blueprint = auth_mgr.get_api_endpoints()
if blueprint:
base_paths.append(blueprint.url_prefix)
app.register_blueprint(blueprint)
app.extensions["csrf"].exempt(blueprint)
Loading
Loading