git.net

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Exception occurred while processing valve output watermark & NullPointerException


Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more


==================================================

The code

      // Consume the data streams from AWS Kinesis stream
FlinkKinesisConsumer<Event> kinesisSource = new FlinkKinesisConsumer<>(
        pt.getRequired("stream"),
        new EventSchema(),
        kinesisConsumerConfig);

// Register the consumer as a named source on the execution environment
DataStream<Event> dataStream = env
        .addSource(kinesisSource)
        .name("Kinesis Stream Consumer");

//dataStream.print();

// Attach timestamps and watermarks via TimeLagWatermarkGenerator, then cast
// every record to IoTEvent before it reaches the CEP pattern.
// NOTE(review): the cast throws ClassCastException for any Event that is not
// an IoTEvent — confirm the Kinesis stream only carries IoTEvent records.
DataStream<Event> kinesisStream = dataStream
.assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
.map(event -> (IoTEvent) event);

// Prints the mapped records from the Kinesis stream

//kinesisStream.print();


// Condition shared by all four pattern stages: the event's audio reading must
// occur inside the lower-cased AUDIO_YELL constant.
IterativeCondition<IoTEvent> yellCondition = new IterativeCondition<IoTEvent>() {
    private static final long serialVersionUID = 2392863109523984059L;

    /**
     * Accepts an event whose audio reading is contained in the lower-cased
     * {@code PatternConstants.AUDIO_YELL} string.
     *
     * @param value the candidate event
     * @param ctx   CEP context (unused)
     * @return {@code true} if the reading is non-null and contained in the
     *         constant; {@code false} otherwise
     */
    @Override
    public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
        String audio = value.getAudio_FFT1();
        // String.contains(null) throws NullPointerException, which fails the
        // whole job from inside the NFA; an event with no audio reading
        // simply cannot match.
        return audio != null
                && PatternConstants.AUDIO_YELL.toLowerCase().contains(audio);
    }
};

// Four strictly consecutive matching events within a 10 second window
Pattern<Event, ?> pattern = Pattern
        .<Event> begin("first event").subtype(IoTEvent.class).where(yellCondition)
        .next("second").subtype(IoTEvent.class).where(yellCondition)
        .next("third").subtype(IoTEvent.class).where(yellCondition)
        .next("fourth").subtype(IoTEvent.class).where(yellCondition)
        .within(Time.seconds(10));


// Apply the pattern to the Kinesis event stream
PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);

// Turns each complete match of the audio pattern into an Alert
PatternSelectFunction<Event, Alert> toAlert = new PatternSelectFunction<Event, Alert>() {
    @Override
    public Alert select(Map<String, List<Event>> matchedEvents) throws Exception {
        // Build the alert first so nothing is printed if construction fails
        Alert audioAlert = new Alert(matchedEvents);
        System.out.printf("AUDIO ALERT\n");
        return audioAlert;
    }
};

DataStream<Alert> alerts = patternStream
        .select(toAlert)
        .name("Audio Alert Sink");