Introduction to Arrow in R

NEDs Workshop (18th July 2024)

Introductions

  • Nic Crane

Arrow contributor!

Welcome

Today we’re going to cover:

  • Working with larger-than-memory datasets with Arrow
  • How to get the best performance with your tabular data
  • Where to find more information

Workshop format

Getting set up

Repository URL: https://github.com/thisisnic/introtoarrowneds

Dataset to follow along with

options(timeout = 18000)
curl::multi_download(
  "https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
  "data/seattle-library-checkouts.csv",
  resume = TRUE
)

If your download is interrupted, re-run the same code. With resume = TRUE it picks up from where it stopped rather than starting again.

Dataset to follow along with - tiny version

options(timeout = 18000)
curl::multi_download(
  "https://github.com/posit-conf-2023/arrow/releases/download/v0.1.0/seattle-library-checkouts-tiny.csv",
  "data/seattle-library-checkouts-tiny.csv",
  resume = TRUE
)

Part 1 - Arrow

What is Apache Arrow?

A multi-language toolbox for accelerated data interchange and in-memory processing

Arrow is designed to both improve the performance of analytical algorithms and the efficiency of moving data from one system or programming language to another

https://arrow.apache.org/overview/

Apache Arrow Specification

In-memory columnar format: a standardized, language-agnostic specification for representing structured, table-like datasets in-memory.
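A small illustration of the column-oriented idea from R, assuming the arrow package is installed. An Arrow Table holds each column as its own array, so you can pull out a single column (and convert it back to an R vector) without touching the others. The toy data here is made up for illustration.

```r
library(arrow)

# Toy data, made up for illustration
df <- data.frame(
  MaterialType = c("BOOK", "EBOOK", "SOUNDDISC"),
  Checkouts = c(10L, 4L, 2L)
)

# Convert to an Arrow Table: each column becomes a separate Arrow array
tbl <- arrow_table(df)

# Access one column on its own
checkouts_col <- tbl$Checkouts
checkouts_col$type$ToString()   # the Arrow type of that column

# Convert just that column back to an R vector
checkouts_vec <- as.vector(checkouts_col)
```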


A Multi-Language Toolbox

Accelerated Data Interchange

Accelerated In-Memory Processing

Arrow’s Columnar Format is Fast

arrow 📦



Part 2 - Working with Arrow Datasets

Seattle Checkouts - Big CSV

https://data.seattle.gov/Community/Checkouts-by-Title/tmmm-ytt6/about_data

Dataset contents

How big is the dataset?

library(arrow)
library(dplyr)
file.size("./data/seattle-library-checkouts.csv") / 1e9
[1] 9.211969

Opening in Arrow

seattle_csv <- open_dataset(
  sources = "./data/seattle-library-checkouts.csv", 
  format = "csv"
)

How many rows of data?

nrow(seattle_csv)
[1] 41389465

Extract schema

schema(seattle_csv)
Schema
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: null
Creator: string
Subjects: string
Publisher: string
PublicationYear: string

Arrow Data Types

Arrow has a rich data type system, including direct analogs of many R data types

  • <dbl> == <double>
  • <chr> == <string> or <utf8>
  • <int> == <int32>


https://arrow.apache.org/docs/r/articles/data_types.html
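A quick way to check the mapping yourself, assuming the arrow package is installed: build a small Arrow Table with one column per R type from the list above and inspect the Arrow type each column was given. The column names are made up for illustration.

```r
library(arrow)

# One column per R type from the list above
tbl <- arrow_table(
  dbl_col = c(1.5, 2.5),
  chr_col = c("a", "b"),
  int_col = c(1L, 2L)
)

# Print the schema: shows the Arrow type for each column
tbl$schema

# Or collect the type names programmatically
types <- sapply(names(tbl), function(col) tbl[[col]]$type$ToString())
```

Here types should come out as double, string and int32, matching the bullets above (string and utf8 are two names for the same Arrow type).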

Parsing the Metadata


Arrow scans 👀 the first 1MB of the file(s) to infer (“guess”) the data types

📚 arrow vs readr blog post: https://thisisnic.github.io/2022/11/21/type-inference-in-readr-and-arrow/

Parsers Are Not Always Right

schema(seattle_csv)
Schema
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: null
Creator: string
Subjects: string
Publisher: string
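PublicationYear: string

ISBN was inferred as null: none of the rows Arrow sampled contained an ISBN value, so it had no evidence for any other type.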

Let’s Control the Schema

seattle_csv <- open_dataset(
  sources = "./data/seattle-library-checkouts.csv", 
  col_types = schema(ISBN = string()),
  format = "csv"
)

schema(seattle_csv)
Schema
UsageClass: string
CheckoutType: string
MaterialType: string
CheckoutYear: int64
CheckoutMonth: int64
Checkouts: int64
Title: string
ISBN: string
Creator: string
Subjects: string
Publisher: string
PublicationYear: string
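To reproduce the ISBN problem on a small scale, here is a sketch using a tiny made-up CSV whose ISBN column is empty in every row, mimicking what Arrow saw in its sample of the Seattle file. It assumes the arrow package is installed and uses the same col_types argument as above to override just that one column.

```r
library(arrow)

# Tiny made-up CSV: the ISBN column is empty in every row
csv_path <- tempfile(fileext = ".csv")
writeLines(c("Title,ISBN", "Book A,", "Book B,"), csv_path)

# Default inference: an all-empty column gets the null type
ds_guessed <- open_dataset(csv_path, format = "csv")
guessed_type <- schema(ds_guessed)$GetFieldByName("ISBN")$type$ToString()

# Override only ISBN; Title is still inferred
ds_fixed <- open_dataset(csv_path, format = "csv",
                         col_types = schema(ISBN = string()))
fixed_type <- schema(ds_fixed)$GetFieldByName("ISBN")$type$ToString()
```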

Part 3 - Data Manipulation with Arrow

Arrow dplyr backend

Querying the data - new column: is this a book?

seattle_csv |>
  mutate(IsBook = endsWith(MaterialType, "BOOK")) |>
  select(MaterialType, IsBook)
FileSystemDataset (query)
MaterialType: string
IsBook: bool (ends_with(MaterialType, {pattern="BOOK", ignore_case=false}))

See $.data for the source Arrow object

Nothing is pulled into memory yet!
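The same lazy behaviour can be seen on a toy in-memory Arrow Table, assuming arrow and dplyr are installed. The dplyr verbs only build a query object; collect() is what actually runs the query in Arrow and returns a tibble in R memory. The toy values are made up for illustration.

```r
library(arrow)
library(dplyr)

# Toy Arrow Table standing in for the Seattle dataset
tbl <- arrow_table(MaterialType = c("BOOK", "EBOOK", "SOUNDDISC", "AUDIOBOOK"))

# Build the query: nothing is computed yet
query <- tbl |>
  mutate(IsBook = endsWith(MaterialType, "BOOK")) |>
  select(MaterialType, IsBook)
class(query)  # a query object, not a data frame

# collect() executes the query and returns a tibble
result <- collect(query)
```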

Preview the query

seattle_csv |>
  head(20) |>
  mutate(IsBook = endsWith(MaterialType, "BOOK")) |>
  select(MaterialType, IsBook) |>
  collect()
# A tibble: 20 × 2
   MaterialType IsBook
   <chr>        <lgl> 
 1 BOOK         TRUE  
 2 BOOK         TRUE  
 3 EBOOK        TRUE  
 4 BOOK         TRUE  
 5 SOUNDDISC    FALSE 
 6 BOOK         TRUE  
 7 BOOK         TRUE  
 8 EBOOK        TRUE  
 9 BOOK         TRUE  
10 EBOOK        TRUE  
11 BOOK         TRUE  
12 BOOK         TRUE  
13 BOOK         TRUE  
14 AUDIOBOOK    TRUE  
15 BOOK         TRUE  
16 EBOOK        TRUE  
17 SOUNDDISC    FALSE 
18 VIDEODISC    FALSE 
19 SOUNDDISC    FALSE 
20 AUDIOBOOK    TRUE  

How many books were checked out each year?

seattle_csv |>
  filter(endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutYear) |>
  summarise(Checkouts = sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect()
# A tibble: 18 × 2
   CheckoutYear Checkouts
          <int>     <int>
 1         2005   2129128
 2         2006   3385869
 3         2007   3679981
 4         2008   4156859
 5         2009   4500788
 6         2010   4389760
 7         2011   4484366
 8         2012   4696376
 9         2013   5394411
10         2014   5606168
11         2015   5784864
12         2016   5915722
13         2017   6280679
14         2018   6831226
15         2019   7339010
16         2020   5549585
17         2021   6659627
18         2022   6301822
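If you want to check the logic of this query without the full download, here is the same pipeline run on a small made-up Arrow Table with the same column names (arrow and dplyr assumed installed). The expected totals are easy to verify by hand: only BOOK, EBOOK and AUDIOBOOK rows are kept before summing.

```r
library(arrow)
library(dplyr)

# Made-up checkouts with the same column names as the Seattle data
toy <- arrow_table(
  CheckoutYear = c(2021L, 2021L, 2022L, 2022L, 2022L),
  MaterialType = c("BOOK", "SOUNDDISC", "EBOOK", "BOOK", "VIDEODISC"),
  Checkouts    = c(5L, 7L, 3L, 4L, 9L)
)

# Same query as above: keep *BOOK rows, total checkouts per year
books_per_year <- toy |>
  filter(endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutYear) |>
  summarise(Checkouts = sum(Checkouts)) |>
  arrange(CheckoutYear) |>
  collect()
```

For this toy data, 2021 totals 5 (one BOOK row) and 2022 totals 7 (EBOOK 3 + BOOK 4).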

How long did it take?

seattle_csv |>
  filter(endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutYear) |>
  summarise(Checkouts = sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |>
  system.time()
   user  system elapsed 
 13.120   1.162  11.797 

41 million rows: not bad, but it could be faster...

Part 4 - Engineering the Data

.NORM Files


https://xkcd.com/2116/

File Format: Apache Parquet

https://parquet.apache.org/

Parquet Files: “row-chunked”

Parquet Files: “row-chunked & column-oriented”

Parquet

  • compression and encoding == usually much smaller than the equivalent CSV file, so less data to move from disk to memory
  • rich type system & stores the schema along with the data == more robust pipelines
  • “row-chunked & column-oriented” == work on different parts of the file at the same time or skip some chunks altogether, better performance than row-by-row
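A small sketch of two of these points, assuming the arrow package is installed and using made-up data. A Date column survives a Parquet round trip because the schema is stored with the data, but comes back as plain text from a CSV round trip. Because Parquet is column-oriented, you can also read just the columns you need.

```r
library(arrow)

df <- data.frame(
  id = 1:3,
  checkout_date = as.Date(c("2021-01-01", "2021-02-01", "2021-03-01"))
)

# CSV round trip: the column type is not stored in the file
csv_path <- tempfile(fileext = ".csv")
write.csv(df, csv_path, row.names = FALSE)
from_csv <- read.csv(csv_path)

# Parquet round trip: the schema is stored alongside the data
pq_path <- tempfile(fileext = ".parquet")
write_parquet(df, pq_path)
from_parquet <- read_parquet(pq_path)

class(from_csv$checkout_date)      # character (in R >= 4.0), not Date
class(from_parquet$checkout_date)  # Date

# Column-oriented storage: read only the id column
ids_only <- read_parquet(pq_path, col_select = "id")
```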

Writing to Parquet

seattle_parquet_dir <- "./data/seattle-library-checkouts-parquet"

seattle_csv |>
  write_dataset(path = seattle_parquet_dir, format = "parquet")

Storage: Parquet vs CSV

parquet_file <- list.files(seattle_parquet_dir,
                           recursive = TRUE,
                           full.names = TRUE)

file.size(parquet_file) / 1e9
[1] 4.423348


The Parquet file is about half the size of the CSV file on disk 💾 (4.4GB vs 9.2GB)

4.4GB Parquet file + arrow + dplyr

open_dataset(seattle_parquet_dir, 
             format = "parquet") |>
  filter(endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutYear) |>
  summarise(Checkouts = sum(Checkouts)) |>
  arrange(CheckoutYear) |> 
  collect() |>
  system.time()
   user  system elapsed 
  5.379   0.267   0.840 


41 million rows: much better! But it could be even faster...

File Storage: Partitioning

Dividing data into smaller pieces, making it more easily accessible and manageable

Poll: Partitioning?

Have you partitioned your data or used partitioned data before today?

  • 1️⃣ Yes
  • 2️⃣ No
  • 3️⃣ Not sure, the data engineers sort that out!

Rewriting the Data Again

seattle_parquet_part <- "./data/seattle-library-checkouts"

seattle_csv |>
  group_by(CheckoutYear) |>
  write_dataset(path = seattle_parquet_part,
                format = "parquet")
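To see what grouping before write_dataset() produces, here is a toy version that writes a made-up data frame to a temporary directory (arrow and dplyr assumed installed). Each CheckoutYear value gets its own key=value sub-directory, and when the dataset is opened again the partition column is rebuilt from the directory names.

```r
library(arrow)
library(dplyr)

toy <- data.frame(
  CheckoutYear = c(2021L, 2021L, 2022L),
  Title = c("A", "B", "C")
)

part_dir <- file.path(tempdir(), "toy-partitioned")
unlink(part_dir, recursive = TRUE)  # start from a clean directory

# group_by() columns become the partitioning
toy |>
  group_by(CheckoutYear) |>
  write_dataset(path = part_dir, format = "parquet")

# One sub-directory per CheckoutYear value
partition_files <- sort(list.files(part_dir, recursive = TRUE))

# Reading back: CheckoutYear comes from the directory names
year_2022 <- open_dataset(part_dir) |>
  filter(CheckoutYear == 2022) |>
  collect()
```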

What Did We “Engineer”?

seattle_parquet_part <- "./data/seattle-library-checkouts"

sizes <- tibble(
  files = list.files(seattle_parquet_part,
                     recursive = TRUE),
  size_GB = round(file.size(file.path(seattle_parquet_part, files)) / 1e9, 3)
)

sizes
# A tibble: 18 × 2
   files                            size_GB
   <chr>                              <dbl>
 1 CheckoutYear=2005/part-0.parquet   0.114
 2 CheckoutYear=2006/part-0.parquet   0.172
 3 CheckoutYear=2007/part-0.parquet   0.186
 4 CheckoutYear=2008/part-0.parquet   0.204
 5 CheckoutYear=2009/part-0.parquet   0.224
 6 CheckoutYear=2010/part-0.parquet   0.233
 7 CheckoutYear=2011/part-0.parquet   0.25 
 8 CheckoutYear=2012/part-0.parquet   0.261
 9 CheckoutYear=2013/part-0.parquet   0.282
10 CheckoutYear=2014/part-0.parquet   0.296
11 CheckoutYear=2015/part-0.parquet   0.308
12 CheckoutYear=2016/part-0.parquet   0.315
13 CheckoutYear=2017/part-0.parquet   0.319
14 CheckoutYear=2018/part-0.parquet   0.306
15 CheckoutYear=2019/part-0.parquet   0.302
16 CheckoutYear=2020/part-0.parquet   0.158
17 CheckoutYear=2021/part-0.parquet   0.24 
18 CheckoutYear=2022/part-0.parquet   0.252

4.4GB partitioned Parquet files + arrow + dplyr

seattle_parquet_part <- "./data/seattle-library-checkouts"

open_dataset(seattle_parquet_part,
             format = "parquet") |>
  filter(endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutYear) |>
  summarise(Checkouts = sum(Checkouts)) |>
  arrange(CheckoutYear) |>
  collect() |>
  system.time()
   user  system elapsed 
  4.853   0.245   0.520 


41 million rows: not too shabby!

Art & Science of Partitioning


  • avoid files < 20MB and > 2GB
  • avoid > 10,000 files (🤯)
  • partition on variables used in filter()
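The first two rules of thumb are easy to check mechanically. Below is a sketch of a hypothetical helper, check_partition_sizes(), which is not part of arrow and uses base R only. It flags files outside the 20MB to 2GB range and warns when a dataset has more than 10,000 files. It assumes decimal units (1MB = 10^6 bytes, 1GB = 10^9 bytes), matching the size calculations earlier in these slides.

```r
# Hypothetical helper (not part of arrow): flag partition files that fall
# outside the rule-of-thumb size range, and warn about very many files
check_partition_sizes <- function(path, min_mb = 20, max_gb = 2,
                                  max_files = 10000) {
  files <- list.files(path, recursive = TRUE, full.names = TRUE)
  size_bytes <- file.size(files)

  if (length(files) > max_files) {
    warning("Dataset has ", length(files),
            " files; consider a coarser partitioning.")
  }

  data.frame(
    file = files,
    size_MB = round(size_bytes / 1e6, 1),
    too_small = size_bytes < min_mb * 1e6,   # under 20MB by default
    too_large = size_bytes > max_gb * 1e9    # over 2GB by default
  )
}
```

Run on ./data/seattle-library-checkouts, this would flag nothing: going by the table above, every file is between 0.114GB and 0.319GB and there are only 18 of them.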

Performance Review: Single CSV

How long does it take to calculate the number of books checked out in each month of 2021?


open_dataset(sources = "./data/seattle-library-checkouts.csv",
             col_types = schema(ISBN = string()),
             format = "csv") |>
  filter(CheckoutYear == 2021, endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |>
  system.time()
   user  system elapsed 
 13.460   1.611  12.442 

Performance Review: Partitioned Parquet

How long does it take to calculate the number of books checked out in each month of 2021?


open_dataset("./data/seattle-library-checkouts",
             format = "parquet") |>
  filter(CheckoutYear == 2021, endsWith(MaterialType, "BOOK")) |>
  group_by(CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutMonth)) |>
  collect() |>
  system.time()
   user  system elapsed 
  0.353   0.034   0.085 

Engineering Data Tips for Improved Storage & Performance


  • use Parquet over CSV if possible
  • consider partitioning, and experiment to find an appropriate partition design 🗂️
  • watch your schemas 👀

Part 5 - More Resources

Arrow docs

https://arrow.apache.org/docs/r/

R for Data Science (2e)

Scaling Up with R and Arrow

Currently being written - preview available online soon!

And what about Python??

Some resources for PyArrow users (thanks to Alenka Frim for adding this content!):

PyArrow resources:

More granular PyArrow resources 1

More granular PyArrow resources 2