Data Pipeline

This document provides steps to set up Apache Spark on a system and use it to extract data from a SQL Server database into a CSV file, then upload that file to an S3 bucket. It installs Spark, configures environment variables and paths, starts the Spark master and slave, downloads the necessary JAR files, writes a Python script that queries the database and saves the result to a CSV file, configures AWS credentials, and runs the script to extract the data, save it to local disk, and upload it to S3.


java -version; git --version; python --version

cd /opt/spark
sudo wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

tar xvf spark-*

ls -lrt spark-*
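The archive unpacks into a versioned subdirectory (spark-3.0.3-bin-hadoop2.7), while SPARK_HOME below points at /opt/spark itself. A minimal way to reconcile the two, assuming the default extraction location, is to move the extracted contents up one level (alternatively, point SPARK_HOME at the versioned directory instead):

sudo mv /opt/spark/spark-3.0.3-bin-hadoop2.7/* /opt/spark/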

vi ~/.profile

export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_PYTHON=/usr/bin/python3

source ~/.profile

start-master.sh
http://127.0.0.1:8080/

start-slave.sh spark://0.0.0.0:8082

start-slave.sh spark://waplgmdalin_lab01:8082

start-slave.sh spark://0.0.0.0:8082 -c 4 -m 512M


stop-master.sh

stop-slave.sh

export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
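A quick check that the PYTHONPATH wiring works, assuming the exports above are active in the current shell:

python3 -c "import pyspark; print(pyspark.__version__)"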

sudo wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar


sudo wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.4/hadoop-aws-2.7.4.jar
sudo wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar
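These JARs are typically placed in $SPARK_HOME/jars so Spark picks them up. The Python script below also expects the Microsoft JDBC driver at /opt/spark/jars/mssql-jdbc-9.2.1.jre8.jar, which the steps above never fetch; a hedged sketch, assuming the standard Maven Central layout:

cd /opt/spark/jars
sudo wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.2.1.jre8/mssql-jdbc-9.2.1.jre8.jar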

vi sqlfile.py

query1 = """select * from [sales-data] where [date] >= '2021-01-01' and status = 'Completed'"""

vi config.ini

[aws]
ACCESS_KEY=BBIYYTU6L4U47BGGG&^CF
SECRET_KEY=Uy76BBJabczF7h6vv+BFssqTVLDVkKYN/f31puYHtG
BUCKET_NAME=s3-bucket-name
DIRECTORY=sales-data-directory

[mssql-onprem]
url = jdbc:sqlserver://PPOP24888S08YTA.APAC.PAD.COMPANY-DSN.COM:1433;databaseName=Transactions
database = Transactions
user = MSSQL-USER
password = MSSQL-Password
dbtable = sales-data
filename = data_extract.csv

from pyspark.sql import SparkSession

import shutil
import os
import glob
import boto3
from sqlfile import query1
from configparser import ConfigParser

# Load connection details and AWS credentials from config.ini
config = ConfigParser()
config.read('config.ini')

appName = "PySpark ETL Example - via MS-SQL JDBC"
master = "local"

spark = SparkSession \
    .builder \
    .master(master) \
    .appName(appName) \
    .config("spark.driver.extraClassPath", "/opt/spark/jars/mssql-jdbc-9.2.1.jre8.jar") \
    .getOrCreate()

url = config.get('mssql-onprem', 'url')
user = config.get('mssql-onprem', 'user')
password = config.get('mssql-onprem', 'password')
dbtable = config.get('mssql-onprem', 'dbtable')
filename = config.get('mssql-onprem', 'filename')

ACCESS_KEY = config.get('aws', 'ACCESS_KEY')
SECRET_KEY = config.get('aws', 'SECRET_KEY')
BUCKET_NAME = config.get('aws', 'BUCKET_NAME')
DIRECTORY = config.get('aws', 'DIRECTORY')

# Run the query from sqlfile.py against SQL Server over JDBC
jdbcDF = spark.read.format("jdbc") \
    .option("url", url) \
    .option("query", query1) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
jdbcDF.show(5)

# Write a single CSV (with header) to a local 'output' folder, move it out under the
# configured filename, then remove the temporary folder
path = 'output'
jdbcDF.coalesce(1).write.option("header", "true").option("sep", ",").mode("overwrite").csv(path)
shutil.move(glob.glob(os.getcwd() + '/' + path + '/' + r'*.csv')[0], os.getcwd() + '/' + filename)
shutil.rmtree(os.getcwd() + '/' + path)
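The hadoop-aws, aws-java-sdk and jets3t JARs downloaded earlier are only needed if Spark writes to S3 directly rather than going through boto3 as this script does; a minimal sketch of that alternative, assuming those JARs are on the Spark classpath:

# Alternative (assumption, not used by this script): write straight to S3 via the s3a:// connector
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", SECRET_KEY)
jdbcDF.coalesce(1).write.option("header", "true").mode("overwrite") \
    .csv("s3a://" + BUCKET_NAME + "/" + DIRECTORY)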

# Upload the extracted CSV to S3 using the credentials read from config.ini
session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
bucket_name = BUCKET_NAME
s3_output_key = DIRECTORY + '/' + filename
s3 = session.resource('s3')
# Filename - File to upload
# Bucket - Bucket to upload to (the top level directory under AWS S3)
# Key - S3 object name (can contain subdirectories). If not specified then file_name is used
s3.meta.client.upload_file(Filename=filename, Bucket=bucket_name, Key=s3_output_key)

# Remove the local copy once the upload has finished
if os.path.isfile(filename):
    os.remove(filename)
else:
    print("Error: %s file not found" % filename)
