Skip to main content

4 posts tagged with "Python"

View All Tags

ETL Pipeline with Azure Data Factory and Azure Databricks

ยท 9 min read

ETL Pipeline with Azure Data Factory and Azure Databricks to process CSV files of global emissions and the planet temperature along the years.

Overviewโ€‹

The main goal of this project was to learn how a typical cloud ETL works. I've learned along the way, the only knowlege I had was the concepts from Azure Data Fundamentals that I completed recently. With that in mind I've searched in Kaggle for some datasets to play with, and I selected the Global Emissions C02 dataset and the Global Temperature. The idea was to load the file into a Azure Storage Account, process it with an Azure Databricks notebook (with PySpark) and load the processed file into a customly designed data model into a SQL Server database. This was accomplished with a creation of an Azure Data Factory Pipeline.

diagram

After processing the files and loaded the information on the SQL Database, the final results were displayed in a Metabase dashboard.

diagram diagram diagram

Azure Data Factoryโ€‹

This part of the project was the one that I was most curious about, because of it's potential. I won't describe every step i did to get ADF working because it was a lot of trial and error since it was the first time working with Azure Data Factory. The goal was to call the Azure Databricks notebook and insert the processed files into a SQL Server database. To do so, the following steps were done:

Linked Servicesโ€‹

The first step of the process was to create the services to connect to the SQL Server database (to log the errors when they happened), the LS to connect to Azure Databricks and finally the two LS to connect to the two containers of the Storage Account. diagram

Data Setsโ€‹

After the services, it was time to create the Datasets, so I've created two datasets per CSV file, which were the datasets before and after processed.

diagram

The following example was of the CO2 (input dataset), which uses the LS_Emission_Data_Input linked service to connect to the storage account. As you can see, when hitting the "Preview Data" button, the content of the file is displayed on the screen. diagram

After processing the file, the LS_Emission_Data_Output was "called" to store the file inside the "container-emission-files-processed" container. diagram

Final Pipelineโ€‹

This was the final pipeline: diagram

It was made up of the following activities:

  1. Two Validation activities called "check_for_co2_data" and "check_for_temp_data" that check if the file is in the "container-emission-files" to be processed;
  2. When on sucess in the previous task, a call to the Databricks Notebook that processes the CSV files;
  3. Two Set Variable activities that set the description of the error and the activity name when the Databricks task ends up on error;
  4. A Stored Procedure activity that was called when there is an error in Databricks process;
  5. Two Copy Data activities that copies the files from the source container to the "processed" container;
  6. Two Delete activities that delete the processed files from the source container.

When on sucess, the pipeline fills the SQL Server database tables: diagram

Error Handlingโ€‹

Error handling is a part of the job. With a lot more time (and money, God! Databricks is expensive!) to do this project i would made it more robust in terms of error validation. But the key point was to see a possible approach to do the error handling. With that, i created a Stored Procedure that writes to a specific table the error detail information: diagram diagram

Azure Databricksโ€‹

Databricks is a powerful platform. I've decided to try it. After creating a Standard Cluster (Standard_F4 with 8GB and 4 Cores), I used it's capabilities to process the two CSV files that I loaded with the help of PySpark. I could have use Pandas for this job, but let's leave it for another time. So, i built this notebook:

from pyspark.sql.functions import *


# If the mount point already exists, does not try to mount it again. Otherwise it will.
# Gets the password from the azure key vault.
mountPoint = "/mnt/container-emission-files"
if not any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
dbutils.fs.mount(source = "wasbs://container-emission-files@saemissionfiles.blob.core.windows.net",
mount_point = mountPoint, extra_configs = {"fs.azure.account.key.saemissionfiles.blob.core.windows.net":dbutils.secrets.get(scope = "databricks-secret-scope", key = "sa-acess-key")})

#database properties
db_pass = dbutils.secrets.get(scope="databricks-secret-scope", key= "db-emission-data-pwd")
jdbcUrl = "jdbc:sqlserver://az-server-emissions.database.windows.net:1433;database=db-emissions-data"
dbConnectionProperties = {
"user" : "db-user",
"password" : db_pass
}

def write_to_database(df, table_name):
'''
Writes processed df to database
'''
df.write.mode("append").jdbc(jdbcUrl, table_name,properties=dbConnectionProperties)

#Load Data
df_global_temperature = spark.read.format("csv").option("header", "true").load(mountPoint+"/GlobalLandTemperaturesByCountry.csv")
df_co2 = spark.read.format("csv").option("header", "true").load(mountPoint+"/owid-co2-data.csv")

#Country Data
df_country = df_co2.select([Co2Columns.COUNTRY, 'iso_code']).dropDuplicates()
df_country = df_country.na.drop()
df_country = df_country.sort(df_country['iso_code'].asc()).select("*").withColumn("id_country", monotonically_increasing_id()+1)
write_to_database(df_country, "processed_data.tr_country")

df_country_info = df_co2.select(['year', 'population', 'gdp', 'iso_code']).dropDuplicates()
df_country_info = df_country_info.na.drop()
df_country_info = df_country_info.withColumnRenamed("iso_code","iso_code_info")
df_country_info = df_country_info.join(df_country, df_country['iso_code'] == df_country_info['iso_code_info'], "inner")
df_country_info = df_country_info.drop('iso_code_info', 'country', 'iso_code')
write_to_database(df_country_info, "processed_data.td_country_info")

#Temperature Data
df_global_temperature = df_global_temperature.drop('AverageTemperatureUncertainty')
df_global_temperature = df_global_temperature.withColumnRenamed('dt', 'Date')
df_global_temperature = df_global_temperature.na.drop(how="any", subset=['AverageTemperature'])
df_global_temperature = df_global_temperature.withColumn('Date',year(df_global_temperature.Date))
df_global_temperature = df_global_temperature.groupBy(group_cols).agg(avg("AverageTemperature").alias("AverageTemperature_ByYear"))

df_co2_countries = df_co2.select(df_co2['iso_code'], df_co2['country']).dropDuplicates()
df_co2_countries = df_co2_countries.na.drop()
df_temperature_processed = df_global_temperature.join(df_co2_countries, df_global_temperature["Country"] == df_co2_countries["country"], "inner")
df_temperature_processed = df_temperature_processed.withColumnRenamed('Date', 'year').withColumnRenamed('AverageTemperature_ByYear','average_temperature')
df_temperature_final = df_temperature_processed.join(df_country, df_temperature_processed["iso_code"] == df_country["iso_code"], "inner")
df_temperature_final = df_temperature_final.drop('country', 'Country', 'iso_code')
write_to_database(df_temperature_final, "processed_data.td_global_temperature")

