Skip to main content

2 posts tagged with "Azure Storage"

View All Tags

ETL Pipeline with Azure Data Factory and Azure Databricks

Β· 9 min read

ETL Pipeline with Azure Data Factory and Azure Databricks to process CSV files of global emissions and the planet temperature along the years.

Overview​

The main goal of this project was to learn how a typical cloud ETL works. I've learned along the way, the only knowlege I had was the concepts from Azure Data Fundamentals that I completed recently. With that in mind I've searched in Kaggle for some datasets to play with, and I selected the Global Emissions C02 dataset and the Global Temperature. The idea was to load the file into a Azure Storage Account, process it with an Azure Databricks notebook (with PySpark) and load the processed file into a customly designed data model into a SQL Server database. This was accomplished with a creation of an Azure Data Factory Pipeline.

diagram

After processing the files and loaded the information on the SQL Database, the final results were displayed in a Metabase dashboard.

diagram diagram diagram

Azure Data Factory​

This part of the project was the one that I was most curious about, because of it's potential. I won't describe every step i did to get ADF working because it was a lot of trial and error since it was the first time working with Azure Data Factory. The goal was to call the Azure Databricks notebook and insert the processed files into a SQL Server database. To do so, the following steps were done:

Linked Services​

The first step of the process was to create the services to connect to the SQL Server database (to log the errors when they happened), the LS to connect to Azure Databricks and finally the two LS to connect to the two containers of the Storage Account. diagram

Data Sets​

After the services, it was time to create the Datasets, so I've created two datasets per CSV file, which were the datasets before and after processed.

diagram

The following example was of the CO2 (input dataset), which uses the LS_Emission_Data_Input linked service to connect to the storage account. As you can see, when hitting the "Preview Data" button, the content of the file is displayed on the screen. diagram

After processing the file, the LS_Emission_Data_Output was "called" to store the file inside the "container-emission-files-processed" container. diagram

Final Pipeline​

This was the final pipeline: diagram

It was made up of the following activities:

  1. Two Validation activities called "check_for_co2_data" and "check_for_temp_data" that check if the file is in the "container-emission-files" to be processed;
  2. When on sucess in the previous task, a call to the Databricks Notebook that processes the CSV files;
  3. Two Set Variable activities that set the description of the error and the activity name when the Databricks task ends up on error;
  4. A Stored Procedure activity that was called when there is an error in Databricks process;
  5. Two Copy Data activities that copies the files from the source container to the "processed" container;
  6. Two Delete activities that delete the processed files from the source container.

When on sucess, the pipeline fills the SQL Server database tables: diagram

Error Handling​

Error handling is a part of the job. With a lot more time (and money, God! Databricks is expensive!) to do this project i would made it more robust in terms of error validation. But the key point was to see a possible approach to do the error handling. With that, i created a Stored Procedure that writes to a specific table the error detail information: diagram diagram

Azure Databricks​

Databricks is a powerful platform. I've decided to try it. After creating a Standard Cluster (Standard_F4 with 8GB and 4 Cores), I used it's capabilities to process the two CSV files that I loaded with the help of PySpark. I could have use Pandas for this job, but let's leave it for another time. So, i built this notebook:

from pyspark.sql.functions import *


# If the mount point already exists, does not try to mount it again. Otherwise it will.
# Gets the password from the azure key vault.
mountPoint = "/mnt/container-emission-files"
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
dbutils.fs.mount(source = "wasbs://container-emission-files@saemissionfiles.blob.core.windows.net",
mount_point = mountPoint, extra_configs = {"fs.azure.account.key.saemissionfiles.blob.core.windows.net":dbutils.secrets.get(scope = "databricks-secret-scope", key = "sa-acess-key")})

#database properties
db_pass = dbutils.secrets.get(scope="databricks-secret-scope", key= "db-emission-data-pwd")
jdbcUrl = "jdbc:sqlserver://az-server-emissions.database.windows.net:1433;database=db-emissions-data"
dbConnectionProperties = {
"user" : "db-user",
"password" : db_pass
}

def write_to_database(df, table_name):
'''
Writes processed df to database
'''
df.write.mode("append").jdbc(jdbcUrl, table_name,properties=dbConnectionProperties)

#Load Data
df_global_temperature = spark.read.format("csv").option("header", "true").load(mountPoint+"/GlobalLandTemperaturesByCountry.csv")
df_co2 = spark.read.format("csv").option("header", "true").load(mountPoint+"/owid-co2-data.csv")

