Extracting data from Azure Cosmos DB through Data Flow Pipelines

Photo by Carlos Muza on Unsplash

Today’s applications are required to be highly responsive and always online. To achieve low latency and high availability, instances of these applications need to be deployed in datacenters that are close to their users. Applications need to respond in real time to large changes in usage at peak hours, store ever increasing volumes of data, and make this data available to users in milliseconds.

Azure Cosmos DB is a globally distributed, multi-model database service. With the click of a button, Cosmos DB enables you to elastically and independently scale throughput and storage across any number of Azure regions worldwide, and to take advantage of fast, single-digit-millisecond data access using your favorite API, including SQL, MongoDB, Cassandra, Tables, or Gremlin.

When using Cosmos DB with the MongoDB API, connecting it directly to a visualization tool like Power BI, Tableau, or Data Studio leads to certain hiccups. Thankfully, with Azure Data Factory, you can set up data pipelines that transform the document data into relational data, making it easier for your data analysts to run their analyses and create dashboards or reports for you to use.

In this article, we’ll be looking at setting up a Data Flow pipeline to transfer data from a Cosmos DB instance into an Azure SQL Database, without any developer assistance.

The Setup

You should have a secondary (read) replica set up for your Cosmos DB account (or use read-only keys if the production load isn’t too high and your copy frequency is low).
Create the following in a resource group:

  1. Azure Data Factory
  2. Azure SQL Server with a SQL database

When you create the SQL server, allow Azure services to connect to it, and make a note of where the Firewall settings are; we’ll come back to them later.

For this article, we’ll take a real-life case of transferring your order information (stored in your Cosmos DB order database) to the Azure SQL server. Here is the (simplified) schema used by the Cosmos DB:

{"orderID": "String","customerName": "String","customerEmail": "String","customerID": "BIGINT","totalOrderAmount": "BIGINT","orderDetails": {    "itemID": "BIGINT",    "itemListPrice": "INT",    "itemName": "String",    "itemCategory": "String",    "itemSalePrice": "INT",    "itemDiscountAmount": "INT"},"orderStatus": "STRING","createdAt": "DATETIME","updatedAt": "DATETIME"}

This points us to two tables in which to store the data: the orders table and the items table. We’ll consider only the orders table for now.

Building the storage database

We already know what our source schema looks like. It is time we define the sink database.

One simple way to do this is to take a single document from the collection in Cosmos and upload it to https://json-csv.com/.

Using the above, we get a CSV view of the column headers.

Transpose this to get all the column headers in a single column and copy them into Sublime Text. From there, create a CREATE TABLE statement:

create table orders (
    orderID VARCHAR(20) PRIMARY KEY,
    customerName VARCHAR(20),
    customerEmail VARCHAR(20),
    customerID VARCHAR(20),
    totalOrderAmount VARCHAR(20),
    orderStatus VARCHAR(20),
    createdAt VARCHAR(20),
    updatedAt VARCHAR(20)
);

Note that I’ve marked everything as VARCHAR, but you can provide proper data types for each of your columns. Another important thing to note is how datetime is stored in Cosmos. In the case I dealt with, we used a UNIX timestamp (epoch in milliseconds), which was difficult for SQL to convert into a standard datetime during the copy. Hence I opted for VARCHAR, leaving the clean-up to the analyst.
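If you do want a proper datetime downstream, the conversion can be done at query time. This is only a sketch, assuming createdAt holds the epoch in milliseconds as described above (adjust the divisor if yours stores seconds):

-- convert the epoch-in-milliseconds VARCHAR into a DATETIME
-- splitting into days and seconds keeps each DATEADD argument within INT range
SELECT
    orderID,
    DATEADD(SECOND, (CAST(createdAt AS BIGINT) / 1000) % 86400,
            DATEADD(DAY, (CAST(createdAt AS BIGINT) / 1000) / 86400, '1970-01-01')) AS createdAtUtc
FROM orders;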

Keys & Indexes

Set up an index with this query:

CREATE INDEX index_name
ON table_name (column1, column2, ...);

-- in our case, we use UNIQUE as our order numbers have to be unique
CREATE UNIQUE INDEX orderIndex
ON orders (orderID);

-- note that SQL Server will automatically create an index for the primary key,
-- so the above statement is redundant for us

Creating the storage table

Building the pipeline

Open your Data Factory resource and launch the Data Factory UI; a new page will open with the Azure Data Factory options. Click on Copy Data in the middle to see this screen:

To create the pipeline, first set up the name of the task and the cadence (you can change these later).

Next set up the Source.

The Source

Once the database has been connected, you’ll see all the tables in the database. We’re after the orders table, so we should choose that. You can also provide a Mongo query to pick up only the changes from the last day, but since this is our first sync, we’ll go with the full table dump. Later on, once the first sync is complete, we’ll switch to using a query that pulls only the updates.

When selecting the batch size, be careful not to pick a large value, as this ties up database resources until the query completes. The batch size determines how many documents are extracted per query, with cursors used to fetch the next batch. Depending on your provisioned RUs, it’s best to keep the batch size at 500–1000.

Additionally, you can copy multiple tables as part of the pipeline, but since we’re only dealing with one table, we’ll select only one input table.

The Sink

When you set up the sink connection, the visual table mapping screen appears. Select the table to which the data is to be copied (in our case, orders) and click Next.

Schema Mapping

This is the best part of Azure Data Factory: a visual mapping editor that allows you to map fields from source to sink without writing a single line of code. If you’ve kept the names of the fields the same, the mapping should be automatic. Don’t forget to ignore the _id field, as it isn’t relevant for us.

The additional beauty of this tool is that it allows you to run a pre-copy script on the table. Since we’re doing a full data dump, we can simply set this up to empty the table before the copy runs:

DELETE FROM orders

Once you’ve set that up, proceed to the settings. Personally, before I move any pipeline into production, I keep the fault tolerance set to “Abort activity on the first incompatibility”, which gives me a view of any inconsistencies in the input data.

Deployment

1 — Takes you to your pipeline. 2 — takes you to monitoring where you can see your executed runs

To run the pipeline, select it and click on “Add Trigger”:

Triggering your pipeline

When you trigger your pipeline, you can monitor its status by clicking on “Monitor” in the left-hand sidebar (shown above).

A failed pipeline run

Debugging

Clicking on the first icon (Activity runs) in the Actions column, we can see the input data from the source and the output that was sent to the sink. We can also see why the task failed by clicking on the error.

Next Steps

  1. Create a summary table that can be directly pulled into a dashboard
    If you’re good at SQL (and trust me, it’s not too hard if you’re good at MySQL), you can create a stored procedure that updates another table with the summary data and add it as a step in your pipeline (a sketch follows this list).
  2. Trigger another pipeline
    If your current pipeline is a prerequisite for another pipeline, extend a success line from the first task in the visual editor to trigger the next pipeline. Failures can be sent to your monitoring tool via a webhook call (found under General).
  3. Change the query from a dump to only updates
    The current data copy operation is a full dump into the database. As your database grows, the amount of data transferred on each run grows with it. Hence it is wiser to opt for a delta feed that UPSERTs (updates or inserts) data into the sink (see the MERGE sketch after this list).
  4. Endless possibilities with Python/JAR through Databricks!
    Build complex computations in Python (you should learn Python) to set up advanced analytics, custom triggers, and more.
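
For point 1, here is a minimal sketch of such a stored procedure. The daily_order_summary table and the refresh_daily_order_summary procedure are hypothetical names, and the sketch assumes createdAt still holds the epoch in milliseconds as described earlier:

CREATE TABLE daily_order_summary (
    orderDate DATE,
    orderCount INT,
    totalOrderAmount BIGINT
);
GO

CREATE PROCEDURE refresh_daily_order_summary
AS
BEGIN
    -- rebuild the summary from scratch after every pipeline run
    DELETE FROM daily_order_summary;

    INSERT INTO daily_order_summary (orderDate, orderCount, totalOrderAmount)
    SELECT
        CAST(DATEADD(DAY, CAST(createdAt AS BIGINT) / 86400000, '1970-01-01') AS DATE),
        COUNT(*),
        SUM(CAST(totalOrderAmount AS BIGINT))
    FROM orders
    GROUP BY CAST(DATEADD(DAY, CAST(createdAt AS BIGINT) / 86400000, '1970-01-01') AS DATE);
END;
GO

In Data Factory, the procedure can then be invoked with a Stored Procedure activity placed after the copy step.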
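
For point 3, once the source query returns only recent changes, the copy can land them in a staging table and a MERGE can upsert them into orders. This is a sketch only; orders_staging is a hypothetical staging table with the same columns as orders that you would point the sink at instead:

-- upsert rows from the staging table into orders, keyed on orderID
MERGE orders AS target
USING orders_staging AS source
    ON target.orderID = source.orderID
WHEN MATCHED THEN
    UPDATE SET
        target.customerName     = source.customerName,
        target.customerEmail    = source.customerEmail,
        target.customerID       = source.customerID,
        target.totalOrderAmount = source.totalOrderAmount,
        target.orderStatus      = source.orderStatus,
        target.createdAt        = source.createdAt,
        target.updatedAt        = source.updatedAt
WHEN NOT MATCHED BY TARGET THEN
    INSERT (orderID, customerName, customerEmail, customerID,
            totalOrderAmount, orderStatus, createdAt, updatedAt)
    VALUES (source.orderID, source.customerName, source.customerEmail, source.customerID,
            source.totalOrderAmount, source.orderStatus, source.createdAt, source.updatedAt);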

All in all, with Azure Data Factory, getting to your data is as simple as signing up for an account.

As is tradition with all my articles, there are certain caveats/points to consider:

Connecting to your database

Open your SQL Server (not the database) in the Azure Portal and scroll down until you see the Firewall and Virtual Networks section. Click on it to see the screen below.

If you’re connecting from your local machine, you can enter the value shown as the Client IP address (which is your public IP) and press Enter. Press Save for the settings to take effect. Give the firewall rule a recognizable name so it’s easier to manage later.

If you’re using third-party software like Alteryx to connect, find out the IP of that Alteryx server and add it to the firewall.

Personally Identifiable Information (PII)

Creating a new user
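
A minimal sketch of what this typically looks like in T-SQL is below; analyst_login and analyst_user are placeholder names. Run the CREATE LOGIN against the master database and the remaining statements against the database that holds the orders table:

-- run against the master database
CREATE LOGIN analyst_login WITH PASSWORD = 'use-a-strong-password-here';

-- run against the orders database
CREATE USER analyst_user FOR LOGIN analyst_login;
ALTER ROLE db_datareader ADD MEMBER analyst_user; -- read-only access is enough for reporting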

Thank you for reading. I’ve been exploring data ETL over the last few months and have found that Microsoft Azure provides multiple easy ways to take control of your data. Whether you’re a business user or a data analyst, data should be key in your pursuit of excellence. You cannot let the unavailability of data hold you back: tools like Data Flow let you take control of your data and visualize it. As always, try different ways to get to your objective.

Feel free to ask me your questions here or on LinkedIn. Thank you.

Other articles:

Simplifying Complexities for a Living | rkakodker.com
