Skip to content

Commit

Permalink
Preserve HEAP_COMBOCID when restoring t_cid from WAL (#8503)
Browse files Browse the repository at this point in the history
## Problem

See #8499

## Summary of changes

Save HEAP_COMBOCID flag in WAL and do not clear it in redo handlers.

Related Postgres PRs:
neondatabase/postgres#457
neondatabase/postgres#458
neondatabase/postgres#459


## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? If so, did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
  • Loading branch information
3 people authored Aug 14, 2024
1 parent c624317 commit 7a1736d
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 13 deletions.
16 changes: 9 additions & 7 deletions pgxn/neon_rmgr/neon_rmgr.c
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ static void
fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
{
*infomask &= ~(HEAP_XMAX_IS_MULTI | HEAP_XMAX_LOCK_ONLY |
HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK);
HEAP_XMAX_KEYSHR_LOCK | HEAP_XMAX_EXCL_LOCK | HEAP_COMBOCID);
*infomask2 &= ~HEAP_KEYS_UPDATED;

if (infobits & XLHL_XMAX_IS_MULTI)
Expand All @@ -195,6 +195,8 @@ fix_infomask_from_infobits(uint8 infobits, uint16 *infomask, uint16 *infomask2)
*infomask |= HEAP_XMAX_LOCK_ONLY;
if (infobits & XLHL_XMAX_EXCL_LOCK)
*infomask |= HEAP_XMAX_EXCL_LOCK;
if (infobits & XLHL_COMBOCID)
*infomask |= HEAP_COMBOCID;
/* note HEAP_XMAX_SHR_LOCK isn't considered here */
if (infobits & XLHL_XMAX_KEYSHR_LOCK)
*infomask |= HEAP_XMAX_KEYSHR_LOCK;
Expand Down Expand Up @@ -284,7 +286,7 @@ redo_neon_heap_insert(XLogReaderState *record)
htup->t_infomask = xlhdr.t_infomask;
htup->t_hoff = xlhdr.t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlhdr.t_cid;
htup->t_ctid = target_tid;

if (PageAddItem(page, (Item) htup, newlen, xlrec->offnum,
Expand Down Expand Up @@ -373,7 +375,7 @@ redo_neon_heap_delete(XLogReaderState *record)
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
else
HeapTupleHeaderSetXmin(htup, InvalidTransactionId);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;

/* Mark the page as a candidate for pruning */
PageSetPrunable(page, XLogRecGetXid(record));
Expand Down Expand Up @@ -490,7 +492,7 @@ redo_neon_heap_update(XLogReaderState *record, bool hot_update)
fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
&htup->t_infomask2);
HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
/* Set forward chain link in t_ctid */
htup->t_ctid = newtid;

Expand Down Expand Up @@ -623,7 +625,7 @@ redo_neon_heap_update(XLogReaderState *record, bool hot_update)
htup->t_hoff = xlhdr.t_hoff;

HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlhdr.t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlhdr.t_cid;
HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
/* Make sure there is no forward chain link in t_ctid */
htup->t_ctid = newtid;
Expand Down Expand Up @@ -728,7 +730,7 @@ redo_neon_heap_lock(XLogReaderState *record)
offnum);
}
HeapTupleHeaderSetXmax(htup, xlrec->xmax);
HeapTupleHeaderSetCmax(htup, xlrec->t_cid, false);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
Expand Down Expand Up @@ -840,7 +842,7 @@ redo_neon_heap_multi_insert(XLogReaderState *record)
htup->t_infomask = xlhdr->t_infomask;
htup->t_hoff = xlhdr->t_hoff;
HeapTupleHeaderSetXmin(htup, XLogRecGetXid(record));
HeapTupleHeaderSetCmin(htup, xlrec->t_cid);
htup->t_choice.t_heap.t_field3.t_cid = xlrec->t_cid;
ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);

Expand Down
139 changes: 139 additions & 0 deletions test_runner/regress/test_combocid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from fixtures.neon_fixtures import NeonEnvBuilder


def do_combocid_op(neon_env_builder: NeonEnvBuilder, op, n_records: int = 1000):
    """Exercise combocid restoration from WAL for the given SQL operation.

    Inside a single open transaction: insert ``n_records`` rows (so a later
    modification of them in the same transaction assigns combocids), open a
    cursor and fetch the first half of the table, run *op*, drop the buffer
    cache so subsequent reads must reconstruct the pages from WAL, and then
    verify the cursor can still fetch the remaining half. If combocids are
    not preserved through WAL redo, the final fetch fails.

    ``op`` is any SQL statement that touches every row (delete/update/lock).
    ``n_records`` defaults to 1000, matching the original test.
    """
    env = neon_env_builder.init_start()
    # Tiny shared_buffers so pages are actually evicted and need to be
    # reconstructed from WAL after clear_buffer_cache().
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "shared_buffers='1MB'",
        ],
    )

    conn = endpoint.connect()
    cur = conn.cursor()
    first_half = n_records // 2

    cur.execute("CREATE EXTENSION neon_test_utils")

    cur.execute("create table t(id integer, val integer)")

    cur.execute("begin")
    cur.execute("insert into t values (1, 0)")
    cur.execute("insert into t values (2, 0)")
    cur.execute(f"insert into t select g, 0 from generate_series(3,{n_records}) g")

    # Open a cursor and scroll it halfway through the table
    cur.execute("DECLARE c1 NO SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM t")
    cur.execute(f"fetch {first_half} from c1")
    rows = cur.fetchall()
    assert len(rows) == first_half

    # Perform specified operation
    cur.execute(op)

    # Clear the cache, so that we exercise reconstructing the pages
    # from WAL
    cur.execute("SELECT clear_buffer_cache()")

    # Check that the cursor opened earlier still works. If the
    # combocids are not restored correctly, it won't.
    cur.execute("fetch all from c1")
    rows = cur.fetchall()
    assert len(rows) == n_records - first_half

    cur.execute("rollback")


