Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

ApplyOperand closure clean-up #3205

Merged
merged 16 commits into from
Aug 24, 2022
Merged
7 changes: 5 additions & 2 deletions mars/core/graph/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,13 @@ def logic_key(self):
if not hasattr(self, "_logic_key") or self._logic_key is None:
token_keys = []
for node in self.bfs():
logic_key = node.op.get_logic_key()
if hasattr(node.op, "logic_key") and node.op.logic_key is None:
node.op.logic_key = logic_key
token_keys.append(
tokenize(node.op.get_logic_key(), **node.extra_params)
tokenize(logic_key, **node.extra_params)
if node.extra_params
else node.op.get_logic_key()
else logic_key
)
self._logic_key = tokenize(*token_keys)
return self._logic_key
Expand Down
15 changes: 15 additions & 0 deletions mars/core/graph/tests/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ def test_tileable_graph_logic_key():
graph8 = df1.apply(lambda x: x.max() - x.min()).build_graph(tile=False)
graph9 = df2.apply(lambda x: x.max() - x.min()).build_graph(tile=False)
assert graph8.logic_key != graph9.logic_key
assert (
graph8.result_tileables[0].op.logic_key
== graph9.result_tileables[0].op.logic_key
)
pieces1 = [df1[:3], df1[3:7], df1[7:]]
graph10 = md.concat(pieces1).build_graph(tile=False)
pieces2 = [df2[:3], df2[3:7], df2[7:]]
Expand All @@ -195,3 +199,14 @@ def test_tileable_graph_logic_key():
graph14 = df2.groupby("A").sum().build_graph(tile=False)
graph15 = df3.groupby("A").sum().build_graph(tile=False)
assert graph14.logic_key != graph15.logic_key
graph16 = (
df2.groupby("A").apply(lambda x: x.max() - x.min()).build_graph(tile=False)
)
graph17 = (
df3.groupby("A").apply(lambda x: x.max() - x.min()).build_graph(tile=False)
)
assert graph16.logic_key != graph17.logic_key
assert (
graph16.result_tileables[0].op.logic_key
== graph17.result_tileables[0].op.logic_key
)
43 changes: 43 additions & 0 deletions mars/dataframe/base/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
make_dtype,
build_empty_df,
build_empty_series,
clean_up_func,
restore_func,
)


Expand Down Expand Up @@ -72,6 +74,11 @@ class ApplyOperand(
_raw = BoolField("raw")
_result_type = StringField("result_type")
_elementwise = BoolField("elementwise")
_logic_key = StringField("logic_key")
# func_key may be string or ObjectRef, while ray.ObjectRef
# shall be serialized by Ray rather than Mars
_func_key = AnyField("func_key")
_need_clean_up_func = BoolField("need_clean_up_func")
_args = TupleField("args")
_kwds = DictField("kwds")

Expand All @@ -86,6 +93,9 @@ def __init__(
kwds=None,
output_type=None,
elementwise=None,
logic_key=None,
func_key=None,
need_clean_up_func=False,
**kw,
):
if output_type:
Expand All @@ -99,6 +109,9 @@ def __init__(
_args=args,
_kwds=kwds,
_elementwise=elementwise,
_logic_key=logic_key,
_func_key=func_key,
_need_clean_up_func=need_clean_up_func,
**kw,
)

Expand All @@ -113,6 +126,10 @@ def _update_key(self):
def func(self):
return self._func

@func.setter
def func(self, func):
self._func = func

@property
def axis(self):
return self._axis
Expand All @@ -133,6 +150,30 @@ def result_type(self):
def elementwise(self):
return self._elementwise

@property
def logic_key(self):
return self._logic_key

@logic_key.setter
def logic_key(self, logic_key):
self._logic_key = logic_key

@property
def func_key(self):
return self._func_key

@func_key.setter
def func_key(self, func_key):
self._func_key = func_key

@property
def need_clean_up_func(self):
return self._need_clean_up_func

@need_clean_up_func.setter
def need_clean_up_func(self, need_clean_up_func: bool):
self._need_clean_up_func = need_clean_up_func

@property
def args(self):
return getattr(self, "_args", None) or ()
Expand All @@ -145,6 +186,7 @@ def kwds(self):
@redirect_custom_log
@enter_current_session
def execute(cls, ctx, op):
restore_func(ctx, op)
input_data = ctx[op.inputs[0].key]
out = op.outputs[0]
if len(input_data) == 0:
Expand Down Expand Up @@ -287,6 +329,7 @@ def _tile_series(cls, op):

@classmethod
def tile(cls, op):
clean_up_func(op)
if op.inputs[0].ndim == 2:
return (yield from cls._tile_df(op))
else:
Expand Down
34 changes: 34 additions & 0 deletions mars/dataframe/base/tests/test_base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,23 @@ def test_data_frame_apply_execute(setup):
options.chunk_store_limit = old_chunk_store_limit


def test_data_frame_apply_closure_execute(setup):
    """DataFrame.apply with a closure capturing two large Series.

    The captured Series are big enough to trigger the closure clean-up
    path, so the result must still match a plain pandas apply.
    """
    col_names = [chr(ord("A") + idx) for idx in range(10)]
    squares = [idx**2 for idx in range(20)]
    df_raw = pd.DataFrame({name: squares for name in col_names})
    df = from_pandas_df(df_raw, chunk_size=5)

    # sizable captured objects push the closure over the clean-up threshold
    x = pd.Series(range(10**4))
    y = pd.Series(range(10**4))

    def closure(z):
        return pd.concat([x, y], ignore_index=True)

    result = df.apply(closure, axis=1).execute().fetch()
    expected = df_raw.apply(closure, axis=1)
    pd.testing.assert_frame_equal(result, expected)
vcfgv marked this conversation as resolved.
Show resolved Hide resolved


def test_series_apply_execute(setup):
idxes = [chr(ord("A") + i) for i in range(20)]
s_raw = pd.Series([i**2 for i in range(20)], index=idxes)
Expand Down Expand Up @@ -423,6 +440,23 @@ def test_series_apply_execute(setup):
pd.testing.assert_frame_equal(result, expected)


def test_series_apply_closure_execute(setup):
    """Series.apply with a closure capturing small scalar variables."""
    index_labels = [chr(ord("A") + pos) for pos in range(20)]
    s_raw = pd.Series([pos**2 for pos in range(20)], index=index_labels)
    series = from_pandas_series(s_raw, chunk_size=5)

    x = 1
    y = 2

    def closure(z):
        return [z + x, z + y]

    result = series.apply(closure, convert_dtype=False).execute().fetch()
    expected = s_raw.apply(closure, convert_dtype=False)
    pd.testing.assert_series_equal(result, expected)
vcfgv marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.skipif(pa is None, reason="pyarrow not installed")
def test_apply_with_arrow_dtype_execution(setup):
df1 = pd.DataFrame({"a": [1, 2, 1], "b": ["a", "b", "a"]})
Expand Down
9 changes: 9 additions & 0 deletions mars/dataframe/groupby/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
from ...core.context import get_context
from ...core.custom_log import redirect_custom_log
from ...serialization.serializables import (
AnyField,
BoolField,
TupleField,
DictField,
FunctionField,
StringField,
)
from ...core.operand import OperatorLogicKeyGeneratorMixin
from ...utils import enter_current_session, quiet_stdio, get_func_token, tokenize
Expand All @@ -36,6 +38,8 @@
validate_output_types,
make_dtypes,
make_dtype,
clean_up_func,
restore_func,
)


