Re: How Airflow imports modules as it executes the tasks


You are right, but that's within the same process. The way each operator
gets executed is that one `airflow run` command is generated and sent to
the local executor; the executor spins up a subprocess to run `airflow run
--raw`, which parses the file again and calls the operator's execute().
Thus each task gets its own process that parses the *.py file, so the
module ends up being imported multiple times (once per task).
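
To make that concrete, here is a minimal sketch (not Airflow internals; `db`
stands in for a module like your dags/core/util/db.py): every fresh
interpreter, like the one started for each `airflow run --raw`, re-imports
the module, so module-level setup such as a connection pool runs once per
task process.

# Sketch only: spawn fresh interpreters, roughly the way the executor
# launches fresh `airflow run --raw` processes, and watch the hypothetical
# db.py module body run in each one.
import multiprocessing as mp
import os

def run_task(name):
    import db  # hypothetical: your dags/core/util/db.py
    print(name, "ran in pid", os.getpid(), "- db.py's module body executed here")

if __name__ == "__main__":
    ctx = mp.get_context("spawn")  # a brand-new interpreter per process
    for name in ("task_a", "task_b"):
        ctx.Process(target=run_task, args=(name,)).start()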

Hope this helped, cheers
Kevin Y

On Tue, May 15, 2018 at 7:57 PM, alireza.khoshkbari@xxxxxxxxx <
alireza.khoshkbari@xxxxxxxxx> wrote:

> Thanks Kevin. Yes, I'm importing db in different operators. That said, my
> understanding is that if a module has already been imported, it's not loaded
> again even if you import it again (and I reckon this is why the Singleton
> pattern is not commonly used in Python). Is that right?
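>
> For example, within a single interpreter (assuming db.py is importable):
>
> import sys
> import db   # db.py's module body runs; the module is cached in sys.modules
> import db   # already cached, so the body does not run again
> assert "db" in sys.modules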
>
> On 2018/05/16 02:34:18, Ruiqin Yang <yrqls21@xxxxxxxxx> wrote:
> > Not exactly answering your question, but the reason db.py is loaded in
> > each task might be that you have something like `import db` in each of
> > your *.py files, and Airflow spins up one process to parse each *.py file,
> > so your db.py gets loaded multiple times.
> >
> > I'm not sure how you can share the connection pool if it is created
> > within the same process your operator runs in, since Airflow spins up one
> > process for each task even with the LocalExecutor. You might have to make
> > the connection pool available to outside processes (I don't know offhand
> > how that can be done) in order to share it.
> >
> > Cheers,
> > Kevin Y
> >
> > On Tue, May 15, 2018 at 6:21 PM, alireza.khoshkbari@xxxxxxxxx <
> > alireza.khoshkbari@xxxxxxxxx> wrote:
> >
> > > To start off, here is my project structure:
> > > ├── dags
> > > │   ├── __init__.py
> > > │   ├── core
> > > │   │   ├── __init__.py
> > > │   │   ├── operators
> > > │   │   │   ├── __init__.py
> > > │   │   │   └── first_operator.py
> > > │   │   └── util
> > > │   │       ├── __init__.py
> > > │   │       └── db.py
> > > │   └── my_dag.py
> > >
> > > Here are the versions and details of the Airflow docker setup:
> > >
> > > In my dag, different tasks connect to a db (not the Airflow db). I've
> > > set up db connection pooling, so I expected my db.py to be loaded only
> > > once across the DagRun. However, in the log I can see that each task
> > > imports the module and that new db connections are made by each and every
> > > task. I can see that db.py is loaded in each task by having the line below
> > > in db.py:
> > >
> > > logging.info("I was loaded {}".format(random.randint(0,100)))
> > >
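> > > For illustration, db.py does something along these lines (SQLAlchemy and
> > > the connection string here are just stand-ins for the actual pooling
> > > setup):
> > >
> > > # dags/core/util/db.py (simplified sketch)
> > > import logging
> > > import random
> > > from sqlalchemy import create_engine
> > >
> > > logging.info("I was loaded {}".format(random.randint(0, 100)))
> > > engine = create_engine("postgresql://user:pass@host/dbname", pool_size=5)
> > >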
> > > I understand that each operator can technically run on a separate
> > > machine, and it does make sense that each task runs more or less
> > > independently. However, I'm not sure whether this applies when using the
> > > LocalExecutor. The question, then, is how I can share resources (db
> > > connections) across tasks when using the LocalExecutor.
> > >
> >
>

