!pip install getdaft --pre --extra-index-url https://pypi.anaconda.org/daft-nightly/simple

Hint

✨✨✨ Run this notebook on Google Colab ✨✨✨

You can run this notebook yourself with Google Colab!

10 minutes Quickstart#

This is a short introduction to all the main functionality in Daft, geared towards new users.

We import from daft as follows:

from daft import DataFrame

DataFrame creation#

See also: API Reference: DataFrame Construction

We can create a DataFrame from a dictionary of columns - this is a dictionary where the keys are strings representing the columns’ names and the values are equal-length lists representing the columns’ values.

import datetime

df = DataFrame.from_pydict({
    "A": [1, 2, 3, 4],
    "B": [1.5, 2.5, 3.5, 4.5],
    "C": [True, True, False, False],
    "D": ["a", "b", "c", "d"],
    "E": [b"a", b"b", b"c", b"d"],
    "F": [datetime.date(1994, 1, 1), datetime.date(1994, 1, 2), datetime.date(1994, 1, 3), datetime.date(1994, 1, 4)],
    "G": [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]],
})
2023-01-21 14:36:57.855 | INFO     | daft.context:runner:85 - Using PyRunner

You can also load DataFrames from other sources, such as:

  1. CSV files: DataFrame.read_csv("s3://bucket/*.csv")

  2. Parquet files: DataFrame.read_parquet("/path/*.parquet")

  3. JSON line-delimited files: DataFrame.read_json("/path/*.parquet")

  4. Files on disk: DataFrame.from_glob_path("/path/*.jpeg")

Daft automatically supports local paths as well as paths to object storage such as AWS S3.

Inspect your dataframe by printing the df variable

df
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
(No data to display: Dataframe not materialized)

Executing your DataFrame and Displaying Data#

Notice that instead of the contents of the dataframe, the message (no data to display: Dataframe not materialized) is displayed when we printed our dataframe in the previous section.

This is because Daft is lazy and only executes computations when explicitly told to do so. When you call methods on DataFrames such as .select, .where and .read_csv, Daft actually only enqueues these operations in a Logical Plan. You can examine this logical plan using DataFrame.explain():

