Data Pipeline
# Download Spark 3.0.3 and unpack it directly into /opt/spark (SPARK_HOME below)
cd /opt/spark
sudo wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
sudo tar -xzf spark-3.0.3-bin-hadoop2.7.tgz --strip-components=1
ls -lrt spark-*
# Register Spark in the shell environment (add the exports inside ~/.profile)
vi ~/.profile
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3
source ~/.profile
start-master.sh                                  # launch the standalone master
# The master web UI at http://127.0.0.1:8080/ shows the spark:// URL to pass below
start-slave.sh spark://0.0.0.0:8082              # attach a worker; either form of the
start-slave.sh spark://waplgmdalin_lab01:8082    # master URL works if it resolves
stop-slave.sh                                    # stop the worker when finished
# Make pyspark importable from the system Python (append to ~/.profile as well)
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
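With the environment in place, a quick check from Python confirms that pyspark and its bundled py4j resolve; a minimal sanity test, assuming the exports above have been sourced:

import pyspark
print(pyspark.__version__)   # should report 3.0.3 for this install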
vi sqlfile.py
# Hyphenated identifiers need square brackets in T-SQL
query1 = """select * from [sales-data] where [date] >= '2021-01-01' and status = 'Completed'"""
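The extraction script can then pull the query in with a regular import; a one-line sketch, assuming sqlfile.py sits alongside the script:

from sqlfile import query1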
vi config.ini
[aws]
ACCESS_KEY=BBIYYTU6L4U47BGGG&^CF
SECRET_KEY=Uy76BBJabczF7h6vv+BFssqTVLDVkKYN/f31puYHtG
BUCKET_NAME=s3-bucket-name
DIRECTORY=sales-data-directory
[mssql]
url = jdbc:sqlserver://PPOP24888S08YTA.APAC.PAD.COMPANY-DSN.COM:1433;databaseName=Transactions
database = Transactions
user = MSSQL-USER
password = MSSQL-Password
dbtable = sales-data
filename = data_extract.csv
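Back in the extraction script, configparser loads these settings; a minimal sketch, assuming config.ini sits in the working directory:

import configparser

config = configparser.ConfigParser()
config.read('config.ini')

# [mssql] values consumed by the JDBC read and file handling further down
url = config.get('mssql', 'url')
user = config.get('mssql', 'user')
password = config.get('mssql', 'password')
filename = config.get('mssql', 'filename')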
ACCESS_KEY = config.get('aws', 'ACCESS_KEY')
SECRET_KEY = config.get('aws', 'SECRET_KEY')
BUCKET_NAME = config.get('aws', 'BUCKET_NAME')
DIRECTORY = config.get('aws', 'DIRECTORY')
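The JDBC read below also needs a live SparkSession with the SQL Server driver on its classpath; a sketch, where the jar path and version are assumptions about your local setup:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("mssql-extract")
    .config("spark.jars", "/opt/spark/jars/mssql-jdbc-8.4.1.jre8.jar")  # assumed driver location
    .getOrCreate()
)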
# Pull the filtered sales data straight from SQL Server
jdbcDF = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("query", query1)
    .option("user", user)
    .option("password", password)
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
jdbcDF.show(5)   # eyeball a few rows before writing anything out
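It can also be worth confirming the column types Spark mapped from the SQL Server metadata before the CSV write:

jdbcDF.printSchema()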
import glob, os, shutil

path = 'output'
# coalesce(1) makes Spark write a single part-*.csv under ./output
jdbcDF.coalesce(1).write.option("header", "true").option("sep", ",").mode("overwrite").csv(path)
# Rename the lone part file to a stable name, then drop Spark's output dir
shutil.move(glob.glob(os.getcwd() + '/' + path + '/' + r'*.csv')[0], os.getcwd() + '/' + filename)
shutil.rmtree(os.getcwd() + '/' + path)
import boto3

session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
bucket_name = BUCKET_NAME
s3_output_key = DIRECTORY + '/' + filename   # object key: "directory" prefix + file name
s3 = session.resource('s3')
# Filename - local file to upload
# Bucket   - bucket to upload into (the top level container in AWS S3)
# Key      - S3 object name (may include a "directory" prefix)
s3.meta.client.upload_file(Filename=filename, Bucket=bucket_name, Key=s3_output_key)
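An existence check after the upload is optional but cheap; head_object raises a botocore ClientError if the object is missing:

s3.meta.client.head_object(Bucket=bucket_name, Key=s3_output_key)   # raises if the upload failed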
# Remove the local copy now that the extract is in S3
if os.path.isfile(filename):
    os.remove(filename)
else:
    print("Error: %s file not found" % filename)