
Re: GlobalWindows, Watermarks and triggers

On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <stephanus.kotze@xxxxxxxxx> wrote:
Hi there.

I have a question regarding the completeness of results in a GlobalWindow for a pipeline which receives all events in order. (no late/missing data).

The question is based on reasoning about Beam that takes 3 pieces of (our current) understanding into account:

1)Global Window Watermark
As I understand it a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows Triggers in stateful DoFns to fire for example).

If this is the case,   
 * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
 * the PCollection will also know which results from upstream PTransforms/Pcollections etc. are still in flight
 * the PCollection will emit results and update its watermark once Upstream elements have all provided their results and shifted their watermarks.

This is up to the runner to implement, but from what I recall, all runners just have the watermark start at -infinity for a PCollection and stay at -infinity while elements are being consumed from that PCollection. For bounded PCollections, the watermark jumps to +infinity when there are no more elements to consume, which means that watermark-based triggers all "fire" at the same time. For unbounded PCollections, the watermark stays at -infinity forever and hence watermark-based triggers will not fire.
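
To make the behaviour described above concrete, here is a minimal plain-Java sketch (no Beam dependency). It only models the description in this reply and is not how any runner is actually implemented. The class name, the method names, and the use of Long.MIN_VALUE / Long.MAX_VALUE as stand-ins for -infinity / +infinity are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WatermarkSketch {
    // Hypothetical stand-ins for -infinity and +infinity (not Beam constants).
    public static final long NEG_INF = Long.MIN_VALUE;
    public static final long POS_INF = Long.MAX_VALUE;

    /**
     * Watermark as described in the reply: it stays at -infinity while
     * elements are being consumed (or forever, if the input is unbounded)
     * and jumps to +infinity once a bounded input is exhausted.
     */
    public static long watermark(boolean bounded, boolean exhausted) {
        return (bounded && exhausted) ? POS_INF : NEG_INF;
    }

    /**
     * A watermark trigger on the global window can only fire once the
     * watermark passes the end of the window, modelled here as POS_INF.
     */
    public static boolean globalWindowTriggerFires(long watermark) {
        return watermark == POS_INF;
    }

    /**
     * Consumes a bounded input and records whether the trigger would fire
     * after each element, plus once more after the input is exhausted.
     */
    public static List<Boolean> firingsForBoundedInput(List<String> elements) {
        List<Boolean> firings = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            firings.add(globalWindowTriggerFires(watermark(true, false)));
        }
        firings.add(globalWindowTriggerFires(watermark(true, true)));
        return firings;
    }

    public static void main(String[] args) {
        // Bounded input: fires only after the last element has been consumed.
        System.out.println(firingsForBoundedInput(Arrays.asList("a", "b", "c")));
        // Unbounded input: the watermark never leaves -infinity, so no firing.
        System.out.println(globalWindowTriggerFires(watermark(false, false)));
    }
}
```

In this model a bounded input of three elements yields no firing until the input is exhausted, and an unbounded input never fires.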
2) Triggering on Watermark
 For fixed windows, for example, we have the AfterWatermark.pastEndOfWindow() trigger.
 In the case of global windows, however, the GlobalWindow never "ends", so the watermark cannot progress past this point and we'll never get any results for something like new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow())

The GlobalWindow does end for bounded PCollections, since the watermark advances to +infinity when we run out of elements to consume. For unbounded PCollections, you're correct, the GlobalWindow never "ends".
3) Ordering between upstream PCollections/PTransforms and GlobalWindows
In the following pipeline:  Source -> fixedWindow(1m) -> GlobalWindow(), the 1Min segments can arrive out of order in the global window, even if the source was ordered (with no late data)

Triggers will control when things appear in the GlobalWindow and they only "fire" at GroupByKey.
Thus the questions.

If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event for another trigger.)
You effectively have to re-implement what Apache Beam runners do, using a stateful DoFn: compute the watermark of the elements, figure out whether they are early, on time, or late, and buffer them so that they are grouped together correctly for each window. This is quite difficult to do in general.
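
As a rough illustration of the buffering idea in the reply above, here is a plain-Java sketch (no Beam dependency) of a buffer that holds event timestamps and only releases them, in order, once a watermark has passed them. The class OrderingBuffer and its methods are hypothetical names, and treating anything at or behind the current watermark as "late" is a simplifying assumption. A real stateful DoFn would also have to handle windows, keys, state, and timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class OrderingBuffer {
    // Buffered event timestamps, smallest first.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    // Assumption: Long.MIN_VALUE stands in for a watermark of -infinity.
    private long watermark = Long.MIN_VALUE;

    /**
     * Buffers an element's timestamp. Returns false, without buffering,
     * if the element is late (its timestamp is at or behind the watermark).
     */
    public boolean add(long timestamp) {
        if (timestamp <= watermark) {
            return false;
        }
        buffer.add(timestamp);
        return true;
    }

    /**
     * Advances the watermark (it never moves backwards) and releases every
     * buffered timestamp at or behind it, in ascending order.
     */
    public List<Long> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    public static void main(String[] args) {
        OrderingBuffer b = new OrderingBuffer();
        b.add(30L);
        b.add(10L);
        b.add(20L);
        System.out.println(b.advanceWatermark(20L)); // elements 10 and 20, in order
        System.out.println(b.add(15L));              // rejected as late
        System.out.println(b.advanceWatermark(40L)); // element 30
    }
}
```

The point of the sketch is that ordered output depends on holding elements back until the watermark says nothing earlier can still arrive; a trigger that fires on every element gives no such guarantee by itself.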


Does the OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input for example, it is still a complete list with All upstream produced elements between the last watermark and the "new" one that will be set once the ParDo has completed).

I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))

How can a PTransform/PCollection on a GlobalWindow have a monotonic watermark if events can trigger calcs with out-of-order events (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?

Hopefully I've explained the question concisely enough :)


It is hard to understand your use case. Does your pipeline look like:
GBK(FixedWindows(1m) + AfterWatermark.pastEndOfWindow()) -> ParDo(A) -> GBK(GlobalWindow + OnElementCountAtLeast(1)) -> ParDo(B)
Also, can you provide a flow of elements as an example through the pipeline like A sees this and then this and then this, which means that B will always see this and then ...?
Also, are you talking about an unbounded pipeline?