df.explain()
┌─InMemoryScan
     output=[col(A#0: INTEGER), col(B#1: FLOAT), col(C#2: LOGICAL), col(D#3: STRING),
          col(E#4: BYTES), col(F#5: DATE), col(G#6: PY[list])]
     cache_id='6eea47a9faa44b2bb50c495246920c7f'
     partitioning=PartitionSpec(scheme=UNKNOWN, num_partitions=1, by=None)
 

Our currently plan says that there is only one operation to be executed, which is an InMemoryScan operation that reads from a set of in-memory data.

To execute all operations on all data in the current DataFrame’s plan, you can use the DataFrame.collect() method.

df.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
2 2.5true b b'b' 1994-01-02 [2, 2, 2]
3 3.5false c b'c' 1994-01-03 [3, 3, 3]
4 4.5false d b'd' 1994-01-04 [4, 4, 4]
(Showing first 4 of 4 rows)

DataFrame.collect() is useful because it executes computations on all your data, and shows you a little preview of the materialized results. These results are then kept in memory so that subsequent operations will avoid recomputations.

However, if you only wish to “peek” at your data instead of materializing the entire dataframe (e.g. your dataframe has a million rows, and you only want to view the first 10 without materializing the entire result set in memory), you can use DataFrame.show(N) instead to view the first N rows of your dataframe. This is especially useful when developing interactively on small samples of data.

df.show(2)
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
2 2.5true b b'b' 1994-01-02 [2, 2, 2]
(Showing first 2 rows)

Sorting Data#

You can sort a dataframe with DataFrame.sort, which we do so here in descending order:

df.sort(df["A"], desc=True).collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
4 4.5false d b'd' 1994-01-04 [4, 4, 4]
3 3.5false c b'c' 1994-01-03 [3, 3, 3]
2 2.5true b b'b' 1994-01-02 [2, 2, 2]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
(Showing first 4 of 4 rows)

Data Selection#

You can limit the number of rows in a dataframe by calling DataFrame.limit.

df_limited = df.limit(1)
df_limited.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
(Showing first 1 of 1 rows)

To select just a few columns, you can use DataFrame.select:

df_selected = df.select(df["A"], df["B"])
df_selected.collect()
A
INTEGER
B
FLOAT
1 1.5
2 2.5
3 3.5
4 4.5
(Showing first 4 of 4 rows)

Column selection also allows you to rename columns using Expression.alias:

df_renamed = df.select(df["A"].alias("A2"), df["B"])
df_renamed.collect()
A2
INTEGER
B
FLOAT
1 1.5
2 2.5
3 3.5
4 4.5
(Showing first 4 of 4 rows)

To drop columns from the dataframe, call DataFrame.exclude:

df_excluded = df.exclude("A")
df_excluded.collect()
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1.5true a b'a' 1994-01-01 [1, 1, 1]
2.5true b b'b' 1994-01-02 [2, 2, 2]
3.5false c b'c' 1994-01-03 [3, 3, 3]
4.5false d b'd' 1994-01-04 [4, 4, 4]
(Showing first 4 of 4 rows)

Expressions#

See: Expressions

Expressions are an API for defining computation that needs to happen over your columns.

For example, to create a new column that is just the column A incremented by 1:

df_A_plus1 = df.with_column("A_plus1", df["A"] + 1)  # does not run any computation
df_A_plus1.collect()  # materializes the new DataFrame, which includes the new column "A_plus1"
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
A_plus1
INTEGER
1 1.5true a b'a' 1994-01-01 [1, 1, 1] 2
2 2.5true b b'b' 1994-01-02 [2, 2, 2] 3
3 3.5false c b'c' 1994-01-03 [3, 3, 3] 4
4 4.5false d b'd' 1994-01-04 [4, 4, 4] 5
(Showing first 4 of 4 rows)

Method Accessors#

Some Expression methods are only allowed on certain types and are accessible through “method accessors” such as the Expression.str accessor (see: Expression Accessor Properties).

For example, the .str.length() expression is only valid when run on a STRING column:

df_E_length = df.with_column("D_length", df["D"].str.length())
df_E_length.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
D_length
INTEGER
1 1.5true a b'a' 1994-01-01 [1, 1, 1] 1
2 2.5true b b'b' 1994-01-02 [2, 2, 2] 1
3 3.5false c b'c' 1994-01-03 [3, 3, 3] 1
4 4.5false d b'd' 1994-01-04 [4, 4, 4] 1
(Showing first 4 of 4 rows)

Another example of a useful method accessor is the .url accessor. You can use .url.download() to download data from a column of URLs like so:

image_url_df = DataFrame.from_pydict({
    "urls": [
        "http://farm9.staticflickr.com/8186/8119368305_4e622c8349_z.jpg",
        "http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg",
        "http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_z.jpg",
    ],
})
image_downloaded_df = image_url_df.with_column("image_bytes", image_url_df["urls"].url.download())
image_downloaded_df.collect()
urls
STRING
image_bytes
BYTES
http://farm9.staticflickr.com/8186/8119368305_4e622c8349_...b'\xff\xd8\xff\xe1\x00TExif\x00\x00MM\x00*\x00\x00\x00\x0...
http://farm1.staticflickr.com/1/127244861_ab0c0381e7_z.jpg b'\xff\xd8\xff\xe1\x00(Exif\x00\x00MM\x00*\x00\x00\x00\x0...
http://farm3.staticflickr.com/2169/2118578392_1193aa04a0_...b'\xff\xd8\xff\xe1\x00\x16Exif\x00\x00MM\x00*\x00\x00\x00...
(Showing first 3 of 3 rows)

For a full list of all Expression methods and operators, see: Expressions API Docs

Operations on PY columns#

PY columns contain Python objects and operations called on these columns will be mapped on each object as well.

To work with such columns, Daft provides a few useful Expression methods.

For example, to repeat each list in column G 3 times, we can use the Python list’s native Python * operator:

df_G_extend_0 = df.with_column("G_repeat", df["G"] * 3)
df_G_extend_0.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
G_repeat
PY[object]
1 1.5true a b'a' 1994-01-01 [1, 1, 1] [1, 1, 1, 1, 1, 1, 1, 1, 1]
2 2.5true b b'b' 1994-01-02 [2, 2, 2] [2, 2, 2, 2, 2, 2, 2, 2, 2]
3 3.5false c b'c' 1994-01-03 [3, 3, 3] [3, 3, 3, 3, 3, 3, 3, 3, 3]
4 4.5false d b'd' 1994-01-04 [4, 4, 4] [4, 4, 4, 4, 4, 4, 4, 4, 4]
(Showing first 4 of 4 rows)

To call a method on each list in column G, we can use the .as_py method. For example, here we use the Python list’s .count() method to count the number of occurences of the integer in column A:

df_G_count_A = df.with_column("G_count_A", df["G"].as_py(list).count(df["A"]))
df_G_count_A.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
G_count_A
PY[object]
1 1.5true a b'a' 1994-01-01 [1, 1, 1] 3
2 2.5true b b'b' 1994-01-02 [2, 2, 2] 3
3 3.5false c b'c' 1994-01-03 [3, 3, 3] 3
4 4.5false d b'd' 1994-01-04 [4, 4, 4] 3
(Showing first 4 of 4 rows)

For more complicated functions, you can use .apply(f) to call a function f on every object in the column. For example, here we construct a Numpy array for every list in column G.

Note

It is good practice to supply Daft with the return type of your function.

You can do this using the return_type= keyword argument in .apply, or by correctly type-annotating your function like in the list_to_numpy function below.

Annotating the return type of your function correctly lets Daft effectively optimize your data and operations under the hood. For example, in this case we specify return_type=np.ndarray which tells Daft that each row in this column contains a Numpy array object.

import numpy as np

def list_to_numpy(l: list) -> np.ndarray:
    return np.array(l)

df_G_to_numpy = df.with_column("G_to_numpy", df["G"].apply(list_to_numpy))
df_G_to_numpy.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
G_to_numpy
PY[ndarray]
1 1.5true a b'a' 1994-01-01 [1, 1, 1] <np.ndarray
shape=(3,)
dtype=int64>
2 2.5true b b'b' 1994-01-02 [2, 2, 2] <np.ndarray
shape=(3,)
dtype=int64>
3 3.5false c b'c' 1994-01-03 [3, 3, 3] <np.ndarray
shape=(3,)
dtype=int64>
4 4.5false d b'd' 1994-01-04 [4, 4, 4] <np.ndarray
shape=(3,)
dtype=int64>
(Showing first 4 of 4 rows)

Iterable types such as a PY[list] column can be exploded with DataFrame.explode, splitting each list into a row of its own and repeating the other columns:

df_G_exploded = df.explode(df["G"])
df_G_exploded.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[object]
1 1.5true a b'a' 1994-01-01 1
1 1.5true a b'a' 1994-01-01 1
1 1.5true a b'a' 1994-01-01 1
2 2.5true b b'b' 1994-01-02 2
2 2.5true b b'b' 1994-01-02 2
2 2.5true b b'b' 1994-01-02 2
3 3.5false c b'c' 1994-01-03 3
3 3.5false c b'c' 1994-01-03 3
3 3.5false c b'c' 1994-01-03 3
4 4.5false d b'd' 1994-01-04 4
(Showing first 10 of 12 rows)

User-Defined Functions#

.apply makes it really easy to map a function on a single column, but is limited in 2 main ways:

  1. Only runs on a single column: some algorithms require multiple columns as inputs

  2. Only runs on a single row: some algorithms run much more efficiently when run on a batch of rows instead

To overcome these limitations, you can use User-Defined Functions (UDFs).

See Also: UDF User Guide

import datetime
from daft import polars_udf
import polars as pl

@polars_udf(return_type=datetime.date)
def add_days(f_date_data: pl.Series, a_days_data: pl.Series):
    return f_date_data + pl.duration(days=a_days_data)

df.with_column("F_add_A_days", add_days(df["F"], df["A"])).collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
F_add_A_days
DATE
1 1.5true a b'a' 1994-01-01 [1, 1, 1] 1994-01-02
2 2.5true b b'b' 1994-01-02 [2, 2, 2] 1994-01-04
3 3.5false c b'c' 1994-01-03 [3, 3, 3] 1994-01-06
4 4.5false d b'd' 1994-01-04 [4, 4, 4] 1994-01-08
(Showing first 4 of 4 rows)

The simple UDF demonstrated above is a “stateless UDF”, and no state is maintained between invocations of the function. In certain use-cases, it can be important to maintain some state with a “stateful UDF”, which you can write using a Class instead of a Function. For example, running machine learning models often requires downloading some trained weights and initializing the model in memory/on a GPU, which an expensive operation and should be cached between UDF invocations.

@polars_udf(return_type=float)
class RunExpensiveModel:

    def __init__(self):
        # Initialize and cache an "expensive" model between invocations of the UDF
        self.model = np.array([1.23, 4.56])
        
    def __call__(self, a_data: pl.Series, b_data: pl.Series):
        return np.matmul(self.model, np.array([a_data.to_numpy(), b_data.to_numpy()]))

df.with_column("expensive_model_results", RunExpensiveModel(df["A"], df["B"])).collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
expensive_model_results
FLOAT
1 1.5true a b'a' 1994-01-01 [1, 1, 1] 8.07
2 2.5true b b'b' 1994-01-02 [2, 2, 2] 13.86
3 3.5false c b'c' 1994-01-03 [3, 3, 3] 19.65
4 4.5false d b'd' 1994-01-04 [4, 4, 4] 25.44
(Showing first 4 of 4 rows)

Filtering Data#

You can filter rows in dataframe using DataFrame.where, which accepts a LOGICAL type Expression as an argument:

# Keep only rows where values in column "A" are less than 3
df_filtered = df.where(df["A"] < 3)
df_filtered.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
2 2.5true b b'b' 1994-01-02 [2, 2, 2]
(Showing first 2 of 2 rows)

Missing Data#

All columns in Daft are “nullable” by default. Unlike other frameworks such as Pandas, Daft differentiates between “null” (missing) and “nan” (stands for not a number - a special value indicating an invalid float).

missing_data_df = DataFrame.from_pydict({
    "floats": [1.5, None, float("nan")],
})
missing_data_df = missing_data_df \
    .with_column("floats_is_null", missing_data_df["floats"].is_null()) \
    .with_column("floats_is_nan", missing_data_df["floats"].is_nan())

missing_data_df.collect()
floats
FLOAT
floats_is_null
LOGICAL
floats_is_nan
LOGICAL
1.5 false false
None true none
nan false true
(Showing first 3 of 3 rows)

To fill in missing values, a useful Expression is the .if_else expression which can be used to fill in values if the value is null:

missing_data_df = missing_data_df.with_column("filled_in_floats", (missing_data_df["floats"].is_null()).if_else(0.0, missing_data_df["floats"]))
missing_data_df.collect()
floats
FLOAT
floats_is_null
LOGICAL
floats_is_nan
LOGICAL
filled_in_floats
FLOAT
1.5 false false 1.5
None true none 0
nan false true nan
(Showing first 3 of 3 rows)

Merging Dataframes#

DataFrames can be joined with .join. Here is a naive example of a self-join where we join df on itself with column “A” as the join key.

joined_df = df.join(df, on="A")
joined_df.collect()
A
INTEGER
B
FLOAT
C
LOGICAL
D
STRING
E
BYTES
F
DATE
G
PY[list]
1 1.5true a b'a' 1994-01-01 [1, 1, 1]
2 2.5true b b'b' 1994-01-02 [2, 2, 2]
3 3.5false c b'c' 1994-01-03 [3, 3, 3]
4 4.5false d b'd' 1994-01-04 [4, 4, 4]
(Showing first 4 of 4 rows)

Grouping and Aggregations#

Groupby aggregation operations over a dataset happens in 2 phases:

  1. Splitting the data into groups based on some criteria using DataFrame.groupby

  2. Specifying how to aggregate the data for each group using GroupedDataFrame.agg

Let’s take a look at an example:

grouping_df = DataFrame.from_pydict(
    {
        "A": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
        "B": ["a", "a", "b", "c", "b", "b", "a", "c"],
        "C": [i for i in range(8)],
        "D": [i for i in range(8)],
    }
)
grouping_df.collect()
A
STRING
B
STRING
C
INTEGER
D
INTEGER
foo a 0 0
bar a 1 1
foo b 2 2
bar c 3 3
foo b 4 4
bar b 5 5
foo a 6 6
foo c 7 7
(Showing first 8 of 8 rows)

First we group by “A”, so that we will evaluate rows with A=foo and A=bar separately in their respective groups.

grouped_df = grouping_df.groupby(grouping_df["A"])
grouped_df
GroupedDataFrame(df=+----------+----------+-----------+-----------+
| A        | B        |         C |         D |
| STRING   | STRING   |   INTEGER |   INTEGER |
+==========+==========+===========+===========+
| foo      | a        |         0 |         0 |
+----------+----------+-----------+-----------+
| bar      | a        |         1 |         1 |
+----------+----------+-----------+-----------+
| foo      | b        |         2 |         2 |
+----------+----------+-----------+-----------+
| bar      | c        |         3 |         3 |
+----------+----------+-----------+-----------+
| foo      | b        |         4 |         4 |
+----------+----------+-----------+-----------+
| bar      | b        |         5 |         5 |
+----------+----------+-----------+-----------+
| foo      | a        |         6 |         6 |
+----------+----------+-----------+-----------+
| foo      | c        |         7 |         7 |
+----------+----------+-----------+-----------+
(Showing first 8 of 8 rows), group_by=[col(A#23: STRING)])

Now we can specify the aggregations we want to compute over columns C and D. Here we compute the sum over column C, and the mean over column D for each group:

aggregated_df = grouped_df.agg([
    (grouped_df["C"].alias("C_sum"), "sum"),
    (grouped_df["D"].alias("D_mean"), "mean"),
])
aggregated_df.collect()
A
STRING
C_sum
INTEGER
D_mean
INTEGER
bar 9 3
foo 19 3.8
(Showing first 2 of 2 rows)

These operations work as well when run over multiple groupby columns, which will produce one row for each combination of columns that occur in the DataFrame:

grouping_df \
    .groupby(grouping_df["A"], grouping_df["B"]) \
    .agg([
        (grouping_df["C"].alias("C_sum"), "sum"),
        (grouping_df["D"].alias("D_mean"), "mean"),
    ]) \
    .collect()
A
STRING
B
STRING
C_sum
INTEGER
D_mean
INTEGER
foo a 6 3
bar b 5 5
foo c 7 7
bar a 1 1
bar c 3 3
foo b 6 3
(Showing first 6 of 6 rows)

Writing Data#

See: Writing Data

Writing data will execute your DataFrame and write the results out to the specified backend. For example, to write data out to CSV:

# NOTE: Daft does not write PY columns at the moment.
# This is a feature that is on the roadmap as various options for implementation are being designed.
write_df = df.exclude("G")

written_df = write_df.write_csv("my-dataframe.csv")

Note that writing your dataframe is a blocking operation that executes your DataFrame. It will return a new DataFrame that contains the filepaths to the written data:

written_df.collect()
file_path
STRING
my-dataframe.csv/c9da510b-9bef-4d4b-a3db-d40462100b52-0.csv
(Showing first 1 of 1 rows)