diff --git a/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py b/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py index 5fd376ed2..329c02b3b 100644 --- a/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py +++ b/soda/core/soda/execution/check/user_defined_failed_rows_expression_check.py @@ -81,6 +81,7 @@ def get_failed_rows_sql(self) -> str: scan = self.data_source_scan.scan condition = scan.jinja_resolve(definition=partition_filter, location=self.check_cfg.location) sql += f"\n AND ({condition})" + return sql def get_log_diagnostic_dict(self) -> dict: diff --git a/soda/core/soda/execution/query/query.py b/soda/core/soda/execution/query/query.py index 7077f035c..771a7f0ae 100644 --- a/soda/core/soda/execution/query/query.py +++ b/soda/core/soda/execution/query/query.py @@ -160,7 +160,7 @@ def fetchall(self): finally: self.duration = datetime.now() - start - def store(self): + def store(self, allow_samples=True): """ DataSource query execution exceptions will be caught and result in the self.exception being populated. @@ -174,7 +174,6 @@ def store(self): try: # Check if query does not contain forbidden columns and only create sample if it does not. # Query still needs to execute in case this is a query that also sets a metric value. (e.g. 
reference check) - allow_samples = True offending_columns = [] if self.partition and self.partition.table: diff --git a/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py b/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py index 141333d4f..c94aa52f6 100644 --- a/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py +++ b/soda/core/soda/execution/query/user_defined_failed_rows_expression_query.py @@ -2,6 +2,8 @@ from soda.execution.query.query import Query +from soda.execution.query.sample_query import SampleQuery + class UserDefinedFailedRowsExpressionQuery(Query): def __init__( @@ -23,4 +25,12 @@ def __init__( self.metric = metric def execute(self): - self.store() + # By default don't store samples through the store method to circumvent sample limit not being applied to + # failed row checks + self.store(allow_samples=False) + + samples_sql = self.sql + if self.metric.samples_limit: + samples_sql += f"\nLIMIT {self.metric.samples_limit}" + sample_query = SampleQuery(self.data_source_scan, self.metric, "failed_rows", samples_sql) + sample_query.execute()