Re: Multiple firings on side input


Hi Vilhelm,

This is a known issue in the Beam model. Trigger firings should automatically update downstream results, but instead they are treated as new elements. The design for retractions will alleviate this problem. But you can work around it yourself in specific cases like this.

You can use View.asMultimap(), so that each trigger firing for a key adds a new element to the collection of values for that key. To distinguish the latest one, you will need to do some manual preparation.

    // This is your triggered input; there will be duplicate keys
    PCollection<KV<MyKey, MyValue>> input = ...

    // MyValuePlusSequenceNumber is a made-up type that you need to implement
    // to keep the index of the trigger firing alongside the value
    PCollectionView<Map<MyKey, Iterable<MyValuePlusSequenceNumber>>> sideInput = input
        .apply(ParDo.of(new DoFn<KV<MyKey, MyValue>, KV<MyKey, MyValuePlusSequenceNumber>>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            // Pair each value with the index of the pane (trigger firing) that produced it
            ctx.output(KV.of(
                ctx.element().getKey(),
                new MyValuePlusSequenceNumber(
                    ctx.element().getValue(),
                    ctx.pane().getIndex())));
          }
        }))
        .apply(View.asMultimap());

This will allow you to grab all the trigger firings associated with a particular key and find the last. It is not ideal, either in clarity or performance, but it can work for some cases until we have retraction support.
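To make the "find the last" step concrete, here is a rough sketch in plain Java with no Beam dependency. It assumes the pane index grows with each firing for a key, so the entry with the highest index is the latest one. The class LatestFiring, the method latest(), and the generic shape of MyValuePlusSequenceNumber are made up for illustration; your real type will look different.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LatestFiring {

  /** Hypothetical wrapper: a value plus the index of the pane that produced it. */
  static final class MyValuePlusSequenceNumber<V> {
    final V value;
    final long sequenceNumber;

    MyValuePlusSequenceNumber(V value, long sequenceNumber) {
      this.value = value;
      this.sequenceNumber = sequenceNumber;
    }
  }

  /**
   * Returns the value with the highest sequence number, or empty if there
   * are no firings for the key. The iteration order is not assumed to be
   * the firing order, so every entry is inspected.
   */
  static <V> Optional<V> latest(Iterable<MyValuePlusSequenceNumber<V>> firings) {
    MyValuePlusSequenceNumber<V> best = null;
    for (MyValuePlusSequenceNumber<V> firing : firings) {
      if (best == null || firing.sequenceNumber > best.sequenceNumber) {
        best = firing;
      }
    }
    return best == null ? Optional.empty() : Optional.ofNullable(best.value);
  }

  public static void main(String[] args) {
    // Three firings for one key, deliberately out of order.
    List<MyValuePlusSequenceNumber<String>> firings = Arrays.asList(
        new MyValuePlusSequenceNumber<>("first", 0L),
        new MyValuePlusSequenceNumber<>("third", 2L),
        new MyValuePlusSequenceNumber<>("second", 1L));

    System.out.println(latest(firings).orElse("<none>"));
  }
}
```

In the consuming DoFn you would pass the Iterable that the multimap side input returns for a key to something like latest(). Note that this scans every firing on each lookup, which is part of the performance cost mentioned above.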

Apologies for typos or broken code here, as I am just typing it in email without checking its compilation or behavior.

Kenn

On Mon, Apr 16, 2018 at 4:15 AM Vilhelm von Ehrenheim <vonehrenheim@xxxxxxxxx> wrote:

Hi!
I have a side input with streaming updates in a global window. I have tried to approach this in several ways but can't figure out how to do it. What I really need is a side-input Map that is updated when the streaming input changes (i.e., when keys are updated).

I have tried to implement this with a View.asMap transform but got an error that I have duplicate keys in my set (which are there due to multiple triggered updates on the PCollection). I then tried to do it as a singleton, using a global CombineFn to build the map and Combine.globally().asSingletonView() instead. But I then got the error java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

How are you supposed to do this? In the documentation there is a part that suggests that this should be possible:

If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

Thanks!

// Vilhelm von Ehrenheim


