
Re: Multiple stream operator watermark handling


On Thu, May 24, 2018 at 9:20 AM, Elias Levy <fearsome.lucidity@xxxxxxxxx> wrote:
On Thu, May 24, 2018 at 7:26 AM, Piotr Nowojski <piotr@xxxxxxxxxxxxxxxxx> wrote:
Off the top of my head, I can imagine two solutions:

1. Override the default behaviour of the operator via, for example, org.apache.flink.streaming.api.datastream.ConnectedStreams#transform

That seems the safer, but more complicated, path.

As we had already implemented the business logic in a RichCoFlatMapFunction, I ended up extending CoStreamFlatMap:

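import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap
import org.apache.flink.streaming.api.watermark.Watermark

class SingleWatermarkCoFlatMap[IN1, IN2, OUT](flatMapper: CoFlatMapFunction[IN1, IN2, OUT])
    extends CoStreamFlatMap[IN1, IN2, OUT](flatMapper) {

  // Pass through the watermarks from the first stream directly,
  // bypassing the default min-of-both-inputs combination
  override def processWatermark1(mark: Watermark): Unit = processWatermark(mark)

  // Ignore watermarks from the second stream entirely
  override def processWatermark2(mark: Watermark): Unit = {}
}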


Then it was easy to replace:

stream1
      .connect(stream2)
      .flatMap(new BusinessCoFlatMapFunction(params))
      .name("Operator")
      .uid("op")

with:

stream1
      .connect(stream2)
      .transform("Operator", new SingleWatermarkCoFlatMap[X,Y,Z](new BusinessCoFlatMapFunction(params)))
      .uid("op")
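A rough way to see what the override changes: as far as I know, a two-input operator by default (in Flink's AbstractStreamOperator) remembers the latest watermark from each input and forwards the minimum of the two, and only when that minimum advances. A second stream that rarely emits watermarks therefore holds back event time for the whole operator. The dependency-free Scala model below contrasts that rule with the "first input only" override above. The names WatermarkPolicies, Arrival, defaultPolicy and firstInputOnlyPolicy are hypothetical, and the model ignores timers and everything else the real operator does when event time advances.

```scala
// Hypothetical, dependency-free model of watermark forwarding in a
// two-input operator. Not Flink code; it only mimics the emission rules.
object WatermarkPolicies {

  /** A watermark arriving on input 1 or input 2 of a two-input operator. */
  final case class Arrival(input: Int, timestamp: Long)

  /** Default-like rule: track each input's latest watermark and forward
    * min(input1, input2), but only when that minimum advances. */
  def defaultPolicy(arrivals: Seq[Arrival]): Seq[Long] = {
    var wm1 = Long.MinValue
    var wm2 = Long.MinValue
    var combined = Long.MinValue
    val emitted = Seq.newBuilder[Long]
    for (a <- arrivals) {
      if (a.input == 1) wm1 = a.timestamp else wm2 = a.timestamp
      val candidate = math.min(wm1, wm2)
      if (candidate > combined) {
        combined = candidate
        emitted += combined
      }
    }
    emitted.result()
  }

  /** Override-like rule: forward every input-1 watermark as is and
    * drop all input-2 watermarks. */
  def firstInputOnlyPolicy(arrivals: Seq[Arrival]): Seq[Long] =
    arrivals.collect { case Arrival(1, ts) => ts }
}
```

For example, with Arrival(1, 10), Arrival(1, 20), Arrival(2, 5), Arrival(1, 30), defaultPolicy emits only 5 (held back by the second input), while firstInputOnlyPolicy emits 10, 20 and 30.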



