Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fail to run ray example NotImplementedError: no registered dataset conversion for <class 'ray.data.dataset.Dataset'>" #871

Closed
lushunn opened this issue Jul 5, 2024 · 6 comments

Comments

@lushunn
Copy link

lushunn commented Jul 5, 2024

my code:

import ray
import logging
ray.init(logging_level=logging.ERROR)
from statsforecast.core import StatsForecast
from statsforecast.models import ( 
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series

n_series = 4
horizon = 7

series = generate_series(n_series)



series = series.reset_index()
series['unique_id'] = series['unique_id'].astype(str)
ctx = ray.data.context.DatasetContext.get_current()
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D'
)
ctx.use_streaming_executor = False
ray_series = ray.data.from_pandas(series).repartition(4)
sf.forecast(df=ray_series, h=horizon).take(5)

But I just get an error report, which looks like this:
2024-07-05 21:43:33,724 INFO util.py:159 -- Outdated packages: ipywidgets==7.5.1 found, needs ipywidgets>=8 Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
c:\Users\1\anaconda3\lib\site-packages\pandas\core\computation\expressions.py:20: UserWarning: Pandas requires version '2.7.3' or newer of 'numexpr' (version '2.7.1' currently installed).
from pandas.core.computation.check import NUMEXPR_INSTALLED
(pid=8808) c:\Users\1\anaconda3\lib\site-packages\pandas\core\computation\expressions.py:20: UserWarning: Pandas requires version '2.7.3' or newer of 'numexpr' (version '2.7.1' currently installed).
(pid=8808) from pandas.core.computation.check import NUMEXPR_INSTALLED
{
"name": "NotImplementedError",
"message": "no registered dataset conversion for <class 'ray.data.dataset.Dataset'>",
"stack": "---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
in
25 ctx.use_streaming_executor = False
26 ray_series = ray.data.from_pandas(series).repartition(4)
---> 27 sf.forecast(df=ray_series, h=horizon).take(5)
28

c:\Users\1\anaconda3\lib\site-packages\statsforecast\core.py in forecast(self, h, df, X_df, level, fitted, sort_df, prediction_intervals, id_col, time_col, target_col)
1635 engine = make_execution_engine(infer_by=[df])
1636 self._backend = make_backend(engine)
-> 1637 return self._backend.forecast(
1638 models=self.models,
1639 fallback_model=self.fallback_model,

c:\Users\1\anaconda3\lib\site-packages\statsforecast\distributed\fugue.py in forecast(self, df, freq, models, fallback_model, X_df, h, level, fitted, prediction_intervals, id_col, time_col, target_col)
344 Or the list of available StatsForecast's models.
345 """
--> 346 self._fcst_schema = self._get_output_schema(
347 df=df,
348 models=models,

c:\Users\1\anaconda3\lib\site-packages\statsforecast\distributed\fugue.py in _get_output_schema(self, df, models, level, mode, id_col, time_col, target_col)
262 target_col,
263 ) -> Schema:
--> 264 keep_schema = fa.get_schema(df).extract([id_col, time_col])
265 cols: List[Any] = []
266 if level is None:

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in call(self, *args, **kwds)
254 if self._is_broadcast:
255 return list(self.run(*args, **kwds))
--> 256 return self.run_top(*args, **kwds)
257
258 def run(self, *args: Any, **kwargs: Any) -> Iterable[Any]:

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in run_top(self, *args, **kwargs)
278 :return: the return of the child function
279 """
--> 280 return list(itertools.islice(self.run(*args, **kwargs), 1))[0]
281
282 def register(

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in run(self, *args, **kwargs)
271 has_return = True
272 if not has_return:
--> 273 yield self._func(*args, **kwargs)
274
275 def run_top(self, *args: Any, **kwargs: Any) -> Any:

c:\Users\1\anaconda3\lib\site-packages\fugue\dataframe\api.py in get_schema(df)
49 How to get schema of any dataframe using Fugue?
50 """
---> 51 return as_fugue_df(df).schema
52
53

c:\Users\1\anaconda3\lib\site-packages\fugue\dataframe\dataframe.py in as_fugue_df(df, **kwargs)
455 :param df: the object to wrap
456 """
--> 457 ds = as_fugue_dataset(df, **kwargs)
458 if isinstance(ds, DataFrame):
459 return ds

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in call(self, *args, **kwds)
254 if self._is_broadcast:
255 return list(self.run(*args, **kwds))
--> 256 return self.run_top(*args, **kwds)
257
258 def run(self, *args: Any, **kwargs: Any) -> Iterable[Any]:

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in run_top(self, *args, **kwargs)
278 :return: the return of the child function
279 """
--> 280 return list(itertools.islice(self.run(*args, **kwargs), 1))[0]
281
282 def register(

c:\Users\1\anaconda3\lib\site-packages\triad\utils\dispatcher.py in run(self, *args, **kwargs)
271 has_return = True
272 if not has_return:
--> 273 yield self._func(*args, **kwargs)
274
275 def run_top(self, *args: Any, **kwargs: Any) -> Any:

c:\Users\1\anaconda3\lib\site-packages\fugue\dataset\api.py in as_fugue_dataset(data, **kwargs)
13 if isinstance(data, Dataset) and len(kwargs) == 0:
14 return data
---> 15 raise NotImplementedError(f"no registered dataset conversion for {type(data)}")
16
17

NotImplementedError: no registered dataset conversion for <class 'ray.data.dataset.Dataset'>"
}
`
My package version is statsforecast==1.7.5. I wonder why I can't run the example normally.

@lushunn lushunn changed the title fail to run ray example fail to run ray example NotImplementedError: no registered dataset conversion for <class 'ray.data.dataset.Dataset'>" Jul 5, 2024
@jmoralez
Copy link
Member

Hey. Which versions of fugue and ray are you using?

@lushunn
Copy link
Author

lushunn commented Jul 16, 2024

ray=2.7.2 fugue=0.9.1

@jmoralez
Copy link
Member

Sorry, I just saw you're using Windows. Is that for debugging, or do you want to use ray locally? On a single machine it should be faster to use a regular pandas/polars dataframe with n_jobs>1.

I'm not able to reproduce the issue, also one of our recent ubuntu CI runs used those versions (fugue, ray) and ran successfully.

@jmoralez
Copy link
Member

Hey, sorry, I just realized we pin ray:

ray_requirements = fugue[ray]>=0.8.1 protobuf>=3.15.3,<4.0.0 ray<2.8

so we actually use 2.7.2 during the CI. You should get that version if you install the ray extra: `pip install "statsforecast[ray]"`

@lushunn
Copy link
Author

lushunn commented Jul 18, 2024

Thank you very much for your answer!

@lushunn
Copy link
Author

lushunn commented Jul 18, 2024

I tested it; pandas with n_jobs>1 is much faster on a standalone machine.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants