First, we lower the number of shuffle partitions (200 by default) so that the application stays responsive on small data.

In [1]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

Now we create a DataFrame from a CSV file.

In [2]:
olympia = spark.read \
    .option('inferSchema', 'true') \
    .option('header', 'true') \
    .csv('hdfs:///user/daniel/olympia.csv')
    
olympia.select(['country', 'year', 'medals']).show(4)

+-----------+----+------+
|    country|year|medals|
+-----------+----+------+
|Afghanistan|2008|     1|
|Afghanistan|2012|     1|
|Afghanistan|2016|     0|
|    Albania|1996|     0|
+-----------+----+------+
only showing top 4 rows



Let's see some statistics.

In [3]:
olympia.describe('country', 'year', 'medals').show()

+-------+-----------+------------------+-----------------+
|summary|    country|              year|           medals|
+-------+-----------+------------------+-----------------+
|  count|       1040|              1040|             1040|
|   mean|       null| 2006.373076923077|5.133653846153846|
| stddev|       null|6.7388553528508846|13.69320177659335|
|    min|Afghanistan|              1996|                0|
|    max|   Zimbabwe|              2016|              121|
+-------+-----------+------------------+-----------------+



Let's inspect the schema.

In [4]:
olympia.columns

['country',
 'iso3',
 'year',
 'medals',
 'host',
 'gdp_cap',
 'pop',
 'region',
 'continent',
 'geographic area',
 'exports',
 'gdp growth rate',
 'gdp per capita',
 'inflation rate',
 'unemployment rate',
 'gross national saving',
 'public debt',
 'population growth rate',
 'life expectancy at birth',
 'net migration rate',
 'obesity',
 'education expenditures',
 'health expenditures',
 'electricity consumption',
 'oil imports',
 'internet users',
 'military expenditures']

In [5]:
olympia.dtypes

[('country', 'string'),
 ('iso3', 'string'),
 ('year', 'int'),
 ('medals', 'int'),
 ('host', 'int'),
 ('gdp_cap', 'double'),
 ('pop', 'int'),
 ('region', 'int'),
 ('continent', 'int'),
 ('geographic area', 'double'),
 ('exports', 'decimal(13,0)'),
 ('gdp growth rate', 'double'),
 ('gdp per capita', 'int'),
 ('inflation rate', 'double'),
 ('unemployment rate', 'double'),
 ('gross national saving', 'double'),
 ('public debt', 'double'),
 ('population growth rate', 'double'),
 ('life expectancy at birth', 'double'),
 ('net migration rate', 'double'),
 ('obesity', 'double'),
 ('education expenditures', 'double'),
 ('health expenditures', 'double'),
 ('electricity consumption', 'decimal(13,0)'),
 ('oil imports', 'double'),
 ('internet users', 'int'),
 ('military expenditures', 'double')]

In [6]:
olympia.printSchema()

root
 |-- country: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- medals: integer (nullable = true)
 |-- host: integer (nullable = true)
 |-- gdp_cap: double (nullable = true)
 |-- pop: integer (nullable = true)
 |-- region: integer (nullable = true)
 |-- continent: integer (nullable = true)
 |-- geographic area: double (nullable = true)
 |-- exports: decimal(13,0) (nullable = true)
 |-- gdp growth rate: double (nullable = true)
 |-- gdp per capita: integer (nullable = true)
 |-- inflation rate: double (nullable = true)
 |-- unemployment rate: double (nullable = true)
 |-- gross national saving: double (nullable = true)
 |-- public debt: double (nullable = true)
 |-- population growth rate: double (nullable = true)
 |-- life expectancy at birth: double (nullable = true)
 |-- net migration rate: double (nullable = true)
 |-- obesity: double (nullable = true)
 |-- education expenditures: double (nullable = true)
 |-- health expendit

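Let's compute the correlation between the medal count and a few other columns. Note how we can use a plain Python for loop to iterate over the column names. We print the absolute value, so only the strength of each correlation is shown, not its sign.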

In [8]:
interesting_columns = ['exports', 'geographic area', 'region', 'pop', 'education expenditures']

for col_name in interesting_columns:
    print("medals-%s: %f" % (col_name, abs(olympia.stat.corr('medals', col_name))))


medals-exports: 0.647598
medals-geographic area: 0.532670
medals-region: 0.362122
medals-pop: 0.415016
medals-education expenditures: 0.030358
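
For reference, stat.corr computes the Pearson correlation coefficient by default. The following is a minimal plain-Python sketch of that formula, not Spark's implementation. The `pearson` helper and the sample lists are made up for illustration and are not part of the dataset.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance up to a constant factor).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (variances up to the same factor).
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up values, only to exercise the formula.
medals = [0, 1, 3, 10, 25]
exports = [5.0, 8.0, 20.0, 60.0, 140.0]
print("medals-exports: %f" % abs(pearson(medals, exports)))
```

A value near 1 after taking the absolute value means a strong linear relationship, and a value near 0 means almost none, which is how the numbers above can be read.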


Create a new DataFrame without the entries from 2016.

In [9]:
olympia_pre16 = olympia.filter(olympia['year'] < 2016)

Let's check that the filter worked: we group the rows by country and keep only the latest year that appears for each country.

In [10]:
from pyspark.sql import functions as F  # a namespaced import avoids shadowing Python's built-in max

max_years = olympia_pre16.select('country', 'year').groupBy('country').agg(F.max('year'))

Select a random sample and print it to confirm that no year is 2016 or later.

In [11]:
# Select about 10% of the records at random, without replacement.
max_years.sample(False, 0.1).orderBy('country').show()

+-------------------+---------+
|            country|max(year)|
+-------------------+---------+
|Antigua and Barbuda|     2012|
|             Brunei|     2004|
|              Chile|     2012|
|              China|     2012|
|               Cuba|     2012|
|           Dominica|     2012|
|          Hong Kong|     2012|
|            Jamaica|     2012|
|            Liberia|     2012|
|           Mongolia|     2012|
|            Nigeria|     2012|
|           Paraguay|     2012|
|            Romania|     2012|
|         Seychelles|     2012|
|             Turkey|     2012|
|            Ukraine|     2012|
+-------------------+---------+



How about calculating the median number of medals for each country? We will use SQL for this. Note that the median is the 50th percentile, and that percentile_approx returns an approximation of it.

In [12]:
olympia_pre16.createOrReplaceTempView("olympia_pre16")  # lets us run SQL queries; registerTempTable is deprecated since Spark 2.0

median_medals = spark.sql("""
SELECT country, percentile_approx(medals, 0.5) AS median_medals 
FROM olympia_pre16
GROUP BY country
""") 

median_medals.sample(False, 0.1).orderBy('country').show()

+--------------+-------------+
|       country|median_medals|
+--------------+-------------+
|       Croatia|          5.0|
|       Finland|          4.0|
|       Germany|         49.0|
|     Lithuania|          5.0|
|         Spain|         17.0|
|    Tajikistan|          0.0|
|          Togo|          0.0|
|         Tonga|          0.0|
|United Kingdom|         30.0|
|       Uruguay|          0.0|
+--------------+-------------+
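
To make the aggregation concrete, here is a minimal plain-Python sketch of a per-country median. The rows and the `median_by_country` helper are made up for illustration. The sketch uses the exact `statistics.median`, whereas percentile_approx is an approximation, so the two may differ, for example on groups with an even number of rows.

```python
from collections import defaultdict
from statistics import median

def median_by_country(rows):
    """Group (country, medals) pairs by country and take the exact median."""
    groups = defaultdict(list)
    for country, medals in rows:
        groups[country].append(medals)
    return {country: median(values) for country, values in groups.items()}

# Made-up rows standing in for the (country, medals) columns.
rows = [("A", 0), ("A", 2), ("A", 5), ("B", 1), ("B", 1), ("B", 7)]
print(median_by_country(rows))
```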



Let's create a new DataFrame that adds the median column to every row by joining on the country.

In [13]:
olympia_pre16_median = olympia_pre16.join(median_medals, 'country')

olympia_pre16_median.select(['country', 'year', 'median_medals']).show()

+-------------------+----+-------------+
|            country|year|median_medals|
+-------------------+----+-------------+
|        Afghanistan|2008|          1.0|
|        Afghanistan|2012|          1.0|
|            Albania|1996|          0.0|
|            Albania|2000|          0.0|
|            Albania|2004|          0.0|
|            Albania|2008|          0.0|
|            Albania|2012|          0.0|
|            Algeria|1996|          2.0|
|            Algeria|2000|          2.0|
|            Algeria|2004|          2.0|
|            Algeria|2008|          2.0|
|            Algeria|2012|          2.0|
|Antigua and Barbuda|1996|          0.0|
|Antigua and Barbuda|2000|          0.0|
|Antigua and Barbuda|2004|          0.0|
|Antigua and Barbuda|2008|          0.0|
|Antigua and Barbuda|2012|          0.0|
|            Armenia|2000|          1.0|
|            Armenia|2004|          1.0|
|            Armenia|2008|          1.0|
+-------------------+----+-------------+
only showing top
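
The join above attaches each country's median to every row of that country. A minimal plain-Python sketch of the same idea follows. The rows and the `medians` lookup are made-up stand-ins, not values from the dataset, and the sketch assumes the default inner join, which drops rows without a match.

```python
# Made-up stand-ins for olympia_pre16 and median_medals.
rows = [
    {"country": "A", "year": 2008, "medals": 1},
    {"country": "A", "year": 2012, "medals": 3},
    {"country": "B", "year": 2012, "medals": 0},
    {"country": "C", "year": 2012, "medals": 4},  # no median entry, so it is dropped
]
medians = {"A": 2.0, "B": 0.0}

# Inner join on 'country': keep only rows whose country has a median,
# and add that median as a new field.
joined = [
    {**row, "median_medals": medians[row["country"]]}
    for row in rows
    if row["country"] in medians
]

for row in joined:
    print(row["country"], row["year"], row["median_medals"])
```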

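Here is an example of accessing the RDD view of a DataFrame. We count the rows per country in the style of a classic WordCount: map each row to a (country, 1) pair, then sum the ones per key.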

In [14]:
olympia_pre16_median.rdd \
    .map(lambda row: (row['country'], 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()

[('Afghanistan', 2),
 ('Albania', 5),
 ('Algeria', 5),
 ('Antigua and Barbuda', 5),
 ('Armenia', 4),
 ('Australia', 5),
 ('Austria', 5),
 ('Azerbaijan', 4),
 ('Bahamas', 5),
 ('Bahrain', 5),
 ('Bangladesh', 5),
 ('Barbados', 5),
 ('Belarus', 4),
 ('Belgium', 5),
 ('Belize', 5),
 ('Benin', 5),
 ('Bermuda', 5),
 ('Bhutan', 5),
 ('Bolivia', 5),
 ('Bosnia and Herzegovina', 5),
 ('Botswana', 5),
 ('Brazil', 5),
 ('Brunei', 2),
 ('Bulgaria', 5),
 ('Burkina Faso', 5),
 ('Burundi', 4),
 ('Cambodia', 4),
 ('Cameroon', 5),
 ('Canada', 5),
 ('Cape Verde', 4),
 ('Central African Republic', 5),
 ('Chad', 5),
 ('Chile', 5),
 ('China', 5),
 ('Colombia', 5),
 ('Comoros', 4),
 ('Congo Dem. Rep.', 5),
 ('Congo Rep.', 5),
 ('Costa Rica', 5),
 ('Cote d`Ivoire', 5),
 ('Croatia', 5),
 ('Cuba', 5),
 ('Cyprus', 5),
 ('Czech Republic', 4),
 ('Denmark', 5),
 ('Djibouti', 3),
 ('Dominica', 4),
 ('Dominican Republic', 5),
 ('Ecuador', 5),
 ('Egypt', 5),
 ('El Salvador', 5),
 ('Equatorial Guinea', 5),
 ('Eritrea',