
Re: kafkaIO Run with Spark Runner: "streaming-job-executor-0"


Can you check the logs on the worker? 

On Wed, Jun 13, 2018 at 2:26 AM <linrick@xxxxxxxxxxx> wrote:

Dear all,

 

I am using KafkaIO in my project (Beam 2.0.0 with the Spark runner).

My running environment is:

OS: Ubuntu 14.04.3 LTS

Tool versions:

Java: JDK 1.8

Beam 2.0.0 (Spark runner in standalone mode)

Spark 1.6.0

Standalone cluster: one driver node (ubuntu7), one master node (ubuntu8), two worker nodes (ubuntu8 and ubuntu9)

Kafka: 2.10-0.10.1.1

 

The Java code of my project is:

==============================================================================

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

// Configure the Spark runner against the standalone master.
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("spark://ubuntu8:7077");
options.setAppName("App kafkaBeamTest");
options.setJobName("Job kafkaBeamTest");
options.setMaxRecordsPerBatch(1000L);

Pipeline p = Pipeline.create(options);

System.out.println("Beamtokafka");

// Read <Long, String> records from the "kafkasink" topic.
PCollection<KV<Long, String>> readData = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(LongDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata());

// Fixed 1-second windows, firing at the watermark and again on every
// late element, discarding panes that have already fired.
PCollection<KV<Long, String>> readDivideData = readData.apply(
    Window.<KV<Long, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

System.out.println("CountData");

// Count elements per key within each window.
PCollection<KV<Long, Long>> countData = readDivideData.apply(Count.perKey());

p.run();

==============================================================================
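One thing I am not sure about (my assumption, not something I have verified): p.run() returns a PipelineResult without blocking, so the driver's main() can return while the streaming job is still running. If that matters here, blocking on the result would look like this:

==============================================================================

// Sketch (my assumption): block until the streaming pipeline finishes
// or is cancelled, instead of letting main() return right after run().
PipelineResult result = p.run();   // org.apache.beam.sdk.PipelineResult
result.waitUntilFinish();

==============================================================================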

 

The error message is:

==============================================================================

Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Object.wait(Object.java:502)
        at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:912)
        at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        ... 2 more

==============================================================================

 

I build with Maven 3.5.0; the related dependencies in my project’s pom.xml are:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.1</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.10.1.1</version>
</dependency>

 

 

When I run the above code with the Spark runner in local mode (local[4]), the project works well (2,000~4,000 records/s). However, when I run it in standalone mode, it fails with the above error.
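For reference, the working local run differs from the standalone one only in the master setting. A minimal sketch of that configuration (same pipeline code as above; the localOptions name is just for illustration):

==============================================================================

// Sketch of the local-mode options: four local threads instead of the
// standalone master at spark://ubuntu8:7077.
SparkPipelineOptions localOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
localOptions.setRunner(SparkRunner.class);
localOptions.setSparkMaster("local[4]");
localOptions.setMaxRecordsPerBatch(1000L);

==============================================================================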

 

If you have any idea about this "streaming-job-executor-0" error, I look forward to hearing from you.

 

Note: the submit command is “./spark-submit --class com.itri.beam.kafkatest --master spark://ubuntu8:7077 /root/BeamKafkaAdvanced-0.1-shaded.jar --runner=SparkRunner”

 

Thanks

 

Rick

 

 


