Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Jan 15, 2025
1 parent c469625 commit 8be16a1
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 294 deletions.
40 changes: 37 additions & 3 deletions opteryx/compiled/list_ops/list_ops.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@

#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION

import cython
import numpy
cimport numpy as cnp
from cython import Py_ssize_t
from numpy cimport ndarray
from cpython.unicode cimport PyUnicode_AsUTF8String
from cpython.bytes cimport PyBytes_AsString

cnp.import_array()

Expand Down Expand Up @@ -285,8 +285,6 @@ cpdef cnp.ndarray cython_get_element_op(cnp.ndarray[object, ndim=1] array, int k
return result


@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray array_encode_utf8(cnp.ndarray inp):
"""
Parallel UTF-8 encode all elements of a 1D ndarray of "object" dtype.
Expand Down Expand Up @@ -320,3 +318,39 @@ cpdef cnp.ndarray[cnp.uint8_t, ndim=1] list_contains_any(cnp.ndarray array, cnp.
res[i] = 1
break
return res


cdef extern from "string.h":
    # NOTE(review): memmem is a GNU/BSD extension, not ISO C — present on glibc
    # and macOS, absent on MSVC. A portability shim would be needed for a
    # Windows build; confirm target platforms before relying on it.
    void *memmem(const void *haystack, size_t haystacklen, const void *needle, size_t needlelen)

cpdef cnp.ndarray[cnp.uint8_t, ndim=1] list_substring(cnp.ndarray[cnp.str, ndim=1] haystack, str needle):
    """
    Used as the InStr operator, which was written to replace using LIKE to execute list_substring
    matching. We tried using PyArrow's substring but the performance was almost identical to LIKE.

    Parameters:
        haystack: 1-D object ndarray of str (or bytes) values to search in.
        needle: the substring to look for; matched on its UTF-8 byte encoding.

    Returns:
        uint8 ndarray of the same length as `haystack`; 1 where `needle`
        occurs in the element, 0 otherwise.
    """
    cdef Py_ssize_t n = haystack.shape[0]
    cdef bytes needle_bytes = needle.encode('utf-8')
    cdef char *c_pattern = PyBytes_AsString(needle_bytes)
    cdef size_t pattern_length = len(needle_bytes)
    cdef cnp.ndarray[cnp.uint8_t, ndim=1] result = numpy.zeros(n, dtype=numpy.uint8)
    cdef Py_ssize_t i = 0
    cdef Py_ssize_t length
    cdef char *data
    cdef bytes item

    # BUG FIX: guard the empty array — the first-element type probe below
    # raised IndexError when n == 0.
    if n == 0:
        return result

    # Check the type of the first item to decide the processing method.
    # NOTE(review): assumes the array is homogeneous (all str or all bytes);
    # a mixed array takes only one path — confirm callers guarantee this.
    if isinstance(haystack[0], str):
        for i in range(n):
            # encode to UTF-8 so byte-wise memmem matches the encoded needle
            item = PyUnicode_AsUTF8String(haystack[i])
            data = PyBytes_AsString(item)
            length = len(item)
            if memmem(data, length, c_pattern, pattern_length) != NULL:
                result[i] = 1
    else:
        for i in range(n):
            # typed as bytes: a non-bytes element now raises TypeError here
            # instead of risking an invalid pointer cast
            item = haystack[i]
            data = PyBytes_AsString(item)
            length = len(item)
            if memmem(data, length, c_pattern, pattern_length) != NULL:
                result[i] = 1

    return result
2 changes: 1 addition & 1 deletion opteryx/connectors/capabilities/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def map_statistics(
if statistics is None:
return schema

schema.record_count_metric = statistics.record_count
schema.row_count_metric = statistics.record_count

for column in schema.columns:
column.highest_value = statistics.upper_bounds.get(column.name, None)
Expand Down
9 changes: 6 additions & 3 deletions opteryx/connectors/disk_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,12 @@ def read_dataset(
decoder=decoder,
just_schema=True,
)
if schema.row_count_metric:
schema.row_count_metric *= len(blob_names)
self.statistics.estimated_row_count += schema.row_count_metric
# if we have more than one blob we need to estimate the row count
blob_count = len(blob_names)
if schema.row_count_metric and blob_count > 1:
schema.row_count_estimate = schema.row_count_metric * blob_count
schema.row_count_metric = None
self.statistics.estimated_row_count += schema.row_count_estimate
yield schema

except UnsupportedFileTypeError:
Expand Down
8 changes: 5 additions & 3 deletions opteryx/connectors/gcp_cloudstorage_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,10 @@ def get_dataset_schema(self) -> RelationSchema:
if self.schema is None:
raise DatasetNotFoundError(dataset=self.dataset)

if self.schema.row_count_metric:
self.schema.row_count_metric *= number_of_blobs
self.statistics.estimated_row_count += self.schema.row_count_metric
# if we have more than one blob we need to estimate the row count
if self.schema.row_count_metric and number_of_blobs > 1:
self.schema.row_count_estimate = self.schema.row_count_metric * number_of_blobs
self.schema.row_count_metric = None
self.statistics.estimated_row_count += self.schema.row_count_estimate

return self.schema
4 changes: 4 additions & 0 deletions opteryx/connectors/sql_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from opteryx.connectors.base.base_connector import BaseConnector
from opteryx.connectors.capabilities import LimitPushable
from opteryx.connectors.capabilities import PredicatePushable
from opteryx.exceptions import DatasetNotFoundError
from opteryx.exceptions import MissingDependencyError
from opteryx.exceptions import UnmetRequirementError
from opteryx.managers.expression import Node
Expand Down Expand Up @@ -225,6 +226,7 @@ def read_dataset( # type:ignore

def get_dataset_schema(self) -> RelationSchema:
from sqlalchemy import Table
from sqlalchemy.exc import NoSuchTableError

if self.schema:
return self.schema
Expand Down Expand Up @@ -256,6 +258,8 @@ def get_dataset_schema(self) -> RelationSchema:
for column in table.columns
],
)
except NoSuchTableError as err:
raise DatasetNotFoundError(dataset=self.dataset)
except Exception as err:
if not err:
pass
Expand Down
8 changes: 7 additions & 1 deletion opteryx/managers/expression/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from pyarrow import compute

from opteryx.compiled import list_ops
from opteryx.utils.sql import sql_like_to_regex


def filter_operations(arr, left_type, operator, value, right_type):
Expand Down Expand Up @@ -136,6 +135,13 @@ def _inner_filter_operations(arr, operator, value):
# MODIFIED FOR OPTERYX - see comment above
values = set(value[0])
return numpy.array([a not in values for a in arr], dtype=numpy.bool_) # [#325]?
if operator == "InStr":
needle = str(value[0])
return list_ops.list_ops.list_substring(arr, needle) # [#325]
if operator == "NotInStr":
needle = str(value[0])
matches = list_ops.list_ops.list_substring(arr, needle) # [#325]
return numpy.invert(matches.astype(dtype=bool))
if operator == "Like":
# MODIFIED FOR OPTERYX
# null input emits null output, which should be false/0
Expand Down
11 changes: 8 additions & 3 deletions opteryx/models/relation_statistics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Any
from typing import Dict
from typing import Optional


class RelationStatistics:
Expand All @@ -8,14 +9,18 @@ class RelationStatistics:
Attributes:
record_count (Optional[int]): The number of records in the file. Defaults to -1.
file_size (Optional[int]): The size of the file in bytes. Defaults to -1.
record_count_estimate (Optional[int]): The estimated number of records in the file. Defaults to -1.
null_count (Dict[str, int]): A dictionary containing the number of null values for each column.
lower_bounds (Dict[str, int]): A dictionary containing the lower bounds for data values.
upper_bounds (Dict[str, int]): A dictionary containing the upper bounds for data values.
"""

record_count: int = 0
null_count: Dict[str, int] = None
record_count: int = -1
"""The number of records in the dataset"""
record_count_estimate: int = -1
"""The estimated number of records in the dataset"""

null_count: Optional[Dict[str, int]] = None
lower_bounds: Dict[str, Any] = None
upper_bounds: Dict[str, Any] = None

Expand Down
28 changes: 0 additions & 28 deletions opteryx/planner/executor/__init__.py

This file was deleted.

Loading

0 comments on commit 8be16a1

Please sign in to comment.