From 37425c3b90b0650ca4758de9eb68e1eab0568311 Mon Sep 17 00:00:00 2001 From: Denis Dallinga Date: Thu, 11 Jan 2024 14:30:19 +0100 Subject: [PATCH] Apply sample limit to UDFailedRowsExpressionQuery This changes the way samples are collected from UserDefinedFailedRowsExpressionQueries. In order to apply the samples limit given in the check configuration, or apply the default samples limit, the failed rows expression needs to fire off an additional SampleQuery. The SampleQuery executes a copy of the query, appending a LIMIT clause. [#1985](https://github.com/sodadata/soda-core/issues/1985) --- .../user_defined_failed_rows_expression_check.py | 1 + soda/core/soda/execution/query/query.py | 3 +-- .../user_defined_failed_rows_expression_query.py | 12 +++++++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py b/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py index 5fd376ed2..329c02b3b 100644 --- a/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py +++ b/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py @@ -81,6 +81,7 @@ def get_failed_rows_sql(self) -> str: scan = self.data_source_scan.scan condition = scan.jinja_resolve(definition=partition_filter, location=self.check_cfg.location) sql += f"\n AND ({condition})" + return sql def get_log_diagnostic_dict(self) -> dict: diff --git a/soda/core/soda/execution/query/query.py b/soda/core/soda/execution/query/query.py index 7077f035c..771a7f0ae 100644 --- a/soda/core/soda/execution/query/query.py +++ b/soda/core/soda/execution/query/query.py @@ -160,7 +160,7 @@ def fetchall(self): finally: self.duration = datetime.now() - start - def store(self): + def store(self, allow_samples=True): """ DataSource query execution exceptions will be caught and result in the self.exception being populated. @@ -174,7 +174,6 @@ def store(self): try: # Check if query does not contain forbidden columns and only create sample if it does not. # Query still needs to execute in case this is a query that also sets a metric value. (e.g. reference check) - allow_samples = True offending_columns = [] if self.partition and self.partition.table: diff --git a/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py b/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py index 141333d4f..c94aa52f6 100644 --- a/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py +++ b/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py @@ -2,6 +2,8 @@ from soda.execution.query.query import Query +from soda.core.soda.execution.query.sample_query import SampleQuery + class UserDefinedFailedRowsExpressionQuery(Query): def __init__( @@ -23,4 +25,12 @@ def __init__( self.metric = metric def execute(self): - self.store() + # By default don't store samples through the store method to circumvent sample limit not being applied to + # failed row checks + self.store(allow_samples=False) + + samples_sql = self.sql + if self.metric.samples_limit: + samples_sql += f"\nLIMIT {self.metric.samples_limit}" + sample_query = SampleQuery(self.data_source_scan, self.metric, "failed_rows", samples_sql) + sample_query.execute()