
How to implement @SplitRestriction for Splittable DoFn

Hi all,

I'm trying to use the ReplicateFn mentioned in this doc in my pipeline to speed up a nested for loop. My use case is exactly the one in the "Counting friends in common (cross join by key)" section. However, I'm having trouble making it work with the Beam 2.4.0 SDK.
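
For context, here is a minimal plain-Java sketch (no Beam involved) of the kind of nested loop I want to parallelize. It assumes the usual formulation: for one key's grouped friend list, emit every ordered pair of distinct friends, since each pair has that key as a friend in common. `CrossJoinSlice` and `crossJoinSlice` are hypothetical names. The outer index is limited to an offset range [from, to), which is the role I expect the SDF restriction to play.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CrossJoinSlice {

  // Hypothetical helper (not part of Beam). For one key's grouped values, emit
  // every ordered pair (values[i], values[j]) with i != j, restricting only the
  // OUTER index i to the half-open offset range [from, to).
  static List<String> crossJoinSlice(List<String> values, long from, long to) {
    List<String> pairs = new ArrayList<>();
    for (long i = from; i < to; i++) {
      for (int j = 0; j < values.size(); j++) {
        if (j != i) {
          pairs.add(values.get((int) i) + "," + values.get(j));
        }
      }
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<String> friendsOfAlice = Arrays.asList("bob", "carol", "dave");
    // The whole outer loop as a single range...
    List<String> whole = crossJoinSlice(friendsOfAlice, 0, 3);
    // ...yields the same pairs as two disjoint ranges processed separately.
    List<String> parts = new ArrayList<>(crossJoinSlice(friendsOfAlice, 0, 1));
    parts.addAll(crossJoinSlice(friendsOfAlice, 1, 3));
    System.out.println(whole.size() + " " + whole.equals(parts)); // prints "6 true"
  }
}
```

Because disjoint ranges of the outer index produce disjoint slices of the output, each range can in principle be handed to a different worker.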

I'm implementing @SplitRestriction as follows:

@SplitRestriction
public void splitRestriction(A element, OffsetRange range, OutputReceiver<OffsetRange> out) {
  for (final OffsetRange p : range.split(1000, 10)) { out.output(p); }
}
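
To make the intent of `range.split(1000, 10)` concrete, here is a plain-Java approximation: chop the restriction into pieces of about 1000 offsets, with no piece under 10. `SplitSketch` and `Range` are hypothetical stand-ins, and the tail-merging rule is an assumption; Beam's own `OffsetRange.split` may differ in its details.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {

  // Hypothetical stand-in for Beam's OffsetRange: a half-open range [from, to).
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  // Assumed chunking rule: cut [from, to) into pieces of desiredSize offsets, and
  // if the leftover after a cut would be shorter than minSize, fold it into the
  // current piece instead of emitting a tiny tail.
  static List<Range> split(long from, long to, long desiredSize, long minSize) {
    List<Range> out = new ArrayList<>();
    long start = from;
    while (start < to) {
      long end = Math.min(start + desiredSize, to);
      if (to - end < minSize) {
        end = to;
      }
      out.add(new Range(start, end));
      start = end;
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(split(0, 2500, 1000, 10)); // prints [[0, 1000), [1000, 2000), [2000, 2500)]
    System.out.println(split(0, 2005, 1000, 10)); // prints [[0, 1000), [1000, 2005)]
    System.out.println(split(0, 500, 1000, 10));  // prints [[0, 500)]
  }
}
```

One consequence, if Beam behaves like this sketch: a restriction shorter than 1000 offsets comes back as a single piece, so nothing is actually split.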

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException
    ...$1000(...)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(...)
    at java.util.concurrent.ThreadPoolExecutor$...

I also tried the following, as suggested by the Javadoc, but it fails during pipeline construction:

@SplitRestriction
public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}

Without implementing @SplitRestriction, my pipeline runs without any errors. However, I don't think the SDF is actually split by default, which defeats the purpose of using it to improve performance.
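
As a rough analogy only (a thread pool, not how Dataflow actually schedules restrictions), the sketch below shows why an unsplit restriction limits parallelism: the whole range becomes one task, so only one thread does any work no matter how many are available. `ParallelChunks`, `workOnRange`, and `runChunks` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelChunks {

  // Hypothetical per-restriction work: the inner loop for outer indices in [from, to),
  // counting ordered pairs (i, j) with i != j over n elements.
  static long workOnRange(long n, long from, long to) {
    long count = 0;
    for (long i = from; i < to; i++) {
      for (long j = 0; j < n; j++) {
        if (i != j) {
          count++;
        }
      }
    }
    return count;
  }

  // Submits one task per [start, start + chunkSize) chunk of [0, n). When chunkSize >= n
  // there is exactly one task, so only one thread does any work.
  static long runChunks(long n, long chunkSize, int threads) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<Long>> results = new ArrayList<>();
      for (long start = 0; start < n; start += chunkSize) {
        final long from = start;
        final long to = Math.min(start + chunkSize, n);
        results.add(pool.submit(() -> workOnRange(n, from, to)));
      }
      long total = 0;
      for (Future<Long> f : results) {
        total += f.get();
      }
      return total;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    long n = 2000;
    System.out.println(runChunks(n, n, 8));   // one unsplit range, one busy thread; prints 3998000
    System.out.println(runChunks(n, 100, 8)); // 20 ranges spread over 8 threads; prints 3998000
  }
}
```

Both calls compute the same total; only the second gives the pool more than one unit of work to spread across threads.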

Does anyone know whether @SplitRestriction is currently supported by the Dataflow runner? How can I write a working version with the latest SDK?