Expand All @@ -58,6 +62,9 @@ class GroupByApply(
args = TupleField("args", default_factory=tuple)
kwds = DictField("kwds", default_factory=dict)
maybe_agg = BoolField("maybe_agg", default=None)
logic_key = StringField("logic_key", default=None)
func_key = AnyField("func_key", default=None)
need_clean_up_func = BoolField("need_clean_up_func", default=False)

def __init__(self, output_types=None, **kw):
super().__init__(_output_types=output_types, **kw)
Expand All @@ -73,6 +80,7 @@ def _update_key(self):
@redirect_custom_log
@enter_current_session
def execute(cls, ctx, op):
restore_func(ctx, op)
in_data = ctx[op.inputs[0].key]
out = op.outputs[0]
if not in_data:
Expand Down Expand Up @@ -109,6 +117,7 @@ def execute(cls, ctx, op):

@classmethod
def tile(cls, op):
clean_up_func(op)
in_groupby = op.inputs[0]
out_df = op.outputs[0]

Expand Down
37 changes: 37 additions & 0 deletions mars/dataframe/groupby/tests/test_groupby_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,43 @@ def apply_series(s, truncate=True):
)


def test_groupby_apply_closure(setup):
    """GroupBy.apply with closures, for both DataFrame and Series inputs."""
    # DataFrame
    raw_df = pd.DataFrame(
        {
            "a": [3, 4, 5, 3, 5, 4, 1, 2, 3],
            "b": [1, 3, 4, 5, 6, 5, 4, 4, 4],
            "c": list("aabaaddce"),
        }
    )

    x, y = 10, 11

    def apply_closure_df(df):
        return df["a"].max() * x

    def apply_closure_series(s):
        return s.mean() * y

    mdf = md.DataFrame(raw_df, chunk_size=3)
    applied = mdf.groupby("b").apply(apply_closure_df)
    pd.testing.assert_series_equal(
        applied.execute().fetch().sort_index(),
        raw_df.groupby("b").apply(apply_closure_df).sort_index(),
    )

    # Series
    raw_series = pd.Series([3, 4, 5, 3, 5, 4, 1, 2, 3])
    ms1 = md.Series(raw_series, chunk_size=3)
    applied = ms1.groupby(lambda v: v % 3).apply(apply_closure_series)
    pd.testing.assert_series_equal(
        applied.execute().fetch().sort_index(),
        raw_series.groupby(lambda v: v % 3).apply(apply_closure_series).sort_index(),
    )

