
Re: How to implement @SplitRestriction for Splittable DoFn

Hi Eugene,

Thanks for your reply. I'm no longer seeing the previous error. I think it was caused by not doing a clean build after upgrading the SDK from 2.3.0 to 2.4.0.

However, I'm having other exceptions with my SDF.

java.lang.OutOfMemoryError: unable to create new native thread
    java.lang.Thread.start0(Native Method)
    java.lang.Thread.start(
    java.util.concurrent.ThreadPoolExecutor.addWorker(
    java.util.concurrent.ThreadPoolExecutor.ensurePrestart(
    java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(
    java.util.concurrent.ScheduledThreadPoolExecutor.schedule(
    java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(
    org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.tryClaim(


java.lang.NullPointerException
    org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(
    org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(

The old pipeline I'm trying to optimize looks like this:

.apply(ParDo.of(new DoFn<KV<String, Iterable<Object>>, KV<String, KV<String, String>>>() {
  public void process(...) {
    Iterable<Object> groupedValues = context.element().getValue();
    for (final Object o1 : groupedValues) {
      for (final Object o2 : groupedValues) {
        ...
      }
    }
  }
}));

The optimization I'm attempting with SDF looks roughly like this:

@ProcessElement
public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
  final Iterable<Object> groupedValues = context.element().getValue();
  final Iterator<Object> it = groupedValues.iterator();

  // Skip ahead to the first offset of this restriction.
  long index = tracker.currentRestriction().getFrom();
  Iterators.advance(it, Math.toIntExact(index));

  // Outer loop: claim one offset per element of the group.
  for (; it.hasNext() && tracker.tryClaim(index); ++index) {
    final Object o1 = it.next();
    for (final Object o2 : groupedValues) {
      ... same old logic ...
    }
  }
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(final KV<String, Iterable<Object>> groupedValues) {
  final long size = Iterables.size(groupedValues.getValue());
  return new OffsetRange(0, size);
}

@SplitRestriction
public void splitRestriction(final KV<String, Iterable<Object>> groupedValues,
    final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
  final long size = Iterables.size(groupedValues.getValue());
  for (final OffsetRange p : range.split(1000000 / size, 10)) {
    receiver.output(p);
  }
}

@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
  return new OffsetRangeTracker(range);
}
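
A side note on the `1000000 / size` expression in splitRestriction above: Java integer division yields 0 once size exceeds 1000000, and throws ArithmeticException when size is 0. Below is a minimal plain-Java sketch of a guard, with no Beam dependency. `ChunkLength` and `desiredChunkLength` are hypothetical names, not part of any SDK, and whether this arithmetic is related to the exceptions above is only an assumption.

```java
// Plain-Java sketch, no Beam dependency. ChunkLength and desiredChunkLength
// are hypothetical names used only for illustration.
final class ChunkLength {
  private ChunkLength() {}

  /**
   * Picks how many outer-loop offsets one sub-range should cover so that a
   * sub-range performs roughly workBudget inner-loop iterations.
   * The result is clamped to at least 1 so it never collapses to 0.
   */
  static long desiredChunkLength(long size, long workBudget) {
    if (size <= 0) {
      // Empty group: avoid dividing by zero; any positive length will do.
      return 1;
    }
    return Math.max(1, workBudget / size);
  }

  public static void main(String[] args) {
    long budget = 1_000_000L;
    System.out.println(desiredChunkLength(0, budget));         // prints 1
    System.out.println(desiredChunkLength(1_000, budget));     // prints 1000
    System.out.println(desiredChunkLength(5_000_000, budget)); // prints 1
  }
}
```

The clamped value could then be passed as the first argument wherever a per-split length is needed; the budget of 1000000 is simply the constant from the code above.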


On Wed, Apr 11, 2018 at 3:54 PM, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Hi! This looks concerning. Can you show a full code example please? Does it run in direct runner?

On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <jiayuanmark@xxxxxxxxx> wrote:
Hi all,

I'm trying to use the ReplicateFn mentioned in this doc in my pipeline to speed up a nested for loop. The use case is exactly the same as the "Counting friends in common (cross join by key)" section. However, I'm having trouble making it work with the Beam 2.4.0 SDK.
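
To make the idea concrete, here is a minimal plain-Java sketch (no Beam dependency) of splitting the outer loop of a nested loop into index sub-ranges that could be processed independently. All names (`ChunkedCrossJoin`, `chunks`, `countPairs`, `countAllPairs`) are hypothetical, and the pair counter is only a stand-in for the real pairwise logic; it is not the ReplicateFn from the doc.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch, no Beam dependency. All names here are hypothetical.
final class ChunkedCrossJoin {
  private ChunkedCrossJoin() {}

  /** Splits [from, to) into consecutive sub-ranges of at most chunk offsets. */
  static List<long[]> chunks(long from, long to, long chunk) {
    if (chunk <= 0) {
      throw new IllegalArgumentException("chunk must be positive");
    }
    List<long[]> out = new ArrayList<>();
    for (long start = from; start < to; start += chunk) {
      out.add(new long[] {start, Math.min(start + chunk, to)});
    }
    return out;
  }

  /** Visits every pair whose first element has an index in [from, to). */
  static long countPairs(List<String> values, long from, long to) {
    long pairs = 0;
    for (int i = (int) from; i < to; i++) {
      for (int j = 0; j < values.size(); j++) {
        // Stand-in for the real logic on (values.get(i), values.get(j)).
        pairs++;
      }
    }
    return pairs;
  }

  /** Sums the pair counts over all sub-ranges, as independent workers would. */
  static long countAllPairs(List<String> values, long chunk) {
    long total = 0;
    for (long[] r : chunks(0, values.size(), chunk)) {
      total += countPairs(values, r[0], r[1]);
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> values = Arrays.asList("a", "b", "c", "d", "e");
    // Sub-ranges [0,2), [2,4), [4,5) together cover all 5 * 5 pairs.
    System.out.println(countAllPairs(values, 2)); // prints 25
  }
}
```

Because each sub-range only restricts the outer index, the sub-ranges together visit the same pairs as the original nested loop, which is the property the split relies on.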

I'm implementing @SplitRestriction as follows:

public void splitRestriction(A element, OffsetRange range, OutputReceiver<OffsetRange> out) {
  for (final OffsetRange p : range.split(1000, 10)) {
    out.output(p);
  }
}

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException$1000($
    java.util.concurrent.ThreadPoolExecutor.runWorker(
    java.util.concurrent.ThreadPoolExecutor$

I also tried the following, as suggested by the javadoc, but it fails during pipeline construction.

public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}

Without implementing @SplitRestriction, my pipeline runs without any errors. However, I think the SDF is not really split by default, which defeats the purpose of improving performance.

Does anyone know whether @SplitRestriction is currently supported by the Dataflow runner? How can I write a working version with the latest SDK?