Skip to content

Commit

Permalink
Merge pull request #2289 from mabel-dev/HOUSEKEEPING/16
Browse files Browse the repository at this point in the history
Housekeeping/16
  • Loading branch information
joocer authored Jan 21, 2025
2 parents 7be91b2 + 4e0756d commit 8a98f50
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/regression_suite.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ name: Regression Suite

on:
push:
branches-ignore:
- main # Exclude the main branch
- 'refs/tags/*' # Exclude tags (releases)
schedule:
- cron: "0 4 * * *"

Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/regression_suite_arm.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
name: Regression Suite (ARM)

on:
push
push:
branches-ignore:
- main # Exclude the main branch
- 'refs/tags/*' # Exclude tags (releases)

jobs:
regression_matrix:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/regression_suite_mac_ARM.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
name: Regression Suite (Mac ARM)

on:
push
push:
branches-ignore:
- main # Exclude the main branch
- 'refs/tags/*' # Exclude tags (releases)

jobs:
regression_matrix:
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/regression_suite_mac_x86.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
name: Regression Suite (Mac x86)

on:
push
push:
branches-ignore:
- main # Exclude the main branch
- 'refs/tags/*' # Exclude tags (releases)

jobs:
regression_matrix:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/static_analysis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jobs:
mypy:
name: Type Checks
runs-on: ubuntu-latest
if: false # disable the job
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion opteryx/__version__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__build__ = 1008
__build__ = 1011

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
28 changes: 27 additions & 1 deletion opteryx/compiled/structures/node.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ during execution for about 0.2 seconds, Cython runs this class approx 33% faster
raw Python version.
"""


from cpython.dict cimport PyDict_Copy
from cpython cimport dict
from uuid import uuid4

Expand Down Expand Up @@ -141,3 +141,29 @@ cdef class Node:
new_node = Node(self.node_type, **{key: _inner_copy(value) for key, value in self._properties.items()})
new_node.uuid = self.uuid
return new_node

def __reduce__(self):
"""
Implements support for pickling (serialization).
Returns a tuple with:
- The class (Node)
- The arguments needed to reconstruct the object
- The state dictionary (optional)
"""
return (self.__class__, (self.node_type,), self.__getstate__())

    def __getstate__(self):
        """
        Capture the picklable state of the Node as a dictionary.

        Returns a dict holding the node's `uuid` and a copy of its
        `_properties` mapping, consumed by `__setstate__` / `__reduce__`.
        """
        return {
            "uuid": self.uuid,
            "_properties": PyDict_Copy(self._properties) # shallow copy: PyDict_Copy copies the dict, not the values it contains
        }

def __setstate__(self, state):
"""
Restore the object's state from a dictionary.
"""
self.uuid = state["uuid"]
self._properties = state["_properties"]
2 changes: 1 addition & 1 deletion opteryx/managers/expression/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def _inner_evaluate(root: Node, table: Table):

# if we have this column already, just return it
if identity in table.column_names:
return table[identity].to_numpy()
return table[identity].to_numpy(False)

# LITERAL TYPES
if node_type == NodeType.LITERAL:
Expand Down
71 changes: 55 additions & 16 deletions opteryx/operators/filter_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
This node is responsible for applying filters to datasets.
"""

import multiprocessing

import numpy
import pyarrow

Expand All @@ -25,6 +27,32 @@

from . import BasePlanNode

multiprocessing.set_start_method("fork", force=True)


def _parallel_filter(queue, morsel, function_evaluations, filters):
    """
    Evaluate a filter expression against one morsel and keep only matching rows.

    Parameters:
        queue: multiprocessing.Queue or None
            When provided (worker mode) the filtered morsel is put on the
            queue; when None the filtered morsel is returned directly.
        morsel: pyarrow.Table
            The batch of rows to filter.
        function_evaluations: list
            Function expressions to evaluate and append as columns before
            filtering (may be empty).
        filters:
            The filter expression tree passed to `evaluate`.

    Raises:
        SqlError: if the evaluated mask cannot be coerced to a boolean array.
    """
    if function_evaluations:
        morsel = evaluate_and_append(function_evaluations, morsel)
    mask = evaluate(filters, morsel)

    if not isinstance(mask, pyarrow.lib.BooleanArray):
        try:
            mask = pyarrow.array(mask, type=pyarrow.bool_())
        except Exception as err:  # nosec
            # BUG FIX: the closing quote previously sat after {err}, so the
            # error text was swallowed into the quoted expression; also chain
            # the original cause for debuggability.
            raise SqlError(
                f"Unable to filter on expression '{format_expression(filters)}' {err}."
            ) from err

    # Convert the boolean mask to an index array of matching rows.
    mask = numpy.nonzero(mask)[0]

    # Keep only matching rows. `mask` is an ndarray here, so the original
    # `not numpy.all(mask is None)` guard was always True (dead code); a
    # simple size check expresses the real intent.
    if mask.size > 0:
        morsel = morsel.take(pyarrow.array(mask))
    else:
        morsel = morsel.slice(0, 0)

    if queue is not None:
        queue.put(morsel)
    else:
        return morsel


class FilterNode(BasePlanNode):
def __init__(self, properties: QueryProperties, **parameters):
Expand All @@ -36,6 +64,8 @@ def __init__(self, properties: QueryProperties, **parameters):
select_nodes=(NodeType.FUNCTION,),
)

self.worker_count = pyarrow.io_thread_count() // 2

@property
def config(self): # pragma: no cover
return format_expression(self.filter)
Expand All @@ -53,21 +83,30 @@ def execute(self, morsel: pyarrow.Table, **kwargs) -> pyarrow.Table:
yield morsel
return

if self.function_evaluations:
morsel = evaluate_and_append(self.function_evaluations, morsel)
mask = evaluate(self.filter, morsel)

if not isinstance(mask, pyarrow.lib.BooleanArray):
try:
mask = pyarrow.array(mask, type=pyarrow.bool_())
except Exception as err: # nosec
raise SqlError(
f"Unable to filter on expression '{format_expression(self.filter)} {err}'."
if morsel.num_rows <= 10000 or self.worker_count <= 2:
yield _parallel_filter(None, morsel, self.function_evaluations, self.filter)
else:
workers = []
queue = multiprocessing.Queue()

for block in morsel.to_batches((morsel.num_rows // self.worker_count) + 1):
block = pyarrow.Table.from_batches([block])
p = multiprocessing.Process(
target=_parallel_filter,
args=(queue, block, self.function_evaluations, self.filter),
)
mask = numpy.nonzero(mask)[0]
p.start()
workers.append(p)

# if there's no matching rows, don't return anything
if mask.size > 0 and not numpy.all(mask is None):
yield morsel.take(pyarrow.array(mask))
else:
yield morsel.slice(0, 0)
# Collect all results from the queue
results = []
for _ in workers: # Expecting one result per worker
results.append(queue.get()) # This will block until a result is available

# Merge all results and return them
if results:
yield pyarrow.concat_tables(results)

# Ensure all workers have finished before exiting
for p in workers:
p.join()

0 comments on commit 8a98f50

Please sign in to comment.