はじめに
「Notion*1にしかない情報を使ってダッシュボードを構築してほしい」みたいな需要が出てきた。その需要にこたえるため、Notionからデータを取り込んでSnowflakeに取り込む仕組みを作ってみたら社内で案外評判が良かったので、やり方を記しておきたい。
この記事では取り込む部分までしか書かない。ダッシュボードとして表示させるためには、もう二手間くらい必要になるので注意。
どんな要望?
- Notionにデータベースがある
- そのデータベース内にダッシュボードで使えそうな情報があるから、表示できるようにしてほしい
Notionのデータベースというのは要するにページの集合体、つまりはレコードの集合なので、一つ一つレコードを取り出して処理していけばいけそうだ。アプリ上から見てみるとよくわからないが、APIで操作してみると「ページの集合」だというのがよくわかる。
ユースケース
- Notionは社内ドキュメントとして利用している
- 「プロダクト側では取れないが、Notionにはある」情報が欲しいときに使う
どうやる?
- Notion API (Python) を利用して、データベース(Notion用語)からデータを取得し、
pandas.DataFrame
形式に変換してからCSVに落とし込む。 - CSV形式のデータをSnowflakeの内部ステージにPUTし、
COPY INTO
文でテーブルに入れる。
注意点
- Notionのデータベース内に「プロダクト側のデータと紐づけるキー」がないと使い物にならない
- 人が更新するものなので、エラーよく発生する。なので、Slack通知をするなどして、エラーが発生した場合にわかるようにしておく必要がある。悲鳴ドリブンでやっても構わないと思う。
具体的なプログラム
from notion_client import Client import pandas as pd import snowflake.connector import os NOTION_BEAERER_TOKEN = os.environ["NOTION_BEAERER_TOKEN"] NOTION_DATABASE_ID = os.environ["NOTION_DATABASE_ID"] DATA_FILEPATH = "/tmp/data.csv" TABLE_NAME = "notion_master" def get_data_from_notion() -> None: notion_client = Client(auth=NOTION_BEAERER_TOKEN) result = notion_client.databases.query(**{"database_id": NOTION_DATABASE_ID})[ "results" ] content = [ { "item" : item["properties"]["名前"]["title"][0]["plain_text"], "created_time" : item["created_time"], } for item in result if len(item["properties"]["名前"]["title"]) >= 1 ] df = pd.DataFrame(content) df.to_csv("/tmp/data.csv", index=False) return def get_snowflake_connection() -> snowflake.connector.SnowflakeConnection: snowflake_connection = snowflake.connector.connect( user=os.environ["SNOWFLAKE_USER"], password=os.environ["SNOWFLAKE_PASSWORD"], account=os.environ["SNOWFLAKE_ACCOUNT"], warehouse=os.environ["SNOWFLAKE_WAREHOUSE"], role=os.environ["SNOWFLAKE_ROLE"], database=os.environ["SNOWFLAKE_DATABASE"], schema=os.environ["SNOWFLAKE_SCHEMA"], ) return snowflake_connection def upload_data_to_snowflake() -> None: """ SnowflakeにCSVをアップロードする """ snowflake_connection = get_snowflake_connection() cursor = snowflake_connection.cursor() cursor.execute(f"PUT file://{DATA_FILEPATH} @%{TABLE_NAME} OVERWRITE = TRUE") cursor.execute( f"copy into {TABLE_NAME} from @%{TABLE_NAME} file_format = (type = csv field_delimiter = ',' skip_header = 1) PURGE = TRUE" ) snowflake_connection.close() return def lambda_handler(event, context): get_data_from_notion() upload_data_to_snowflake() if __name__ == "__main__": lambda_handler(None, None)
*1:超素敵ドキュメント管理ツール