Merge pull request #2442 from moj-analytical-services/feature/score-missing-edges

Score missing intra-cluster edges
ADBond authored Oct 7, 2024
2 parents 3473d8f + 3c1b45d commit cd849d6
Showing 2 changed files with 315 additions and 0 deletions.
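
For context, here is a minimal usage sketch of the new `_score_missing_cluster_edges` method, mirroring the docstring example added in this diff. The `DuckDBAPI` backend, the input frame `df`, and the settings file name are illustrative assumptions, not part of this commit.

```py
# Sketch only: assumes `df` is a pandas DataFrame of input records and that
# a trained model has been saved to "saved_settings.json";
# DuckDBAPI is one possible backend.
from splink import DuckDBAPI, Linker

db_api = DuckDBAPI()
linker = Linker(df, "saved_settings.json", db_api=db_api)

# standard scoring pass, then clustering at a chosen threshold
df_edges = linker.inference.predict()
df_clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
    df_edges,
    0.9,
)

# new in this PR: score every intra-cluster pair that predict() did not score
df_remaining_edges = linker.inference._score_missing_cluster_edges(
    df_clusters,
    df_edges,
)
df_remaining_edges.as_pandas_dataframe(limit=5)
```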
169 changes: 169 additions & 0 deletions splink/internals/linker_components/inference.py
@@ -31,6 +31,7 @@
_join_new_table_to_df_concat_with_tf_sql,
colname_to_tf_tablename,
)
from splink.internals.unique_id_concat import _composite_unique_id_from_edges_sql
from splink.internals.vertically_concatenate import (
compute_df_concat_with_tf,
enqueue_df_concat_with_tf,
@@ -290,6 +291,174 @@ def predict(

return predictions

def _score_missing_cluster_edges(
self,
df_clusters: SplinkDataFrame,
        df_predict: SplinkDataFrame | None = None,
        threshold_match_probability: float | None = None,
        threshold_match_weight: float | None = None,
) -> SplinkDataFrame:
"""
Given a table of clustered records, create a dataframe of scored
pairwise comparisons for all pairs of records that belong to the same cluster.
        If a scored edges table is also supplied, only pairwise comparisons
        that are not already present in that table will be returned.

        Args:
df_clusters (SplinkDataFrame): A table of clustered records, such
as the output of
`linker.clustering.cluster_pairwise_predictions_at_threshold()`.
All edges within the same cluster as specified by this table will
be scored.
                The table must contain a cluster_id column, the unique id
                column(s), and any columns used in model comparisons.
df_predict (SplinkDataFrame, optional): An edges table, the output of
`linker.inference.predict()`.
If supplied, resulting table will not include any edges already
included in this table.
threshold_match_probability (float, optional): If specified,
filter the results to include only pairwise comparisons with a
match_probability above this threshold. Defaults to None.
threshold_match_weight (float, optional): If specified,
filter the results to include only pairwise comparisons with a
match_weight above this threshold. Defaults to None.

        Examples:
```py
linker = linker(df, "saved_settings.json", db_api=db_api)
df_edges = linker.inference.predict()
df_clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
df_edges,
0.9,
)
df_remaining_edges = linker._score_missing_cluster_edges(
df_clusters,
df_edges,
)
df_remaining_edges.as_pandas_dataframe(limit=5)
```

        Returns:
SplinkDataFrame: A SplinkDataFrame of the scored pairwise comparisons.
"""

start_time = time.time()

source_dataset_input_column = (
self._linker._settings_obj.column_info_settings.source_dataset_input_column
)
unique_id_input_column = (
self._linker._settings_obj.column_info_settings.unique_id_input_column
)

pipeline = CTEPipeline()
enqueue_df_concat_with_tf(self._linker, pipeline)
        # join the term frequency (tf) columns onto the clusters table,
        # and alias cluster_id so it doesn't clash with any existing column
sql = f"""
SELECT
c.cluster_id AS _cluster_id,
ctf.*
FROM
{df_clusters.physical_name} c
LEFT JOIN
__splink__df_concat_with_tf ctf
ON
c.{unique_id_input_column.name} = ctf.{unique_id_input_column.name}
"""
if source_dataset_input_column:
sql += (
f" AND c.{source_dataset_input_column.name} = "
f"ctf.{source_dataset_input_column.name}"
)
sqls = [
{
"sql": sql,
"output_table_name": "__splink__df_clusters_renamed",
}
]
blocking_input_tablename_l = "__splink__df_clusters_renamed"
blocking_input_tablename_r = "__splink__df_clusters_renamed"

link_type = self._linker._settings_obj._link_type
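        # blocking on equality of the aliased cluster_id generates every
        # pairwise comparison within each cluster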
sqls.extend(
block_using_rules_sqls(
input_tablename_l=blocking_input_tablename_l,
input_tablename_r=blocking_input_tablename_r,
blocking_rules=[BlockingRule("l._cluster_id = r._cluster_id")],
link_type=link_type,
source_dataset_input_column=source_dataset_input_column,
unique_id_input_column=unique_id_input_column,
)
)
# we are going to insert an intermediate table, so rename this
sqls[-1]["output_table_name"] = "__splink__raw_blocked_id_pairs"

sql = """
SELECT ne.*
FROM __splink__raw_blocked_id_pairs ne
"""
if df_predict is not None:
            # if we are given edges, left join them on, then keep only the rows
            # that have no corresponding row in the edges table (an anti-join)
if source_dataset_input_column:
unique_id_columns = [
source_dataset_input_column,
unique_id_input_column,
]
else:
unique_id_columns = [unique_id_input_column]
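            # build composite join keys from the unique id (and, if present,
            # source dataset) columns so edges in the two tables can be matched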
uid_l_expr = _composite_unique_id_from_edges_sql(unique_id_columns, "l")
uid_r_expr = _composite_unique_id_from_edges_sql(unique_id_columns, "r")
sql_predict_with_join_keys = f"""
SELECT *, {uid_l_expr} AS join_key_l, {uid_r_expr} AS join_key_r
FROM {df_predict.physical_name}
"""
sqls.append(
{
"sql": sql_predict_with_join_keys,
"output_table_name": "__splink__df_predict_with_join_keys",
}
)

sql = f"""
{sql}
LEFT JOIN __splink__df_predict_with_join_keys oe
ON oe.join_key_l = ne.join_key_l AND oe.join_key_r = ne.join_key_r
WHERE oe.join_key_l IS NULL AND oe.join_key_r IS NULL
"""

sqls.append({"sql": sql, "output_table_name": "__splink__blocked_id_pairs"})

pipeline.enqueue_list_of_sqls(sqls)

sqls = compute_comparison_vector_values_from_id_pairs_sqls(
self._linker._settings_obj._columns_to_select_for_blocking,
self._linker._settings_obj._columns_to_select_for_comparison_vector_values,
input_tablename_l=blocking_input_tablename_l,
input_tablename_r=blocking_input_tablename_r,
            source_dataset_input_column=source_dataset_input_column,
            unique_id_input_column=unique_id_input_column,
)
pipeline.enqueue_list_of_sqls(sqls)

sqls = predict_from_comparison_vectors_sqls_using_settings(
self._linker._settings_obj,
threshold_match_probability,
threshold_match_weight,
sql_infinity_expression=self._linker._infinity_expression,
)
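        # rename the final output so it is distinct from a standard predict table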
sqls[-1]["output_table_name"] = "__splink__df_predict_missing_cluster_edges"
pipeline.enqueue_list_of_sqls(sqls)

predictions = self._linker._db_api.sql_pipeline_to_splink_dataframe(pipeline)

predict_time = time.time() - start_time
logger.info(f"Predict time: {predict_time:.2f} seconds")

self._linker._predict_warning()
return predictions

def find_matches_to_new_records(
self,
records_or_tablename: AcceptableInputTableType | str,
146 changes: 146 additions & 0 deletions tests/test_score_missing_edges.py
@@ -0,0 +1,146 @@
import pandas as pd
from pytest import mark

import splink.comparison_library as cl
from splink import Linker, SettingsCreator, block_on

from .decorator import mark_with_dialects_excluding

df_pd = pd.read_csv("./tests/datasets/fake_1000_from_splink_demos.csv")
# we don't need full data to check this logic - a smallish subset will do
# as long as it's large enough to contain missed intra-cluster edges
# when predicted with default parameters
df_pd = df_pd[0:200]


@mark_with_dialects_excluding()
@mark.parametrize(
["link_type", "copies_of_df"],
[["dedupe_only", 1], ["link_only", 2], ["link_and_dedupe", 2], ["link_only", 3]],
)
def test_score_missing_edges(test_helpers, dialect, link_type, copies_of_df):
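    # copies_of_df controls how many copies of the frame are passed to the
    # Linker, so the link settings are exercised against multiple input tables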
helper = test_helpers[dialect]

df = helper.convert_frame(df_pd)
settings = SettingsCreator(
link_type=link_type,
comparisons=[
cl.ExactMatch("first_name"),
cl.ExactMatch("surname"),
cl.ExactMatch("dob"),
cl.ExactMatch("city"),
],
blocking_rules_to_generate_predictions=[
block_on("surname"),
block_on("dob"),
],
retain_intermediate_calculation_columns=True,
)
linker_input = df if copies_of_df == 1 else [df for _ in range(copies_of_df)]
linker = Linker(linker_input, settings, **helper.extra_linker_args())

df_predict = linker.inference.predict()
df_clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
df_predict, 0.95
)

df_missing_edges = linker.inference._score_missing_cluster_edges(
df_clusters,
df_predict,
).as_pandas_dataframe()

assert not df_missing_edges.empty, "No missing edges found"
assert not any(df_missing_edges["surname_l"] == df_missing_edges["surname_r"])
assert not any(df_missing_edges["dob_l"] == df_missing_edges["dob_r"])


@mark_with_dialects_excluding()
@mark.parametrize(
["link_type", "copies_of_df"],
[["dedupe_only", 1], ["link_only", 2]],
)
def test_score_missing_edges_all_edges(test_helpers, dialect, link_type, copies_of_df):
helper = test_helpers[dialect]

df = helper.convert_frame(df_pd)
settings = SettingsCreator(
link_type=link_type,
comparisons=[
cl.ExactMatch("first_name"),
cl.ExactMatch("surname"),
cl.ExactMatch("dob"),
cl.ExactMatch("city"),
],
blocking_rules_to_generate_predictions=[
block_on("surname"),
block_on("dob"),
],
retain_intermediate_calculation_columns=True,
)
linker_input = df if copies_of_df == 1 else [df for _ in range(copies_of_df)]
linker = Linker(linker_input, settings, **helper.extra_linker_args())

df_predict = linker.inference.predict()
df_clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
df_predict, 0.95
)

df_missing_edges = linker.inference._score_missing_cluster_edges(
df_clusters,
).as_pandas_dataframe()

assert not df_missing_edges.empty, "No missing edges found"
    # some of these should be present now, as we are scoring all intra-cluster edges
assert any(df_missing_edges["surname_l"] == df_missing_edges["surname_r"])
assert any(df_missing_edges["dob_l"] == df_missing_edges["dob_r"])


@mark_with_dialects_excluding()
@mark.parametrize(
["link_type"],
[["dedupe_only"], ["link_only"]],
)
def test_score_missing_edges_changed_column_names(test_helpers, dialect, link_type):
helper = test_helpers[dialect]

df = df_pd.copy()
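    # rename the unique id column and add an explicit source dataset column
    # to exercise non-default column-name settings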
df["record_id"] = df["unique_id"]
del df["unique_id"]
df["sds"] = "frame_1"
settings = SettingsCreator(
link_type=link_type,
comparisons=[
cl.ExactMatch("first_name"),
cl.ExactMatch("surname"),
cl.ExactMatch("dob"),
cl.ExactMatch("city"),
],
blocking_rules_to_generate_predictions=[
block_on("surname"),
block_on("dob"),
],
retain_intermediate_calculation_columns=True,
unique_id_column_name="record_id",
source_dataset_column_name="sds",
)
if link_type == "dedupe_only":
linker_input = helper.convert_frame(df)
else:
df_2 = df.copy()
df_2["sds"] = "frame_2"
linker_input = [helper.convert_frame(df), helper.convert_frame(df_2)]
linker = Linker(linker_input, settings, **helper.extra_linker_args())

df_predict = linker.inference.predict()
df_clusters = linker.clustering.cluster_pairwise_predictions_at_threshold(
df_predict, 0.95
)

df_missing_edges = linker.inference._score_missing_cluster_edges(
df_clusters,
df_predict,
).as_pandas_dataframe()

assert not df_missing_edges.empty, "No missing edges found"
assert not any(df_missing_edges["surname_l"] == df_missing_edges["surname_r"])
assert not any(df_missing_edges["dob_l"] == df_missing_edges["dob_r"])