#Methane Data
df_methane_emissions_processed = df_co2.select(['year', 'methane', 'methane_per_capita', 'iso_code']).dropDuplicates()
df_methane_emissions_processed = df_methane_emissions_processed.na.drop(how="any", subset=['iso_code'])
df_methane_emissions_processed = df_methane_emissions_processed.withColumnRenamed("iso_code","iso_code_2")
df_methane_emissions_processed = df_methane_emissions_processed.join(df_country, df_country['iso_code'] == df_methane_emissions_processed['iso_code_2'], "inner")
df_methane_emissions_processed = df_methane_emissions_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_methane_emissions_processed, "processed_data.td_produced_methane")

#Nox Data
df_nox_processed = df_co2.select(['year', 'nitrous_oxide', 'nitrous_oxide_per_capita', 'iso_code']).dropDuplicates()
df_nox_processed = df_nox_processed.na.drop(how="any", subset=['iso_code'])
df_nox_processed = df_nox_processed.withColumnRenamed("iso_code","iso_code_2")
df_nox_processed = df_nox_processed.join(df_country, df_country['iso_code'] == df_nox_processed['iso_code_2'], "inner")
df_nox_processed = df_nox_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_nox_processed, "processed_data.td_produced_nox")

#Green House Gas Data
df_ghg_processed = df_co2.select(['year', 'ghg_per_capita', 'total_ghg', 'iso_code']).dropDuplicates()
df_ghg_processed = df_ghg_processed.na.drop(how="any", subset=['iso_code'])
df_ghg_processed = df_ghg_processed.withColumnRenamed("iso_code","iso_code_2")
df_ghg_processed = df_ghg_processed.join(df_country, df_country['iso_code'] == df_ghg_processed['iso_code_2'], "inner")
df_ghg_processed = df_ghg_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_ghg_processed, "processed_data.td_produced_ghg")

#Co2 Consumption Data
df_co2_consumption_processed = df_co2.select(['year', 'consumption_co2', 'consumption_co2_per_capita', 'consumption_co2_per_gdp', 'iso_code']).dropDuplicates()
df_co2_consumption_processed = df_co2_consumption_processed.na.drop(how="any", subset=['iso_code'])
df_co2_consumption_processed = df_co2_consumption_processed.withColumnRenamed("iso_code","iso_code_2")
df_co2_consumption_processed = df_co2_consumption_processed.join(df_country, df_country['iso_code'] == df_co2_consumption_processed['iso_code_2'], "inner")
df_co2_consumption_processed = df_co2_consumption_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_co2_consumption_processed, "processed_data.td_consumption_co2")

#Co2 Production Data
df_co2_processed = df_co2.select(['year', 'cement_co2', 'cement_co2_per_capita', 'co2_including_luc', 'co2_per_capita', 'co2_per_gdp', 'coal_co2', 'coal_co2_per_capita', 'flaring_co2', 'flaring_co2_per_capita', 'gas_co2' , 'gas_co2_per_capita', 'oil_co2','oil_co2_per_capita', 'other_co2_per_capita', 'other_industry_co2', 'iso_code']).dropDuplicates()
df_co2_processed = df_co2_processed.na.drop(how="any", subset=['iso_code'])
df_co2_processed = df_co2_processed.withColumnRenamed("iso_code","iso_code_2").withColumnRenamed("other_industry_co2","other_co2").withColumnRenamed("co2_including_luc","total_co2").withColumnRenamed("co2_per_capita","total_co2_per_capita").withColumnRenamed("co2_per_capita","total_co2_per_capita").withColumnRenamed("co2_per_gdp","total_co2_per_gdp")
df_co2_processed = df_co2_processed.join(df_country, df_country['iso_code'] == df_co2_processed['iso_code_2'], "inner")
df_co2_processed = df_co2_processed.drop('iso_code','country', 'iso_code_2')
write_to_database(df_co2_processed, "processed_data.td_produced_co2")

dbutils.fs.unmount("/mnt/container-emission-files")

It is a big one, but it's simple. What it does is to access the storage account (with the mount option) and load the CSV files to memory. After that, it just selects the columns from each file to have the information that i needed. All of the joins were made with the country table, to avoid duplicate entry values. After this, each dataframe was written to the database. One of the challenges was to include the secrets into the notebook, but with the help of Microsoft and Databricks documentation, I was able to avoid plain text passwords inside the notebook. To do so, with the Azure Key Vault already created, i needed to create a Secret Scope to allow me connect to the Key Vault.

diagram

After this, it was possible to access the password from the key vault with the following:

db_pass = dbutils.secrets.get(scope="databricks-secret-scope", key= "db-emission-data-pwd")

Other Azure Servicesโ€‹

Azure Key Vaultโ€‹

To store the passwords from Storage Account and SQL Server access, I chose the Azure Key Vault to do so. This was the first time working with this service. So, I just needed to create a Key Vault, create two secrets and that's it!

diagram diagram

The great advantage of this service is that it stores the password encrypted and all of the other Azure Services can access those passwords.

Azure Storage Accountโ€‹

To store the files before and after being processed, I chose an Azure Storage Account. Since this is a PoC project, the configurations were selected to be the "minimum" available to have the files in there. The only change I needed to do on the storage account was to allow the "Storage Account Key Access" option. Without changing this parameter, I was not able to run the Validation task on ADF. diagram

Since the ADF pipeline had a task to copy the files from the source container to the processed container, there was a need to create two containers, the "container-emission-files" and the "container-emission-files-processed".

diagram

Azure SQL Serverโ€‹

To store the processed CSV files, I've create a SQL Server Database to write the information gathered and filtered from the CSV files.

diagram

After this done, I've designed a simple data model to settle the information. This data model had the following tables and schemas:

  • error_handling.error_log - stores the errors thrown by Azure Databricks
  • processed_data.td_consumption_co2 - stores the CO2 consumption data
  • processed_data.td_country_info- stores the details of the countries like gdp and population_
  • processed_data.td_global_temperature- stores the global temperature information
  • processed_data.td_produced_co2- stores the produced CO2 data
  • processed_data.td_produced_ghg- stores the produced Green House Gas emissions data
  • processed_data.td_produced_methane- stores the produced methane emissions data
  • processed_data.td_produced_nox- stores the produced nox emissions data
  • processed_data.tr_country- stores the countries that exist in both data files

