Skip to content

Commit

Permalink
compat: Spatialpandas with dask-expr (#1405)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoxbro authored Feb 17, 2025
1 parent 6a5dbb3 commit f6dea71
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 241 deletions.
87 changes: 9 additions & 78 deletions datashader/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import datashader.utils as du

import pytest
from datashader.tests.utils import dask_switcher
from datashader.tests.test_pandas import _pandas

try:
Expand All @@ -34,39 +33,26 @@
pytestmark = pytest.importorskip("dask")



@dask_switcher(query=False)
def _dask():
return dd.from_pandas(_pandas(), npartitions=2)

@dask_switcher(query=True)
def _dask_expr():
return dd.from_pandas(_pandas(), npartitions=2)

@dask_switcher(query=False, extras=["dask_cudf"])
def _dask_cudf():
import dask_cudf

_dask = dd.from_pandas(_pandas(), npartitions=2)
if Version(dask_cudf.__version__) >= Version("24.06"):
return _dask.to_backend("cudf")
else:
return dask_cudf.from_dask_dataframe(_dask)

@dask_switcher(query=True, extras=["dask_cudf"])
def _dask_expr_cudf():
import dask_cudf
if Version(dask_cudf.__version__) < Version("24.06"):
pytest.skip("dask-expr requires dask-cudf 24.06 or later")
_dask = dd.from_pandas(_pandas(), npartitions=2)
return _dask.to_backend("cudf")

_backends = [
pytest.param(_dask, id="dask"),
pytest.param(_dask_expr, id="dask-expr"),
pytest.param(_dask_cudf, marks=pytest.mark.gpu, id="dask-cudf"),
pytest.param(_dask_expr_cudf, marks=pytest.mark.gpu, id="dask-expr-cudf"),
]


@pytest.fixture(params=_backends)
def ddf(request):
return request.param()
Expand All @@ -76,7 +62,7 @@ def ddf(request):
def npartitions(request):
return request.param

@dask_switcher(query=False)

def _dask_DataFrame(*args, **kwargs):
if kwargs.pop("geo", False):
df = sp.GeoDataFrame(*args, **kwargs)
Expand All @@ -85,53 +71,21 @@ def _dask_DataFrame(*args, **kwargs):
return dd.from_pandas(df, npartitions=2)


@dask_switcher(query=True)
def _dask_expr_DataFrame(*args, **kwargs):
if kwargs.pop("geo", False):
pytest.skip("dask-expr currently does not work with spatialpandas")
# df = sp.GeoDataFrame(*args, **kwargs)
else:
df = pd.DataFrame(*args, **kwargs)
return dd.from_pandas(df, npartitions=2)


@dask_switcher(query=False, extras=["dask_cudf"])
def _dask_cudf_DataFrame(*args, **kwargs):
import cudf
import dask_cudf
if kwargs.pop("geo", False):
# As of dask-cudf version 24.06, dask-cudf is not
# compatible with spatialpandas version 0.4.10
pytest.skip("dask-cudf currently does not work with spatialpandas")
cdf = cudf.DataFrame.from_pandas(
pd.DataFrame(*args, **kwargs), nan_as_null=False
)
return dask_cudf.from_cudf(cdf, npartitions=2)


@dask_switcher(query=True, extras=["dask_cudf"])
def _dask_expr_cudf_DataFrame(*args, **kwargs):
import cudf
import dask_cudf

if Version(dask_cudf.__version__) < Version("24.06"):
pytest.skip("dask-expr requires dask-cudf 24.06 or later")

if kwargs.pop("geo", False):
# As of dask-cudf version 24.06, dask-cudf is not
# compatible with spatialpandas version 0.4.10
pytest.skip("dask-cudf currently does not work with spatialpandas")
cdf = cudf.DataFrame.from_pandas(
pd.DataFrame(*args, **kwargs), nan_as_null=False
)
cdf = cudf.DataFrame.from_pandas(pd.DataFrame(*args, **kwargs), nan_as_null=False)
return dask_cudf.from_cudf(cdf, npartitions=2)


_backends = [
pytest.param(_dask_DataFrame, id="dask"),
pytest.param(_dask_expr_DataFrame, id="dask-expr"),
pytest.param(_dask_cudf_DataFrame, marks=pytest.mark.gpu, id="dask-cudf"),
pytest.param(_dask_expr_cudf_DataFrame, marks=pytest.mark.gpu, id="dask-expr-cudf"),
]

@pytest.fixture(params=_backends)
Expand Down Expand Up @@ -163,25 +117,6 @@ def floats(n):
n = n + np.spacing(n)


@pytest.mark.gpu
def test_check_query_setting():
import os
from subprocess import check_output, SubprocessError

# dask-cudf does not support query planning as of 24.04.
# So we check that it is not set outside of Python.
assert os.environ.get('DASK_DATAFRAME__QUERY_PLANNING', 'false').lower() != 'true'

# The global setting has the same problem, so we check it as well
try:
cmd = ['dask', 'config', 'get', 'dataframe.query-planning']
output = check_output(cmd, text=True).strip().lower()
assert output != 'true'
except SubprocessError:
# Newer versions will error out if the setting is not set
pass


def test_count(ddf, npartitions):
ddf = ddf.repartition(npartitions=npartitions)
assert ddf.npartitions == npartitions
Expand Down Expand Up @@ -1236,7 +1171,6 @@ def test_log_axis_points(ddf):


@pytest.mark.skipif(not sp, reason="spatialpandas not installed")
@dask_switcher(query=False, extras=["spatialpandas.dask"])
def test_points_geometry():
axis = ds.core.LinearAxis()
lincoords = axis.compute_index(axis.compute_scale_and_translate((0., 2.), 3), 3)
Expand All @@ -1257,7 +1191,6 @@ def test_points_geometry():
assert_eq_xr(agg, out)


@dask_switcher(query=False, extras=["spatialpandas.dask"])
def test_line(DataFrame):
axis = ds.core.LinearAxis()
lincoords = axis.compute_index(axis.compute_scale_and_translate((-3., 3.), 7), 7)
Expand Down Expand Up @@ -1339,7 +1272,6 @@ def test_line(DataFrame):
}, dtype='Line[int64]'), dict(geometry='geom'))
)

