Tyler Tries Real-Time Data

Real-Time Data
Data Apps
Data Engineering
My experience building a real-time data pipeline to visualize Coinbase order book depth, highlighting the seamless integration of Redpanda, Materialize, dbt, and Streamlit.
Author

Tyler Hillery

Published

October 1, 2023


Introducing “Tyler Tries”

This post kicks off a new series I am calling “Tyler Tries,” where I write about topics I am currently learning. We are starting the series with my experience building a real-time data pipeline to visualize Coinbase order book depth.

TL;DR

I built a real-time dashboard that visualizes Coinbase order book depth, powered by Redpanda + Materialize + dbt + Streamlit; you can view the code here. The data comes from the free Coinbase WebSocket feed using the level2_batch channel.

Introduction

I posted the above video on Twitter/X and several people requested a blog post about how it all works. It may come as a bit of a surprise, but it wasn’t all that difficult. This is coming from someone without any prior experience dealing with real-time data who had never used Redpanda or Materialize before. Funnily enough, the part I struggled with most was pandas DataFrame styling and creating an unstacked area chart (it turns out most Python plotting libraries assume your area chart will be stacked).

Historically, real-time data was hard to manage. We’ve come a long way since then. If you look at the various components of the modern streaming stack, the biggest thing slowing people down from building real-time data pipelines is the sources themselves. Besides CDC feeds, there are very few event-driven APIs that let you consume source data in real time.

This is why I got so excited when I came across this guide from bytewax: Real-Time Financial Exchange Order Book. It’s how I discovered the free Coinbase WebSocket feed, which exposes their level 2 order book data. Using this data, I thought it would be fun to recreate a common visual that displays exchange depth.

Coinbase Pro

Robinhood

Kraken

These visuals illustrate the cumulative size, indicating the quantity of a security someone is willing to buy or sell at a specific price level. This information is valuable because it can answer questions such as, “At what price should I place my order to ensure it gets fully filled if I aim to buy or sell 100 units?”
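As a concrete illustration of the kind of question cumulative depth answers, here is a small, self-contained Python sketch with made-up price levels that walks the ask side of a book to find the price at which a 100-unit buy order would be fully filled:

# Hypothetical ask side of a book: (price, size) pairs, best (lowest) ask first.
asks = [(26000.10, 40.0), (26000.25, 35.0), (26000.50, 50.0), (26001.00, 80.0)]

def fill_price(levels, target_qty):
    """Return the worst price needed to fully fill target_qty by walking cumulative depth."""
    cumulative = 0.0
    for price, size in levels:
        cumulative += size
        if cumulative >= target_qty:
            return price
    return None  # not enough depth on the book

print(fill_price(asks, 100))  # 26000.5: the first three levels cover the 100 units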

The pipeline will work as follows:

  1. Create a KafkaProducer to send the data from the Coinbase WebSocket and add it to a Redpanda topic.
  2. Ingest the Redpanda topic into Materialize using dbt to manage the transformations.
  3. Connect Streamlit to Materialize with psycopg2 to visualize the data.

Ingestion

The first step in this pipeline involves creating a KafkaProducer that will take the data from the Coinbase WebSocket and add it to a Redpanda topic.

To start ingesting data from Coinbase, we send an initial subscribe message over the WebSocket like so:


import json
from websocket import create_connection
ws = create_connection("wss://ws-feed.exchange.coinbase.com")
ws.send(
  json.dumps({
    "type": "subscribe",
    "product_ids": [
        "ETH-USD",
        "BTC-USD"
    ],
    "channels": ["level2_batch"]
  })
)

If successful, the first message back after running print(ws.recv()) will look like this:

{
   "type":"subscriptions",
   "channels":[
      {
         "name":"level2_50",
         "product_ids":[
            "BTC-USD",
            "ETH-USD"
         ]
      }
   ]
}

Following the subscriptions message, the next message received will be the snapshot, which provides an overview of the level 2 order book.

[
   {
      "type":"snapshot",
      "product_id":"BTC-USD",
      "bids":[
         ["10101.10", "0.45054140"]
      ],
      "asks":[
         ["10102.55","0.57753524"]
      ]
   },
   {
      "type":"snapshot",
      "product_id":"ETH-USD",
      "bids":[
         ["10101.10", "0.45054140"]
      ],
      "asks":[
         ["10102.55","0.57753524"]
      ]
   }
]

The bids and asks are lists of lists, where the first element of each inner list is the price and the second element is the quantity.

After the snapshot messages, every subsequent message is an l2update, which is sent whenever the size changes at a given price level for a product_id.

Note

The size property is the updated size at the price level, not a delta. A size of “0” indicates the price level can be removed.

{
   "type":"l2update",
   "product_id":"BTC-USD",
   "time":"2019-08-14T20:42:27.265Z",
   "changes":[
      ["buy","10101.80000000","0.162567"]
   ]
}
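To make the note above concrete, here is a tiny Python sketch (illustrative only, using a plain dict for one side of one product’s book) of how a consumer would apply an l2update: the incoming size replaces the stored size, and a size of “0” removes the price level.

# book maps price (str) -> size (str) for one product_id and one side.
book = {"10101.80000000": "0.25"}

def apply_l2update(book, changes, side_filter):
    # Each change is [side, price, size]; size is the new total at that price, not a delta.
    for side, price, size in changes:
        if side != side_filter:
            continue
        if float(size) == 0:
            book.pop(price, None)  # a size of "0" means the price level is removed
        else:
            book[price] = size     # otherwise overwrite with the new size

apply_l2update(book, [["buy", "10101.80000000", "0.162567"]], side_filter="buy")
print(book)  # {'10101.80000000': '0.162567'}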

To process these messages within the KafkaProducer, I’ve created an infinite while loop that keeps running until the program is stopped.

while True:
  message = ws.recv()
  data = json.loads(message)
  if data["type"] == "snapshot":
      
      asks = [{
              "message_type": "snapshot",
              "message_key": data["product_id"] + "-sell-" + str(order[0]),
              "product_id": data["product_id"],
              "side": "sell",
              "price": order[0],
              "size": order[1],
              "message_created_at_utc": format_datetime(data["time"])
              } for order in data["asks"]
          ]
      
      bids = [{
              "message_type": "snapshot",
              "message_key": data["product_id"] + "-buy-" + str(order[0]),
              "product_id": data["product_id"],
              "side": "buy",
              "price": order[0],
              "size": order[1],
              "message_created_at_utc": format_datetime(data["time"])
              } for order in data["bids"]
          ]
      
      order_book = asks + bids

      for order in order_book:
          prod.send(
              topic="coinbase_level2_channel", 
              key=order["message_key"].encode("utf-8"),
              value=json.dumps(order,default=json_serializer,ensure_ascii=False).encode("utf-8")
          )
          print(order) #log
      prod.flush()

  elif data["type"] == "l2update":
      orders = [{
              "message_type": "l2update",
              "message_key": data["product_id"] + "-" + order[0] + "-" + str(order[1]),
              "product_id": data["product_id"],
              "side": order[0],
              "price": order[1],
              "size": order[2],
              "message_created_at_utc": format_datetime(data["time"])
              } for order in data["changes"]
          ]
      for order in orders:
          prod.send(
                  topic="coinbase_level2_channel", 
                  key=order["message_key"].encode("utf-8"),
                  value=json.dumps(order,default=json_serializer,ensure_ascii=False).encode("utf-8")
              )
          print(order) #log
      prod.flush()
  else:
      print(f"Unexpected value for 'type': {data['type']}")

Redpanda Console

The Redpanda Console provides a nice UI for peering into how the Redpanda instance is operating. If you haven’t used the Redpanda Console, it’s described as:

A Kafka web UI for developers.
Redpanda Console gives you a simple, interactive approach for gaining visibility into your topics, masking data, managing consumer groups, and exploring real-time data with time-travel debugging¹.

The console helped me identify a bug in my original code. The unique key I was creating for each record was a combination of the product_id, side, and price, e.g. BTC-USD-buy-10000. The problem was that for the snapshot messages I was using the terms bid and ask, while for the l2update messages I was using buy and sell. This was important to fix because the data was being inserted into Materialize via ENVELOPE UPSERT based on this key.

Materialize & dbt

The next step in our pipeline entails processing and storing the real-time data. Materialize is a streaming database that allows for just that. It has Kafka integrations, and since Redpanda is compatible with the Kafka API, Materialize and Redpanda work together out of the box.

A key thing to understand about Materialize is how it handles materialized views. Its materialized views are incrementally maintained, so as the underlying data changes, the view updates automatically. This is why, when you use Materialize and dbt together, you’ll typically set up dbt to run only in a CI/CD pipeline that is kicked off when the dbt models change. If you want to learn more about how this works, I highly recommend checking out this video on Materialize+dbt Streaming for the Modern Data Stack.

Most of the code for this project mimics what I found in the Materialize + Redpanda + dbt Hack Day.

First we define our source:

{{ config(materialized='source') }}
CREATE SOURCE {{ this }}
FROM KAFKA BROKER 'redpanda:9092' TOPIC 'coinbase_level2_channel'
  KEY FORMAT BYTES
  VALUE FORMAT BYTES
ENVELOPE UPSERT;

The ENVELOPE UPSERT treats all records as having a key and a value, and supports inserts, updates, and deletes within Materialize² (see the sketch after this list):

  • If the key does not match a preexisting record, it inserts the record’s key and value.
  • If the key matches a preexisting record and the value is non-null, Materialize updates the existing record with the new value.
  • If the key matches a preexisting record and the value is null, Materialize deletes the record.
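As a plain Python sketch of those rules (just dict bookkeeping, not Materialize code), each incoming (key, value) pair either inserts, overwrites, or deletes:

# A dict keyed the same way as the topic stands in for the Materialize state.
state = {}

def upsert(state, key, value):
    if value is None:
        state.pop(key, None)   # null value: delete the record for this key
    else:
        state[key] = value     # otherwise insert or update the record

upsert(state, "BTC-USD-buy-10101.80000000", {"size": "0.25"})      # insert
upsert(state, "BTC-USD-buy-10101.80000000", {"size": "0.162567"})  # update
upsert(state, "BTC-USD-buy-10101.80000000", None)                  # delete
print(state)  # {}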

The staging model does some light transformations to get the data into a more usable form.

stg_coinbase_level2_channel
{{ 
    config(
        materialized='materializedview'
    ) 
}}

with 
source as (
    select * from {{ source('coinbase', 'level2_channel') }}
),

converted as (
    select convert_from(data, 'utf8') as data from source
),

casted as (
    select cast(data as jsonb) as data from converted
),

renamed as (
    select
        (data->>'message_type')::string                 as message_type,
        (data->>'message_key')::string                  as message_key,
        (data->>'product_id')::string                   as product_id,
        (data->>'side')::string                         as side,
        (data->>'price')::double                        as price,
        (data->>'size')::double                         as size,
        (data->>'message_created_at_utc')::timestamp    as message_created_at_utc
    from
        casted
),

final as (
    select
        message_type,
        message_key,
        product_id,
        side,
        price,
        size,
        price * size as notional_size,
        message_created_at_utc
    from
        renamed
    where
        size != 0
)

select * from final

Because Materialize does not handle window functions very well, they provide some alternative approaches in their docs. I referenced the Materialize “Top K by group” pattern, which allowed me to return the top bid and ask record for each product_id. The highest bid and lowest ask are referred to as the national best bid and offer (NBBO).

int_coinbase_nbbo
{{ 
    config(
        materialized='materializedview'
    ) 
}}

with
stg_coinbase_level2_channel as (
    select * from {{ ref('stg_coinbase_level2_channel') }}
),

nbb as (
    select
        distinct on(product_id) product_id,
        side,
        price,
        size,
        notional_size,
        message_created_at_utc
    from 
        stg_coinbase_level2_channel
    where
        side = 'buy'
    order by
        product_id, price desc
),

nbo as (
    select
        distinct on(product_id) product_id,
        side,
        price,
        size,
        notional_size,
        message_created_at_utc
    from 
        stg_coinbase_level2_channel
    where
        side = 'sell'
    order by
        product_id, price asc
),

unioned as (
    select * from nbb
    union all 
    select * from nbo
)

select * from unioned

The last model I created was fct_coinbase_nbbo, which pivots the NBB and NBO so there is only one record per product_id. This allows for the calculation of the NBBO spread and NBBO midpoint. The spread is the difference between the NBO and the NBB, and the NBBO midpoint is usually used as the reference price for the current market value of a security.
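For example, with a hypothetical NBB of $26,000.00 and NBO of $26,000.10, the NBBO spread is $0.10 and the NBBO midpoint is $26,000.05.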

fct_coinbase_nbbo
{{ 
    config(
        materialized='materializedview'
    ) 
}}

with
int_coinbase_nbbo as (
    select * from {{ ref('int_coinbase_nbbo') }}
),

nbbo as (
    select
        product_id,
        max(case when side = 'buy' then price end)                      as nbb_price,
        max(case when side = 'buy' then size end)                       as nbb_size,
        max(case when side = 'buy' then notional_size end)              as nbb_notional_size,
        max(case when side = 'buy' then message_created_at_utc end)     as nbb_last_updated_at_utc,
        max(case when side = 'sell' then price end)                     as nbo_price,
        max(case when side = 'sell' then size end)                      as nbo_size,
        max(case when side = 'sell' then notional_size end)             as nbo_notional_size,
        max(case when side = 'sell' then message_created_at_utc end)    as nbo_last_updated_at_utc
    from
        int_coinbase_nbbo
    group by
        product_id
),

final as (
    select
        product_id,
        nbb_price,
        nbb_size,
        nbb_notional_size,
        nbb_last_updated_at_utc,
        nbo_price,
        nbo_size,
        nbo_notional_size,
        nbo_last_updated_at_utc,
        (nbb_price + nbo_price) / 2 as nbbo_midpoint,
        nbo_price - nbb_price as nbbo_spread
    from
        nbbo
)

select * from final

TODO: Something I want to work on in the future is creating a model that provides the cumulative size. Normally I would use a window function to get a cumulative sum, but window functions don’t work well with Materialize. My theory is that I can do a self-join on product_id and side where the joined price is >= the current price (or <=, depending on the side) and then sum up the size.
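To illustrate that self-join idea outside Materialize, here is a small Python sketch of the same logic over a static snapshot (asks use <=, bids would use >=):

# Hypothetical ask levels (price -> size). The cumulative size at a level sums every
# level at or below that price, which is the condition the SQL self-join would express.
asks = {26000.10: 40.0, 26000.25: 35.0, 26000.50: 50.0}

cumulative_asks = {
    price: sum(size for other_price, size in asks.items() if other_price <= price)
    for price in asks
}
print(cumulative_asks)  # {26000.1: 40.0, 26000.25: 75.0, 26000.5: 125.0}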

Streamlit

Streamlit has been my go-to lately when I want to display a side project I’ve been working on. Because Materialize speaks the Postgres wire protocol, I can use psycopg2 to connect to Materialize from Streamlit.

import psycopg2
dsn = "user=materialize password=password host=materialized port=6875 dbname=materialize"
conn = psycopg2.connect(dsn)
cur = conn.cursor()

Currently, I am using a polling technique based on a refresh_interval that is set by st.slider and defaults to 1 second. I use a package called streamlit_autorefresh that handles refreshing the Streamlit app.

import streamlit as st
from streamlit_autorefresh import st_autorefresh

with st.sidebar:
    refresh_interval = st.slider(
        "Auto-Refresh Interval (seconds)", 
        1, 
        15, 
        step=1
    )

st_autorefresh(interval=refresh_interval*1000)
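On each refresh the app re-queries Materialize. A minimal sketch of what one refresh might do, reusing the cur cursor from the connection snippet above (the exact query and display choice here are illustrative, against the fct_coinbase_nbbo view defined earlier):

import pandas as pd
import streamlit as st

# Re-run the query on every refresh; Materialize keeps the view incrementally up to date.
cur.execute(
    "SELECT product_id, nbb_price, nbo_price, nbbo_midpoint, nbbo_spread FROM fct_coinbase_nbbo"
)
nbbo_df = pd.DataFrame(
    cur.fetchall(),
    columns=["product_id", "nbb_price", "nbo_price", "nbbo_midpoint", "nbbo_spread"],
)
st.dataframe(nbbo_df)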

TODO: I would like to switch this polling technique to a push model so the data is sent directly to Streamlit as it comes in. Shout-out to the Materialize team for whipping up a demo for me on how to do this!
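For reference, Materialize supports this kind of push model via SUBSCRIBE over the Postgres wire protocol. A rough sketch of the pattern with psycopg2 (view name from above; details may need adjusting) looks like:

# psycopg2 opens a transaction implicitly, so the cursor can be declared directly.
with conn.cursor() as cur:
    cur.execute("DECLARE c CURSOR FOR SUBSCRIBE fct_coinbase_nbbo")
    while True:
        cur.execute("FETCH ALL c")  # waits for new changes rather than polling on a timer
        for row in cur:
            print(row)              # push each change into the app instead of re-querying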

Conclusion

Increasingly, individuals and organizations are discovering use cases for analytical data with different requirements than traditional OLAP workflows, often venturing into what’s known as “Operational Analytics.” Coming from an ops background, I’ve seen firsthand how operational processes can get bogged down in mundane data tasks. This is where data teams can step in to alleviate that burden, allowing focus to shift to higher-value problems.

The evolution of tooling and technology surrounding real-time data has significantly eased the prior complexity. In the past, the cost and effort required to establish the necessary infrastructure were deterrents. However, times have changed. I firmly believe real-time isn’t just the future; it’s the present.

Other TODOs

  • Utilize Materialize SUBSCRIBE, along with materializing dbt tests as MVs, to create alerts for:
    • The NBBO spread getting too wide
    • Specific price levels being hit
  • If the size at a specific price level decreases, use the decrease as a proxy for executed volume and display those records in a table.

References