大家好,這是 .NET 開(kāi)源項(xiàng)目 StreamJsonRpc 介紹的最后一篇。上篇介紹了一些預(yù)備知識(shí),包括 JSON-RPC 協(xié)議介紹,StreamJsonRpc 是一個(gè)實(shí)現(xiàn)了 JSON-RPC 協(xié)議的庫(kù),它基于 Stream、WebSocket 和自定義的全雙工管道傳輸。中篇通過(guò)示例講解了 StreamJsonRpc 如何使用全雙工的 Stream 作為傳輸管道實(shí)現(xiàn) RPC 通訊。本篇(下篇)將繼續(xù)通過(guò)示例講解如何基于 WebSocket 傳輸管道實(shí)現(xiàn) RPC 通訊。
準(zhǔn)備工作為了示例的完整性,本文示例繼續(xù)在中篇創(chuàng)建的示例基礎(chǔ)上進(jìn)行。該示例的 GitHub 地址為:
github.com/liamwang/StreamJsonRpcSamples
我們繼續(xù)添加三個(gè)項(xiàng)目,一個(gè)是名為 WebSocketSample.Client 的 Console 應(yīng)用,一個(gè)是名為 WebSocketSample.Server 的 ASP.NET Core 應(yīng)用,還有一個(gè)名為 Contract 的契約類(lèi)庫(kù)(和 gRPC 類(lèi)似)。
你可以直接復(fù)制并執(zhí)行下面的命令一鍵完成大部分準(zhǔn)備工作:
dotnet new console -n WebSocketSample.Client # 建新客戶端應(yīng)用
dotnet new webapi -n WebSocketSample.Server # 新建服務(wù)端應(yīng)用
dotnet new classlib -n Contract # 新建契約類(lèi)庫(kù)
dotnet sln add WebSocketSample.Client WebSocketSample.Server Contract # 將項(xiàng)目添加到解決方案
dotnet add WebSocketSample.Client package StreamJsonRpc # 為客戶端安裝 StreamJsonRpc 包
dotnet add WebSocketSample.Server package StreamJsonRpc # 為服務(wù)端安裝 StreamJsonRpc 包
dotnet add WebSocketSample.Client reference Contract # 添加客戶端引用 Common 引用
dotnet add WebSocketSample.Server reference Contract # 添加服務(wù)端引用 Common 引用
為了把重點(diǎn)放在實(shí)現(xiàn)上,這次我們依然以一個(gè)簡(jiǎn)單的功能作為示例。該示例實(shí)現(xiàn)客戶端向服務(wù)端發(fā)送一個(gè)問(wèn)候數(shù)據(jù),然后服務(wù)端響應(yīng)一個(gè)消息。為了更貼合實(shí)際的場(chǎng)景,這次使用強(qiáng)類(lèi)型進(jìn)行操作。為此,我們?cè)?Contract 項(xiàng)目中添加三個(gè)類(lèi)用來(lái)約定客戶端和服務(wù)端通訊的數(shù)據(jù)結(jié)構(gòu)和接口。
用于客戶端發(fā)送的數(shù)據(jù)的 HelloRequest 類(lèi):
public class HelloRequest
{
public string Name { get; set; }
}
用于服務(wù)端響應(yīng)的數(shù)據(jù)的 HelloResponse 類(lèi):
public class HelloResponse
{
public string Message { get; set; }
}
用于約定服務(wù)端和客戶端行為的 IGreeter 接口:
public interface IGreeter
{
Task<HelloResponse> SayHelloAsync(HelloRequest request);
}
接下來(lái)和中篇一樣,通過(guò)建立連接、發(fā)送請(qǐng)求、接收請(qǐng)求、斷開(kāi)連接這四個(gè)步驟演示和講解一個(gè)完整的基于 WebSocket 的 RPC 通訊示例。
建立連接上一篇講到要實(shí)現(xiàn) JSON-RPC 協(xié)議的通訊,要求傳輸管道必須是全雙工的。而 WebSocket 就是標(biāo)準(zhǔn)的全雙工通訊,所以自然可以用來(lái)實(shí)現(xiàn) JSON-RPC 協(xié)議的通訊。.NET 本身就有現(xiàn)成的 WebSocket 實(shí)現(xiàn),所以在建立連接階段和 StreamJsonRpc 沒(méi)有關(guān)系。我們只需要把 WebSocket 通訊管道架設(shè)好,然后再使用 StreamJsonRpc 來(lái)發(fā)送和接收請(qǐng)求即可。
客戶端使用 WebSocket 建立連接比較簡(jiǎn)單,使用?ClientWebSocket
?來(lái)實(shí)現(xiàn),代碼如下:
using (var webSocket = new ClientWebSocket())
{
Console.WriteLine("正在與服務(wù)端建立連接...");
var uri = new Uri("ws://localhost:5000/rpc/greeter");
await webSocket.ConnectAsync(uri, CancellationToken.None);
Console.WriteLine("已建立連接");
}
服務(wù)端建立 WebSocket 連接最簡(jiǎn)單的方法就是使用 ASP.NET Core,借助 Kestrel 和 ASP.NET Core 的中間件機(jī)制可以輕松搭建基于 WebSocket 的 RPC 服務(wù)。只要簡(jiǎn)單的封裝還可以實(shí)現(xiàn)同一套代碼同時(shí)提供 RPC 服務(wù)和 Web API 服務(wù)。
首先在服務(wù)端項(xiàng)目的 Startup.cs 類(lèi)的?Configure
?方法中引入 WebSocket 中間件:
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UseRouting();
app.UseWebSockets(); // 增加此行,引入 WebSocket 中間件
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
再新建一個(gè) Controller 并定義一個(gè) Action 用來(lái)路由映射 WebSocket 請(qǐng)求:
public class RpcController : ControllerBase
{
...
[Route("/rpc/greeter")]
public async Task<IActionResult> Greeter()
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
{
return new BadRequestResult();
}
var socket = await HttpContext.WebSockets.AcceptWebSocketAsync();
...
}
}
這里的 Greeter 提供的服務(wù)既能接收 HTTP 請(qǐng)求也能接收 WebSocket 請(qǐng)求。HttpContext
?中的?WebSockets
?屬性是一個(gè)?WebSocketManager
?對(duì)象,它可以用來(lái)判斷當(dāng)前請(qǐng)求是否為一個(gè) WebSocket 請(qǐng)求,也可以用來(lái)等待和接收 WebSocket 連接,即上面代碼中的?AcceptWebSocketAsync
?方法。另外客戶端的 WebSocket 的 Uri 路徑需要與 Router 指定的路徑對(duì)應(yīng)。
連接已經(jīng)建立,現(xiàn)在到了 StreamJsonRpc 發(fā)揮作用的時(shí)候了。
發(fā)送請(qǐng)求客戶端通過(guò) WebSocket 發(fā)送請(qǐng)求的方式和前一篇講的 Stream 方式是一樣的。還記得前一篇講到的 JsonRpc 類(lèi)的 Attach 靜態(tài)方法嗎?它告訴 StreamJsonRpc 如何傳輸數(shù)據(jù),并返回一個(gè)用于調(diào)用 RPC 的客戶端,它除了可以接收 Stream 參數(shù)外還有多個(gè)重載方法。比如:
public static T Attach<T>(Stream stream);
public static T Attach<T>(IJsonRpcMessageHandler handler);
第二個(gè)重載方法可以實(shí)現(xiàn)更靈活的 Attach 方式,你可以 Attach 一個(gè)交由 WebSocket 傳輸數(shù)據(jù)的管道,也可以 Attach 給一個(gè)自定義實(shí)現(xiàn)的 TCP 全雙工傳輸管道(此方式本文不講,但文末會(huì)直接給出示例)?,F(xiàn)在我們需要一個(gè)實(shí)現(xiàn)了?IJsonRpcMessageHandler
?接口的處理程序,StreamJsonRpc 已經(jīng)實(shí)現(xiàn)好了,它是?WebSocketMessageHandler
?類(lèi)。通過(guò) Attach 該實(shí)例,可以拿到一個(gè)用于調(diào)用 RPC 服務(wù)的對(duì)象。代碼示例如下:
Console.WriteLine("開(kāi)始向服務(wù)端發(fā)送消息...");
var messageHandler = new WebSocketMessageHandler(webSocket);
var greeterClient = JsonRpc.Attach<IGreeter>(messageHandler);
var request = new HelloRequest { Name = "精致碼農(nóng)" };
var response = await greeterClient.SayHelloAsync(request);
Console.WriteLine($"收到來(lái)自服務(wù)端的響應(yīng):{response.Message}");
你會(huì)發(fā)現(xiàn),定義客戶端和服務(wù)端契約的好處是可以實(shí)現(xiàn)強(qiáng)類(lèi)型編程。接下來(lái)看服務(wù)端如何接收并處理客戶端發(fā)送的消息。
接收請(qǐng)求和前一篇一樣,我們先定義一個(gè) GreeterServer 類(lèi)用來(lái)處理接收到的客戶端消息。
public class GreeterServer : IGreeter
{
private readonly ILogger<GreeterServer> _logger;
public GreeterServer(ILogger<GreeterServer> logger)
{
_logger = logger;
}
public Task<HelloResponse> SayHelloAsync(HelloRequest request)
{
_logger.LogInformation("收到并回復(fù)了客戶端消息");
return Task.FromResult(new HelloResponse
{
Message = $"您好, {request.Name}!"
});
}
}
同樣,WebSocket 服務(wù)端也需要使用 Attach 來(lái)告訴 StreamJsonRpc 數(shù)據(jù)如何通訊,而且使用的也是?WebSocketMessageHandler
?類(lèi),方法與客戶端類(lèi)似。在前一篇中,我們 Attach 一個(gè) Stream 調(diào)用的方法是:
public static JsonRpc Attach(Stream stream, object? target = null);
同理,我們推測(cè)應(yīng)該也有一個(gè)這樣的靜態(tài)重載方法:
public static JsonRpc Attach(IJsonRpcMessageHandler handler, object? target = null);
可惜,StreamJsonRpc 并沒(méi)有提供這個(gè)靜態(tài)方法。既然 Attach 方法返回的是一個(gè) JsonRpc 對(duì)象,那我們是否可以直接實(shí)例化該對(duì)象呢?查看該類(lèi)的定義,我們發(fā)現(xiàn)是可以的,而且有我們需要的構(gòu)造函數(shù):
public JsonRpc(IJsonRpcMessageHandler messageHandler, object? target);
接下來(lái)就簡(jiǎn)單了,一切和前一篇的 Stream 示例都差不多。在 RpcController 的 Greeter Action 中實(shí)例化一個(gè) JsonRpc,然后開(kāi)啟消息監(jiān)聽(tīng)。
public class RpcController : ControllerBase
{
private readonly ILogger<RpcController> _logger;
private readonly GreeterServer _greeterServer;
public RpcController(ILogger<RpcController> logger, GreeterServer greeterServer)
{
_logger = logger;
_greeterServer = greeterServer;
}
[Route("/rpc/greeter")]
public async Task<IActionResult> Greeter()
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
{
return new BadRequestResult();
}
_logger.LogInformation("等待客戶端連接...");
var socket = await HttpContext.WebSockets.AcceptWebSocketAsync();
_logger.LogInformation("已與客戶端建立連接");
var handler = new WebSocketMessageHandler(socket);
using (var jsonRpc = new JsonRpc(handler, _greeterServer))
{
_logger.LogInformation("開(kāi)始監(jiān)聽(tīng)客戶端消息...");
jsonRpc.StartListening();
await jsonRpc.Completion;
_logger.LogInformation("客戶端斷開(kāi)了連接");
}
return new EmptyResult();
}
}
看起來(lái)和我們平時(shí)寫(xiě) Web API 差不多,區(qū)別僅僅是對(duì)請(qǐng)求的處理方式。但需要注意的是,WebSocket 是長(zhǎng)連接,如果客戶端沒(méi)有事情可以處理了,最好主動(dòng)斷開(kāi)與服務(wù)端的連接。如果客戶客戶沒(méi)有斷開(kāi)連接,執(zhí)行的上下文就會(huì)停在?await jsonRpc.Completion
?處。
通常斷開(kāi)連接是由客戶端主動(dòng)發(fā)起的,所以服務(wù)端不需要做什么處理。服務(wù)端響應(yīng)完消息后,只需使用?jsonRpc.Completion
?等待客戶端斷開(kāi)連接即可,上一節(jié)的代碼示例中已經(jīng)包含了這部分代碼,就不再累述了。如果特殊情況下服務(wù)端需要斷開(kāi)連接,調(diào)用 JsonRpc 對(duì)象的 Dispose 方法即可。
不管是 Stream 還是 WebSocket,其客戶端對(duì)象都提供了 Close 或 Dispose 方法,連接會(huì)隨著對(duì)象的釋放自動(dòng)斷開(kāi)。但最好還是主動(dòng)調(diào)用 Close 方法斷開(kāi)連接,以確保服務(wù)端收到斷開(kāi)的請(qǐng)求。對(duì)于 ClientWebSocket,需要調(diào)用 CloseAsync 方法??蛻舳送暾纠a如下:
static async Task Main(string[] args)
{
using (var webSocket = new ClientWebSocket())
{
Console.WriteLine("正在與服務(wù)端建立連接...");
var uri = new Uri("ws://localhost:5000/rpc/greeter");
await webSocket.ConnectAsync(uri, CancellationToken.None);
Console.WriteLine("已建立連接");
Console.WriteLine("開(kāi)始向服務(wù)端發(fā)送消息...");
var messageHandler = new WebSocketMessageHandler(webSocket);
var greeterClient = JsonRpc.Attach<IGreeter>(messageHandler);
var request = new HelloRequest { Name = "精致碼農(nóng)" };
var response = await greeterClient.SayHelloAsync(request);
Console.WriteLine($"收到來(lái)自服務(wù)端的響應(yīng):{response.Message}");
Console.WriteLine("正在斷開(kāi)連接...");
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "斷開(kāi)連接", CancellationToken.None);
Console.WriteLine("已斷開(kāi)連接");
}
Console.ReadKey();
}
在實(shí)際項(xiàng)目中可能還需要因異常而斷開(kāi)連接的情況做處理,比如網(wǎng)絡(luò)不穩(wěn)定可能導(dǎo)致連接中斷,這種情況可能需要加入重試機(jī)制。
運(yùn)行示例由于服務(wù)端使用的是 ASP.NET Core 模板,VS 默認(rèn)使用 IIS Express 啟動(dòng),啟動(dòng)后會(huì)自動(dòng)打開(kāi)網(wǎng)頁(yè),這樣看不到 Console 的日志信息。所以需要把服務(wù)端項(xiàng)目 WebSocketSample.Server 的啟動(dòng)方式改成自啟動(dòng)。
另外,為了更方便地同時(shí)運(yùn)行客戶端和服務(wù)端應(yīng)用,可以把解決方案設(shè)置成多啟動(dòng)。右鍵解決方案,選擇“Properties”,把對(duì)應(yīng)的項(xiàng)目設(shè)置“Start”即可。
如果你用的是 VS Code,也是支持多啟動(dòng)調(diào)試的,具體方法你自行 Google。如果你用的是?dotnet run
?命令運(yùn)行項(xiàng)目可忽略以上設(shè)置。
項(xiàng)目運(yùn)行后的截圖如下:
你也可以自定義實(shí)現(xiàn) TCP 全雙工通訊管道,但比較復(fù)雜而且也很少這么做,所以就略過(guò)不講了。但我在 GitHub 的示例代碼也放了一個(gè)自定義全雙工管道實(shí)現(xiàn)的示例,感興趣的話你可以克隆下來(lái)研究一下。
該示例運(yùn)行截圖:
本篇總結(jié)本文通過(guò)示例演示了如何使用 StreamJsonRpc 基于 WebSocket 數(shù)據(jù)傳輸實(shí)現(xiàn) JSON-RPC 協(xié)議的 RPC 通訊。其中客戶端和服務(wù)端有共同的契約部分,實(shí)現(xiàn)了強(qiáng)類(lèi)型編程。通過(guò)示例我們也清楚了 StreamJsonRpc 這個(gè)庫(kù)為了實(shí)現(xiàn) RPC 通訊做了哪些工作,其實(shí)它就是在現(xiàn)有傳輸管道(Stream、WebSocket 和 自定義 TCP 連接)上進(jìn)行數(shù)據(jù)通訊。正如前一篇所說(shuō),由于 StreamJsonRpc 把大部分我們不必要知道的細(xì)節(jié)做了封裝,所以在示例中感覺(jué)不到 JSON-RPC 協(xié)議帶來(lái)的統(tǒng)一規(guī)范,也沒(méi)看到具體的 JSON 格式的數(shù)據(jù)。其實(shí)只要遵循了 JSON-RPC 協(xié)議實(shí)現(xiàn)的客戶端或服務(wù)端,不管是用什么語(yǔ)言實(shí)現(xiàn),都是可以互相通訊的。
希望這三篇關(guān)于 StreamJsonRpc 的介紹能讓你有所收獲,如果你在工作中計(jì)劃使用 StreamJsonRpc,這幾篇文章包括示例代碼應(yīng)該有值得參考的地方。
本文摘自 :https://blog.51cto.com/u