git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: How to wait for external process


Thanks Christopher for the idea. That would work, we already have such a
"listener" that polls a queue (SQS) and creates the DAG runs. However it
would have been nice to have the full process in one DAG to have a
better overview about running jobs and leverage the gantt chart, but I
think this can be accomplished via custom plugins and views.

On 05/28/2018 08:43 PM, Christopher Bockman wrote:
> Haven't done this, but we'll have a similar need in the future, so have
> investigated a little.
> 
> What about a design pattern something like this:
> 
> 1) When jobs are done (ready for further processing) they publish those
> details to a queue (such as GC Pub/Sub or any other sort of queue)
> 
> 2) A single "listener" DAG sits and periodically checks that queue.  If it
> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
> are on the queue.*
> 
> * = if your triggering volume is too high, this may cause airflow issues w/
> too many going at once; this could presumably be solved then via custom
> rate-limiting on firing these
> 
> 3) The listener DAG resets itself (triggers itself)
> 
> 
> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fokko@xxxxxxxxxxxxxx>
> wrote:
> 
>> Hi Stefan,
>>
>> Afaik there isn't a more efficient way of doing this. DAGs that are relying
>> on a lot of sensors are experiencing the same issues. The only way right
>> now, I can think of, is doing updating the state directly in the database.
>> But then you need to know what you are doing. I can image that this would
>> be feasible by using an AWS lambda function. Hope this helps.
>>
>> Cheers, Fokko
>>
>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <mail@xxxxxxxxxxxxxxxxxx>:
>>
>>> Hello,
>>>
>>> I have a DAG (externally triggered) where some processing is done at an
>>> external system (EC2 instance). The processing is started by an Airflow
>>> task (via HTTP request). The DAG should only continue once that
>>> processing is completed. In a first naive implementation I created a
>>> sensor that gets the progress (via HTTP request) and only if status is
>>> "finished" returns true and the DAG run continues. That works but...
>>>
>>> ... the external processing can take hours or days, and during that time
>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>> will be hundreds of DAG runs in parallel which means hundreds of workers
>>> are occupied.
>>>
>>> I looked into other operators that do computation on external systems
>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>> just wait/sleep.
>>>
>>> So I want to ask if there is a more efficient way to build such a
>>> workflow with Airflow?
>>>
>>> Kind Regards,
>>> Stefan
>>>
>>
> 




( ! ) Warning: include(msgfooter.php): failed to open stream: No such file or directory in /var/www/git/apache-airflow-development/msg03488.html on line 137
Call Stack
#TimeMemoryFunctionLocation
10.0008368760{main}( ).../msg03488.html:0

( ! ) Warning: include(): Failed opening 'msgfooter.php' for inclusion (include_path='.:/var/www/git') in /var/www/git/apache-airflow-development/msg03488.html on line 137
Call Stack
#TimeMemoryFunctionLocation
10.0008368760{main}( ).../msg03488.html:0