Site icon image usounds

v usoundsの日常や技術的なメモを残すブログです

Blueskyのカスタムフィードの仕組みを一から自分で考える #03

公式サンプルを動かす

カスタムフィードの公式サンプルではNginxが必要ということになっているので手が出しにくいのではと思いますが、単純に動作確認をするのであれば、node.jsとyarnとgitがインストールさえすれば大丈夫です。ただし、動作を理解するにはTypeScript(JavaScript)とデータベースの知識が必要です。

git clone https://github.com/bluesky-social/feed-generator.git
cd feed-generator

更新終ったら、Windowsはタスクマネージャーを起動した状態で

yarn start

を実行すると、PDS(Relay)との連合とカスタムフィードサーバーが起動します。起動直後からRelayから全世界の投稿がすべてコンソールを流れてくるため、びっくりしないようにします。びっくりしたらタスクマネージャでnodeのプロセスをキルします。

とりあえずログに大量に流れてくるとわかりにくいので、何とかします。前出の通り、Relay、すなわちBlueskyから投稿を受ける処理は「src/subscription.ts」が担います。このソースには

// This logs the text of every post off the firehose.
// Just for fun :)
// Delete before actually using

という記載があります。直訳すると「すべての投稿のテキストをログに流すよ。実運用するときには(ログに流すところを)消してね」とあります。

手を入れてさらに挙動を見る

「console.log(post.record.text)」をコメントアウトしたうえで、26行目の「// map alf-related posts to a db row」の下に「console.log(create)」の行を足しyarn startします。

// This logs the text of every post off the firehose.
// Just for fun :)
// Delete before actually using
//for (const post of ops.posts.creates) {
//  console.log(post.record.text)
//}

const postsToDelete = ops.posts.deletes.map((del) => del.uri)
const postsToCreate = ops.posts.creates
  .filter((create) => {
    // only alf-related posts
    console.log(create)
    return create.record.text.toLowerCase().includes('alf')
  })
  .map((create) => {
    // map alf-related posts to a db row
    return {
      uri: create.uri,
      cid: create.cid,
      replyParent: create.record?.reply?.parent.uri ?? null,
      replyRoot: create.record?.reply?.root.uri ?? null,
      indexedAt: new Date().toISOString(),
    }
  })

実行すると、今度はログがおとなしい状態になります。ここで、自身のアカウントで「テスト投稿 alf」と投稿するとログに「テスト投稿 alf」が表示されると思います。これで現在起動したサンプルソースは、alfという文字列を含む投稿があるとそれをログに出力するようになります。もともとのソースの仕様として、本サーバーが起動している間のalfが含まれる投稿は、すべてメモリに保存するようになります。

なお、この手順に沿っている場合、メモリに保存されているだけなので、時間経過またはプロセスを落とすと消えますが動作確認用なので気にしないようにします。また、起動していないときの投稿は拾うことが出来ません。

ジェネレータ本体の挙動を見る

yarn startしたPCのブラウザで「http://localhost:3000/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://FEEDGEN_PUBLISHER_DIDに設定したDID/app.bsky.feed.generator/whats-alf」にアクセスすると、JSONの配列で1件だけ投稿が返される状態になります。(FEEDGEN_PUBLISHER_DIDに設定したDIDは個別変えてください)

「whats-alf」は「src/algos/whats-alf.ts」のソースを呼び出すようになっています。ソースを見ると、「let builder = ctx.db」からSQLを生成しています。ざっくりいうと、postテーブルの中身をすべて(.selectAll()なのでWHERE句なし全件)投稿時間の降順で表示しています。

  let builder = ctx.db
    .selectFrom('post')
    .selectAll()
    .orderBy('indexedAt', 'desc')
    .orderBy('cid', 'desc')
    .limit(params.limit)

SQLで表すと「select * from post order by indexedAt desc, cid desc limit 30」です。indexedAtは投稿時間とほぼイコールですが、Relayから本サーバーに投稿が連携された本サーバーのシステム時間となります。ですので、このサンプルソースは「データベースには「alf」が含まれる投稿だけあらかじめ絞って登録している。「whats-alf」にアクセスすると、あらかじめ絞って登録されている投稿を全件表示している」ということになります。

今までの挙動から何ができるかを考える

ここまで理解すると、SkyFeedのように「正規表現を実データを見ながら徐々に変えることができる」という挙動と、サンプルソースの実装である「保存する際に条件を満たした投稿だけを保存しておく」ということに大きな乖離があります。SkyFeedと同じことをしたければ、全世界の投稿をデータベースに保存しておき、データベースの問い合わせ時に正規表現で表示するものを絞り込む、このような挙動をすればよいということになります。#FF14を含む投稿を全件取ってくるであれば、「select * from post where text like '%#FF14%' order by indexedAt desc, cid desc limit 30」のようなSQLに組み替えるようにするということですね。

ただし、データベースにBlueskyの投稿を全件保存しておくことはストレージ容量的には耐えられなくはないものの、80万件/1日程度あります。仮に80万件保存したとしても、そこに正規表現を用いた検索を全件に対して行うことはパフォーマンス的には現実的ではないことに頭を抱えると思います。SkyFeedの7日間制限は、7日間の投稿しか保存していないわけではなく全期間保存はしているが、正規表現の対象にできる範囲を絞っているということですね。SkyFeedのよい仕組みを把握できたところで独自サーバーを立てる意義を見つけることに苦労することになります。