Using Snowpark AsyncJob in Streamlit apps for better performance

Ryan Templeton
6 min read · Nov 11, 2024

Ever since I started working on the Snowflake platform I’ve been enthralled with its multi-cluster warehouse capabilities. Virtual warehouses are the workhorses of the Snowflake platform: the compute units responsible for processing user requests. As anyone who works in the industry knows, when requests exceed the resources available in the warehouse, bottlenecks occur as queries queue up to be processed. A multi-cluster warehouse can take action when this happens and start additional clusters to take on the growing workload. Keep in mind that this doesn’t add compute capacity to the existing cluster; it starts new, discrete clusters to work through the queued queries, and when the queue drains, those clusters shut down.

Typically, when you write applications that interact with a database, you run a query, wait for the response, then proceed with whatever processing comes next. When you need to run many queries, running them serially like this can be painfully slow. This article discusses using asynchronous query execution to run queries in parallel in a Streamlit application, but the techniques apply whenever multiple independent queries must be run.

Streamlit applications, particularly dashboards, are perfect candidates for parallel query processing. Dashboards typically need to run multiple queries to gather the information the app requires, and anyone who has watched one load will have noticed the page building incrementally as one query after another runs serially. Asynchronous processing lets the developer fire off a number of queries at once, then proceed when they have all finished. The benefit is that running the queries in parallel should return all the results sooner than running them one at a time.
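To make the serial-versus-parallel difference concrete, here is a minimal sketch that uses only the Python standard library. It does not touch Snowflake: `fake_query` is a hypothetical stand-in that simply sleeps, and a thread pool stands in for firing off asynchronous queries. The 0.2 second durations are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(name: str, seconds: float) -> str:
    """Hypothetical stand-in for a database round trip: sleep, then return."""
    time.sleep(seconds)
    return f"{name}: done"


# Three independent "queries" that each take about 0.2 seconds.
workload = [("credits", 0.2), ("jobs", 0.2), ("storage", 0.2)]


def run_serially() -> float:
    """Run the workload one query at a time and return the elapsed seconds."""
    start = time.perf_counter()
    for name, seconds in workload:
        fake_query(name, seconds)
    return time.perf_counter() - start


def run_in_parallel() -> float:
    """Submit every query up front, then wait for all of them to finish."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=len(workload)) as pool:
        futures = [pool.submit(fake_query, name, seconds) for name, seconds in workload]
        for future in futures:
            future.result()
    return time.perf_counter() - start


serial_seconds = run_serially()
parallel_seconds = run_in_parallel()
print(f"serial: {serial_seconds:.2f}s, parallel: {parallel_seconds:.2f}s")
```

The serial run takes roughly the sum of the individual durations, while the parallel run takes roughly as long as the slowest single query. Real queries will only behave this way if the warehouse has capacity to run them concurrently.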

The Snowpark API added an AsyncJob feature in late 2022. Usually when you run a command in Snowpark, you call collect() or to_pandas() to kick off that command, then wait for the result to come back. AsyncJob offers an alternative through collect_nowait(). Rather than waiting for a result set, the call immediately returns an AsyncJob object, which leaves you free to fire off more collect_nowait() calls. Each AsyncJob can be checked to see whether it is done and will return its result when you are ready to use it. One simple pattern is to add each AsyncJob to a list and poll that list until all jobs are done, then proceed with your processing.

Try the following example in a Snowsight notebook:

import time

import streamlit as st
from snowflake.snowpark.context import get_active_session

session = get_active_session()
queries = {}

# Submit each query without waiting; collect_nowait() returns an AsyncJob immediately
credits_used_sql = "select round(sum(credits_used),0) as total_credits from snowflake.account_usage.metering_history where start_time between '2024-10-01' and '2024-10-31'"
queries['credits_used_sql'] = session.sql(credits_used_sql).collect_nowait()

num_jobs_sql = "select count(*) as number_of_jobs from snowflake.account_usage.query_history where start_time between '2024-10-01' and '2024-10-31'"
queries['num_jobs_sql'] = session.sql(num_jobs_sql).collect_nowait()

current_storage_sql = "select (avg(storage_bytes + stage_bytes + failsafe_bytes) / power(1024, 4))::number(10,3) as billable_tb from snowflake.account_usage.storage_usage where usage_date = current_date() - 1"
queries['current_storage_sql'] = session.sql(current_storage_sql).collect_nowait()

# Poll every 2 seconds until every job reports that it is done
while True:
    count = 0
    for query in queries.values():
        if query.is_done():
            count += 1

    if count < len(queries):
        time.sleep(2)
    else:
        st.write('all queries are done!')
        break

# Display each query's name followed by its result
for key, value in queries.items():
    st.write(key)
    st.write(value.to_df())
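The polling loop above waits forever if a query never finishes. One possible refinement, sketched here under my own assumptions, is a small helper that polls any objects exposing `is_done()` and gives up after a timeout. The names `wait_for_all`, `JobLike`, and `FakeJob` are hypothetical and are not part of Snowpark; `FakeJob` exists only so the sketch runs without a Snowflake connection.

```python
import time
from typing import Iterable, Protocol


class JobLike(Protocol):
    """Anything with an is_done() method, such as the jobs in the example above."""

    def is_done(self) -> bool: ...


def wait_for_all(jobs: Iterable[JobLike], poll_seconds: float = 2.0,
                 timeout_seconds: float = 300.0) -> bool:
    """Poll until every job is done; return False if the timeout expires first."""
    pending = list(jobs)
    deadline = time.monotonic() + timeout_seconds
    while pending:
        # Keep only the jobs that have not finished yet.
        pending = [job for job in pending if not job.is_done()]
        if not pending:
            break
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)
    return True


class FakeJob:
    """Hypothetical test double that reports done after a fixed number of polls."""

    def __init__(self, polls_until_done: int):
        self._remaining = polls_until_done

    def is_done(self) -> bool:
        self._remaining -= 1
        return self._remaining < 0


all_done = wait_for_all([FakeJob(0), FakeJob(2)], poll_seconds=0.01, timeout_seconds=1.0)
timed_out = wait_for_all([FakeJob(10**9)], poll_seconds=0.01, timeout_seconds=0.05)
print(all_done, timed_out)
```

In the notebook example, something like `wait_for_all(queries.values())` could presumably replace the `while True` loop, with the caller deciding what to show the user when the helper returns False.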

Another beneficial side effect of AsyncJob is that it acts as a pointer to the result set cached by Snowflake. When Snowflake executes a query, it stores the result for the next 24 hours. You can retrieve that result set explicitly by query ID using the RESULT_SCAN table function. When you run a query, Snowflake checks whether the same query was previously run AND whether the underlying tables it ran against are unchanged. If both conditions are met, Snowflake simply returns the cached result rather than rerunning your query, saving you both time and money. However, there are cases where result caching doesn’t apply, such as queries against secure views or external tables, or queries that contain functions evaluated at execution time, like current_timestamp().

The SNOWFLAKE.ACCOUNT_USAGE data is exposed through secure views, so it cannot benefit from this built-in caching. Instead, we can implement a workaround by keeping our reference to the AsyncJob we originally executed. We compare our query text to the AsyncJob’s query property, and if they match there’s no need to run the query again; we just use the AsyncJob that’s already there. When moving back and forth between Streamlit pages, we see that once a page has loaded, any subsequent return to that page renders nearly instantaneously because the query results are pulled from the cache.
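Stripped of the Streamlit and Snowpark details, the reuse check described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the implementation used in the app: `FakeAsyncJob`, `get_or_submit`, and `fake_submit` are hypothetical names, and the fake job only carries the query text so the sketch runs without a Snowflake session.

```python
from typing import Callable, Dict, List


class FakeAsyncJob:
    """Hypothetical stand-in that only remembers the SQL text it was created from."""

    def __init__(self, query: str):
        self.query = query


def get_or_submit(jobs: Dict[str, FakeAsyncJob], name: str, sql: str,
                  submit: Callable[[str], FakeAsyncJob]) -> FakeAsyncJob:
    """Reuse the stored job when its query text matches; otherwise submit again."""
    existing = jobs.get(name)
    if existing is not None and existing.query == sql:
        return existing
    jobs[name] = submit(sql)
    return jobs[name]


submitted: List[str] = []


def fake_submit(sql: str) -> FakeAsyncJob:
    """Record each submission so we can see how often a query actually runs."""
    submitted.append(sql)
    return FakeAsyncJob(sql)


jobs: Dict[str, FakeAsyncJob] = {}
first = get_or_submit(jobs, "credits", "select 1", fake_submit)
second = get_or_submit(jobs, "credits", "select 1", fake_submit)  # same text: reused
third = get_or_submit(jobs, "credits", "select 2", fake_submit)   # text changed: resubmitted
print(submitted)
```

In a real app the `submit` callable would presumably wrap `session.sql(sql).collect_nowait()`, and the `jobs` dictionary would need to outlive a single script run, for example by living in `st.session_state`.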

The following Python class is an easy-to-use implementation of the techniques mentioned above.

import logging
import time
from enum import Enum

import streamlit as st
from snowflake.snowpark import Session


logger = logging.getLogger("AsyncCache_logger")


class ReturnType(Enum):
    """Result formats that can be requested from an AsyncJob."""
    ROW = 'row'
    ROW_ITERATOR = 'row_iterator'
    PANDAS = 'pandas'
    PANDAS_BATCHES = 'pandas_batches'


class AsyncCache:

    def __init__(self):
        ### maps a query name to either a pending SQL string or a submitted AsyncJob
        self._res = {}


    def addquery(self, qname: str, query: str, overwrite: bool=False):
        """
        Adds the query to the local dictionary if the qname is not found OR the cached query text differs from the incoming query

        :qname: The key value to search for in the local dictionary
        :query: The query to be added
        :overwrite: Force the query to be added even if a cached version exists
        """


        ### conditionally add the query to the internal dict or don't (re)add it if it's already there to use caching
        if overwrite:
            self.__forcequery(qname, query)
        elif qname in self._res:
            if type(self._res[qname]) is str:
                ### still a pending SQL string that has not been run yet, so replace it
                self.__forcequery(qname, query)
            elif self._res[qname].query == query:
                logger.info(f"query {hash(query)} was found and matched")
            else:
                self.__forcequery(qname, query)
        else:
            ### first time this qname has been seen
            self.__forcequery(qname, query)


    def __forcequery(self, qname: str, query: str):
        self._res[qname] = query
        logger.info(f"adding/updating query {hash(query)} to cache")


    def runasyncbatch(self, session: Session, forcererun: bool=False):
        """
        Iterate over the internal dictionary of queries (str) or AsyncJob objects and run asynchronously
        any queries found there. Will only run queries where the VALUE of the dict is a string. Existing AsyncJob objects are not rerun unless forcererun is set

        :session: The snowflake session
        :forcererun: Forces previously run AsyncJobs to rerun
        :return: The dict after all async queries have completed.
        """


        loop = 0

        with st.spinner("Gathering information, please wait..."):
            for key, value in self._res.items():
                if type(value) is str:
                    self._res[key] = session.sql(value).collect_nowait()
                elif forcererun:
                    ### value is an existing AsyncJob, so resubmit its SQL text
                    self._res[key] = session.sql(value.query).collect_nowait()


            ### poll every 2 seconds until every job reports that it is done
            while True:
                count = 0
                for query in self._res.values():
                    if query.is_done():
                        count += 1
                    elif loop > 5:
                        logger.info(f"query {query.query_id} is taking a long time to complete")
                if count < len(self._res):
                    time.sleep(2)
                    loop += 1
                else:
                    break

        return self._res

    def result(self, qname: str, response_type: ReturnType = ReturnType.PANDAS):
        """
        Get the result from the AsyncJob in the form of the ReturnType. Defaults to Pandas dataframe if unspecified

        :qname: The name of the AsyncJob
        :response_type: Specify the format of result
        :return: The result in the form of the corresponding ReturnType
        """

        return self._res[qname].result(response_type.value)

In my Streamlit app, I instantiate an AsyncCache object for each page and add it to st.session_state so it persists when I come back to the page. It’s entirely possible to use a single instance across the entire app, but this AsyncCache-per-page technique helps avoid name collisions when adding queries.

if 'overview_cache' not in st.session_state:
    st.session_state.overview_cache = AsyncCache()
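If the app has many pages, the same check can be wrapped in a small helper so each page asks for its cache by name. The sketch below is an assumption on my part rather than code from the app: `get_page_cache` is a hypothetical name, a plain dict stands in for `st.session_state`, and `dict` stands in for the `AsyncCache` constructor so the example runs on its own.

```python
from typing import Any, Callable, MutableMapping


def get_page_cache(state: MutableMapping[str, Any], page_name: str,
                   factory: Callable[[], Any]) -> Any:
    """Return the cache stored for this page, creating it on first use."""
    key = f"{page_name}_cache"
    if key not in state:
        state[key] = factory()
    return state[key]


# A plain dict plays the role of st.session_state in this sketch.
state: dict = {}
overview = get_page_cache(state, "overview", dict)
overview_again = get_page_cache(state, "overview", dict)
warehouses = get_page_cache(state, "warehouses", dict)
print(sorted(state))
```

In the app itself, the call would presumably look like `get_page_cache(st.session_state, "overview", AsyncCache)`, relying on `st.session_state` supporting `in` checks and item assignment.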

I break the page generation into chunks by passing in small batches of queries, then rendering that chunk of the page with results before moving on to the next chunk. It’s entirely possible to add all your queries at once and then wait until they are all complete.

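import plotly.express as px

### assumes `session` is the active Snowpark session created earlier in the app
cache = st.session_state.overview_cache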

### Add your queries to the cache
total_credits_used_sql = f"select warehouse_name,sum(credits_used) as total_credits_used from snowflake.account_usage.warehouse_metering_history where start_time between '{st.session_state.startingdate}' and '{st.session_state.endingdate}' group by 1 order by 2 desc limit 10 "
cache.addquery('total_credits_used_sql', total_credits_used_sql)

jobs_by_warehouse_sql = f"select warehouse_name,count(*) as number_of_jobs from snowflake.account_usage.query_history where start_time between '{st.session_state.startingdate}' and '{st.session_state.endingdate}' group by 1 order by 2 desc limit 10"
cache.addquery('jobs_by_warehouse_sql', jobs_by_warehouse_sql)

execution_by_qtype = f"select query_type, warehouse_size, avg(execution_time) / 1000 as average_execution_time from snowflake.account_usage.query_history where start_time between '{st.session_state.startingdate}' and '{st.session_state.endingdate}' group by 1, 2 order by 3 desc;"
cache.addquery('execution_by_qtype', execution_by_qtype)

### Process the queries in the cache
cache.runasyncbatch(session)

### Retrieve the results from the cache
res1 = cache.result('total_credits_used_sql')
res2 = cache.result('execution_by_qtype', ReturnType.PANDAS)
res3 = cache.result('jobs_by_warehouse_sql')

### Use the results in the page
c1, c2 = st.columns(2)
fig_credits_used=px.bar(res1, x='TOTAL_CREDITS_USED',y='WAREHOUSE_NAME',orientation='h',title="Credits Used by Warehouse")
fig_credits_used.update_traces(marker_color='green')
c1.plotly_chart(fig_credits_used, use_container_width=True)

fig_execution_by_qtype=px.bar(res2, x='AVERAGE_EXECUTION_TIME',y='QUERY_TYPE',orientation='h',title="Average Execution by Query Type")
c1.plotly_chart(fig_execution_by_qtype, use_container_width=True)

fig_jobs_by_warehouse=px.bar(res3, x='NUMBER_OF_JOBS',y='WAREHOUSE_NAME',orientation='h',title="# of Jobs by Warehouse")
fig_jobs_by_warehouse.update_traces(marker_color='purple')
c2.plotly_chart(fig_jobs_by_warehouse, use_container_width=True)

### Repeat the steps above with the next batch of queries
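One way to organize the batches is to keep all of a page's queries in a single dictionary and slice it into chunks. The following is a minimal sketch of that idea; `in_batches` and the placeholder SQL strings are hypothetical and only illustrate the chunking, not the rendering.

```python
from typing import Dict, Iterator


def in_batches(queries: Dict[str, str], batch_size: int) -> Iterator[Dict[str, str]]:
    """Yield the named queries in insertion order, batch_size at a time."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    names = list(queries)
    for start in range(0, len(names), batch_size):
        yield {name: queries[name] for name in names[start:start + batch_size]}


# Placeholder SQL; a real page would use its own query text here.
page_queries = {
    "total_credits_used_sql": "select 1",
    "jobs_by_warehouse_sql": "select 2",
    "execution_by_qtype": "select 3",
    "storage_by_database_sql": "select 4",
    "logins_by_user_sql": "select 5",
}

batches = list(in_batches(page_queries, 2))
print([list(batch) for batch in batches])
```

Each batch could then be passed through `addquery`, followed by one `runasyncbatch` call and the rendering code for that section of the page. Smaller batches show something on screen sooner, while larger batches run more queries in parallel.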

This article highlights the benefits and techniques of using Snowpark AsyncJobs coupled with multi-cluster warehouses to run your Snowflake commands in parallel. The result is a more responsive application than one that runs its queries serially. The source code for the examples here, plus more, can be found here.

Written by Ryan Templeton

Solutions Engineer and big data nerd at Snowflake.com