diagram

Conclusions & Further Stepsโ€‹

This was the first ETL process that I've made entirely on the Cloud. Work with Azure Data Factory and Azure Databricks was a process of trial and error until I was able to made it work. It was a "fun ride", and I am quite happy with the result. To this display the processed data, I've decided to work with Metabase again since it is, in my opinion, the most straightforward software to do dashboards! I'm a huge fan, I have to admit it! But the main goal of this project wasn't make the display of the data to shine, it was to learn the "behind the scenes" technology that can be used to ingest and transform large amounts of CSV (or other types of files) in the Cloud. For the future, it would be nice to automatically donwload the files from Kaggle (with a Python script, for example) and load it to the Azure Storage Account based on a schedule, similarly with what i've done with F1 Project into Google Cloud Storage.

Web Scraping - Portuguese Parliament Voting

ยท 9 min read

Scraping data from portuguese parliament with the help of Python library called Beautiful Soap.

Overviewโ€‹

The main goal of this project was to learn how to do data extaction with web scraping. In order to practice web scraping I choosed the Portuguese Parliament Voting website to get all of the votings since 2012. The goal was to extract, process and provide an interactive dashboard to see the votings, allowing the possibility to filter by party and year.

diagram

To allow this, my implementation was based on the following architectural diagram:

The components used were:

  • Azure Cloud
  • Azure Storage Account
  • Azure Postgres Database (with Flexible Servers)
  • docker
  • Apache Superset

diagram

Data Ingestionโ€‹

To ingest the data from the voting website I used Python with a library called Beautiful Soap. With this, I was able to iterate for each voting entry and save the available PDF file and upload it to an Azure Storage Account. That was achieved with the following code:

def scrap_voting_website(self, url, text_to_search, container_name):
page = requests.get(url)
soup = BeautifulSoup(page.content, "html.parser")

results = soup.find(
id="ctl00_ctl50_g_ef1010ac_1ad2_43f3_b329_d1623b7a1246_ctl00_pnlUpdate")

job_elements = results.find_all(
"div", class_="row home_calendar hc-detail")

for job_element in job_elements:
date = job_element.find("p", class_="date")
year = job_element.find("p", class_="time")

date_fmt = date.text.strip()
year_fmt = year.text.strip()

links = job_element.find_all("a", href=True)

# Iterates for all pdfs and downloads the links to access the site
for link in links:
for text in text_to_search:
if text.casefold() in link.text.strip().casefold():
url = link['href']
file_utils = FileUtils()
file_utils.download_file_upload_az_storage(
url, link.text.strip(), date_fmt, year_fmt, container_name)

To perform the upload to the azure data storage:

    def download_file_upload_az_storage(self, *args):

try:
filename = f"{args[1]}_{args[2]}.{args[3]}.pdf"
filename = filename.replace('/', '_').replace(' ', '_').replace('\\', '_').replace('-', '_')

file = Path(f"polls-pdfs/{args[3]}/{filename}")

print(f"Downloading {filename}.", flush=True)
response = requests.get(args[0])
file.write_bytes(response.content)

except requests.exceptions.HTTPError as errh:
print ("Http Error:",errh, flush=True)
except requests.exceptions.ConnectionError as errc:
print ("Error Connecting:",errc, flush=True)
except requests.exceptions.Timeout as errt:
print ("Timeout Error:",errt, flush=True)
except requests.exceptions.RequestException as err:
print ("RequestException: General Error",err, flush=True)

azure_utils = AzureUtils()
container_name = args[4] + "/"+ args[3]

print(f"Uploading {filename} to container {container_name}.")
azure_utils.upload_blob_to_container(container_name, file, filename)

As a result, the PDF files downloaded from voting site was uploaded to the existing Azure Storage Account. The download_file_upload_az_storage function downloads the file and uploads it to the container inside the storage account. Each downloaded file is divided by year.

diagram diagram

After upload the PDF files to the Storage Account, I've used a library called PyPDF2 to read all PDF files and take out all of the URLs. This was achieved with the following function:

    def get_url_from_pdf_files(self, destination_folder, url_to_filter, arr_url_initiatives):

for filename in os.listdir(destination_folder):
PDFFile = open(f"{destination_folder}/{filename}", 'rb')

PDF = PyPDF2.PdfFileReader(PDFFile)
pages = PDF.getNumPages()
key = '/Annots'
uri = '/URI'
ank = '/A'

for page in range(pages):
pageSliced = PDF.getPage(page)
pageObject = pageSliced.getObject()
if key in pageObject.keys():
ann = pageObject[key]
for a in ann:
u = a.getObject()
if uri in u[ank].keys() and url_to_filter.casefold() in u[ank][uri].casefold():
if not u[ank][uri] in arr_url_initiatives:
arr_url_initiatives.append(u[ank][uri])

return arr_url_initiatives

With the array filled with the existing URLs from voting details page, it was time to scrape the voting details page.

I = 1
for url in arr_url_initiatives:
print(f"{str(I)} || Scraping initiative urls to get voting details.", flush=True)
arr_voting_data = scrapper_utils.scrap_initiative_website(url, arr_voting_data)
I += 1

Having the arr_voting_data filled with all of the voting model object details, it was time to insert this data into a Postgres Database. To achieve that, I've made a class that connects to the Postgres Server running on Azure and inserts the data in the staging_data schema created.

import psycopg2
import os
from dotenv import load_dotenv

load_dotenv() # take environment variables from .env.

db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")
db_schema = os.environ.get("POSTGRES_SCHEMA")

class AZdbUtils:

def connect(self):
try:
conn = psycopg2.connect(database=db_name,
host=db_host,
user=db_user,
password=db_pass,
port=db_port)

except (Exception, psycopg2.DatabaseError) as error:
print(error)

return conn

def upload_data_db(self, arr_voting_data):

conn = self.connect()
cursor = conn.cursor()

for data in arr_voting_data:
print('Inserting data for initiative: ' + data.get_title(), flush=True)
cursor.execute('INSERT INTO staging_data.scraped_data (title, text, status, url, date, voting_favor, voting_against, voting_abstention, voting_absent) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)',
(data.get_title(), data.get_text(), data.get_status(), data.get_url(), data.get_date(), data.get_voting_favor(), data.get_voting_against(), data.get_voting_abstention(), data.get_voting_absent()))

