使用Node.js + RSocket + WebSocket + Spring Boot实现实时前后端通讯 – vinsguru

微信扫一扫,分享到朋友圈

使用Node.js + RSocket + WebSocket + Spring Boot实现实时前后端通讯 – vinsguru

在本教程中,我想向您展示如何使用RSocket WebSocketSpring Boot创建实时应用程序。

我们的应用程序有2个端点。

  • 数字流
    • 每当服务器收到数字“ N”时,它将以1秒的延迟发出从1到N的数字。因此,对于每个请求,服务器都可以通过单个WebSocket连接发送多个响应。
  • 数字频道
    • 这是双向流。也就是说,当用户输入数字时,该数字将被发送到后端,该后端将计算给定数字的平方,然后进行响应。

注意:  此示例可能看起来非常简单。在这里,我们正在学习客户端与服务器之间的实时通信及其工作方式。如果我们能够实现这一目标,则可以将其用于YouTube / Netflix等应用,以获取实时视频/电影推荐。概念是一样的!

后端实现

满足我们要求的rsocket服务器端实现如下所示!Spring Boot在这里完成了所有繁重的工作。

  • 控制器
@Controller
<b>public</b> <b>class</b> RSocketController {
@MessageMapping(<font>"number.stream"</font><font>)
<b>public</b> Flux<Integer> responseStream(Integer number) {
<b>return</b> Flux.range(1, number)
.delayElements(Duration.ofSeconds(1));
}
@MessageMapping(</font><font>"number.channel"</font><font>)
<b>public</b> Flux<Long> biDirectionalStream(Flux<Long> numberFlux) {
<b>return</b> numberFlux
.map(n -> n * n)
.onErrorReturn(-1L);
}
}
</font>
  • application.properties

端口和映射路径可以是任何东西。默认情况下,传输是TCP。我们需要将传输作为WebSocket添加,以使其明确作为WebSocket服务器。

spring.rsocket.server.port=6565
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

前端实现

需要做一些客户端通信。我们不能仅通过使用普通JavaScript来轻松实现这一目标。我们必须引入一些Node.js模块。

  • 通过发出以下命令来创建项目
npm init -y
  • 目结构如下所示。因此,如下所示添加文件。
  • 添加依赖项,如下所示。我们只需要此依赖项rsocket-websocket-client。为了开发目的,我们几乎不需要其他依赖项。
{
<font>"name"</font><font>: </font><font>"rsocket-websocker-client"</font><font>,
</font><font>"version"</font><font>: </font><font>"1.0.0"</font><font>,
</font><font>"description"</font><font>: </font><font>""</font><font>,
</font><font>"main"</font><font>: </font><font>"index.js"</font><font>,
</font><font>"scripts"</font><font>: {
</font><font>"test"</font><font>: </font><font>"echo \&#34Error: no test specified\" && exit 1"</font><font>,
</font><font>"build"</font><font>: </font><font>"webpack --mode=development"</font><font>,
</font><font>"serve"</font><font>: </font><font>"webpack-dev-server --open"</font><font>
},
</font><font>"author"</font><font>: </font><font>""</font><font>,
</font><font>"license"</font><font>: </font><font>"ISC"</font><font>,
</font><font>"devDependencies"</font><font>: {
</font><font>"clean-webpack-plugin"</font><font>: </font><font>"^3.0.0"</font><font>,
</font><font>"html-webpack-plugin"</font><font>: </font><font>"^4.3.0"</font><font>,
</font><font>"webpack"</font><font>: </font><font>"^4.44.1"</font><font>,
</font><font>"webpack-cli"</font><font>: </font><font>"^3.3.12"</font><font>,
</font><font>"webpack-dev-server"</font><font>: </font><font>"^3.11.0"</font><font>
},
</font><font>"dependencies"</font><font>: {
</font><font>"rsocket-websocket-client"</font><font>: </font><font>"0.0.19"</font><font>
}
}
</font>
  • webpack.config.js
<b>const</b> path = require('path');
<b>const</b> HtmlWebpackPlugin = require('html-webpack-plugin');
module.exports = {
mode: 'development',
devServer: {
port: 8081,
contentBase: './dist',
},
entry: './src/index.js',
output: {
filename: 'bundle.js',
path: path.resolve(__dirname, 'dist'),
publicPath: '/'
},
plugins: [
<b>new</b> HtmlWebpackPlugin({
inlineSource: '.(js|css)$',
template: __dirname + `/src/index.html`,
filename: __dirname + `/dist/index.html`,
inject: 'head',
})
]
}
  • index.html
