git.net

Submit WordCount to a Flink cluster


Hello,

I am new to Beam.

Following the Beam docs, I am trying to submit the WordCount example to a Flink cluster (one JobManager and one TaskManager, running locally in two linked Docker containers with Maven installed).

The Beam docs seem a bit inconsistent about how to submit jobs.

The Java quickstart at https://beam.apache.org/get-started/quickstart-java/ says to use

mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
     -Dexec.args="--runner=FlinkRunner --flinkMaster=<flink master> --filesToStage=target/word-count-beam-bundled-0.1.jar \
                  --inputFile=/path/to/quickstart/pom.xml --output=/tmp/counts" -Pflink-runner
where <flink master> appears to be just a hostname.

The Flink runner page at https://beam.apache.org/documentation/runners/flink/ says to use

$ mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
where I can pass localhost:8081 as the <flink master url>.
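To make the placeholders concrete, here is a small bash sketch that assembles the same command as the Flink runner page from variables. The variable names FLINK_MASTER, INPUT, OUTPUT, JAR and RUN are hypothetical; the default localhost:8081 is only the value tried in this post, not a recommendation, and the jar name assumes the bundled jar built by the quickstart's flink-runner profile. By default the script only prints the command; set RUN=1 to execute it.

```shell
#!/usr/bin/env bash
# Hypothetical variables standing in for the placeholders in the docs.
FLINK_MASTER="${FLINK_MASTER:-localhost:8081}"   # <flink master url> as host:port (value tried in this post)
INPUT="${INPUT:-/path/to/pom.xml}"
OUTPUT="${OUTPUT:-/path/to/counts}"
JAR="target/word-count-beam-bundled-0.1.jar"     # assumed bundled jar from -Pflink-runner

# Warn (on stderr) if the jar listed in --filesToStage has not been built yet.
if [[ ! -f "$JAR" ]]; then
  echo "warning: $JAR not found; build the bundled jar first" >&2
fi

# Pipeline options that exec:java passes to WordCount.main().
EXEC_ARGS="--runner=FlinkRunner --inputFile=${INPUT} --output=${OUTPUT} --flinkMaster=${FLINK_MASTER} --filesToStage=${JAR}"

# Build the Maven command as an array so quoting survives intact.
cmd=(mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Pflink-runner "-Dexec.args=${EXEC_ARGS}")

# Print the shell-quoted command; only run it when RUN=1.
printf '%q ' "${cmd[@]}"; echo
if [[ "${RUN:-0}" == 1 ]]; then
  "${cmd[@]}"
fi
```

Keeping the command in an array rather than a single string means the quoted -Dexec.args value reaches Maven as one argument.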

I have tried running this command both outside Docker and inside the JobManager container (via docker exec). Either way, if I pass "localhost" without a port as <flink master url>, the pipeline just runs locally and ignores the Flink cluster. If I pass "localhost:8081", the command hangs for about 5 seconds and prints the error messages below. It eventually disconnects and dies.

Could you give me some hints on how Beam jobs are submitted to a Flink cluster in general? Can I submit them remotely, from outside the JobManager node?

Thanks in advance!

--------------------------------------
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Executing pipeline using FlinkRunner.
Jul 14, 2018 12:49:47 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Translating pipeline to Flink program.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkExecutionEnvironments createBatchExecutionEnvironment
INFO: Creating a Batch Execution Environment.
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO:  enterCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator visitPrimitiveTransform
INFO: |   |    visitPrimitiveTransform- ReadLines/Read
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- ReadLines
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator enterCompositeTransform
INFO: |    enterCompositeTransform- WordCount.CountWords
...
INFO: |   |    leaveCompositeTransform- WriteCounts/WriteFiles
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO: |    leaveCompositeTransform- WriteCounts
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkBatchPipelineTranslator leaveCompositeTransform
INFO:  leaveCompositeTransform-
Jul 14, 2018 12:49:48 AM org.apache.beam.runners.flink.FlinkRunner run
INFO: Starting execution of Flink program.
Jul 14, 2018 12:49:49 AM org.apache.flink.api.java.ExecutionEnvironment createProgramPlan
INFO: The job has 0 registered types and 0 default Kryo serializers
Jul 14, 2018 12:49:49 AM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern pom.xml matched 1 files with total size 10600
Jul 14, 2018 12:49:49 AM org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader get
INFO: Starting client actor system.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: Trying to select the network interface and address to use by connecting to the leading JobManager.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.util.LeaderRetrievalUtils findConnectingAddress
INFO: TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.net.ConnectionUtils$LeaderConnectingAddressListener findConnectingAddress
INFO: Retrieved new target address localhost/127.0.0.1:8081.
Jul 14, 2018 12:49:49 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Trying to start actor system at c4342d15c3f4:0
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1 applyOrElse
INFO: Slf4jLogger started
Jul 14, 2018 12:49:50 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Starting remoting
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting started; listening on addresses :[akka.tcp://flink@c4342d15c3f4:46627]
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.clusterframework.BootstrapTools startActorSystem
INFO: Actor system started at akka.tcp://flink@c4342d15c3f4:46627
Jul 14, 2018 12:49:51 AM org.apache.flink.client.program.ClusterClient logAndSysout
INFO: Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Submitting job with JobID: 87a12b5471d39a7837d6b0def6d748e2. Waiting for job completion.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor handleMessage
INFO: Received SubmitJobAndWait(JobGraph(jobId: 87a12b5471d39a7837d6b0def6d748e2)) but there is no connection to a JobManager yet.
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobSubmissionClientActor handleCustomMessage
INFO: Received job wordcount-root-0714004948-3ee44b3d (87a12b5471d39a7837d6b0def6d748e2).
Jul 14, 2018 12:49:51 AM org.apache.flink.runtime.client.JobClientActor disconnectFromJobManager
INFO: Disconnect from JobManager null.
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Remote connection to [localhost/127.0.0.1:8081] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1213486164 - discarded
Jul 14, 2018 12:49:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$2 apply$mcV$sp
WARNING: Association with remote system [akka.tcp://flink@localhost:8081] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@localhost:8081]] Caused by: [The remote system explicitly disassociated (reason unknown).]
...
Jul 14, 2018 12:50:51 AM akka.event.slf4j.Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3 apply$mcV$sp
INFO: Remoting shut down.
Jul 14, 2018 12:50:51 AM org.apache.beam.runners.flink.FlinkRunner run
SEVERE: Pipeline execution failed
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:419)
at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:208)
at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:185)
...
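The frame length in the TooLongFrameException warning can be decoded by hand. This is a small sketch, assuming the Netty frame decoder used by Akka remoting reports the 4-byte big-endian length prefix plus 4 (the size of the prefix itself) as the "adjusted frame length". Subtracting 4 and reading the rest as four bytes gives the ASCII text "HTTP". If that reading is right, whatever listens on localhost:8081 answered with an HTTP response instead of Akka's binary protocol, which would suggest the client reached an HTTP endpoint (such as Flink's web/REST port) rather than the JobManager's RPC endpoint.

```shell
#!/usr/bin/env bash
# Value from the warning: "Adjusted frame length exceeds 10485760: 1213486164".
adjusted=1213486164

# Assumption: adjusted length = 4-byte length field + 4 bytes for the field itself.
raw=$((adjusted - 4))

# Show the length field as 8 hex digits (4 bytes, big-endian).
hex=$(printf '%08x' "$raw")
echo "$hex"    # prints 48545450

# Convert each hex byte to its ASCII character.
ascii=""
for i in 0 2 4 6; do
  ascii+=$(printf "\\x${hex:$i:2}")
done
echo "$ascii"  # prints HTTP
```

In other words, the first four bytes the client received were the start of a line such as "HTTP/1.1 ...", which Akka then misread as an enormous frame length.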