Re: Store Predicate or any lambda in MapState

Here are the error logs. 

The first error log was encountered when getting the values from the MapState.

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879.1452224137
    at java.lang.ClassLoader.loadClass(
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(
    at java.lang.ClassLoader.loadClass(
    ... 22 common frames omitted
Wrapped by: java.lang.NoClassDefFoundError: com/test/MatcherFactory$$Lambda$879/1452224137
    at sun.reflect.GeneratedSerializationConstructorAccessor282.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(
    at org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(
    at com.esotericsoftware.kryo.Kryo.newInstance(
    at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(
    ... 17 frames truncated

Subsequent error logs were encountered on task manager restart (for the same job).

java.lang.ClassNotFoundException: com.test.MatcherFactory$$Lambda$879/1452224137
    at java.lang.Class.forName0(
    at java.lang.Class.forName(
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
    ... 18 common frames omitted
Wrapped by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.test.MatcherFactory$$Lambda$879/1452224137
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
    at com.esotericsoftware.kryo.Kryo.readClass(
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(
    ... 8 frames truncated
    ... 6 common frames omitted
Wrapped by: java.lang.IllegalStateException: Could not initialize keyed state backend.
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
    ... 2 frames truncated
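
Both traces fail while resolving a class named MatcherFactory$$Lambda$879/1452224137. As a rough illustration of why that can happen (a standalone sketch, not the actual job code; the class LambdaClassName and its methods are made up for this example), the class behind a lambda is generated by the JVM at runtime, and as far as I can tell its name cannot be used to load it again with Class.forName, which is the call that fails in the second trace:

```java
import java.util.function.Predicate;

public class LambdaClassName {

    // Returns the runtime class name of a freshly created lambda.
    static String lambdaClassName() {
        Predicate<String> p = s -> s.isEmpty();
        return p.getClass().getName();
    }

    // Tries to load a class by name, the way a name-based class resolver would.
    static boolean canLoadByName(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = lambdaClassName();
        // The exact name differs between JVM versions and runs.
        System.out.println("lambda class: " + name);
        System.out.println("loadable by name: " + canLoadByName(name));
    }
}
```

If this is indeed what happens inside the job, any serializer that writes the lambda's class name into state and looks it up again later would hit the same problem, regardless of which libraries are on the cluster.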

Hey Jayant, 

I don't think that merely using a Predicate should cause the ClassNotFoundException you are describing. The exception may come from some libraries missing from your cluster environment. Have you tried running the job locally to check whether the exception still occurs? Also, could you please paste some logs here? They may help in determining the exact cause of the problem.

I want to store a custom POJO in the MapState. One of the fields in the object is of type java.util.function.Predicate.
Flink throws a ClassNotFoundException on the lambda. How do I store this object in the MapState?

Marking the predicate field as transient is an option, but in my use case the predicate is set using another library, and I don't want to call that library every time I need the predicate.
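
For reference, the transient option could look roughly like the sketch below. It assumes the predicate can be rebuilt from some plain data kept in the POJO. The class Rule, the field expression, and the helper buildMatcher are all hypothetical; buildMatcher only stands in for the call into the other library. Because the predicate is rebuilt lazily and cached, the library would be called once per deserialized object rather than on every access.

```java
import java.io.Serializable;
import java.util.function.Predicate;

public class Rule implements Serializable {

    // Plain data describing the predicate; this is the part that gets serialized.
    private String expression;

    // Skipped by serialization; rebuilt on first use after the object is restored.
    private transient Predicate<String> matcher;

    // Public no-arg constructor and accessors, so the class can be handled as a POJO.
    public Rule() {}

    public Rule(String expression) {
        this.expression = expression;
    }

    public String getExpression() {
        return expression;
    }

    public void setExpression(String expression) {
        this.expression = expression;
        this.matcher = null; // force a rebuild for the new expression
    }

    public boolean matches(String input) {
        if (matcher == null) {
            // Called once per restored object, not on every access.
            matcher = buildMatcher(expression);
        }
        return matcher.test(input);
    }

    // Hypothetical stand-in for the external library that creates the predicate.
    static Predicate<String> buildMatcher(String expression) {
        return input -> input.contains(expression);
    }
}
```

The state would then be declared as something like MapState<String, Rule>, so only the expression string ends up in the state backend. This only helps if the library can recreate the predicate from data that is itself serializable.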

