Re: Ingest daily data, but delivery is always delayed by two days


What about an exponential backoff on the poke interval?
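
As a rough illustration of the idea, here is a minimal sketch in plain Python. It does not use any Airflow API; `backoff_intervals` and all of its parameters are hypothetical names chosen for this example. It produces a schedule of waits that starts short, doubles each time, is capped at a maximum, and stops once the overall timeout is used up.

```python
def backoff_intervals(base_seconds, factor=2, max_seconds=4 * 60 * 60,
                      timeout_seconds=72 * 60 * 60):
    """Yield waits between pokes, growing geometrically up to a cap.

    All names and defaults here are illustrative assumptions, not
    Airflow parameters.
    """
    elapsed = 0
    interval = base_seconds
    while elapsed < timeout_seconds:
        # Never wait longer than the cap or than the time left before timeout.
        wait = min(interval, max_seconds, timeout_seconds - elapsed)
        yield wait
        elapsed += wait
        interval *= factor


# Example: start at 10 minutes, double each time, cap at 4 hours,
# give up after 72 hours (the timeout used in the quoted DAG).
intervals = list(backoff_intervals(10 * 60))
```

One caveat with this shape: the pokes are densest right after the run starts, which is exactly when a reliably late delivery cannot have arrived yet, so the cap and base interval would need tuning per supplier.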

On Fri, 12 Oct 2018, 13:01 Ash Berlin-Taylor, <ash@xxxxxxxxxx> wrote:

> That would work for some of our other use cases (and has been an idea in
> our backlog for months) but not this case as we're reading from someone
> else's bucket so can't set up notifications etc. :(
>
> -ash
>
> > On 12 Oct 2018, at 11:57, Bolke de Bruin <bdbruin@xxxxxxxxx> wrote:
> >
> > S3 Bucket notification that triggers a dag?
> >
> > Sent from my iPad
> >
> >> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor <ash@xxxxxxxxxx> wrote:
> >>
> >> A lot of our DAGs are ingesting data (usually daily or weekly) from
> >> suppliers, and the deliveries are universally late.
> >>
> >> In the case I'm setting up now the delivery lag is about 30 hours - data
> >> for 2018-10-10 turned up at 2018-10-12 05:43.
> >>
> >> I was going to just set this up with an S3KeySensor and a daily
> >> schedule, but I'm wondering if anyone has any other bright ideas for a
> >> better way of handling this sort of case:
> >>
> >>   dag = DAG(
> >>       DAG_ID,
> >>       default_args=args,
> >>       start_date=args['start_date'],
> >>       concurrency=1,
> >>       schedule_interval='@daily',
> >>       params={'country': cc}
> >>   )
> >>
> >>   with dag:
> >>       task = S3KeySensor(
> >>           task_id="await_files",
> >>           bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{ execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
> >>           poke_interval=60 * 60 * 2,
> >>           timeout=60 * 60 * 72,
> >>       )
> >>
> >> That S3 key sensor is _going_ to fail the first 18 times or so it runs,
> >> which just seems silly.
> >>
> >> One option could be to use `ds_add` or similar on the execution date,
> >> but I don't like breaking the (obvious) link between execution date and
> >> which files it picks up, so I've ruled out this option.
> >>
> >> I could use a Time(Delta)Sensor to just delay the start of the
> >> checking. I guess with the new change in master to make sensors yield their
> >> execution slots that's not a terrible plan.
> >>
> >> Does anyone else have any other ideas, including possible things we
> >> could add to Airflow itself?
> >>
> >> -ash
> >>
>
>
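
For comparison with the Time(Delta)Sensor idea in the original message, here is a back-of-the-envelope sketch in plain Python (no Airflow imports) of how many pokes come up empty with and without an initial delay. The assumptions are mine: the run for 2018-10-10 starts at 2018-10-11 00:00 (the end of its daily interval), the data arrives at 2018-10-12 05:43 as reported, the sensor pokes every two hours, and the 24-hour delay is a hypothetical value. `failed_pokes` is an illustrative helper, not a library function.

```python
from datetime import datetime, timedelta


def failed_pokes(first_poke, arrival, poke_interval):
    """Count pokes that happen strictly before the data arrives."""
    count = 0
    t = first_poke
    while t < arrival:
        count += 1
        t += poke_interval
    return count


# Assumption: the daily run for 2018-10-10 starts once that day has ended.
run_start = datetime(2018, 10, 11)
arrival = datetime(2018, 10, 12, 5, 43)   # arrival time from the thread
poke = timedelta(hours=2)                 # poke_interval from the quoted DAG

without_delay = failed_pokes(run_start, arrival, poke)
# Hypothetical: hold off for 24 hours before the first poke.
with_delay = failed_pokes(run_start + timedelta(hours=24), arrival, poke)
```

Under these assumptions the count drops from 15 empty pokes to 3, while the link between execution date and the files being picked up stays untouched.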