Re: Exception occurred while processing valve output watermark & NullPointerException


Hi,

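I think vino is right. The NullPointerException comes from your condition: the trace shows it is thrown inside String.contains, called from your filter at FlinkCEPAudio.java:119, and contains throws when its argument is null. Please handle the case where the string you are comparing is null.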
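A minimal sketch of such a null check, written as plain Java so it runs without Flink. The class name, the helper matchesYell, and the value of AUDIO_YELL are assumptions made for illustration; the thread does not show the real PatternConstants.AUDIO_YELL.

```java
import java.util.Locale;

class NullSafeAudioCondition {

    // Hypothetical stand-in for PatternConstants.AUDIO_YELL (real value not shown in the thread).
    static final String AUDIO_YELL = "YELL";

    // Null-safe version of the check used in each filter():
    // String.contains(null) throws NullPointerException, so a missing
    // audio value is treated as "no match" instead of failing the job.
    static boolean matchesYell(String audioFft1) {
        if (audioFft1 == null) {
            return false;
        }
        return AUDIO_YELL.toLowerCase(Locale.ROOT).contains(audioFft1);
    }

    public static void main(String[] args) {
        System.out.println(matchesYell(null));   // false
        System.out.println(matchesYell("yell")); // true
        System.out.println(matchesYell("bark")); // false
    }
}
```

Inside each IterativeCondition the body of filter could then become something like return matchesYell(value.getAudio_FFT1()). Note that an empty string still matches, because every string contains the empty string; if that is not wanted, reject empty values as well.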

Best,

Dawid


On 21/11/2018 04:32, vino yang wrote:
Hi Steve,

It seems the NPE is caused by a property of the IoTEvent instance being null. Can you make sure the property is not null?

Thanks, vino.

Steve Bistline <srbistline.tech@xxxxxxxxx> wrote on Wed, 21 Nov 2018 at 2:09 AM:
Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more


==================================================

The code

        // Consume the data stream from the AWS Kinesis stream
        DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
                pt.getRequired("stream"),
                new EventSchema(),
                kinesisConsumerConfig))
                .name("Kinesis Stream Consumer");

       //dataStream.print();

        DataStream<Event> kinesisStream = dataStream
                .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
                .map(event -> (IoTEvent) event);

        // Prints the mapped records from the Kinesis stream

        //kinesisStream.print();


        Pattern<Event, ?> pattern = Pattern
                .<Event> begin("first event").subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>()
                {
                    //private static final long serialVersionUID = -6301755149429716724L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .next("second")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    //private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .next("third")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .next("fourth")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID = 2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                        return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
                    }
                })
                .within(Time.seconds(10));


        // Match the pattern in the input data stream
        PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

        // Detect the audio pattern match and emit an alert
        DataStream<Alert> alerts = patternStream.select(
                new PatternSelectFunction<Event, Alert>() {
                    @Override
                    public Alert select(Map<String, List<Event>> pattern) throws Exception {
                        Alert alert = new Alert(pattern);
                        System.out.printf("AUDIO ALERT\n");


                        return alert;
                    }

        }).name("Audio Alert Sink");