#Country Data
df_country = df_co2.select([Co2Columns.COUNTRY, 'iso_code']).dropDuplicates()
df_country = df_country.na.drop()
df_country = df_country.sort(df_country['iso_code'].asc()).select("*").withColumn("id_country", monotonically_increasing_id()+1)
write_to_database(df_country, "processed_data.tr_country")

df_country_info = df_co2.select(['year', 'population', 'gdp', 'iso_code']).dropDuplicates()
df_country_info = df_country_info.na.drop()
df_country_info = df_country_info.withColumnRenamed("iso_code","iso_code_info")
df_country_info = df_country_info.join(df_country, df_country['iso_code'] == df_country_info['iso_code_info'], "inner")
df_country_info = df_country_info.drop('iso_code_info', 'country', 'iso_code')
write_to_database(df_country_info, "processed_data.td_country_info")

#Temperature Data
df_global_temperature = df_global_temperature.drop('AverageTemperatureUncertainty')
df_global_temperature = df_global_temperature.withColumnRenamed('dt', 'Date')
df_global_temperature = df_global_temperature.na.drop(how="any", subset=['AverageTemperature'])
df_global_temperature = df_global_temperature.withColumn('Date',year(df_global_temperature.Date))
df_global_temperature = df_global_temperature.groupBy(group_cols).agg(avg("AverageTemperature").alias("AverageTemperature_ByYear"))

df_co2_countries = df_co2.select(df_co2['iso_code'], df_co2['country']).dropDuplicates()
df_co2_countries = df_co2_countries.na.drop()
df_temperature_processed = df_global_temperature.join(df_co2_countries, df_global_temperature["Country"] == df_co2_countries["country"], "inner")
df_temperature_processed = df_temperature_processed.withColumnRenamed('Date', 'year').withColumnRenamed('AverageTemperature_ByYear','average_temperature')
df_temperature_final = df_temperature_processed.join(df_country, df_temperature_processed["iso_code"] == df_country["iso_code"], "inner")
df_temperature_final = df_temperature_final.drop('country', 'Country', 'iso_code')
write_to_database(df_temperature_final, "processed_data.td_global_temperature")

#Methane Data
df_methane_emissions_processed = df_co2.select(['year', 'methane', 'methane_per_capita', 'iso_code']).dropDuplicates()
df_methane_emissions_processed = df_methane_emissions_processed.na.drop(how="any", subset=['iso_code'])
df_methane_emissions_processed = df_methane_emissions_processed.withColumnRenamed("iso_code","iso_code_2")
df_methane_emissions_processed = df_methane_emissions_processed.join(df_country, df_country['iso_code'] == df_methane_emissions_processed['iso_code_2'], "inner")
df_methane_emissions_processed = df_methane_emissions_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_methane_emissions_processed, "processed_data.td_produced_methane")

#Nox Data
df_nox_processed = df_co2.select(['year', 'nitrous_oxide', 'nitrous_oxide_per_capita', 'iso_code']).dropDuplicates()
df_nox_processed = df_nox_processed.na.drop(how="any", subset=['iso_code'])
df_nox_processed = df_nox_processed.withColumnRenamed("iso_code","iso_code_2")
df_nox_processed = df_nox_processed.join(df_country, df_country['iso_code'] == df_nox_processed['iso_code_2'], "inner")
df_nox_processed = df_nox_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_nox_processed, "processed_data.td_produced_nox")

#Green House Gas Data
df_ghg_processed = df_co2.select(['year', 'ghg_per_capita', 'total_ghg', 'iso_code']).dropDuplicates()
df_ghg_processed = df_ghg_processed.na.drop(how="any", subset=['iso_code'])
df_ghg_processed = df_ghg_processed.withColumnRenamed("iso_code","iso_code_2")
df_ghg_processed = df_ghg_processed.join(df_country, df_country['iso_code'] == df_ghg_processed['iso_code_2'], "inner")
df_ghg_processed = df_ghg_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_ghg_processed, "processed_data.td_produced_ghg")

