Posts Tagged ‘Apache-Spark’
PySpark starter for ten
I just threw this together and I’m putting it here mainly in case I need it later. It might come in handy for others too…
So you have a new Spark installation against a YARN cluster and you want to run something simple on it (akin to Hello World) to see if it does anything. Try copying and pasting this into your bash shell:
echo "from pyspark import SparkContext, HiveContext, SparkConf" > sparking.py echo "conf = SparkConf().setAppName('sparking')" >> sparking.py echo 'conf.set("spark.sql.parquet.binaryAsString", "true")' >> sparking.py echo "sc = SparkContext(conf=conf)" >> sparking.py echo "sqlContext = HiveContext(sc)" >> sparking.py echo "l = [('Alice', 1)]" >> sparking.py echo "rdd = sc.parallelize(l)" >> sparking.py echo "for x in rdd.take(10):" >> sparking.py echo " print x" >> sparking.py spark-submit --master yarn --deploy-mode cluster --supervise --name "sparking" sparking.py
echo "from pyspark import SparkContext, HiveContext, SparkConf" > sparking.py | |
echo "conf = SparkConf().setAppName('sparking')" >> sparking.py | |
echo 'conf.set("spark.sql.parquet.binaryAsString", "true")' >> sparking.py | |
echo "sc = SparkContext(conf=conf)" >> sparking.py | |
echo "sqlContext = HiveContext(sc)" >> sparking.py | |
echo "l = [('Alice', 1)]" >> sparking.py | |
echo "rdd = sc.parallelize(l)" >> sparking.py | |
echo "for x in rdd.take(10):" >> sparking.py | |
echo " print x" >> sparking.py | |
spark-submit –master yarn –deploy-mode cluster –supervise –name "sparking" sparking.py | |
If it runs, you should see something like this at the end of the YARN log:
Log Type: stdout
Log Upload Time: Thu Jan 05 14:56:09 +0000 2017
Log Length: 13
('Alice', 1)
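If you'd rather try it interactively first, the guts of the script can be pasted straight into the pyspark shell, which (assuming a build with Hive support) already defines sc and sqlContext for you:

# sc already exists inside the pyspark shell, so no SparkContext setup is needed
l = [('Alice', 1)]
rdd = sc.parallelize(l)
for x in rdd.take(10):
    print x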
Creating a Spark dataframe containing only one column
I’ve been doing lots of Apache Spark development using Python (aka PySpark) recently, specifically Spark SQL (aka the dataframes API), and one thing I’ve found very useful for testing purposes is creating a dataframe from literal values. The documentation at pyspark.sql.SQLContext.createDataFrame() covers this pretty well; however, the code there shows how to create a dataframe containing more than one column, like so:
l = [('Alice', 1)]
sqlContext.createDataFrame(l).collect()
# returns [Row(_1=u'Alice', _2=1)]
sqlContext.createDataFrame(l, ['name', 'age']).collect()
# returns [Row(name=u'Alice', age=1)]
For simple testing purposes I wanted to create a dataframe that has only one column, so you might think the above code could be amended simply, like so:
l = [('Alice')]
sqlContext.createDataFrame(l).collect()
sqlContext.createDataFrame(l, ['name']).collect()
but unfortunately that throws an error:
TypeError: Can not infer schema for type: <type 'str'>
The reason is simple:
('Alice', 1)
is a tuple, whereas
('Alice')
is just a parenthesized string.
type(('Alice', 1))  # returns tuple
type(('Alice'))     # returns str
The latter causes the error because createDataFrame() builds a dataframe from an RDD (or list) of tuples, not an RDD of strings.
There is a very easy fix which will be obvious to any half-decent Python developer; unfortunately that’s not me, so I didn’t stumble on the answer immediately. It’s possible to create a one-element tuple by including a trailing comma, like so:
type(('Alice',)) # returns tuple
hence the earlier failing code can be adapted to this:
l = [('Alice',)]
sqlContext.createDataFrame(l).collect()
# returns [Row(_1=u'Alice')]
sqlContext.createDataFrame(l, ['name']).collect()
# returns [Row(name=u'Alice')]
It took me far longer than it should have done to figure that out!
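As an aside: if your version of Spark patches toDF() onto RDDs (which, I believe, happens from 1.3 onwards once a SQLContext or HiveContext has been created), the same one-element-tuple trick works there too. A minimal sketch:

# assumes sc and sqlContext are already set up, as in the snippets above
l = [('Alice',)]
sc.parallelize(l).toDF(['name']).collect()
# should return [Row(name=u'Alice')]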
Here is another snippet that creates a dataframe from literal values without letting Spark infer the schema (behaviour which, I believe, is deprecated anyway):
from pyspark.sql.types import *

schema = StructType([StructField("foo", StringType(), True)])
l = [('bar1',), ('bar2',), ('bar3',)]
sqlContext.createDataFrame(l, schema).collect()
# returns: [Row(foo=u'bar1'), Row(foo=u'bar2'), Row(foo=u'bar3')]
or, if you don’t want to use the one-element tuple workaround that I outlined above and would rather just pass a list of strings:
from pyspark.sql.types import *
from pyspark.sql import Row

schema = StructType([StructField("foo", StringType(), True)])
l = ['bar1', 'bar2', 'bar3']
rdd = sc.parallelize(l).map(lambda x: Row(x))
sqlContext.createDataFrame(rdd, schema).collect()
# returns [Row(foo=u'bar1'), Row(foo=u'bar2'), Row(foo=u'bar3')]
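Alternatively, if you are happy to let Spark infer the schema after all, a Row with a named field carries its own column name, so the explicit StructType can be dropped. A sketch, assuming the same sc and sqlContext as above:

from pyspark.sql import Row

# each element becomes a Row with a named field, so no explicit schema is needed
l = ['bar1', 'bar2', 'bar3']
rdd = sc.parallelize(l).map(lambda x: Row(foo=x))
sqlContext.createDataFrame(rdd).collect()
# should return [Row(foo=u'bar1'), Row(foo=u'bar2'), Row(foo=u'bar3')]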
Happy sparking!