Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)


I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource` and a subsequent sink) I get errors related to the `InputSplitAssigner` not being initialized for a particular vertex ID. Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof InputFormatSourceFunction`.

My questions:

1. how can I wrap a `InputFormatSourceFunction` which avoids this error? Am I missing a chunk of the API covering this?
2. is the error I'm experience related to that casting call? If so, would ya'll be open to a PR which adds an interface one can extend which will set the input format in the stream graph? Or is there a preferred way of achieving this?


Aaron Levin

java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at java.util.concurrent.CompletableFuture.reportGet(
    at java.util.concurrent.CompletableFuture.get(
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(
    at java.lang.reflect.Method.invoke(
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(