Re: Basic modeling question


There's also the hack of using templating to skip executions. Say for a
BashOperator:

{% if execution_date.weekday() == 1 %}
echo "skipping today"
{% else %}
./run_workload.sh
{% endif %}
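
One note on the condition: Python's weekday() counts Monday as 0, so `== 1` skips Tuesdays. Below is a minimal plain-Python sketch of the same branch, assuming execution_date behaves like a datetime. The names command_for and SKIP_WEEKDAY are made up for illustration and are not part of Airflow.

```python
from datetime import datetime

SKIP_WEEKDAY = 1  # Tuesday; datetime.weekday() counts Monday as 0


def command_for(execution_date, skip_weekday=SKIP_WEEKDAY):
    """Mirror the template's branch: skip on one weekday, run otherwise."""
    if execution_date.weekday() == skip_weekday:
        return 'echo "skipping today"'
    return "./run_workload.sh"


print(command_for(datetime(2018, 8, 7)))  # 2018-08-07 was a Tuesday
print(command_for(datetime(2018, 8, 8)))  # 2018-08-08 was a Wednesday
```

The task still runs and succeeds on the skipped day; it just does no work, which is why this is a hack rather than a real skip.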

On Wed, Aug 8, 2018 at 4:27 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid>
wrote:

> Alexis, do you mean you would have done this using an ExternalTaskSensor?
> Or is there some other way to depend on a range of tasks?
>
> On Wed, Aug 8, 2018 at 3:35 PM, Alexis Rolland <alexis.rolland@xxxxxxxxxxx>
> wrote:
>
> > Not sure if it’s optimal compared to what James proposes, but I would have
> > simply made the weekly and monthly rollup tasks downstream tasks of the
> > daily log ingestion tasks they depend on. Then I would have used the
> > ‘all_done’ trigger rule to ensure those rollup tasks start when their
> > parent tasks are completed.
> >
> > https://airflow.incubator.apache.org/concepts.html#trigger-rules
> >
> > (daily log ingestion) > (daily rollup)
> > (daily log ingestion) > (weekly rollup + TriggerRule.all_done)
> > (daily log ingestion) > (monthly rollup + TriggerRule.all_done)
> >
> > Cheers
> >
> > Alexis
> >
> > On 9 Aug 2018, at 02:57, James Meickle <jmeickle@xxxxxxxxxxxxxx.INVALID> wrote:
> >
> > It sounds like you want something like this?
> >
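> > # Airflow 1.x import path; assumes a surrounding `with DAG(...)` block.
> > from airflow.operators.dummy_operator import DummyOperator
> >
> > root_operator = DummyOperator(task_id="root")
> >
> > def offset_operator(i):
> >     # ds_add expects the ds string; a negative offset reaches back i days.
> >     my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, -{offset}) }}}};".format(offset=i)
> >     # SQLOperator is a placeholder for whatever SQL sensor/operator you use.
> >     sql_operator = SQLOperator(task_id="offset_by_{}".format(i), query=my_sql_query)
> >     return sql_operator
> >
> > offset_operators = [offset_operator(i) for i in range(7)]
> > root_operator >> offset_operators
> >
> > # Daily just waits on today, no offset
> > do_daily_work = DummyOperator(task_id="do_daily_work")
> > offset_operators[0] >> do_daily_work
> >
> > # Weekly waits on today AND the six prior offsets
> > do_weekly_work = DummyOperator(task_id="do_weekly_work")
> > offset_operators >> do_weekly_work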
> >
> > IOW, every day you wait for that day's data to be available, and then run
> > the daily job; you also wait for the previous six days' data to be
> > available, and when it is, run the weekly job.
> >
> > n.b. - if you do it this way you will have up to 7 tasks polling the "same"
> > data point, which is slightly wasteful. But it's also not much code or
> > mental effort to write it this way.
> >
> > On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid>
> > wrote:
> >
> > My main concern is how to express the fact that the weekly rollup depends
> > on the previous 7 days' worth of data, and ensure that it does not run until
> > the tasks that generate those 7 days of data have run, assuming that tasks
> > can run non-sequentially.
> >
> > It's easy enough when you have the following situation:
> >
> > (daily log ingestion) <-- (daily rollup)
> >
> > In any given DAG run, you are guaranteed to have the data needed for
> > (daily rollup), because the dependency that generated its data just ran.
> >
> > But I'm not sure how best to model it when you have all of the following:
> >
> > (daily log ingestion) <-- (daily rollup)
> > (daily log ingestion) <-- (weekly rollup)
> > (daily log ingestion) <-- (monthly rollup)
> >
> >
> >
> > On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmiston@xxxxxxxxx>
> > wrote:
> >
> > Gabriel -
> >
> > Ah, I missed your earlier comment about weekly/monthly rollups also being
> > on a daily cadence.  So is your concern more about reducing the redundant
> > processing in the weekly rollup tasks for the days of that range that were
> > already processed in the previous DAG run(s)?  Or mainly about the
> > dependency of not executing the first weekly at all until the first 7 daily
> > rollups' worth of data has built up?
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
> > On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeickle@xxxxxxxxxxxxxx.invalid>
> > wrote:
> >
> > If you want to run (daily, rolling weekly, rolling monthly) backups on a
> > daily basis, and they're mostly the same but have some additional
> > dependencies, you can write a DAG factory method, which you call three
> > times. Certain nodes only get added to the longer-than-daily backups.
> >
> > On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid>
> > wrote:
> >
> > Thanks Andy and Taylor for the suggestions --
> >
> > I see how that would work for the case where you want a weekly rollup that
> > runs on a weekly cadence.
> >
> > But what about a rolling weekly or monthly rollup that runs each day?
> >
> > On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.cooper@xxxxxxxxxxxxx>
> > wrote:
> >
> > To expand on Taylor's idea
> >
> > I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a
> > task from running if it meets the criteria provided. It accepts an array of
> > args for any number of the criteria so you could leverage this sensor to
> > provide "blackout" runs for a range of days of the week.
> >
> > https://github.com/apache/incubator-airflow/pull/3702/files
> >
> > For example,
> >
> > task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
> >
> > Would prevent a task from running Monday - Saturday, allowing it to run on
> > Sunday.
> >
> > You could leverage this Sensor as you would any other sensor or you could
> > invert the logic so that you would only need to specify
> >
> > task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
> >
> > To "whitelist" a task to run on Sundays.
> >
> >
> > Let me know if you have any questions
> >
> > On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmiston@xxxxxxxxx>
> > wrote:
> >
> > Gabriel -
> >
> > One approach I've seen for a similar use case is to have multiple related
> > rollups in one DAG that runs daily, then have the non-daily tasks skip most
> > of the time (e.g., weekly only actually executes on Sundays and is
> > parameterized to look at the last 7 days).
> >
> > You could implement that not-running part a few ways, but one idea is a
> > sensor in front of the weekly rollup task.  Imagine a SundaySensor like
> > return execution_date.weekday() == 6.  One thing to keep in mind here is
> > dependence on the DAG's cron schedule being more granular than the tasks.
> >
> > I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> > that would be nice to have.
> >
> > Of course this does mean some scheduler inefficiency on the skip days, but
> > as long as those skips are fast and the overall number of tasks is small, I
> > can accept that.
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
> > On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gsilk@xxxxxxxxxxx.invalid>
> > wrote:
> >
> > Hello Airflow community,
> >
> > I have a basic question about how best to model a common data pipeline
> > pattern here at Dropbox.
> >
> > At Dropbox, all of our logs are ingested and written into Hive in hourly
> > and/or daily rollups. On top of this data we build many weekly and monthly
> > rollups, which typically run on a daily cadence and compute results over a
> > rolling window.
> >
> > If we have a metric X, it seems natural to put the daily, weekly, and
> > monthly rollups for metric X all in the same DAG.
> >
> > However, the different rollups have different dependency structures. The
> > daily job only depends on a single day partition, whereas the weekly job
> > depends on 7, the monthly on 28.
> >
> > In Airflow, it seems the two paradigms for modeling dependencies are:
> > 1) Depend on a *single run of a task* within the same DAG
> > 2) Depend on *multiple runs of a task* by using an ExternalTaskSensor
> >
> > I'm not sure how I could possibly model this scenario using approach #1,
> > and I'm not sure approach #2 is the most elegant or performant way to model
> > this scenario.
> >
> > Any thoughts or suggestions?
> >
>
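
For reference, the rolling windows from the original question (7 daily partitions for the weekly rollup, 28 for the monthly) can be sketched in plain Python. This assumes the window ends on the execution day and includes it, matching "today AND the six prior offsets" above. window_partitions is a made-up helper for illustration, not an Airflow API.

```python
from datetime import date, timedelta


def window_partitions(execution_day, window_days):
    """Return the daily partition dates a rolling rollup reads, oldest first."""
    return [
        execution_day - timedelta(days=offset)
        for offset in range(window_days - 1, -1, -1)
    ]


weekly = window_partitions(date(2018, 8, 8), 7)
monthly = window_partitions(date(2018, 8, 8), 28)

print(weekly[0], weekly[-1])     # first and last day of the weekly window
print(len(monthly), monthly[0])  # size and first day of the monthly window
```

Each date in the list corresponds to one daily ingestion run the rollup has to wait for, whichever mechanism (sensor, offset task, or trigger rule) ends up doing the waiting.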