@dask_switcher(query=False, extras=["spatialpandas.dask"])
@pytest.mark.parametrize('df_kwargs,cvs_kwargs', line_manual_range_params[5:7])
def test_line_manual_range(DataFrame, df_kwargs, cvs_kwargs, request):
if "cudf" in request.node.name:
Expand Down Expand Up @@ -1452,7 +1384,6 @@ def test_line_manual_range(DataFrame, df_kwargs, cvs_kwargs, request):
}, dtype='Line[int64]'), dict(geometry='geom'))
)

@dask_switcher(query=False, extras=["spatialpandas.dask"])
@pytest.mark.parametrize('df_kwargs,cvs_kwargs', line_autorange_params)
def test_line_autorange(DataFrame, df_kwargs, cvs_kwargs, request):
if "cudf" in request.node.name:
Expand Down Expand Up @@ -1621,7 +1552,7 @@ def test_auto_range_line(DataFrame):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_fixedrange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1713,7 +1644,7 @@ def test_area_to_zero_fixedrange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_autorange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame ==_dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1790,7 +1721,7 @@ def test_area_to_zero_autorange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', axis=1))
])
def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame ==_dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1893,7 +1824,7 @@ def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', y_stack='y_stack', axis=1))
])
def test_area_to_line_autorange(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down Expand Up @@ -1980,7 +1911,7 @@ def test_area_to_line_autorange(DataFrame, df_kwargs, cvs_kwargs):
}, dtype='Ragged[float32]'), dict(x='x', y='y', y_stack='y_stack', axis=1))
])
def test_area_to_line_autorange_gap(DataFrame, df_kwargs, cvs_kwargs):
if DataFrame in (_dask_cudf_DataFrame, _dask_expr_cudf_DataFrame):
if DataFrame == _dask_cudf_DataFrame:
if df_kwargs.get('dtype', '').startswith('Ragged'):
pytest.skip("Ragged array not supported with cudf")

Expand Down
44 changes: 6 additions & 38 deletions datashader/tests/test_geopandas.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,16 @@
# Testing GeoPandas and SpatialPandas
import contextlib

import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray
import numpy as np
from numpy import nan
import pytest
from datashader.tests.utils import dask_switcher
from packaging.version import Version

try:
import dask.dataframe as dd
except ImportError:
dd = None

_backends = [
pytest.param(False, id="dask"),
]

