
Apache Beam delta between windows.


Hi everyone

I've already posted this question on Stack Overflow: https://stackoverflow.com/questions/50161186/apache-beam-delta-between-windows

But I thought it made sense to post it here as well.

So here we go:

I am trying to determine the delta between values calculated in consecutive fixed windows and emit it for each window.

                               T-2                         T-1                           T
            |---------------------------------|---------------------------------|---------------------------------|
 userID     |       clickEventCount = 3       |       clickEventCount = 1       |       clickEventCount = 6       |
         
                                |                                 |                               |
                                ˅                                 ˅                               ˅
userID           clickEventCountDelta = 3           clickEventCountDelta = -2         clickEventCountDelta = 5 


Something like this would be grand:
        1. KV<userID,clickEvent> --> groupByKey() --> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
        2. KV<userID,clickEventCount> --> ?(deltaFn) --> KV<userID,clickEventCountDelta>
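
To make the intended deltaFn concrete, here is a minimal sketch in plain Java (deliberately not Beam code). The class and method names are made up for illustration, and it assumes that one user's counts are already available in window order and that a missing earlier window counts as 0, as in the diagram above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the desired deltaFn for a single userID; not Beam API.
class WindowDeltaSketch {

    /**
     * Takes one user's counts ordered by window start and returns, for each
     * window, the change relative to the window before it.
     */
    static List<Long> deltas(List<Long> countsInWindowOrder) {
        List<Long> result = new ArrayList<>();
        long previous = 0L; // assumption: no earlier window means a count of 0
        for (long count : countsInWindowOrder) {
            result.add(count - previous);
            previous = count;
        }
        return result;
    }

    public static void main(String[] args) {
        // Counts for windows T-2, T-1 and T from the diagram.
        System.out.println(deltas(List.of(3L, 1L, 6L))); // prints [3, -2, 5]
    }
}
```

The hard part in a pipeline is exactly what this sketch takes for granted: having the previous window's count at hand, in order, when the current window's count is produced.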


I'm struggling a bit to find a solution that is both elegant and scalable, so any help would be greatly appreciated.

Some things I've considered/tried:
    Window of Windows:
          1: KV<userID,clickEvent> --> groupByKey() --> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
          2: KV<userID,clickEventCount> --> groupByKey() --> fixedWindow(X*hour) --> ParDo(Iterable<KV<userID,clickEventCount>>)

    This lets me get my hands on the collection of KV<userID,clickEventCount> that I can iterate over and emit the delta between them.
    However, the larger fixed window seems arbitrary and unnecessary as I want to carry the delta indefinitely, not start from 0 every few hours.

    Shifting the previous count's timestamp:
        1: Essentially, taking the KV<userID,clickEventCount> PCollection and emitting an additional KV<userID,clickEventCount> with an adjusted timestamp.
        2: Then grabbing this time-shifted copy as a side input to a PTransform/DoFn for calculating the delta with the previous period.

    This seemed like a cute idea, but it very quickly became a mess and doesn't "feel" like the right approach.
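
The shifting idea can be modelled outside Beam as a self-join on window start. The sketch below is plain Java with hypothetical names, covers one userID, and assumes one-hour windows identified by their start time in milliseconds; it says nothing about how a side input would behave on a real runner.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "shift the previous count forward, then join"; not Beam API.
class ShiftedJoinSketch {
    static final long WINDOW_MILLIS = 60 * 60 * 1000L; // assumption: 1-hour windows

    /** Input and output are keyed by window start (epoch millis) for one userID. */
    static Map<Long, Long> deltasByWindowStart(Map<Long, Long> countByWindowStart) {
        // Step 1: emit a copy of every count, re-keyed to the following window.
        Map<Long, Long> shifted = new TreeMap<>();
        countByWindowStart.forEach((start, count) -> shifted.put(start + WINDOW_MILLIS, count));

        // Step 2: per window, subtract the shifted (previous) count; 0 if none exists.
        Map<Long, Long> deltas = new TreeMap<>();
        countByWindowStart.forEach((start, count) ->
                deltas.put(start, count - shifted.getOrDefault(start, 0L)));
        return deltas;
    }

    public static void main(String[] args) {
        Map<Long, Long> counts = Map.of(0L, 3L, WINDOW_MILLIS, 1L, 2 * WINDOW_MILLIS, 6L);
        System.out.println(deltasByWindowStart(counts)); // prints {0=3, 3600000=-2, 7200000=5}
    }
}
```

Note that a window with no events produces no count here, so the window after it is compared against 0 only because of the getOrDefault fallback.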

    Using an external cache:
        1: writing the KV<userID,clickEventCount> + timestamp+window to a distributed cache.
        2: Grabbing the previous value from the cache and computing the delta.

    It doesn’t seem totally unreasonable to reach back in time to the count of the previous window via a cache. However, it "feels" wrong and highly inefficient given that I have the data in a PCollection somewhere nearby.
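
As a rough illustration of the cache variant, the plain-Java sketch below uses an in-memory ConcurrentHashMap as a stand-in for the distributed cache. The class and method names are hypothetical, and it assumes that the counts for a given userID arrive in window order, which a distributed pipeline does not guarantee by itself.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Plain-Java model of the external-cache approach; the map stands in for a real cache.
class CachedDeltaSketch {
    // userID -> count of the most recent window seen for that user
    private final Map<String, Long> lastCountByUser = new ConcurrentHashMap<>();

    /** Stores the new count and returns its delta to the previously stored count. */
    long deltaAndRemember(String userId, long count) {
        Long previous = lastCountByUser.put(userId, count);
        return count - (previous == null ? 0L : previous); // first window is compared to 0
    }

    public static void main(String[] args) {
        CachedDeltaSketch cache = new CachedDeltaSketch();
        System.out.println(cache.deltaAndRemember("user-1", 3)); // prints 3
        System.out.println(cache.deltaAndRemember("user-1", 1)); // prints -2
        System.out.println(cache.deltaAndRemember("user-1", 6)); // prints 5
    }
}
```

Every window costs one read and one write per user against the cache, which is where the inefficiency mentioned above comes from.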

    Stateful DoFns:
    Seems reasonable, but is it overkill? I would have to initialise them with cache lookbacks anyway, as they are reset when a window closes.

    Sliding Window over 2 X Windows:
    If I create a sliding window 2x the duration of the underlying count windows, I could potentially emit the delta between every two consecutive counts indefinitely.
    This also feels odd, but could be the most elegant solution here?
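
To show what the sliding-window pairing would compute, here is a plain-Java sketch (hypothetical names, not Beam code). It assumes hourly counts for one userID keyed by hour start in milliseconds, and two-hour windows that advance by one hour, so every hourly count lands in exactly two windows: once as the earlier half and once as the later half.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of pairing counts through 2-hour windows sliding by 1 hour; not Beam API.
class SlidingPairSketch {
    static final long HOUR = 60 * 60 * 1000L;

    /** Returns deltas keyed by the start of the later hour in each two-hour window. */
    static Map<Long, Long> deltasByHourStart(Map<Long, Long> countByHourStart) {
        // Two-hour window start -> {count in earlier hour, count in later hour}
        Map<Long, long[]> pairs = new TreeMap<>();
        countByHourStart.forEach((hourStart, count) -> {
            pairs.computeIfAbsent(hourStart, k -> new long[2])[0] = count;        // earlier half
            pairs.computeIfAbsent(hourStart - HOUR, k -> new long[2])[1] = count; // later half
        });

        Map<Long, Long> deltas = new TreeMap<>();
        pairs.forEach((windowStart, pair) -> deltas.put(windowStart + HOUR, pair[1] - pair[0]));
        return deltas;
    }

    public static void main(String[] args) {
        Map<Long, Long> counts = Map.of(0L, 3L, HOUR, 1L, 2 * HOUR, 6L);
        // prints {0=3, 3600000=-2, 7200000=5, 10800000=-6}
        System.out.println(deltasByHourStart(counts));
    }
}
```

The last entry comes from a window whose later hour has no count yet. Whether that should be emitted as a drop to 0 or held back is a design choice this sketch does not settle.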


None of the above really feels like the right way to approach deltas, and it does appear that the compute model's state doesn't cater for the "feed forward" of data in time.

So :)
I've done quite a bit of searching and I can't seem to find anything pointing me clearly in a particular direction.
I'm probably missing something big here, so any help would be much appreciated.


