[pbckp-128] dry-run option for catchup · Pull Request #477 · postgrespro/pg_probackup
by dlepikhova

Merged · 14 commits · Jun 1, 2022
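This PR adds a --dry-run flag to the catchup command. With the flag set, catchup still connects to the source instance, builds the source and destination file lists, and extracts the page map, but every write to the destination (directory and symlink creation, WAL streaming, file copies and deletions, the control-file and backup_label writes, and the final sync) sits behind a !dry_run check. An invocation would look roughly like: pg_probackup catchup -b FULL --source-pgdata=/path/to/src --destination-pgdata=/path/to/dst --stream --dry-run (paths illustrative).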
84 changes: 52 additions & 32 deletions src/catchup.c
@@ -2,7 +2,7 @@
*
* catchup.c: sync DB cluster
*
* Copyright (c) 2021, Postgres Professional
* Copyright (c) 2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@@ -507,16 +507,20 @@ catchup_multithreaded_copy(int num_threads,
/* Run threads */
thread_interrupted = false;
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
for (i = 0; i < num_threads; i++)
if (!dry_run)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
for (i = 0; i < num_threads; i++)
{
elog(VERBOSE, "Start thread num: %i", i);
pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
}
}

/* Wait threads */
for (i = 0; i < num_threads; i++)
{
pthread_join(threads[i], NULL);
if (!dry_run)
pthread_join(threads[i], NULL);
all_threads_successful &= threads_args[i].completed;
transfered_bytes_result += threads_args[i].transfered_bytes;
}
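
The hunk above is the subtle part of the change: in a dry run the worker threads are neither created nor joined, but the accounting loop still reads each thread's stats. Below is a minimal, self-contained sketch of that shape; worker_args_t and copy_worker() are hypothetical stand-ins, not pg_probackup's actual types (link with -pthread).

/* Sketch of the dry-run gating pattern in catchup_multithreaded_copy() */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    bool completed;         /* set by the worker on success */
    long transfered_bytes;  /* counted by the worker */
} worker_args_t;

static void *
copy_worker(void *arg)
{
    worker_args_t *args = (worker_args_t *) arg;

    args->transfered_bytes += 42;  /* stand-in for the real file copy */
    args->completed = true;
    return NULL;
}

/* Returns total bytes transferred, or -1 if any worker did not complete */
static long
multithreaded_copy(int num_threads, bool dry_run)
{
    pthread_t *threads = malloc(sizeof(pthread_t) * num_threads);
    worker_args_t *args = calloc(num_threads, sizeof(worker_args_t));
    bool all_ok = true;
    long total = 0;
    int i;

    /* Start workers only when actually copying */
    if (!dry_run)
        for (i = 0; i < num_threads; i++)
            pthread_create(&threads[i], NULL, copy_worker, &args[i]);

    /* The accounting loop always runs; only the join is skipped */
    for (i = 0; i < num_threads; i++)
    {
        if (!dry_run)
            pthread_join(threads[i], NULL);
        all_ok &= args[i].completed;
        total += args[i].transfered_bytes;
    }

    free(threads);
    free(args);
    return all_ok ? total : -1;
}

int
main(void)
{
    printf("real run: %ld, dry run: %ld\n",
           multithreaded_copy(4, false), multithreaded_copy(4, true));
    return 0;
}

Since the argument structs start zeroed, a dry run leaves every completed flag false, which appears to be why the failure-reporting branch later in do_catchup() is additionally guarded with !dry_run.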
@@ -706,9 +710,14 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

/* Start stream replication */
join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
if (!dry_run)
{
fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, false);
}
else
elog(INFO, "WAL streaming skipping with --dry-run option");

source_filelist = parray_new();

@@ -779,9 +788,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,

/* Build the page map from ptrack information */
make_pagemap_from_ptrack_2(source_filelist, source_conn,
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
source_node_info.ptrack_schema,
source_node_info.ptrack_version_num,
dest_redo.lsn);
time(&end_time);
elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
difftime(end_time, start_time));
@@ -820,9 +829,9 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char dirpath[MAXPGPATH];

join_path_components(dirpath, dest_pgdata, file->rel_path);

elog(VERBOSE, "Create directory '%s'", dirpath);
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
if (!dry_run)
fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
}
else
{
@@ -853,15 +862,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"",
linked_path, to_path);

/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
if (!dry_run)
{
/* create tablespace directory */
if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
elog(ERROR, "Could not create tablespace directory \"%s\": %s",
linked_path, strerror(errno));

/* create link to linked_path */
if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
linked_path, to_path, strerror(errno));
}
}
}

@@ -930,7 +942,10 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
char fullpath[MAXPGPATH];

join_path_components(fullpath, dest_pgdata, file->rel_path);
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
if (!dry_run)
{
fio_delete(file->mode, fullpath, FIO_LOCAL_HOST);
}
elog(VERBOSE, "Deleted file \"%s\"", fullpath);

/* shrink dest pgdata list */
@@ -961,7 +976,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
catchup_isok = transfered_datafiles_bytes != -1;

/* at last copy control file */
if (catchup_isok)
if (catchup_isok && !dry_run)
{
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
@@ -972,7 +987,7 @@
transfered_datafiles_bytes += source_pg_control_file->size;
}

if (!catchup_isok)
if (!catchup_isok && !dry_run)
{
char pretty_time[20];
char pretty_transfered_data_bytes[20];
@@ -1010,14 +1025,18 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
pg_free(stop_backup_query_text);
}

wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);
if (!dry_run)
wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, &current);

#if PG_VERSION_NUM >= 90600
/* Write backup_label */
Assert(stop_backup_result.backup_label_content != NULL);
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
if (!dry_run)
{
pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
NULL);
}
free(stop_backup_result.backup_label_content);
stop_backup_result.backup_label_content = NULL;
stop_backup_result.backup_label_content_len = 0;
@@ -1040,6 +1059,7 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads,
#endif

/* wait for end of wal streaming and calculate wal size transfered */
if (!dry_run)
{
parray *wal_files_list = NULL;
wal_files_list = parray_new();
@@ -1091,17 +1111,17 @@
}

/* Sync all copied files unless '--no-sync' flag is used */
if (sync_dest_files)
if (sync_dest_files && !dry_run)
catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
else
elog(WARNING, "Files are not synced to disk");

/* Cleanup */
if (dest_filelist)
if (dest_filelist && !dry_run)
{
parray_walk(dest_filelist, pgFileFree);
parray_free(dest_filelist);
}
parray_free(dest_filelist);
parray_walk(source_filelist, pgFileFree);
parray_free(source_filelist);
pgFileFree(source_pg_control_file);
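A detail worth noting across the hunks above: the VERBOSE messages such as "Create directory" and "Deleted file" stay outside the !dry_run guard, so a dry run still logs the actions a real run would perform. A self-contained sketch of that report-then-guard idiom, with printf() and remove() standing in for pg_probackup's elog() and fio_delete():

#include <stdbool.h>
#include <stdio.h>

/* Sketch only: the side effect is guarded, the reporting is not */
static void
delete_redundant_file(const char *path, bool dry_run)
{
    if (!dry_run)
        remove(path);
    printf("VERBOSE: Deleted file \"%s\"\n", path);
}

int
main(void)
{
    delete_redundant_file("/tmp/some_redundant_file", true);  /* dry run */
    return 0;
}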
4 changes: 4 additions & 0 deletions src/help.c
@@ -261,6 +261,7 @@ help_pg_probackup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n"));

if ((PROGRAM_URL || PROGRAM_EMAIL))
@@ -1047,6 +1048,7 @@ help_catchup(void)
printf(_(" [--remote-proto] [--remote-host]\n"));
printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
printf(_(" [--ssh-options]\n"));
printf(_(" [--dry-run]\n"));
printf(_(" [--help]\n\n"));

printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
@@ -1081,4 +1083,6 @@ help_catchup(void)
printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));

printf(_(" --dry-run perform a trial run without any changes\n\n"));
}
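
The option-table entry that actually defines --dry-run is not part of this diff (pg_probackup routes options through its own pgut tables rather than getopt_long). Purely as a hypothetical illustration of how a boolean long option of this kind is typically wired:

#include <getopt.h>
#include <stdbool.h>
#include <stdio.h>

int
main(int argc, char **argv)
{
    bool dry_run = false;
    static struct option long_opts[] = {
        {"dry-run", no_argument, NULL, 1},
        {NULL, 0, NULL, 0}
    };
    int c;

    /* Recognize --dry-run and flip the flag; all other options omitted */
    while ((c = getopt_long(argc, argv, "", long_opts, NULL)) != -1)
        if (c == 1)
            dry_run = true;

    printf("dry_run = %s\n", dry_run ? "true" : "false");
    return 0;
}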
154 changes: 154 additions & 0 deletions tests/catchup.py
@@ -1455,3 +1455,157 @@ def test_config_exclusion(self):
dst_pg.stop()
#self.assertEqual(1, 0, 'Stop test')
self.del_test_dir(module_name, self.fname)

#########################################
# --dry-run
#########################################
def test_dry_run_catchup_full(self):
"""
Test dry-run option for full catchup
"""
# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True
)
src_pg.slow_start()

# preparation 2: make an empty destination node
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))

src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()

# save the destination state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)

# do full catchup
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
)

# compare data dirs before and after catchup
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)

# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_ptrack(self):
"""
Test dry-run option for catchup in incremental ptrack mode
"""
if not self.ptrack:
return unittest.skip('Skipped because ptrack support is disabled')

# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
ptrack_enable = True,
initdb_params = ['--data-checksums']
)
src_pg.slow_start()
src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")

src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()

# preparation 2: make a cleanly shut down replica lagging behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()

# save the destination state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)

# do incremental catchup
self.catchup_node(
backup_mode = 'PTRACK',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', '--dry-run']
)

# compare data dirs before and after catchup
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)

# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)

def test_dry_run_catchup_delta(self):
"""
Test dry-run option for catchup in incremental delta mode
"""

# preparation 1: source
src_pg = self.make_simple_node(
base_dir = os.path.join(module_name, self.fname, 'src'),
set_replication = True,
initdb_params = ['--data-checksums'],
pg_options = { 'wal_log_hints': 'on' }
)
src_pg.slow_start()

src_pg.pgbench_init(scale = 10)
pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
pgbench.wait()

# preparation 2: make a cleanly shut down replica lagging behind the source
dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
self.catchup_node(
backup_mode = 'FULL',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
)
self.set_replica(src_pg, dst_pg)
dst_options = {}
dst_options['port'] = str(dst_pg.port)
self.set_auto_conf(dst_pg, dst_options)
dst_pg.slow_start(replica = True)
dst_pg.stop()

# save the destination state before the dry run
content_before = self.pgdata_content(dst_pg.data_dir)

# do delta catchup
self.catchup_node(
backup_mode = 'DELTA',
source_pgdata = src_pg.data_dir,
destination_node = dst_pg,
options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream', "--dry-run"]
)

# compare data dirs before and after catchup
self.compare_pgdata(
content_before,
self.pgdata_content(dst_pg.data_dir)
)

# Cleanup
src_pg.stop()
self.del_test_dir(module_name, self.fname)
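
All three tests use the same capture-and-compare pattern: snapshot the destination data directory with pgdata_content() before the run, invoke catchup with --dry-run, and assert with compare_pgdata() that nothing on disk changed. Repeating the pattern for FULL, PTRACK, and DELTA modes exercises the new !dry_run guards on each catchup path.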

1 change: 1 addition & 0 deletions tests/expected/option_help.out
@@ -178,6 +178,7 @@ pg_probackup - utility to manage backup/recovery of PostgreSQL database.
[--remote-proto] [--remote-host]
[--remote-port] [--remote-path] [--remote-user]
[--ssh-options]
[--dry-run]
[--help]

Read the website for details. <https://github.com/postgrespro/pg_probackup>
9 changes: 9 additions & 0 deletions travis/run_tests.sh
@@ -100,11 +100,20 @@ source pyenv/bin/activate
pip3 install testgres

echo "############### Testing:"
echo PG_PROBACKUP_PARANOIA=${PG_PROBACKUP_PARANOIA}
echo ARCHIVE_COMPRESSION=${ARCHIVE_COMPRESSION}
echo PGPROBACKUPBIN_OLD=${PGPROBACKUPBIN_OLD}
echo PGPROBACKUPBIN=${PGPROBACKUPBIN}
echo PGPROBACKUP_SSH_REMOTE=${PGPROBACKUP_SSH_REMOTE}
echo PGPROBACKUP_GDB=${PGPROBACKUP_GDB}
echo PG_PROBACKUP_PTRACK=${PG_PROBACKUP_PTRACK}
if [ "$MODE" = "basic" ]; then
export PG_PROBACKUP_TEST_BASIC=ON
echo PG_PROBACKUP_TEST_BASIC=${PG_PROBACKUP_TEST_BASIC}
python3 -m unittest -v tests
python3 -m unittest -v tests.init
else
echo PG_PROBACKUP_TEST_BASIC=${PG_PROBACKUP_TEST_BASIC}
python3 -m unittest -v tests.$MODE
fi