vcfgv marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.ray_dag
def test_groupby_transform(setup):
df1 = pd.DataFrame(
Expand Down
61 changes: 60 additions & 1 deletion mars/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import functools
import itertools
import logging
import operator
from contextlib import contextmanager
from numbers import Integral
Expand All @@ -27,8 +30,10 @@

from ..config import options
from ..core import Entity, ExecutableTuple
from ..core.context import Context
from ..core.context import Context, get_context
from ..lib.mmh3 import hash as mmh_hash
from ..services.context import ThreadedServiceContext
from ..services.task.execution.ray.context import RayExecutionContext
from ..tensor.utils import dictify_chunk_size, normalize_chunk_sizes
from ..typing import ChunkType, TileableType
from ..utils import (
Expand All @@ -46,6 +51,8 @@
pa = ModulePlaceholder("pyarrow")

cudf = lazy_import("cudf", rename="cudf")
vineyard = lazy_import("vineyard")
logger = logging.getLogger(__name__)


def hash_index(index, size):
Expand Down Expand Up @@ -1424,3 +1431,55 @@ def _concat_chunks(merge_chunks: List[ChunkType], output_index: int):
else:
params["nsplits"] = (tuple(n_split), df_or_series.nsplits[1])
return new_op.new_tileable(df_or_series.op.inputs, kws=[params])


def clean_up_func(op):
    """Offload ``op.func`` into shared storage when its closure is large.

    Called at tile time. If the function's closure cells together hold at
    least ``MARS_CLOSURE_CLEAN_UP_BYTES_THRESHOLD`` bytes (default 10**4;
    ``-1`` disables the feature), the function is put into the context's
    storage, ``op.func_key`` records how to get it back, and ``op.func``
    is set to ``None`` so the operand serializes without the heavy
    closure. ``restore_func`` reverses this at execute time.

    Parameters
    ----------
    op : operand with ``func``, ``func_key``, ``logic_key`` and
        ``need_clean_up_func`` attributes (e.g. ``ApplyOperand``).
    """
    closure_clean_up_bytes_threshold = int(
        os.getenv("MARS_CLOSURE_CLEAN_UP_BYTES_THRESHOLD", 10**4)
    )
    if closure_clean_up_bytes_threshold == -1:  # pragma: no cover
        # clean-up explicitly disabled via environment variable
        return
    ctx = get_context()
    if ctx is None:
        return
    # note: Vineyard internally uses `pickle` which fails to pickle
    # cell objects and corresponding functions.
    if vineyard is not None:
        storage_backend = ctx.get_storage_info()
        if storage_backend.get("name", None) == "vineyard":
            logger.warning(
                "Func cleanup is currently disabled when vineyard is used as storage backend."
            )
            return

    func = op.func
    if hasattr(func, "__closure__") and func.__closure__ is not None:
        counted_bytes = 0
        for cell in func.__closure__:
            # note: another applicable way of measurement is
            # df.memory_usage(index=True, deep=False).sum()
            # sys.getsizeof is shallow, so this is only an estimate.
            counted_bytes += sys.getsizeof(cell.cell_contents)
            if counted_bytes >= closure_clean_up_bytes_threshold:
                op.need_clean_up_func = True
                break
    # note: op.func_key is set to not None only when op.need_clean_up_func is True.
    if op.need_clean_up_func:
        # note: when used in ray task mode, data key is ray.ObjectRef which is
        # returned after func being put into storage.
        if isinstance(ctx, RayExecutionContext):  # pragma: no cover
            op.func_key = ctx.storage_put(op.func)
        # note: when used in normal mode, data key is logic key while the
        # returned data_info is not used as key.
        elif isinstance(ctx, ThreadedServiceContext):
            assert op.logic_key is not None
            op.func_key = op.logic_key
            ctx.storage_put(op.func_key, op.func)
        else:  # pragma: no cover
            # fix: the original passed type(ctx) as a second Exception arg,
            # logging-style, so "%s" was never interpolated; format explicitly.
            raise Exception(f"unknown context type: {type(ctx)}")
        op.func = None


def restore_func(ctx: Context, op):
    """Reload ``op.func`` from storage if it was cleaned up at tile time.

    No-op unless ``op.need_clean_up_func`` is set and a context is given;
    otherwise fetches the function back via ``op.func_key``.
    """
    if not op.need_clean_up_func or ctx is None:
        return
    assert op.func_key is not None
    assert op.func is None
    op.func = ctx.storage_get(op.func_key)
2 changes: 1 addition & 1 deletion mars/deploy/oscar/base_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ storage:
default_config:
transfer_block_size: 5 * 1024 ** 2
plasma:
store_memory: 20%
store_memory: 12%
"@overriding_fields": ["backends"]
meta:
store: dict
Expand Down
Loading