From 431f802ac59d368a41c2cc74ba20b797794f4023 Mon Sep 17 00:00:00 2001 From: Jeremy Dyer Date: Tue, 21 Feb 2023 13:28:06 -0500 Subject: [PATCH] Add support for cudf as a physical execution engine --- Cargo.lock | 20 ++++----- conda/environments/datafusion-dev.yaml | 5 ++- datafusion/cudf.py | 62 ++++++++++++++++++++++++++ examples/sql-on-cudf.py | 26 +++++++++++ 4 files changed, 102 insertions(+), 11 deletions(-) create mode 100644 datafusion/cudf.py create mode 100644 examples/sql-on-cudf.py diff --git a/Cargo.lock b/Cargo.lock index 5059afaac..04a2ea8d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1134,9 +1134,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes", "fnv", @@ -1385,9 +1385,9 @@ checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" [[package]] name = "libflate" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093" +checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0" dependencies = [ "adler32", "crc32fast", @@ -1396,9 +1396,9 @@ dependencies = [ [[package]] name = "libflate_lz77" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a" +checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf" dependencies = [ "rle-decode-fast", ] @@ -2359,9 +2359,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.7" +version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef" +checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d" dependencies = [ "autocfg", ] @@ -2635,9 +2635,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce" +checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313" dependencies = [ "futures-core", "pin-project-lite", diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml index 0e17e1699..d9405e4fe 100644 --- a/conda/environments/datafusion-dev.yaml +++ b/conda/environments/datafusion-dev.yaml @@ -28,7 +28,7 @@ dependencies: - pytest - toml - importlib_metadata -- python>=3.7,<3.11 +- python>=3.10 # Packages useful for building distributions and releasing - mamba - conda-build @@ -38,4 +38,7 @@ dependencies: - pydata-sphinx-theme==0.8.0 - myst-parser - jinja2 +# GPU packages +- cudf +- cudatoolkit=11.8 name: datafusion-dev diff --git a/datafusion/cudf.py b/datafusion/cudf.py new file mode 100644 index 000000000..c38819c62 --- /dev/null +++ b/datafusion/cudf.py @@ -0,0 +1,62 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +import cudf +import datafusion +from datafusion.expr import Projection, TableScan, Column + + +class SessionContext: + def __init__(self): + self.datafusion_ctx = datafusion.SessionContext() + self.parquet_tables = {} + + def register_parquet(self, name, path): + self.parquet_tables[name] = path + self.datafusion_ctx.register_parquet(name, path) + + def to_cudf_expr(self, expr): + + # get Python wrapper for logical expression + expr = expr.to_variant() + + if isinstance(expr, Column): + return expr.name() + else: + raise Exception("unsupported expression: {}".format(expr)) + + def to_cudf_df(self, plan): + # recurse down first to translate inputs into cudf data frames + inputs = [self.to_cudf_df(x) for x in plan.inputs()] + + # get Python wrapper for logical operator node + node = plan.to_variant() + + if isinstance(node, Projection): + args = [self.to_cudf_expr(expr) for expr in node.projections()] + return inputs[0][args] + elif isinstance(node, TableScan): + return cudf.read_parquet(self.parquet_tables[node.table_name()]) + else: + raise Exception( + "unsupported logical operator: {}".format(type(node)) + ) + + def sql(self, sql): + datafusion_df = self.datafusion_ctx.sql(sql) + plan = datafusion_df.logical_plan() + return self.to_cudf_df(plan) diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py new file mode 100644 index 000000000..407cb1f00 --- /dev/null +++ b/examples/sql-on-cudf.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from datafusion.cudf import SessionContext + + +ctx = SessionContext() +ctx.register_parquet( + "taxi", "/home/jeremy/Downloads/yellow_tripdata_2021-01.parquet" +) +df = ctx.sql("select passenger_count from taxi") +print(df)