10000 Parallel COPY FROM by ololobus · Pull Request #2 · ololobus/postgres · GitHub
[go: up one dir, main page]

Skip to content

Parallel COPY FROM #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Per row log outputs commented
  • Loading branch information
ololobus committed Aug 18, 2017
commit db5271de4d82b0faaeb40abc2f3ee52fd7b7cef3
26 changes: 18 additions & 8 deletions src/backend/commands/copy.c
D943
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);

if (cstate->allow_parallel) /* copy from file to database */
if (cstate->allow_parallel) /* copy from file to database */
// if (false)
{
*processed = ParallelCopyFrom(cstate, pstate, rel, stmt->filename, stmt->is_program,
stmt->attlist, stmt->options);
Expand Down Expand Up @@ -5406,7 +5407,7 @@ CopyFromBgwMainLoop(Datum main_arg)
elog(LOG, "BGWorker #%d: got zero-length message, stopping", myworkernumber);
break;
}
elog(LOG, "BGWorker #%d processing line: %s", myworkernumber, msg);
// elog(LOG, "BGWorker #%d processing line: %s", myworkernumber, msg);

cstate->line_buf.data = msg;
cstate->line_buf.len = len;
Expand Down Expand Up @@ -5747,8 +5748,11 @@ CopyFromBgwMainLoop(Datum main_arg)
// }
LWLockRelease(CopyFromBgwLock);

elog(LOG, "BGWorker #%d processed: %lu", myworkernumber, processed);

/* Signal main process that we are done. */
ConditionVariableBroadcast(&cv);
// ConditionVariableBroadcast(&cv);
SetLatch(&registrant->procLatch);

// TODO Segmentation fault:
// * frame #0: 0x0000000107519678 postgres`GetMemoryChunkContext(pointer=0x00007fdf268bf858) at memutils.h:124
Expand Down Expand Up @@ -6120,7 +6124,13 @@ wait_for_workers_to_finish(volatile ParallelState *pst)

elog(LOG, "Going to sleep again");
/* Wait for the workers to wake us up. */
ConditionVariableSleep(&cv, WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);
// ConditionVariableSleep(&cv, WAIT_EVENT_COPY_FROM_BGWORKERS_FINISHED);

/* Wait to be signalled. */
WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION);

/* Reset the latch so we don't spin. */
ResetLatch(MyLatch);

/* An interrupt may have occurred while we were waiting. */
CHECK_FOR_INTERRUPTS();
Expand All @@ -6144,11 +6154,11 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
ParallelState *pst;
dsm_segment *seg;
int32 nworkers = max_parallel_workers_per_gather;
int64 queue_size = 100;
int64 queue_size = 100000;
shm_mq_handle **mq_handles;
shm_mq_result shmq_res;
int last_worker_used = 0;
int message_size = sizeof(char);
int message_size = sizeof(char) * 80;

mq_handles = palloc0(sizeof(shm_mq_handle *) * nworkers);

Expand All @@ -6157,7 +6167,7 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
uint64 processed = 0;

// MQ size = 100 messages x 80 chars each
pst = setup_parallel_copy_from(message_size * 80 * queue_size,
pst = setup_parallel_copy_from(message_size * queue_size,
nworkers,
&seg,
&mq_handles,
Expand Down Expand Up @@ -6211,7 +6221,7 @@ ParallelCopyFrom(CopyState cstate, ParseState *pstate,
}
else
{
elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);
// elog(LOG, "Sending line #%d to BGWorker #%d", cstate->cur_lineno, last_worker_used + 1);

shmq_res = shm_mq_send(mq_handles[++last_worker_used - 1], cstate->line_buf.len, cstate->line_buf.data, false);
if (shmq_res != SHM_MQ_SUCCESS)
Expand Down
0