

Re: celery problem: cannot override celery_broker_transport_options


To use the SQLA (SQLAlchemy) broker backend with Celery, you need to override the options Airflow passes to Celery. Those come from https://github.com/apache/incubator-airflow/blob/v1-10-test/airflow/config_templates/default_celery.py
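The traceback quoted below shows why the extra options break things: kombu's SQLAlchemy transport calls `create_engine(conninfo.hostname, **transport_options)`, so every key in `[celery_broker_transport_options]` is handed to SQLAlchemy as a keyword argument. The sketch below makes that concrete. `rejected_options` and `CREATE_ENGINE_KWARGS` are hypothetical names, and the allowlist is a deliberately incomplete sample of keyword arguments `create_engine()` does accept, not SQLAlchemy's actual validation logic.

```python
"""Sketch: which broker transport options would create_engine() refuse?"""

# Hypothetical, non-exhaustive allowlist: a few real create_engine()
# keyword arguments (logging and connection-pool tuning).
CREATE_ENGINE_KWARGS = {"echo", "pool_size", "max_overflow", "pool_timeout", "pool_recycle"}


def rejected_options(transport_options):
    """Return the keys not in the allowlist, sorted for stable output.

    kombu's SQLAlchemy transport forwards every transport option verbatim
    as a keyword argument to create_engine(), so any key outside what
    SQLAlchemy understands triggers the TypeError seen in the traceback.
    """
    return sorted(key for key in transport_options if key not in CREATE_ENGINE_KWARGS)


# The section contents Airflow reported in the question.
reported = {
    "visibility_timeout": 21600,
    "ssl_active": False,
    "ssl_key": "",
    "ssl_cert": "",
    "ssl_cacert": "",
}

print(rejected_options(reported))
# prints ['ssl_active', 'ssl_cacert', 'ssl_cert', 'ssl_key', 'visibility_timeout']
```

Every key Airflow supplied is rejected, which matches the error message; the only safe choice for an `sqla+` broker is to send no such options at all.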

Since you don't want most or all of those options (and there is no way in the config file to _remove_ a setting), you will have to point Airflow at a different file for the Celery config.

This line in the config is what you will need to change:

    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
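That value is a dotted import path: everything before the last dot names a module, and the final component is an attribute inside it. Airflow has its own helper for resolving it; the stand-in below uses only `importlib` to show the idea, and `import_dotted` is a hypothetical name.

```python
import importlib


def import_dotted(path):
    """Resolve 'package.module.ATTR' to the object ATTR (hypothetical helper)."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError("expected 'module.attribute', got %r" % (path,))
    # Importing the module only succeeds if it is reachable on sys.path.
    module = importlib.import_module(module_path)
    return getattr(module, attr)


# Standard-library example of the same module/attribute split that applies to
# "airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG".
from collections import OrderedDict

assert import_dotted("collections.OrderedDict") is OrderedDict
```

By the same logic, `import_dotted("celery_config.CELERY_CONFIG")` only works if the directory containing celery_config.py is on `sys.path`, which is why the location of config/ matters.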

If you create something like config/celery_config.py containing:


    CELERY_CONFIG = {
        # Just the options you want to set
    }


(config/ should exist alongside your dags/ folder, and I think it should already be on the Python path.) You can then set this in the config:

    celery_config_options = celery_config.CELERY_CONFIG

That should give you complete control over which options reach Celery.
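As a concrete starting point, here is one possible config/celery_config.py for the `sqla+mysql` broker in the question. The URLs are placeholders modeled on the ones in the question (`db+` is Celery's prefix for database result backends), the keys are standard lowercase Celery 4 setting names, and which keys to include is an assumption; copy anything else you need from default_celery.py, but leave out `broker_transport_options`.

```python
"""Hypothetical config/celery_config.py for a SQLAlchemy (sqla+) broker."""

# Placeholder URLs: substitute your own credentials, hosts and databases.
BROKER_URL = "sqla+mysql://airflow:blah@localhost:3306/mydb"
RESULT_BACKEND = "db+mysql://airflow:blah@localhost:3306/airflow"

CELERY_CONFIG = {
    "broker_url": BROKER_URL,
    "result_backend": RESULT_BACKEND,
    # Matches the concurrency shown in the worker banner; adjust as needed.
    "worker_concurrency": 16,
    # Deliberately no "broker_transport_options": kombu's SQLAlchemy
    # transport would pass every key in it straight to create_engine().
}
```

With this file in place, `celery_config_options = celery_config.CELERY_CONFIG` makes Celery see only these settings.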

> On 21 May 2018, at 09:50, Craig Rodrigues <crodr001@xxxxxxxxx> wrote:
> 
> Hi,
> 
> I used this requirements.txt file to install airflow from the v1-10-test branch:
> 
> git+https://github.com/celery/celery@master#egg=celery
> git+https://github.com/apache/incubator-airflow@v1-10-test#egg=apache-airflow[celery,crypto,emr,hive,hdfs,ldap,mysql,postgres,redis,slack,s3]
> kombu>=4.1.0
> 
> 
> In my airflow.cfg, I have:
> 
> [celery]
> executor = CeleryExecutor
> 
> broker_url = sqla+mysql://airflow:blah@localhost:3306/mydb
> 
> [celery_broker_transport_options]
> #
> #
> 
> However, if I manually run this code inside the webserver, I see:
> 
> python -c "from airflow import configuration; c = configuration.conf.getsection('celery_broker_transport_options'); print(c)"
> OrderedDict([(u'visibility_timeout', 21600), (u'ssl_active', False), (u'ssl_key', u''), (u'ssl_cert', u''), (u'ssl_cacert', u'')])
> 
> My worker crashes with this error:
> 
> 
> [2018-05-21 07:46:12,406] {configuration.py:212} WARNING - section/key [celery/ssl_active] not found in config
> [2018-05-21 07:46:12,407] {default_celery.py:51} WARNING - Celery Executor will run without SSL
> [2018-05-21 07:46:12,411] {__init__.py:48} INFO - Using executor CeleryExecutor
> [2018-05-21 07:46:13,086: CRITICAL/MainProcess] Unrecoverable error: TypeError(u"Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.",)
> Traceback (most recent call last):
>  File "/usr/lib/python2.7/site-packages/celery/worker/worker.py", line 205, in start
>    self.blueprint.start(self)
>  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
>    step.start(parent)
>  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 369, in start
>    return self.obj.start()
>  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 322, in start
>    blueprint.start(self)
>  File "/usr/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
>    step.start(parent)
>  File "/usr/lib/python2.7/site-packages/celery/worker/consumer/tasks.py", line 41, in start
>    c.connection, on_decode_error=c.on_decode_error,
>  File "/usr/lib/python2.7/site-packages/celery/app/amqp.py", line 297, in TaskConsumer
>    **kw
>  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 386, in __init__
>    self.revive(self.channel)
>  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 408, in revive
>    self.declare()
>  File "/usr/lib/python2.7/site-packages/kombu/messaging.py", line 421, in declare
>    queue.declare()
>  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 605, in declare
>    self._create_queue(nowait=nowait, channel=channel)
>  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 614, in _create_queue
>    self.queue_declare(nowait=nowait, passive=False, channel=channel)
>  File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 649, in queue_declare
>    nowait=nowait,
>  File "/usr/lib/python2.7/site-packages/kombu/transport/virtual/base.py", line 531, in queue_declare
>    self._new_queue(queue, **kwargs)
>  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 82, in _new_queue
>    self._get_or_create(queue)
>  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 70, in _get_or_create
>    obj = self.session.query(self.queue_cls) \
>  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 65, in session
>    _, Session = self._open()
>  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 56, in _open
>    engine = self._engine_from_config()
>  File "/usr/lib/python2.7/site-packages/kombu/transport/sqlalchemy/__init__.py", line 51, in _engine_from_config
>    return create_engine(conninfo.hostname, **transport_options)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine
>    return strategy.create(*args, **kwargs)
>  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 160, in create
>    engineclass.__name__))
> TypeError: Invalid argument(s) 'ssl_key','ssl_cert','ssl_active','visibility_timeout','ssl_cacert' sent to create_engine(), using configuration MySQLDialect_mysqldb/QueuePool/Engine.  Please check that the keyword arguments are appropriate for this combination of components.
> 
> -------------- celery@qa1 v4.2.0rc3 (windowlicker)
> ---- **** ----- 
> --- * ***  * -- Linux-4.13.0-16-generic-x86_64-with-centos-7.3.1611-Core 2018-05-21 07:46:12
> -- * - **** --- 
> - ** ---------- [config]
> - ** ---------- .> app:         airflow.executors.celery_executor:0x4766d50
> - ** ---------- .> transport:   sqla+mysql://airflow:blah@localhost:3306/mydb
> - ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
> - *** --- * --- .> concurrency: 16 (prefork)
> -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
> --- ***** ----- 
> -------------- [queues]
>                .> airflow_celery   exchange=airflow_celery(direct) key=airflow_celery
> 
> 
> 
> What is the correct way to override the celery_broker_transport_options?
> I thought that having an empty section in airflow.cfg would be enough?
> 
> I thought that this was fixed with: https://github.com/apache/incubator-airflow/pull/2842
> 
> 
> I cannot pass visibility_timeout or ssl_key to a mysql backend.
> --
> Craig