Launching and managing applications for Spark and PySpark
There are multiple ways to run a Spark or PySpark job in a Yandex Data Processing cluster:
- Spark Shell (a command shell for Scala and Python). This method runs calculations line by line rather than using a script. For more information about Spark Shell, see the Spark documentation.
- The spark-submit script. It saves the calculation results to HDFS. For more information about spark-submit, see the Spark documentation.
- Yandex Cloud CLI commands. These allow you to save calculation results not only to HDFS but also to a Yandex Object Storage bucket.
Below is an example demonstrating the calculation of 2018 US air traffic statistics based on data from transtats.bts.gov. The sample dataset is stored in the public yc-mdb-examples bucket.
Getting started
Set up the infrastructure:
Manually
- Create a network named data-proc-network. Disable Create subnets when creating it.
- In data-proc-network, create a subnet with the following parameters:
  - Name: data-proc-subnet-a
  - Availability zone: ru-central1-a
  - CIDR: 192.168.1.0/24
- Create a NAT gateway and a route table named data-proc-route-table in data-proc-network. Associate the table with data-proc-subnet-a.
- In data-proc-network, create a security group named data-proc-security-group with the following rules:
  - One rule for incoming and another one for outgoing service traffic:
    - Port range: 0-65535
    - Protocol: Any
    - Source/Destination name: Security group
    - Security group: Current
  - Rule for incoming traffic, for online access to subcluster hosts:
    - Port range: 22
    - Protocol: TCP
    - Source: CIDR
    - CIDR blocks: 0.0.0.0/0
  - Rule for outgoing HTTPS traffic:
    - Port range: 443
    - Protocol: TCP
    - Destination name: CIDR
    - CIDR blocks: 0.0.0.0/0
  - Rule for outgoing HTTP traffic:
    - Port range: 80
    - Protocol: TCP
    - Destination name: CIDR
    - CIDR blocks: 0.0.0.0/0
- Create a service account named data-proc-sa with the required roles.
- Create a Yandex Object Storage bucket named data-proc-bucket with restricted access.
- Grant READ and WRITE permissions for data-proc-bucket to the data-proc-sa service account.
- Create a Yandex Data Processing cluster in any suitable configuration with the following settings:
  - Environment: PRODUCTION
  - Service account: data-proc-sa
  - Availability zone: ru-central1-a
  - Bucket name: data-proc-bucket
  - Network: data-proc-network
  - Security groups: data-proc-security-group
  - Public access for subclusters: Provided
Terraform
- If you do not have Terraform yet, install it.
- Get the authentication credentials. You can add them to environment variables or specify them later in the provider configuration file.
- Configure and initialize a provider. There is no need to create a provider configuration file manually: you can download it.
- Place the configuration file in a separate working directory and specify the parameter values. If you did not add the authentication credentials to environment variables, specify them in the configuration file.
- Download the data-proc-for-spark-jobs.tf configuration file to the same working directory. This file describes:
  - Network.
  - Subnet.
  - NAT gateway and route table.
  - Security groups.
  - Service account to work with cluster resources.
  - Bucket to store job dependencies and results.
  - Yandex Data Processing cluster.
- In the data-proc-for-spark-jobs.tf configuration file, specify the required parameters.
- Make sure the Terraform configuration files are correct using this command:

  ```bash
  terraform validate
  ```

  Terraform will display any configuration errors detected in your files.
- Create the required infrastructure:
  - Run this command to view the planned changes:

    ```bash
    terraform plan
    ```

    If you described the configuration correctly, the terminal will display a list of the resources to update and their parameters. This is a verification step that does not apply changes to your resources.
  - If everything looks correct, apply the changes:
    - Run this command:

      ```bash
      terraform apply
      ```

    - Confirm updating the resources.
    - Wait for the operation to complete.
  - All the required resources will be created in the specified folder. You can check resource availability and their settings in the management console.
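Optionally, you can verify the key resources from the terminal as well. This is a quick sketch that assumes the Yandex Cloud CLI (yc) is installed and pointed at the folder where you created the infrastructure:

```bash
# The Yandex Data Processing cluster should appear in the list
yc dataproc cluster list

# The network and subnet created above should also be listed
yc vpc network list
yc vpc subnet list
```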
Using Spark Shell
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Run Spark Shell on the master host:

  ```bash
  /usr/bin/pyspark
  ```

  The number of cores and executors is only limited by your Yandex Data Processing cluster configuration (see the sketch after this list for setting them explicitly).
- Enter the following code line by line:

  ```python
  sql = SQLContext(sc)
  df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
  ```

  The last line reads the data from the public bucket containing the sample dataset. After this line is run, a dataframe named df with the read data will be available in the current session.
- To see the schema of the dataframe you got, run this command:

  ```python
  df.printSchema()
  ```

  The terminal will display a list of columns with their types.
- Calculate flight statistics by month and find the top ten cities by number of departures:
  - Number of flights by month:

    ```python
    df.groupBy("Month").count().orderBy("Month").show()
    ```

  - Top ten cities by number of departures:

    ```python
    df.groupBy("OriginCityName").count().orderBy("count", ascending=False).show(10)
    ```
Using Spark Submit
Spark Submit allows you to run pre-written applications using the spark-submit script. In this example, we will calculate the number of flights by month.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Create a file named month_stat.py with the following code on the master host:

  ```python
  import sys

  from pyspark import SparkContext, SparkConf
  from pyspark.sql import SQLContext


  def main():
      conf = SparkConf().setAppName("Month Stat - Python")
      conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
      sc = SparkContext(conf=conf)

      sql = SQLContext(sc)
      df = sql.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
      defaultFS = sc._jsc.hadoopConfiguration().get("fs.defaultFS")
      month_stat = df.groupBy("Month").count()
      month_stat.repartition(1).write.format("csv").save(defaultFS + "/tmp/month_stat")


  if __name__ == "__main__":
      main()
  ```

- Run the application:

  ```bash
  /usr/bin/spark-submit month_stat.py
  ```

- The result will be exported to HDFS. You can list the files you got using this command:

  ```bash
  hdfs dfs -ls /tmp/month_stat
  ```
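To look at the result itself rather than just the file listing, you can read the CSV part files straight from HDFS. A small sketch, assuming the job wrote to /tmp/month_stat as in the code above and produced the standard part-* files:

```bash
# Print the contents of all CSV part files produced by the job
hdfs dfs -cat /tmp/month_stat/part-*

# Or copy the whole output folder to the local file system
hdfs dfs -get /tmp/month_stat ./month_stat
```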
This example describes how to build and run an application using Scala.
To create and launch a Spark application:
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Install sbt. It comes bundled with the Scala programming language.
- Create a folder, e.g., spark-app.
- Add a file with the ./src/main/scala/app.scala path to the created folder.
- Paste the following code into the app.scala file:

  ```scala
  package com.yandex.cloud.dataproc.scala

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object Main {
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("Month Stat - Scala App")
      val sc = new SparkContext(conf)
      sc.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.parquet("s3a://yc-mdb-examples/dataproc/example01/set01")
      val month_stat = df.groupBy("Month").count()
      val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS")
      month_stat.repartition(1).write.format("csv").save(defaultFS + "/tmp/month_stat")

      sc.stop()
    }
  }
  ```

- Prepare the data for building your application:
  - To find out your version of Scala, run the scala -version command.
  - To find out your spark-core and spark-sql versions, check the contents of the /usr/lib/spark/jars folder:

    ```bash
    ls /usr/lib/spark/jars
    ```

    The versions are specified in the names of JAR files. Here is an example:

    ```text
    spark-core_2.12-3.0.3.jar
    spark-sql_2.12-3.0.3.jar
    ```

    The version you need is 3.0.3.
  - In the spark-app folder, create a file named build.sbt with this configuration:

    ```scala
    scalaVersion := "<Scala_version>"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "<spark-core_version>" % "provided",
      "org.apache.spark" %% "spark-sql" % "<spark-sql_version>" % "provided"
    )
    ```

    Here is an example:

    ```scala
    scalaVersion := "2.12.10"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "3.0.3" % "provided",
      "org.apache.spark" %% "spark-sql" % "3.0.3" % "provided"
    )
    ```

- Compile and build your JAR file:

  ```bash
  sbt compile && sbt package
  ```

- Get the name of the JAR file you built:

  ```bash
  ls ~/spark-app/target/scala-<Scala_version>
  ```

  Result: spark-app_2.12-0.1.0-SNAPSHOT.jar.
- Launch the application you got:

  ```bash
  /usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-<Scala_version>/<name_of_built_JAR_file>
  ```

  Here is an example:

  ```bash
  /usr/bin/spark-submit --class com.yandex.cloud.dataproc.scala.Main target/scala-2.12/spark-app_2.12-0.1.0-SNAPSHOT.jar
  ```

- The result will be exported to HDFS. You can list the files you got using this command:

  ```bash
  hdfs dfs -ls /tmp/month_stat
  ```
Terminating the application
By default, the resources of the running application are managed by the YARN component. If you need to terminate or remove the application from the queue, use the yarn utility:
- List the applications:

  ```bash
  yarn application -list
  ```

- Terminate the application you no longer need:

  ```bash
  yarn application -kill <app_ID>
  ```
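If you want to double-check which application you are about to stop, you can first print its report with the same yarn utility:

```bash
# Print the application report (name, state, progress, tracking URL)
yarn application -status <app_ID>
```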
For more information about YARN commands, see YARN Commands.
Running jobs using the Yandex Cloud CLI
Jobs are run from the Yandex Cloud CLI with the help of a Yandex Data Processing agent installed on the cluster master host. The agent gets job parameters through the Yandex Data Processing API.
The executable file and its dependencies must be located in storage accessible to the Yandex Data Processing cluster service account. The application itself must have access to the storage locations where the source dataset and the execution results are saved.
You can save the calculation results to HDFS on the Yandex Data Processing cluster or to the data-proc-bucket bucket you specified when creating the cluster.
All service and debugging information will be saved to data-proc-bucket. For each job, the Yandex Data Processing agent will create a separate folder at this path: dataproc/clusters/<cluster_ID>/jobs/<job_ID>.
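For example, after a job has run, you can inspect that folder with any S3-compatible client. A sketch using s3cmd (installed and configured in the dependency steps below), with the cluster and job IDs as placeholders:

```bash
# List the service and debug files the agent stored for a specific job
s3cmd ls s3://data-proc-bucket/dataproc/clusters/<cluster_ID>/jobs/<job_ID>/
```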
Note
You can view the job logs and search data in them using Yandex Cloud Logging. For more information, see Working with logs.
Below are the two application versions, one for Python and one for Scala.
Running a PySpark job
To run a PySpark job:
- Install additional dependencies.
- Prepare and run a PySpark job.
Install additional dependencies
On a local computer:
- If you do not have the Yandex Cloud CLI installed yet, install and initialize it.

  By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
- Install and configure the S3cmd console client to work with Yandex Object Storage.
- Install Python. Make sure the Python version matches the version available from the image. You can check the version under Runtime environment. For image version 2.0, use Python 3.8.10:

  ```bash
  sudo apt update && sudo apt install python3.8
  ```
Prepare and run a PySpark job
- Create a file named job.py with this code:

  ```python
  import sys

  from pyspark import SparkContext, SparkConf
  from pyspark.sql import SQLContext


  def main():
      if len(sys.argv) != 3:
          print('Usage job.py <input_directory> <output_directory>')
          sys.exit(1)

      in_dir = sys.argv[1]
      out_dir = sys.argv[2]

      conf = SparkConf().setAppName('Month Stat - Python')
      sc = SparkContext(conf=conf)
      sql = SQLContext(sc)
      df = sql.read.parquet(in_dir)
      month_stat = df.groupBy('Month').count()
      job_id = dict(sc._conf.getAll())['spark.yarn.tags'].replace('dataproc_job_', '')
      if out_dir.startswith('s3a://'):
          month_stat.repartition(1).write.format('csv').save(out_dir + job_id)
      else:
          default_fs = sc._jsc.hadoopConfiguration().get('fs.defaultFS')
          month_stat.repartition(1).write.format('csv').save(default_fs + out_dir + job_id)


  if __name__ == '__main__':
      main()
  ```

- To make sure PySpark can access your code, upload the job.py file to the Object Storage bucket the Yandex Data Processing cluster service account has access to:

  ```bash
  s3cmd put ./job.py s3://data-proc-bucket/bin/
  ```

- Run the job.

  The run command varies depending on whether you want to save the job results to Object Storage or to HDFS. To check where the results ended up, see the sketch after this list.

  Object Storage:

  ```bash
  yc dataproc job create-pyspark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="s3a://data-proc-bucket/jobs_results/"
  ```

  In the command, specify the following:
  - --cluster-id: Cluster ID. You can get it with the list of clusters in the folder.
  - --name: Any Spark job name.

  A CSV file with the result will be saved to data-proc-bucket.

  HDFS directory:

  ```bash
  yc dataproc job create-pyspark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-python-file-uri="s3a://data-proc-bucket/bin/job.py" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="tmp/jobs/"
  ```

  In the command, specify the following:
  - --cluster-id: Cluster ID. You can get it with the list of clusters in the folder.
  - --name: Any Spark job name.

  A CSV file with the result will be saved to the /tmp/jobs/<job_ID>/ folder in HDFS.
- Optionally, check the job logs:

  ```bash
  yc dataproc job log <job_ID> --cluster-id=<cluster_ID>
  ```
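Once the job status is DONE, you can check that the output files actually appeared. A small sketch under the assumptions above (the bucket is data-proc-bucket and the HDFS variant wrote to tmp/jobs/); the job ID is shown in the output of the create command:

```bash
# Results written to Object Storage: one folder per job ID under jobs_results/
s3cmd ls s3://data-proc-bucket/jobs_results/

# Results written to HDFS: run this on the cluster master host
hdfs dfs -ls /tmp/jobs/<job_ID>/
```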
Running a Spark job
To run a Spark job:
- Install additional dependencies.
- Build a Scala application.
- Upload the JAR file to Object Storage.
- Run the Spark job in the Yandex Data Processing cluster.
Install additional dependencies
- If you do not have the Yandex Cloud CLI installed yet, install and initialize it.

  By default, the CLI uses the folder specified when creating the profile. To change the default folder, use the yc config set folder-id <folder_ID> command. You can also set a different folder for any specific command using the --folder-name or --folder-id parameter.
- Use SSH to connect to the Yandex Data Processing cluster's master host.
- Install sbt, the standard build utility for Scala. It comes bundled with the Scala programming language.
- Install and configure the S3cmd console client to work with Yandex Object Storage.
Build a Scala application
To streamline dependency management, build the application into a single JAR file (fat JAR) using the sbt-assembly plugin.
- Create a folder named spark-app with the project and src/main/scala subfolders.
- Create a file named spark-app/project/plugins.sbt which describes the sbt-assembly connection for building a single JAR file:

  ```scala
  addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "<sbt-assembly_version>")
  ```

  You can check the sbt-assembly version in the plugin repository under Releases.
- Run the scala -version command to get the version of Scala installed on your system.
- Create a file named spark-app/build.sbt with a description of the dependencies and the strategy for merging them into a single JAR file. Specify the Scala version in the build.sbt file:

  ```scala
  scalaVersion := "<Scala_version>"

  libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.4.4",
    "org.apache.spark" %% "spark-sql" % "2.4.4",
  )

  assembly / assemblyMergeStrategy := {
    case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last
    case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
    case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last
    case PathList("com", "google", xs @ _*) => MergeStrategy.last
    case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
    case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
    case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
    case "about.html" => MergeStrategy.rename
    case "overview.html" => MergeStrategy.last
    case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
    case "META-INF/mailcap" => MergeStrategy.last
    case "META-INF/mimetypes.default" => MergeStrategy.last
    case "plugin.properties" => MergeStrategy.last
    case "log4j.properties" => MergeStrategy.last
    case "git.properties" => MergeStrategy.last
    case x =>
      val oldStrategy = (assembly / assemblyMergeStrategy).value
      oldStrategy(x)
  }
  ```

- Create the spark-app/src/main/scala/app.scala file with the app code:

  ```scala
  package com.yandex.cloud.dataproc.scala

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object Main {
    def main(args: Array[String]) {
      if (args.length != 2) { // check the arguments
        System.err.println("Usage spark-app.jar <input_directory> <output_directory>");
        System.exit(-1);
      }
      val inDir = args(0); // URI of the input data
      val outDir = args(1); // URI of the output directory
      val conf = new SparkConf().setAppName("Month Stat - Scala App")
      val sc = new SparkContext(conf)
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val df = sqlContext.read.parquet(inDir)
      val monthStat = df.groupBy("Month").count()
      val defaultFS = sc.hadoopConfiguration.get("fs.defaultFS") // get the HDFS server endpoint
      val jobId = conf.get("spark.yarn.tags").replace("dataproc_job_", ""); // get the job ID
      if (outDir.toLowerCase().startsWith("s3a://")) {
        monthStat.repartition(1).write.format("csv").save(outDir + jobId)
      } else {
        monthStat.repartition(1).write.format("csv").save(defaultFS + "/" + outDir + jobId)
      }

      sc.stop()
    }
  }
  ```

- Start the app build in the spark-app folder:

  ```bash
  sbt clean && sbt compile && sbt assembly
  ```

  If you get the java.lang.UnsatisfiedLinkError: Error looking up function 'stat': java: undefined symbol: stat error and your master host OS is Ubuntu, run each sbt command with the -Dsbt.io.jdktimestamps=true flag:

  ```bash
  sbt clean -Dsbt.io.jdktimestamps=true && \
  sbt compile -Dsbt.io.jdktimestamps=true && \
  sbt assembly -Dsbt.io.jdktimestamps=true
  ```

The file will be available at the following path: spark-app/target/scala-<Scala_version>/spark-app-assembly-0.1.0-SNAPSHOT.jar.
Upload the JAR file to Object Storage
For Spark to have access to the JAR file you built, upload the file to data-proc-bucket. You can upload the file using s3cmd:
```bash
s3cmd put ~/spark-app/target/scala-<Scala_version>/spark-app-assembly-0.1.0-SNAPSHOT.jar s3://data-proc-bucket/bin/
```
The file will be uploaded to s3://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar.
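Optionally, confirm that the upload succeeded before running the job; a quick check with the same s3cmd client:

```bash
# The assembled JAR should now be listed in the bucket
s3cmd ls s3://data-proc-bucket/bin/
```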
Run the Spark job in the Yandex Data Processing cluster
- Disconnect from the cluster master host.
- Run the job.

  The run command varies depending on whether you want to save the job results to Object Storage or to HDFS. To check the job logs and results afterwards, see the sketch after this list.

  Object Storage:

  ```bash
  yc dataproc job create-spark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-class="com.yandex.cloud.dataproc.scala.Main" \
     --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="s3a://data-proc-bucket/jobs_results/"
  ```

  In the command, specify the following:
  - --cluster-id: Cluster ID. You can get it with the list of clusters in the folder.
  - --name: Any Spark job name.

  A CSV file with the result will be saved to data-proc-bucket.

  HDFS directory:

  ```bash
  yc dataproc job create-spark \
     --cluster-id=<cluster_ID> \
     --name=<job_name> \
     --main-class="com.yandex.cloud.dataproc.scala.Main" \
     --main-jar-file-uri="s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar" \
     --args="s3a://yc-mdb-examples/dataproc/example01/set01" \
     --args="tmp/jobs/"
  ```

  In the command, specify the following:
  - --cluster-id: Cluster ID. You can get it with the list of clusters in the folder.
  - --name: Any Spark job name.

  A CSV file with the result will be saved to the /tmp/jobs/<job_ID>/ folder in HDFS.

  Example of a message saying that the job was run successfully:

  ```text
  done (1m2s)
  id: {your_job_id}
  cluster_id: {your_cluster_id}
  name: test02
  status: DONE
  spark_job:
    args:
    - s3a://yc-mdb-examples/dataproc/example01/set01
    - s3a://data-proc-bucket/jobs_results/
    main_jar_file_uri: s3a://data-proc-bucket/bin/spark-app-assembly-0.1.0-SNAPSHOT.jar
    main_class: com.yandex.cloud.dataproc.scala.Main
  ```
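As with the PySpark job, you can optionally review the logs of the finished job with the command shown earlier in this guide and list the result files; a short sketch assuming the Object Storage variant was used:

```bash
# View the job logs collected by the Yandex Data Processing agent
yc dataproc job log <job_ID> --cluster-id=<cluster_ID>

# List the result files written to the bucket
s3cmd ls s3://data-proc-bucket/jobs_results/
```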
Delete the resources you created
Some resources incur charges. To avoid unnecessary expenses, delete the resources you no longer need:
Manually
- Delete the Yandex Data Processing cluster.
- If you reserved public static IP addresses, release and delete them.
- Delete the subnet.
- Delete the route table.
- Delete the NAT gateway.
- Delete the network.
Terraform
- In the terminal window, go to the directory containing the infrastructure plan.

  Warning

  Make sure the directory has no Terraform manifests with the resources you want to keep. Terraform deletes all resources that were created using the manifests in the current directory.
- Delete resources:
  - Run this command:

    ```bash
    terraform destroy
    ```

  - Confirm deleting the resources and wait for the operation to complete.

  All the resources described in the Terraform manifests will be deleted.