Firehose

One of the core primitives of the AT Protocol that underlies Bluesky is the firehose. It is an authenticated stream of events used to efficiently sync user updates (posts, likes, follows, handle changes, etc.).

Many applications people will want to build on top of atproto and Bluesky will start with the firehose, from feed generators to labelers, to bots and search engines.

In the atproto ecosystem, many different endpoints serve firehose APIs. Each PDS serves a stream of all of the activity on the repos it is responsible for. From there, relays aggregate the streams of every PDS that requests it into a single unified stream.

This makes the job of downstream consumers much easier, as you can get all the data from a single location. The main relay for Bluesky is bsky.network, which we use in the examples below.

To get started, you will open a WebSocket connection to your favorite firehose provider for the com.atproto.sync.subscribeRepos endpoint:

// WebSocket connections use the wss:// scheme rather than https://.
uri := "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
con, _, err := websocket.DefaultDialer.Dial(uri, http.Header{})
// Check err before using con.
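
If you would rather assemble the endpoint URL than hard-code it, the following is a minimal sketch using only the standard library. The helper name subscribeURL is hypothetical. The optional cursor query parameter is part of the com.atproto.sync.subscribeRepos lexicon; treating a negative value as "no cursor" is an assumption made for this example.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// subscribeURL (hypothetical helper) builds a WebSocket URL for the
// com.atproto.sync.subscribeRepos endpoint on the given host.
// A negative cursor means "no cursor" (an assumption of this sketch);
// otherwise the value is sent as the cursor query parameter.
func subscribeURL(host string, cursor int64) string {
	u := url.URL{
		Scheme: "wss", // WebSocket over TLS
		Host:   host,
		Path:   "/xrpc/com.atproto.sync.subscribeRepos",
	}
	if cursor >= 0 {
		q := url.Values{}
		q.Set("cursor", strconv.FormatInt(cursor, 10))
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	fmt.Println(subscribeURL("bsky.network", -1))
	// prints wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
	fmt.Println(subscribeURL("bsky.network", 12345))
	// prints wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=12345
}
```

The resulting string can be passed to the dialer in place of the literal uri above.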

From there, you need to read off each message as it comes in, and decode the CBOR event data. More details on this can be found here (TODO: link to lexicon page).

Most SDKs have a nice wrapper for this, though. In this example we will just print each repo operation in each event we receive. These operations are things like "create post", "create like", "delete follow" and so on.

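rsc := &events.RepoStreamCallbacks{
	// RepoCommit is called once for every commit event on the stream.
	RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
		fmt.Println("Event from ", evt.Repo)
		// Each op describes one record change within the commit.
		for _, op := range evt.Ops {
			fmt.Printf(" - %s record %s\n", op.Action, op.Path)
		}
		return nil
	},
}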

sched := sequential.NewScheduler("myfirehose", rsc.EventHandler)
events.HandleRepoStream(context.Background(), con, sched)

In this snippet we set up a sequential "scheduler", which handles events one at a time, in the order they arrive. Other schedulers run event handling in parallel, or limit concurrency based on which repo an event belongs to.
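
To illustrate the idea of limiting concurrency by repo, here is a rough, standard-library-only sketch. It is not the SDK's implementation: the event and keyedScheduler types, the worker count, and the did:example identifiers are all hypothetical. Each repo is hashed to one of a fixed number of workers, so events for the same repo stay in order while different repos can be handled in parallel.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// event is a hypothetical stand-in for a firehose event.
type event struct {
	Repo string
	Seq  int64
}

// keyedScheduler (hypothetical) fans events out to a fixed set of workers.
type keyedScheduler struct {
	queues []chan event
	wg     sync.WaitGroup
}

func newKeyedScheduler(workers int, handle func(event)) *keyedScheduler {
	s := &keyedScheduler{queues: make([]chan event, workers)}
	for i := range s.queues {
		q := make(chan event, 16)
		s.queues[i] = q
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			for evt := range q { // each worker drains its own queue in FIFO order
				handle(evt)
			}
		}()
	}
	return s
}

// workerFor hashes the repo so the same repo always maps to the same worker.
func (s *keyedScheduler) workerFor(repo string) int {
	h := fnv.New32a()
	h.Write([]byte(repo))
	return int(h.Sum32() % uint32(len(s.queues)))
}

// Add queues an event on the worker that owns its repo.
func (s *keyedScheduler) Add(evt event) {
	s.queues[s.workerFor(evt.Repo)] <- evt
}

// Close stops accepting events and waits for queued ones to finish.
func (s *keyedScheduler) Close() {
	for _, q := range s.queues {
		close(q)
	}
	s.wg.Wait()
}

func main() {
	var mu sync.Mutex
	seen := map[string][]int64{}
	s := newKeyedScheduler(4, func(evt event) {
		mu.Lock()
		defer mu.Unlock()
		seen[evt.Repo] = append(seen[evt.Repo], evt.Seq)
	})
	for i := int64(1); i <= 6; i++ {
		repo := "did:example:alice"
		if i%2 == 0 {
			repo = "did:example:bob"
		}
		s.Add(event{Repo: repo, Seq: i})
	}
	s.Close()
	fmt.Println(seen["did:example:alice"], seen["did:example:bob"])
	// prints [1 3 5] [2 4 6]
}
```

The trade-off in this sketch is that a slow handler for one repo also delays other repos that hash to the same worker, in exchange for bounded concurrency and per-repo ordering.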

Once we have a scheduler, we call HandleRepoStream, which does the actual decoding of the data coming over the WebSocket and calls the event handler we wrote.
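
A common next step is to act only on certain record types. The path printed for each operation has the form collection/rkey, so a handler can split it and filter by collection. The sketch below uses only the standard library; the splitPath helper and the sample record keys are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// splitPath (hypothetical helper) splits an operation path of the form
// "collection/rkey" into its two parts. ok is false if the path is malformed.
func splitPath(path string) (collection, rkey string, ok bool) {
	collection, rkey, ok = strings.Cut(path, "/")
	if !ok || collection == "" || rkey == "" {
		return "", "", false
	}
	return collection, rkey, true
}

func main() {
	// Sample paths with made-up record keys.
	paths := []string{
		"app.bsky.feed.post/3kabc",
		"app.bsky.feed.like/3kdef",
		"malformed",
	}
	for _, p := range paths {
		collection, rkey, ok := splitPath(p)
		if !ok {
			fmt.Printf("skipping %q\n", p)
			continue
		}
		// Only react to posts; ignore every other collection.
		if collection == "app.bsky.feed.post" {
			fmt.Printf("post with record key %s\n", rkey)
		}
	}
}
```

Inside the RepoCommit callback, the same check could be applied to op.Path for each operation in the event.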