conn.commit()

print('Executing transform function', flush=True)
cursor.execute('select staging_data.func_transform_staging_data();')
conn.commit()

if conn is not None:
conn.close()
print('Database connection closed.', flush=True)

Data Processingโ€‹

As you can see in the previous chapter, the final step of the upload_data_db function is to call a function named func_transform_staging_data. This function cleans the data and processes it to insert in the refined_data schema.

diagram

The two created schemas are:

  • staging_data - scraped data is inserted in "raw" format.
    • Table scraped_data diagram
  • refined_data - post-processed data
    • Table initiative - Stores the initiative details (text, title, created date, url)
    • Table initiative_voting - Stores the foreign keys to party, voting_status and initiative tables. The combination of the three keys make one composed primary key.
    • Table ref_party - Stores the party details
    • Table ref_initiative_result - Stores the initiative result details (approved, not approved, approved by all)
    • Table ref_voting_status - Stores the initiative voting (favor, against, absent, away)

diagram

To perform the data transformation, I've created the following Postgres function:

CREATE OR REPLACE FUNCTION staging_data.func_transform_staging_data()
RETURNS integer
LANGUAGE plpgsql
AS $function$
declare
SUCCESS INTEGER;
_row record;
r_voting_favor text;
r_voting_against text;
r_voting_abstention text;
r_voting_away text;
error_loop bool;
ret_initiative_id BIGINT;
party_vote_iterator integer;

arr_voting_favor bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_against bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_abstent bigint[] DEFAULT ARRAY[]::bigint[];
arr_voting_away bigint[] DEFAULT ARRAY[]::bigint[];
aux_id_ref_party bigint;

id_status_favor int;
id_status_against int;
id_status_abstent int;
id_status_away int;
id_initiative_result int;

begin
SUCCESS = 1;

truncate table refined_data.initiative cascade;
truncate table refined_data.initiative_voting cascade;


select id_ref_voting_status into id_status_favor from refined_data.ref_voting_status rvs where lower(status) = lower('Favor');
select id_ref_voting_status into id_status_against from refined_data.ref_voting_status rvs where lower(status) = lower('Contra');
select id_ref_voting_status into id_status_abstent from refined_data.ref_voting_status rvs where lower(status) = lower('Abstenรงรฃo');
select id_ref_voting_status into id_status_away from refined_data.ref_voting_status rvs where lower(status) = lower('Ausente');


for _row in select REGEXP_REPLACE(voting_favor, '{*}*"*', '', 'gm') AS voting_favor,
REGEXP_REPLACE(voting_against, '{*}*"*', '', 'gm') AS voting_against,
REGEXP_REPLACE(voting_abstention, '{*}*"*', '', 'gm') AS voting_abstention,
REGEXP_REPLACE(voting_absent, '{*}*"*', '', 'gm') AS voting_absent,
id_scraped_data , title , "text", status , url, "date"


from
staging_data.scraped_data sd
where
trim(status) <> '' and (voting_favor <> '{}' or voting_against <> '{}' or voting_abstention <> '{}' or voting_absent <> '{}')
and trim(date) <> ''
order by id_scraped_data asc

loop
ret_initiative_id = -1;
arr_voting_favor := '{}';
arr_voting_against := '{}';
arr_voting_abstent := '{}';
arr_voting_away := '{}';

select id_ref_initiative_result into id_initiative_result from refined_data.ref_initiative_result rvs where lower(result) = lower(_row.status);

raise notice 'Iterate favor parties';
for r_voting_favor in select unnest(string_to_array(_row.voting_favor,','))
loop
if r_voting_favor ~ '^[A-Z-]*$' then
raise notice 'Favor party % match', r_voting_favor;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_favor);
arr_voting_favor := arr_voting_favor || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterate against parties';
for r_voting_against in select unnest(string_to_array(_row.voting_against,','))
loop
if r_voting_against ~ '^[A-Z-]*$' then
raise notice 'Against party % match', r_voting_against;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_against);
arr_voting_against := arr_voting_against || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterate abstentions parties';
for r_voting_abstention in select unnest(string_to_array(_row.voting_abstention,','))
loop
if r_voting_abstention ~ '^[A-Z-]*$' then
raise notice 'Absentee party % match ', r_voting_abstention;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_abstention);
arr_voting_abstent := arr_voting_abstent || aux_id_ref_party;
end if;
end loop;

raise notice 'Iterar absent parties';
for r_voting_away in select unnest(string_to_array(_row.voting_absent,','))
loop
if r_voting_away ~ '^[A-Z-]*$' then
raise notice 'Absent party % match', r_voting_away;
select id_ref_party into aux_id_ref_party from refined_data.ref_party rp where lower(name) like lower(r_voting_away);
arr_voting_away := arr_voting_away || aux_id_ref_party;
end if;
end loop;

raise notice 'Inserting initiative: %.', _row.title;
insert into refined_data.initiative (title, "text", url, "date", id_result)
values (_row.title, _row.text, _row.url, _row.date::date, id_initiative_result::integer)
RETURNING id_initiative INTO ret_initiative_id;

raise notice 'Inserting favor votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_favor
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_favor, ret_initiative_id);
END LOOP;

raise notice 'Inserting against votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_against
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_against, ret_initiative_id);
END LOOP;

raise notice 'Inserting abstent votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_abstent
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_abstent, ret_initiative_id);
END LOOP;

raise notice 'Inserting away votes to iniative.';
FOREACH party_vote_iterator IN ARRAY arr_voting_away
LOOP
insert into refined_data.initiative_voting (id_ref_party, id_ref_voting_status, id_initiative) values (party_vote_iterator, id_status_away, ret_initiative_id);
END LOOP;


end loop;

truncate table staging_data.scraped_data;

return SUCCESS;
end;

$function$
;

This function selects each record from the staging_schema, checks if the party exists - with the use of a regex expression - and then inserts the data into the refined_schema tables. There was a need to implement this because there are votes that are not done by parties, but by single deputies and those, I did not considered to this project.

Data Visualizationโ€‹

Since I'm a huge fan of Metabase, as I did use in other projects, I've decided to switch to Apache Superset to learn how it works. Now I'm a huge fan of Apache Superset as I am of Metabase! To install Superset I've just followed the install guide present in their website. Superset allows to do querys that are refered as datasets. Having the result of the queries, we can create the dashboards, based on them.

