流传输方案:数据实时交互

概述

在软件开发中,流传输(Streaming)是一个经典且高频的需求,特别是在工业监控(SCADA)、金融交易或实时大屏展示等领域。最典型的场景莫过于:成百上千个传感器正在实时采集压力、温度或转速等数值,前端界面需要以毫秒级的延迟将这些变化可视化地呈现给用户。在深入探讨具体的传输技术(如 WebSocket 或 gRPC)之前,我们需要先回到架构层面,理解数据是如何在不同形态的软件中流动的。

在早期的单体架构(Monolithic Architecture)中,问题相对简单。当数据采集模块、业务处理逻辑和前端界面渲染都运行在同一个进程(Process)甚至同一个线程中时,实现“实时传送”几乎是本能的。因为它们共享同一个内存空间,数据传输本质上就是变量的读取与更新。UI 框架(如 WinForms、WPF 的早期写法)只需监听内存数据的变化并刷新界面即可,不存在网络延迟与协议转换的开销。

随着系统规模的扩大,我们将应用拆分为客户端(Client)服务器(Server),即经典的 C/S 架构。随后,浏览器(Browser)作为一种特殊的瘦客户端兴起,演化出了 B/S 应用。在这种模式下,数据不再共享内存,而是通过网络进行交换。服务器负责部署统一的后端服务(数据处理、持久化),并对外暴露 API;客户端(无论是原生 App 还是网页)则通过网络协议(HTTP, TCP, WebSocket 等)与服务端通信。

虽然 B/S 本质上是 C/S 的一种特例,但在流传输的技术选型上,两者还是有着很大的差异:

  • 原生客户端(Native Client): 拥有操作系统级别的权限(只要用户允许)。它可以自由创建 TCP/UDP 连接,直接访问本地文件系统,甚至利用多线程处理密集任务。比如一个用 WPF 或 Qt 编写的桌面监控端,可以直接通过 TCP 协议连接 Kafka 集群或 Redis 实例,订阅 Topic 并实时消费数据。虽然从架构分层看,客户端直接连中间件略显耦合,但这在技术实现上是完全可行的。
  • 浏览器客户端(Web Client): 运行在沙箱(Sandbox)环境中。出于安全考虑,浏览器严禁网页直接访问本地文件系统(除受限的 File API 外)、调用系统底层 API 或建立任意的原始 TCP 连接。浏览器无法直接连接 Kafka 或 Redis 原生协议。它必须依赖一个中间层(如 WebSocket 网关或 HTTP 后端)来转发数据。

近年来,C/S 与 B/S 的界限已逐渐模糊。现代应用架构的主流形态可以概括为:“一套云端服务 + 多种形态终端”

  • Server 端(后端): 统一的基础设施。无论是 Kafka、Flink 实时计算,还是 AI 推理服务,最终都封装为标准的网络接口(HTTP/gRPC/WebSocket)。
  • Client 端(前端): “B”与“C”往往只是同一套业务逻辑的不同“外壳”。

最典型的例子莫过于 VS Code,它完美展示了 Web 技术与原生能力的融合:

  • 桌面版 VS Code(Electron 应用):
    • 架构: Chromium 浏览器内核 + Node.js 运行时。
    • UI: 使用 Web 技术(HTML/CSS/JS)构建。
    • 能力: 由于集成了 Node.js,它突破了浏览器沙箱,可以调用 Shell 指令、访问本地文件系统、开启子进程、建立任意 Socket 连接。
  • Web 版 VS Code (vscode.dev):
    • 架构: 纯粹运行在标准浏览器中(无 Node.js 后端支持)。
    • 能力: 受限于浏览器沙箱,它无法直接读写本地磁盘(需依赖浏览器 File System Access API 或远程文件系统),也无法直接连接本地的 TCP 服务。

总而言之上面的讨论告诉我们,在处理数据流(Streaming)时,必须根据运行环境(原生 vs Web)选择不同的传输策略,特别是在现代客户端-服务器模式下(Web 环境或跨平台混合应用)需要实现实时数据推送时。业界目前演化出了多种成熟的解决方案来应对这一挑战。在接下来的内容中,我们将分析并对比以下几种主流的流传输技术:

  1. WebSocket:全双工通信的标准解法。
  2. Server-Sent Events (SSE):轻量级的单向推送。
  3. gRPC (Streaming):基于 HTTP/2 的高性能 RPC 流。
  4. WebTransport:面向未来的低延迟传输协议。

WebSocket

我们用 Node 来做一个简易的服务后端,在一个空目录里:

1
2
npm init -y
npm install ws

新建 server.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// server.js
const WebSocket = require('ws');

// 创建 WebSocket 服务器,监听 8080 端口
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
console.log('客户端已连接');

// 定时模拟产生“传感器数据”
const timer = setInterval(() => {
const data = {
timestamp: Date.now(),
pressure: (Math.random() * 10 + 90).toFixed(2), // 90-100
temperature: (Math.random() * 20 + 40).toFixed(2), // 40-60
rpm: Math.floor(Math.random() * 2000 + 1000) // 1000-3000
};

// 以 JSON 字符串形式发送
ws.send(JSON.stringify(data));
}, 500); // 每 500ms 推一次

