
Review for [BEAM-2928] SideInput in ULR


I am looking for reviews and suggestions regarding side input support in the ULR [1]. The planned steps are described in this doc [2].

A draft PR for the first step has been created. I would like to ask for suggestions on [1] (and maybe [2] as well). Early feedback would be a great help, in case I go too far in a direction that is less preferred.

What this PR [1] has done (limited to Step One):

  1. Sets up and passes the state API server description and contexts correctly.

  2. Creates a SideInputHandler (borrowing ideas from the Flink implementation) that does the KV lookup. This basically implements a skeleton of SideInputHandler without wiring it into the ULR's job graph.

  3. When the runner sends back a constant integer, I can see data flowing correctly up to the point where encoding happens. The example pipeline I use is WordCount with an integer side input added.
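
For illustration only, the KV lookup mentioned in item 2 could be sketched roughly as below. This is a self-contained sketch, not the code in the PR: `LookupHandler`, `InMemoryHandler`, and the use of a `String` as the window type are hypothetical stand-ins, and the real Beam interfaces may differ in shape.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SideInputLookupSketch {
    // Hypothetical handler shape: given an encoded key and a window, return the values.
    interface LookupHandler<V, W> {
        Iterable<V> get(byte[] encodedKey, W window);
    }

    // Hypothetical in-memory backing store, indexed by window and then by key bytes.
    static final class InMemoryHandler<V, W> implements LookupHandler<V, W> {
        private final Map<W, Map<ByteBuffer, List<V>>> table = new HashMap<>();

        void put(byte[] encodedKey, W window, V value) {
            // byte[] has identity-based equals/hashCode, so wrap a copy in a ByteBuffer,
            // which compares by content and is therefore usable as a map key.
            Map<ByteBuffer, List<V>> perWindow =
                table.computeIfAbsent(window, w -> new HashMap<>());
            List<V> values =
                perWindow.computeIfAbsent(ByteBuffer.wrap(encodedKey.clone()), k -> new ArrayList<>());
            values.add(value);
        }

        @Override
        public Iterable<V> get(byte[] encodedKey, W window) {
            Map<ByteBuffer, List<V>> perWindow = table.get(window);
            if (perWindow == null) {
                return Collections.emptyList();
            }
            List<V> values = perWindow.get(ByteBuffer.wrap(encodedKey));
            return values == null ? Collections.<V>emptyList() : values;
        }
    }

    public static void main(String[] args) {
        InMemoryHandler<Integer, String> handler = new InMemoryHandler<>();
        handler.put(new byte[] {42}, "global", 1);
        System.out.println(handler.get(new byte[] {42}, "global")); // prints [1]
        System.out.println(handler.get(new byte[] {7}, "global"));  // prints []
    }
}
```

The sketch returns an empty iterable for an unknown key or window rather than null, so a caller can always iterate over the result.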

What this PR [1] does not do yet (but should, to complete Step One):

  1. When the runner sends back a constant Integer (1), there is an error during data encoding:


Caused by: java.lang.IllegalStateException: java.lang.ClassCastException: java.lang.Integer cannot be cast to [B
at org.apache.beam.sdk.coders.ByteArrayCoder.encode(
at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateRequestHandlerToSideInputHandlerFactoryAdapter.handleGetRequest(
at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateRequestHandlerToSideInputHandlerFactoryAdapter.handle(
at org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.state.StateRequestHandlers$StateKeyTypeDelegatingStateRequestHandler.handle(


I am still trying to understand why this coder throws a ClassCastException. Please let me know if I did something wrong at a high level.
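
The trace says an Integer reached ByteArrayCoder.encode, which expects a byte[] ("[B"). Below is a minimal, self-contained sketch of how that kind of mismatch can surface only at encode time. It assumes the value is passed to the coder through an unchecked generic cast; `SimpleCoder`, `BytesCoder`, and `IntCoder` are hypothetical stand-ins, not the real Beam classes, and whether the PR hits the same path is a guess.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class CoderMismatchSketch {
    // Hypothetical stand-in for a Beam-style coder; not org.apache.beam.sdk.coders.Coder.
    interface SimpleCoder<T> {
        void encode(T value, OutputStream out) throws IOException;
    }

    // Writes the bytes as-is, loosely mirroring what a byte-array coder does.
    static final class BytesCoder implements SimpleCoder<byte[]> {
        @Override
        public void encode(byte[] value, OutputStream out) throws IOException {
            out.write(value);
        }
    }

    // Encodes an int as four big-endian bytes.
    static final class IntCoder implements SimpleCoder<Integer> {
        @Override
        public void encode(Integer value, OutputStream out) throws IOException {
            int v = value;
            out.write(new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v});
        }
    }

    // Generic types are erased at runtime, so the unchecked cast below compiles and
    // a mismatched value is only rejected once encode() casts it to its real type.
    @SuppressWarnings("unchecked")
    static String encodeUnchecked(SimpleCoder<?> coder, Object value) {
        try {
            ((SimpleCoder<Object>) coder).encode(value, new ByteArrayOutputStream());
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        } catch (IOException e) {
            return "IOException";
        }
    }

    public static void main(String[] args) {
        System.out.println(encodeUnchecked(new BytesCoder(), 1));               // ClassCastException
        System.out.println(encodeUnchecked(new IntCoder(), 1));                 // ok
        System.out.println(encodeUnchecked(new BytesCoder(), new byte[] {1}));  // ok
    }
}
```

If the real cause is similar, two things worth checking are which element coder is resolved for the side input, and whether the handler is expected to return already-encoded bytes rather than the decoded Integer.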




Ruoyun  Huang