Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)


Hi Aljoscha,

Thanks! I will look into this.

Best,

Aaron Levin

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
Hi,

I think for this case a model that is similar to how the Streaming File Source works should be good. You can have a look at ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The idea is that the first emits splits that should be processed and the second is responsible for reading those splits. A generic version of that is what I'm proposing for the refactoring of our source interface [1] that also comes with a prototype implementation [2].

I think something like this should be adaptable to your case. The split enumerator would at first emit only file splits downstream; after that it would emit the Kafka partitions that should be read. The split reader would understand both file splits and Kafka partitions and could read from both. There are still some kinks to be worked out when it comes to watermarks, and FLIP-27 is not finished yet.
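
As a rough illustration of that division of labour, here is a minimal sketch in plain Scala. All names in it (`Split`, `FileSplit`, `KafkaPartitionSplit`, `SplitEnumerator`, `SplitReader`) are hypothetical and are not taken from Flink or from the FLIP-27 prototype; the actual reading is replaced by a string describing what would be read.

```scala
// Hypothetical types for illustration only; none of these are Flink classes.
sealed trait Split
final case class FileSplit(path: String) extends Split
final case class KafkaPartitionSplit(topic: String, partition: Int) extends Split

// The enumerator only decides what to read and in which order:
// all file splits first, then the Kafka partitions.
object SplitEnumerator {
  def enumerate(files: Seq[String], topic: String, partitions: Int): Iterator[Split] = {
    val fileSplits: Iterator[Split] = files.iterator.map(f => FileSplit(f))
    val kafkaSplits: Iterator[Split] =
      (0 until partitions).iterator.map(p => KafkaPartitionSplit(topic, p))
    fileSplits ++ kafkaSplits
  }
}

// The reader understands both kinds of split. Real I/O is replaced by a
// description of the work that would be done.
object SplitReader {
  def read(split: Split): String = split match {
    case FileSplit(path)               => s"read file $path"
    case KafkaPartitionSplit(topic, p) => s"consume $topic-$p"
  }
}

object SplitDemo {
  // Example paths and topic name are made up.
  def plan(): List[String] =
    SplitEnumerator
      .enumerate(Seq("s3://bucket/a.seq", "s3://bucket/b.seq"), "events", 2)
      .map(SplitReader.read)
      .toList
}
```

The point of the sketch is only the ordering guarantee: because a single enumerator hands out both kinds of split, the reader never sees a Kafka partition before the file splits have been emitted.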

What do you think?

Best,
Aljoscha

[2] https://github.com/aljoscha/flink/commits/refactor-source-interface


On 1. Nov 2018, at 16:50, Aaron Levin <aaronlevin@xxxxxxxxxx> wrote:

Hey,

Thanks for reaching out! I'd love to take a step back and find a better solution, so I'll try to be succinct about what I'm trying to accomplish:

We're trying to write a SourceFunction which:
* reads some Sequence files from S3 in a particular order (each task gets files in a specific order).
* sends a watermark between each sequence file 
* when that's complete, starts reading from Kafka topics.
* (This is similar to the bootstrap problem which Lyft has talked about (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)) 

The current solution I have involves a custom InputFormat, InputSplit, and SplitAssignor. It achieves most of these requirements, except I have to extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There is a lot I don't like about the existing solution:
* I have to extend InputFormatSourceFunction to ensure the graph is initialized properly (the bug I wrote about)
* I had to replicate most of the implementation of InputFormatSourceFunction so I could insert Watermarks between splits. 
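
To make the "watermark between splits" requirement concrete, here is a minimal sketch in plain Scala. The types (`Output`, `Record`, `WatermarkMark`, `ArchiveSplit`) and the `emit` callback are hypothetical stand-ins, not Flink classes, and the sketch assumes each split knows the largest event timestamp it contains and that splits arrive in time order.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; these are not Flink types.
sealed trait Output
final case class Record(value: String) extends Output
final case class WatermarkMark(timestamp: Long) extends Output

// One split: its records plus the largest event timestamp it contains (assumed known).
final case class ArchiveSplit(records: Seq[String], maxTimestamp: Long)

object WatermarkBetweenSplits {
  // Emits every record of a split, then a watermark, before moving on
  // to the next split. Splits are assumed to be ordered by time.
  def run(splits: Seq[ArchiveSplit], emit: Output => Unit): Unit =
    splits.foreach { split =>
      split.records.foreach(r => emit(Record(r)))
      emit(WatermarkMark(split.maxTimestamp))
    }

  // Collects the emitted elements into a list so the ordering can be inspected.
  def demo(): List[Output] = {
    val out = ArrayBuffer.empty[Output]
    val splits = Seq(ArchiveSplit(Seq("a", "b"), 10L), ArchiveSplit(Seq("c"), 20L))
    run(splits, o => { out += o; () })
    out.toList
  }
}
```

In a real `SourceFunction` the `emit` callback would presumably map onto `SourceContext.collect` for records and `SourceContext.emitWatermark` for watermarks.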

I'd love any suggestions around improving this!

Best,

Aaron Levin

On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljoscha@xxxxxxxxxx> wrote:
Hi Aaron,

I'd like to take a step back and understand why you're trying to wrap an InputFormatSourceFunction.

In my opinion, InputFormatSourceFunction should not be used because it has some shortcomings, the most prominent being that it does not support checkpointing, i.e. in case of failure all data will (probably) be read again. I say "probably" because the interaction of InputFormatSourceFunction with how InputSplits are generated (which relates to that code snippet with the cast you found) could be somewhat "spooky" and lead to weird results in some cases.

The interface is a remnant of a very early version of the streaming API and should probably be removed soon. I hope we can find a better solution for your problem that fits better with Flink.

Best,
Aljoscha

On 1. Nov 2018, at 15:30, Aaron Levin <aaronlevin@xxxxxxxxxx> wrote:

Hey Friends! Last ping and I'll move this over to a ticket. If anyone can provide any insight or advice, that would be helpful!

Thanks again.

Best,

Aaron Levin

On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronlevin@xxxxxxxxxx> wrote:
Hey,

Not sure how convo threading works on this list, so in case the folks CC'd missed my other response, here's some more info:

First, I appreciate everyone's help! Thank you! 

I wrote several wrappers to try and debug this, including one which is an exact copy of `InputFormatSourceFunction` which also failed. They all failed with the same error I detail above. I'll post two of them below. They all extended `RichParallelSourceFunction` and, as far as I could tell, were properly initialized (though I may have missed something!). Additionally, for the two below, if I change `extends RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, I no longer receive the exception. This is what led me to believe the source of the issue was casting and how I found the line of code where the stream graph is given the input format.
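
To illustrate why wrapping and extending could behave differently under such a type check, here is a toy model in plain Scala. The names (`Source`, `FormatSource`, `Wrapper`, `Extended`, `GraphBuilder`) are hypothetical and are not Flink classes; the sketch assumes only that the graph builder registers the input format when the source object itself has the expected type.

```scala
// Hypothetical stand-ins for illustration; these are not Flink classes.
trait Source
class FormatSource(val format: String) extends Source
// Delegates to a FormatSource but is not itself a FormatSource.
class Wrapper(val inner: FormatSource) extends Source
// Extends FormatSource, so a type check on the outer object succeeds.
class Extended(f: String) extends FormatSource(f)

object GraphBuilder {
  // Registers an input format only when the source itself has the
  // expected type; a wrapped instance is invisible to this check.
  def registeredFormat(source: Source): Option[String] = source match {
    case fs: FormatSource => Some(fs.format)
    case _                => None
  }
}
```

Under that assumption, `GraphBuilder.registeredFormat` finds a format for a `FormatSource` and for `Extended`, but not for `Wrapper`, which matches the observation that only the `extends InputFormatSourceFunction(...)` variants avoid the exception.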

Quick explanation of the wrappers:
1. `WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction` and delegates all methods to the underlying `InputFormatSourceFunction`
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the `InputFormatSourceFunction` source.
3. They're being used in a test which looks vaguely like: `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`

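import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}

// Delegates every call to the wrapped InputFormatSourceFunction.
class WrappedInputFormat[A](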
  inputFormat: InputFormatSourceFunction[A]
)(
  implicit typeInfo: TypeInformation[A]
) extends RichParallelSourceFunction[A] {

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    inputFormat.run(sourceContext)
  }
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    inputFormat.setRuntimeContext(t)
  }
  override def equals(obj: scala.Any) = {
    inputFormat.equals(obj)
  }
  override def hashCode() = { inputFormat.hashCode() }
  override def toString = { inputFormat.toString }
  override def getRuntimeContext(): RuntimeContext = { inputFormat.getRuntimeContext }
  override def getIterationRuntimeContext = { inputFormat.getIterationRuntimeContext }
  override def open(parameters: Configuration): Unit = {
    inputFormat.open(parameters)
  }
  override def cancel(): Unit = {
    inputFormat.cancel()
  }
  override def close(): Unit = {
    inputFormat.close()
  }
}

And the other one:

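import org.apache.flink.api.common.io.{InputFormat, RichInputFormat}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.InputSplit
import org.apache.flink.runtime.jobgraph.tasks.{InputSplitProvider, InputSplitProviderException}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

class ClonedInputFormatSourceFunction[A](val format: InputFormat[A, InputSplit], val typeInfo: TypeInformation[A]) extends RichParallelSourceFunction[A] {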

  @transient private var provider: InputSplitProvider = _
  @transient private var serializer: TypeSerializer[A] = _
  @transient private var splitIterator: Iterator[InputSplit] = _
  // Written by cancel() from another thread, so it must be volatile.
  @volatile private var isRunning: Boolean = _

  override def open(parameters: Configuration): Unit = {
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
    }
    format.configure(parameters)

    provider = context.getInputSplitProvider
    serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    splitIterator = getInputSplits()
    isRunning = splitIterator.hasNext
  }

  override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
    if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
    }

    var nextElement: A = serializer.createInstance()
    try {
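      while (isRunning) {
        format.open(splitIterator.next())
        // Scala has no `break` keyword, so a flag ends the inner loop
        // when the format returns null.
        var splitDone = false
        while (isRunning && !splitDone && !format.reachedEnd()) {
          nextElement = format.nextRecord(nextElement)
          if (nextElement != null) {
            sourceContext.collect(nextElement)
          } else {
            splitDone = true
          }
        }
        // Close the split only after all of its records have been read,
        // then check whether another split is available.
        format.close()
        if (isRunning) {
          isRunning = splitIterator.hasNext
        }
      }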
    } finally {

      format.close()
      if (format.isInstanceOf[RichInputFormat[_,_]]) {
        format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
      }
      isRunning = false
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }

  override def close(): Unit = {
    format.close()
    if(format.isInstanceOf[RichInputFormat[_,_]]) {
      format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
    }
  }

  private def getInputSplits(): Iterator[InputSplit] = {
    new Iterator[InputSplit] {
      private var nextSplit: InputSplit = _
      private var exhausted: Boolean = _

      override def hasNext: Boolean = {
        if(exhausted) { return false }
        if(nextSplit != null) { return true }
        var split: InputSplit = null

        try {
          split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
        } catch {
          case e: InputSplitProviderException =>
            throw new RuntimeException("No InputSplit Provider", e)
        }

        if(split != null) {
          nextSplit = split
          true
        } else {
          exhausted = true
          false
        }
      }

      override def next(): InputSplit = {
        if(nextSplit == null && !hasNext) {
          throw new NoSuchElementException()
        }
        val tmp: InputSplit = nextSplit
        nextSplit = null
        tmp
      }

    }
  }
}


On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakowicz@xxxxxxxxxx> wrote:

Hi Aaron,

Could you share the code of your custom function?

I am also adding Aljoscha and Kostas to cc, who should be more helpful on that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
Hi,

I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource` and a subsequent sink) I get errors related to the `InputSplitAssigner` not being initialized for a particular vertex ID. Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof InputFormatSourceFunction`.

My questions:

1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this error? Am I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that casting call? If so, would y'all be open to a PR which adds an interface one can extend which will set the input format in the stream graph? Or is there a preferred way of achieving this?

Thanks!

Aaron Levin

[1] 
java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
    at REDACTED
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...