#Co2 Consumption Data
df_co2_consumption_processed = df_co2.select(['year', 'consumption_co2', 'consumption_co2_per_capita', 'consumption_co2_per_gdp', 'iso_code']).dropDuplicates()
df_co2_consumption_processed = df_co2_consumption_processed.na.drop(how="any", subset=['iso_code'])
df_co2_consumption_processed = df_co2_consumption_processed.withColumnRenamed("iso_code","iso_code_2")
df_co2_consumption_processed = df_co2_consumption_processed.join(df_country, df_country['iso_code'] == df_co2_consumption_processed['iso_code_2'], "inner")
df_co2_consumption_processed = df_co2_consumption_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_co2_consumption_processed, "processed_data.td_consumption_co2")

#Co2 Production Data
df_co2_processed = df_co2.select(['year', 'cement_co2', 'cement_co2_per_capita', 'co2_including_luc', 'co2_per_capita', 'co2_per_gdp', 'coal_co2', 'coal_co2_per_capita', 'flaring_co2', 'flaring_co2_per_capita', 'gas_co2' , 'gas_co2_per_capita', 'oil_co2','oil_co2_per_capita', 'other_co2_per_capita', 'other_industry_co2', 'iso_code']).dropDuplicates()
df_co2_processed = df_co2_processed.na.drop(how="any", subset=['iso_code'])
df_co2_processed = df_co2_processed.withColumnRenamed("iso_code","iso_code_2").withColumnRenamed("other_industry_co2","other_co2").withColumnRenamed("co2_including_luc","total_co2").withColumnRenamed("co2_per_capita","total_co2_per_capita").withColumnRenamed("co2_per_capita","total_co2_per_capita").withColumnRenamed("co2_per_gdp","total_co2_per_gdp")
df_co2_processed = df_co2_processed.join(df_country, df_country['iso_code'] == df_co2_processed['iso_code_2'], "inner")
df_co2_processed = df_co2_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_co2_processed, "processed_data.td_produced_co2")

dbutils.fs.unmount("/mnt/container-emission-files")

It is a big one, but it's simple. What it does is to access the storage account (with the mount option) and load the CSV files to memory. After that, it just selects the columns from each file to have the information that i needed. All of the joins were made with the country table, to avoid duplicate entry values. After this, each dataframe was written to the database. One of the challenges was to include the secrets into the notebook, but with the help of Microsoft and Databricks documentation, I was able to avoid plain text passwords inside the notebook. To do so, with the Azure Key Vault already created, i needed to create a Secret Scope to allow me connect to the Key Vault.

diagram

After this, it was possible to access the password from the key vault with the following:

db_pass = dbutils.secrets.get(scope="databricks-secret-scope", key= "db-emission-data-pwd")

Other Azure Services​

Azure Key Vault​

To store the passwords from Storage Account and SQL Server access, I chose the Azure Key Vault to do so. This was the first time working with this service. So, I just needed to create a Key Vault, create two secrets and that's it!

diagram diagram

The great advantage of this service is that it stores the password encrypted and all of the other Azure Services can access those passwords.

Azure Storage Account​

To store the files before and after being processed, I chose an Azure Storage Account. Since this is a PoC project, the configurations were selected to be the "minimum" available to have the files in there. The only change I needed to do on the storage account was to allow the "Storage Account Key Access" option. Without changing this parameter, I was not able to run the Validation task on ADF. diagram

Since the ADF pipeline had a task to copy the files from the source container to the processed container, there was a need to create two containers, the "container-emission-files" and the "container-emission-files-processed".

diagram

Azure SQL Server​

To store the processed CSV files, I've create a SQL Server Database to write the information gathered and filtered from the CSV files.

diagram

After this done, I've designed a simple data model to settle the information. This data model had the following tables and schemas:

  • error_handling.error_log - stores the errors thrown by Azure Databricks
  • processed_data.td_consumption_co2 - stores the CO2 consumption data
  • processed_data.td_country_info- stores the details of the countries like gdp and population_
  • processed_data.td_global_temperature- stores the global temperature information
  • processed_data.td_produced_co2- stores the produced CO2 data
  • processed_data.td_produced_ghg- stores the produced Green House Gas emissions data
  • processed_data.td_produced_methane- stores the produced methane emissions data
  • processed_data.td_produced_nox- stores the produced nox emissions data
  • processed_data.tr_country- stores the countries that exist in both data files

diagram

Conclusions & Further Steps​

