Re: Basic modeling question


Not sure if it's optimal compared to what James proposes, but I would simply have made the weekly and monthly rollup tasks downstream of the daily log ingestion tasks they depend on. Then I would have used the 'all_done' trigger rule to ensure those rollup tasks start once their parent tasks have completed.

https://airflow.incubator.apache.org/concepts.html#trigger-rules

(daily log ingestion) > (daily rollup)
(daily log ingestion) > (weekly rollup + TriggerRule.all_done)
(daily log ingestion) > (monthly rollup + TriggerRule.all_done)
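In code, that wiring might look something like this minimal sketch (the DummyOperator placeholders, task ids, and DAG args here are made up for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG("rollups", schedule_interval="@daily", start_date=datetime(2018, 8, 1))

# Placeholders; in practice these would be your real ingestion/rollup operators.
ingest = DummyOperator(task_id="daily_log_ingestion", dag=dag)
daily = DummyOperator(task_id="daily_rollup", dag=dag)
# all_done: run once every upstream task has finished, whatever its final state.
weekly = DummyOperator(task_id="weekly_rollup", trigger_rule=TriggerRule.ALL_DONE, dag=dag)
monthly = DummyOperator(task_id="monthly_rollup", trigger_rule=TriggerRule.ALL_DONE, dag=dag)

ingest >> daily
ingest >> weekly
ingest >> monthly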

Cheers

Alexis

On 9 Aug 2018, at 02:57, James Meickle <jmeickle@xxxxxxxxxxxxxx.INVALID> wrote:

It sounds like you want something like this?

root_operator = DummyOperator(task_id="root")

def offset_operator(i):
    # Renders to e.g. "SELECT * FROM {{ macros.ds_add(ds, -3) }};" so that Jinja
    # fills in the date i days before the execution date at run time.
    my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, -{offset}) }}}};".format(offset=i)
    # SQLOperator is a stand-in for whatever operator checks that day's data.
    sql_operator = SQLOperator(task_id="offset_by_{}".format(i), query=my_sql_query)
    return sql_operator

offset_operators = [offset_operator(i) for i in range(7)]
root_operator >> offset_operators

# Daily just waits on today, no offset
do_daily_work = DummyOperator(task_id="do_daily_work")
offset_operators[0] >> do_daily_work

# Weekly waits on today AND the six prior offsets
do_weekly_work = DummyOperator(task_id="do_weekly_work")
offset_operators >> do_weekly_work

IOW, every day you wait for that day's data to be available, and then run the daily job; you also wait for the previous six days' data to be available, and when it is, run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same" data point, which is slightly wasteful. But it's also not much code or mental effort to write it this way.
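For concreteness, those offset tasks could be actual sensors rather than the SQLOperator stand-in above. A minimal sketch, assuming a SqlSensor polling a hypothetical partition-metadata table over a hypothetical connection (and a surrounding dag object):

from airflow.sensors.sql_sensor import SqlSensor

def offset_sensor(i):
    # Pokes until the query returns a row, i.e. until the partition
    # for the execution date minus i days shows up.
    return SqlSensor(
        task_id="wait_for_day_minus_{}".format(i),
        conn_id="metadata_db",  # hypothetical connection
        sql=("SELECT 1 FROM partitions "
             "WHERE ds = '{{{{ macros.ds_add(ds, -{}) }}}}'").format(i),
        dag=dag,
    )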

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:

My main concern is how to express the fact that the weekly rollup depends on the previous 7 days' worth of data, and to ensure that it does not run until the tasks that generate those 7 days of data have run, assuming that tasks can run non-sequentially.

It's easy enough when you have the following situation:

(daily log ingestion) <-- (daily rollup)

In any given DAG run, you are guaranteed to have the data needed for (daily rollup), because the dependency that generated its data just ran.

But I'm not sure how best to model it when you have all of the following:

(daily log ingestion) <-- (daily rollup)
(daily log ingestion) <-- (weekly rollup)
(daily log ingestion) <-- (monthly rollup)



On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmiston@xxxxxxxxx> wrote:

Gabriel -

Ah, I missed your earlier comment about weekly/monthly rollups also being on a daily cadence.  So is your concern, e.g., more about reducing the redundant processing of the weekly rollup tasks for the days of that range that were already processed in the previous DAG run(s)?  Or mainly about the dependency of not executing the first weekly rollup at all until the first 7 daily rollups' worth of data have built up?

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeickle@xxxxxxxxxxxxxx.invalid> wrote:

If you want to run (daily, rolling weekly, rolling monthly) backups on a daily basis, and they're mostly the same but have some additional dependencies, you can write a DAG factory method, which you call three times. Certain nodes only get added to the longer-than-daily backups.
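A rough sketch of that factory shape (all names and placeholder operators here are made up; the point is just the shared structure plus cadence-specific extras):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

def make_rollup_dag(cadence, window_days):
    # One DAG per cadence; all of them run daily.
    dag = DAG("rollup_" + cadence, schedule_interval="@daily",
              start_date=datetime(2018, 8, 1))
    ingest = DummyOperator(task_id="ingest", dag=dag)
    rollup = DummyOperator(task_id="rollup_last_{}_days".format(window_days), dag=dag)
    ingest >> rollup
    # Only the longer-than-daily cadences get extra wait-for-history nodes.
    for d in range(1, window_days):
        wait = DummyOperator(task_id="wait_for_day_minus_{}".format(d), dag=dag)
        wait >> rollup
    return dag

daily_dag = make_rollup_dag("daily", 1)
weekly_dag = make_rollup_dag("weekly", 7)
monthly_dag = make_rollup_dag("monthly", 28)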

On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:

Thanks Andy and Taylor for the suggestions --

I see how that would work for the case where you want a weekly rollup that runs on a weekly cadence.

But what about a rolling weekly or monthly rollup that runs each day?

On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.cooper@xxxxxxxxxxxxx> wrote:

To expand on Taylor's idea:

I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a task from running if it meets the criteria provided. It accepts an array of args for any number of the criteria, so you could leverage this sensor to provide "blackout" runs for a range of days of the week.

https://github.com/apache/incubator-airflow/pull/3702/files

For example,

task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)

Would prevent a task from running Monday - Saturday, allowing it to run on Sunday.

You could leverage this sensor as you would any other sensor, or you could invert the logic so that you would only need to specify
task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)

To "whitelist" a task to run on Sundays.


Let me know if you have any questions

On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmiston@xxxxxxxxx> wrote:

Gabriel -

One approach I've seen for a similar use case is to have multiple related rollups in one DAG that runs daily, then have the non-daily tasks skip most of the time (e.g., weekly only actually executes on Sundays and is parameterized to look at the last 7 days).

You could implement that not-running part a few ways, but one idea is a sensor in front of the weekly rollup task.  Imagine a SundaySensor whose poke method just does: return execution_date.weekday() == 6.  One thing to keep in mind here is the dependence on the DAG's cron schedule being more granular than the tasks'.

I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor that would be nice to have.
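A minimal sketch of what that could look like (assuming BaseSensorOperator as in Airflow 1.10; Monday is weekday 0):

from airflow.sensors.base_sensor_operator import BaseSensorOperator

class DayOfWeekSensor(BaseSensorOperator):
    # Succeeds only when the run's execution_date falls on the given weekday.
    def __init__(self, weekday, *args, **kwargs):
        super(DayOfWeekSensor, self).__init__(*args, **kwargs)
        self.weekday = weekday

    def poke(self, context):
        return context["execution_date"].weekday() == self.weekday

# e.g. sunday_gate = DayOfWeekSensor(task_id="sunday_only", weekday=6,
#                                    soft_fail=True, dag=dag)
# soft_fail=True marks the task skipped rather than failed when it
# times out on non-Sundays.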

Of course this does mean some scheduler inefficiency on the skip days, but as long as those skips are fast and the overall number of tasks is small, I can accept that.

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV
<https://stackoverflow.com/cv/taylor> | LinkedIn
<https://www.linkedin.com/in/tedmiston/> | AngelList
<https://angel.co/taylor> | Stack Overflow
<https://stackoverflow.com/users/149428/taylor-edmiston>


On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gsilk@xxxxxxxxxxx.invalid> wrote:

Hello Airflow community,

I have a basic question about how best to model a common data pipeline pattern here at Dropbox.

At Dropbox, all of our logs are ingested and written into Hive in hourly and/or daily rollups. On top of this data we build many weekly and monthly rollups, which typically run on a daily cadence and compute results over a rolling window.

If we have a metric X, it seems natural to put the daily, weekly, and monthly rollups for metric X all in the same DAG.

However, the different rollups have different dependency structures. The daily job only depends on a single day partition, whereas the weekly job depends on 7, and the monthly on 28.

In Airflow, it seems the two paradigms for modeling dependencies are:
1) Depend on a *single run of a task* within the same DAG
2) Depend on *multiple runs of a task* by using an ExternalTaskSensor (see the sketch below)
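For approach #2, I picture something like this sketch, with one sensor per prior day the weekly rollup needs (the DAG and task ids here are made up, as is the surrounding dag object):

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Each sensor waits for the ingestion task from the run d days back.
waits = [
    ExternalTaskSensor(
        task_id="wait_for_ingestion_minus_{}".format(d),
        external_dag_id="daily_log_ingestion",
        external_task_id="ingest",
        execution_delta=timedelta(days=d),
        dag=dag,
    )
    for d in range(7)
]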

I'm not sure how I could possibly model this scenario using approach #1, and I'm not sure approach #2 is the most elegant or performant way to model this scenario.

Any thoughts or suggestions?








