Step-by-step guide to continuous data loading in Snowflake with Snowpipe and Azure Storage

Snowpipe enables loading data from files as soon as they’re available. This means you can load data from files in micro-batches, making it available to users within minutes, rather than manually executing COPY statements on a schedule to load larger batches.

Event-driven, automated data ingestion into a scalable cloud data platform is one of the many things we at Data Engineers love, and so do our clients.


Having set this up for a few Snowflake customers now, I thought it might be useful to create a step-by-step guide with lots of helpful screenshots to assist you in setting up this awesome functionality.

While the Snowflake documentation is very good, I like to have a few pretty pictures to follow along with; I hope you do too.

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-auto-azure.html

The following are the steps to create Snowpipe.

Create a Storage Account

If you want to create a new Resource Group in Azure, you can do that from the Azure Portal.

In the newly created Resource Group (anil-test-resource-group), click New and select Storage account, as per the below image.

Storage Container

Go into the storage account you have just created, click Containers, and create a container with an appropriate name.

Storage Queues

When events are raised (in this case, when a new file arrives), they are written to a Storage Queue. The messages raised by the Event Grid subscription we create shortly contain pointers to the files in the Blob Storage Container.

In the same Storage Account where you created the Blob Container, create a Storage Queue.

Events

Whenever a file is loaded into the container, an event creates a message in the queue. From the Snowflake documentation:
“Your client application calls a public REST endpoint with the name of a pipe object and a list of data filenames. If new data files matching the list are discovered in the stage referenced by the pipe object, they are queued for loading. Snowflake-provided compute resources load data from the queue into a Snowflake table based on parameters defined in the pipe.”

In the same Storage Account where you created the Blob Container and the Storage Queue, create an Event Subscription as follows.

Click Add Event Subscription and select the following values:

Event Schema --> Event Grid Schema

Filter to Event Types --> Blob Created

Endpoint Type --> Storage Queues

For Endpoint, select the storage account and the storage queue that you have recently created.


Notification Integration

The following Snowflake script creates a Notification Integration.

create notification integration SNOWPIPE_TEST_EVENT
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = '<<storage queue uri>>'
  azure_tenant_id = '<<tenant id>>';

azure_storage_queue_primary_uri is the URL of the storage queue that you have recently created.

Go to the storage account and click on Queues. I have created a queue named "test-snowflake-queue", and its URL is https://anillteststorageaccount.queue.core.windows.net/test-snowflake-queue

For azure_tenant_id, go to the Azure portal --> open the menu --> click Azure Active Directory and copy the Directory (tenant) ID. Then run the command with these values filled in.
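As an illustration, the completed statement looks something like the sketch below; the queue URL is the one created above, and the tenant ID shown is just a placeholder to replace with your own.

create notification integration SNOWPIPE_TEST_EVENT
  enabled = true
  type = queue
  notification_provider = azure_storage_queue
  azure_storage_queue_primary_uri = 'https://anillteststorageaccount.queue.core.windows.net/test-snowflake-queue'
  azure_tenant_id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx';  -- placeholder: your Azure AD Directory (tenant) ID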

Snowflake permissions on Azure

Once the Notification Integration is created, run the following command and you will see the AZURE_CONSENT_URL. Copy the URL, paste it into the browser, and click the Accept button.

  desc notification integration SNOWPIPE_TEST_EVENT;

Once you have granted consent, the Snowflake service principal can be seen under Enterprise applications.

Go to Azure Active Directory--> Enterprise applications

Add Role Assignment to Azure storage account

Go to the storage account --> Access Control (IAM) --> Add a role assignment.

Select the following values:

Role --> Storage Queue Data Contributor
Assign access to --> Azure AD user, group, or service principal

Then type "snowflake" in the Select field. You will see the Snowflake service principal; select it and hit the Save button.


Create Stage

To create a stage on Azure Blob Storage, the following command is needed:

create or replace stage my_azure_stage
  url='azure://xxxxxxx.blob.core.windows.net/mycontainer/load/files'
  credentials=(azure_sas_token='?sv=2016-05-31&ss=b&srt=sco&sp=rwdl&se=2018-06-27T10:05:50Z&st=2017-06-27T02:05:50Z&spr=https,http&sig=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx');

To get the azure_sas_token, you have two options: the Azure portal or Microsoft Azure Storage Explorer.

I am using Azure Storage Explorer. Right-click on the blob container and click "Get Shared Access Signature", then select the expiry time and permissions.

Create database command

create database SNOWPIPE_TEST;

Now run the create stage command in Snowflake to create the stage in the new database; a sketch follows below.
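A minimal sketch, assuming the stage lives in SNOWPIPE_TEST.PUBLIC with the name TEST_STAGE used later in this guide, and assuming an illustrative container called test-snowflake-container in the anillteststorageaccount storage account:

create or replace stage "SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE"
  url = 'azure://anillteststorageaccount.blob.core.windows.net/test-snowflake-container'  -- replace with your container URL
  credentials = (azure_sas_token = '?sv=...');  -- replace with the full SAS token generated above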

To view the files in the stage, run the following command:

LIST @"SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE";


Create Table to load JSON Data

 CREATE OR REPLACE TABLE "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA"
(
  "API_DATA" VARIANT
);
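Once the pipe (created next) starts loading files, you can query individual fields straight out of the VARIANT column using Snowflake's colon path syntax. The field name below is purely illustrative, since the shape of your JSON will differ:

SELECT API_DATA:record_id::string AS RECORD_ID  -- record_id is a hypothetical JSON field
FROM "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA";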


To create the Snowpipe, run the following command:

CREATE OR REPLACE pipe "SNOWPIPE_TEST"."PUBLIC"."TEST_SNOWPIPE"
  auto_ingest = true
  integration = 'SNOWPIPE_TEST_EVENT'
  as
  copy into "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA"
  from @"SNOWPIPE_TEST"."PUBLIC"."TEST_STAGE"
  file_format = (type = 'JSON');
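Auto-ingest only reacts to files that land in the container after the event subscription and pipe are in place. If files were already sitting in the stage before the pipe existed, you can queue them for loading with a refresh:

alter pipe "SNOWPIPE_TEST"."PUBLIC"."TEST_SNOWPIPE" refresh;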

To check whether the Snowpipe is working, drop a new file into the container and then run the following query:

SELECT * FROM "SNOWPIPE_TEST"."PUBLIC"."CROPTRACKER_API_DATA";
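You can also ask the pipe directly how it is doing. SYSTEM$PIPE_STATUS returns a small JSON document with the pipe's execution state and number of pending files:

SELECT SYSTEM$PIPE_STATUS('SNOWPIPE_TEST.PUBLIC.TEST_SNOWPIPE');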


If you log in as ACCOUNTADMIN, you can see Snowpipe usage and billing.
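If you prefer SQL to the billing pages, one way to see roughly what the pipe has been consuming is the PIPE_USAGE_HISTORY table function in INFORMATION_SCHEMA, for example over the last seven days:

SELECT *
FROM TABLE("SNOWPIPE_TEST".INFORMATION_SCHEMA.PIPE_USAGE_HISTORY(
  DATE_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP()),
  PIPE_NAME => 'SNOWPIPE_TEST.PUBLIC.TEST_SNOWPIPE'));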