This was the first ETL process that I've made entirely on the Cloud. Work with Azure Data Factory and Azure Databricks was a process of trial and error until I was able to made it work. It was a "fun ride", and I am quite happy with the result. To this display the processed data, I've decided to work with Metabase again since it is, in my opinion, the most straightforward software to do dashboards! I'm a huge fan, I have to admit it! But the main goal of this project wasn't make the display of the data to shine, it was to learn the "behind the scenes" technology that can be used to ingest and transform large amounts of CSV (or other types of files) in the Cloud. For the future, it would be nice to automatically donwload the files from Kaggle (with a Python script, for example) and load it to the Azure Storage Account based on a schedule, similarly with what i've done with F1 Project into Google Cloud Storage.

Web Scraping - Portuguese Parliament Voting

Β· 9 min read

Scraping data from portuguese parliament with the help of Python library called Beautiful Soap.

Overview​

The main goal of this project was to learn how to do data extaction with web scraping. In order to practice web scraping I choosed the Portuguese Parliament Voting website to get all of the votings since 2012. The goal was to extract, process and provide an interactive dashboard to see the votings, allowing the possibility to filter by party and year.

diagram

To allow this, my implementation was based on the following architectural diagram:

The components used were:

  • Azure Cloud
  • Azure Storage Account
  • Azure Postgres Database (with Flexible Servers)
  • docker
  • Apache Superset

diagram

Data Ingestion​

To ingest the data from the voting website I used Python with a library called Beautiful Soap. With this, I was able to iterate for each voting entry and save the available PDF file and upload it to an Azure Storage Account. That was achieved with the following code:

def scrap_voting_website(self, url, text_to_search, container_name):
page = requests.get(url)
soup = BeautifulSoup(page.content, "html.parser")

results = soup.find(
id="ctl00_ctl50_g_ef1010ac_1ad2_43f3_b329_d1623b7a1246_ctl00_pnlUpdate")

job_elements = results.find_all(
"div", class_="row home_calendar hc-detail")

for job_element in job_elements:
date = job_element.find("p", class_="date")
year = job_element.find("p", class_="time")

date_fmt = date.text.strip()
year_fmt = year.text.strip()

links = job_element.find_all("a", href=True)

# Iterates for all pdfs and downloads the links to access the site
for link in links:
for text in text_to_search:
if text.casefold() in link.text.strip().casefold():
url = link['href']
file_utils = FileUtils()
file_utils.download_file_upload_az_storage(
url, link.text.strip(), date_fmt, year_fmt, container_name)

To perform the upload to the azure data storage:

    def download_file_upload_az_storage(self, *args):

try:
filename = f"{args[1]}_{args[2]}.{args[3]}.pdf"
filename = filename.replace('/', '_').replace(' ', '_').replace('\\', '_').replace('-', '_')

file = Path(f"polls-pdfs/{args[3]}/{filename}")

print(f"Downloading {filename}.", flush=True)
response = requests.get(args[0])
file.write_bytes(response.content)

except requests.exceptions.HTTPError as errh:
print ("Http Error:",errh, flush=True)
except requests.exceptions.ConnectionError as errc:
print ("Error Connecting:",errc, flush=True)
except requests.exceptions.Timeout as errt:
print ("Timeout Error:",errt, flush=True)
except requests.exceptions.RequestException as err:
print ("RequestException: General Error",err, flush=True)

azure_utils = AzureUtils()
container_name = args[4] + "/"+ args[3]

print(f"Uploading {filename} to container {container_name}.")
azure_utils.upload_blob_to_container(container_name, file, filename)

As a result, the PDF files downloaded from voting site was uploaded to the existing Azure Storage Account. The download_file_upload_az_storage function downloads the file and uploads it to the container inside the storage account. Each downloaded file is divided by year.

diagram diagram

After upload the PDF files to the Storage Account, I've used a library called PyPDF2 to read all PDF files and take out all of the URLs. This was achieved with the following function:

    def get_url_from_pdf_files(self, destination_folder, url_to_filter, arr_url_initiatives):

for filename in os.listdir(destination_folder):
PDFFile = open(f"{destination_folder}/{filename}", 'rb')

PDF = PyPDF2.PdfFileReader(PDFFile)
pages = PDF.getNumPages()
key = '/Annots'
uri = '/URI'
ank = '/A'

for page in range(pages):
pageSliced = PDF.getPage(page)
pageObject = pageSliced.getObject()
if key in pageObject.keys():
ann = pageObject[key]
for a in ann:
u = a.getObject()
if uri in u[ank].keys() and url_to_filter.casefold() in u[ank][uri].casefold():
if not u[ank][uri] in arr_url_initiatives:
arr_url_initiatives.append(u[ank][uri])

