Re: error while joining two datastream


It looks like you need to assign timestamps and emit watermarks on both streams, viz. formatStream1 and formatStream2, as described at 
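To make the idea concrete, here is a rough, library-free sketch of what timestamps and watermarks do for an event-time tumbling window. It is not Flink code: the class EventTimeSketch and its helpers are hypothetical names made up for illustration, the 5-second out-of-orderness bound is an arbitrary choice, and it assumes each record carries an epoch-millisecond event time (possibly the trailing Long field of your tuples, though the thread does not say so). In the actual job the equivalent step is the DataStream.assignTimestampsAndWatermarks(...) call named in the exception, applied to each stream before the join, with the time characteristic set to event time.

```java
import java.util.Arrays;
import java.util.List;

public class EventTimeSketch {

    /** Value a record carries when no timestamp was ever assigned to it. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    /** Hypothetical helper: a watermark that trails the highest timestamp seen by a fixed bound. */
    static final class BoundedLatenessWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxSeenMs = NO_TIMESTAMP;

        BoundedLatenessWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        }

        /** Record an event's timestamp, as a timestamp extractor would for each element. */
        void observe(long eventTimeMs) {
            maxSeenMs = Math.max(maxSeenMs, eventTimeMs);
        }

        /** Current watermark; stays at Long.MIN_VALUE until the first event is observed. */
        long current() {
            return maxSeenMs == NO_TIMESTAMP ? NO_TIMESTAMP : maxSeenMs - maxOutOfOrdernessMs;
        }
    }

    /** Start of the tumbling window containing the timestamp; rejects untimestamped records. */
    static long tumblingWindowStart(long eventTimeMs, long windowSizeMs) {
        if (eventTimeMs == NO_TIMESTAMP) {
            throw new IllegalStateException(
                    "record has no timestamp; assign timestamps and watermarks first");
        }
        return eventTimeMs - Math.floorMod(eventTimeMs, windowSizeMs);
    }

    /** An event-time window may fire once the watermark reaches its last timestamp (end - 1). */
    static boolean canFire(long windowStartMs, long windowSizeMs, long watermarkMs) {
        return watermarkMs >= windowStartMs + windowSizeMs - 1;
    }

    public static void main(String[] args) {
        long windowSizeMs = 15_000L; // same 15-second window size as in the job
        BoundedLatenessWatermark watermark = new BoundedLatenessWatermark(5_000L);
        List<Long> eventTimes = Arrays.asList(1_000L, 16_000L, 14_000L, 21_000L);
        for (long t : eventTimes) {
            watermark.observe(t);
            long start = tumblingWindowStart(t, windowSizeMs);
            System.out.println("event@" + t + " -> window [" + start + ", "
                    + (start + windowSizeMs) + "), watermark=" + watermark.current()
                    + ", window [0, 15000) can fire="
                    + canFire(0L, windowSizeMs, watermark.current()));
        }
    }
}
```

The sketch shows two things. A record without a timestamp cannot be placed in any event-time window, which is the situation the exception reports. And even with timestamps, a window only produces join results once the watermark has passed its end, so both inputs need watermarks that advance.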

On Thu, Nov 22, 2018 at 10:55 PM Abhijeet Kumar <abhijeet.kumar@xxxxxxxxxxxx> wrote:
Hello Team,

I'm new to Flink and come from a Spark background. I need help completing this streaming job. I'm reading data from two different Kafka topics and I want to join them.

My code:

formatStream1.join(formatStream2)
    .where(new KeySelector<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, String>() {
        public String getKey(Tuple11<String, String, String, String, String, String, String, String, String, String, Long> t1) throws Exception {
            return t1.f0;
        }
    })
    .equalTo(new KeySelector<Tuple7<String, String, String, String, String, String, Long>, String>() {
        public String getKey(Tuple7<String, String, String, String, String, String, Long> t1) throws Exception {
            return t1.f0;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(15)))
    .apply(new JoinFunction<Tuple11<String, String, String, String, String, String, String, String, String, String, Long>, Tuple7<String, String, String, String, String, String, Long>, Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>>() {

        public Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long> join(
                Tuple11<String, String, String, String, String, String, String, String, String, String, Long> first,
                Tuple7<String, String, String, String, String, String, Long> second) {
            return new Tuple17<String, String, String, String, String, String, String, String, String, String, String, String, String, String, String, Long, Long>(first.f0, first.f1, first.f2, first.f3, first.f4, first.f5, first.f6, first.f7, first.f8, first.f9, second.f1, second.f2, second.f3, second.f4, second.f5, second.f6, first.f10);
        }
    })
    .print();


Error:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:630)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
    at com.argoid.flink.streaming.kafkajoin.JoinKafka.main(JoinKafka.java:155)
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    at org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows(TumblingEventTimeWindows.java:69)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:295)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

Data is arriving in both formatStream1 and formatStream2; I checked by printing them. So the issue is in the code I shared. Thanks in advance!

Thanks,


Abhijeet Kumar
Software Development Engineer,
Sentienz Solutions Pvt Ltd
Cognitive Data Platform - Perceive the Data !

--
Regards,
Nagarjun

Success is not final, failure is not fatal: it is the courage to continue that counts. 
- Winston Churchill -