Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)


Hi Aaron,

I'd like to take a step back and understand why you're trying to wrap an InputFormatSourceFunction.

In my opinion, InputFormatSourceFunction should not be used because it has some shortcomings, the most prominent being that it does not support checkpointing, i.e. in case of failure all data will (probably) be read again. I say "probably" because the interaction of InputFormatSourceFunction with how InputSplits are generated (which relates to the code snippet with the cast that you found) can be somewhat "spooky" and lead to weird results in some cases.
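
To make the checkpointing point concrete, here is a minimal sketch that does not use Flink at all. `ReplayDemo` and its methods are hypothetical names, and the sketch assumes the checkpoint happens to be taken exactly at the point of failure. It only shows the difference between restarting from the beginning and resuming from a saved offset.

```scala
// Toy model only: this is not Flink API.
object ReplayDemo {
  val records: Vector[String] = Vector("a", "b", "c", "d")

  // Emit every record starting at the given offset.
  def readFrom(offset: Int): Vector[String] = records.drop(offset)

  // No saved position: after a failure the source starts over,
  // so records emitted before the failure are emitted a second time.
  def outputWithoutCheckpoint(failAfter: Int): Vector[String] =
    records.take(failAfter) ++ readFrom(0)

  // Saved position (assumed to equal failAfter): the source resumes
  // where it stopped, so nothing is emitted twice.
  def outputWithCheckpoint(failAfter: Int): Vector[String] =
    records.take(failAfter) ++ readFrom(failAfter)
}
```

With a failure after two records, the first method emits "a" and "b" twice, while the second emits each record once.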

The interface is a remnant of a very early version of the streaming API and should probably be removed soon. I hope we can find a better solution for your problem that fits better with Flink.

Best,
Aljoscha

On 1. Nov 2018, at 15:30, Aaron Levin <aaronlevin@xxxxxxxxxx> wrote:

Hey Friends! Last ping and I'll move this over to a ticket. If anyone can provide any insight or advice, that would be helpful!

Thanks again.

Best,

Aaron Levin

On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronlevin@xxxxxxxxxx> wrote:
Hey,

Not sure how convo threading works on this list, so in case the folks CC'd missed my other response, here's some more info:

First, I appreciate everyone's help! Thank you! 

I wrote several wrappers to try and debug this, including one which is an exact copy of `InputFormatSourceFunction` which also failed. They all failed with the same error I detail above. I'll post two of them below. They all extended `RichParallelSourceFunction` and, as far as I could tell, were properly initialized (though I may have missed something!). Additionally, for the two below, if I change `extends RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, I no longer receive the exception. This is what led me to believe the source of the issue was casting and how I found the line of code where the stream graph is given the input format.
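
The behaviour described above can be modelled without Flink. The following is a toy sketch, not Flink code: `ToySource`, `ToyInputFormatSource`, `ToyWrapper`, `ToySubclass` and `ToyGraphBuilder` are hypothetical names, and the type test stands in for the `instanceof InputFormatSourceFunction` check mentioned in this thread. It assumes that the input format is registered only when that check succeeds.

```scala
// Toy model only: none of these types are Flink classes.
trait ToySource

// Stands in for a source function that carries an input format.
class ToyInputFormatSource(val format: String) extends ToySource

// Delegating wrapper: it holds a ToyInputFormatSource but is not one itself.
class ToyWrapper(val inner: ToyInputFormatSource) extends ToySource

// Subclass: it is still an instance of ToyInputFormatSource.
class ToySubclass(format: String) extends ToyInputFormatSource(format)

object ToyGraphBuilder {
  // The format is registered only if the function itself passes the type test.
  def registeredFormat(source: ToySource): Option[String] = source match {
    case s: ToyInputFormatSource => Some(s.format)
    case _                       => None
  }
}
```

Under that assumption, the wrapper yields `None` (no format registered, so no splits can be assigned), while the subclass still yields the format. This matches the observation that switching from `extends RichParallelSourceFunction` to `extends InputFormatSourceFunction` makes the exception go away.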

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction` and delegates all methods to the underlying `InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the `InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like: `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

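import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}

class WrappedInputFormat[A](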
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
    inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = { inputFormat.getRuntimeContext }
  override def getIterationRuntimeContext = { inputFormat.getIterationRuntimeContext }
  override def open(parameters: Configuration): Unit = {
    inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
    inputFormat.cancel()
  }
  override def close(): Unit = {
    inputFormat.close()
  }
}

And the other one:

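import org.apache.flink.api.common.io.{InputFormat, RichInputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.InputSplit
import org.apache.flink.runtime.jobgraph.tasks.{InputSplitProvider, InputSplitProviderException}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

class ClonedInputFormatSourceFunction[A](
  val format: InputFormat[A, InputSplit],
  val typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {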

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  // cancel() may be called from a different thread than run()
  @volatile private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
    }
    format.configure(parameters)

    provider = context.getInputSplitProvider
    serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    splitIterator = getInputSplits()
    isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
    }

    var nextElement: A = serializer.createInstance()
    try {
      while (isRunning) {
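        format.open(splitIterator.next())
        // Scala has no `break` keyword, so a flag ends the inner loop instead.
        var splitDone = false
        while (isRunning && !splitDone && !format.reachedEnd()) {
          nextElement = format.nextRecord(nextElement)
          if (nextElement != null) {
            sourceContext.collect(nextElement)
          } else {
            splitDone = true
          }
        }
        // Close the split only after it has been read, then look for the next one.
        format.close()
        if (isRunning) {
          isRunning = splitIterator.hasNext
        }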
      }
    } finally {

      format.close()
      if (format.isInstanceOf[RichInputFormat[_,_]]) {
        format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
      }
      isRunning = false
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    format.close()
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
    }
  }

  private def getInputSplits(): Iterator[InputSplit] = {
    new Iterator[InputSplit] {
      private var nextSplit: InputSplit = _
      private var exhausted: Boolean = _

      override def hasNext: Boolean = {
        if(exhausted) { return false }
        if(nextSplit != null) { return true }
        var split: InputSplit = null

        try {
          split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
        } catch {
          case e: InputSplitProviderException =>
            throw new RuntimeException("No InputSplit Provider", e)
        }

        if(split != null) {
          nextSplit = split
          true
        } else {
          exhausted = true
          false
        }
      }

      override def next(): InputSplit = {
        if(nextSplit == null && !hasNext) {
          throw new NoSuchElementException()
        }
        val tmp: InputSplit = nextSplit
        nextSplit = null
        tmp
      }

    }
  }
}


On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakowicz@xxxxxxxxxx> wrote:

Hi Aaron,

Could you share the code of your custom function?

I am also adding Aljoscha and Kostas to cc, who should be more helpful on that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
Hi,

I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource` and a subsequent sink) I get errors related to the `InputSplitAssigner` not being initialized for a particular vertex ID. Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof InputFormatSourceFunction`.

My questions:

1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error? Am I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that casting call? If so, would y'all be open to a PR which adds an interface one can extend which will set the input format in the stream graph? Or is there a preferred way of achieving this?
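
The interface idea in question 2 could look roughly like the sketch below. This is a Flink-free toy model under stated assumptions: `ProvidesInputFormat`, `SketchSource`, `SketchFormatSource`, `SketchWrapper` and `SketchGraphBuilder` are hypothetical names, not existing Flink types, and the graph builder is assumed to test for the interface rather than for one concrete class.

```scala
// Hypothetical names throughout; this is not an existing Flink interface.
trait SketchSource

// The proposed extension point: anything that can hand over an input format.
trait ProvidesInputFormat {
  def inputFormat: String
}

// The concrete input-format source implements the interface directly.
class SketchFormatSource(val inputFormat: String)
    extends SketchSource with ProvidesInputFormat

// A delegating wrapper implements it by forwarding to the wrapped source.
class SketchWrapper(inner: SketchFormatSource)
    extends SketchSource with ProvidesInputFormat {
  def inputFormat: String = inner.inputFormat
}

object SketchGraphBuilder {
  // Register the format for any source that exposes one via the interface.
  def registeredFormat(source: SketchSource): Option[String] = source match {
    case p: ProvidesInputFormat => Some(p.inputFormat)
    case _                      => None
  }
}
```

In this model a wrapper no longer has to subclass the concrete source to have its format picked up; it only has to implement the interface.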

Thanks!

Aaron Levin

[1] 
java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
    at REDACTED
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...