def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
    """DELETE of rows inserted in the same transaction assigns combocids;
    verify they survive page reconstruction from WAL."""
    do_combocid_op(neon_env_builder, "delete from t")


def test_combocid_update(neon_env_builder: NeonEnvBuilder):
    """UPDATE of rows inserted in the same transaction assigns combocids;
    verify they survive page reconstruction from WAL."""
    do_combocid_op(neon_env_builder, "update t set val=val+1")


def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
    """SELECT ... FOR UPDATE locks rows inserted in the same transaction;
    verify the combocid is preserved through WAL redo of the lock record."""
    do_combocid_op(neon_env_builder, "select * from t for update")


def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
    """Verify combocids survive WAL redo of heap multi-insert records.

    COPY generates multi-insert WAL records, unlike plain INSERT. The rows
    are copied in and then deleted inside one transaction, so they carry
    combocids; after the buffer cache is cleared, an already-open cursor
    must still see the rows it has not yet fetched.
    """
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "shared_buffers='1MB'",
        ],
    )

    cursor = endpoint.connect().cursor()
    n_records = 1000

    cursor.execute("CREATE EXTENSION neon_test_utils")

    # Prepare a CSV with the test data, then empty the table again.
    cursor.execute("create table t(id integer, val integer)")
    file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
    cursor.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
    cursor.execute(f"copy t to '{file_path}'")
    cursor.execute("truncate table t")

    cursor.execute("begin")
    # COPY FROM produces the multi-insert WAL records we want to exercise.
    cursor.execute(f"copy t from '{file_path}'")

    # Open a cursor and scroll it halfway through the table.
    cursor.execute("DECLARE c1 NO SCROLL CURSOR WITHOUT HOLD FOR SELECT * FROM t")
    cursor.execute("fetch 500 from c1")
    assert len(cursor.fetchall()) == 500

    # Delete all the rows. Because all of the rows were inserted earlier in the
    # same transaction, all the rows will get a combocid.
    cursor.execute("delete from t")
    # Drop every cached page so the next reads reconstruct pages from WAL.
    cursor.execute("SELECT clear_buffer_cache()")

    # The cursor opened before the delete must still return the remaining
    # rows; it won't if the combocids were not restored correctly.
    cursor.execute("fetch all from c1")
    assert len(cursor.fetchall()) == 500

    cursor.execute("rollback")


def test_combocid(neon_env_builder: NeonEnvBuilder):
    """Smoke-test repeated updates and deletes of the same rows within a
    single transaction, which assigns combocids to every affected tuple."""
    env = neon_env_builder.init_start()
    endpoint = env.endpoints.create_start("main")

    cursor = endpoint.connect().cursor()
    n_records = 100000

    cursor.execute("create table t(id integer, val integer)")
    cursor.execute(f"insert into t values (generate_series(1,{n_records}), 0)")

    cursor.execute("begin")

    # Re-updating the same rows repeatedly inside one transaction forces
    # cmin/cmax to be combined into combocids.
    for _ in range(3):
        cursor.execute("update t set val=val+1")
        assert cursor.rowcount == n_records

    cursor.execute("delete from t")
    assert cursor.rowcount == n_records
    # The second delete finds nothing left to remove.
    cursor.execute("delete from t")
    assert cursor.rowcount == 0

    # Re-insert and update again, still inside the same transaction.
    cursor.execute(f"insert into t values (generate_series(1,{n_records}), 0)")
    for _ in range(3):
        cursor.execute("update t set val=val+1")
        assert cursor.rowcount == n_records

    cursor.execute("rollback")
2 changes: 1 addition & 1 deletion vendor/postgres-v14
2 changes: 1 addition & 1 deletion vendor/postgres-v15
2 changes: 1 addition & 1 deletion vendor/postgres-v16
6 changes: 3 additions & 3 deletions vendor/revisions.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
{
"v16": [
"16.3",
"5ea106b2583285849784e774b39d62eb2615bd5d"
"47a9122a5a150a3217fafd3f3d4fe8e020ea718a"
],
"v15": [
"15.7",
"39c51c33b383239c78b86afe561679f980e44842"
"46b4b235f38413ab5974bb22c022f9b829257674"
],
"v14": [
"14.12",
"a48faca1d9aef59649dd1bf34bc1b6303fa3489e"
"3fd7a45f8aae85c080df6329e3c85887b7f3a737"
]
}

1 comment on commit 7a1736d

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2236 tests run: 2155 passed, 2 failed, 79 skipped (full report)


Failures on Postgres 14

  • test_pgbench_intensive_init_workload[neon_off-github-actions-selfhosted-1000]: release
  • test_heavy_write_workload[neon_off-github-actions-selfhosted-10-5-5]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_pgbench_intensive_init_workload[neon_off-release-pg14-github-actions-selfhosted-1000] or test_heavy_write_workload[neon_off-release-pg14-github-actions-selfhosted-10-5-5]"

Code coverage* (full report)

  • functions: 32.3% (7161 of 22162 functions)
  • lines: 50.3% (57922 of 115177 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
7a1736d at 2024-08-14T07:06:55.483Z :recycle:

Please sign in to comment.