
Polars & DuckDB: DataFrames and SQL For Python Without Pandas


1. Introduction

2. Getting Set Up On AWS with Docker

3. Intro To Polars

4. DuckDB To The Rescue For SQL

5. Conclusions

Introduction


In the last few years there has been an explosion of dataframe alternatives to Pandas due to its limitations. Even the original author, Wes McKinney, wrote a blog post about 10 Things I Hate About Pandas.

My biggest complaints about Pandas are:

  1. High memory usage
  2. Limited multi-core algorithms
  3. No ability to execute SQL statements against DataFrames (the way Spark SQL works with Spark DataFrames)
  4. No query planning/lazy-execution
  5. NULL values only exist for floats, not ints (this changed in Pandas 1.0+)
  6. Using strings is inefficient (this also changed in Pandas 1.0+; see the short example after this list)
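To make the last two points concrete, here is a minimal sketch of the Pandas 1.0+ behaviour (these nullable extension dtypes are opt-in; the values are just illustrative):

import pandas as pd

# Nullable integers: missing values become pd.NA instead of forcing the column to float
ints = pd.Series([1, 2, None], dtype="Int64")

# Dedicated string dtype instead of the generic (and memory-hungry) object dtype
strings = pd.Series(["BRONX", "QUEENS", None], dtype="string")

print(ints.dtype, strings.dtype)  # Int64 string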

I should note that many of these issues have been addressed by the Pandas 2.0 release. And while there has been a steady march towards replacing the NumPy backend with Apache Arrow, I still feel the lack of SQL support and the overall API design are major weaknesses of Pandas. Let me expand upon that last point.

For context, I have been using Apache Spark since 2017 and love it not just from a performance point of view, but also for how well the API is designed. The syntax makes sense coming from a SQL user's perspective. If I want to group by a column and count, in SQL or on a Spark DataFrame I get what I expect either way: a single column with the count of each item in the original dataframe/table's column. In Pandas, this is not the result.
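As a sketch of what I mean (this is not from the original notebook; it assumes a SparkSession named spark and a Spark DataFrame spark_df holding the same collisions data):

counts = spark_df.groupBy("borough").count()  # one row per borough plus a single `count` column
counts.filter("count > 100").show()           # and I can keep chaining methods on the result

# The SQL version, assuming spark_df.createOrReplaceTempView("crashes") was called first
spark.sql("SELECT borough, COUNT(*) AS count FROM crashes GROUP BY borough").show()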

For example, using this data set from NYC Open Data on Motor Vehicle Collisions, I can run a groupby-count expression on a Pandas DataFrame and I get:

In [1]:
import pandas as pd
pd_df = pd.read_csv("https://data.cityofnewyork.us/resource/h9gi-nx95.csv")
pd_df.groupby("borough").count()
Out[1]:
crash_date crash_time zip_code latitude longitude location on_street_name off_street_name cross_street_name number_of_persons_injured ... contributing_factor_vehicle_2 contributing_factor_vehicle_3 contributing_factor_vehicle_4 contributing_factor_vehicle_5 collision_id vehicle_type_code1 vehicle_type_code2 vehicle_type_code_3 vehicle_type_code_4 vehicle_type_code_5
borough
BRONX 107 107 107 107 107 107 59 59 48 107 ... 81 5 0 0 107 106 65 4 0 0
BROOKLYN 247 247 247 245 245 245 155 155 92 247 ... 192 24 7 2 247 242 157 22 7 2
MANHATTAN 98 98 98 96 96 96 52 52 46 98 ... 65 6 1 1 98 96 57 5 1 0
QUEENS 154 154 153 150 150 150 98 98 56 154 ... 120 9 2 0 154 154 97 7 2 0
STATEN ISLAND 27 27 27 26 26 26 18 18 9 27 ... 21 2 2 1 27 27 19 2 2 1

5 rows × 28 columns

Notice this is the number of non-null values in every column. Not exactly what I wanted.

To get what I want I have to use the syntax:

In [2]:
pd_df.groupby("borough").size() # or pd_df["borough"].value_counts()
Out[2]:
borough
BRONX            107
BROOKLYN         247
MANHATTAN         98
QUEENS           154
STATEN ISLAND     27
dtype: int64

But this returns a Pandas Series. It seems like a trivial difference, but counting duplicates in a column is easy in Spark because we can use method chaining; to do the equivalent in Pandas I have to convert the Series back to a dataframe and reset the index first:

In [3]:
pd_df.groupby("borough").size().to_frame("counts").reset_index().query("counts > 0")
Out[3]:
borough counts
0 BRONX 107
1 BROOKLYN 247
2 MANHATTAN 98
3 QUEENS 154
4 STATEN ISLAND 27

Furthermore, in Pandas there are too many ways to do the same thing. In my opinion, in a well-designed API this shouldn't be the case. Lastly, in Pandas, window functions, which are incredibly important in SQL, are just awkward to write.

For years I have been using Spark for large datasets, but for smaller ones I have been sticking with Pandas and making do. Recently though, I heard lots of hype about Polars and DuckDB, decided to try them myself, and was immediately impressed. In my opinion, Polars is not 100% mature yet, but it still has a lot of potential, mainly because, for me, the API is much more similar to Spark's than Pandas's is.

In this blog post I go over my first interactions with both libraries and call out things I like and do not like, but first let's get set up to run this notebook on an AWS EC2 instance using Docker.

Getting Set Up On AWS with Docker

I have mostly used Google Cloud for my prior personal projects, but for this project I wanted to use Amazon Web Services. The first thing I do is create an S3 bucket. I do this from the console by signing in to AWS and going to the S3 page:

images/s3.png

I can click the Create bucket button, create a bucket called harmonskis (for funskis) with all the default settings, and click the Create bucket button on the bottom right side.

Next I need read and write access to the S3 bucket, so I create an IAM role for that. Going to the sign-in dashboard I can search for "IAM" and click on the link. This takes me to another site where selecting the "Roles" link in the "Access Management" drop down on the left hand side takes me to the following:

IAM.png

I can click the Create role button on the top right, which takes me to the page:

create_role

I keep the selection of "AWS Service", select the "EC2" option and then click the Next button on the bottom right. This takes me to a page where I can attach a permissions policy. Searching for "s3" I select the following policy that gives me read/write access:

create_policy.png

I then click the Next button in the bottom right which takes me to the final page:

final_role.png

I give the role the name "s3acess" (spelling isn't my best skill) and then click Create role in the bottom right.

Next I will create my Elastic Compute Cloud (EC2) instance by going to the console again, clicking on EC2, scrolling down and clicking the orange Launch instance button,

images/launch.png

Next I have to make sure I create a keypair file called "mikeskey.pem" that I download.

images/keypair.png

Notice that the security group I use allows SSH traffic from "Anywhere". Finally, under the "Advanced details" drop down I select "s3acess" (I'm living with my spelling mistake) from the "IAM instance profile" setting:

images/s3acess.png

Once I launch the EC2 instance I can see the instance running and click on Instance ID as shown below:

images/instance.png

I can then click on the pop up choice of Connect. This takes me to another page where I get the command at the bottom of the page to SSH onto my machine using the keypair I created:

images/connect.png

I can then SSH onto the server with the following command:

ssh -i <path-to-key>/mikeskey.pem ec2-user@<dns-address>.compute-1.amazonaws.com

Note that I didn't create a user name so it defaulted to ec2-user.

However, since I'll be running JupyterLab on a remote EC2 server I need to set up SSH tunneling as described here so that I can access it from the web browser on my laptop. I can do this by running the command:

ssh -i <path-to-key>/mikeskey.pem -L 8888:localhost:8888 ec2-user@<dns-address>.compute-1.amazonaws.com

Next I set up git SSH keys so I can develop on the instance as described here and clone the repo. I can then set up Docker as discussed here. Then I build the image and call it polars_nb:

sudo docker build -t polars_nb . 

Finally, I start up the container from this image using port forwarding and loading the current directory as the volume:

sudo docker run -ip 8888:8888 -v `pwd`:/home/jovyan/ -t polars_nb

The terminal shows a link that I can copy and paste into my web browser. I make sure to copy the one with 127 in it and voilà, it works!

Intro To Polars

Now that we're set up with a notebook on an EC2 instance we can start to discuss Polars dataframes. The Polars library is written in Rust with Python bindings. Polars uses multi-core processing, making it fast, and the authors smartly built it on Apache Arrow, making it efficient for cross-language in-memory dataframes since there is no serialization between Rust and Python. According to the website, the philosophy of Polars is:

The goal of Polars is to provide a lightning fast DataFrame library that:

  • Utilizes all available cores on your machine.
  • Optimizes queries to reduce unneeded work/memory allocations.
  • Handles datasets much larger than your available RAM.
  • Has an API that is consistent and predictable.
  • Has a strict schema (data-types should be known before running the query).

Let's get started! We can import polars and read in a dataset from NYC Open Data on Motor Vehicle Collisions using the read_csv function:

In [3]:
import polars as pl
df = pl.read_csv("https://data.cityofnewyork.us/resource/h9gi-nx95.csv")
df.head(2)
Out[3]:
shape: (2, 29)
crash_datecrash_timeboroughzip_codelatitudelongitudelocationon_street_nameoff_street_namecross_street_namenumber_of_persons_injurednumber_of_persons_killednumber_of_pedestrians_injurednumber_of_pedestrians_killednumber_of_cyclist_injurednumber_of_cyclist_killednumber_of_motorist_injurednumber_of_motorist_killedcontributing_factor_vehicle_1contributing_factor_vehicle_2contributing_factor_vehicle_3contributing_factor_vehicle_4contributing_factor_vehicle_5collision_idvehicle_type_code1vehicle_type_code2vehicle_type_code_3vehicle_type_code_4vehicle_type_code_5
strstrstri64f64f64strstrstrstri64i64i64i64i64i64i64i64strstrstrstrstri64strstrstrstrstr
"2021-09-11T00:…"2:39"nullnullnullnullnull"WHITESTONE EXP…"20 AVENUE"null20000020"Aggressive Dri…"Unspecified"nullnullnull4455765"Sedan""Sedan"nullnullnull
"2022-03-26T00:…"11:45"nullnullnullnullnull"QUEENSBORO BRI…nullnull10000010"Pavement Slipp…nullnullnullnull4513547"Sedan"nullnullnullnull

The initial reading of CSVs is the same as in Pandas, and the head dataframe method returns the top n rows just as Pandas does. However, in addition to the printed rows, I also get the shape of the dataframe as well as the datatypes of the columns.

I can get the names of the columns and their datatypes using the schema attribute, which is similar to Spark:

In [4]:
df.schema
Out[4]:
{'crash_date': Utf8,
 'crash_time': Utf8,
 'borough': Utf8,
 'zip_code': Int64,
 'latitude': Float64,
 'longitude': Float64,
 'location': Utf8,
 'on_street_name': Utf8,
 'off_street_name': Utf8,
 'cross_street_name': Utf8,
 'number_of_persons_injured': Int64,
 'number_of_persons_killed': Int64,
 'number_of_pedestrians_injured': Int64,
 'number_of_pedestrians_killed': Int64,
 'number_of_cyclist_injured': Int64,
 'number_of_cyclist_killed': Int64,
 'number_of_motorist_injured': Int64,
 'number_of_motorist_killed': Int64,
 'contributing_factor_vehicle_1': Utf8,
 'contributing_factor_vehicle_2': Utf8,
 'contributing_factor_vehicle_3': Utf8,
 'contributing_factor_vehicle_4': Utf8,
 'contributing_factor_vehicle_5': Utf8,
 'collision_id': Int64,
 'vehicle_type_code1': Utf8,
 'vehicle_type_code2': Utf8,
 'vehicle_type_code_3': Utf8,
 'vehicle_type_code_4': Utf8,
 'vehicle_type_code_5': Utf8}

We can see that the datatypes of Polars are built on top of Arrow's datatypes and use Arrow arrays. This is awesome because Arrow is memory efficient and can also be used for in-memory dataframes with zero serialization across languages.
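As a quick illustration (a sketch of my own, not from the original post), a Polars dataframe can be handed to PyArrow and back without copying the underlying buffers:

import pyarrow as pa

arrow_table = df.to_arrow()            # view the Polars dataframe as a pyarrow.Table
df_again = pl.from_arrow(arrow_table)  # and convert back, reusing the same Arrow memory

print(isinstance(arrow_table, pa.Table), df_again.shape)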

The first command I tried with Polars was looking for duplicates in the dataframe. I found I could do this with the syntax:

In [5]:
test = (df.groupby("collision_id")
           .count()
           .filter(pl.col("count") > 1))

test
Out[5]:
shape: (0, 2)
collision_idcount
i64u32

Right away from the syntax I was in love.

Then I saw that the statement returned a dataframe:

In [6]:
type(test)
Out[6]:
polars.dataframe.frame.DataFrame

This is exactly what I want! I don't want a Series (even though Polars does have a Series data structure). You can even print the dataframes:

In [7]:
print(test)
shape: (0, 2)
┌──────────────┬───────┐
│ collision_id ┆ count │
│ ---          ┆ ---   │
│ i64          ┆ u32   │
╞══════════════╪═══════╡
└──────────────┴───────┘

This turns out to be helpful when you have lazy execution (which I'll go over later). The next thing I tried was to access the column of the dataframe by using the dot operator:

In [8]:
df.crash_date
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[8], line 1
----> 1 df.crash_date

AttributeError: 'DataFrame' object has no attribute 'crash_date'

I was actually happy to see this was not implemented! For me a column in a dataframe should not be accessed this way. The dot operator is meant to access attributes of the class.

Instead we can access the column of the dataframe like a dictionary's key:

In [7]:
df["crash_date"].is_null().any()
Out[7]:
False

The crash dates are strings that I wanted to convert to datetime type (I'm doing this to build up to more complex queries). I can see the format of the string:

In [8]:
df['crash_date'][0] # the .loc method doesn't exist!
Out[8]:
'2021-09-11T00:00:00.000'

To do so, I write two queries:

  1. The first query extracts the year-month-day and writes it as a string in the format YYYY-MM-DD
  2. The second query converts the YYYY-MM-DD strings into timestamp objects

For the first query I can extract the year-month-day from the string and assign it to a new column named crash_date_str. Note that the syntax to create a new column in Polars is with_columns (similar to withColumn in Spark) and I have to use the col function, just like in Spark! I can get the first 10 characters of the string using the vectorized str methods, similar to Pandas. Finally, I name the new column crash_date_str using the alias function (again just like Spark). The default for with_columns is to give the new column the same name as the source column, so we use alias to rename it.

In the second query I use the vectorized string method strptime to convert the crash_date_str column to a Polars Datetime and rename that column crash_date (overriding the old column with this name).

These two queries are chained together and the results are shown below.

In [9]:
df = df.with_columns(
            pl.col("crash_date").str.slice(0, length=10).alias("crash_date_str")
      ).with_columns(
            pl.col("crash_date_str").str.strptime(
                pl.Datetime, "%Y-%m-%d", strict=False).alias("crash_date")
)

df.select(["crash_date", "crash_date_str"]).head()
Out[9]:
shape: (5, 2)
crash_datecrash_date_str
datetime[μs]str
2021-09-11 00:00:00"2021-09-11"
2022-03-26 00:00:00"2022-03-26"
2022-06-29 00:00:00"2022-06-29"
2021-09-11 00:00:00"2021-09-11"
2021-12-14 00:00:00"2021-12-14"

Notice the col function in Polars lets me access derived columns that are not in the original dataframe. In Pandas, to do the same operation I would have to use a lambda function inside an assign call, something like:

df.assign(crash_date=lambda d: pd.to_datetime(d["crash_date_str"], format="%Y-%m-%d"))

I can see the number of crashes in each borough of NYC with the query

In [10]:
print(df.groupby("borough").count())
shape: (6, 2)
┌───────────────┬───────┐
│ borough       ┆ count │
│ ---           ┆ ---   │
│ str           ┆ u32   │
╞═══════════════╪═══════╡
│ MANHATTAN     ┆ 98    │
│ STATEN ISLAND ┆ 27    │
│ BROOKLYN      ┆ 247   │
│ BRONX         ┆ 107   │
│ null          ┆ 367   │
│ QUEENS        ┆ 154   │
└───────────────┴───────┘

There is a borough value of NULL. I can filter this out with the command:

In [11]:
nn_df = df.filter(pl.col("borough").is_not_null())

Now I can get just the unique values of non-null boroughs with the query:

In [12]:
print(df.filter(pl.col("borough").is_not_null())
        .select("borough")
        .unique())
shape: (5, 1)
┌───────────────┐
│ borough       │
│ ---           │
│ str           │
╞═══════════════╡
│ STATEN ISLAND │
│ MANHATTAN     │
│ QUEENS        │
│ BRONX         │
│ BROOKLYN      │
└───────────────┘

Notice that I can use the select method in Polars to select just the columns I need. This is actually pretty powerful, as I can select columns and run expressions on them, similar to selectExpr in Spark:

In [13]:
print(
 df.filter(pl.col("borough").is_not_null())
   .select([
       "borough", 
       (pl.col("number_of_persons_injured")  + 1).alias("number_of_persons_injured_plus1")
    ]).head()
)
shape: (5, 2)
┌───────────┬─────────────────────────────────┐
│ borough   ┆ number_of_persons_injured_plus1 │
│ ---       ┆ ---                             │
│ str       ┆ i64                             │
╞═══════════╪═════════════════════════════════╡
│ BROOKLYN  ┆ 1                               │
│ BROOKLYN  ┆ 1                               │
│ BRONX     ┆ 3                               │
│ BROOKLYN  ┆ 1                               │
│ MANHATTAN ┆ 1                               │
└───────────┴─────────────────────────────────┘

Doing the same query in Pandas is not as elegant or readable:

In [18]:
(pd_df[~pd_df["borough"].isnull()]
      .assign(number_of_persons_injured_plus1=pd_df["number_of_persons_injured"] + 1)
      [["borough", "number_of_persons_injured_plus1"]]
      .head()
)
Out[18]:
borough number_of_persons_injured_plus1
3 BROOKLYN 1
4 BROOKLYN 1
7 BRONX 3
8 BROOKLYN 1
9 MANHATTAN 1

To me, the Polars query is so much easier to read. What's more, it's actually more efficient. The Pandas version transforms the whole dataset and then subsets the columns to return just two. Polars, on the other hand, subsets the two columns first and then transforms just those two columns.
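You can see this for yourself with the lazy API (a sketch I added, using LazyFrame.explain to print the optimized plan; the exact plan text will vary by Polars version):

plan = (
    df.lazy()
      .filter(pl.col("borough").is_not_null())
      .select([
          "borough",
          (pl.col("number_of_persons_injured") + 1).alias("number_of_persons_injured_plus1"),
      ])
      .explain()  # the PROJECT step lists only the two columns that are actually needed
)
print(plan)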

Now I can create a Polars dataframe the exact same way as in Pandas:

In [9]:
borough_df = pl.DataFrame({
                "borough": ["BROOKLYN", "BRONX", "MANHATTAN", "STATEN ISLAND", "QUEENS"],
                "population": [2590516, 1379946, 1596273, 2278029, 378977],
                "area":[179.7, 109.2, 58.68, 281.6, 149.0]
})

print(borough_df)
shape: (5, 3)
┌───────────────┬────────────┬───────┐
│ borough       ┆ population ┆ area  │
│ ---           ┆ ---        ┆ ---   │
│ str           ┆ i64        ┆ f64   │
╞═══════════════╪════════════╪═══════╡
│ BROOKLYN      ┆ 2590516    ┆ 179.7 │
│ BRONX         ┆ 1379946    ┆ 109.2 │
│ MANHATTAN     ┆ 1596273    ┆ 58.68 │
│ STATEN ISLAND ┆ 2278029    ┆ 281.6 │
│ QUEENS        ┆ 378977     ┆ 149.0 │
└───────────────┴────────────┴───────┘

This is the population and area of the boroughs, which I got from Wikipedia. I'll save it to S3. It was a little awkward to write to S3 with Polars directly, so I'll first convert the dataframe to Pandas and then write to S3:

In [10]:
borough_df.to_pandas().to_parquet("s3://harmonskis/nyc_populations.parquet")
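If you want to skip the Pandas round trip, something like the following should also work; this is a sketch of my own (it assumes the s3fs package is installed) rather than what I actually ran:

import io
import s3fs

buf = io.BytesIO()
borough_df.write_parquet(buf)  # Polars writes Parquet to an in-memory buffer
buf.seek(0)

fs = s3fs.S3FileSystem()
with fs.open("s3://harmonskis/nyc_populations.parquet", "wb") as f:
    f.write(buf.getvalue())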

However, reading from S3 is just the same as with Pandas:

In [11]:
borough_df = pl.read_parquet("s3://harmonskis/nyc_populations.parquet")

We'll use it to go over a more complicated query:

Get the total number of injuries per borough then join that result to the borough dataframe to get the injuries by population and finally sort them by borough name.

In Polars this can be done using method chaining on the dataframe:

In [20]:
print(
 df.filter(pl.col("borough").is_not_null())
   .select(["borough", "number_of_persons_injured"])
   .groupby("borough")
   .sum()
   .join(borough_df, on=["borough"])
   .select([
       "borough", 
       (pl.col("number_of_persons_injured") / pl.col("population")).alias("injuries_per_population")
   ])
   .sort(pl.col("borough"))
)
shape: (5, 2)
┌───────────────┬─────────────────────────┐
│ borough       ┆ injuries_per_population │
│ ---           ┆ ---                     │
│ str           ┆ f64                     │
╞═══════════════╪═════════════════════════╡
│ BRONX         ┆ 0.000033                │
│ BROOKLYN      ┆ 0.000045                │
│ MANHATTAN     ┆ 0.000025                │
│ QUEENS        ┆ 0.000193                │
│ STATEN ISLAND ┆ 0.000007                │
└───────────────┴─────────────────────────┘

Doing the same query with the Pandas API would be an awkward mess. As we can see, in Polars it's very easy to use method chaining and the resulting syntax reads pretty similar to SQL!
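For comparison, here is roughly what that chain looks like in Pandas (a sketch I put together, converting borough_df with to_pandas first; it works, it just doesn't read like SQL):

borough_pd_df = borough_df.to_pandas()

(pd_df[pd_df["borough"].notna()]
    .groupby("borough", as_index=False)["number_of_persons_injured"].sum()
    .merge(borough_pd_df, on="borough")
    .assign(injuries_per_population=lambda d: d["number_of_persons_injured"] / d["population"])
    [["borough", "injuries_per_population"]]
    .sort_values("borough"))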

Which brings me to something that was super exciting to see in Polars: SQLContext. SQLContext in Polars can be used to register a Polars dataframe as a table and then run SQL commands that return another Polars dataframe.

We can see this by creating a table called crashes from the dataframe df:

In [21]:
ctx = pl.SQLContext(crashes=df)

Now I can get the total number of people injured per day in each borough:

In [22]:
daily_df = ctx.execute("""
    SELECT
        borough,
        crash_date AS day,
        SUM(number_of_persons_injured)
    FROM 
        crashes
    WHERE 
        borough IS NOT NULL
    GROUP BY 
        borough, crash_date
    ORDER BY 
        borough, day
""")

print(daily_df.collect().head())
shape: (5, 3)
┌─────────┬─────────────────────┬───────────────────────────┐
│ borough ┆ day                 ┆ number_of_persons_injured │
│ ---     ┆ ---                 ┆ ---                       │
│ str     ┆ datetime[μs]        ┆ i64                       │
╞═════════╪═════════════════════╪═══════════════════════════╡
│ BRONX   ┆ 2021-02-26 00:00:00 ┆ 0                         │
│ BRONX   ┆ 2021-04-06 00:00:00 ┆ 0                         │
│ BRONX   ┆ 2021-04-08 00:00:00 ┆ 0                         │
│ BRONX   ┆ 2021-04-10 00:00:00 ┆ 4                         │
│ BRONX   ┆ 2021-04-11 00:00:00 ┆ 0                         │
└─────────┴─────────────────────┴───────────────────────────┘

Notice I had to use the collect() method to get the results. That is because, by default, SQL in Polars uses lazy execution.

You can see evidence of this when printing the resulting dataframe; it actually prints the query plan:

In [23]:
print(daily_df)
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

SORT BY [col("borough"), col("day")]
   SELECT [col("borough"), col("crash_date").alias("day"), col("number_of_persons_injured")] FROM
    AGGREGATE
    	[col("number_of_persons_injured").sum()] BY [col("borough"), col("crash_date")] FROM
      FILTER col("borough").is_not_null() FROM
      DF ["crash_date", "crash_time", "borough", "zip_code"]; PROJECT */30 COLUMNS; SELECTION: "None"

To get back a Polars dataframe from this result I would have to use the eager=True parameter in the execute method.
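For example, a minimal sketch (my own, not from the notebook) of a query run with eager execution against the same registered crashes table:

eager_df = ctx.execute(
    "SELECT borough, SUM(number_of_persons_injured) AS total_injured FROM crashes GROUP BY borough",
    eager=True,
)
print(type(eager_df))  # polars.dataframe.frame.DataFrame rather than a LazyFrame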

I can register this new dataframe as a table called daily_crashes in the SQLContext:

In [24]:
ctx = ctx.register("daily_crashes", daily_df)

I can see the tables that are registered using the command:

In [25]:
ctx.tables()
Out[25]:
['crashes', 'daily_crashes']

Now say I want to get the current day's number of injured people alongside the prior day's; I could use the LAG function in SQL to do so:

In [26]:
ctx.execute("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        LAG(1,number_of_persons_injured) 
            OVER (
            PARTITION BY borough 
            ORDER BY day ASC
            ) AS prior_day_injured
FROM
    daily_crashes
ORDER BY 
    borough,
    day DESC
""", eager=True)
---------------------------------------------------------------------------
InvalidOperationError                     Traceback (most recent call last)
Cell In[26], line 1
----> 1 ctx.execute("""
      2     SELECT
      3         borough,
      4         day,
      5         number_of_persons_injured,
      6         LAG(1,number_of_persons_injured) 
      7             OVER (
      8             PARTITION BY borough 
      9             ORDER BY day ASC
     10             ) AS prior_day_injured
     11 FROM
     12     daily_crashes
     13 ORDER BY 
     14     borough,
     15     day DESC
     16 """, eager=True)

File /opt/conda/lib/python3.10/site-packages/polars/sql/context.py:282, in SQLContext.execute(self, query, eager)
    204 def execute(self, query: str, eager: bool | None = None) -> LazyFrame | DataFrame:
    205     """
    206     Parse the given SQL query and execute it against the registered frame data.
    207 
   (...)
    280     └────────┴─────────────┴─────────┘
    281     """
--> 282     res = wrap_ldf(self._ctxt.execute(query))
    283     return res.collect() if (eager or self._eager_execution) else res

InvalidOperationError: unsupported SQL function: lag

I finally hit a snag in Polars: there doesn't seem to be a lot of support for window functions in its SQL interface. This was initially disappointing since the library was so promising!

Upon further research I found window functions are supported, in fact they are VERY WELL supported! The query I was trying to write turns out to be fairly easy to express as dataframe operations using the over expression. This works just like SQL, where the column names inside over(...) are the columns you partition by. You can then sort within each partition (or group, as they say in Polars) and use shift instead of LAG:

In [45]:
print(
    daily_df.with_columns(
            pl.col("number_of_persons_injured")
              .sort_by("day", descending=False)
              .shift(periods=1)
              .over("borough")
              .alias("prior_day_injured")
).collect().head(8))
shape: (8, 4)
┌─────────┬─────────────────────┬───────────────────────────┬───────────────────┐
│ borough ┆ day                 ┆ number_of_persons_injured ┆ prior_day_injured │
│ ---     ┆ ---                 ┆ ---                       ┆ ---               │
│ str     ┆ datetime[μs]        ┆ i64                       ┆ i64               │
╞═════════╪═════════════════════╪═══════════════════════════╪═══════════════════╡
│ BRONX   ┆ 2021-02-26 00:00:00 ┆ 0                         ┆ null              │
│ BRONX   ┆ 2021-04-06 00:00:00 ┆ 0                         ┆ 0                 │
│ BRONX   ┆ 2021-04-08 00:00:00 ┆ 0                         ┆ 0                 │
│ BRONX   ┆ 2021-04-10 00:00:00 ┆ 4                         ┆ 0                 │
│ BRONX   ┆ 2021-04-11 00:00:00 ┆ 0                         ┆ 4                 │
│ BRONX   ┆ 2021-04-12 00:00:00 ┆ 0                         ┆ 0                 │
│ BRONX   ┆ 2021-04-13 00:00:00 ┆ 3                         ┆ 0                 │
│ BRONX   ┆ 2021-04-14 00:00:00 ┆ 3                         ┆ 3                 │
└─────────┴─────────────────────┴───────────────────────────┴───────────────────┘

It turns out you can do the same thing with Pandas, as shown below.

Note that I have to collect the lazy dataframe and convert it to Pandas first:

In [49]:
pd_daily_df = daily_df.collect().to_pandas()
In [54]:
pd_daily_df = pd_daily_df.assign(prior_day_injured=
                        pd_daily_df.sort_values(by=['day'], ascending=True)
                          .groupby(['borough'])
                          ['number_of_persons_injured']
                          .shift(1))

pd_daily_df.head(8)
Out[54]:
borough day number_of_persons_injured prior_day_injured
0 BRONX 2021-02-26 0 NaN
1 BRONX 2021-04-06 0 0.0
2 BRONX 2021-04-08 0 0.0
3 BRONX 2021-04-10 4 0.0
4 BRONX 2021-04-11 0 4.0
5 BRONX 2021-04-12 0 0.0
6 BRONX 2021-04-13 3 0.0
7 BRONX 2021-04-14 3 3.0

Syntactically, I still prefer Polars to Pandas.

But let's say I really want to use SQL and not do things on the dataframe; at least to me, that doesn't seem possible with Polars.

Luckily there is another library that supports blazingly fast SQL queries and integrates with Polars (and Pandas) directly: DuckDB.

DuckDB To The Rescue For SQL

I heard about DuckDB when I saw someone star it on GitHub and thought it was "Yet Another SQL Engine". While DuckDB is a SQL engine, it does much more than I thought a SQL engine could!

DuckDB is a parallel query processing library written in C++ and according to their website:

    DuckDB is designed to support analytical query workloads, also known as Online analytical processing (OLAP). These workloads are characterized by complex, relatively long-running queries that process significant portions of the stored dataset, for example aggregations over entire tables or joins between several large tables.
    ...
    DuckDB contains a columnar-vectorized query execution engine, where queries are still interpreted, but a large batch of values (a “vector”) are processed in one operation.

In other words, DuckDB can be used for fast SQL query execution on large datasets. For example, the query above that failed in Polars runs perfectly using DuckDB:

In [26]:
import duckdb

query = duckdb.sql("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        LAG(1, number_of_persons_injured) 
            OVER (
                PARTITION BY borough 
                ORDER BY day ASC
                ) as prior_day_injured
FROM
    daily_df
ORDER BY 
    borough,
    day DESC
LIMIT 5
""")

Now we can see the output of the query. (Note that LAG takes the value expression first and the offset second, so LAG(1, number_of_persons_injured) lags the literal 1 rather than the injury count, which is why prior_day_injured is 1 in every row; LAG(number_of_persons_injured, 1) would give the prior day's value.)

In [27]:
query
Out[27]:
┌─────────┬─────────────────────┬───────────────────────────┬───────────────────┐
│ borough │         day         │ number_of_persons_injured │ prior_day_injured │
│ varchar │      timestamp      │           int64           │       int32       │
├─────────┼─────────────────────┼───────────────────────────┼───────────────────┤
│ BRONX   │ 2022-04-24 00:00:00 │                         0 │                 1 │
│ BRONX   │ 2022-03-26 00:00:00 │                         7 │                 1 │
│ BRONX   │ 2022-03-25 00:00:00 │                         1 │                 1 │
│ BRONX   │ 2022-03-24 00:00:00 │                         1 │                 1 │
│ BRONX   │ 2022-03-22 00:00:00 │                         1 │                 1 │
└─────────┴─────────────────────┴───────────────────────────┴───────────────────┘

We can return the result as a Polars dataframe using the pl method:

In [28]:
day_prior_df = query.pl()
print(day_prior_df.head(5))
shape: (5, 4)
┌─────────┬─────────────────────┬───────────────────────────┬───────────────────┐
│ borough ┆ day                 ┆ number_of_persons_injured ┆ prior_day_injured │
│ ---     ┆ ---                 ┆ ---                       ┆ ---               │
│ str     ┆ datetime[μs]        ┆ i64                       ┆ i32               │
╞═════════╪═════════════════════╪═══════════════════════════╪═══════════════════╡
│ BRONX   ┆ 2022-04-24 00:00:00 ┆ 0                         ┆ 1                 │
│ BRONX   ┆ 2022-03-26 00:00:00 ┆ 7                         ┆ 1                 │
│ BRONX   ┆ 2022-03-25 00:00:00 ┆ 1                         ┆ 1                 │
│ BRONX   ┆ 2022-03-24 00:00:00 ┆ 1                         ┆ 1                 │
│ BRONX   ┆ 2022-03-22 00:00:00 ┆ 1                         ┆ 1                 │
└─────────┴─────────────────────┴───────────────────────────┴───────────────────┘

Now we can see another cool part of DuckDB: you can execute SQL directly on local files!

We'll save the daily crash dataframe as a Parquet file, but first remember that it's a "lazy dataframe":

In [29]:
daily_df
Out[29]:
naive plan: (run LazyFrame.explain(optimized=True) to see the optimized plan)

SORT BY [col("borough"), col("day")]
   SELECT [col("borough"), col("crash_date").alias("day"), col("number_of_persons_injured")] FROM
    AGGREGATE
    	[col("number_of_persons_injured").sum()] BY [col("borough"), col("crash_date")] FROM
      FILTER col("borough").is_not_null() FROM
      DF ["crash_date", "crash_time", "borough", "zip_code"]; PROJECT */30 COLUMNS; SELECTION: "None"

It turns out you can't call write_parquet on a lazy dataframe, so first we'll collect it and then write it to Parquet:

In [30]:
daily_df.collect().write_parquet("daily_crashes.parquet")

Apache Parquet is a compressed, columnar file format that is great for analytical queries. Column-based formats are particularly good for OLAP workloads since individual columns can be subsetted and read in contiguously, allowing aggregations to be performed on them easily. The datatype of each column in Parquet is known, which helps the format compress well. Since the column names and datatypes are stored as metadata, we can read them with the following query:

In [31]:
duckdb.sql("SELECT * FROM parquet_schema('daily_crashes.parquet')").pl()
Out[31]:
shape: (4, 11)
file_namenametypetype_lengthrepetition_typenum_childrenconverted_typescaleprecisionfield_idlogical_type
strstrstrstrstri64stri64i64i64str
"daily_crashes.…"root"nullnullnull3nullnullnullnullnull
"daily_crashes.…"borough""BYTE_ARRAY"null"OPTIONAL"null"UTF8"nullnullnull"StringType()"
"daily_crashes.…"day""INT64"null"OPTIONAL"nullnullnullnullnull"TimestampType(…
"daily_crashes.…"number_of_pers…"INT64"null"OPTIONAL"nullnullnullnullnullnull

Now we can perform queries on the actual files without having to resort to dataframes at all:

In [14]:
query = duckdb.sql("""
    SELECT
        borough,
        day,
        number_of_persons_injured,
        SUM(number_of_persons_injured) 
            OVER (
                PARTITION BY borough 
                ORDER BY day ASC
                ) AS cumulative_injuried
    FROM 
        read_parquet('daily_crashes.parquet')
    ORDER BY
        borough,
        day ASC
""")
In [16]:
print(query.pl().head(8))
shape: (8, 4)
┌─────────┬─────────────────────────┬───────────────────────────┬─────────────────────┐
│ borough ┆ day                     ┆ number_of_persons_injured ┆ cumulative_injuried │
│ ---     ┆ ---                     ┆ ---                       ┆ ---                 │
│ str     ┆ str                     ┆ i64                       ┆ f64                 │
╞═════════╪═════════════════════════╪═══════════════════════════╪═════════════════════╡
│ BRONX   ┆ 2021-02-26T00:00:00.000 ┆ 0                         ┆ 0.0                 │
│ BRONX   ┆ 2021-04-06T00:00:00.000 ┆ 0                         ┆ 0.0                 │
│ BRONX   ┆ 2021-04-08T00:00:00.000 ┆ 0                         ┆ 0.0                 │
│ BRONX   ┆ 2021-04-10T00:00:00.000 ┆ 4                         ┆ 4.0                 │
│ BRONX   ┆ 2021-04-11T00:00:00.000 ┆ 0                         ┆ 4.0                 │
│ BRONX   ┆ 2021-04-12T00:00:00.000 ┆ 0                         ┆ 4.0                 │
│ BRONX   ┆ 2021-04-13T00:00:00.000 ┆ 3                         ┆ 7.0                 │
│ BRONX   ┆ 2021-04-14T00:00:00.000 ┆ 3                         ┆ 10.0                │
└─────────┴─────────────────────────┴───────────────────────────┴─────────────────────┘

Pretty cool!!!

Conclusions

In this post I quickly covered what I view as the limitations of the Pandas library. Next I covered how to get set up with JupyterLab using Docker on AWS, then went over some basics of Polars and DuckDB and how to use the two in combination. The benefits of Polars are that:

  • It allows for fast parallel querying on dataframes.
  • It uses Apache Arrow for backend datatypes making it memory efficient.
  • It has both lazy and eager execution mode.
  • It allows for SQL queries directly on dataframes.
  • Its API is similar to Spark's API and allows for highly readable queries using method chaining.

I am still new to both libraries, but I'm looking forward to learning more about them.

Hope you enjoyed reading this!