Pandas - Pipe Method

2019-12-14
Pipe in Pandas

What’s pipe?

Pipe is a method of pandas.DataFrame (and pandas.Series) that applies a function, either from an existing package or self-defined, to the dataframe and returns the result. It is one of the methods that enable method chaining. With pipe, multiple processing steps can be combined in a chain without nesting function calls. Let’s look at an example to show the benefits.

The pandas documentation uses three functions, h(df), g(df, arg1=a) and f(df, arg2=b, arg3=c), applied to df in that order. Without pipe, the three calls are nested inside one another, so the call that runs first is written innermost and it is hard to tell at a glance which arguments belong to which function. With method chaining, the steps read top to bottom in the order they run.

# Nested functions
f(g(h(df), arg1=a), arg2=b, arg3=c)

# Method chaining
(
    df.pipe(h)
      .pipe(g, arg1=a)
      .pipe(f, arg2=b, arg3=c)
)
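To make the comparison concrete, here is a minimal runnable sketch. The bodies of h, g and f, the column names and the values of a, b and c are all invented for illustration; only the function names and the calling pattern come from the pandas documentation. The sketch simply checks that the nested call and the piped chain give the same result.

```python
import pandas as pd

# Hypothetical stand-ins for h, g and f: each takes a dataframe as its
# first argument and returns a new dataframe.
def h(df):
    return df.dropna()

def g(df, arg1):
    return df.assign(scaled=df["x"] * arg1)

def f(df, arg2, arg3):
    return df[(df["scaled"] >= arg2) & (df["scaled"] <= arg3)]

df = pd.DataFrame({"x": [1.0, None, 3.0, 4.0]})
a, b, c = 2, 4, 8  # illustrative values only

# Nested functions: read from the inside out
nested = f(g(h(df), arg1=a), arg2=b, arg3=c)

# Method chaining: read from top to bottom
chained = (
    df.pipe(h)
      .pipe(g, arg1=a)
      .pipe(f, arg2=b, arg3=c)
)

same_result = nested.equals(chained)
print(same_result)
```

Both forms call the same functions in the same order; pipe only changes how the calls are written.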

Let’s use online shopping as an example to compare different ways of running several processes in a row. Five functions, add_to_cart, checkout, shipping, billing, and place_order, are used to complete a customer’s transaction. The following two examples show common ways of calling multiple functions consecutively.

The first one relies heavily on nested functions. Without careful formatting it is hard to read, and it is hard to tell which argument belongs to which function. In my personal experience, the second one is even harder to work with, because every intermediate result needs a meaningful name or it will be hard to recognize later. The intermediate results are also often used only once and never needed again later in the process. That’s why I wouldn’t choose this approach if I had an alternative.

# 1. Nested functions
place_order(
      billing(
        shipping(
            checkout(add_to_cart(items)),
            "address"),
        "credit_card"))

# 2. Save all the intermediate results
cart = add_to_cart(items)
new_order = checkout(cart)
shipping_info = shipping(new_order, "address")
billing_info = billing(shipping_info, "credit_card")
completed_order = place_order(billing_info)

For the same process, method chaining with pipe makes the code readable and makes the arguments of each function call easy to recognize. It clearly shows the order of execution and the arguments of each step without any nesting.

# Method chaining (the parentheses let the chain span several lines)
(
    items.pipe(add_to_cart)
         .pipe(checkout)
         .pipe(shipping, "address")
         .pipe(billing, "credit_card")
         .pipe(place_order)
)

This situation is common in data science, where data manipulation involves many steps. Often the intermediate results are not important, since the goal is simply to end up with clean final data.
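The shopping chain can be made runnable with toy versions of the five functions. Everything in this sketch is an assumption made for illustration: the function bodies, the item columns, and the choice to represent the order as a dataframe that each step extends with a new column.

```python
import pandas as pd

# Hypothetical toy versions of the five steps. Each takes the order
# dataframe as its first argument and returns a new dataframe.
def add_to_cart(items):
    return items.assign(in_cart=True)

def checkout(cart):
    return cart.assign(total=cart["price"] * cart["quantity"])

def shipping(order, address):
    return order.assign(address=address)

def billing(order, payment):
    return order.assign(payment=payment)

def place_order(order):
    return order.assign(status="placed")

items = pd.DataFrame({
    "item": ["pen", "notebook"],
    "price": [1.5, 4.0],
    "quantity": [2, 1],
})

completed_order = (
    items.pipe(add_to_cart)
         .pipe(checkout)
         .pipe(shipping, "address")
         .pipe(billing, "credit_card")
         .pipe(place_order)
)
print(completed_order)
```

Because each step uses assign, which returns a new dataframe, the original items dataframe is left unchanged and no intermediate variable needs a name.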

Examples

Let’s look into several examples to see how pipe really works. Here, a dataframe containing student name, subject and score is randomly generated.

import pandas as pd
import numpy as np

# Set seed
np.random.seed(520)

# Create a dataframe
df = pd.DataFrame({
    'name': ['Ted'] * 3 + ['Lisa'] * 3 + ['Sam'] * 3,
    'subject': ['math', 'physics', 'history'] * 3,
    'score': np.random.randint(60, 100, 9)
})
index  name  subject  score
0      Ted   math     87
1      Ted   physics  80
2      Ted   history  75
3      Lisa  math     79
4      Lisa  physics  78
5      Lisa  history  77
6      Sam   math     85
7      Sam   physics  61
8      Sam   history  88

Get the rank by subject in one line

Return pandas dataframe

The goal is to rank the scores within each subject in one line and append the rank to the original dataframe. A function, get_subject_rank, is created for this task. Passing it to pipe returns a copy of the dataframe with the rank appended as a new column.

def get_subject_rank(input_df):
    # Avoid overwriting the original dataframe
    input_df = input_df.copy()
    input_df['subject_rank'] = (input_df
                                .groupby(['subject'])['score']
                                .rank(ascending=False))
    return input_df

# pipe method
df.pipe(get_subject_rank)
index  name  subject  score  subject_rank
0      Ted   math     87     1
1      Ted   physics  80     1
2      Ted   history  75     3
3      Lisa  math     79     3
4      Lisa  physics  78     2
5      Lisa  history  77     2
6      Sam   math     85     2
7      Sam   physics  61     3
8      Sam   history  88     1

Return pandas series

Pipe returns whatever the function returns, so the output does not have to be a dataframe. In the following example, the function returns a pandas series when df_or_not=False. The example also shows that when a function takes more than one argument, the additional arguments are passed in the pipe call after the function.

def get_subject_rank(input_df, df_or_not=True):
    # Avoid overwriting the original dataframe
    input_df = input_df.copy()
    if df_or_not:
        input_df['subject_rank'] = (input_df
                                    .groupby(['subject'])['score']
                                    .rank(ascending=False))
        return input_df
    else:
        output_series = (input_df
                         .groupby(['subject'])['score']
                         .rank(ascending=False))
        return output_series

# pipe method - return arbitrary output
df.pipe(get_subject_rank, df_or_not=False)
## 0    1.0
## 1    1.0
## 2    3.0
## 3    3.0
## 4    2.0
## 5    2.0
## 6    2.0
## 7    3.0
## 8    1.0
## Name: score, dtype: float64
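The same idea extends beyond series: pipe hands back whatever the function returns, including a plain Python value. The sketch below uses two hypothetical helpers, mean_score and score_rank, and a small fixed dataframe (the math scores from the table above, typed in by hand rather than generated from the seed).

```python
import pandas as pd

df = pd.DataFrame({
    "name": ["Ted", "Lisa", "Sam"],
    "subject": ["math"] * 3,
    "score": [87, 79, 85],
})

# Hypothetical helper: returns a plain Python float, not a pandas object
def mean_score(input_df):
    return float(input_df["score"].mean())

# Hypothetical helper: returns a Series of ranks (1 = highest score)
def score_rank(input_df):
    return input_df["score"].rank(ascending=False)

average = df.pipe(mean_score)

# Series also has a pipe method, so a chain can continue on the
# returned Series; here it looks up the index label of the top rank.
top_index = df.pipe(score_rank).pipe(lambda s: s.idxmin())

print(type(average).__name__)
print(top_index)
```

One consequence is that a chain can only continue with pipe while the previous step returns a pandas object; once a step returns a scalar, the chain ends there.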

Data is not the first argument

When a function is called through pipe, the dataframe/series that pipe is called on is passed as the function’s first argument by default. Here is an example with add_score, a function that modifies scores. Its first argument, input_df, receives df, so there is no need to pass input_df explicitly in the pipe call; only added_score is given.

def add_score(input_df, added_score):
    # Avoid overwriting the original dataframe
    input_df = input_df.copy()
    input_df = input_df.assign(new_score=lambda x: x.score+added_score)
    return input_df

df.pipe(add_score, 2)
index  name  subject  score  new_score
0      Ted   math     87     89
1      Ted   physics  80     82
2      Ted   history  75     77
3      Lisa  math     79     81
4      Lisa  physics  78     80
5      Lisa  history  77     79
6      Sam   math     85     87
7      Sam   physics  61     63
8      Sam   history  88     90

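Now suppose the two arguments of add_score are swapped, so that the dataframe is the second argument. In this case pipe cannot pass df as the first positional argument. Instead, a tuple - (function, “name of the data argument”) - is passed to indicate which argument is the data to apply the function on. Pipe then supplies df as the keyword argument with that name, and the remaining arguments are passed through as usual.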

def add_score(added_score, input_df):
    # Avoid overwriting the original dataframe
    input_df = input_df.copy()
    input_df = input_df.assign(new_score=lambda x: x.score+added_score)
    return input_df

df.pipe((add_score, "input_df"), 2)
index  name  subject  score  new_score
0      Ted   math     87     89
1      Ted   physics  80     82
2      Ted   history  75     77
3      Lisa  math     79     81
4      Lisa  physics  78     80
5      Lisa  history  77     79
6      Sam   math     85     87
7      Sam   physics  61     63
8      Sam   history  88     90
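To see what the tuple form does, it can help to compare it with the equivalent direct call. This is a small sketch on a hand-typed two-row dataframe; the lambda variant at the end is just one possible alternative, not something the original example uses.

```python
import pandas as pd

df = pd.DataFrame({"name": ["Ted", "Lisa"], "score": [87, 79]})

def add_score(added_score, input_df):
    # assign returns a new dataframe, so df itself is not modified
    return input_df.assign(new_score=lambda x: x.score + added_score)

# Tuple form: pipe passes df as the keyword argument named "input_df"
via_tuple = df.pipe((add_score, "input_df"), 2)

# The same call written out by hand
direct = add_score(2, input_df=df)

# An alternative that avoids the tuple: wrap the call in a lambda
via_lambda = df.pipe(lambda d: add_score(2, d))

print(via_tuple.equals(direct), via_tuple.equals(via_lambda))
```

All three produce the same dataframe, so the choice between the tuple and the lambda is mostly a matter of readability.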

Debug in method chaining

Critics of method chaining may worry that long chains are hard to debug because no intermediate results are returned. No worries! In his post on method chaining, Tom Augspurger offers a great way to tackle this problem: decorators. A decorator is a function that extends the behavior of the function it wraps without explicitly modifying it.

Let’s look at an actual example. By using decorators and logging together, any property of the dataframe returned by a step can be written to the log. Here, the shape and the columns are logged by log_shape and log_columns. The resulting log messages are shown after the code for reference.

Note: wraps is used so that the decorated function keeps the metadata of the original function (name, docstring, argument list, etc.) instead of taking on that of the wrapper.

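from functools import wraps
import logging

# Show INFO messages; by default the root logger only shows WARNING and above
logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s")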

def log_shape(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        logging.info("%s,%s" % (func.__name__, result.shape))
        return result
    return wrapper

def log_columns(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        logging.info("%s,%s" % (func.__name__, result.columns))
        return result
    return wrapper

@log_columns
@log_shape
def get_subject_rank(input_df):
    input_df = input_df.copy()
    input_df['subject_rank'] = (input_df
                                .groupby(['subject'])['score']
                                .rank(ascending=False))
    return input_df

@log_columns
@log_shape
def add_score(input_df, added_score):
    input_df = input_df.copy()
    input_df = input_df.assign(new_score=lambda x: x.score+added_score)
    return input_df

(
    df.pipe(get_subject_rank)
      .pipe(add_score, 2)
)

The code above is adapted from Tom’s code.

INFO - get_subject_rank,(9, 4)
INFO - get_subject_rank,Index(['name', 'subject', 'score', 'subject_rank'], dtype='object')
INFO - add_score,(9, 5)
INFO - add_score,Index(['name', 'subject', 'score', 'subject_rank', 'new_score'], dtype='object')
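The note on wraps matters for the log lines above: when decorators are stacked, the outer decorator receives the inner decorator's wrapper, and without wraps that wrapper would report its own name. The following sketch shows the difference with two hypothetical pass-through decorators, without_wraps and with_wraps, that are not part of the original code.

```python
from functools import wraps

def without_wraps(func):
    # No @wraps: the returned function keeps its own name and docstring
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

def with_wraps(func):
    # @wraps copies the name, docstring, etc. from func onto wrapper
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

@without_wraps
def add_two_plain(x):
    """Add two points."""
    return x + 2

@with_wraps
def add_two_wrapped(x):
    """Add two points."""
    return x + 2

print(add_two_plain.__name__)    # wrapper
print(add_two_wrapped.__name__)  # add_two_wrapped
```

If log_shape did not use wraps, log_columns would see a function named wrapper, and its log lines would start with wrapper instead of the real function name.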

Pipe is a flexible method for fitting customized functions into pandas operations. It is great that pandas has implemented many methods that enable method chaining during data manipulation. I enjoy exploring more possibilities and more efficient ways to work with data in pandas.

More info about why chaining (Python) and pipes (R) are useful for data scientists can be found in the article on method chaining by Tom Augspurger, one of the main contributors to pandas, and in the chapter on pipes in R for Data Science.