git.net


Re: Ingest daily data, but delivery is always delayed by two days


For something to add to Airflow itself: I would love a more flexible
mapping between data time and processing time. The default is "n-1" (day
over day, you're aiming to process yesterday's data) but people post other
use cases on this mailing list quite frequently.
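To make the idea concrete, a minimal sketch of that mapping, independent of Airflow's scheduler internals (the helper name `data_date_for` is illustrative, not an existing Airflow API):

```python
from datetime import date, timedelta

def data_date_for(execution_date: date, lag_days: int = 1) -> date:
    """Map a schedule's execution date to the data date it should process.

    lag_days=1 is the implicit "n-1" default (process yesterday's data);
    a supplier delivering ~30 hours late would want lag_days=2.
    """
    return execution_date - timedelta(days=lag_days)

# Default day-over-day mapping:
data_date_for(date(2018, 10, 12))               # -> date(2018, 10, 11)
# Two-day lag for a late supplier:
data_date_for(date(2018, 10, 12), lag_days=2)   # -> date(2018, 10, 10)
```

A first-class version of this in Airflow would let the lag live on the DAG rather than being baked into each task's templates.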

On Fri, Oct 12, 2018 at 7:46 AM Faouz El Fassi <faouz@xxxxxxxxx> wrote:

> What about an exponential back off on the poke interval?
>
> On Fri, 12 Oct 2018, 13:01 Ash Berlin-Taylor, <ash@xxxxxxxxxx> wrote:
>
> > That would work for some of our other use cases (and has been an idea in
> > our backlog for months) but not this case as we're reading from someone
> > else's bucket so can't set up notifications etc. :(
> >
> > -ash
> >
> > > On 12 Oct 2018, at 11:57, Bolke de Bruin <bdbruin@xxxxxxxxx> wrote:
> > >
> > > S3 Bucket notification that triggers a dag?
> > >
> > > Sent from my iPad
> > >
> > >> On 12 Oct 2018, at 12:42, Ash Berlin-Taylor <ash@xxxxxxxxxx> wrote:
> > >>
> > >> A lot of our dags are ingesting data (usually daily or weekly) from
> > suppliers, and they are universally late.
> > >>
> > >> In the case I'm setting up now the delivery lag is about 30 hours -
> data
> > for 2018-10-10 turned up at 2018-10-12 05:43.
> > >>
> > >> I was going to just set this up with an S3KeySensor and a daily
> > schedule, but I'm wondering if anyone has any other bright ideas for a
> > better way of handling this sort of case:
> > >>
> > >>   dag = DAG(
> > >>       DAG_ID,
> > >>       default_args=args,
> > >>       start_date=args['start_date'],
> > >>       concurrency=1,
> > >>       schedule_interval='@daily',
> > >>       params={'country': cc}
> > >>   )
> > >>
> > >>   with dag:
> > >>       task = S3KeySensor(
> > >>           task_id="await_files",
> > >>           bucket_key="s3://bucket/raw/table1-{{ params.country }}/{{
> > execution_date.strftime('%Y/%m/%d') }}/SUCCESS",
> > >>           poke_interval=60 * 60 * 2,
> > >>           timeout=60 * 60 * 72,
> > >>       )
> > >>
> > >> That S3 key sensor is _going_ to fail the first 18 times or so it runs,
> > which just seems silly.
> > >>
> > >> One option could be to use `ds_add` or similar on the execution date,
> > but I don't like breaking the (obvious) link between execution date and
> > which files it picks up, so I've ruled out this option.
> > >>
> > >> I could use a Time(Delta)Sensor to just delay the start of the
> > checking. I guess with the new change in master to make sensors yield
> their
> > execution slots that's not a terrible plan.
> > >>
> > >> Does anyone else have any other ideas, including possible things we
> > could add to Airflow itself?
> > >>
> > >> -ash
> > >>
> >
> >
>
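
Faouz's exponential back-off suggestion above can be sketched independently of any sensor class; this is just the shape of the interval schedule, not Airflow API (the `backoff_intervals` generator is illustrative only):

```python
import itertools

def backoff_intervals(base=60, factor=2, cap=2 * 60 * 60):
    """Yield poke intervals in seconds, doubling each attempt up to `cap`.

    Starts polling frequently, then backs off so a sensor waiting ~30 hours
    for late data doesn't burn a worker slot on constant S3 list calls.
    """
    interval = base
    while True:
        yield min(interval, cap)
        interval = min(interval * factor, cap)

# First eight poke intervals: 1m, 2m, 4m, ... capped at 2h.
intervals = list(itertools.islice(backoff_intervals(), 8))
# -> [60, 120, 240, 480, 960, 1920, 3840, 7200]
```

Compared with a fixed poke_interval of two hours, this trades a few cheap early pokes for a much faster reaction once the file finally lands near the expected delivery time.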