
Using Celery + gevent causes an exception to be thrown from ExecutorSafeguard #45475

Open
collinpowerkariman opened this issue Jan 8, 2025 · 2 comments

Comments

@collinpowerkariman

Apache Airflow Provider(s)

celery

Versions of Apache Airflow Providers

apache-airflow==2.10.4
apache-airflow-providers-amazon==9.1.0
apache-airflow-providers-celery==3.8.5
apache-airflow-providers-cncf-kubernetes==10.0.1
apache-airflow-providers-common-compat==1.2.2
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.20.0
apache-airflow-providers-docker==3.14.1
apache-airflow-providers-elasticsearch==5.5.3
apache-airflow-providers-fab==1.5.1
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-google==11.0.0
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-hashicorp==3.8.0
apache-airflow-providers-http==4.13.3
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-azure==11.1.0
apache-airflow-providers-mysql==5.7.4
apache-airflow-providers-odbc==4.8.1
apache-airflow-providers-openlineage==1.14.0
apache-airflow-providers-postgres==5.14.0
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-sendgrid==3.6.0
apache-airflow-providers-sftp==4.11.1
apache-airflow-providers-slack==8.9.2
apache-airflow-providers-smtp==1.8.1
apache-airflow-providers-snowflake==5.8.1
apache-airflow-providers-sqlite==3.9.1
apache-airflow-providers-ssh==3.14.0
google-cloud-orchestration-airflow==1.15.1

Apache Airflow version

2.10.4

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Official Apache Airflow Helm Chart

Deployment details

environment variables:

  • AIRFLOW__CELERY__POOL=gevent
  • _AIRFLOW_PATCH_GEVENT=1

What happened

When using the gevent pool on Celery and enabling gevent monkey patching via the environment variable _AIRFLOW_PATCH_GEVENT=1, tasks fail with the following traceback:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 413, in wrapper
    cls._sentinel.callers[sentinel_key] = sentinel
    ^^^^^^^^^^^^^^^^^^^^^
  File "src/gevent/local.py", line 410, in gevent._gevent_clocal.local.__getattribute__
AttributeError: 'gevent._gevent_clocal.local' object has no attribute 'callers'
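The AttributeError can be reproduced without Airflow or gevent. The sketch below assumes, as the author suspects in a later comment, that `callers` is assigned only once, when the class is defined. `Safeguard`, `record_caller`, and `run_in_new_thread` are hypothetical names, not Airflow code. It uses plain `threading.local`. gevent's monkey patching replaces `threading.local` with its greenlet-local implementation (the `gevent._gevent_clocal.local` in the traceback above). After patching, each greenlet therefore behaves like a separate thread in this example.

```python
import threading


class Safeguard:
    """Hypothetical stand-in for ExecutorSafeguard (not Airflow's code)."""

    _sentinel = threading.local()
    # Assumption: `callers` is assigned once, in the thread that executes this
    # class body (the importing thread), so no other thread ever sees it.
    _sentinel.callers = {}


def record_caller(key: str, value: object) -> None:
    # Mirrors the failing line: cls._sentinel.callers[sentinel_key] = sentinel
    Safeguard._sentinel.callers[key] = value


def run_in_new_thread(fn, *args):
    """Run fn(*args) in a fresh thread; return the exception it raised, or None."""
    errors = []

    def target():
        try:
            fn(*args)
        except Exception as exc:  # capture instead of letting the thread print it
            errors.append(exc)

    thread = threading.Thread(target=target)
    thread.start()
    thread.join()
    return errors[0] if errors else None


record_caller("main", 1)  # fine: same thread that created `callers`
err = run_in_new_thread(record_caller, "worker", 2)
print(type(err).__name__, err)  # an AttributeError mentioning 'callers'
```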

What you think should happen instead

No response

How to reproduce

  1. Run a Celery worker with the environment variables AIRFLOW__CELERY__POOL=gevent and _AIRFLOW_PATCH_GEVENT=1.
  2. Create a new DAG with a simple BashOperator task that sleeps for 5 seconds.
  3. Unpause the new DAG.

Anything else

If _AIRFLOW_PATCH_GEVENT is not set, the task finishes successfully, but subsequent task runs hang in the queued state, and the worker log shows RecursionError: maximum recursion depth exceeded:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/log/s3_task_handler.py", line 167, in s3_write
    if append and self.s3_log_exists(remote_log_location):
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/log/s3_task_handler.py", line 134, in s3_log_exists
    return self.hook.check_for_key(remote_log_location)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 152, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 125, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 958, in check_for_key
    obj = self.head_object(key, bucket_name)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 152, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 125, in wrapper
    return func(*bound_args.args, **bound_args.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/s3.py", line 938, in head_object
    return self.get_conn().head_object(Bucket=bucket_name, Key=key)
           ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 783, in get_conn
    return self.conn
           ^^^^^^^^^
  File "/usr/local/lib/python3.12/functools.py", line 995, in __get__
    val = self.func(instance)
          ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 740, in conn
    return self.get_client_type(region_name=self.region_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/hooks/base_aws.py", line 710, in get_client_type
    return session.client(
           ^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/boto3/session.py", line 299, in client
    return self._session.create_client(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/session.py", line 951, in create_client
    credentials = self.get_credentials()
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/session.py", line 507, in get_credentials
    self._credentials = self._components.get_component(
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/session.py", line 1108, in get_component
    self._components[name] = factory()
                             ^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/session.py", line 186, in _create_credential_resolver
    return botocore.credentials.create_credential_resolver(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/credentials.py", line 92, in create_credential_resolver
    container_provider = ContainerProvider()
                         ^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/credentials.py", line 1893, in __init__
    fetcher = ContainerMetadataFetcher()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/utils.py", line 2910, in __init__
    session = botocore.httpsession.URLLib3Session(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/httpsession.py", line 323, in __init__
    self._manager = PoolManager(**self._get_pool_manager_kwargs())
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/httpsession.py", line 341, in _get_pool_manager_kwargs
    'ssl_context': self._get_ssl_context(),
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/httpsession.py", line 350, in _get_ssl_context
    return create_urllib3_context()
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/airflow/.local/lib/python3.12/site-packages/botocore/httpsession.py", line 139, in create_urllib3_context
    context.options |= options
    ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/ssl.py", line 561, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/lib/python3.12/ssl.py", line 561, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/lib/python3.12/ssl.py", line 561, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 929 more times]
RecursionError: maximum recursion depth exceeded

I think this happens because the monkey patch in celery_command.py is not applied early enough. The MonkeyPatchWarning on the second line of these logs shows this:

/home/airflow/.local/lib/python3.12/site-packages/airflow/metrics/statsd_logger.py:184 RemovedInAirflow3Warning: The basic metric validator will be deprecated in the future in favor of pattern-matching.  You can try this now by setting config option metrics_use_pattern_match to True.
/home/airflow/.local/lib/python3.12/site-packages/celery/__init__.py:113 MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported may lead to errors, including RecursionError on Python 3.6. It may also silently lead to incorrect behaviour on Python 3.7. Please monkey-patch earlier. See https://github.com/gevent/gevent/issues/1016. Modules that had direct imports (NOT patched): ['urllib3.util.ssl_ (/home/airflow/.local/lib/python3.12/site-packages/urllib3/util/ssl_.py)', 'aiohttp.client_exceptions (/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client_exceptions.py)', 'urllib3.contrib.pyopenssl (/home/airflow/.local/lib/python3.12/site-packages/urllib3/contrib/pyopenssl.py)', 'aiohttp.connector (/home/airflow/.local/lib/python3.12/site-packages/aiohttp/connector.py)', 'aiohttp.client_reqrep (/home/airflow/.local/lib/python3.12/site-packages/aiohttp/client_reqrep.py)', 'urllib3.util (/home/airflow/.local/lib/python3.12/site-packages/urllib3/util/__init__.py)', 'jwt.jwks_client (/home/airflow/.local/lib/python3.12/site-packages/jwt/jwks_client.py)', 'botocore.httpsession (/home/airflow/.local/lib/python3.12/site-packages/botocore/httpsession.py)'].
[2025-01-07 11:15:35 +0000] [15] [INFO] Starting gunicorn 23.0.0
[2025-01-07 11:15:35 +0000] [15] [INFO] Listening at: http://[::]:8793 (15)
[2025-01-07 11:15:35 +0000] [15] [INFO] Using worker: sync
[2025-01-07 11:15:35 +0000] [16] [INFO] Booting worker with pid: 16
[2025-01-07 11:15:35 +0000] [17] [INFO] Booting worker with pid: 17

 -------------- celery@airflow-ptu-worker-alt-pool-5b78c98f5f-6zxg2 v5.4.0 (opalescent)
--- ***** -----
-- ******* ---- Linux-6.1.112-x86_64-with-glibc2.36 2025-01-07 11:15:36
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.providers.celery.executors.celery_executor:0x7f8457cd8b90
- ** ---------- .> transport:   ***
- ** ---------- .> results:     ***
- *** --- * --- .> concurrency: 24 (gevent)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> alt_pool         exchange=alt_pool(direct) key=alt_pool


[tasks]
  . airflow.providers.celery.executors.celery_executor_utils.execute_command
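The MonkeyPatchWarning above says that ssl was imported before gevent patched it. The following is a minimal "patch first" sketch. It assumes the hook can run at the very top of the worker's entry module, before anything imports ssl. `patch_gevent_if_requested`, `already_imported`, and `LATE_MODULES` are hypothetical names. The only gevent API used is `gevent.monkey.patch_all()`. Where Airflow actually applies the patch (the reporter points at celery_command.py) is not modeled here.

```python
import os
import sys
import warnings

# Assumption: only `ssl` is checked here; gevent's own MonkeyPatchWarning
# additionally lists the modules that imported ssl objects directly.
LATE_MODULES = ("ssl",)


def already_imported(modules=LATE_MODULES) -> list:
    """Return the names in `modules` that are already present in sys.modules."""
    return [name for name in modules if name in sys.modules]


def patch_gevent_if_requested(env_var: str = "_AIRFLOW_PATCH_GEVENT") -> bool:
    """Hypothetical entrypoint hook that monkey-patches before anything else.

    Meant to run before any import that can pull in ssl (urllib3, botocore,
    aiohttp, ...). Returns True if patching was performed.
    """
    if not os.environ.get(env_var):
        return False
    late = already_imported()
    if late:
        # Same situation gevent warns about: patching now may be too late.
        warnings.warn(f"monkey-patching after {late} were imported; patch earlier")
    from gevent import monkey  # imported lazily: gevent is only needed when patching

    monkey.patch_all()
    return True
```

Calling `already_imported()` at the intended patch point is a cheap way to check whether the patch would come too late. An empty list suggests that ssl has not been imported yet.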

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

collinpowerkariman added the area:providers, kind:bug, and needs-triage labels Jan 8, 2025

boring-cyborg bot commented Jan 8, 2025

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

potiuk added the good first issue label and removed the needs-triage label Jan 8, 2025
@collinpowerkariman
Author

collinpowerkariman commented Jan 9, 2025

I think this happens because the callers attribute is set outside of gevent's loop. Changing the way callers is set, as below, should fix it. I haven't tried this yet, but once I have the time I will test it and report back.

from threading import local

from airflow.configuration import conf


class ExecutorSafeguard:
    """
    The ExecutorSafeguard decorator.

    Checks if the execute method of an operator isn't manually called outside
    the TaskInstance as we want to avoid bad mixing between decorated and
    classic operators.
    """

    test_mode = conf.getboolean("core", "unit_test_mode")
    _sentinel = local()

    @classmethod
    def decorator(cls, func):
        # Initialize `callers` only if this thread/greenlet has not set it yet.
        if not hasattr(cls._sentinel, "callers"):
            cls._sentinel.callers = {}
        # ... rest of decorator unchanged
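One thing to check when trying this: `decorator` may run when the operator class is created, which typically happens at import time in the main greenlet. If so, a `hasattr` guard placed there would still initialize `callers` only for that greenlet. A guard executed inside the wrapper, at call time, initializes the attribute in whichever thread or greenlet is running the task. The sketch below shows that pattern with `threading.local`. `LazySafeguard` and `_callers` are hypothetical names, and this is a simplification, not Airflow's actual ExecutorSafeguard.

```python
import functools
import threading


class LazySafeguard:
    """Hypothetical, simplified safeguard; not Airflow's ExecutorSafeguard."""

    _sentinel = threading.local()

    @classmethod
    def _callers(cls) -> dict:
        # Initialize lazily, in whichever thread (or greenlet, once patched)
        # is executing right now, instead of once at import time.
        if not hasattr(cls._sentinel, "callers"):
            cls._sentinel.callers = {}
        return cls._sentinel.callers

    @classmethod
    def decorator(cls, func):
        # This part runs once, when the decorated function is defined.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # This part runs on every call, in the calling thread/greenlet.
            key = func.__qualname__
            cls._callers()[key] = True
            try:
                return func(*args, **kwargs)
            finally:
                cls._callers().pop(key, None)

        return wrapper


@LazySafeguard.decorator
def execute():
    return "ran"


results = []
worker = threading.Thread(target=lambda: results.append(execute()))
worker.start()
worker.join()
print(results)  # ['ran']: no AttributeError in the new thread
```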
