Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 19 additions & 16 deletions crates/cli/src/subcommands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures::{Sink, SinkExt, TryStream, TryStreamExt};
use http::header;
use reqwest::Url;
use serde_json::Value;
use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat};
use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
use spacetimedb_data_structures::map::HashMap;
use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9;
use spacetimedb_lib::de::serde::{DeserializeWrapper, SeedWrapper};
Expand Down Expand Up @@ -71,16 +71,16 @@ pub fn cli() -> clap::Command {
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
}

fn parse_msg_json(msg: &WsMessage) -> Option<ws::ServerMessage<JsonFormat>> {
fn parse_msg_json(msg: &WsMessage) -> Option<ws_v1::ServerMessage<ws_v1::JsonFormat>> {
let WsMessage::Text(msg) = msg else { return None };
serde_json::from_str::<DeserializeWrapper<ws::ServerMessage<JsonFormat>>>(msg)
serde_json::from_str::<DeserializeWrapper<ws_v1::ServerMessage<ws_v1::JsonFormat>>>(msg)
.inspect_err(|e| eprintln!("couldn't parse message from server: {e}"))
.map(|wrapper| wrapper.0)
.ok()
}

fn reformat_update<'a>(
msg: &'a ws::DatabaseUpdate<JsonFormat>,
msg: &'a ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
schema: &RawModuleDefV9,
) -> anyhow::Result<HashMap<&'a str, SubscriptionTable>> {
msg.tables
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
let mut req = url.into_client_request()?;
req.headers_mut().insert(
header::SEC_WEBSOCKET_PROTOCOL,
http::HeaderValue::from_static(ws::TEXT_PROTOCOL),
http::HeaderValue::from_static(ws_v1::TEXT_PROTOCOL),
);
// Add the authorization header, if any.
if let Some(auth_header) = api.con.auth_header.to_header() {
Expand Down Expand Up @@ -241,8 +241,8 @@ async fn subscribe<S>(ws: &mut S, query_strings: Box<[Box<str>]>) -> Result<(),
where
S: Sink<WsMessage, Error = WsError> + Unpin,
{
let msg = serde_json::to_string(&SerializeWrapper::new(ws::ClientMessage::<()>::Subscribe(
ws::Subscribe {
let msg = serde_json::to_string(&SerializeWrapper::new(ws_v1::ClientMessage::<()>::Subscribe(
ws_v1::Subscribe {
query_strings,
request_id: 0,
},
Expand All @@ -262,22 +262,22 @@ where
while let Some(msg) = ws.try_next().await.map_err(|source| Error::Websocket { source })? {
let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(sub) => {
ws_v1::ServerMessage::InitialSubscription(sub) => {
if let Some(module_def) = module_def {
let output = format_output_json(&sub.database_update, module_def)?;
tokio::io::stdout().write_all(output.as_bytes()).await?
}
break;
}
ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate { status, .. }) => {
ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate { status, .. }) => {
return Err(match status {
ws::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
ws_v1::UpdateStatus::Failed(msg) => Error::TransactionFailure { reason: msg },
_ => Error::Protocol {
details: RECV_TX_UPDATE,
},
})
}
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { .. }) => {
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { .. }) => {
return Err(Error::Protocol {
details: RECV_TX_UPDATE,
})
Expand Down Expand Up @@ -310,14 +310,14 @@ where

let Some(msg) = parse_msg_json(&msg) else { continue };
match msg {
ws::ServerMessage::InitialSubscription(_) => {
ws_v1::ServerMessage::InitialSubscription(_) => {
return Err(Error::Protocol {
details: "received a second initial subscription update",
})
}
ws::ServerMessage::TransactionUpdateLight(ws::TransactionUpdateLight { update, .. })
| ws::ServerMessage::TransactionUpdate(ws::TransactionUpdate {
status: ws::UpdateStatus::Committed(update),
ws_v1::ServerMessage::TransactionUpdateLight(ws_v1::TransactionUpdateLight { update, .. })
| ws_v1::ServerMessage::TransactionUpdate(ws_v1::TransactionUpdate {
status: ws_v1::UpdateStatus::Committed(update),
..
}) => {
let output = format_output_json(&update, module_def)?;
Expand All @@ -329,7 +329,10 @@ where
}
}

fn format_output_json(msg: &ws::DatabaseUpdate<JsonFormat>, schema: &RawModuleDefV9) -> Result<String, Error> {
fn format_output_json(
msg: &ws_v1::DatabaseUpdate<ws_v1::JsonFormat>,
schema: &RawModuleDefV9,
) -> Result<String, Error> {
let formatted = reformat_update(msg, schema).map_err(|source| Error::Reformat { source })?;
let output = serde_json::to_string(&formatted)? + "\n";

Expand Down
Loading
Loading