Data Pipeline Implementation: Do It Yourself

These instructions build on what was discussed in ELT, ETL and Data Pipelines. In that guide, we discussed the problems that arise when a company stores and uses data. In response to those problems, we introduced the concept of Data Pipelines, which helps a company become more aware of its data loading steps and incorporate those steps in the most effective way to create a Data-driven Culture. We also discussed some specific tooling that can be used to properly deploy Data Pipelines.

Preparing for Implementation

Now that we understand the concepts behind Data Pipelines, we will apply them to implement a functioning Data Pipeline. As with most of our data engineering processes, we follow a step-by-step plan and provide an implementation strategy for each step. This plan, together with the implementation methods, should give you a solid foundation for constructing your own Data Pipeline. You can find the full code on our GitLab.

To prepare for the implementation, we need a data source that generates real-time data and can be queried periodically. As an example, we have chosen Twitter data, from which we can collect the top trending tweets from the Netherlands. For this purpose, we created a Twitter Developer account and generated the necessary access tokens. We also make use of the following tooling:

● Python 3.8

● Docker

● Docker Compose

Step 1: Access Data Source

Before we actually start building anything, it is important that we have access to the data source and can acquire data from it. As part of the preparation, we created a Twitter Developer account and generated access tokens there. First, we had to apply for the Developer account, and once it was approved, we created an application within the developer account to generate authentication tokens. Make sure to double-check that your data source is callable via an API or that an extract of the data is shared periodically via a fileshare service.

Step 2: Authenticate API

Before we can use the API or fileshare service, we must first authenticate ourselves. The exact authentication process depends on the service we want to use.

For the Twitter API, we need to provide four different tokens: the API key, the API key secret, the access token and the access token secret.

To invoke the Twitter API, we use the Python package Tweepy. Authenticating ourselves looks like this:
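A minimal sketch of that authentication step, assuming Tweepy 3.x; the credential values are placeholders that you replace with the tokens from your own developer application:

```python
import tweepy

# Placeholder credentials: replace these with the four tokens generated
# in your Twitter Developer application.
CONSUMER_KEY = "..."
CONSUMER_SECRET = "..."
ACCESS_TOKEN = "..."
ACCESS_TOKEN_SECRET = "..."

# OAuth 1.0a user authentication via Tweepy.
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

# wait_on_rate_limit makes Tweepy pause automatically when the API rate
# limit is reached instead of raising an error.
api = tweepy.API(auth, wait_on_rate_limit=True)
```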

Step 3: Extract Data

Now that we have access to the data source (either through an API or a data extract), we can import data. We call the API, ask for the data we need, and the API delivers it.

In this step, it may also be useful to perform some transformations on the data. Think about removing or transforming privacy-sensitive data, or excluding data that you are certain you won’t use.

We select the trending topics from Twitter and the corresponding top 10 tweets per topic. We also perform some transformations on the data here, because we only want to store the tweet text.
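As a sketch of this extraction step (assuming Tweepy 3.x, where api.trends_place and api.search are available, and using 23424909 as the WOEID of the Netherlands), it might look like this:

```python
NETHERLANDS_WOEID = 23424909  # Where On Earth ID for the Netherlands

def extract_trending_tweets(api, woeid=NETHERLANDS_WOEID, tweets_per_trend=10):
    """Collect the trending topics and the text of the top tweets per topic."""
    trends = api.trends_place(woeid)[0]["trends"]
    trending_tweets = {}
    for trend in trends:
        statuses = api.search(q=trend["name"], count=tweets_per_trend,
                              tweet_mode="extended")
        # Transformation: keep only the tweet text, drop the rest of the payload.
        trending_tweets[trend["name"]] = [status.full_text for status in statuses]
    return trending_tweets
```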

Step 4: Load Data

Now that we have brought in the data we need, we store it in its rawest form. Remember that the storage method depends very much on which storage service you use.

Storage services can be roughly divided into the following four types:

● Data Lake: the data is stored in several file formats, such as JSON, CSV, SQL and TXT. This form of storage is usually the cheapest and is often used to store data in its most raw form and/or as a backup location.

● SQL database: the data is stored in a structured tabular form according to a predefined table schema. This form of storage is usually more expensive and is best for making data in its purest form available for analysis.

● NoSQL database: the data is stored semi-structured in a key-value format. This form usually lands between a Data Lake and a SQL database in terms of cost and is used to store unstructured data, such as tweets, in a standardized format.

● Graph database: the data is stored with relationships between the different data points. This type of storage is often more expensive and is used to generate more complex insights from the data.

We store the data as JSON files on the same device on which we run the code.

We chose the JSON format because tweets are semi-structured data: they can contain links, images, text, emojis and hashtags. JSON allows us to store the data in a readable schema. Frequently used alternatives are CSV, XLSX and YAML. The disadvantage of CSV and XLSX is that their schemas are less flexible than JSON's, because they use a fixed separator to differentiate the columns. These separators may already occur in the tweets themselves, which would make the storage process unnecessarily complex. YAML and JSON are almost identical, although YAML is a bit more readable for humans than JSON.

So far, we have been creating a local Data Lake. We also keep an index to maintain an overview of the data in our Data Lake. Here we use the name of the data as the key, and the location of the JSON file as the value:
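A minimal sketch of this storage step, assuming a local data_lake/ directory and an index.json file (both names are our own choice for this example):

```python
import json
from datetime import datetime
from pathlib import Path

DATA_LAKE = Path("data_lake")
INDEX_FILE = DATA_LAKE / "index.json"

def store_raw_data(name, data):
    """Write the raw data to a timestamped JSON file and update the index."""
    DATA_LAKE.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_path = DATA_LAKE / f"{name}_{timestamp}.json"
    file_path.write_text(json.dumps(data, ensure_ascii=False, indent=2))

    # The index maps the name of the data to the location of its latest JSON file.
    index = json.loads(INDEX_FILE.read_text()) if INDEX_FILE.exists() else {}
    index[name] = str(file_path)
    INDEX_FILE.write_text(json.dumps(index, indent=2))
    return file_path

store_raw_data("trending_tweets", extract_trending_tweets(api))
```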

Step 5: Data Versioning

Now that we have our data stored, we can apply data versioning. To take full advantage of data versioning, it is important to version every change to the stored data. We use the DVC tool for this.

Assuming we have not yet set up DVC, we first initialize it and put our new data under version control:
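The exact commands depend on your setup, but with the standard DVC CLI, an existing git repository and the data_lake/ directory from the previous step, the initial setup might look like this:

```bash
dvc init                      # set up DVC on top of the git repository
dvc add data_lake             # start tracking the local Data Lake
git add data_lake.dvc .gitignore
git commit -m "Initial version of the raw Twitter data"
```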

After each change to the data, run the following commands again:
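Again as a sketch under the same assumptions:

```bash
dvc add data_lake             # re-hash the data and update the pointer file
git add data_lake.dvc
git commit -m "Add the latest batch of trending tweets"
```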

Use data to generate insights.

Step 6: Including Previous Data

This step starts with determining, based on the previously stored data, which insights or analysis goals we want to produce. The previous five steps are only required when a new data source is added; for applying analysis to the data, the work starts here.

Before we generate the insights, we must first extract the specific data from our own storage. We will use the most recent set of trending tweets in our data storage:
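A sketch of this lookup, assuming the index.json layout from the storage sketch in Step 4:

```python
import json
from pathlib import Path

INDEX_FILE = Path("data_lake") / "index.json"

def load_latest(name="trending_tweets"):
    """Look up the most recent file for a dataset in the index and load it."""
    index = json.loads(INDEX_FILE.read_text())
    return json.loads(Path(index[name]).read_text())

trending_tweets = load_latest("trending_tweets")
```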

Step 7: Transform Tweets

In this step, we apply a few transformation steps to the data so that we can generate insights from it. Think about combining different data sources or converting the data to a better format for generating insights.

There is often an initial exploration step to determine the right transformation steps, what kind of insights can be extracted from the data, and what type of data adds value. However, this goes beyond the scope of this guide and is therefore not discussed further.

Below you can see how we transform the Twitter data so that only the keywords of each tweet remain:
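One possible way to do this (a sketch using gensim's tokenizer and its English stop-word list; for Dutch tweets you would swap in a Dutch stop-word list):

```python
import re
from gensim.utils import simple_preprocess
from gensim.parsing.preprocessing import STOPWORDS  # English stop words

def tweet_to_keywords(tweet_text):
    """Reduce a tweet to a list of keywords."""
    # Remove URLs and mentions; keep the word behind a hashtag.
    text = re.sub(r"http\S+|@\w+", " ", tweet_text).replace("#", " ")
    # Lowercase, strip punctuation, tokenize, and drop stop words and short tokens.
    return [token for token in simple_preprocess(text, min_len=3)
            if token not in STOPWORDS]

# One keyword list per tweet, across all trending topics.
keyword_lists = [tweet_to_keywords(text)
                 for tweets in trending_tweets.values()
                 for text in tweets]
```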

Step 8: Generating Insights

Now we can use the transformed data from the previous step to generate insights. There are many ways to generate insights; the optimal way always depends on the end users and on what exactly you want the insights to convey.

Here, we apply topic modeling to the transformed tweets. With topic modeling, coherent keywords are grouped into topics that represent the core themes across all the tweets:
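A sketch of this step with gensim's LDA implementation; the number of topics and passes are assumptions you would tune to your own data:

```python
from gensim import corpora
from gensim.models import LdaModel

# Build a dictionary and a bag-of-words corpus from the keyword lists
# produced in the previous step.
dictionary = corpora.Dictionary(keyword_lists)
corpus = [dictionary.doc2bow(keywords) for keywords in keyword_lists]

# Fit the topic model.
lda_model = LdaModel(corpus=corpus, id2word=dictionary,
                     num_topics=5, passes=10, random_state=42)

# Each topic is a weighted combination of keywords.
for topic_id, topic in lda_model.print_topics(num_words=8):
    print(f"Topic {topic_id}: {topic}")
```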

Execute the code periodically.

Step 9: Add Code to a Scheduler

Now that we have written and tested a full version of the Data Pipeline, it is time to add it to a Scheduler. The Scheduler ensures that the code is automatically executed according to a schedule that you provide. We use the Prefect Python package because it is intuitive, user-friendly and helps make the code more readable.

First, we chain the steps for importing the trending tweets from Twitter in order to define the complete Data Pipeline:
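A sketch of the flow definition, assuming Prefect 1.x (the @task/Flow API) and reusing the helpers from the earlier sketches (api, extract_trending_tweets, store_raw_data, tweet_to_keywords, corpora, LdaModel):

```python
from prefect import task, Flow

@task
def extract():
    # Steps 2-3: pull the trending tweets via the authenticated API client.
    return extract_trending_tweets(api)

@task
def load(raw):
    # Step 4: store the raw tweets in the local Data Lake.
    return store_raw_data("trending_tweets", raw)

@task
def transform(raw):
    # Step 7: reduce every tweet to its keywords.
    return [tweet_to_keywords(text)
            for tweets in raw.values() for text in tweets]

@task
def generate_insights(keyword_lists):
    # Step 8: fit the topic model and print the resulting topics.
    dictionary = corpora.Dictionary(keyword_lists)
    corpus = [dictionary.doc2bow(keywords) for keywords in keyword_lists]
    lda = LdaModel(corpus=corpus, id2word=dictionary, num_topics=5, passes=10)
    for topic_id, topic in lda.print_topics(num_words=8):
        print(f"Topic {topic_id}: {topic}")

with Flow("twitter-trending-pipeline") as flow:
    raw = extract()
    load(raw)
    generate_insights(transform(raw))
```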

Then, we add a schedule, which defines when the Data Pipeline should be executed:
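For example, an hourly interval (the interval itself is an assumption you adjust to how often you want fresh trending tweets):

```python
from datetime import timedelta
from prefect.schedules import IntervalSchedule

# Execute the Data Pipeline once per hour.
flow.schedule = IntervalSchedule(interval=timedelta(hours=1))
```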

Finally, we initiate the Scheduler:
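With a schedule attached, running the flow keeps the process alive and executes the pipeline at every scheduled interval (again assuming Prefect 1.x):

```python
flow.run()
```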