diagram

After creating all queries, the final dashboard displays the following information:

  • Number of approved initiatives
  • Number of rejected initiatives
  • Number of approved by all initiatives
  • Favor votes
  • Against votes
  • Abtsent votes
  • Away party when voting occured
  • Detail of all initiatives

diagram

With the possibility to add filters by year and party: diagram

Integrating all technologiesโ€‹

Integrating this technologies was achieved by creating a cron job on the Azure VM instance - running with Ubuntu Server - to, at 2:30 pm each day runs the scraper.py script that scrapes the voting website, downloads the pdf files, uploads it to Azure Storage and inserts into the staging dataset.

diagram

When running, the script displays the following output, when downloading the PDF files:

diagram

Scraping the voting details website:

diagram

To achieve this pipeline, I've created an Azure Free Account to allow me to better know how Microsoft cloud works.

diagram

Conclusions & Further Stepsโ€‹

I'm quite happy with this project. It helped me to know Azure products in practice, after I done the Azure Fundamentals Certification. I've had some difficulties with the scraping API because the voting website does not follow the same structure across the different voting pages. Besides that, since this was a limited time subscription (to avoid costs on my side), I think I've achieved a great final result. In the future, I plan to include the individual deputy votes and count them in the final votation.

Twitter Real-time Word Processing

ยท 9 min read

Real time word processing using Apache Spark (PySpark).

Overviewโ€‹

The main goal of this project was to do a little PoC and learn how real-time data processing works. When someone begins their journey in data engineering, they will usually start out with a similar project. What I did was use a much known API from Twitter. With the help of Tweepy I was able to gather the tweets of specific content based on the keywords stored on a Postgres database. Following this search, the Tweepy sent the tweets to a Kafka topic to ensure that none of the tweets were lost. Then Spark started it's work with two worker nodes to read data from the Kafka topic and to process the tweets, saving the result in a postgres table which fed a metabase dashboard.

diagram

Getting Tweets with Tweepyโ€‹

Tweepy is a Python library that provides access to the entire Twitter REST API. After setting up a developer account in Twitter, the next step was to implement the code to get the tweets and send them to the previously created Kafka topic.

#read_twitter_data_stream.py

import tweepy
from kafka import KafkaProducer
from json import dumps
import os
import psycopg2

topic = os.environ.get("TOPIC_NAME")

api_key = os.environ.get("TWITTER_API_KEY")
api_secret = os.environ.get("TWITTER_API_SECRET")
bearer_token = os.environ.get("TWITTER_BEARER_TOKEN")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET")

class TwitterStream(tweepy.StreamingClient):
# This function gets called when the stream is working
def on_connect(self):
print("Connected")

# This function gets called when a tweet passes the stream
def on_tweet(self, tweet):

# Displaying tweet in console
if tweet.referenced_tweets == None:
print(f"Sending to Kafka: {tweet.text}")
try:
producer.send(topic, tweet.text)
producer.flush()
except:
print("An exception occurred")


db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")

conn = psycopg2.connect(database=db_name,
host=db_host,
user=db_user,
password=db_pass,
port=db_port)

cursor = conn.cursor()

cursor.execute('SET search_path TO streaming_data')
cursor.execute('SELECT value FROM streaming_data.term_search WHERE active is true')

results = cursor.fetchall()

conn.close()

if len(results) > 0:
for term in results:
print(f"Printing search term: {term[0]}")

producer = KafkaProducer(bootstrap_servers=['kafka:29092'],
value_serializer=lambda x:
dumps(x, ensure_ascii=False).encode('utf-8'))

client = tweepy.Client(bearer_token, api_key, api_secret,
access_token, access_token_secret)

auth = tweepy.OAuth1UserHandler(
api_key, api_secret, access_token, access_token_secret)
api = tweepy.API(auth)

# Creating Stream object
stream = TwitterStream(bearer_token=bearer_token)

print("Search for previous rules.")
previousRules = stream.get_rules().data

if previousRules:
print("Deleting previous rules.")
stream.delete_rules(previousRules)

for term in results:
stream.add_rules(tweepy.StreamRule(term[0]))

stream.filter(tweet_fields=["referenced_tweets"])
else:
print("No search term find in database. Aborting.")

The authentication details are being injected through an '.env' file.

Queuing Data With Kafkaโ€‹

To set up Kafka I used a Docker image from Confluentinc. Confluentic also has a Docker image to manage the Kafka topics, so I installed the UI interface, which is named "control-center". To create the topic, the command I ran was:

kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tp-twitter-stream --replication-factor 1 --partitions 1

diagram

Data (Real-time) Processingโ€‹

This was by far the most difficult part of all process. In my opinion, Spark is hard to learn. With no prior experience with Spark, I got very confused with how it works, then I realized that I should have start with Spark, instead of start with Spark Structured Streaming (which seems more complex). With PySpark, I was able to follow the docs available in Apache Spark website to get the implementation of word count done the correct way.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")
db_schema = os.environ.get("POSTGRES_SCHEMA")

db_target_properties = {"user": db_user, "password": db_pass, "driver": 'org.postgresql.Driver'}
db_connection_string = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}?currentSchema={db_schema}"

def split_words(input_expression):
return explode(split(input_expression, " "))

def foreach_batch_function(df, id):
df.write.jdbc(url=db_connection_string,table='word_count', properties=db_target_properties, mode='overwrite')
pass

print("## Starting Processing ##")

spark = SparkSession \
.builder \
.appName("Twitter Stream Word Count") \
.master("spark://10.0.0.8:7077")\
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("## Creating session ##")

kafka_input_df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "10.0.0.3:29092") \
.option("subscribe", "tp-twitter-stream") \
.option("startingOffsets", "earliest") \
.load()

print("## Listening to Kafka Topic ##")

# Split the lines into words
splited_words = split_words(kafka_input_df.value)

words_df = kafka_input_df.select(splited_words.alias("word"),kafka_input_df.timestamp.alias('time'))

# Take special chars out
words_df_clean = words_df.withColumn("word", regexp_replace(col("word"), "[^0-9a-zA-Z]+", ""))

counts_df = words_df_clean.withWatermark("time", "1 minutes").groupBy("word").count()
final_df = counts_df.writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()
final_df.awaitTermination()

diagram

Data Visualizationโ€‹

