
Re: Cancel a Running dag


I have a similar use case. To support it, I have implemented on_kill methods for all the operators.
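
A minimal sketch of the on_kill pattern (the class and script names are illustrative, and a real Airflow operator would subclass BaseOperator; this stand-alone version just shows the idea of tracking the child process so on_kill can terminate it gracefully):

```python
import signal
import subprocess


class LongRunningTask:
    """Mimics an operator that remembers its child process for on_kill."""

    def execute(self):
        # Start the external job and keep a handle so on_kill can reach it.
        # "sleep 60" stands in for a real long-running job (assumes POSIX).
        self._proc = subprocess.Popen(["sleep", "60"])

    def on_kill(self):
        # Called when the task receives a kill signal: terminate the
        # tracked child gracefully and wait for it to exit.
        proc = getattr(self, "_proc", None)
        if proc is not None and proc.poll() is None:
            proc.send_signal(signal.SIGTERM)
            proc.wait(timeout=10)


task = LongRunningTask()
task.execute()
task.on_kill()
# On POSIX, a process killed by a signal reports a negative return code
# (typically -15 for SIGTERM).
print(task._proc.returncode)
```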

To cancel the current run:
1. Get all running tasks and send them a kill signal. In on_kill, make sure the current run of the task terminates gracefully. After the kill signal, set the state to failed (the state must be set manually because, in the case of retries, the scheduler will pick up the task again).
2. Get all queued tasks and set their state to shutdown.
3. Just to be safe, scan for tasks in the up_for_retry state and set their state to shutdown as well.

Task instances have a set_state method that can be used to change states externally.
To get all tasks in a given state, use the get_task_instances method of the DagRun.
You can do this in a loop until the DagRun fails, to avoid race conditions when setting task states. I once had a case where the scheduler updated a state after I had changed it.
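
The cancellation steps above can be sketched as a small simulation. Real code would call DagRun.get_task_instances and TaskInstance.set_state (exact signatures vary by Airflow version); the tiny stand-ins below just make the loop-until-failed control flow visible:

```python
RUNNING, QUEUED, UP_FOR_RETRY, SHUTDOWN, FAILED = (
    "running", "queued", "up_for_retry", "shutdown", "failed")


class FakeTI:
    """Stand-in for a TaskInstance with a set_state method."""
    def __init__(self, task_id, state):
        self.task_id, self.state = task_id, state

    def set_state(self, state):
        self.state = state


class FakeDagRun:
    """Stand-in for a DagRun with get_task_instances and a derived state."""
    def __init__(self, tis):
        self.tis = tis

    def get_task_instances(self, state=None):
        return [ti for ti in self.tis if state is None or ti.state == state]

    @property
    def state(self):
        active = {RUNNING, QUEUED, UP_FOR_RETRY}
        return "running" if any(t.state in active for t in self.tis) else FAILED


def cancel(dagrun):
    # Repeat until the DagRun fails, so a scheduler racing us and
    # re-setting a state gets overwritten on the next pass.
    while dagrun.state != FAILED:
        for ti in dagrun.get_task_instances(state=RUNNING):
            ti.set_state(FAILED)        # after sending the kill signal
        for ti in dagrun.get_task_instances(state=QUEUED):
            ti.set_state(SHUTDOWN)
        for ti in dagrun.get_task_instances(state=UP_FOR_RETRY):
            ti.set_state(SHUTDOWN)


run = FakeDagRun([FakeTI("a", RUNNING), FakeTI("b", QUEUED),
                  FakeTI("c", UP_FOR_RETRY)])
cancel(run)
print([t.state for t in run.tis])  # → ['failed', 'shutdown', 'shutdown']
```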

There might be simpler ways, but I feel I shouldn't modify the Airflow metastore directly when I can use set_state, which does that for me internally. I also don't want to stop the scheduler, as other DagRuns would be affected.

This was the best solution I could come up with. If there is a better approach, please let me know.

Thanks and Regards,
Prem

On 2018/04/13 08:33:50, "Driesprong, Fokko" <fokko@xxxxxxxxxxxxxx> wrote: 
> Like Bolke said, it has been fixed in master. One of the prerequisites is
> support by the operator. For example, the Spark operator has implemented
> how to kill the Spark job on YARN, Local and Kubernetes. If you are running
> something else, you might want to check if this is implemented.
> 
> Implemented on_kill: https://github.com/apache/incubator-airflow/pull/3204
> An example of the on_kill:
> https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L485-L534
> 
> Cheers, Fokko
> 
> 2018-04-12 21:19 GMT+02:00 Bolke de Bruin <bdbruin@xxxxxxxxx>:
> 
> > This is now fixed in master. Clearing tasks will now properly terminate a
> > running task. If you pause the dag run no new tasks will be scheduled.
> >
> > B.
> >
> >
> >
> > Verstuurd vanaf mijn iPad
> >
> > > Op 12 apr. 2018 om 20:23 heeft Laura Lorenz <llorenz@xxxxxxxxxxxxxxxx>
> > het volgende geschreven:
> > >
> > > That won't stop them if they are already running in a celery worker or
> > > already in your messaging queue backend (e.g. rabbitmq; redis), but it
> > > will prevent the message that triggers them from being emitted again by
> > > the airflow scheduler to your messaging queue backend. To be thorough,
> > > you have to do both - stop the scheduler from scheduling the tasks
> > > anymore (by failing them individually and/or the DagRun in the metadata
> > > database) and, if you want to make sure the tasks that already got
> > > picked up stop and don't try again, kill their worker processes and
> > > make sure your messaging queue is clean of messages of that task type.
> > > If you don't care whether any already started or queued-up tasks
> > > finish, you can simply doctor the metadata database.
> > >
> > > Laura
> > >
> > > On Thu, Apr 12, 2018 at 12:40 PM, ramandumcs@xxxxxxxxx <
> > ramandumcs@xxxxxxxxx
> > >> wrote:
> > >
> > >> Thanks Laura,
> > >> We are using the CeleryExecutor. Just wondering if marking the
> > >> TaskInstances as failed in metadata store would also work.
> > >> -Raman
> > >>
> > >>> On 2018/04/12 16:27:00, Laura Lorenz <llorenz@xxxxxxxxxxxxxxxx> wrote:
> > >>> I use the CeleryExecutor and have used a mix of `celery control` and
> > >>> messaging queue purges to kill the running tasks and prevent them from
> > >>> being picked up by workers again (respectively), and doctor the DagRun
> > >>> to failed to stop the scheduler from repopulating the message. I think
> > >>> if you are using the Local or Sequential Executor you'd have to kill
> > >>> the scheduler process.
> > >>>
> > >>> Laura
> > >>>
> > >>> On Thu, Apr 12, 2018 at 12:05 PM, Taylor Edmiston <tedmiston@xxxxxxxxx
> > >
> > >>> wrote:
> > >>>
> > >>>> I don't think killing a currently running task is possible today.
> > >>>>
> > >>>> Of course you can pause it from the CLI or web UI so that future runs
> > >> don't
> > >>>> get triggered, but it sounds like that's not what you're looking for.
> > >>>>
> > >>>> Best,
> > >>>> Taylor
> > >>>>
> > >>>> *Taylor Edmiston*
> > >>>> Blog <http://blog.tedmiston.com> | Stack Overflow CV
> > >>>> <https://stackoverflow.com/story/taylor> | LinkedIn
> > >>>> <https://www.linkedin.com/in/tedmiston/> | AngelList
> > >>>> <https://angel.co/taylor>
> > >>>>
> > >>>>
> > >>>> On Thu, Apr 12, 2018 at 11:26 AM, ramandumcs@xxxxxxxxx <
> > >>>> ramandumcs@xxxxxxxxx
> > >>>>> wrote:
> > >>>>
> > >>>>> Hi All,
> > >>>>> We have a use case to cancel an already running DAG. Is there a
> > >>>>> recommended way to do so?
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Raman
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
> 

