8000 Add mm_disable_node function · m99coder/postgres_cluster@f510c8f · GitHub
[go: up one dir, main page]

Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f510c8f

Browse files
committed
Add mm_disable_node function
1 parent 63663e5 commit f510c8f

File tree

2 files changed

+116
-61
lines changed

2 files changed

+116
-61
lines changed

contrib/multimaster/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ CREATE FUNCTION mm_stop_replication() RETURNS void
99
AS 'MODULE_PATHNAME','mm_stop_replication'
1010
LANGUAGE C;
1111

12+
CREATE FUNCTION mm_disable_node(node integer) RETURNS void
13+
AS 'MODULE_PATHNAME','mm_disable_node'
14+
LANGUAGE C;
15+

contrib/multimaster/multimaster.c

Lines changed: 112 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ typedef struct
5959
TransactionId minXid; /* XID of oldest transaction visible by any active transaction (local or global) */
6060
TransactionId nextXid; /* next XID for local transaction */
6161
size_t nReservedXids; /* number of XIDs reserved for local transactions */
62+
int64 disabledNodeMask;
6263
int nNodes;
6364
pg_atomic_uint32 nReceivers;
6465
bool initialized;
@@ -74,13 +75,16 @@ typedef struct
7475
#define DTM_SHMEM_SIZE (64*1024*1024)
7576
#define DTM_HASH_SIZE 1003
7677

78+
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))
79+
7780
void _PG_init(void);
7881
void _PG_fini(void);
7982

8083
PG_MODULE_MAGIC;
8184

8285
PG_FUNCTION_INFO_V1(mm_start_replication);
8386
PG_FUNCTION_INFO_V1(mm_stop_replication);
87+
PG_FUNCTION_INFO_V1(mm_disable_node);
8488

8589
static Snapshot DtmGetSnapshot(Snapshot snapshot);
8690
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
@@ -108,6 +112,7 @@ static void DtmBackgroundWorker(Datum arg);
108112
static void MMMarkTransAsLocal(TransactionId xid);
109113
static BgwPool* MMPoolConstructor(void);
110114
static bool MMRunUtilityStmt(PGconn* conn, char const* sql);
115+
static void MMBroadcastUtilityStmt(char const* sql, bool ignoreError);
111116

112117
static HTAB* xid_in_doubt;
113118
static HTAB* local_trans;
@@ -737,6 +742,7 @@ static void DtmInitialize()
737742
dtm->nReservedXids = 0;
738743
dtm->minXid = InvalidTransactionId;
739744
dtm->nNodes = MMNodes;
745+
dtm->disabledNodeMask = 0;
740746
pg_atomic_write_u32(&dtm->nReceivers, 0);
741747
dtm->initialized = false;
742748
BgwPoolInit(&dtm->pool, MMExecutor, MMDatabaseName, MMQueueSize);
@@ -1209,6 +1215,22 @@ mm_stop_replication(PG_FUNCTION_ARGS)
12091215
PG_RETURN_VOID();
12101216
}
12111217

1218+
Datum
1219+
mm_disable_node(PG_FUNCTION_ARGS)
1220+
{
1221+
int nodeId = PG_GETARG_INT32(0);
1222+
if (!BIT_SET(dtm->disabledNodeMask, nodeId))
1223+
{
1224+
dtm->disabledNodeMask |= ((int64)1 << nodeId);
1225+
dtm->nNodes -= 1;
1226+
if (!IsTransactionBlock())
1227+
{
1228+
MMBroadcastUtilityStmt(psprintf("select mm_disable_node(%d)", nodeId), true);
1229+
}
1230+
}
1231+
PG_RETURN_VOID();
1232+
}
1233+
12121234
/*
12131235
* Execute statement with specified parameters and check its result
12141236
*/
@@ -1224,6 +1246,95 @@ static bool MMRunUtilityStmt(PGconn* conn, char const* sql)
12241246
return ret;
12251247
}
12261248

