Skip to content

Commit

Permalink
Initialize dtypes, index_value, and columns_value for `BaseData…
Browse files Browse the repository at this point in the history
…FrameChunkData`
  • Loading branch information
zhongchun committed Apr 24, 2023
1 parent bfb2126 commit 132bbcc
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions mars/services/task/analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,15 @@ def _gen_subtask_info(
)
for i, fetch_chunk in zip(build_fetch_index_to_chunks, fetch_chunks):
inp_chunks[i] = fetch_chunk
copied_op = chunk.op.copy()
copied_op._key = chunk.op.key
for out_chunk in chunk.op.outputs:
# Note: `dtypes`, `index_value`, and `columns_value` are lazily
# initialized, so we should initialize them here.
if hasattr(out_chunk, "dtypes"):
out_chunk.dtypes
if hasattr(out_chunk, "index_value"):
out_chunk.index_value
if hasattr(out_chunk, "columns_value"):
out_chunk.columns_value
processed.add(out_chunk)
chunk_graph.add_node(out_chunk)
if out_chunk in self._final_result_chunks_set:
Expand Down Expand Up @@ -381,10 +387,9 @@ def _gen_map_reduce_info(
# record analyzer map reduce id for mapper op
# copied chunk exists because map chunk must have
# been processed before shuffle proxy
copied_map_chunk = self._chunk_to_copied[map_chunk]
if not hasattr(copied_map_chunk, "extra_params"): # pragma: no cover
copied_map_chunk.extra_params = dict()
copied_map_chunk.extra_params["analyzer_map_reduce_id"] = map_reduce_id
if not hasattr(map_chunk, "extra_params"): # pragma: no cover
map_chunk.extra_params = dict()
map_chunk.extra_params["analyzer_map_reduce_id"] = map_reduce_id
reducer_bands = [assign_results[r.outputs[0]] for r in reducer_ops]
map_reduce_info = MapReduceInfo(
map_reduce_id=map_reduce_id,
Expand Down

0 comments on commit 132bbcc

Please sign in to comment.