For data visualization, I used Metabase. It's an AMAZING product to build dashboards! Since I was short on time to finish this project, I decided to use Metabase again. I saw alternatives like Redash and Apache Superset but the complexity of their docker-compose files made my decision easy: there was no need to have a super complex product to show a simple desired chart.

Integrating all technologies with Dockerโ€‹

To integrate all of this technologies together, I've used Docker with docker-compose to simplify the management of the ten dockers.

Dockerfile:

FROM python:3
ENV PYTHONUNBUFFERED=1
RUN mkdir -p /app
WORKDIR /app

#Installing python dependencies
RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install kafka-python
RUN pip install psycopg2

COPY ./streaming-files/read_twitter_data_stream.py .
CMD ["/bin/bash", "-c", "/bin/sleep 60 && python ./read_twitter_data_stream.py"]

Docker Compose:

version: "2"
services:
zookeeper:
container_name: zookeeper
user: "0:0"
image: "confluentinc/cp-zookeeper:latest"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
restart: always
volumes:
- ./volume-data/zookeeper/data:/var/lib/zookeeper/data
- ./volume-data/zookeeper/log:/var/lib/zookeeper/log
networks:
containers-network:
ipv4_address: 10.0.0.2

kafka:
container_name: kafka
user: "0:0"
image: "confluentinc/cp-kafka:latest"
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
restart: always
volumes:
- ./volume-data/kafka/data:/var/lib/kafka/data
networks:
containers-network:
ipv4_address: 10.0.0.3

control-center:
container_name: control-center
image: "confluentinc/cp-enterprise-control-center:latest"
hostname: control-center
depends_on:
- zookeeper
- kafka
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:29092"
CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
restart: always
networks:
containers-network:
ipv4_address: 10.0.0.4

init-kafka:
container_name: init-kafka
image: confluentinc/cp-kafka:latest
depends_on:
- kafka
entrypoint: ["/bin/sh", "-c"]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list

echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tp-twitter-stream --replication-factor 1 --partitions 1

echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
networks:
containers-network:
ipv4_address: 10.0.0.5

python-script:
container_name: python-script
build: ./
image: python:3
depends_on:
- zookeeper
- kafka
- init-kafka
- pg-streaming-db
environment:
- PYTHONUNBUFFERED=1
networks:
containers-network:
ipv4_address: 10.0.0.6
env_file:
- .env

pg-streaming-db:
container_name: pg-streaming-db
image: postgres:14.5-alpine
env_file:
- .env
volumes:
- ./volume-data/pg-streaming-db/data:/var/lib/postgresql/data
- ./sql-scripts/:/docker-entrypoint-initdb.d/
networks:
containers-network:
ipv4_address: 10.0.0.7
ports:
- 40000:5432
restart: always

spark:
container_name: spark
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- "8080:8080"
networks:
containers-network:
ipv4_address: 10.0.0.8
depends_on:
- kafka
- python-script
env_file:
- .env
restart: always

spark-worker-1:
container_name: spark-worker-1
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=2G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
networks:
containers-network:
ipv4_address: 10.0.0.9
depends_on:
- spark
#command: "spark-submit --master spark://spark:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /opt/bitnami/spark/spark-scripts/spark_stream_process.py"
volumes:
- ./spark-scripts/:/opt/bitnami/spark/spark-scripts
restart: always

spark-worker-2:
container_name: spark-worker-2
image: docker.io/bitnami/spark:3.3
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=2G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
networks:
containers-network:
ipv4_address: 10.0.0.10
depends_on:
- spark
restart: always

metabase-app:
container_name: metabase-app
image: metabase/metabase
ports:
- 3000:3000
environment:
MB_DB_TYPE: postgres
MB_DB_DBNAME: metabase
MB_DB_PORT: ${POSTGRES_PORT}
MB_DB_USER: ${POSTGRES_USER}
MB_DB_PASS: ${POSTGRES_PASSWORD}
MB_DB_HOST: ${POSTGRES_HOST}
depends_on:
- pg-streaming-db
links:
- pg-streaming-db
env_file:
- .env
networks:
containers-network:
ipv4_address: 10.0.0.11
restart: always

networks:
containers-network:
driver: bridge
ipam:
config:
- subnet: 10.0.0.0/16
gateway: 10.0.0.1

Some considerations:

  • Zookeeper, Kafka and Control Center are provided by Confluentinc
  • init-kafka container was usefull to create the kafka topic if it not exists
  • python-script was the docker that runs the tweepy script to get the tweets and insert them into database
  • There was a need to create a docker network to solve the connection to the DB docker. The solution was to use a static IP address so the host never changes
  • There was a problem with spark, spark-worker-1 and spark-worker-2. When setting the command to the master run, it was thrown an error because the spark workers were not running at the time. When I tried to set the PySpark script to run on the worker, the job registered to work, but the worker died right after. Since I could not solve this issue (probably due to some config error), I started the PySpark script manually
  • The python-script waits 60 seconds before all of the containers are up, to call the twitter API.

The commands needed to up the dockers were:

docker compose build --no-cache

docker compose up -d

docker cp ./spark-scripts/spark_stream_process.py spark:/opt/bitnami/spark/spark_stream_process.py

docker exec spark /opt/bitnami/spark/bin/spark-submit --packages org.postgresql:postgresql:42.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /opt/bitnami/spark/spark_stream_process.py

diagram

Testing & Cleaningโ€‹

Because I'm a huge fan of Formula 1, I decided to put this to work on a F1 Grand Prix day with the race in Austin, Texas. To achieve that, I registered in database the word "#F1naSPORTTV" and waited for the results to see what word was the most written in Twitter. There was immediately a problem! The machine running the containers was a GCP e2-standard-2 (2 vCPU and 8 GB of memory). The CPUs were at their maximium capacity with processing being slow. So, just to test the speed of processing, I changed to a e2-standard-4 (4 vCPU and 16 GB of memory). The all process was a lot more smooth afer the change of resources available.

After cleaning data the AustinGP Word count the results were: diagram

To be honest, I was not expecting that much "dirty" data. I think when I understand the full capacity of Spark i'll be able to filter the data before processing it. Since this is just a little PoC, I did the process of cleaning the dirty data manually:

delete from word_count wc where length(word) > 20 or length(word) < 2;

With this, I took out the big words and the small ones also, that does not make sense.

Conclusions & Further Stepsโ€‹

Understanding Spark was the most difficult part of this project, but in the end I think I gave one more step to understand it's concepts. I plan to keep working with Spark in order to be better with it and to do more complex stuff.

