
Re: Problem with SparkSubmit


Hi Anton,

I see the issue now. You're passing the jar to the `jars` argument, but that
argument is for additional jars that need to be put on the Spark classpath,
for example jars that provide UDFs. You need to pass your job's jar to the
`application` argument instead.

_config = {
    'application': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class':'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)

Hope this helps.

Cheers, Fokko

2018-05-07 6:54 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:

> Hi,  Fokko
> Thanks for your reply.
>
> I use version 1.9.0
>
> -----Original Message-----
> From: fokko@xxxxxxxxxxxxxxx <fokko@xxxxxxxxxxxxxxx> On Behalf Of
> Driesprong, Fokko
> Sent: Saturday, April 28, 2018 10:54 PM
> To: dev@xxxxxxxxxxxxxxxxxxxxxxxxxxxx
> Subject: Re: Problem with SparkSubmit
>
> Hi Anton,
>
> Which version of Airflow are you running?
>
> Cheers, Fokko
>
> 2018-04-27 10:24 GMT+02:00 Anton Mushin <Anton_Mushin@xxxxxxxx>:
>
> > Hi all,
> > I have problem with spark operator. I get exception
> >
> > user@host:/# airflow test myDAG myTask 2018-04-26
> > [2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar
> > tables from /usr/lib/python3.5/lib2to3/Grammar.txt
> > [2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar
> > tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
> > [2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor
> > SequentialExecutor
> > [2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag
> > from /usr/local/airflow/dags
> > [2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to:
> > sparkhost
> > Traceback (most recent call last):
> >   File "/usr/local/bin/airflow", line 27, in <module>
> >     args.func(args)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py",
> > line 528, in test
> >     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > line 50, in wrapper
> >     result = func(*args, **kwargs)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > line 1584, in run
> >     session=session)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py",
> > line 50, in wrapper
> >     result = func(*args, **kwargs)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py",
> > line 1493, in _run_raw_task
> >     result = task_copy.execute(context=context)
> >   File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/
> > operators/spark_submit_operator.py", line 145, in execute
> >     self._hook.submit(self._application)
> >   File
> > "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_su
> > bmit_hook.py",
> > line 231, in submit
> >     **kwargs)
> >   File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
> >     restore_signals, start_new_session)
> >   File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
> >     restore_signals, start_new_session, preexec_fn)
> > TypeError: Can't convert 'list' object to str implicitly
> >
> > My DAG look like:
> >
> > import os
> >
> > from airflow import DAG
> > from datetime import datetime, timedelta, date
> > from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
> >
> > default_args = {
> >     'owner': 'spark',
> >     'depends_on_past': False,
> >     'start_date': datetime.now(),
> >     'retries': 1,
> >     'retry_delay': timedelta(minutes=1) }
> >
> > dag = DAG('myDAG', default_args=default_args,)
> >
> > connection_id = "SPARK"
> > os.environ[('AIRFLOW_CONN_%s' % connection_id)] = 'spark://sparkhost:7077'
> >
> > _config = {
> >     'jars': 'spark_job.jar',
> >     'executor_memory': '2g',
> >     'name': 'myJob',
> >     'conn_id': connection_id,
> >     'java_class':'org.Job'
> > }
> >
> > operator = SparkSubmitOperator(
> >     task_id='myTask',
> >     dag=dag,
> >     **_config
> > )
> >
> > What is wrong? Could somebody help me with it?
> >
> >
>