From 0ef0a577d37e02cd2a9f2e8a2ee51a63fa3068b8 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Thu, 25 Jun 2020 19:00:48 +0530 Subject: [PATCH 1/5] Reconciler as service --- .circleci/config.yml | 7 +- config/default.js | 5 +- package.json | 3 +- src/reconciler.js | 324 +++++++++++++++++++++++-------------------- 4 files changed, 180 insertions(+), 159 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 22fe3cf..6855c7c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -62,9 +62,9 @@ builddeploy_steps: &builddeploy_steps # # reconciler deployment # rm -rf buildenvvar # ./unsetenv.sh - # ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar - # source buildenvvar - # ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} + ./buildenv.sh -e $DEPLOY_ENV -b ${LOGICAL_ENV}-${APPNAME}-reconciler-deployvar + source buildenvvar + ./master_deploy.sh -d ECS -e $DEPLOY_ENV -t latest -s ${LOGICAL_ENV}-global-appvar,${LOGICAL_ENV}-${APPNAME}-appvar -i ${APPNAME} @@ -98,6 +98,7 @@ workflows: only: - dev - dev-chr-fix + - dev-reconciler-fix # Production builds are exectuted only on tagged commits to the # master branch. diff --git a/config/default.js b/config/default.js index 44cb065..b7ddbae 100644 --- a/config/default.js +++ b/config/default.js @@ -72,6 +72,9 @@ module.exports = { RECONCILER_DIFF_PERIOD : parseInt(process.env.RECONCILER_DIFF_PERIOD || 10), RECONCILER_DURATION_TYPE : process.env.RECONCILER_DURATION_TYPE || 'm', RECONCILER_RETRY_COUNT : parseInt(process.env.RECONCILER_RETRY_COUNT || 1), - RECONCILER_POST_URL : process.env.RECONCILER_POST_URL || 'http://ifxpg-migrator.topcoder-dev.com/kafkaevents' + RECONCILER_POST_URL : process.env.RECONCILER_POST_URL || 'http://ifxpg-migrator.topcoder-dev.com/kafkaevents', + RECONCILE_DYNAMODB : process.env.RECONCILE_DYNAMODB || 'true', + RECONCILE_PGSTATUS : process.env.RECONCILE_PGSTATUS || 'true', + RECONCILE_TIMESCHEDULE : process.env.RECONCILE_TIMESCHEDULE || '*/3 * * * *' } } diff --git a/package.json b/package.json index 5b91f09..d7a28df 100644 --- a/package.json +++ b/package.json @@ -37,7 +37,8 @@ "topcoder-healthcheck-dropin": "^1.0.3", "util": "latest", "get-parameter-names": "^0.3.0", - "lodash": "^4.17.4" + "lodash": "^4.17.4", + "node-cron" : "latest" }, "devDependencies": { "chai": "^4.2.0", diff --git a/src/reconciler.js b/src/reconciler.js index bb955da..9eeeb54 100644 --- a/src/reconciler.js +++ b/src/reconciler.js @@ -1,5 +1,6 @@ const config = require('config') -//Establishing connection in postgress +const cron = require("node-cron"); +const express = require("express"); const pg = require('pg') const request = require("request"); const pgOptions = config.get('POSTGRES') @@ -8,185 +9,200 @@ const pgConnectionString = `postgresql://${pgOptions.user}:${pgOptions.password} const logger = require('./common/logger') const _ = require('lodash') var AWS = require("aws-sdk"); -var params +var pgClient var docClient = new AWS.DynamoDB.DocumentClient({ region: config.DYNAMODB.REGION, convertEmptyValues: true }); -async function dynamo_pg_validation() { - ElapsedTime = config.RECONCILER.RECONCILER_ELAPSE_TIME - //ElapsedTime = 4995999000 - params = { - TableName: config.DYNAMODB.TABLENAME, - FilterExpression: "NodeSequenceID between :time_1 and :time_2", - ExpressionAttributeValues: { - ":time_1": Date.now() - ElapsedTime, - ":time_2": Date.now() - } +ElapsedTime = config.RECONCILER.RECONCILER_ELAPSE_TIME +var params = { + TableName: config.DYNAMODB.TABLENAME, + FilterExpression: "NodeSequenceID between :time_1 and :time_2", + ExpressionAttributeValues: { + ":time_1": Date.now() - ElapsedTime, + ":time_2": Date.now() } - logger.info("scanning"); - await docClient.scan(params, onScan); - return +} +async function dynamo_pg_validation() { + return new Promise(async function (resolve, reject) { + logger.info("scanning"); + await docClient.scan(params, onScan); + resolve(true) + }); } async function onScan(err, data) { - if (err) { - logger.error("Unable to scan the table.") - logger.logFullError(err); - } else { - - logger.info("Scan succeeded."); - data.Items.forEach(async function (item) { - await validate_data_in_pg(item.SequenceID, item.pl_document) - }); - - // continue scanning if we have more movies, because - // scan can retrieve a maximum of 1MB of data - - if (typeof data.LastEvaluatedKey != "undefined") { - logger.info("Scanning for more..."); - params.ExclusiveStartKey = data.LastEvaluatedKey; - await docClient.scan(params, onScan); + return new Promise(async function (resolve, reject) { + if (err) { + logger.error("Unable to scan the table.") + logger.logFullError(err); + reject(err) } else { - return + logger.info("Scan succeeded."); + await Promise.all(ata.Items.map(async (item) => { + //data.Items.forEach(async function (item) { + await validate_data_in_pg(item.SequenceID, item.pl_document) + })); + + if (typeof data.LastEvaluatedKey != "undefined") { + logger.info("Scanning for more..."); + params.ExclusiveStartKey = data.LastEvaluatedKey; + await docClient.scan(params, onScan); + resolve(true) + } else { + resolve(true) + } } - } + }); } async function validate_data_in_pg(SequenceID, payload) { - const pgClient = new pg.Client(pgConnectionString) - // await setupPgClient() - try { - await pgClient.connect() - logger.debug('Connected to Pg Client2 Audit:') - } catch (err) { - logger.error('Could not setup postgres client2') - logger.logFullError(err) - process.exit() - } - logger.debug(SequenceID); - const sqlquerytovalidate = 'SELECT COUNT(*) FROM audit_log WHERE "SEQ_ID"=$1'; - const sqlquerytovalidate_values = [SequenceID] - logger.debug(sqlquerytovalidate); - await pgClient.query(sqlquerytovalidate, sqlquerytovalidate_values, async (err, res) => { + return new Promise(async function (resolve, reject) { + logger.debug(SequenceID); + const sqlquerytovalidate = 'SELECT COUNT(*) FROM audit_log WHERE "SEQ_ID"=$1'; + const sqlquerytovalidate_values = [SequenceID] + logger.debug(sqlquerytovalidate); + await pgClient.query(sqlquerytovalidate, sqlquerytovalidate_values, async (err, res) => { + if (err) { + var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` + logger.debug(errmsg0) + reject(errmsg0) + } else { + logger.info("validating whether data exist---------------------"); + const data = res.rows; + await Promise.all(data.map(async (row) => { + if (row['count'] == 0) { + await posttopic(payload, 0) + logger.debug("post the topic"); + } else { + logger.info(`${SequenceID} is exist in pg`) + } + resolve(true) + })); - if (err) { - var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` - logger.debug(errmsg0) - // await callposttoslack(errmsg0) - } else { - logger.info("validating data count---------------------"); - const data = res.rows; - data.forEach(async (row) => { - if (row['count'] == 0) { - await posttopic(payload, 0) - logger.debug("post the topic"); - } else { - logger.info(`${SequenceID} is exist in pg`) - } - }); - } - pgClient.end(); + } + }); }); - return } async function repostfailure() { - const pgClient = new pg.Client(pgConnectionString) - // await setupPgClient() - try { - await pgClient.connect() - logger.debug('Connected to Pg Client2 Audit:') - } catch (err) { - logger.error('Could not setup postgres client2') - logger.logFullError(err) - process.exit() - } + return new Promise(async function (resolve, reject) { + rec_ignore_status = config.RECONCILER.RECONCILER_IGNORE_STATUS + rec_start_elapse = config.RECONCILER.RECONCILER_START_ELAPSE_TIME + rec_diff_period = config.RECONCILER.RECONCILER_DIFF_PERIOD //Need to be equal to or greater than scheduler time + rec_interval_type = config.RECONCILER.RECONCILER_DURATION_TYPE + rec_retry_count = config.RECONCILER.RECONCILER_RETRY_COUNT - // select seq_id, producer_payload, overall_status from audit_log where - // overall_status not in ('PostgresUpdated') and - // request_create_time between (timezone('utc',now()) - interval '10m') and (timezone('utc',now()) - interval '1m') and - // reconcile_status < 1 ; - rec_ignore_status = config.RECONCILER.RECONCILER_IGNORE_STATUS - rec_start_elapse = config.RECONCILER.RECONCILER_START_ELAPSE_TIME - rec_diff_period = config.RECONCILER.RECONCILER_DIFF_PERIOD //Need to be equal to or greater than scheduler time - rec_interval_type = config.RECONCILER.RECONCILER_DURATION_TYPE - rec_retry_count = config.RECONCILER.RECONCILER_RETRY_COUNT - - sql1 = `select "SEQ_ID", "PRODUCER_PAYLOAD" from audit_log where "OVERALL_STATUS" not in ($1)` - sql2 = ` and "REQUEST_CREATE_TIME" between (timezone('utc',now()) - interval '1${rec_interval_type}' * $2)` - sql3 = ` and (timezone('utc',now()) - interval '1${rec_interval_type}' * $3)` - sql4 = ` and "RECONCILE_STATUS" < $4 ;` - sqltofetchfailure = sql1 + sql2 + sql3 + sql4 - var sqltofetchfailure_values = [rec_ignore_status, rec_diff_period, rec_start_elapse, rec_retry_count] - //var sqltofetchfailure_values = [rec_ignore_status, rec_diff_period, rec_start_elapse, rec_retry_count] - logger.info('sql : ', sqltofetchfailure) - await pgClient.query(sqltofetchfailure, sqltofetchfailure_values, async (err, res) => { - if (err) { - var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` - logger.debug(errmsg0) - // await callposttoslack(errmsg0) - } else { - logger.info("Reposting Data---------------------\n"); - const data = res.rows; - data.forEach(async (row) => { - logger.info("\npost the topic for : " + row['SEQ_ID']); - await posttopic(row['PRODUCER_PAYLOAD'], 1) - }); - } - pgClient.end(); + sql1 = `select "SEQ_ID", "PRODUCER_PAYLOAD" from audit_log where "OVERALL_STATUS" not in ($1)` + sql2 = ` and "REQUEST_CREATE_TIME" between (timezone('utc',now()) - interval '1${rec_interval_type}' * $2)` + sql3 = ` and (timezone('utc',now()) - interval '1${rec_interval_type}' * $3)` + sql4 = ` and "RECONCILE_STATUS" < $4 ;` + sqltofetchfailure = sql1 + sql2 + sql3 + sql4 + var sqltofetchfailure_values = [rec_ignore_status, rec_diff_period, rec_start_elapse, rec_retry_count] + logger.info('sql : ', sqltofetchfailure) + await pgClient.query(sqltofetchfailure, sqltofetchfailure_values, async (err, res) => { + if (err) { + var errmsg0 = `error-sync: Audit reconsiler query "${err.message}"` + logger.debug(errmsg0) + reject(err) + // await callposttoslack(errmsg0) + } else { + logger.info("Reposting Data---------------------\n"); + const data = res.rows; + await Promise.all(data.map(async (row) => { + logger.info("\npost the topic for : " + row['SEQ_ID']); + await posttopic(row['PRODUCER_PAYLOAD'], 1) + resolve(true) + })); + } + }); }); - return - } - async function postpayload_to_restapi(payload) { - let options = { - method: 'POST', - url: config.RECONCILER.RECONCILER_POST_URL, - headers: { - 'cache-control': 'no-cache', - 'Content-Type': 'application/json' - }, - body: payload, - json: true - }; - - request(options, function (error, response, body) { - if (error) { - var errmsg0 = `error-sync: Audit Reconsiler1 query "${error.message}"` - logger.debug (errmsg0) - throw new Error(error); - - } - else - { - logger.info("ReconcilerIFXtoPG : " + payload['TIME'] + "_" + payload['TABLENAME'] + " Success") - logger.debug(body); - } + return new Promise(async function (resolve, reject) { + let options = { + method: 'POST', + url: config.RECONCILER.RECONCILER_POST_URL, + headers: { + 'cache-control': 'no-cache', + 'Content-Type': 'application/json' + }, + body: payload, + json: true + }; + request(options, function (error, response, body) { + if (error) { + var errmsg0 = `error-sync: Audit Reconsiler1 query "${error.message}"` + logger.debug(errmsg0) + reject(error) + //throw new Error(error); + } else { + logger.info("ReconcilerIFXtoPG : " + payload['TIME'] + "_" + payload['TABLENAME'] + " Success") + logger.debug(body); + resolve(true) + } + }); }); - return } async function posttopic(payload, integratereconcileflag) { - logger.debug(payload + " " + integratereconcileflag); - if (integratereconcileflag == 1) { - //update payload with reconcile status - //post to rest api - let reconcile_flag = payload['RECONCILE_STATUS'] ? payload['RECONCILE_STATUS'] : 0 - reconcile_flag = reconcile_flag + 1 - payload.RECONCILE_STATUS = reconcile_flag - await postpayload_to_restapi(payload) - } else { - //post to rest api - await postpayload_to_restapi(payload) - } - return + return new Promise(async function (resolve, reject) { + try { + logger.debug(payload + " " + integratereconcileflag); + if (integratereconcileflag == 1) { + //update payload with reconcile status and post to rest api + let reconcile_flag = payload['RECONCILE_STATUS'] ? payload['RECONCILE_STATUS'] : 0 + reconcile_flag = reconcile_flag + 1 + payload.RECONCILE_STATUS = reconcile_flag + await postpayload_to_restapi(payload) + resolve(true) + } else { + //post to rest api + await postpayload_to_restapi(payload) + resolve(true) + } + } catch (errmsg) { + reject(errmsg) + } + }); } async function main() { - //await setupPgClient() - await dynamo_pg_validation() - //await pgClient.on('end') - await repostfailure() + return new Promise(async function (resolve, reject) { + try { + pgClient = new pg.Client(pgConnectionString) + try { + await pgClient.connect() + logger.debug('Connected to Pg Client2 Audit:') + } catch (err) { + logger.error('Could not setup postgres client2') + logger.logFullError(err) + process.exit() + } + if (config.RECONCILER.RECONCILE_DYNAMODB == 'true') { + await dynamo_pg_validation() + } + //await pgClient.on('end') + if (config.RECONCILER.RECONCILE_PGSTATUS == 'true') { + await repostfailure() + } + await pgClient.end(); + resolve(true); + } catch (e) { + reject(e); + } + }); } -main() + +const app = express(); +const port = process.env.PORT || 8080; +SchedulerTime = config.RECONCILER.RECONCILE_TIMESCHEDULE + +cron.schedule(SchedulerTime, function () { + logger.info("Running task as per schedule"); + main() +}); +app.get('/', function (req, res) { + res.send('hello world') +}) +app.listen(port); +logger.info('Server started! At http://localhost:' + port); \ No newline at end of file From 27de7e34dc9f8dd240827dd2b04d744f37b920b2 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Thu, 25 Jun 2020 19:45:58 +0530 Subject: [PATCH 2/5] Reconciler as service bug --- src/reconciler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reconciler.js b/src/reconciler.js index 9eeeb54..dde82c7 100644 --- a/src/reconciler.js +++ b/src/reconciler.js @@ -38,7 +38,7 @@ async function onScan(err, data) { reject(err) } else { logger.info("Scan succeeded."); - await Promise.all(ata.Items.map(async (item) => { + await Promise.all(data.Items.map(async (item) => { //data.Items.forEach(async function (item) { await validate_data_in_pg(item.SequenceID, item.pl_document) })); From 5c66d9f5ce56f1aaeec7293282ec958d941d77fd Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Thu, 25 Jun 2020 20:06:52 +0530 Subject: [PATCH 3/5] Reconciler as service bug --- src/reconciler.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/reconciler.js b/src/reconciler.js index dde82c7..b0c4b3d 100644 --- a/src/reconciler.js +++ b/src/reconciler.js @@ -57,7 +57,7 @@ async function onScan(err, data) { async function validate_data_in_pg(SequenceID, payload) { return new Promise(async function (resolve, reject) { - logger.debug(SequenceID); + logger.info(`Validating sequence id : ${SequenceID}`); const sqlquerytovalidate = 'SELECT COUNT(*) FROM audit_log WHERE "SEQ_ID"=$1'; const sqlquerytovalidate_values = [SequenceID] logger.debug(sqlquerytovalidate); @@ -71,10 +71,12 @@ async function validate_data_in_pg(SequenceID, payload) { const data = res.rows; await Promise.all(data.map(async (row) => { if (row['count'] == 0) { + logger.info(`posting the topic from dynamodb : ${SequenceID} `); await posttopic(payload, 0) - logger.debug("post the topic"); + //logger.debug("post the topic"); + } else { - logger.info(`${SequenceID} is exist in pg`) + logger.info(`${SequenceID} is exist in pg. So skipping dynamo db post`) } resolve(true) })); @@ -86,6 +88,7 @@ async function validate_data_in_pg(SequenceID, payload) { async function repostfailure() { return new Promise(async function (resolve, reject) { + logger.info("Vaildating audit log on PG"); rec_ignore_status = config.RECONCILER.RECONCILER_IGNORE_STATUS rec_start_elapse = config.RECONCILER.RECONCILER_START_ELAPSE_TIME rec_diff_period = config.RECONCILER.RECONCILER_DIFF_PERIOD //Need to be equal to or greater than scheduler time @@ -150,6 +153,7 @@ async function posttopic(payload, integratereconcileflag) { logger.debug(payload + " " + integratereconcileflag); if (integratereconcileflag == 1) { //update payload with reconcile status and post to rest api + logger.info("Integrated the Reconciler flag"); let reconcile_flag = payload['RECONCILE_STATUS'] ? payload['RECONCILE_STATUS'] : 0 reconcile_flag = reconcile_flag + 1 payload.RECONCILE_STATUS = reconcile_flag @@ -157,6 +161,7 @@ async function posttopic(payload, integratereconcileflag) { resolve(true) } else { //post to rest api + logger.info("Skipping the Reconciler flag"); await postpayload_to_restapi(payload) resolve(true) } From 19aac88c7f7005a6d58e85f5e0e05468c44230d4 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Mon, 29 Jun 2020 17:24:34 +0530 Subject: [PATCH 4/5] debuging data retrived from dynamo --- src/reconciler.js | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/reconciler.js b/src/reconciler.js index b0c4b3d..a6c8a15 100644 --- a/src/reconciler.js +++ b/src/reconciler.js @@ -38,9 +38,18 @@ async function onScan(err, data) { reject(err) } else { logger.info("Scan succeeded."); + if (data === undefined || data.length == 0) { + logger.info("No datat retrived") + } + else + { + logger.info("data retrived"); + } await Promise.all(data.Items.map(async (item) => { //data.Items.forEach(async function (item) { - await validate_data_in_pg(item.SequenceID, item.pl_document) + logger.info("items retrived dynamodb : " + JSON.stringify(item)); + await validate_data_in_pg(item.SequenceID, item.pl_document) + })); if (typeof data.LastEvaluatedKey != "undefined") { From 7ce79d8bee080f53bb6a0a9b5319b2f11c5327e8 Mon Sep 17 00:00:00 2001 From: Gunasekar-K Date: Mon, 29 Jun 2020 17:36:53 +0530 Subject: [PATCH 5/5] debuging data retrived from dynamo --- src/reconciler.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/reconciler.js b/src/reconciler.js index a6c8a15..89d6542 100644 --- a/src/reconciler.js +++ b/src/reconciler.js @@ -43,7 +43,8 @@ async function onScan(err, data) { } else { - logger.info("data retrived"); + logger.info("data retrived" + JSON.stringify(data)); + } await Promise.all(data.Items.map(async (item) => { //data.Items.forEach(async function (item) {