git.net


RE: Problem with SparkSubmit


Hi, Fokko
Thanks for your help.

I got a new error:
ERROR - [Errno 2] No such file or directory: 'spark-submit'
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
    **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1551, in _execute_child
    raise child_exception_type(errno_num, err_msg)

In my case spark-submit isn't on the PATH, and I can't add it there. I can't find information on how to configure the SparkSubmitOperator for this case.
Could you help me? Should I set something in os.environ to define the path to the spark-submit script?
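For example, I was thinking of something like this at the top of the DAG file (the /opt/spark location below is just a placeholder for wherever Spark is actually installed), so that the subprocess call shown in the traceback can resolve spark-submit through PATH:

```python
import os

# Placeholder location; the real Spark installation path would go here.
SPARK_HOME = '/opt/spark'

# Prepend Spark's bin directory to PATH so that subprocess.Popen,
# which the hook uses to launch 'spark-submit', can find the executable.
os.environ['PATH'] = os.path.join(SPARK_HOME, 'bin') + os.pathsep + os.environ.get('PATH', '')
```

Would that be the right approach, or is there a proper operator setting for this?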

Best Regards,
Anton


-----Original Message-----
From: fokko@xxxxxxxxxxxxxxx <fokko@xxxxxxxxxxxxxxx> On Behalf Of Driesprong, Fokko
Sent: Monday, May 07, 2018 9:21 PM
To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
Subject: Re: Problem with SparkSubmit

Hi Anton,

I see the issue now. You're passing the jar to the jars argument, but that argument is for additional jars that need to be put on the Spark classpath, for example jars that provide UDFs. You need to pass your jar to the application argument instead.

_config = {
    'application': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class':'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)

Hope this helps.

Cheers, Fokko

2018-05-07 6:54 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:

> Hi,  Fokko
> Thanks for your reply.
>
> I use version 1.9.0
>
> -----Original Message-----
> From: fokko@xxxxxxxxxxxxxxx <fokko@xxxxxxxxxxxxxxx> On Behalf Of 
> Driesprong, Fokko
> Sent: Saturday, April 28, 2018 10:54 PM
> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> Subject: Re: Problem with SparkSubmit
>
> Hi Anton,
>
> Which version of Airflow are you running?
>
> Cheers, Fokko
>
> 2018-04-27 10:24 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:
>
> > Hi all,
> > I have problem with spark operator. I get exception
> >
> > user@host:/# airflow test myDAG myTask 2018-04-26
> > [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar 
> > tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> > [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar 
> > tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> > [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor 
> > SequentialExecutor
> > [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the 
> > DagBag from /usr/local/airflow/dags
> > [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to:
> > sparkhost
> > Traceback (most recent call last):
> >   File "/usr/local/bin/airflow", line 27, in <module>
> >     args.func(args)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py",
> > line 528, in test
> >     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > line 50, in wrapper
> >     result = func(*args, **kwargs)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > line 1584, in run
> >     session=session)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > line 50, in wrapper
> >     result = func(*args, **kwargs)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > line 1493, in _run_raw_task
> >     result = task_copy.execute(context=context)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
> >     self._hook.submit(self._application)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
> >     **kwargs)
> >   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
> >     restore_signals, start_new_session)
> >   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
> >     restore_signals, start_new_session, preexec_fn)
> > TypeError: Can't convert 'list' object to str implicitly
> >
> > My DAG look like:
> >
> > import os
> > from airflow import DAG
> > from datetime import datetime, timedelta, date
> > from airflow.contrib.operators.spark_submit_operator import
> > SparkSubmitOperator
> >
> > default_args = {
> >     'owner': 'spark',
> >     'depends_on_past': False,
> >     'start_date': datetime.now(),
> >     'retries': 1,
> >     'retry_delay': timedelta(minutes=1)
> > }
> >
> > dag = DAG('myDAG', default_args=default_args,)
> >
> > connection_id = "SPARK"
> > os.environ[('AIRFLOW_CONN_%s' % connection_id)] = 'spark://sparkhost:7077'
> >
> > _config = {
> >     'jars': 'spark_job.jar',
> >     'executor_memory': '2g',
> >     'name': 'myJob',
> >     'conn_id': connection_id,
> >     'java_class':'org.Job'
> > }
> >
> > operator = SparkSubmitOperator(
> >     task_id='myTask',
> >     dag=dag,
> >     **_config
> > )
> >
> > What is wrong? Could somebody help me with it?
> >
> >
>