_extras = ["spatialpandas.dask", "dask_geopandas.backends", "dask_geopandas"]

with contextlib.suppress(ImportError):
import dask_geopandas

if Version(dask_geopandas.__version__) >= Version("0.4.0"):
_backends.append(pytest.param(True, id="dask-expr"))


@pytest.fixture(params=_backends)
def dask_both(request):
with dask_switcher(query=request.param, extras=_extras): ...
return request.param

@pytest.fixture
def dask_classic(request):
with dask_switcher(query=False, extras=_extras): ...

try:
import dask_geopandas
Expand Down Expand Up @@ -129,14 +105,6 @@ def dask_classic(request):
])


@pytest.mark.skipif(not dask_geopandas, reason="dask_geopandas not installed")
def test_dask_geopandas_switcher(dask_both):
import dask_geopandas
if dask_both:
assert dask_geopandas.expr.GeoDataFrame == dask_geopandas.GeoDataFrame
else:
assert dask_geopandas.core.GeoDataFrame == dask_geopandas.GeoDataFrame


@pytest.mark.skipif(not geodatasets, reason="geodatasets not installed")
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
Expand Down Expand Up @@ -177,7 +145,7 @@ def test_lines_geopandas(geom_type, explode, use_boundary):
("linestring", True, True),
],
)
def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions, dask_both):
def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df)) # Extra column for aggregation.
geometry = "boundary" if use_boundary else "geometry"
Expand Down Expand Up @@ -209,7 +177,7 @@ def test_lines_dask_geopandas(geom_type, explode, use_boundary, npartitions, das
("linestring", True, True),
],
)
def test_lines_spatialpandas(geom_type, explode, use_boundary, npartitions, dask_classic):
def test_lines_spatialpandas(geom_type, explode, use_boundary, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df)) # Extra column for aggregation.
geometry = "boundary" if use_boundary else "geometry"
Expand Down Expand Up @@ -252,7 +220,7 @@ def test_points_geopandas(geom_type):
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
@pytest.mark.parametrize('npartitions', [1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipoint", "point"])
def test_points_dask_geopandas(geom_type, npartitions, dask_both):
def test_points_dask_geopandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))

df["geometry"] = df["geometry"].sample_points(100, rng=93814) # multipoint
Expand All @@ -274,7 +242,7 @@ def test_points_dask_geopandas(geom_type, npartitions, dask_both):
@pytest.mark.skipif(not spatialpandas, reason="spatialpandas not installed")
@pytest.mark.parametrize('npartitions', [0, 1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipoint", "point"])
def test_points_spatialpandas(geom_type, npartitions, dask_classic):
def test_points_spatialpandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))

df["geometry"] = df["geometry"].sample_points(100, rng=93814) # multipoint
Expand Down Expand Up @@ -315,7 +283,7 @@ def test_polygons_geopandas(geom_type):
@pytest.mark.skipif(not geopandas, reason="geopandas not installed")
@pytest.mark.parametrize('npartitions', [1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipolygon", "polygon"])
def test_polygons_dask_geopandas(geom_type, npartitions, dask_both):
def test_polygons_dask_geopandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df))

Expand All @@ -338,7 +306,7 @@ def test_polygons_dask_geopandas(geom_type, npartitions, dask_both):
@pytest.mark.skipif(not spatialpandas, reason="spatialpandas not installed")
@pytest.mark.parametrize('npartitions', [0, 1, 2, 5])
@pytest.mark.parametrize("geom_type", ["multipolygon", "polygon"])
def test_polygons_spatialpandas(geom_type, npartitions, dask_classic):
def test_polygons_spatialpandas(geom_type, npartitions):
df = geopandas.read_file(geodatasets.get_path("nybb"))
df["col"] = np.arange(len(df))

Expand Down
4 changes: 0 additions & 4 deletions datashader/tests/test_polygons.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,12 @@
import xarray as xr
import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray, assert_eq_xr
from datashader.tests.utils import dask_switcher

try:
import dask.dataframe as dd
except ImportError:
dd = None

@pytest.fixture(autouse=True)
def _classic_dd():
with dask_switcher(query=False, extras=["spatialpandas.dask"]): ...

try:
# Import to register extension arrays
Expand Down
Loading

0 comments on commit f6dea71

Please sign in to comment.