设计图如下:
图片
构建消息代理服务器
消息代理服务器允许客户端为主题生成事件并订阅它们。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时。
使用以下命令创建一个Rust项目:
cargo new real-ime-message
在Cargo.toml文件中加入以下依赖项:
[dependencies]
futures-util = "0.3.30"
tokio = {version = "1.35.1", features = ["full"]}
tokio-tungstenite = "0.21.0"
url = "2.5.0"
warp = "0.3.6"
在src/main.rs文件中定义一个Broker结构体:
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};
use futures_util::{SinkExt, StreamExt};
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
};
use warp::{filters::ws::Message, Filter};
type Topic = String;
type Event = String;
type WsSender = UnboundedSender;
struct Broker {
events: Arc>>>,
subscribers: Arc>>>,
}
- events:存储每个主题的事件。
- subscribers:跟踪每个主题的订阅者。
创建一个新的Broker实例:
impl Broker {
fn new() -> Self {
Broker {
events: Arc::new(RwLock::new(HashMap::new())),
subscribers: Arc::new(RwLock::new(HashMap::new())),
}
}
}
定义发布事件的方法produce:
impl Broker {
......
async fn produce(&self, topic: Topic, event: Event) {
let mut events = self.events.write().await;
events
.entry(topic.clone())
.or_default()
.push_back(event.clone());
// 异步通知所有订阅者
let subscribers_list;
{
let subscribers = self.subscribers.read().await;
subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();
}
for ws_sender in subscribers_list {
// 将事件发送到WebSocket客户端
let _ = ws_sender.send(warp::ws::Message::text(event.clone()));
}
}
}
这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者。
定义subscribe方法,来管理新的订阅:
impl Broker {
......
pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {
let (ws_sender, mut ws_receiver) = socket.split();
let (tx, mut rx) = mpsc::unbounded_channel::();
{
let mut subs = self.subscribers.write().await;
subs.entry(topic).or_default().push(tx);
}
tokio::task::spawn(async move {
while let Some(result) = ws_receiver.next().await {
match result {
Ok(message) => {
// 处理有效的消息
if message.is_text() {
println!(
"Received message from client: {}",
message.to_str().unwrap()
);
}
}
Err(e) => {
// 处理错误
eprintln!("WebSocket error: {:?}", e);
break;
}
}
}
println!("WebSocket connection closed");
});
tokio::task::spawn(async move {
let mut sender = ws_sender;
while let Some(msg) = rx.recv().await {
let _ = sender.send(msg).await;
}
});
}
}
这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中,处理传入的WebSocket消息。
main函数代码如下:
#[tokio::main]
async fn main() {
let broker = Arc::new(Broker::new());
let broker_clone1 = Arc::clone(&broker);
let broker_clone2 = Arc::clone(&broker);
let produce = warp::path!("produce" / String)
.and(warp::post())
.and(warp::body::json())
.and(warp::any().map(move || Arc::clone(&broker_clone1)))
.and_then(
move |topic: String, event: Event, broker_clone2: Arc| async move {
broker_clone2.produce(topic, event).await;
Ok::<_, warp::Rejection>(warp::reply())
},
);
let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(
move |topic: String, ws: warp::ws::Ws| {
let broker_clone3 = Arc::clone(&broker_clone2);
ws.on_upgrade(move |socket| async move {
broker_clone3.subscribe(topic.clone(), socket).await;
})
},
);
let routes = produce.or(subscribe);
println!("Broker server running at http://127.0.0.1:3030");
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
实现WebSocket客户端
WebSocket客户端将模拟一个订阅主题和接收消息的真实用户。
在src/bin目录下,创建一个ws_cli.rs文件。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:
use futures_util::{sink::SinkExt, stream::StreamExt};
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::{sleep, Duration};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use url::Url;
async fn websocket_client(topic_url: &str) {
// 解析要连接WebSocket服务器的URL
let url = Url::parse(topic_url).expect("Invalid URL");
// 连接到WebSocket服务器
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("WebSocket client connected");
let (mut write, mut read) = ws_stream.split();
let message = Arc::new(RwLock::new(String::new()));
let message_1 = message.clone();
// 生成一个任务来处理传入的消息
tokio::spawn(async move {
let msg_lock = message_1.clone();
while let Some(message) = read.next().await {
match message {
Ok(msg) => {
let mut ms = msg_lock.write().await;
*ms = msg.to_text().unwrap().to_string();
println!("Received message: {}", msg.to_text().unwrap());
}
Err(e) => {
eprintln!("Error receiving message: {:?}", e);
break;
}
}
}
});
// 发送消息
loop {
let msg_lock = message.clone();
let ms = msg_lock.read().await;
if let Err(e) = write.send(Message::Text(ms.to_string())).await {
eprintln!("Error sending message: {:?}", e);
break;
}
sleep(Duration::from_secs(5)).await;
}
}
main函数代码如下:
#[tokio::main]
async fn main() {
websocket_client("ws://127.0.0.1:3030/subscribe/newtopic").await;
}
测试
执行如下命令运行消息代理服务器:
cargo run --bin real-ime-message
执行结果:
Broker server running at http://127.0.0.1:3030
然后打开一个新的命令行,执行如下命令运行WebSocket客户端:
cargo run --bin ws_cli
执行结果:
WebSocket client connected
向http://127.0.0.1:3030/produce/newtopic接口发送post请求,如图:
图片
客户端接收到消息:
WebSocket client connected
Received message: This is a new event
总结
我们已经探索了在Rust中创建一个简单的消息代理,并使用WebSocket客户端对其进行测试。这个例子突出了Rust在构建高效、并发的网络应用程序方面的能力。