
Re: How to implement @SplitRestriction for Splittable DoFn

Hm, which runner is this? Does this reproduce with the direct runner? The NullPointerException is particularly worrying; I'd like to investigate it.

On Thu, Apr 12, 2018 at 6:49 PM Jiayuan Ma <jiayuanmark@xxxxxxxxx> wrote:
Hi Eugene,

Thanks for your reply. I'm no longer having the previous error. I think that error might be because I didn't do a clean build after upgrading SDK from 2.3.0 to 2.4.0.

However, I'm having other exceptions with my SDF.

java.lang.OutOfMemoryError: unable to create new native thread
    java.lang.Thread.start0(Native Method)
    java.lang.Thread.start(
    java.util.concurrent.ThreadPoolExecutor.addWorker(
    java.util.concurrent.ThreadPoolExecutor.ensurePrestart(
    java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(
    java.util.concurrent.ScheduledThreadPoolExecutor.schedule(
    java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker$ProcessContext.onClaimed(
    org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.tryClaim(


java.lang.NullPointerException
    org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(
    org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(
    org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(

The old pipeline I'm trying to optimize looks like:

.apply(ParDo.of(new DoFn<KV<String, Iterable<Object>>, KV<String, KV<String, String>>>() {
  @ProcessElement
  public void process(ProcessContext context) {
    Iterable<Object> groupedValues = context.element().getValue();
    for (final Object o1 : groupedValues) {
      for (final Object o2 : groupedValues) {
        ...

The optimization I'm doing right now with SDF is roughly like:

@ProcessElement
public void processElement(ProcessContext context, OffsetRangeTracker tracker) {
  final Iterable<Object> groupedValues = context.element().getValue();
  final Iterator<Object> it = groupedValues.iterator();

  long index = tracker.currentRestriction().getFrom();
  Iterators.advance(it, Math.toIntExact(index));

  for (; it.hasNext() && tracker.tryClaim(index); ++index) {
    final Object o1 = it.next();
    for (final Object o2 : groupedValues) {
      ... same old logic ...
    }
  }
}

@GetInitialRestriction
public OffsetRange getInitialRestriction(final KV<String, Iterable<Object>> groupedValues) {
  final long size = Iterables.size(groupedValues.getValue());
  return new OffsetRange(0, size);
}

@SplitRestriction
public void splitRestriction(final KV<String, Iterable<Object>> groupedValues,
    final OffsetRange range, final OutputReceiver<OffsetRange> receiver) {
  final long size = Iterables.size(groupedValues.getValue());
  for (final OffsetRange p : range.split(1000000 / size, 10)) {
    receiver.output(p);
  }
}

@NewTracker
public OffsetRangeTracker newTracker(OffsetRange range) {
  return new OffsetRangeTracker(range);
}

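In case it helps narrow things down, the claim-then-process loop above can be sanity-checked outside Beam with a toy tracker. This is a plain-Java sketch under my own assumptions (ToyTracker and its names are hypothetical stand-ins, not Beam's OffsetRangeTracker API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ClaimLoopSketch {
    // Toy stand-in for a restriction tracker over offsets [from, to).
    static class ToyTracker {
        final long from, to;
        ToyTracker(long from, long to) { this.from = from; this.to = to; }
        boolean tryClaim(long offset) {
            return offset < to; // refuse offsets past the restriction end
        }
    }

    // Mirrors the processElement loop: skip ahead to the range start,
    // then claim each offset before doing the nested-loop work for it.
    static List<String> process(List<String> groupedValues, ToyTracker tracker) {
        List<String> pairs = new ArrayList<>();
        Iterator<String> it = groupedValues.iterator();
        long index = tracker.from;
        for (long i = 0; i < index && it.hasNext(); i++) it.next(); // like Iterators.advance
        for (; it.hasNext() && tracker.tryClaim(index); ++index) {
            String o1 = it.next();
            for (String o2 : groupedValues) pairs.add(o1 + "-" + o2);
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "c", "d");
        // Restriction [1, 3) should process only o1 = "b" and o1 = "c".
        List<String> out = process(values, new ToyTracker(1, 3));
        if (out.size() != 8) throw new AssertionError(out);
        if (!out.get(0).equals("b-a")) throw new AssertionError(out);
        System.out.println("ok: " + out.size() + " pairs");
    }
}
```

With four elements and restriction [1, 3), only the middle two outer elements are processed, giving 2 x 4 pairs, which matches what the real loop should do for a sub-restriction.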

On Wed, Apr 11, 2018 at 3:54 PM, Eugene Kirpichov <kirpichov@xxxxxxxxxx> wrote:
Hi! This looks concerning. Can you show a full code example please? Does it run in direct runner?

On Tue, Apr 10, 2018 at 3:13 PM Jiayuan Ma <jiayuanmark@xxxxxxxxx> wrote:
Hi all,

I'm trying to use ReplicateFn mentioned in this doc in my pipeline to speed up a nested for loop. The use case is exactly the same as the "Counting friends in common (cross join by key)" section. However, I'm having trouble making it work with the Beam 2.4.0 SDK.

I'm implementing @SplitRestriction as follows:

public void splitRestriction(A element, OffsetRange range, OutputReceiver<OffsetRange> out) {
  for (final OffsetRange p : range.split(1000, 10)) {
    out.output(p);
  }
}
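For intuition about what a split like this should produce, here is a plain-Java toy sketch of carving an offset range into contiguous sub-ranges. To be clear, this is my own illustration of the idea, not Beam's actual OffsetRange.split implementation, and Range/split here are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Toy offset range [from, to); a stand-in for Beam's OffsetRange.
    static class Range {
        final long from, to;
        Range(long from, long to) { this.from = from; this.to = to; }
    }

    // Splits [from, to) into contiguous chunks of at most chunkSize offsets.
    static List<Range> split(Range r, long chunkSize) {
        List<Range> parts = new ArrayList<>();
        for (long start = r.from; start < r.to; start += chunkSize) {
            parts.add(new Range(start, Math.min(start + chunkSize, r.to)));
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Range> parts = split(new Range(0, 2500), 1000);
        // The chunks must be contiguous and together cover the whole range.
        long expectedStart = 0;
        for (Range p : parts) {
            if (p.from != expectedStart) throw new AssertionError("gap at " + p.from);
            expectedStart = p.to;
        }
        if (expectedStart != 2500 || parts.size() != 3) throw new AssertionError();
        System.out.println("3 contiguous chunks covering [0, 2500)");
    }
}
```

Whatever the runner does with the sub-ranges afterwards, the invariant worth checking is that they tile the original restriction with no gaps or overlaps.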

The Dataflow runner throws an exception like this:

java.util.NoSuchElementException
    $1000($
    java.util.concurrent.ThreadPoolExecutor.runWorker(
    java.util.concurrent.ThreadPoolExecutor$

I also tried the following, as suggested by the Javadoc, but it fails with errors during pipeline construction.

public List<OffsetRange> splitRestriction(A element, OffsetRange range) {
  return range.split(1000, 10);
}

Without implementing @SplitRestriction, my pipeline runs without any errors. However, I suspect the SDF is not actually split by default, which defeats the purpose of improving performance.

Does anyone know if @SplitRestriction is currently supported by Dataflow runner? How can I write a working version with the latest SDK?