1249+
static void MMBroadcastUtilityStmt(char const* sql, bool ignoreError)
1250+
{
1251+
char* conn_str = pstrdup(MMConnStrs);
1252+
char* conn_str_end = conn_str + strlen(conn_str);
1253+
int i = 0;
1254+
int64 disabledNodeMask = dtm->disabledNodeMask;
1255+
int failedNode = -1;
1256+
char const* erro 57AE rMsg = NULL;
1257+
PGconn **conns = palloc0(sizeof(PGconn*)*MMNodes);
1258+
1259+
while (conn_str < conn_str_end)
1260+
{
1261+
char* p = strchr(conn_str, ',');
1262+
if (p == NULL) {
1263+
p = conn_str_end;
1264+
}
1265+
*p = '\0';
1266+
if (!BIT_SET(disabledNodeMask, i))
1267+
{
1268+
conns[i] = PQconnectdb(conn_str);
1269+
if (PQstatus(conns[i]) != CONNECTION_OK)
1270+
{
1271+
if (ignoreError)
1272+
{
1273+
PQfinish(conns[i]);
1274+
conns[i] = NULL;
1275+
} else {
1276+
failedNode = i;
1277+
do {
1278+
PQfinish(conns[i]);
1279+
} while (--i >= 0);
1280+
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
1281+
}
1282+
}
1283+
}
1284+
conn_str = p + 1;
1285+
i += 1;
1286+
}
1287+
Assert(i == MMNodes);
1288+
1289+
for (i = 0; i < MMNodes; i++)
1290+
{
1291+
if (conns[i])
1292+
{
1293+
if (!MMRunUtilityStmt(conns[i], "BEGIN TRANSACTION") && !ignoreError)
1294+
{
1295+
errorMsg = "Failed to start transaction at node %d";
1296+
failedNode = i;
1297+
break;
1298+
}
1299+
if (!MMRunUtilityStmt(conns[i], sql) && !ignoreError)
1300+
{
1301+
errorMsg = "Failed to run command at node %d";
1302+
failedNode = i;
1303+
break;
1304+
}
1305+
}
1306+
}
1307+
if (failedNode >= 0 && !ignoreError)
1308+
{
1309+
for (i = 0; i < MMNodes; i++)
1310+
{
1311+
if (conns[i])
1312+
{
1313+
MMRunUtilityStmt(conns[i], "ROLLBACK TRANSACTION");
1314+
}
1315+
}
1316+
} else {
1317+
for (i = 0; i < MMNodes; i++)
1318+
{
1319+
if (conns[i] && !MMRunUtilityStmt(conns[i], "COMMIT TRANSACTION") && !ignoreError)
1320+
{
1321+
errorMsg = "Commit failed at node %d";
1322+
failedNode = i;
1323+
}
1324+
}
1325+
}
1326+
for (i = 0; i < MMNodes; i++)
1327+
{
1328+
if (conns[i])
1329+
{
1330+
PQfinish(conns[i]);
1331+
}
1332+
}
1333+
if (!ignoreError && failedNode >= 0)
1334+
{
1335+
elog(ERROR, errorMsg, failedNode+1);
1336+
}
1337+
}
12271338

12281339
static void MMProcessUtility(Node *parsetree, const char *queryString,
12291340
ProcessUtilityContext context, ParamListInfo params,
@@ -1267,67 +1378,7 @@ static void MMProcessUtility(Node *parsetree, const char *queryString,
12671378
MMIsDistributedTrans = false;
12681379
}
12691380
} else {
1270-
char* conn_str = pstrdup(MMConnStrs);
1271-
char* conn_str_end = conn_str + strlen(conn_str);
1272-
int i = 0;
1273-
int failedNode = -1;
1274-
char const* errorMsg = NULL;
1275-
PGconn **conns;
1276-
conns = palloc(sizeof(PGconn*)*MMNodes);
1277-
1278-
while (conn_str < conn_str_end) {
1279-
char* p = strchr(conn_str, ',');
1280-
if (p == NULL) {
1281-
p = conn_str_end;
1282-
}
1283-
*p = '\0';
1284-
conns[i] = PQconnectdb(conn_str);
1285-
if (PQstatus(conns[i]) != CONNECTION_OK)
1286-
{
1287-
failedNode = i;
1288-
do {
1289-
PQfinish(conns[i]);
1290-
} while (--i >= 0);
1291-
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
1292-
}
1293-
conn_str = p + 1;
1294-
i += 1;
1295-
}
1296-
Assert(i == MMNodes);
1297-
1298-
for (i = 0; i < MMNodes; i++) {
1299-
if (!MMRunUtilityStmt(conns[i], "BEGIN TRANSACTION"))
1300-
{
1301-
errorMsg = "Failed to start transaction at node %d";
1302-
failedNode = i;
1303-
break;
1304-
}
1305-
if (!MMRunUtilityStmt(conns[i], queryString))
1306-
{
1307-
errorMsg = "Failed to run command at node %d";
1308-
failedNode = i;
1309-
break;
1310-
}
1311-
}
1312-
if (failedNode >= 0)
1313-
{
1314-
for (i = 0; i < MMNodes; i++) {
1315-
MMRunUtilityStmt(conns[i], "ROLLBACK TRANSACTION");
1316-
}
1317-
} else {
1318-
for (i = 0; i < MMNodes; i++) {
1319-
if (!MMRunUtilityStmt(conns[i], "COMMIT TRANSACTION")) {
1320-
errorMsg = "Commit failed at node %d";
1321-
failedNode = i;
1322-
}
1323-
}
1324-
}
1325-
for (i = 0; i < MMNodes; i++) {
1326-
PQfinish(conns[i]);
1327-
}
1328-
if (failedNode >= 0) {
1329-
elog(ERROR, errorMsg, failedNode+1);
1330-
}
1381+
MMBroadcastUtilityStmt(queryString, false);
13311382
}
13321383
}
13331384
static void

0 commit comments

Comments
 (0)
0