
Re: GlobalWindows, Watermarks and triggers

Groovy, thanks Lukasz, thanks Robert.

Really appreciate your input on this.

Stephan

On Fri, 8 Jun 2018, 01:15 Robert Bradshaw, <robertwb@xxxxxxxxxx> wrote:
On Thu, Jun 7, 2018 at 3:42 PM Stephan Kotze <stephanus.kotze@xxxxxxxxx> wrote:
Yep, I get that watermarks can move forward in chunks greater than one.

I am also comfortable with Aggregate[Pete:09:02,X,Y] being produced before Aggregate[Pete:09:01,X,Y], and with them arriving out of order at another window with its own triggers.

I don't need the data ordered in event time (strictly speaking); I'm happy for it to arrive in any order. But I only want the trigger to fire and release its elements once all of the aggregates up to that point in time have become available.

Timers are exactly this--they fire when you've seen all data up to a given timestamp. Until we support infinite window sets, I think that stateful DoFns are the simplest solution here. A CumulativeSum (or general CumulativeCombinePerKey) PTransform would probably make a nice blog post.
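A minimal sketch of that idea, written as a plain-Java simulation rather than Beam code: CumulativeSumSim, onElement and onWatermark are hypothetical stand-ins for a stateful DoFn's @ProcessElement method and its event-time @OnTimer callback. Aggregates are buffered per key in whatever order they arrive and are only released, in timestamp order, once the watermark has passed them, so every running total it emits is complete. Timestamps are assumed to be minutes since midnight (541 = 09:01).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of a stateful DoFn with an event-time timer; not the Beam API.
class CumulativeSumSim {
    // Per-key buffer: aggregate timestamp -> amount not yet released.
    private final Map<String, TreeMap<Long, Long>> pending = new TreeMap<>();
    // Per-key running total of everything already released.
    private final Map<String, Long> running = new TreeMap<>();
    // Emitted rows as "key timestamp runningTotal", in release order.
    final List<String> output = new ArrayList<>();

    // Stand-in for @ProcessElement: buffer the aggregate, whatever order it arrives in.
    void onElement(String key, long timestamp, long amount) {
        pending.computeIfAbsent(key, k -> new TreeMap<>()).merge(timestamp, amount, Long::sum);
    }

    // Stand-in for an event-time timer firing: no future input has a timestamp below
    // the watermark, so everything buffered below it is complete and can be released.
    void onWatermark(long watermark) {
        for (Map.Entry<String, TreeMap<Long, Long>> entry : pending.entrySet()) {
            String key = entry.getKey();
            // headMap is a live, sorted view of the entries with timestamp < watermark.
            SortedMap<Long, Long> ready = entry.getValue().headMap(watermark);
            for (Map.Entry<Long, Long> agg : ready.entrySet()) {
                long total = running.merge(key, agg.getValue(), Long::sum);
                output.add(key + " " + agg.getKey() + " " + total);
            }
            ready.clear(); // removes the released entries from the underlying buffer
        }
    }

    public static void main(String[] args) {
        CumulativeSumSim sim = new CumulativeSumSim();
        sim.onElement("Pete", 543, 9);  // 09:03 aggregate arrives first
        sim.onElement("Pete", 541, 7);  // 09:01
        sim.onWatermark(542);           // only 09:01 is known to be complete
        sim.onElement("Pete", 542, 2);  // the slow 09:02 aggregate finally arrives
        sim.onWatermark(544);
        System.out.println(sim.output); // [Pete 541 7, Pete 542 9, Pete 543 18]
    }
}
```

In a real pipeline the buffer would live in Beam state (for example a BagState) and an event-time timer would drive the release; the TreeMap here only makes the ordering explicit.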

I did indeed consider the previous approach (using the feedback loop), and yes, the problem is no different. I wanted to explore the space further and find a more elegant solution (one that doesn't introduce cycles, if there is a better way to handle it).

On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <lcwik@xxxxxxxxxx> wrote:
A watermark is a lower bound on data that is processed and available. It is specifically a lower bound because we want runners to be able to process each window in parallel.

In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y] in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at 9:00 and then advance the watermark to 9:03. This means that a downstream computation may see Aggregate[Pete:09:02,X,Y] before Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows at the same time in parallel and advance the watermark by an arbitrarily large number. Nothing states that the watermark only needs to advance one time unit at a time.
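A toy model of why this happens, assuming a stage that holds its output watermark at the earliest result still being computed (a simplification of what runners do; WatermarkHoldSim is a hypothetical name). Results go downstream in completion order, while the watermark jumps forward by more than one minute once the slow result finishes. Times are minutes since midnight (541 = 09:01).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical toy model of a watermark hold; not a runner implementation.
class WatermarkHoldSim {
    private final TreeSet<Long> inFlight = new TreeSet<>(); // timestamps still being computed
    private final long inputWatermark;                      // no future input is earlier than this
    final List<Long> emittedDownstream = new ArrayList<>(); // in completion (processing-time) order

    WatermarkHoldSim(long inputWatermark) {
        this.inputWatermark = inputWatermark;
    }

    void start(long timestamp) {
        inFlight.add(timestamp);
    }

    // A parallel worker finishes; its result is sent downstream immediately.
    void finish(long timestamp) {
        inFlight.remove(timestamp);
        emittedDownstream.add(timestamp);
    }

    // Lower bound on future output: never past the earliest unfinished result.
    long outputWatermark() {
        return inFlight.isEmpty() ? inputWatermark : Math.min(inputWatermark, inFlight.first());
    }

    public static void main(String[] args) {
        WatermarkHoldSim stage = new WatermarkHoldSim(543);
        stage.start(541);                            // Aggregate[Pete:09:01]
        stage.start(542);                            // Aggregate[Pete:09:02]
        stage.finish(542);                           // 09:02 finishes first and goes downstream first
        System.out.println(stage.outputWatermark()); // 541: still held by 09:01
        stage.finish(541);
        System.out.println(stage.emittedDownstream + " " + stage.outputWatermark()); // [542, 541] 543
    }
}
```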

Your problem is specifically saying "I want you to provide me all the data ordered in event time per key" (a specialization of sorting). This would require runners to take this massively parallel computation and order it per key, which Apache Beam doesn't support. People have requested support for ordering/sorting in the past, and there is some limited support inside the sorter extension[1], but nothing like what you're looking for. The only way to do this is to build the ordering yourself; if you can provide a generalization, I'm sure Apache Beam would be interested in a contribution of that kind.

On the other hand, do session windows fit your use case, or do you truly need a global aggregation that is ongoing?

Also, you had a very similar thread about doing Global Aggregation[2]. Does using a feedback loop via Pubsub/Kafka help you solve your problem, so you're not using a Global Window and can rely on Apache Beam's triggers to handle the "global" aggregation? (And how is this problem different from the last?)

On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <stephanus.kotze@xxxxxxxxx> wrote:
Thanks for the thorough replies!

Raghu, I think I mistakenly used the wrong terminology with regard to "PCollection has a notion of 'which elements it has emitted for a given position of the watermark'", so apologies there :)

The essence here does seem to be in this though:
Thus the questions.

If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that only moves forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of two later events, followed by an earlier event for another trigger.)
You effectively have to re-implement what Apache Beam runners do: compute the watermark of the elements, figure out whether they are early, on time or late, and buffer them with a stateful DoFn so that they are grouped together correctly for each window. This is quite difficult to do in general.

To me this feels like unnecessary complexity to implement just to ensure completeness/correctness, even when you are sure that all the relevant data for a given time period is already in the pipeline.

So if you'll humour me one more time, I'll try to explain as concisely as possible (because it just feels wrong that this use case requires the added complexity of manually implemented stateful DoFns).

• I have an unbounded source of events like these: Event[TimeStamp:Instant,PersonId:String,Amount:Long]
• The source is watermarked on event time (Event.TimeStamp).
• The messages have timestamps applied (Beam timestamp = Event.TimeStamp).
• Events arrive in order of event time.
• I create aggregates like these: Aggregate[TimeStamp:Instant,PersonId:String,EventCount:Long,AmountSum:Long]

I would like to obtain the following:

--------------------------------------------------------------------------------------------------
| PersonId  |  Time    | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
--------------------------------------------------------------------------------------------------
| Pete      | 09:01    | 3               | 7              | 3                 | 7                |
--------------------------------------------------------------------------------------------------
| Pete      | 09:02    | 1               | 2              | 4                 | 9                |
--------------------------------------------------------------------------------------------------
| Pete      | 09:03    | 5               | 9              | 9                 | 18               |
--------------------------------------------------------------------------------------------------
....
--------------------------------------------------------------------------------------------------
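For reference, the rows above can be reproduced with a plain batch computation (no Beam, no watermarks): bucket each person's events into one-minute windows, then keep a running total over the windows in time order. The individual event amounts below are made up so that they sum to the table's values, rows are labeled with the minute the events fall in, and Event and runningTotals are hypothetical names. The streaming pipeline's job is to produce these same rows incrementally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RunningTotals {
    // Hypothetical shape of Event[TimeStamp, PersonId, Amount]; time is seconds since midnight.
    static final class Event {
        final long second;
        final String personId;
        final long amount;

        Event(long second, String personId, long amount) {
            this.second = second;
            this.personId = personId;
            this.amount = amount;
        }
    }

    static Event event(int h, int m, int s, String personId, long amount) {
        return new Event(h * 3600L + m * 60L + s, personId, amount);
    }

    // One row per person and minute: "person HH:MM count sum globalCount globalSum".
    static List<String> runningTotals(List<Event> events) {
        // person -> minute since midnight -> {count, sum}; TreeMaps keep both levels sorted.
        Map<String, TreeMap<Long, long[]>> perMinute = new TreeMap<>();
        for (Event e : events) {
            long[] agg = perMinute.computeIfAbsent(e.personId, k -> new TreeMap<>())
                    .computeIfAbsent(e.second / 60, k -> new long[2]);
            agg[0] += 1;        // 1Min:EventCount
            agg[1] += e.amount; // 1Min:AmountSum
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<String, TreeMap<Long, long[]>> person : perMinute.entrySet()) {
            long globalCount = 0;
            long globalSum = 0;
            for (Map.Entry<Long, long[]> minute : person.getValue().entrySet()) {
                long[] agg = minute.getValue();
                globalCount += agg[0];
                globalSum += agg[1];
                long m = minute.getKey();
                rows.add(String.format("%s %02d:%02d %d %d %d %d", person.getKey(), m / 60, m % 60,
                        agg[0], agg[1], globalCount, globalSum));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                event(9, 1, 5, "Pete", 2), event(9, 1, 20, "Pete", 2), event(9, 1, 40, "Pete", 3),
                event(9, 2, 10, "Pete", 2),
                event(9, 3, 1, "Pete", 1), event(9, 3, 2, "Pete", 2), event(9, 3, 3, "Pete", 2),
                event(9, 3, 4, "Pete", 2), event(9, 3, 5, "Pete", 2));
        runningTotals(events).forEach(System.out::println);
    }
}
```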

A rough starting point for a pipeline is:

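PCollection<KV<String, Aggregate>> per1MinAggStream =
    events  // PCollection<Event> read from an UnboundedSource<Event>
        .apply(WithKeys.of((Event e) -> e.getPersonId())
            .withKeyType(TypeDescriptors.strings()))
        .apply(Window.<KV<String, Event>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.create())
        // AggregateDoFn: c.output(KV.of(key, new Aggregate(window.maxTimestamp(), key, count, sum)))
        .apply(ParDo.of(new AggregateDoFn()));

PCollection<KV<String, Aggregate>> allTimeStream =
    per1MinAggStream
        .apply(Window.<KV<String, Aggregate>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // ??? which trigger ???
            .accumulatingFiredPanes())
        .apply(GroupByKey.create())
        // GlobalAggDoFn: c.output(KV.of(key, new Aggregate(timestamp, key, count, sum)));
        // note that window.maxTimestamp() here is the end of the global window
        .apply(ParDo.of(new GlobalAggDoFn()));

// Potentially, re-assign allTimeStream to 1-minute windows here

PCollection<KV<String, CoGbkResult>> results =
    KeyedPCollectionTuple
        .of(oneMinAggsTag, per1MinAggStream)
        .and(globalAggsTag, allTimeStream)
        .apply(CoGroupByKey.create());
// results -> ParDo.of(doFn)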

This works if I can find a trigger for the global window that ensures it does not emit any 1-minute aggregates to my GlobalAggDoFn while others are still in flight (in processing time, not event time).
The 1-minute aggregates may be out of order in the Iterable<Aggregate> provided to GlobalAggDoFn (I can re-order them, emit the totals with the correct timestamps, and then assign them to new 1-minute windows).
However, if the GlobalWindow triggers while the Iterable<Aggregate> is still missing some 1-minute aggregates, the global aggregates cannot be guaranteed to be correct/complete at a given point in time.

An example:
1) per1MinAggStream has received all events up to 09:03.
2) The 1-minute windows have all triggered and are creating their aggregates.
3) Some of the 1-minute aggregates complete their calculations and are streamed to the GlobalWindow.
4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is running slow.
5) The global window triggers and sends the following to the GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y], Aggregate[Pete:09:01,X,Y]]
6) I can construct a global total up to 09:01, emit that and re-window, but I cannot do anything further, as I'm unable to tell whether there is actually data for 09:02 or whether it has simply not arrived at the windowing function yet (all sorts of complications ensue).

It just feels strange that even though I'm trying to tie everything to event time, when it comes to the global window I have to work around it and start taking processing time into account: unless I get super fancy with stateful DoFns, I cannot reach a state (or find triggers) that lets me remain consistent in event time.

This sort of leads me back to the question: is there a trigger that ensures that the GlobalWindow releases events (aggregates in this case) to the GlobalAggDoFn only once it knows that no earlier events will arrive at the function? (We have a watermarked source, with messages arriving in order according to their event times.) I've not seen any triggers that would do this, unless, as I asked earlier, the GlobalWindowFn somehow only emits events when they are complete in processing time as well as event time.

Many thanks for all the help thus far.
Stephan

On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <rangadi@xxxxxxxxxx> wrote:
> * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
This is not correct (to me it reads like you are saying something close to 'a PCollection is a stream of (element, watermark) tuples').
Every element in a PCollection has an associated event_timestamp, i.e. a PCollection is a collection of (element, timestamp) tuples. The watermark is not associated with a PCollection, and is completely independent of the event timestamps.

IOW, the watermark is by definition monotonic. When a stage in your pipeline sets its watermark to 'X', what it means is that each of its inputs (sources like PubSub, or other stages) has communicated a watermark timestamp Y saying it expects all its future elements to have event timestamp >= Y, and X = min(Y). A custom source that receives events with monotonically increasing timestamps can just report the timestamp of the last element emitted as its watermark.
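A hypothetical plain-Java model of those two rules (Stage and MonotonicSource are illustrative names, not Beam or runner APIs): a stage's watermark is the minimum of its inputs' watermarks, and a source with non-decreasing event timestamps can report its last emitted timestamp. Inputs are assumed to be registered up front; together with each input only ever moving forward, that is what keeps the minimum monotonic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the watermark rules above; not a Beam or runner API.
class WatermarkSketch {
    // A stage's watermark X is the minimum of the watermarks Y reported by its inputs.
    static final class Stage {
        private final Map<String, Long> inputWatermarks = new HashMap<>();

        Stage(String... inputs) {
            for (String input : inputs) {
                inputWatermarks.put(input, Long.MIN_VALUE); // nothing promised yet
            }
        }

        // Each input's watermark only moves forward, so a stale (smaller) report is ignored.
        void report(String input, long watermark) {
            inputWatermarks.merge(input, watermark, Long::max);
        }

        // X = min(Y) over all registered inputs.
        long watermark() {
            return Collections.min(inputWatermarks.values());
        }
    }

    // A source whose events arrive with non-decreasing timestamps can use
    // the timestamp of the last element emitted as its watermark.
    static final class MonotonicSource {
        private long lastEmitted = Long.MIN_VALUE;

        long emit(long eventTimestamp) {
            if (eventTimestamp < lastEmitted) {
                throw new IllegalArgumentException("timestamps must not decrease");
            }
            lastEmitted = eventTimestamp;
            return eventTimestamp;
        }

        long watermark() {
            return lastEmitted;
        }
    }

    public static void main(String[] args) {
        Stage stage = new Stage("pubsub", "upstream");
        stage.report("pubsub", 545);
        stage.report("upstream", 543);
        System.out.println(stage.watermark()); // 543
    }
}
```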

OnElementCountAtLeast(1) has no dependence on watermark progression, since the trigger does not depend on time.

On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <stephanus.kotze@xxxxxxxxx> wrote:
Hi there.

I have a question regarding the completeness of results in a GlobalWindow for a pipeline that receives all events in order (no late/missing data).

The question is based on reasoning about Beam that takes 3 pieces of (our current) understanding into account:

1) Global Window Watermark
As I understand it, a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows Triggers in stateful DoFns to fire, for example).

If this is the case,
* the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
* the PCollection will also know which results from upstream PTransforms/PCollections etc. are still in flight
* the PCollection will emit results and update its watermark once upstream elements have all provided their results and shifted their watermarks.

2) Triggering on Watermark
For fixed windows, for example, we have the AfterWatermark.pastEndOfWindow() trigger.
In the case of global windows, however, the GlobalWindow never "ends", so the watermark cannot progress past this point and we'll never get any results for something like Window.into(new GlobalWindows()).triggering(AfterWatermark.pastEndOfWindow()).

3) Ordering between upstream PCollections/PTransforms and GlobalWindows
In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(), the 1-minute segments can arrive out of order in the global window, even if the source was ordered (with no late data).
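The difference between fixed windows and the global window in (2) can be sketched with a simplified plain-Java model of pane timing (EARLY / ON_TIME / LATE, as in Beam's PaneInfo.Timing), assuming a window's end counts as passed once the watermark exceeds the window's max timestamp. PaneTimingSketch and NEVER_ENDS are hypothetical; NEVER_ENDS stands in for the end of the global window over an unbounded input and is not Beam's actual constant.

```java
// Hypothetical simplified model of pane timing; not Beam's trigger implementation.
class PaneTimingSketch {
    enum Timing { EARLY, ON_TIME, LATE }

    // Stand-in for "a window that never ends"; Beam's actual global-window bound differs.
    static final long NEVER_ENDS = Long.MAX_VALUE;

    // The watermark has passed the end of a window once it is beyond the window's max timestamp.
    static boolean pastEndOfWindow(long watermark, long windowMaxTimestamp) {
        return watermark > windowMaxTimestamp;
    }

    // Before the watermark passes the window's end a firing is EARLY,
    // the first firing after that is ON_TIME, and any later firing is LATE.
    static Timing classify(long watermark, long windowMaxTimestamp, boolean onTimeAlreadyFired) {
        if (!pastEndOfWindow(watermark, windowMaxTimestamp)) {
            return Timing.EARLY;
        }
        return onTimeAlreadyFired ? Timing.LATE : Timing.ON_TIME;
    }

    public static void main(String[] args) {
        long start = 32_400_000L;            // 09:00:00 in ms since midnight
        long fixedMax = start + 60_000L - 1; // max timestamp of the window [09:00, 09:01)
        System.out.println(classify(start + 30_000L, fixedMax, false)); // EARLY
        System.out.println(classify(start + 60_000L, fixedMax, false)); // ON_TIME
        System.out.println(classify(start + 90_000L, fixedMax, true));  // LATE
        // No long value exceeds NEVER_ENDS, so only EARLY firings are possible.
        System.out.println(classify(Long.MAX_VALUE - 1, NEVER_ENDS, false)); // EARLY
    }
}
```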

Thus the questions.

If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that only moves forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of two later events, followed by an earlier event for another trigger.)

Or?

Does OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input, for example, it would still be a complete list of all upstream-produced elements between the last watermark and the "new" one that will be set once the ParDo has completed.)

I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))

How can a PTransform/PCollection on a global window have a monotonic watermark if events can trigger calculations with out-of-order events (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?

Hopefully I've explained the question concisely enough :)

Stephan