現実モデリング

データとかエンジニアリングとか健エミュとか

【データ分析基盤】Notionをデータソースとして使う

はじめに

「Notion*1にしかない情報を使ってダッシュボードを構築してほしい」みたいな需要が出てきた。その需要にこたえるため、Notionからデータを取り込んでSnowflakeに取り込む仕組みを作ってみたら社内で案外評判が良かったので、やり方を記しておきたい。

www.notion.so

この記事では取り込む部分までしか書かない。ダッシュボードとして表示させるためには、もう二手間くらい必要になるので注意。

どんな要望?

  • Notionにデータベースがある
  • そのデータベース内にダッシュボードで使えそうな情報があるから、表示できるようにしてほしい
    「データベース」のイメージ

Notionのデータベースというのは要するにページの集合体、つまりはレコードの集合なので、一つ一つレコードを取り出して処理していけばいけそうだ。アプリ上から見てみるとよくわからないが、APIで操作してみると「ページの集合」だというのがよくわかる。

www.notion.so

ユースケース

  • Notionは社内ドキュメントとして利用している
  • 「プロダクト側では取れないが、Notionにはある」情報が欲しいときに使う

どうやる?

  1. Notion API (Python) を利用して、データベース(Notion用語)からデータを取得し、pandas.DataFrame 形式に変換してからCSVに落とし込む。
  2. CSV形式のデータをSnowflakeの内部ステージにPUTし、COPY INTO 文でテーブルに入れる。

注意点

  • Notionのデータベース内に「プロダクト側のデータと紐づけるキー」がないと使い物にならない
  • 人が更新するものなので、エラーよく発生する。なので、Slack通知をするなどして、エラーが発生した場合にわかるようにしておく必要がある。悲鳴ドリブンでやっても構わないと思う。
    • Q: 悲鳴ドリブンってなに?
    • A: ダッシュボード構築の文脈で話します。まずダッシュボードの更新を止めてみて、利用者が何か言ってくるのを待ちます。何か言ってきたら、その人が利用者なので詳しい事情をヒアリングしましょう。逆に誰も何も言ってこなれば、そのダッシュボードの管理は止めても大丈夫です。消してしまいましょう。これが悲鳴ドリブンダッシュボード運用です。

具体的なプログラム

from notion_client import Client
import pandas as pd
import snowflake.connector
import os

NOTION_BEAERER_TOKEN = os.environ["NOTION_BEAERER_TOKEN"]
NOTION_DATABASE_ID = os.environ["NOTION_DATABASE_ID"]
DATA_FILEPATH = "/tmp/data.csv"
TABLE_NAME = "notion_master"


def get_data_from_notion() -> None:
    notion_client = Client(auth=NOTION_BEAERER_TOKEN)
    result = notion_client.databases.query(**{"database_id": NOTION_DATABASE_ID})[
        "results"
    ]
    content = [
        {
        "item" : item["properties"]["名前"]["title"][0]["plain_text"],
        "created_time" : item["created_time"],
        }
        for item in result
        if len(item["properties"]["名前"]["title"]) >= 1
    ]
    df = pd.DataFrame(content)
    df.to_csv("/tmp/data.csv", index=False)
    return


def get_snowflake_connection() -> snowflake.connector.SnowflakeConnection:
    snowflake_connection = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        role=os.environ["SNOWFLAKE_ROLE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        schema=os.environ["SNOWFLAKE_SCHEMA"],
    )
    return snowflake_connection


def upload_data_to_snowflake() -> None:
    """
    SnowflakeにCSVをアップロードする
    """
    snowflake_connection = get_snowflake_connection()
    cursor = snowflake_connection.cursor()
    cursor.execute(f"PUT file://{DATA_FILEPATH} @%{TABLE_NAME} OVERWRITE = TRUE")
    cursor.execute(
        f"copy into {TABLE_NAME} from @%{TABLE_NAME} file_format = (type = csv field_delimiter = ',' skip_header = 1) PURGE = TRUE"
    )

    snowflake_connection.close()
    return


def lambda_handler(event, context):
    get_data_from_notion()
    upload_data_to_snowflake()


if __name__ == "__main__":
    lambda_handler(None, None)

*1:超素敵ドキュメント管理ツール