Re: How to wait for external process


Hi,

Here's another vote for persistence. We did a similar thing, where the processing state is stored in the database. No part of the DAG does a periodic check: the DAG re-triggers itself, and its very first task figures out whether there is work to do, or bails out.
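[Editorial note] A minimal sketch of the bail-out check described above; the state table, its column names, and the helper name are assumptions, not from the thread. In Airflow this callable would sit behind a `ShortCircuitOperator`, so a `False` return skips the rest of the run, and the DAG's last task would re-trigger the DAG itself (e.g. via `TriggerDagRunOperator`) rather than any task ever sitting and polling:

```python
# Sketch of the "figure out if there is work to do or bail out" first task.
# The row shape ({"state": ...}) is an illustrative assumption.

def has_pending_work(rows):
    """rows: records loaded from the (assumed) processing-state table.
    Returns True when at least one job is ready for processing."""
    return any(r["state"] == "ready" for r in rows)
```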

> On May 28, 2018, at 4:28 PM, Ananth Durai <vananth22@xxxxxxxxx> wrote:
> 
> Since you are already on AWS, the simplest thing I can think of is to write a
> signal file once the job finishes, and have the downstream job wait for that
> signal file. In other words, the same pattern as Hadoop jobs writing a
> `_SUCCESS` file and downstream jobs depending on that signal file.
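[Editorial note] A sketch of the `_SUCCESS` signal-file check, assuming an S3 client with a boto3-style `list_objects_v2` method is passed in; the function name, bucket, and prefix are illustrative, not from the thread. A sensor in the downstream DAG would poke this until it returns `True`:

```python
# Check whether the upstream job has written its _SUCCESS marker under
# the given prefix. s3_client is assumed to expose a boto3-style
# list_objects_v2(Bucket=..., Prefix=...) method.

def success_marker_present(s3_client, bucket, prefix):
    """True once <prefix>/_SUCCESS exists in the bucket."""
    key = prefix.rstrip("/") + "/_SUCCESS"
    resp = s3_client.list_objects_v2(Bucket=bucket, Prefix=key)
    return any(obj["Key"] == key for obj in resp.get("Contents", []))
```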
> 
> Regards,
> Ananth.P,
> 
> 
> 
> 
> 
> 
> On 28 May 2018 at 13:06, Stefan Seelmann <mail@xxxxxxxxxxxxxxxxxx> wrote:
> 
>> Thanks Christopher for the idea. That would work; we already have such a
>> "listener" that polls a queue (SQS) and creates the DAG runs. However, it
>> would have been nice to have the full process in one DAG, to get a better
>> overview of running jobs and to leverage the Gantt chart, but I think this
>> can be accomplished via custom plugins and views.
>> 
>> On 05/28/2018 08:43 PM, Christopher Bockman wrote:
>>> Haven't done this, but we'll have a similar need in the future, so have
>>> investigated a little.
>>> 
>>> What about a design pattern something like this:
>>> 
>>> 1) When jobs are done (ready for further processing) they publish those
>>> details to a queue (such as GC Pub/Sub or any other sort of queue)
>>> 
>>> 2) A single "listener" DAG sits and periodically checks that queue.  If it
>>> finds anything on it, it triggers (via DAG trigger) all of the DAGs which
>>> are on the queue.*
>>> 
>>> * = if your triggering volume is too high, this may cause Airflow issues
>>> with too many DAGs going at once; this could presumably be solved via
>>> custom rate-limiting when firing these
>>> 
>>> 3) The listener DAG resets itself (triggers itself)
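[Editorial note] Steps 2) and 3) can be sketched as below. `trigger` stands in for Airflow's DAG-trigger call, and the message shape and cap are illustrative assumptions; the cap is one simple form of the custom rate-limiting suggested above:

```python
# Sketch of one listener run: drain the queue, fire one DAG trigger per
# message, and cap the number fired so a burst does not overload Airflow.
# Messages left over are picked up by the listener's next self-triggered run.

def drain_and_trigger(messages, trigger, max_triggers=10):
    """messages: dicts like {"dag_id": ..., "conf": ...} read off the queue.
    trigger: callable standing in for the DAG-trigger API."""
    fired = 0
    for msg in messages:
        if fired >= max_triggers:
            break  # crude rate limit; the rest wait for the next run
        trigger(msg["dag_id"], msg.get("conf", {}))
        fired += 1
    return fired
```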
>>> 
>>> 
>>> On Mon, May 28, 2018 at 7:17 AM, Driesprong, Fokko <fokko@xxxxxxxxxxxxxx>
>>> wrote:
>>> 
>>>> Hi Stefan,
>>>> 
>>>> Afaik there isn't a more efficient way of doing this. DAGs that are
>> relying
>>>> on a lot of sensors are experiencing the same issues. The only way right
>>>> now, I can think of, is doing updating the state directly in the
>> database.
>>>> But then you need to know what you are doing. I can image that this
>> would
>>>> be feasible by using an AWS lambda function. Hope this helps.
>>>> 
>>>> Cheers, Fokko
>>>> 
>>>> 2018-05-26 17:50 GMT+02:00 Stefan Seelmann <mail@xxxxxxxxxxxxxxxxxx>:
>>>> 
>>>>> Hello,
>>>>> 
>>>>> I have a DAG (externally triggered) where some processing is done at an
>>>>> external system (EC2 instance). The processing is started by an Airflow
>>>>> task (via HTTP request). The DAG should only continue once that
>>>>> processing is completed. In a first naive implementation I created a
>>>>> sensor that polls the progress (via HTTP request) and returns true only
>>>>> once the status is "finished", letting the DAG run continue. That works but...
>>>>> 
>>>>> ... the external processing can take hours or days, and during that time
>>>>> a worker is occupied which does nothing but HTTP GET and sleep. There
>>>>> will be hundreds of DAG runs in parallel, which means hundreds of
>>>>> workers are occupied.
>>>>> 
>>>>> I looked into other operators that do computation on external systems
>>>>> (ECSOperator, AWSBatchOperator) but they also follow that pattern and
>>>>> just wait/sleep.
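[Editorial note] The poll-and-sleep pattern the mail describes, common to the naive sensor and (per the mail) ECSOperator/AWSBatchOperator, amounts to the loop below; function and parameter names are illustrative. The calling worker is blocked inside this loop for the job's whole duration, which is exactly the cost being objected to:

```python
# Blocking wait loop: poll the external job's status, sleep, repeat.
# get_status and sleep are injected so the loop itself stays testable.

def wait_for_completion(get_status, sleep, poll_interval=60):
    """Occupies the caller until get_status() reports 'finished'."""
    while get_status() != "finished":
        sleep(poll_interval)
```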
>>>>> 
>>>>> So I want to ask if there is a more efficient way to build such a
>>>>> workflow with Airflow?
>>>>> 
>>>>> Kind Regards,
>>>>> Stefan
>>>>> 
>>>> 
>>> 
>> 
>> 