return arr_url_initiatives

With the array filled with the existing URLs from voting details page, it was time to scrape the voting details page.

I = 1
for url in arr_url_initiatives:
print(f"{str(I)} || Scraping initiative urls to get voting details.", flush=True)
arr_voting_data = scrapper_utils.scrap_initiative_website(url, arr_voting_data)
I += 1

Having the arr_voting_data filled with all of the voting model object details, it was time to insert this data into a Postgres Database. To achieve that, I've made a class that connects to the Postgres Server running on Azure and inserts the data in the staging_data schema created.

import psycopg2
import os
from dotenv import load_dotenv

load_dotenv() # take environment variables from .env.

db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")
db_schema = os.environ.get("POSTGRES_SCHEMA")

class AZdbUtils:

def connect(self):
try:
conn = psycopg2.connect(database=db_name,
host=db_host,
user=db_user,
password=db_pass,
port=db_port)

except (Exception, psycopg2.DatabaseError) as error:
print(error)

return conn

def upload_data_db(self, arr_voting_data):

conn = self.connect()
cursor = conn.cursor()

for data in arr_voting_data:
print('Inserting data for initiative: ' + data.get_title(), flush=True)
cursor.execute('INSERT INTO staging_data.scraped_data (title, text, status, url, date, voting_favor, voting_against, voting_abstention, voting_absent) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)',
(data.get_title(), data.get_text(), data.get_status(), data.get_url(), data.get_date(), data.get_voting_favor(), data.get_voting_against(), data.get_voting_abstention(), data.get_voting_absent()))

conn.commit()

print('Executing transform function', flush=True)
cursor.execute('select staging_data.func_transform_staging_data();')
conn.commit()

if conn is not None:
conn.close()
print('Database connection closed.', flush=True)

Data Processing​

As you can see in the previous chapter, the final step of the upload_data_db function is to call a function named func_transform_staging_data. This function cleans the data and processes it to insert in the refined_data schema.

diagram

The two created schemas are:

  • staging_data - scraped data is inserted in "raw" format.
    • Table scraped_data diagram
  • refined_data - post-processed data
    • Table initiative - Stores the initiative details (text, title, created date, url)
    • Table initiative_voting - Stores the foreign keys to party, voting_status and initiative tables. The combination of the three keys make one composed primary key.
    • Table ref_party - Stores the party details
    • Table ref_initiative_result - Stores the initiative result details (approved, not approved, approved by all)
    • Table ref_voting_status - Stores the initiative voting (favor, against, absent, away)

diagram

To perform the data transformation, I've created the following Postgres function:

CREATE OR REPLACE FUNCTION staging_data.func_transform_staging_data()
RETURNS integer
LANGUAGE plpgsql
AS $function$
declare
SUCCESS INTEGER;
_row record;
r_voting_favor text;
r_voting_against text;
r_voting_abstention text;
r_voting_away text;
error_loop bool;
ret_initiative_id BIGINT;
party_vote_iterator integer;

arr_voting_favor bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_against bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_abstent bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_away bigint[] DEFAULT ARRAY[]::bigint[];
aux_id_ref_party bigint;

id_status_favor int;
id_status_against int;
id_status_abstent int;
id_status_away int;
id_initiative_result int;

begin
SUCCESS = 1;

truncate table refined_data.initiative cascade;
truncate table refined_data.initiative_voting cascade;


select id_ref_voting_status into id_status_favor from refined_data.ref_voting_status rvs where lower(status) = lower('Favor');
select id_ref_voting_status into id_status_against from refined_data.ref_voting_status rvs where lower(status) = lower('Contra');
select id_ref_voting_status into id_status_abstent from refined_data.ref_voting_status rvs where lower(status) = lower('Abstenção');
select id_ref_voting_status into id_status_away from refined_data.ref_voting_status rvs where lower(status) = lower('Ausente');


for _row in select REGEXP_REPLACE(voting_favor, '{*}*"*', '', 'gm') AS voting_favor,
REGEXP_REPLACE(voting_against, '{*}*"*', '', 'gm') AS voting_against,
REGEXP_REPLACE(voting_abstention, '{*}*"*', '', 'gm') AS voting_abstention,
REGEXP_REPLACE(voting_absent, '{*}*"*', '', 'gm') AS voting_absent,
id_scraped_data , title , "text", status , url, "date"


