
flink 1.4.2 NPE after job startup when using managed ValueState

Dear Flink Users,
I have two environments, one old and one new, and I'm trying to migrate my Flink job.  The exact same application code that runs in one environment fails in the other with a very confusing NPE (listed below).  I can guarantee the application code only calls ValueState.value() inside of a keyed context (and it works perfectly in my other environment).  I get the same error with the RocksDB state backend, except that the stack trace changes slightly.  I dug into the source code, but I lose the thread where InternalKeyedContext is passed in via the constructor (I'm not sure where to jump to from there).

Other than the obvious (not being in a keyed context), why else might I get this error?  (The application code follows the exception.)

java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
    at org.apache.flink.util.Preconditions.checkNotNull(
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.checkKeyNamespacePreconditions(
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.computeHashForOperationAndDoIncrementalRehash(
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(
    at org.apache.flink.runtime.state.heap.HeapValueState.value(
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:30)
    at com.physiq.vitalink.timeseries.processor.seriesframecoverage.WriteDataCoverage.flatMap(WriteDataCoverage.kt:12)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(

Application code---

    override fun open(config: Configuration) {
        val descriptor = ValueStateDescriptor<HashMap<Long, ArrayList<Long>>>(dataName, TypeInformation.of(object : TypeHint<HashMap<Long, ArrayList<Long>>>() {}))
        coverageData = runtimeContext.getState(descriptor)
    }

    override fun flatMap(value: CoverageWithPartitionInfo, out: Collector<String>) {

        var currentCoverage = coverageData.value()
---  this is the line that throws in one environment but not the other ^^ ---
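
For readers unfamiliar with the term, here is a minimal, self-contained sketch of what "keyed context" means for the check in the stack trace. It is a simplified model, not Flink's actual code: `KeyedValueStore`, `currentKey`, and `runKeyed` are hypothetical names invented for illustration, and the assumption is only that the runtime sets a current key before each element reaches the user function. Only the exception message is taken from the trace above.

```kotlin
// Simplified model of keyed state; NOT Flink's actual classes.
// KeyedValueStore and runKeyed are hypothetical names used for illustration.

class KeyedValueStore<K, V> {
    private val table = HashMap<K, V>()

    // Assumption: the runtime sets this before each element is processed.
    var currentKey: K? = null

    // Mirrors the precondition seen in the stack trace: no key, no state access.
    private fun requireKey(): K =
        currentKey ?: throw NullPointerException(
            "No key set. This method should not be called outside of a keyed context."
        )

    fun value(): V? = table[requireKey()]

    fun update(v: V) {
        table[requireKey()] = v
    }
}

// Processes elements the way a keyed operator is assumed to:
// extract the key, set it on the store, then call the user function.
fun <T, K> runKeyed(
    elements: List<T>,
    keySelector: (T) -> K?,
    store: KeyedValueStore<K, Long>,
    userFunction: (T) -> Unit
) {
    for (element in elements) {
        store.currentKey = keySelector(element)
        userFunction(element)
    }
}

fun main() {
    val store = KeyedValueStore<String, Long>()
    val events = listOf("a", "b", "a")

    // Keyed context: count occurrences per key.
    runKeyed(events, { it }, store) { store.update((store.value() ?: 0L) + 1L) }

    store.currentKey = "a"
    println(store.value()) // prints 2

    // No key set: the same read fails with the message from the stack trace.
    store.currentKey = null
    try {
        store.value()
    } catch (e: NullPointerException) {
        println(e.message)
    }
}
```

In this model the read fails whenever the current key is null at the time of the call, which covers both a state access made with no key set at all and a key selector that returns null for some element. Whether either applies to the job above is not established by the trace.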