Re: GlobalWindows, Watermarks and triggers

> * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
This is not correct (to me it reads like you are saying something close to 'a PCollection is a stream of (element, watermark) tuples').
Every element in a PCollection has an associated event timestamp, i.e. a PCollection is a collection of (element, timestamp) tuples. A watermark is not associated with a PCollection, and is completely independent of the event timestamps.

IOW, a watermark is by definition monotonic. When a stage in your pipeline sets its watermark to 'X', it means that each of its inputs (sources like PubSub, or upstream stages) has communicated a watermark timestamp Y, saying it expects all of its future elements to have event timestamp >= Y. X = min(Y) over those inputs. A custom source that receives events with monotonically increasing timestamps can just report the timestamp of the last element emitted as its watermark.
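
To make the X = min(Y) rule concrete, here is a rough toy model in plain Python. It is not Beam API: `StageWatermark`, `report` and `watermark_from_ordered_source` are names made up for this sketch, and it only assumes that each input reports a single timestamp Y at a time.

```python
class StageWatermark:
    """Toy model of one stage's watermark (hypothetical, not part of Beam)."""

    def __init__(self, input_names):
        # No input has reported a watermark yet.
        self._inputs = {name: float("-inf") for name in input_names}
        self._current = float("-inf")

    def report(self, input_name, y):
        # The input promises that all its future elements have timestamp >= y.
        # A report that would move the input's watermark backwards is ignored.
        self._inputs[input_name] = max(self._inputs[input_name], y)
        # X = min(Y) over all inputs; max() keeps X from ever decreasing.
        self._current = max(self._current, min(self._inputs.values()))
        return self._current

    @property
    def current(self):
        return self._current


def watermark_from_ordered_source(timestamps):
    """For a source whose timestamps only increase, yield the timestamp of
    the last emitted element as the watermark after each element."""
    wm = float("-inf")
    for ts in timestamps:
        wm = max(wm, ts)
        yield wm
```

In this sketch the stage watermark stays at minus infinity until every input has reported something, and it advances only when the slowest input advances.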

OnElementCountAtLeast(1) has no dependence on watermark progression, since this trigger does not depend on time.
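
As a rough illustration of why a count-based trigger is unrelated to the watermark, here is a toy model in plain Python. It is not how Beam implements triggers, and `fire_on_element_count` is a hypothetical name for this sketch. The only input to the firing decision is how many elements have been buffered.

```python
def fire_on_element_count(arrivals, count=1):
    """Toy model: emit a pane whenever `count` elements have been buffered.

    `arrivals` is a list of (element, event_timestamp) tuples in arrival
    order. No watermark appears anywhere in this function.
    """
    panes, buffer = [], []
    for element, ts in arrivals:
        buffer.append((element, ts))
        if len(buffer) >= count:
            # Fire: hand the buffered elements downstream as one pane.
            panes.append(buffer)
            buffer = []
    # Elements still in `buffer` have not reached the count and are not emitted.
    return panes
```

With `count=1` every arrival produces its own pane in arrival order, so panes can carry event timestamps that go backwards (for example 3, then 1, then 2) without any watermark having moved backwards.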

On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <stephanus.kotze@xxxxxxxxx> wrote:
Hi there.

I have a question regarding the completeness of results in a GlobalWindow for a pipeline which receives all events in order. (no late/missing data).

The question is based on reasoning about Beam that takes 3 pieces of (our current) understanding into account:

1) Global Window Watermark
As I understand it a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows Triggers in stateful DoFns to fire for example).

If this is the case,   
 * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
 * the PCollection will also know which results from upstream PTransforms/Pcollections etc. are still in flight
 * the PCollection will emit results and update its watermark once Upstream elements have all provided their results and shifted their watermarks.

2) Triggering on Watermark
 For Fixed windows for example we have the "AfterWatermark".pastEndOfWindow() trigger. 
 In the case of Global windows however, the GlobalWindow never "ends", so the watermark cannot progress past this point and we'll never get any results for something like newGlobalWindow().trigger(AfterWatermark.pastEndOfWindow())

3) Ordering between upstream PCollections/PTransforms and GlobalWindows
In the following pipeline:  Source -> fixedWindow(1m) -> GlobalWindow(), the 1Min segments can arrive out of order in the global window, even if the source was ordered (with no late data)

Thus the questions.

If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event for another trigger.)


Does the OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input for example, it is still a complete list with All upstream produced elements between the last watermark and the "new" one that will be set once the ParDo has completed).

I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))

How can a PTransform/PCollection on a Global Window have a monotonic watermark if events can trigger calcs with out-of-order events (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?

Hopefully I've explained the question concisely enough :)