from
staging_data.scraped_data sd
where
trim(status) <> '' and (voting_favor <> '{}' or voting_against <> '{}' or voting_abstention <> '{}' or voting_absent <> '{}')
and trim(date) <> ''
order by id_scraped_data asc

loop
ret_initiative_id = -1;
arr_voting_favor := '{}';
arr_voting_against := '{}';
arr_voting_abstent := '{}';
arr_voting_away := '{}';

select id_ref_initiative_result into id_initiative_result from refined_data.ref_initiative_result rvs where lower(result) = lower(_row.status);

raise notice 'Iterate favor parties';
for r_voting_favor in select unnest(string_to_array(_row.voting_favor,','))
loop
if r_voting_favor ~ '^[A-Z-]*$' then
raise notice 'Favor party % match', r_voting_favor;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_favor);
arr_voting_favor := arr_voting_favor || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterate against parties';
for r_voting_against in select unnest(string_to_array(_row.voting_against,','))
loop
if r_voting_against ~ '^[A-Z-]*$' then
raise notice 'Against party % match', r_voting_against;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_against);
arr_voting_against := arr_voting_against || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterate abstentions parties';
for r_voting_abstention in select unnest(string_to_array(_row.voting_abstention,','))
loop
if r_voting_abstention ~ '^[A-Z-]*$' then
raise notice 'Absentee party % match ', r_voting_abstention;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_abstention);
arr_voting_abstent := arr_voting_abstent || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterar absent parties';
for r_voting_away in select unnest(string_to_array(_row.voting_absent,','))
loop
if r_voting_away ~ '^[A-Z-]*$' then
raise notice 'Absent party % match', r_voting_away;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_away);
arr_voting_away := arr_voting_away || aux_id_ref_party;
end if;
end loop;

raise notice 'Inserting initiative: %.', _row.title;
insert into refined_data.initiative (title, "text", url, "date", id_result)
values (_row.title, _row.text, _row.url, _row.date::date, id_initiative_result::integer)
RETURNING id_initiative INTO ret_initiative_id;

raise notice 'Inserting favor votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_favor
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_favor, ret_initiative_id);
END LOOP;

raise notice 'Inserting against votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_against
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_against, ret_initiative_id);
END LOOP;

raise notice 'Inserting abstent votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_abstent
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_abstent, ret_initiative_id);
END LOOP;

raise notice 'Inserting away votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_away
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_away, ret_initiative_id);
END LOOP;


end loop;

truncate table staging_data.scraped_data;

return SUCCESS;
end;

$function$
;

This function selects each record from the staging_schema, checks if the party exists - with the use of a regex expression - and then inserts the data into the refined_schema tables. There was a need to implement this because there are votes that are not done by parties, but by single deputies and those, I did not considered to this project.

Data Visualization​

Since I'm a huge fan of Metabase, as I did use in other projects, I've decided to switch to Apache Superset to learn how it works. Now I'm a huge fan of Apache Superset as I am of Metabase! To install Superset I've just followed the install guide present in their website. Superset allows to do querys that are refered as datasets. Having the result of the queries, we can create the dashboards, based on them.

diagram

After creating all queries, the final dashboard displays the following information:

  • Number of approved initiatives
  • Number of rejected initiatives
  • Number of approved by all initiatives
  • Favor votes
  • Against votes
  • Abtsent votes
  • Away party when voting occured
  • Detail of all initiatives

diagram

With the possibility to add filters by year and party: diagram

Integrating all technologies​

Integrating this technologies was achieved by creating a cron job on the Azure VM instance - running with Ubuntu Server - to, at 2:30 pm each day runs the scraper.py script that scrapes the voting website, downloads the pdf files, uploads it to Azure Storage and inserts into the staging dataset.

diagram

When running, the script displays the following output, when downloading the PDF files:

diagram

Scraping the voting details website:

diagram

To achieve this pipeline, I've created an Azure Free Account to allow me to better know how Microsoft cloud works.

diagram

Conclusions & Further Steps​

I'm quite happy with this project. It helped me to know Azure products in practice, after I done the Azure Fundamentals Certification. I've had some difficulties with the scraping API because the voting website does not follow the same structure across the different voting pages. Besides that, since this was a limited time subscription (to avoid costs on my side), I think I've achieved a great final result. In the future, I plan to include the individual deputy votes and count them in the final votation.