Skip to content Skip to sidebar Skip to footer

How Do I Add A New Column To A Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column. I've tried the following without any success: type(randomed_hours) # => list # Create in Pyth

Solution 1:

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

included using join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with function / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expression, are usually preferred over Python user defined functions.

If you want to add content of an arbitrary RDD as a column you can


Solution 2:

To add a column using a UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

Solution 3:

For Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

Solution 4:

There are multiple ways we can add a new column in pySpark.

Let's first create a simple DataFrame.

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

Now let's try to double the column value and store it in a new column. PFB few different approaches to achieve the same.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

For more examples and explanation on spark DataFrame functions, you can visit my blog.

I hope this helps.


Solution 5:

To add new column with some custom value or dynamic value calculation which will be populated based on the existing columns.

e.g.

|ColumnA | ColumnB |
|--------|---------|
| 10     | 15      |
| 10     | 20      |
| 10     | 30      |

and new ColumnC as ColumnA+ColumnB

|ColumnA | ColumnB | ColumnC|
|--------|---------|--------|
| 10     | 15      | 25     |
| 10     | 20      | 30     |
| 10     | 30      | 40     |

using

#to add new column
def customColumnVal(row):
rd=row.asDict()
rd["ColumnC"]=row["ColumnA"] + row["ColumnB"]

new_row=Row(**rd)
return new_row
----------------------------
#convert DF to RDD
df_rdd= input_dataframe.rdd

#apply new fucntion to rdd
output_dataframe=df_rdd.map(customColumnVal).toDF()

input_dataframe is the dataframe which will get modified and customColumnVal function is having code to add new column.


Post a Comment for "How Do I Add A New Column To A Spark DataFrame (using PySpark)?"