Running Splink WITHOUT duckdb installed on Spark EMR #1239
-
Hello. For reasons that are confidential, my organization is not able to install duckdb on AWS EMR, but we still need to run Splink using PySpark. For the most part everything works fine, except the estimate_u_using_random_sampling function, which fails with an error (Traceback (most recent call last): ...). I am wondering if there is a workaround for this? Does duckdb HAVE to be installed for the estimate_u_using_random_sampling function to work?
-
Unfortunately the answer is currently yes. But if I recall correctly, the only dependency on duckdb if you're using Spark is here: splink/splink/expectation_maximisation.py, line 88 (commit 6e3a2e1). If you rewrote that to run using another SQL engine you do have access to (sqlite, polars etc.), or rewrote it to work in pandas, then I believe everything else would work. Note that the same function is also used when training m values using the EM algorithm, i.e. at the moment you'd get the same error on some of the other training steps, but if you modify it to not use duckdb, it should fix those as well. I haven't checked the following code, but ChatGPT-4 thinks this might work:

import sqlite3
def compute_proportions_for_new_parameters(m_u_df):
"""Using the results from compute_new_parameters_sql, compute
m and u
"""
# Create an in-memory SQLite database
conn = sqlite3.connect(":memory:")
m_u_df.to_sql("m_u_df", conn, if_exists="replace", index=False)
sql = """
SELECT
comparison_vector_value,
output_column_name,
m_count * 1.0 / SUM(m_count) OVER (PARTITION BY output_column_name)
AS m_probability,
u_count * 1.0 / SUM(u_count) OVER (PARTITION BY output_column_name)
AS u_probability
FROM m_u_df
WHERE comparison_vector_value != -1
AND output_column_name != '_probability_two_random_records_match'
UNION ALL
SELECT
comparison_vector_value,
output_column_name,
m_count * 1.0 AS m_probability,
u_count * 1.0 AS u_probability
FROM m_u_df
WHERE output_column_name = '_probability_two_random_records_match'
ORDER BY output_column_name, comparison_vector_value ASC
"""
result = conn.execute(sql).fetchall()
# Convert the result to a list of dictionaries
column_names = ["comparison_vector_value", "output_column_name", "m_probability", "u_probability"]
result_dicts = [dict(zip(column_names, row)) for row in result]
# Close the SQLite connection
conn.close()
return result_dicts
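
Alternatively, since a pandas rewrite was mentioned above, here is a rough, untested sketch of the same calculation in pandas. It assumes m_u_df is a pandas DataFrame with comparison_vector_value, output_column_name, m_count and u_count columns; the function name is purely illustrative:

import pandas as pd

def compute_proportions_for_new_parameters_pandas(m_u_df):
    """Pandas-only sketch: normalise m_count and u_count within each
    output_column_name, passing the _probability_two_random_records_match
    row through with its raw counts.
    """
    prob_col = "_probability_two_random_records_match"

    # Ordinary comparison levels: divide each count by its column total
    main = m_u_df[
        (m_u_df["comparison_vector_value"] != -1)
        & (m_u_df["output_column_name"] != prob_col)
    ].copy()
    main["m_probability"] = main["m_count"] / main.groupby("output_column_name")["m_count"].transform("sum")
    main["u_probability"] = main["u_count"] / main.groupby("output_column_name")["u_count"].transform("sum")

    # The match-probability row keeps its raw counts
    prob = m_u_df[m_u_df["output_column_name"] == prob_col].copy()
    prob["m_probability"] = prob["m_count"] * 1.0
    prob["u_probability"] = prob["u_count"] * 1.0

    cols = ["comparison_vector_value", "output_column_name", "m_probability", "u_probability"]
    result = pd.concat([main[cols], prob[cols]]).sort_values(
        ["output_column_name", "comparison_vector_value"]
    )
    return result.to_dict("records")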
If you're able to get this to work, I'd be grateful if you'd confirm, as we'd consider accepting it as a PR, especially if it didn't introduce any new dependencies. We know of at least one other user with the same problem as yourselves.
-
👋 Just to check: are you trying to run splink on locked-down machines that have pre-installed packages, or are you able to run pip installs yourself? I've got a fix to remove the duckdb dependency, but how we manage dependencies for this depends on the above.
-
We ended up using spark.sql and settled on this code, and everything seems to be working. Robin, thanks for leading us in the right direction and for all of your work on this amazing package!

from pyspark.sql import SparkSession

# Get the active SparkSession (on EMR this is typically already available as `spark`)
spark = SparkSession.builder.getOrCreate()
def compute_proportions_for_new_parameters(m_u_df):
"""Using the results from compute_new_parameters_sql, compute
m and u
"""
# create temporary view from Spark DataFrame
m_u_df.createOrReplaceTempView("m_u_df")
# Now we can use Spark SQL
result_df = spark.sql("""
SELECT
comparison_vector_value,
output_column_name,
m_count/SUM(m_count) OVER (PARTITION BY output_column_name) AS m_probability,
u_count/SUM(u_count) OVER (PARTITION BY output_column_name) AS u_probability
FROM m_u_df
WHERE comparison_vector_value != -1
AND output_column_name != '_probability_two_random_records_match'
UNION ALL
SELECT
comparison_vector_value,
output_column_name,
m_count AS m_probability,
u_count AS u_probability
FROM m_u_df
WHERE output_column_name = '_probability_two_random_records_match'
ORDER BY output_column_name, comparison_vector_value ASC
""")
    # Convert the Spark result to a list of dicts, matching the return
    # type of the original function (requires pandas on the driver)
    return result_df.toPandas().to_dict("records")
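
For illustration, a hypothetical call of that function, assuming the spark session above and a toy DataFrame with the columns the query expects (the values here are made up), would look something like:

# Hypothetical input: one comparison column plus the match-probability row
rows = [
    (0, "first_name", 5.0, 95.0),
    (1, "first_name", 95.0, 5.0),
    (-1, "_probability_two_random_records_match", 0.01, 0.99),
]
m_u_df = spark.createDataFrame(
    rows, ["comparison_vector_value", "output_column_name", "m_count", "u_count"]
)

records = compute_proportions_for_new_parameters(m_u_df)
# records is a list of dicts, one per row, with m_probability and u_probability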
-
@JustinWinthers @JoeGanser To update, and for future readers of this discussion - it's now possible 'officially' to install Splink without duckdb: |
https://moj-analytical-services.github.io/splink/installations.html#duckdb-less-installation