Re: How Airflow import modules as it executes the tasks


Not exactly answering your question, but the reason db.py is loaded in each
task might be that you have something like `import db` in each of your
*.py files. Airflow spins up one process to parse each *.py file, so
your db.py is loaded multiple times.
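
To illustrate the point, here is a minimal sketch that does not use Airflow
at all. It assumes only that every task ends up in its own Python process.
The module name `db_demo` and the helpers `load_in_fresh_process` and
`run_demo` are hypothetical, made up for this example. Each fresh interpreter
runs the module's top-level code again, which is why a log line at the top of
db.py shows up once per task.

```python
import os
import subprocess
import sys
import tempfile
import textwrap

# Hypothetical stand-in for db.py: its top-level code runs once per interpreter.
MODULE_SOURCE = textwrap.dedent("""
    import os
    print("db_demo loaded in pid", os.getpid())
""")


def load_in_fresh_process(module_dir):
    """Import db_demo in a brand-new interpreter and return what it printed."""
    # Put the temporary directory on the child's import path.
    env = dict(os.environ, PYTHONPATH=module_dir)
    result = subprocess.run(
        [sys.executable, "-c", "import db_demo"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def run_demo(n_tasks=2):
    """Simulate n_tasks separate task processes that each import db_demo."""
    with tempfile.TemporaryDirectory() as module_dir:
        with open(os.path.join(module_dir, "db_demo.py"), "w") as fh:
            fh.write(MODULE_SOURCE)
        return [load_in_fresh_process(module_dir) for _ in range(n_tasks)]


if __name__ == "__main__":
    for line in run_demo():
        print(line)
```

Each call prints its own "db_demo loaded" line, because nothing imported in
one process is visible to the next one.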

I'm not sure how you could share the connection pool if it is created within
the same process your operator runs in, since Airflow spins up one
process for each task, even with LocalExecutor. To share it, you might have
to make the connection pool available to outside processes (I don't know how
that part can be done).
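
This does not solve the sharing problem, but as a rough sketch of what is
possible inside a single process: the pool can be created lazily, so each
task process builds at most one pool and only when it first needs it.
`FakePool` and `get_pool` are hypothetical names, and `FakePool` stands in
for whatever real pool class your db library provides.

```python
import os


class FakePool:
    """Hypothetical stand-in for a real connection pool."""

    def __init__(self):
        # Remember which process created this pool.
        self.owner_pid = os.getpid()


_pool = None  # module-level cache; it lives only in the current process


def get_pool():
    """Return this process's pool, creating it on first use.

    The PID check rebuilds the pool when the cached one was created by a
    different process (for example, module state inherited through fork),
    because pooled connections should not be reused by a child process.
    """
    global _pool
    if _pool is None or _pool.owner_pid != os.getpid():
        _pool = FakePool()
    return _pool
```

Calling `get_pool()` twice in the same task returns the same object, while
another task process gets its own separate pool.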

Cheers,
Kevin Y

On Tue, May 15, 2018 at 6:21 PM, alireza.khoshkbari@xxxxxxxxx <
alireza.khoshkbari@xxxxxxxxx> wrote:

> To start off, here is my project structure:
> ├── dags
> │   ├── __init__.py
> │   ├── core
> │   │   ├── __init__.py
> │   │   ├── operators
> │   │   │   ├── __init__.py
> │   │   │   ├── first_operator.py
> │   │   └── util
> │   │       ├── __init__.py
> │   │       ├── db.py
> │   ├── my_dag.py
>
> Here are the versions and details of the Airflow Docker setup:
>
> In my DAG, different tasks connect to a db (not the Airflow db). I've set
> up db connection pooling, and I expected that my db.py would be loaded
> once across the DagRun. However, in the log I can see that each task
> imports the module and that new db connections are made by each and every
> task. I can see that db.py is loaded in each task because I have the line
> below in db.py:
>
> logging.info("I was loaded {}".format(random.randint(0,100)))
>
> I understand that each operator can technically run on a separate
> machine, and it makes sense that each task runs more or less independently.
> However, I'm not sure whether this applies when using LocalExecutor.
> So the question is: how can I share resources (db connections) across
> tasks using LocalExecutor?
>