Daft

Daft is a distributed query engine written in Python and Rust, two fast-growing ecosystems in the data engineering and machine learning industry.

It exposes its own flavor of the familiar Python DataFrame API, a common abstraction for querying tables of data in the Python data ecosystem.

Daft DataFrames are a powerful interface for use cases across ML/AI training, batch inference, feature engineering, and traditional analytics. Daft’s tight integration with Iceberg unlocks novel capabilities for both traditional analytics and Pythonic ML workloads on your data catalog.

Enabling Iceberg support in Daft

PyIceberg supports reading of Iceberg tables into Daft DataFrames.

To use Iceberg with Daft, ensure that the PyIceberg library is also installed in your current Python environment.

  pip install getdaft pyiceberg

Querying Iceberg using Daft

Daft interacts natively with PyIceberg to read Iceberg tables.

Reading Iceberg tables

Setup Steps

To follow along with this code, first create an Iceberg table following the Spark Quickstart tutorial. PyIceberg must then be correctly configured by ensuring that the ~/.pyiceberg.yaml file contains an appropriate catalog entry:

  catalog:
    default:
      # URL to the Iceberg REST server Docker container
      uri: http://localhost:8181
      # URL and credentials for the MinIO Docker container
      s3.endpoint: http://localhost:9000
      s3.access-key-id: admin
      s3.secret-access-key: password

Here is how the Iceberg table demo.nyc.taxis can be loaded into Daft:

  import daft
  from pyiceberg.catalog import load_catalog

  # Configure Daft to use the local MinIO Docker container for any S3 operations
  daft.set_planning_config(
      default_io_config=daft.io.IOConfig(
          s3=daft.io.S3Config(endpoint_url="http://localhost:9000"),
      )
  )

  # Load a PyIceberg table into Daft, and show the first few rows
  table = load_catalog("default").load_table("nyc.taxis")
  df = daft.read_iceberg(table)
  df.show()
  ╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
  │ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │
  │ ---       ┆ ---     ┆ ---           ┆ ---         ┆ ---                │
  │ Int64     ┆ Int64   ┆ Float32       ┆ Float64     ┆ Utf8               │
  ╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
  │ 1         ┆ 1000371 ┆ 1.8           ┆ 15.32       ┆ N                  │
  ├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
  │ 1         ┆ 1000374 ┆ 8.4           ┆ 42.13       ┆ Y                  │
  ├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
  │ 2         ┆ 1000372 ┆ 2.5           ┆ 22.15       ┆ N                  │
  ├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
  │ 2         ┆ 1000373 ┆ 0.9           ┆ 9.01        ┆ N                  │
  ╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯
  (Showing first 4 of 4 rows)

Note that the operation above will produce a warning from PyIceberg that “no partition filter was specified” and that “this will result in a full table scan”. Any filter operations on the Daft dataframe, df, will push down the filters, correctly account for hidden partitioning, and utilize table statistics to inform query planning for efficient reads.

Let’s try the above query again, but this time with a filter applied on the table’s partition column "vendor_id", which Daft uses to avoid a full table scan.

  df = df.where(df["vendor_id"] > 1)
  df.show()
  ╭───────────┬─────────┬───────────────┬─────────────┬────────────────────╮
  │ vendor_id ┆ trip_id ┆ trip_distance ┆ fare_amount ┆ store_and_fwd_flag │
  │ ---       ┆ ---     ┆ ---           ┆ ---         ┆ ---                │
  │ Int64     ┆ Int64   ┆ Float32       ┆ Float64     ┆ Utf8               │
  ╞═══════════╪═════════╪═══════════════╪═════════════╪════════════════════╡
  │ 2         ┆ 1000372 ┆ 2.5           ┆ 22.15       ┆ N                  │
  ├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
  │ 2         ┆ 1000373 ┆ 0.9           ┆ 9.01        ┆ N                  │
  ╰───────────┴─────────┴───────────────┴─────────────┴────────────────────╯
  (Showing first 2 of 2 rows)
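The partition pruning at work above can be sketched in plain Python: the planner looks only at each data file's partition value (recorded in Iceberg's metadata) and skips files whose partition cannot match the filter, so pruned files are never opened or scanned. The file paths, partition values, and `prune` helper below are hypothetical, purely to illustrate the idea; Daft and PyIceberg perform this internally.

```python
# Hypothetical sketch of partition pruning. The file list and helper are
# made up for illustration; real pruning reads Iceberg's table metadata.
data_files = [
    {"path": "data/vendor_id=1/file-a.parquet", "partition": {"vendor_id": 1}},
    {"path": "data/vendor_id=2/file-b.parquet", "partition": {"vendor_id": 2}},
]

def prune(files, column, predicate):
    # Keep only files whose partition value can satisfy the predicate;
    # everything else is dropped before any data is read.
    return [f for f in files if predicate(f["partition"][column])]

# The equivalent of df.where(df["vendor_id"] > 1) at planning time:
selected = prune(data_files, "vendor_id", lambda v: v > 1)
print([f["path"] for f in selected])  # only the vendor_id=2 file survives
```

Because "vendor_id" is a partition column, the filter eliminates whole files up front; a filter on a non-partition column would instead rely on per-file statistics and row-level filtering during the read.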

Type compatibility

Daft and Iceberg have compatible type systems. Here is how types are converted across the two systems.

Iceberg                      Daft
Primitive Types
boolean                      daft.DataType.bool()
int                          daft.DataType.int32()
long                         daft.DataType.int64()
float                        daft.DataType.float32()
double                       daft.DataType.float64()
decimal(precision, scale)    daft.DataType.decimal128(precision, scale)
date                         daft.DataType.date()
time                         daft.DataType.time(timeunit="us")
timestamp                    daft.DataType.timestamp(timeunit="us", timezone=None)
timestamptz                  daft.DataType.timestamp(timeunit="us", timezone="UTC")
string                       daft.DataType.string()
uuid                         daft.DataType.binary()
fixed(L)                     daft.DataType.binary()
binary                       daft.DataType.binary()
Nested Types
struct(**fields)             daft.DataType.struct(**fields)
list(child_type)             daft.DataType.list(child_type)
map(K, V)                    daft.DataType.map(K, V)
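The primitive-type mapping above can be captured as a simple lookup. The dictionary below is a hypothetical helper, not part of either library's API; it records type names as strings so the sketch runs without daft installed. Note the widening: Iceberg's int and float map to 32-bit Daft types, while long and double map to 64-bit ones.

```python
# Hypothetical lookup from Iceberg primitive type names to the Daft
# constructor expressions listed in the table above (strings only).
ICEBERG_TO_DAFT = {
    "boolean": "daft.DataType.bool()",
    "int": "daft.DataType.int32()",
    "long": "daft.DataType.int64()",
    "float": "daft.DataType.float32()",
    "double": "daft.DataType.float64()",
    "date": "daft.DataType.date()",
    "time": 'daft.DataType.time(timeunit="us")',
    "timestamp": 'daft.DataType.timestamp(timeunit="us", timezone=None)',
    "timestamptz": 'daft.DataType.timestamp(timeunit="us", timezone="UTC")',
    "string": "daft.DataType.string()",
    "uuid": "daft.DataType.binary()",
    "binary": "daft.DataType.binary()",
}

# For example, an Iceberg long column appears in Daft as:
print(ICEBERG_TO_DAFT["long"])  # daft.DataType.int64()
```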