
Get your Spark job running on Spark Job Server with Mesos and Docker

Spark Job Server (SJS) has a README on how to run Spark jobs, but some details are missing when you want to quickly deploy Spark Job Server as a Docker container and submit jobs to Mesos with Spark Docker images. Writing the Spark job itself (an SQLContext job in this blog) requires certain configurations as well.

The components are Spark Job Server (as a Docker image, Spark 1.6.1 for Mesos 0.25.0), the Spark job (Scala), Mesos (0.25.0), and the Spark Docker image (1.6.1 with Mesos 0.25.0).

0. Get the default Spark Job Server docker running first

Docker is the most convenient way to start an SJS server. The only catch is that the SJS docker image is built with sbt-docker instead of a conventional Dockerfile. Later we will have to change the sbt build to customize our image, but before that let’s run the default one to test first.

Following doc/docker.md, we pulled the default docker image from Docker Hub and SJS started automatically.

docker run -d -p 8090:8090 velvia/spark-jobserver:0.6.2.mesos-0.28.1.spark-1.6.1

Obtain the word count jar by cloning the repo and make sure you have sbt installed. Remember to check out the branch for version 0.6.2 instead of master. Run the packaging at the project root:

sbt job-server-tests/package

Locate the jar in the generated target folder and follow the README.md.

Upload the jar and submit the word count job:

curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests-$VER.jar localhost:8090/jars/test
curl -d "input.string = a b c a b see" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.WordCountExample'

Obtain the SQL context jar for testing SQLContext jobs, following doc/contexts.md:

sbt 'job-server-extras/package'
curl --data-binary @job-server-extras/target/scala-2.10/job-server-extras_2.10-0.6.2-SNAPSHOT-tests.jar 127.0.0.1:8090/jars/sql

Create a SQL context:
curl -d "" '127.0.0.1:8090/contexts/sql-context?context-factory=spark.jobserver.context.SQLContextFactory'

curl -d "sql = \"select * from addresses limit 10\"" '127.0.0.1:8090/jobs?appName=sql&classPath=spark.jobserver.SqlTestJob&context=sql-context&sync=true'

If weird things happen when building jars, run ‘sbt clean’ first.

1. Build a customized SJS docker

If you start the SJS docker by specifying

-e SPARK_MASTER=mesos://zk://

and submit a job to your Mesos cluster, it probably will not work out of the box, due to a Mesos version mismatch or a Docker networking problem. The symptom on the Mesos cluster is a framework that keeps trying to register with Mesos but fails immediately. Check the Mesos log, which is only available on the Mesos leader, not the other masters.

Shutdown failed on fd=117: Transport endpoint is not connected [107]

In the SJS server log, you will see

ERROR LocalContextSupervisorActor [] [] – Exception after sending Initialize to JobManagerActor
akka.pattern.AskTimeoutException: Ask timed out on

As suggested in README.md, run the SJS docker with the host network (‘--net=host’) if you are on AWS EC2, and submit the word count job again. In the Mesos UI you will see the word count framework register, even though it will still fail for other reasons.

Note: if the problem persists, it is likely a firewall issue or some other connectivity trap (e.g. Windows as your development machine).

The registered framework still fails because of the following error:

Fetching URI ''
Failed to fetch '': A relative path was passed for the resource but the Mesos framework home was not specified. Please either provide this config option or avoid using a relative path
Failed to synchronize with slave (it’s probably exited)

The URI refers to SPARK_EXECUTOR_URI, and SJS by default tries to start the server with the option

-Dspark.executor.uri=

So it is a bug: if there is nothing to specify, the option should simply be omitted.

To fix the bug, go to bin/setenv.sh, replace the Mesos section with the following, and rebuild the docker image.

# For Mesos
CONFIG_OVERRIDES=""
if [ -n "$SPARK_EXECUTOR_URI" ]; then
 CONFIG_OVERRIDES="-Dspark.executor.uri=$SPARK_EXECUTOR_URI "
fi

To rebuild the docker image, run ‘sbt docker’ at the project root. If you see

unknown shorthand flag: ‘f’ in -f

be sure to update the sbt-docker plugin to 1.4.0, which removes the deprecated flag. The reason is that sbt-docker 1.3.0 uses the -f flag, which Docker 1.12 deprecated.

At this point you may notice that we have not told Mesos where the Spark distribution is, so the slave error will be

sh: 1: /usr/local/spark/bin/spark-class: not found

In fact, spark.mesos.executor.docker.image must be passed as an SJS argument.

We create a script for our customized docker run; the idea comes from a GitHub issue on the SJS repo.

#!/usr/bin/env sh

SPARK_MASTER="mesos://zk://xxx"
SPARK_IMAGE="xxx"
SJS_IMAGE="xxx"

docker run -d \
 -p 8090:8090 \
 --net host \
 -e SPARK_MASTER=${SPARK_MASTER} \
 -e SPARK_IMAGE=${SPARK_IMAGE} \
 $SJS_IMAGE \
 app/server_start.sh --conf spark.mesos.coarse=true --conf spark.mesos.executor.docker.image=${SPARK_IMAGE} --conf spark.mesos.executor.home=/opt/spark

Note: it is possible that the Mesos version inside the docker image is not compatible with your Mesos cluster. To build the SJS docker against a compatible Mesos version, we have to customize the source: change the Mesos version in project/Dependencies.scala to the compatible version that the build can resolve from the Maven repositories, and then rebuild the docker image from it.
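As a rough sketch (the exact val names in project/Dependencies.scala differ between branches, so treat these as placeholders), the change looks something like:

// project/Dependencies.scala (sketch; check the real val names in your checkout)
// Pin Mesos to the version your cluster runs, e.g. 0.25.0
lazy val mesosVersion = "0.25.0"

// The shaded-protobuf classifier avoids protobuf clashes with Spark
lazy val mesos = "org.apache.mesos" % "mesos" % mesosVersion classifier "shaded-protobuf"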

2. Run examples to test on Mesos

Pass the word count jar to SJS and execute the job as before, and you will see it run beautifully.

Pass the SQL context jar to SJS, define the SQL context as before, and you will see it run like a charm.

3. Write your own Spark job running on Mesos

Suppose you want to use a package such as spark-avro and access data on AWS S3. A case like this requires installing external dependencies.

To begin, I recommend using Maven to create a Spark project from an archetype, with dependencies along these lines:

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-core_${scala.tools.version}</artifactId>
 <version>${spark.version}</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>org.apache.spark</groupId>
 <artifactId>spark-sql_2.10</artifactId>
 <version>1.6.1</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>spark.jobserver</groupId>
 <artifactId>job-server-extras_2.10</artifactId>
 <version>0.6.2</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>spark.jobserver</groupId>
 <artifactId>job-server-api_${scala.tools.version}</artifactId>
 <version>0.6.2</version>
 <scope>provided</scope>
</dependency>

<dependency>
 <groupId>com.databricks</groupId>
 <artifactId>spark-avro_2.10</artifactId>
 <version>2.0.1</version>
 <scope>provided</scope>
</dependency>



<repositories>
    <repository>
        <id>spark-job-server</id>
        <name>Job Server Bintray</name>
        <url>https://dl.bintray.com/spark-jobserver/maven</url>
    </repository>
</repositories>

You may need the following exclusions to avoid conflicts with spark-core’s transitive dependencies.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.7.1</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
    <scope>provided</scope>
</dependency>

In your code you need to set the following in order to use s3a://. By default they are not configured, and you will probably hit a NullPointerException with a less than helpful message; I found this blog post useful. Notice that I omit the access key and secret key, since I am relying on the IAM role provided on my instances.

sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp")
sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
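To put it all together, here is a minimal sketch of what such a job can look like against the SJS 0.6.2 SparkSqlJob API. The object name, S3 path, and table name are illustrative placeholders; swap in your own.

import com.typesafe.config.Config
import org.apache.spark.sql.SQLContext
import spark.jobserver.{SparkJobInvalid, SparkJobValid, SparkJobValidation, SparkSqlJob}

import scala.util.Try

object S3AvroSqlJob extends SparkSqlJob {
  // Fail fast if the caller did not supply a "sql" statement
  override def validate(sqlContext: SQLContext, config: Config): SparkJobValidation =
    Try(config.getString("sql"))
      .map(_ => SparkJobValid)
      .getOrElse(SparkJobInvalid("No sql config param"))

  override def runJob(sqlContext: SQLContext, config: Config): Any = {
    // s3a settings from above; keys omitted because the instance IAM role is used
    sqlContext.sparkContext.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp")
    sqlContext.sparkContext.hadoopConfiguration.set(
      "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    // Illustrative bucket/path and table name
    sqlContext.read
      .format("com.databricks.spark.avro")
      .load("s3a://my-bucket/path/to/avro")
      .registerTempTable("addresses")

    sqlContext.sql(config.getString("sql")).collect()
  }
}

You can then upload the packaged jar and submit it through the same sql-context created earlier.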

Make your code run and pass the unit tests, then package it as a jar with mvn package, using the shade plugin:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.2</version>
    <configuration>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Since the AWS and Avro libraries are marked “provided”, simply add the following when starting the SJS docker.

--packages com.databricks:spark-avro_2.10:2.0.1,org.apache.hadoop:hadoop-aws:2.7.1

Yeah, that’s pretty much it. Happy sparking.

 

 

 
