Re: How Airflow imports modules as it executes tasks


Thanks Kevin. Yes, I'm importing db in different operators. That said, my understanding is that once a module has been imported, it isn't loaded again even if you import it a second time (and I reckon this is why the Singleton pattern isn't commonly used in Python). Is that right?
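
A quick, self-contained sketch of that caching behaviour (the fake "db" module below is just an illustrative stand-in, not the real dags/core/util/db.py): within one process, a module's top-level code runs only on the first import, and later imports are cache hits in sys.modules, which is why module-level state behaves like a per-process singleton.

import sys
import types

# Build a tiny stand-in for db.py and register it, simulating the first import.
mod = types.ModuleType("db")
exec("print('I was loaded'); pool = object()", mod.__dict__)
sys.modules["db"] = mod

import db as first   # cache hit in sys.modules; top-level code does not rerun
import db as second  # same cached module object again
assert first is second and first.pool is second.pool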

On 2018/05/16 02:34:18, Ruiqin Yang <yrqls21@xxxxxxxxx> wrote: 
> Not exactly answering your question, but the reason db.py is loaded in each
> task might be that you have something like `import db` in each of your
> *.py files, and Airflow spins up one process to parse each *.py file, so
> your db.py gets loaded multiple times.
> 
> I'm not sure how you can share the connection pool if it is created within
> the same process your operator runs in, since Airflow spins up one
> process for each task even with the LocalExecutor. You might have to make
> the connection pool available to outside processes (I don't know how that
> can be done) to be able to share it.
> 
> Cheers,
> Kevin Y
> 
> On Tue, May 15, 2018 at 6:21 PM, alireza.khoshkbari@xxxxxxxxx <
> alireza.khoshkbari@xxxxxxxxx> wrote:
> 
> > To start off, here is my project structure:
> > ├── dags
> > │   ├── __init__.py
> > │   ├── core
> > │   │   ├── __init__.py
> > │   │   ├── operators
> > │   │   │   ├── __init__.py
> > │   │   │   └── first_operator.py
> > │   │   └── util
> > │   │       ├── __init__.py
> > │   │       └── db.py
> > │   └── my_dag.py
> >
> > Here are the versions and details of the Airflow Docker setup:
> >
> > In my DAG, different tasks connect to a database (not the Airflow db). I've
> > set up db connection pooling, so I expected my db.py to be loaded
> > once across the DagRun. However, the log shows that each task
> > imports the module and opens new db connections. I can tell
> > that db.py is loaded in each task because of the line below in db.py:
> >
> > import logging
> > import random
> >
> > logging.info("I was loaded {}".format(random.randint(0, 100)))
> >
> > I understand that each operator can technically run on a separate
> > machine, and it makes sense that each task runs more or less independently.
> > However, I'm not sure this applies when using the LocalExecutor.
> > So the question is: how can I share resources (db connections) across
> > tasks when using the LocalExecutor?
> >
>
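
P.S. Since connections can't cross the process boundary LocalExecutor creates for each task, the workable pattern seems to be one lazily-created pool per process, with a server-side pooler (e.g. PgBouncer for Postgres) if real sharing across processes is needed. A rough sketch of db.py along those lines, assuming SQLAlchemy and a hypothetical placeholder DSN:

import logging
import os

from sqlalchemy import create_engine

# One log line per worker process confirms each process loads its own copy.
logging.info("db.py loaded in pid %s", os.getpid())

_engine = None

def get_engine():
    # Create the engine (and its connection pool) on first use in each process.
    global _engine
    if _engine is None:
        _engine = create_engine(
            "postgresql://user:pass@host/dbname",  # hypothetical placeholder DSN
            pool_size=5,           # connections kept open per process
            max_overflow=2,        # extra connections allowed under load
            pool_pre_ping=True,    # silently replace stale connections
        )
    return _engine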