Future steps:

  • Improve overall logging
  • Configure Kafka with some details (for example, retention time)
  • Solve a problem with the start of PySpark job mentioned early
  • Create an UI to register in DB the wanted term to search instead of beeing manually added
  • In Spark, change the Dataframe to filter the words with little characters
  • Implement some sentiment analysis with the words read from twitter

Due to the costs of having the VM running under a GCP VM Instance, the current pipeline is only available if I start the VM manually.

F1 Data Pipeline

ยท 8 min read

ELT Pipeline to Extract, Load and Transform F1 Data using Kaggle API, Docker, Python, GCP Cloud Storage, GCP BigQuery and Metabase

Overviewโ€‹

As a software engineer, this is the first ever data pipeline i built. The goal was to gain experience with modern technologies in data universe. This pipeline is scheduled to run daily( as there is no fixed data for F1 data updates). The scheduling is done by airflow and it has two DAGs. The first one is responsible for getting data from Kaggle API (in CSV files format), transforming it to parquet files, loading into Google Cloud Storage and creating a staging dataset. After this step is complete, an airflow sensor in the second DAG is triggered and it will call the dbt tool to perform the transformations needed and load them into Google BigQuery as a "refined dataset". Once completed, data will be available in dashboards created with the help of Metabase dashboards.

This data pipeline could not be done wihtout help from Data Talks Club Team and their Data Engineering Course.

Overview of data pipeline

Data Extractionโ€‹

The first step was to get F1 Data from Kaggle through Kaggle API and Python. After the zip file was downloaded, it was removed from filesystem in order to clear the workspace.

def download_data(directory):
logging.info(f"Starting ingestion - Searching for files in directory: {directory}")

try:
logging.info("Instancianting KaggleAPI.")
api = KaggleApi()

logging.info("Authenticating KaggleAPI.")
api.authenticate()

logging.info("Downloading files ... ")
api.dataset_download_files('rohanrao/formula-1-world-championship-1950-2020', path=f"{directory}")

list_dir = os.listdir(f"{directory}")
total_downloaded_files = len(list_dir)

logging.info(f"Downloaded {total_downloaded_files} files.")

if(total_downloaded_files == 0):
raise Exception("Aborting. No files downloaded")

logging.info("Extracting files ... ")
with zipfile.ZipFile(f"{directory}/formula-1-world-championship-1950-2020.zip", 'r') as zipref:
zipref.extractall(f"{directory}")

for item in list_dir:
if item.endswith(".zip"):
logging.info(f"Removing file: {item}.")
os.remove(os.path.join(directory, item))

except Exception as error:
logging.error(f"Error loading data. Error: {error}")
raise

After downloading data, the next step was to convert data into .parquet files as the "data conversions" defend. I did it in order to save some cloud storage space, as it compresses the file.

def convert_csv_to_parquet(directory):
logging.info("Starting conversion from CSV to parquet files.")

try:
for root, dirs, files in os.walk(f"{directory}"):
for file in files:
p=os.path.join(root,file)
full_path = os.path.abspath(p)

if full_path.endswith('.csv'):
logging.info(f"Reading file {full_path} in CSV format")
table = pyarrowcsv.read_csv(full_path)
logging.info(f"Writing file {full_path} to parquet format")
pyarrowpqt.write_table(table, full_path.replace('.csv', '.parquet'))

except Exception as error:
logging.error(f"Error converting CSV data to parquet. Error: {error}")
raise

After converting all CSV files to parquet format, it was time to upload it to GCP Cloud Storage. The goal was to create a directory structure like "f1-data/raw-files/[year]/[file]" to split data by F1 seasons which happen each civil year.

def upload_data_gcs(directory, bucket_id):
logging.info(f"Starting upload to GCS Bucket: {bucket_id} from directory: {directory} ")

try:
for root, dirs, files in os.walk(f"{directory}"):
for file in files:
p=os.path.join(root,file)
parquet_file_full_path = os.path.abspath(p)

if file.endswith('.parquet'):
logging.info(f"Authenticating in GCS to send parquet_file: {file} ")
client = storage.Client()
bucket = client.bucket(bucket_id)

logging.info("Creating blob.")
blob = bucket.blob(f"f1-data/raw-files/{date.today().year}/{file}")

logging.info(f"Uploading file from location: {parquet_file_full_path} ")
blob.upload_from_filename(parquet_file_full_path)

except Exception as error:
logging.error(f"Error uploading data. Error: {error}")
raise

At this time, i needed to authenticate and upload files using Google Cloud API .

Data Loadingโ€‹

After having the files in GCS, it was time to have them in BigQuery to query it. So, i created a staging dataset with the same exact data that exists in CSV files but in "queryable" format. After this dbt came into play.

def load_staging_files(directory, project_id, dataset_name, bucket_uri):
logging.info(f"Starting loading files for staging in BigQuery.")

try:
client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.PARQUET,
)

for root, dirs, files in os.walk(f"{directory}"):
for file in files:
if file.endswith('.parquet'):
bucket_file_location = str(bucket_uri) + str(date.today().year) + '/' + str(file)
logging.info(f"Searching file {bucket_file_location} from location: {bucket_uri}")

table_name = f"stg_{Path(f'{file}').stem}"
table_id = f"{project_id}.{dataset_name}.{table_name}"
logging.info(f"Uploading data from table: {table_id}")

load_job = client.load_table_from_uri(
bucket_file_location, table_id, job_config=job_config
)

load_job.result()

destination_table = client.get_table(table_id)
logging.info(f"Loaded: {destination_table.num_rows} rows.")

logging.info(f"Removing all files from {directory} after uploading them to BigQuery.")
for f in os.listdir(directory):
os.remove(os.path.join(directory, f))

except Exception as error:
logging.error(f"Error loading files for stating dataset. Error: {error}")
raise

Data Transformationโ€‹

This was the most challenging part. The goal was to start using dbt in order to get confortable with the platform. I thought it would help me get the queries done in an easy and mantainable way, and it was really a great help, yet i got the feeling that i wasn't extracting all of it's potential. Even thought it was not easy to start, i managed to get it to work and providing me with the queries i needed. Here's an example of how to extract the winning rate by driver.