ws.on('close', () => {
console.log('客户端断开连接');
clearInterval(timer);
});

ws.on('error', (err) => {
console.error('WebSocket error:', err);
clearInterval(timer);
});
});

console.log('WebSocket 服务器已启动,ws://localhost:8080');

启动服务

1
node server.js

我们编写一个不用构建工具的轻量页面,用 <script> 引入 Vue 3,来实现 Web 前端。新建 index.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8" />
<title>WebSocket + Vue 实时展示 Demo</title>
<style>
body {
font-family: system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
padding: 24px;
}
.card {
border: 1px solid #ddd;
border-radius: 8px;
padding: 16px 20px;
max-width: 400px;
}
.label {
color: #666;
font-size: 14px;
}
.value {
font-size: 20px;
font-weight: bold;
margin-bottom: 8px;
}
.status {
margin-bottom: 12px;
font-size: 14px;
}
.status-ok {
color: #2e7d32;
}
.status-error {
color: #c62828;
}
</style>
</head>
<body>
<h1>实时传感器数据示例(WebSocket + Vue)</h1>

<div id="app">
<!-- 根组件只关心“我要展示一个传感器卡片” -->
<sensor-card
:data="data"
:is-connected="isConnected"
:formatted-time="formattedTime"
></sensor-card>
</div>

<!-- 引入 Vue 3(CDN) -->
<script src="https://unpkg.com/vue@3/dist/vue.global.prod.js"></script>
<script>
const { createApp, ref, computed, onMounted, onBeforeUnmount } = Vue;

/**
* 小工具函数:格式化时间
*/
function formatTimestamp(timestamp) {
if (!timestamp) return '';

const d = new Date(timestamp);
const pad = (n) => String(n).padStart(2, '0');

return (
d.getFullYear() + '-' +
pad(d.getMonth() + 1) + '-' +
pad(d.getDate()) + ' ' +
pad(d.getHours()) + ':' +
pad(d.getMinutes()) + ':' +
pad(d.getSeconds())
);
}

/**
* 组合函数:封装 WebSocket 连接 + 自动重连逻辑
* 用法类似于:const { data, isConnected, formattedTime } = useSensorWebSocket('ws://localhost:8080');
*/
function useSensorWebSocket(url) {
const data = ref({
pressure: null,
temperature: null,
rpm: null,
timestamp: null,
});

const isConnected = ref(false);
let socket = null;
let reconnectTimer = null;

const formattedTime = computed(() => formatTimestamp(data.value.timestamp));

const cleanup = () => {
if (socket) {
socket.close();
socket = null;
}
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
};

const connect = () => {
cleanup(); // 防止重复调用时产生旧连接

socket = new WebSocket(url);

socket.onopen = () => {
console.log('WebSocket 已连接');
isConnected.value = true;
};

socket.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
data.value = msg;
} catch (e) {
console.error('解析消息失败:', e);
}
};

socket.onclose = () => {
console.log('WebSocket 已关闭,准备重连');
isConnected.value = false;
reconnectTimer = setTimeout(() => connect(), 1000); // 1 秒后重连
};

socket.onerror = (err) => {
console.error('WebSocket 错误:', err);
// 出错时关闭,让 onclose 负责重连
socket.close();
};
};

onMounted(() => {
connect();
});

onBeforeUnmount(() => {
cleanup();
});

return {
data,
isConnected,
formattedTime,
};
}

/**
* 展示组件:只负责 UI
* props 进来什么就渲染什么,不关心 WebSocket 细节
*/
const SensorCard = {
name: 'SensorCard',
props: {
data: {
type: Object,
required: true,
},
isConnected: {
type: Boolean,
required: true,
},
formattedTime: {
type: String,
required: true,
},
},
computed: {
statusText() {
return this.isConnected ? '已连接' : '未连接 / 重连中…';
},
statusClass() {
return this.isConnected ? 'status-ok' : 'status-error';
},
pressureText() {
return this.data.pressure ?? '——';
},
temperatureText() {
return this.data.temperature ?? '——';
},
rpmText() {
return this.data.rpm ?? '——';
},
timeText() {
return this.formattedTime || '——';
},
},
template: `
<div class="card">
<div class="status" :class="statusClass">
连接状态:{{ statusText }}
</div>

<div class="label">最新采集时间</div>
<div class="value">{{ timeText }}</div>

<div class="label">压力 (bar)</div>
<div class="value">{{ pressureText }}</div>

<div class="label">温度 (°C)</div>
<div class="value">{{ temperatureText }}</div>

<div class="label">转速 (rpm)</div>
<div class="value">{{ rpmText }}</div>
</div>
`,
};

/**
* 根组件:负责拿数据,然后交给 UI 组件
*/
const App = {
name: 'App',
components: {
SensorCard,
},
setup() {
// 这里把 WebSocket 地址抽成变量,后面要改只改一处即可
const WS_URL = 'ws://localhost:8080';

const {
data,
isConnected,
formattedTime,
} = useSensorWebSocket(WS_URL);

return {
data,
isConnected,
formattedTime,
};
},
};

createApp(App).mount('#app');
</script>
</body>
</html>