[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: How to wait for external process

Haven't done this, but we'll have a similar need in the future, so have
investigated a little.

What about a design pattern something like this:

1) When jobs are done (ready for further processing) they publish those
details to a queue (such as GC Pub/Sub or any other sort of queue)

2) A single "listener" DAG sits and periodically checks that queue.  If it
finds anything on it, it triggers (via DAG trigger) all of the DAGs which
are on the queue.*

* = if your triggering volume is too high, this may cause airflow issues w/
too many going at once; this could presumably be solved then via custom
rate-limiting on firing these

3) The listener DAG resets itself (triggers itself)

On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fokko@xxxxxxxxxxxxxx>

> Hi Stefan,
> Afaik there isn't a more efficient way of doing this. DAGs that are relying
> on a lot of sensors are experiencing the same issues. The only way right
> now, I can think of, is doing updating the state directly in the database.
> But then you need to know what you are doing. I can image that this would
> be feasible by using an AWS lambda function. Hope this helps.
> Cheers, Fokko
> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <mail@xxxxxxxxxxxxxxxxxx>:
> > Hello,
> >
> > I have a DAG (externally triggered) where some processing is done at an
> > external system (EC2 instance). The processing is started by an Airflow
> > task (via HTTP request). The DAG should only continue once that
> > processing is completed. In a first naive implementation I created a
> > sensor that gets the progress (via HTTP request) and only if status is
> > "finished" returns true and the DAG run continues. That works but...
> >
> > ... the external processing can take hours or days, and during that time
> > a worker is occupied which does nothing but HTTP GET and sleep. There
> > will be hundreds of DAG runs in parallel which means hundreds of workers
> > are occupied.
> >
> > I looked into other operators that do computation on external systems
> > (ECSOperator, AWSBatchOperator) but they also follow that pattern and
> > just wait/sleep.
> >
> > So I want to ask if there is a more efficient way to build such a
> > workflow with Airflow?
> >
> > Kind Regards,
> > Stefan
> >

( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-airflow-development/msg03486.html on line 127
Call Stack
10.0008364664{main}( ).../msg03486.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-airflow-development/msg03486.html on line 127
Call Stack
10.0008364664{main}( ).../msg03486.html:0