{{ config(materialized='table', alias='driver_winning_rate') }}
with driver as (
select forename, surname, driverId from {{ source('src-bq-de-f1-project', 'stg_drivers') }}
),
results as (
select driverId, points, raceId from {{ source('src-bq-de-f1-project', 'stg_results') }}
),
races as (
select year, raceId from {{ source('src-bq-de-f1-project', 'stg_races') }}
),

final as (

select round(sum(q2.number_of_points)/sum(q1.number_of_participations),1) as winning_rate, q1.driver_name, q1.race_year
from (
select concat(driver.forename, ' ', driver.surname) as driver_name, count( distinct results.raceId) as number_of_participations, races.year as race_year, races.raceId as raceId
from results
inner join driver on driver.driverId = results.driverId
inner join races on races.raceId = results.raceId
group by driver_name, race_year, races.raceId
) q1
left join
(select concat(driver.forename, ' ', driver.surname) as driver_name, sum(points) as number_of_points, races.year as race_year , races.raceId as raceId
from results
inner join driver on driver.driverId = results.driverId
inner join races on races.raceId = results.raceId
group by driver_name, race_year, races.raceId
) q2
on q1.driver_name = q2.driver_name and q1.raceId = q2.raceId and q1.race_year = q2.race_year
where q2.number_of_points > 0
group by q1.driver_name, q1.race_year
)

select * from final

The result of the transformations made by dbt were uploaded to a "refined dataset" that feeded the Metabase dashboard.

Data Visualizationโ€‹

After gathering all info, it was time to put it all together in Metabase. It was a learning process, but i think it looks good.

Constructors Dashboard:

Drivers Dashboard:

Integrating all technologiesโ€‹

The final step was integrating Airflow, dbt, Metabase under Docker supported by Google Cloud Infrastructure.

Docker The image was built based on airflow official image, built with the help of a Dockerfile.

FROM apache/airflow:2.3.3

ENV AIRFLOW_HOME=/opt/airflow

USER ${AIRFLOW_UID}
#Installing python dependencies
RUN pip install --upgrade pip
RUN pip install kaggle
RUN pip install pyarrow
RUN pip install dbt-bigquery

#Making directories for airflow and kaggle.
RUN mkdir -p /opt/airflow/env_files/kaggle/
RUN mkdir -p /opt/airflow/env_files/google/credentials
RUN mkdir -p /opt/airflow/temp/download/kaggle/

#Copy files to previous created folders
COPY .google/credentials/google_credentials.json /opt/airflow/env_files/google/credentials/
COPY .kaggle/kaggle.json /opt/airflow/env_files/kaggle/

#Set permissions
USER root
RUN chown -R airflow /opt/airflow/env_files/
RUN chown -R airflow /opt/airflow/temp/download/kaggle/
RUN chmod 770 /opt/airflow/temp/download/kaggle/

RUN mkdir -p /opt/dbt/

#Copy dbt files
WORKDIR /opt/dbt/
COPY dbt/dbt_project.yml .
COPY dbt/profiles.yml .
ADD --chown=airflow:root dbt/analyses/ analyses/
ADD --chown=airflow:root dbt/dbt_packages/ dbt_packages/
ADD --chown=airflow:root dbt/logs logs/
ADD --chown=airflow:root dbt/macros macros/
ADD --chown=airflow:root dbt/models models/
ADD --chown=airflow:root dbt/seeds seeds/
ADD --chown=airflow:root dbt/snapshots snapshots/
ADD --chown=airflow:root dbt/target target/
ADD --chown=airflow:root dbt/tests tests/

#Set permissions
RUN chmod -R 770 /opt/dbt/logs
RUN chmod -R 770 /opt/dbt/target

#GCP Specifics
#Ref: https://airflow.apache.org/docs/docker-stack/recipes.html
USER 0
SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]
ARG CLOUD_SDK_VERSION=322.0.0
ENV GCLOUD_HOME=/home/google-cloud-sdk
ENV PATH="${GCLOUD_HOME}/bin/:${PATH}"

RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz" \
&& TMP_DIR="$(mktemp -d)" \
&& curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/google-cloud-sdk.tar.gz" \
&& mkdir -p "${GCLOUD_HOME}" \
&& tar xzf "${TMP_DIR}/google-cloud-sdk.tar.gz" -C "${GCLOUD_HOME}" --strip-components=1 \
&& "${GCLOUD_HOME}/install.sh" \
--bash-completion=false \
--path-update=false \
--usage-reporting=false \
--quiet \
&& rm -rf "${TMP_DIR}" \
&& gcloud --version

WORKDIR $AIRFLOW_HOME

USER $AIRFLOW_UID

The containers were managed by docker compose, available in airflow documentation. This docker compose file was changed to include metabase containers.

  postgres-metabase:
image: postgres:13
environment:
POSTGRES_USER: <hidden>
POSTGRES_PASSWORD: <hidden>
POSTGRES_DB: <hidden>
volumes:
- /opt/docker/postgres-data:/var/lib/postgresql/data
ports:
- 5433:5432
restart: always

metabase-app:
image: metabase/metabase
restart: always
ports:
- 3000:3000
environment:
MB_DB_TYPE: postgres
MB_DB_DBNAME: <hidden>
MB_DB_PORT: 5432
MB_DB_USER: <hidden>
MB_DB_PASS: <hidden>
MB_DB_HOST: postgres-metabase
depends_on:
- postgres-metabase
links:
- postgres-metabase

Airflow

Were created two DAGs:

  • #1 - Get and load all data to Google Cloud Storage
  • #2 - Call dbt to perform transformations and get them ready to display information

The DAG #1 connects with Kaggle API, converts data to parquet and upload it to GCS in the first three steps. Meanwhile, it checks if the refined dataset exists and it deletes the existing staging dataset to load new and fresh data. Then, the tasks t5 and t6 it's where the staging dataset it's created again and the upload to BigQuery with the new data it's done. DAG #2 comes to life when the sensor (ExternalTaskSensor) signals that the last task from DAG #1 it's done (t6_load_staging_data) and starts the DAG #2.

dag-1 dag-2

Conclusions & Further Stepsโ€‹

This it's not my confort zone, however i am satisfied with the outcome and with all the work behind the shown graphics.

Next steps:

  • Create dbt docs to document all tables and columns inside refined dataset
  • Create dbt tests for validating the consistency of data
  • Review retry mecanisms in these two DAGs in airflow
  • Integrate a "live" streaming source of data with help of Kafka.

Due to the costs of having this pipeline running under a GCP VM Instance, the current pipeline is only available if i start the VM manually.