
2 posts tagged with "GCP"


Twitter Real-time Word Processing

· 9 min read

Real time word processing using Apache Spark (PySpark).

Overview

The main goal of this project was to build a small PoC and learn how real-time data processing works. Many people starting out in data engineering begin with a similar project. I used the well-known Twitter API. With the help of Tweepy, I gathered tweets matching keywords stored in a Postgres database. Tweepy then sent the tweets to a Kafka topic so that none of them were lost. From there, Spark, running with two worker nodes, read the data from the Kafka topic and processed the tweets, saving the result in a Postgres table that fed a Metabase dashboard.

diagram

Getting Tweets with Tweepy

Tweepy is a Python library that provides access to the entire Twitter REST API. After setting up a developer account in Twitter, the next step was to implement the code to get the tweets and send them to the previously created Kafka topic.

#read_twitter_data_stream.py

import tweepy
from kafka import KafkaProducer
from json import dumps
import os
import psycopg2

topic = os.environ.get("TOPIC_NAME")

api_key = os.environ.get("TWITTER_API_KEY")
api_secret = os.environ.get("TWITTER_API_SECRET")
bearer_token = os.environ.get("TWITTER_BEARER_TOKEN")
access_token = os.environ.get("TWITTER_ACCESS_TOKEN")
access_token_secret = os.environ.get("TWITTER_ACCESS_TOKEN_SECRET")

class TwitterStream(tweepy.StreamingClient):
    # Called once the stream connection is established
    def on_connect(self):
        print("Connected")

    # Called for every tweet that matches the stream rules
    def on_tweet(self, tweet):
        # Forward only tweets that do not reference another tweet
        # (no retweets, replies or quotes)
        if tweet.referenced_tweets is None:
            print(f"Sending to Kafka: {tweet.text}")
            try:
                producer.send(topic, tweet.text)
                producer.flush()
            except Exception as error:
                print(f"An exception occurred: {error}")


db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")

conn = psycopg2.connect(database=db_name,
                        host=db_host,
                        user=db_user,
                        password=db_pass,
                        port=db_port)

cursor = conn.cursor()

# Read the active search terms from the database
cursor.execute('SET search_path TO streaming_data')
cursor.execute('SELECT value FROM streaming_data.term_search WHERE active is true')

results = cursor.fetchall()

conn.close()

if len(results) > 0:
    for term in results:
        print(f"Printing search term: {term[0]}")

    # Tweets are JSON-encoded and sent as UTF-8 bytes
    producer = KafkaProducer(bootstrap_servers=['kafka:29092'],
                             value_serializer=lambda x:
                             dumps(x, ensure_ascii=False).encode('utf-8'))

    client = tweepy.Client(bearer_token, api_key, api_secret,
                           access_token, access_token_secret)

    auth = tweepy.OAuth1UserHandler(
        api_key, api_secret, access_token, access_token_secret)
    api = tweepy.API(auth)

    # Creating Stream object
    stream = TwitterStream(bearer_token=bearer_token)

    print("Search for previous rules.")
    previousRules = stream.get_rules().data

    if previousRules:
        print("Deleting previous rules.")
        stream.delete_rules(previousRules)

    # Register one stream rule per search term
    for term in results:
        stream.add_rules(tweepy.StreamRule(term[0]))

    stream.filter(tweet_fields=["referenced_tweets"])
else:
    print("No search term found in database. Aborting.")

The authentication details are injected through a '.env' file.
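To make it clearer what actually lands in the Kafka topic, here is a minimal sketch of the producer's value serializer in isolation. The helper names `serialize_tweet` and `deserialize_tweet` are hypothetical and are not part of the script above; only the `dumps(..., ensure_ascii=False).encode('utf-8')` step is taken from it.

```python
from json import dumps, loads


def serialize_tweet(text: str) -> bytes:
    """Mirror of the producer's value_serializer: JSON-encode, then UTF-8."""
    return dumps(text, ensure_ascii=False).encode("utf-8")


def deserialize_tweet(payload: bytes) -> str:
    """Inverse step a consumer would need to recover the original text."""
    return loads(payload.decode("utf-8"))


payload = serialize_tweet("Olá #F1")
print(payload)
print(deserialize_tweet(payload))
```

Because the text is JSON-encoded, every message is wrapped in double quotes. A consumer that reads the raw bytes as a plain string will see those quotes unless it decodes the JSON first.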

Queuing Data With Kafka

To set up Kafka, I used a Docker image from Confluent (confluentinc on Docker Hub). Confluent also provides a Docker image with a UI to manage Kafka topics, named "control-center", so I installed it as well. To create the topic, the command I ran was:

kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tp-twitter-stream --replication-factor 1 --partitions 1

diagram

Data (Real-time) Processing

This was by far the most difficult part of the whole process. In my opinion, Spark is hard to learn. With no prior experience with Spark, I was very confused about how it works. I then realized that I should have started with plain Spark instead of Spark Structured Streaming (which seems more complex). With PySpark, I was able to follow the docs available on the Apache Spark website to implement the word count correctly.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, regexp_replace, split
import os

db_name = os.environ.get("POSTGRES_DB")
db_host = os.environ.get("POSTGRES_HOST")
db_user = os.environ.get("POSTGRES_USER")
db_pass = os.environ.get("POSTGRES_PASSWORD")
db_port = os.environ.get("POSTGRES_PORT")
db_schema = os.environ.get("POSTGRES_SCHEMA")

db_target_properties = {"user": db_user, "password": db_pass, "driver": 'org.postgresql.Driver'}
db_connection_string = f"jdbc:postgresql://{db_host}:{db_port}/{db_name}?currentSchema={db_schema}"

def split_words(input_expression):
    # One output row per space-separated token
    return explode(split(input_expression, " "))

def foreach_batch_function(df, batch_id):
    # In "complete" output mode each micro-batch carries the full result table,
    # so the target table is overwritten on every batch
    df.write.jdbc(url=db_connection_string, table='word_count', properties=db_target_properties, mode='overwrite')

print("## Starting Processing ##")

spark = SparkSession \
    .builder \
    .appName("Twitter Stream Word Count") \
    .master("spark://10.0.0.8:7077") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("## Creating session ##")

kafka_input_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "10.0.0.3:29092") \
    .option("subscribe", "tp-twitter-stream") \
    .option("startingOffsets", "earliest") \
    .load()

print("## Listening to Kafka Topic ##")

# Split the lines into words (Kafka delivers the value as bytes, so cast it to a string first)
splited_words = split_words(kafka_input_df.value.cast("string"))

words_df = kafka_input_df.select(splited_words.alias("word"), kafka_input_df.timestamp.alias('time'))

# Take special chars out
words_df_clean = words_df.withColumn("word", regexp_replace(col("word"), "[^0-9a-zA-Z]+", ""))

counts_df = words_df_clean.withWatermark("time", "1 minutes").groupBy("word").count()
final_df = counts_df.writeStream.outputMode("complete").foreachBatch(foreach_batch_function).start()
final_df.awaitTermination()
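The transformation steps of the Spark job above (split on spaces, strip special characters, count per word) can be sketched in plain Python to see what they do to a small batch. This is only an illustration of the logic, not the Spark implementation; `count_words` and the sample batch are hypothetical.

```python
import re
from collections import Counter

# Same pattern the Spark job passes to regexp_replace
NON_ALNUM = re.compile(r"[^0-9a-zA-Z]+")


def count_words(tweets):
    """Split each text on single spaces, strip non-alphanumerics, then count."""
    counts = Counter()
    for text in tweets:
        for token in text.split(" "):
            counts[NON_ALNUM.sub("", token)] += 1
    return counts


# Sample messages, still wrapped in the quotes added by the JSON serializer
batch = ['"Lights out!"', '"lights out and away we go"']
print(count_words(batch).most_common(3))
```

Two things stand out in this sketch: counting is case-sensitive, and a token made only of symbols (or a double space) ends up as an empty string that is counted like any other word. Both probably contribute to the "dirty" data mentioned later.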

diagram

Data Visualization

For data visualization, I used Metabase. It's an AMAZING product to build dashboards! Since I was short on time to finish this project, I decided to use Metabase again. I saw alternatives like Redash and Apache Superset but the complexity of their docker-compose files made my decision easy: there was no need to have a super complex product to show a simple desired chart.

Integrating all technologies with Docker

To integrate all of these technologies, I used Docker with docker-compose to simplify the management of the ten containers.

Dockerfile:

FROM python:3
ENV PYTHONUNBUFFERED=1
RUN mkdir -p /app
WORKDIR /app

#Installing python dependencies
RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install kafka-python
RUN pip install psycopg2

COPY ./streaming-files/read_twitter_data_stream.py .
CMD ["/bin/bash", "-c", "/bin/sleep 60 && python ./read_twitter_data_stream.py"]

Docker Compose:

version: "2"
services:
  zookeeper:
    container_name: zookeeper
    user: "0:0"
    image: "confluentinc/cp-zookeeper:latest"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    restart: always
    volumes:
      - ./volume-data/zookeeper/data:/var/lib/zookeeper/data
      - ./volume-data/zookeeper/log:/var/lib/zookeeper/log
    networks:
      containers-network:
        ipv4_address: 10.0.0.2

  kafka:
    container_name: kafka
    user: "0:0"
    image: "confluentinc/cp-kafka:latest"
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    restart: always
    volumes:
      - ./volume-data/kafka/data:/var/lib/kafka/data
    networks:
      containers-network:
        ipv4_address: 10.0.0.3

  control-center:
    container_name: control-center
    image: "confluentinc/cp-enterprise-control-center:latest"
    hostname: control-center
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:29092"
      CONTROL_CENTER_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    restart: always
    networks:
      containers-network:
        ipv4_address: 10.0.0.4

  init-kafka:
    container_name: init-kafka
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:29092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic tp-twitter-stream --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:29092 --list
      "
    networks:
      containers-network:
        ipv4_address: 10.0.0.5

  python-script:
    container_name: python-script
    build: ./
    image: python:3
    depends_on:
      - zookeeper
      - kafka
      - init-kafka
      - pg-streaming-db
    environment:
      - PYTHONUNBUFFERED=1
    networks:
      containers-network:
        ipv4_address: 10.0.0.6
    env_file:
      - .env

  pg-streaming-db:
    container_name: pg-streaming-db
    image: postgres:14.5-alpine
    env_file:
      - .env
    volumes:
      - ./volume-data/pg-streaming-db/data:/var/lib/postgresql/data
      - ./sql-scripts/:/docker-entrypoint-initdb.d/
    networks:
      containers-network:
        ipv4_address: 10.0.0.7
    ports:
      - 40000:5432
    restart: always

  spark:
    container_name: spark
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "8080:8080"
    networks:
      containers-network:
        ipv4_address: 10.0.0.8
    depends_on:
      - kafka
      - python-script
    env_file:
      - .env
    restart: always

  spark-worker-1:
    container_name: spark-worker-1
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      containers-network:
        ipv4_address: 10.0.0.9
    depends_on:
      - spark
    #command: "spark-submit --master spark://spark:7077 --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /opt/bitnami/spark/spark-scripts/spark_stream_process.py"
    volumes:
      - ./spark-scripts/:/opt/bitnami/spark/spark-scripts
    restart: always

  spark-worker-2:
    container_name: spark-worker-2
    image: docker.io/bitnami/spark:3.3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      containers-network:
        ipv4_address: 10.0.0.10
    depends_on:
      - spark
    restart: always

  metabase-app:
    container_name: metabase-app
    image: metabase/metabase
    ports:
      - 3000:3000
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: metabase
      MB_DB_PORT: ${POSTGRES_PORT}
      MB_DB_USER: ${POSTGRES_USER}
      MB_DB_PASS: ${POSTGRES_PASSWORD}
      MB_DB_HOST: ${POSTGRES_HOST}
    depends_on:
      - pg-streaming-db
    links:
      - pg-streaming-db
    env_file:
      - .env
    networks:
      containers-network:
        ipv4_address: 10.0.0.11
    restart: always

networks:
  containers-network:
    driver: bridge
    ipam:
      config:
        - subnet: 10.0.0.0/16
          gateway: 10.0.0.1

Some considerations:

  • Zookeeper, Kafka and Control Center are provided by Confluent (confluentinc images)
  • The init-kafka container was useful to create the Kafka topic if it does not exist
  • python-script is the container that runs the Tweepy script to get the tweets and send them to the Kafka topic
  • There was a need to create a Docker network to solve the connection to the DB container. The solution was to use static IP addresses so the host address never changes
  • There was a problem with spark, spark-worker-1 and spark-worker-2. When I set the command to run on the master, an error was thrown because the Spark workers were not running yet. When I tried to run the PySpark script on a worker, the job was registered, but the worker died right after. Since I could not solve this issue (probably due to some configuration error), I started the PySpark script manually
  • The python-script container waits 60 seconds, so that all the other containers are up, before calling the Twitter API.
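As an alternative to the fixed 60-second sleep, the script could poll until its dependencies accept connections. The sketch below is not what the project does; it is a minimal illustration using only the standard library, and `wait_for_port` is a hypothetical helper. Note that an open TCP port only shows that the service is listening, not that it is fully ready.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 60.0, interval: float = 1.0) -> bool:
    """Return True once a TCP connection succeeds, False if the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Try to open (and immediately close) a connection to the service
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

With the addresses from the compose file, a call such as `wait_for_port("kafka", 29092)` at the top of the script would return as soon as the broker port is reachable, or give up after a minute.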

The commands needed to bring the containers up were:

docker compose build --no-cache

docker compose up -d

docker cp ./spark-scripts/spark_stream_process.py spark:/opt/bitnami/spark/spark_stream_process.py

docker exec spark /opt/bitnami/spark/bin/spark-submit --packages org.postgresql:postgresql:42.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 /opt/bitnami/spark/spark_stream_process.py

diagram

Testing & Cleaning

Because I'm a huge fan of Formula 1, I decided to put this to work on an F1 Grand Prix day, with the race in Austin, Texas. To do that, I registered the term "#F1naSPORTTV" in the database and waited for the results to see which word was written most often on Twitter. A problem showed up immediately: the machine running the containers was a GCP e2-standard-2 (2 vCPUs and 8 GB of memory), the CPUs were at maximum capacity and processing was slow. So, just to test the processing speed, I switched to an e2-standard-4 (4 vCPUs and 16 GB of memory). The whole process ran much more smoothly with the additional resources.

After cleaning the data, the Austin GP word count results were: diagram

To be honest, I was not expecting that much "dirty" data. I think that once I understand the full capabilities of Spark, I'll be able to filter the data before processing it. Since this is just a little PoC, I cleaned the dirty data manually:

delete from word_count wc where length(word) > 20 or length(word) < 2;

With this, I removed the very long and the very short words, which carry no meaning.
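The same length rule as the delete statement above can be expressed as a small predicate, which is roughly what a filter applied before counting would have to check. This is a plain Python sketch with hypothetical names (`is_meaningful`, `clean_counts`), not code from the project.

```python
def is_meaningful(word: str, min_len: int = 2, max_len: int = 20) -> bool:
    """Keep words the SQL statement would NOT delete (2 to 20 characters)."""
    return min_len <= len(word) <= max_len


def clean_counts(counts: dict) -> dict:
    """Drop entries whose word is too short or too long."""
    return {word: n for word, n in counts.items() if is_meaningful(word)}


sample = {"": 5, "a": 3, "F1": 7, "x" * 21: 1}
print(clean_counts(sample))
```

Applying a rule like this inside the streaming job, rather than on the final table, would avoid writing the noise to Postgres in the first place.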

Conclusions & Further Steps

Understanding Spark was the most difficult part of this project, but in the end I think I took one more step towards understanding its concepts. I plan to keep working with Spark to get better at it and to build more complex things.

Future steps:

  • Improve overall logging
  • Configure Kafka with some details (for example, retention time)
  • Solve the problem with starting the PySpark job mentioned earlier
  • Create a UI to register the search terms in the DB instead of adding them manually
  • In Spark, change the DataFrame to filter out words with too few characters
  • Implement some sentiment analysis with the words read from twitter

Due to the costs of having the VM running under a GCP VM Instance, the current pipeline is only available if I start the VM manually.

F1 Data Pipeline

· 8 min read

ELT Pipeline to Extract, Load and Transform F1 Data using Kaggle API, Docker, Python, GCP Cloud Storage, GCP BigQuery and Metabase

Overview

As a software engineer, this is the first data pipeline I have ever built. The goal was to gain experience with modern technologies in the data world. The pipeline is scheduled to run daily (as there is no fixed date for F1 data updates). The scheduling is done by Airflow, using two DAGs. The first one is responsible for getting data from the Kaggle API (as CSV files), converting it to parquet files, loading them into Google Cloud Storage and creating a staging dataset. Once this step is complete, an Airflow sensor in the second DAG is triggered and calls dbt to perform the required transformations and load the results into Google BigQuery as a "refined dataset". Once completed, the data is available in dashboards built with Metabase.

This data pipeline could not have been built without the help of the Data Talks Club team and their Data Engineering Course.

Overview of data pipeline

Data Extraction

The first step was to get F1 data from Kaggle through the Kaggle API and Python. After the zip file was downloaded and extracted, it was removed from the filesystem to clear the workspace.

# Imports assumed from the names used in the function
import logging
import os
import zipfile
from kaggle.api.kaggle_api_extended import KaggleApi

def download_data(directory):
    logging.info(f"Starting ingestion - Searching for files in directory: {directory}")

    try:
        logging.info("Instantiating KaggleAPI.")
        api = KaggleApi()

        logging.info("Authenticating KaggleAPI.")
        api.authenticate()

        logging.info("Downloading files ... ")
        api.dataset_download_files('rohanrao/formula-1-world-championship-1950-2020', path=f"{directory}")

        list_dir = os.listdir(f"{directory}")
        total_downloaded_files = len(list_dir)

        logging.info(f"Downloaded {total_downloaded_files} files.")

        if total_downloaded_files == 0:
            raise Exception("Aborting. No files downloaded")

        logging.info("Extracting files ... ")
        with zipfile.ZipFile(f"{directory}/formula-1-world-championship-1950-2020.zip", 'r') as zipref:
            zipref.extractall(f"{directory}")

        # Remove the downloaded archive(s) once the contents are extracted
        for item in list_dir:
            if item.endswith(".zip"):
                logging.info(f"Removing file: {item}.")
                os.remove(os.path.join(directory, item))

    except Exception as error:
        logging.error(f"Error loading data. Error: {error}")
        raise

After downloading the data, the next step was to convert it into .parquet files, as common data engineering conventions recommend. I did it to save some cloud storage space, since parquet compresses the data.

# Aliases assumed from the names used in the function
import pyarrow.csv as pyarrowcsv
import pyarrow.parquet as pyarrowpqt

def convert_csv_to_parquet(directory):
    logging.info("Starting conversion from CSV to parquet files.")

    try:
        for root, dirs, files in os.walk(f"{directory}"):
            for file in files:
                p = os.path.join(root, file)
                full_path = os.path.abspath(p)

                if full_path.endswith('.csv'):
                    logging.info(f"Reading file {full_path} in CSV format")
                    table = pyarrowcsv.read_csv(full_path)
                    logging.info(f"Writing file {full_path} to parquet format")
                    pyarrowpqt.write_table(table, full_path.replace('.csv', '.parquet'))

    except Exception as error:
        logging.error(f"Error converting CSV data to parquet. Error: {error}")
        raise

After converting all CSV files to parquet format, it was time to upload them to GCP Cloud Storage. The goal was to create a directory structure like "f1-data/raw-files/[year]/[file]" to split data by F1 season, each of which takes place within a calendar year.

# Imports assumed from the names used in the function
from datetime import date
from google.cloud import storage

def upload_data_gcs(directory, bucket_id):
    logging.info(f"Starting upload to GCS Bucket: {bucket_id} from directory: {directory} ")

    try:
        for root, dirs, files in os.walk(f"{directory}"):
            for file in files:
                p = os.path.join(root, file)
                parquet_file_full_path = os.path.abspath(p)

                if file.endswith('.parquet'):
                    logging.info(f"Authenticating in GCS to send parquet_file: {file} ")
                    client = storage.Client()
                    bucket = client.bucket(bucket_id)

                    logging.info("Creating blob.")
                    blob = bucket.blob(f"f1-data/raw-files/{date.today().year}/{file}")

                    logging.info(f"Uploading file from location: {parquet_file_full_path} ")
                    blob.upload_from_filename(parquet_file_full_path)

    except Exception as error:
        logging.error(f"Error uploading data. Error: {error}")
        raise

At this point, I needed to authenticate and upload the files using the Google Cloud API.
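The target layout "f1-data/raw-files/[year]/[file]" can be isolated in a small helper, which makes it easy to check the object names without touching GCS. `raw_blob_name` is a hypothetical function written for illustration; the upload code builds the same string inline.

```python
from datetime import date
from pathlib import PurePosixPath


def raw_blob_name(file_name: str, today: date) -> str:
    """Build the object name used for a raw parquet file in the bucket."""
    return str(PurePosixPath("f1-data", "raw-files", str(today.year), file_name))


print(raw_blob_name("drivers.parquet", date(2022, 10, 23)))
```

One detail this makes visible: the year in the path comes from the upload date, so as far as I can tell from the code, files are grouped by the year in which the pipeline ran rather than by the season each row belongs to.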

Data Loading

With the files in GCS, it was time to load them into BigQuery so they could be queried. I created a staging dataset with exactly the same data as the CSV files, but in a "queryable" format. After this, dbt came into play.

# Imports assumed from the names used in the function
from pathlib import Path
from google.cloud import bigquery

def load_staging_files(directory, project_id, dataset_name, bucket_uri):
    logging.info("Starting loading files for staging in BigQuery.")

    try:
        client = bigquery.Client()

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.PARQUET,
        )

        for root, dirs, files in os.walk(f"{directory}"):
            for file in files:
                if file.endswith('.parquet'):
                    bucket_file_location = str(bucket_uri) + str(date.today().year) + '/' + str(file)
                    logging.info(f"Searching file {bucket_file_location} from location: {bucket_uri}")

                    # One staging table per file, e.g. drivers.parquet -> stg_drivers
                    table_name = f"stg_{Path(f'{file}').stem}"
                    table_id = f"{project_id}.{dataset_name}.{table_name}"
                    logging.info(f"Uploading data from table: {table_id}")

                    load_job = client.load_table_from_uri(
                        bucket_file_location, table_id, job_config=job_config
                    )

                    # Wait for the load job to finish
                    load_job.result()

                    destination_table = client.get_table(table_id)
                    logging.info(f"Loaded: {destination_table.num_rows} rows.")

        logging.info(f"Removing all files from {directory} after uploading them to BigQuery.")
        for f in os.listdir(directory):
            os.remove(os.path.join(directory, f))

    except Exception as error:
        logging.error(f"Error loading files for staging dataset. Error: {error}")
        raise
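The naming used by the load step (source URI in the bucket, `stg_` prefix for the table) can be sketched as two small helpers. Both function names and the sample project, dataset and bucket values are hypothetical; the string logic follows the function above, which assumes that `bucket_uri` already ends with a slash.

```python
from pathlib import Path


def staging_source_uri(bucket_uri: str, year: int, file_name: str) -> str:
    """Location of the parquet file in GCS (bucket_uri must end with '/')."""
    return f"{bucket_uri}{year}/{file_name}"


def staging_table_id(project_id: str, dataset_name: str, file_name: str) -> str:
    """Fully qualified BigQuery table id for a given parquet file."""
    return f"{project_id}.{dataset_name}.stg_{Path(file_name).stem}"


print(staging_source_uri("gs://my-bucket/f1-data/raw-files/", 2022, "drivers.parquet"))
print(staging_table_id("my-project", "staging", "drivers.parquet"))
```

This is where table names such as stg_drivers, stg_results and stg_races, used later as dbt sources, come from.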

Data Transformation

This was the most challenging part. The goal was to start using dbt in order to get comfortable with the platform. I thought it would help me write the queries in an easy and maintainable way, and it really was a great help, yet I got the feeling that I wasn't extracting all of its potential. Even though it was not easy to get started, I managed to get it working and producing the queries I needed. Here's an example of how to extract the winning rate by driver.

{{ config(materialized='table', alias='driver_winning_rate') }}
with driver as (
select forename, surname, driverId from {{ source('src-bq-de-f1-project', 'stg_drivers') }}
),
results as (
select driverId, points, raceId from {{ source('src-bq-de-f1-project', 'stg_results') }}
),
races as (
select year, raceId from {{ source('src-bq-de-f1-project', 'stg_races') }}
),

final as (

select round(sum(q2.number_of_points)/sum(q1.number_of_participations),1) as winning_rate, q1.driver_name, q1.race_year
from (
select concat(driver.forename, ' ', driver.surname) as driver_name, count( distinct results.raceId) as number_of_participations, races.year as race_year, races.raceId as raceId
from results
inner join driver on driver.driverId = results.driverId
inner join races on races.raceId = results.raceId
group by driver_name, race_year, races.raceId
) q1
left join
(select concat(driver.forename, ' ', driver.surname) as driver_name, sum(points) as number_of_points, races.year as race_year , races.raceId as raceId
from results
inner join driver on driver.driverId = results.driverId
inner join races on races.raceId = results.raceId
group by driver_name, race_year, races.raceId
) q2
on q1.driver_name = q2.driver_name and q1.raceId = q2.raceId and q1.race_year = q2.race_year
where q2.number_of_points > 0
group by q1.driver_name, q1.race_year
)

select * from final

The results of the transformations made by dbt were loaded into a "refined dataset" that fed the Metabase dashboard.
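As I read the driver_winning_rate query, the metric is the average number of points per points-scoring race, for each driver and season. The plain Python sketch below reproduces that reading on a few made-up rows; `winning_rate` and the sample data are hypothetical, and Python's `round` may treat exact ties differently from BigQuery's.

```python
from collections import defaultdict


def winning_rate(rows):
    """rows: iterable of (driver_name, race_year, race_id, points) tuples."""
    # Points per driver, season and race (the inner subqueries)
    per_race = defaultdict(float)
    for driver, year, race_id, points in rows:
        per_race[(driver, year, race_id)] += points

    # Keep only races with points, then aggregate per driver and season
    totals = defaultdict(lambda: [0.0, 0])
    for (driver, year, _race_id), points in per_race.items():
        if points > 0:
            totals[(driver, year)][0] += points
            totals[(driver, year)][1] += 1

    return {key: round(points / races, 1) for key, (points, races) in totals.items()}


rows = [
    ("Driver A", 2021, 1, 25.0),
    ("Driver A", 2021, 2, 18.0),
    ("Driver A", 2021, 3, 0.0),
    ("Driver B", 2021, 1, 10.0),
    ("Driver B", 2021, 2, 0.0),
]
print(winning_rate(rows))
```

Races without points are excluded by the `where` clause, so a driver who scores rarely but heavily can rank above a more consistent one.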

Data Visualization

After gathering all the information, it was time to put it all together in Metabase. It was a learning process, but I think it looks good.

Constructors Dashboard:

Drivers Dashboard:

Integrating all technologies

The final step was integrating Airflow, dbt and Metabase under Docker, supported by Google Cloud infrastructure.

Docker

The image was based on the official Airflow image and built with the help of a Dockerfile.

FROM apache/airflow:2.3.3

ENV AIRFLOW_HOME=/opt/airflow

USER ${AIRFLOW_UID}
#Installing python dependencies
RUN pip install --upgrade pip
RUN pip install kaggle
RUN pip install pyarrow
RUN pip install dbt-bigquery

#Making directories for airflow and kaggle.
RUN mkdir -p /opt/airflow/env_files/kaggle/
RUN mkdir -p /opt/airflow/env_files/google/credentials
RUN mkdir -p /opt/airflow/temp/download/kaggle/

#Copy files to previous created folders
COPY .google/credentials/google_credentials.json /opt/airflow/env_files/google/credentials/
COPY .kaggle/kaggle.json /opt/airflow/env_files/kaggle/

#Set permissions
USER root
RUN chown -R airflow /opt/airflow/env_files/
RUN chown -R airflow /opt/airflow/temp/download/kaggle/
RUN chmod 770 /opt/airflow/temp/download/kaggle/

RUN mkdir -p /opt/dbt/

#Copy dbt files
WORKDIR /opt/dbt/
COPY dbt/dbt_project.yml .
COPY dbt/profiles.yml .
ADD --chown=airflow:root dbt/analyses/ analyses/
ADD --chown=airflow:root dbt/dbt_packages/ dbt_packages/
ADD --chown=airflow:root dbt/logs logs/
ADD --chown=airflow:root dbt/macros macros/
ADD --chown=airflow:root dbt/models models/
ADD --chown=airflow:root dbt/seeds seeds/
ADD --chown=airflow:root dbt/snapshots snapshots/
ADD --chown=airflow:root dbt/target target/
ADD --chown=airflow:root dbt/tests tests/

#Set permissions
RUN chmod -R 770 /opt/dbt/logs
RUN chmod -R 770 /opt/dbt/target

#GCP Specifics
#Ref: https://airflow.apache.org/docs/docker-stack/recipes.html
USER 0
SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]
ARG CLOUD_SDK_VERSION=322.0.0
ENV GCLOUD_HOME=/home/google-cloud-sdk
ENV PATH="${GCLOUD_HOME}/bin/:${PATH}"

RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz" \
&& TMP_DIR="$(mktemp -d)" \
&& curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/google-cloud-sdk.tar.gz" \
&& mkdir -p "${GCLOUD_HOME}" \
&& tar xzf "${TMP_DIR}/google-cloud-sdk.tar.gz" -C "${GCLOUD_HOME}" --strip-components=1 \
&& "${GCLOUD_HOME}/install.sh" \
--bash-completion=false \
--path-update=false \
--usage-reporting=false \
--quiet \
&& rm -rf "${TMP_DIR}" \
&& gcloud --version

WORKDIR $AIRFLOW_HOME

USER $AIRFLOW_UID

The containers were managed by the docker compose file available in the Airflow documentation, which I changed to include the Metabase containers.

  postgres-metabase:
    image: postgres:13
    environment:
      POSTGRES_USER: <hidden>
      POSTGRES_PASSWORD: <hidden>
      POSTGRES_DB: <hidden>
    volumes:
      - /opt/docker/postgres-data:/var/lib/postgresql/data
    ports:
      - 5433:5432
    restart: always

  metabase-app:
    image: metabase/metabase
    restart: always
    ports:
      - 3000:3000
    environment:
      MB_DB_TYPE: postgres
      MB_DB_DBNAME: <hidden>
      MB_DB_PORT: 5432
      MB_DB_USER: <hidden>
      MB_DB_PASS: <hidden>
      MB_DB_HOST: postgres-metabase
    depends_on:
      - postgres-metabase
    links:
      - postgres-metabase

Airflow

Two DAGs were created:

  • #1 - Get and load all data to Google Cloud Storage
  • #2 - Call dbt to perform transformations and get them ready to display information

DAG #1 connects to the Kaggle API, converts the data to parquet and uploads it to GCS in the first three steps. Meanwhile, it checks whether the refined dataset exists and deletes the existing staging dataset so that fresh data can be loaded. Then, in tasks t5 and t6, the staging dataset is created again and the new data is loaded into BigQuery. DAG #2 starts when its sensor (ExternalTaskSensor) detects that the last task of DAG #1 (t6_load_staging_data) has finished.

dag-1 dag-2

Conclusions & Further Steps

This is not my comfort zone; however, I am satisfied with the outcome and with all the work behind the charts shown.

Next steps:

  • Create dbt docs to document all tables and columns inside refined dataset
  • Create dbt tests for validating the consistency of data
  • Review the retry mechanisms in these two Airflow DAGs
  • Integrate a "live" streaming source of data with help of Kafka.

Due to the costs of having this pipeline running under a GCP VM Instance, the current pipeline is only available if I start the VM manually.