
Re: How Airflow imports modules as it executes the tasks


Thanks for the explanation, really helpful.

Cheers,
Ali

On 2018/05/16 03:27:27, Ruiqin Yang <yrqls21@xxxxxxxxx> wrote: 
> You are right, but that's within the same process. The way each operator
> gets executed is that one `airflow run` command gets generated and sent to
> the local executor, and the executor spins up a subprocess to run `airflow
> run --raw` (which parses the file again and calls operator.execute()). So
> each task ends up with its own process that parses the *.py file and
> imports the module again.
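A rough way to see this behaviour outside Airflow, using plain multiprocessing as a stand-in for the executor's worker processes. This is my own sketch, not Airflow source; fake_db.py here just stands in for our db.py:

    # reimport_sketch.py -- mimics the one-process-per-task behaviour to show
    # why module-level code (like building a connection pool) runs once per task.
    import multiprocessing
    import os
    import sys
    import textwrap

    HERE = os.path.dirname(os.path.abspath(__file__))

    # fake_db.py stands in for dags/core/util/db.py: module-level code that
    # would normally build the connection pool.
    with open(os.path.join(HERE, "fake_db.py"), "w") as f:
        f.write(textwrap.dedent("""\
            import os
            print("fake_db imported in pid {}".format(os.getpid()))
        """))

    def run_task(task_id):
        # Roughly what `airflow run --raw` does: a fresh process parses the
        # DAG file again, which re-imports its dependencies.
        sys.path.insert(0, HERE)
        import fake_db  # noqa: F401
        print("task {} finished in pid {}".format(task_id, os.getpid()))

    if __name__ == "__main__":
        for task_id in ("t1", "t2", "t3"):
            p = multiprocessing.Process(target=run_task, args=(task_id,))
            p.start()
            p.join()

Running it prints one "fake_db imported" line per task, each from a different pid.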
> 
> Hope this helped, cheers
> Kevin Y
> 
> On Tue, May 15, 2018 at 7:57 PM, alireza.khoshkbari@xxxxxxxxx <
> alireza.khoshkbari@xxxxxxxxx> wrote:
> 
> > Thanks Kevin. Yes, I'm importing db in different operators. That said, my
> > understanding is that if a module has already been imported, it's not
> > loaded again even if you try to import it again (and I reckon this is why
> > the Singleton pattern is not commonly used in Python). Is that right?
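Within a single process that is exactly right: the second import is just a cache lookup. A quick check in plain Python, nothing Airflow-specific:

    import importlib
    import sys

    import json                                    # first import: runs json's module-level code

    json_again = importlib.import_module("json")   # cache hit, module code is not re-run
    print(json_again is json)                      # True: same module object
    print(sys.modules["json"] is json)             # True: the per-process cache

The catch is that this cache lives in sys.modules of one interpreter process, and as Kevin explains in his reply, the LocalExecutor gives every task its own `airflow run --raw` process.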
> >
> > On 2018/05/16 02:34:18, Ruiqin Yang <yrqls21@xxxxxxxxx> wrote:
> > > Not exactly answering your question, but the reason db.py is loaded in
> > > each task might be that you have something like `import db` in each of
> > > your *.py files, and Airflow spins up one process to parse each *.py
> > > file, so your db.py gets loaded multiple times.
> > >
> > > I'm not sure how you can share the connection pool if it is created
> > > within the same process your operator runs in, since Airflow spins up
> > > one process for each task even with the LocalExecutor. You might have
> > > to make the connection pool available to other processes (I don't know
> > > how that part can be done) to be able to share it.
> > >
> > > Cheers,
> > > Kevin Y
> > >
> > > On Tue, May 15, 2018 at 6:21 PM, alireza.khoshkbari@xxxxxxxxx <
> > > alireza.khoshkbari@xxxxxxxxx> wrote:
> > >
> > > > To start off, here is my project structure:
> > > > ├── dags
> > > > │   ├── __init__.py
> > > > │   ├── core
> > > > │   │   ├── __init__.py
> > > > │   │   ├── operators
> > > > │   │   │   ├── __init__.py
> > > > │   │   │   └── first_operator.py
> > > > │   │   └── util
> > > > │   │       ├── __init__.py
> > > > │   │       └── db.py
> > > > │   └── my_dag.py
> > > >
> > > > Here are the versions and details of the Airflow Docker setup:
> > > >
> > > > In my dag, in different tasks, I'm connecting to a db (not the Airflow
> > > > db). I've set up db connection pooling, so I expected my db.py to be
> > > > loaded once across the DagRun. However, in the log I can see that each
> > > > task imports the module and new db connections are made by each and
> > > > every task. I can see that db.py is loaded in each task by having the
> > > > lines below in db.py:
> > > >
> > > > import logging, random
> > > > logging.info("I was loaded {}".format(random.randint(0, 100)))
> > > >
> > > > I understand that each operator could technically run on a separate
> > > > machine, and it makes sense that each task runs more or less
> > > > independently. However, I'm not sure whether this applies when using
> > > > the LocalExecutor. So the question is: how can I share resources (db
> > > > connections) across tasks when using the LocalExecutor?
> > > >
> > >
> >
> 