<body>
<div <b>class</b>=<font>"container mt-3"</font><font>>
<h1>RSocket-WebSocket Demo</h1>
<div <b>class</b>=</font><font>"row"</font><font>>
<div <b>class</b>=</font><font>"col"</font><font>>
<p <b>class</b>=</font><font>"font-weight-light mt-3"</font><font>>Enter number:</p>
<input type=</font><font>"text"</font><font> <b>class</b>=</font><font>"form-control"</font><font>  id=</font><font>"n"</font><font>>
</div>
</div>
<div <b>class</b>=</font><font>"row"</font><font>>
<ul <b>class</b>=</font><font>"list-group mt-5 pl-2"</font><font>  id=</font><font>"result"</font><font>></ul>
</div>
</div>
</body>
</font>
  •  index.js
  1. 首先,我们导入特定的依赖项。
  2. 然后我们创建rsocket客户
  3. 我们有一个响应和错误处理程序
  4. 我们向后端发出请求,并通过订阅来处理响应
  5. 建立websocket连接后,我们将注册侦听器。
<b>import</b> { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core';
<b>import</b> RSocketWebSocketClient from 'rsocket-websocket-client';
<font><i>// backend ws endpoint</i></font><font>
<b>const</b> wsURL = 'ws:</font><font><i>//localhost:6565/rsocket';</i></font><font>
</font><font><i>// rsocket client</i></font><font>
<b>const</b> client = <b>new</b> RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
keepAlive: 60000,
lifetime: 180000,
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
},
transport: <b>new</b> RSocketWebSocketClient({
url: wsURL
})
});
</font><font><i>// error handler</i></font><font>
<b>const</b> errorHanlder = (e) => console.log(e);
</font><font><i>// response handler</i></font><font>
<b>const</b> responseHanlder = (payload) => {
<b>const</b> li = document.createElement('li');
li.innerText = payload.data;
li.classList.add('list-group-item', 'small')
document.getElementById('result').appendChild(li);
}
</font><font><i>// request to rsocket-websocket and response handling</i></font><font>
<b>const</b> numberRequester = (socket, value) => {
socket.requestStream({
data: value,
metadata: String.fromCharCode('number.stream'.length) + 'number.stream'
}).subscribe({
onError: errorHanlder,
onNext: responseHanlder,
onSubscribe: subscription => {
subscription.request(100); </font><font><i>// set it to some max value</i></font><font>
}
})
}
</font><font><i>// once the backend connection is established, register the event listeners</i></font><font>
client.connect().then(socket => {
document.getElementById('n').addEventListener('change', ({srcElement}) => {
numberRequester(socket, parseInt(srcElement.value));
})
}, errorHanlder);
</font>

基于WebSocket的RSocket流

我们来看看实现这种双向流所需的更改。

  • 我们介绍了FlowableProcessor,它将同时充当发布者和订阅者。前端将发布将由后端应用程序订阅的值。
  • 这里的想法是–每当我输入数字时,键入事件就会发布值。所以我立即得到给定数字的平方。
<font><i>// reactive stream processor</i></font><font>
<b>const</b> processor = <b>new</b> FlowableProcessor(sub => {});
<b>const</b> numberRequester = (socket, processor) => {
socket.requestChannel(processor.map(i => {
<b>return</b> {
data: i,
metadata: String.fromCharCode('number.channel'.length) + 'number.channel'
}
})).subscribe({
onError: errorHanlder,
onNext: responseHanlder,
onSubscribe: subscription => {
subscription.request(100); </font><font><i>// set it to some max value</i></font><font>
}
})
}
client.connect().then(sock => {
numberRequester(sock, processor);
document.getElementById('n').addEventListener('keyup', ({srcElement}) => {
<b>if</b>(srcElement.value.length > 0){
processor.onNext(parseInt(srcElement.value))
}
})
}, errorHanlder);
</font>

结论

我们能够成功建立RSocket WebSocket客户端-服务器通信,并以此开发一个简单的应用程序。我们可以将其用于实时视频推荐,游戏应用程序等。

此演示的源代码 在此处

Steam上线Playtest功能 可免费申请参与游戏试玩

上一篇

《侏罗纪世界3》杀青!2022年6月10日正式上映

下一篇

你也可能喜欢

使用Node.js + RSocket + WebSocket + Spring Boot实现实时前后端通讯 – vinsguru

长按储存图像,分享给朋友