Web端即时通讯技术之SSE

背景:

web项目中,前端下发设备控制指令,后端收到指令后,调用第三方接口控制设备。设备的状态返回后需要后端主动推送设备状态给前端。需要服务器推送。经过技术选型,采用sse实现,为了以后的扩展,预留了mpush的实现接口。

Web端即时通讯技术简介

即时通讯技术简单的说就是实现这样一种功能:服务器端可以即时地将数据的更新或变化反应到客户端,例如消息即时推送等功能都是通过这种技术实现的。
但是在Web中,由于浏览器的限制,实现即时通讯需要借助一些方法。这种限制出现的主要原因是,一般的Web通信都是浏览器先发送请求到服务器,服务器再进行响应完成数据的现实更新。
Web端即时通讯技术的四种实现:短轮询,长轮询(comet),长连接(SSE),WebSocket

长连接(SSE)

严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。
SSE 是一种在基于浏览器的 Web 应用程序中仅从服务器向客户端发送文本消息的技术。SSE基于 HTTP 协议中的持久连接, 具有由 W3C 标准化的网络协议和 EventSource 客户端接口,作为 HTML5 标准套件的一部分。

websocket

websocket简介

服务端SSE实现

由于我这个项目比较老,是sevlet项目,现在的参考意义不大,所以就不多赘述。简单描述一下。
使用 org.eclipse.jetty.servlets 实现的 EventSourceServlet,具体可以去git仓库搜,9.x已经整合到jetty包中。
具体过程是这样的:

  1. 前端项目在初始化的时候发送请求,建立连接。
  2. 服务端接收请求后将连接保存到线程安全集合中。
  3. 前端在建立连接回调函数中创建一个Oberseve对象,在OnMessage方法中给这个对象传值。在业务js中subscribe这个对象。
  4. 当需要推送消息时使用Emitter对象发送,其中包含事件ID,data两部分。

在我自己研究EventSourceServlet源码时,有几个不理解的问题,但是资料太少,也没法科学上网,就把这个代码贴下来,希望有大佬回复。问题就写成注释了


```java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package org.eclipse.jetty.servlets;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;public abstract class EventSourceServlet extends HttpServlet {private static final Charset UTF_8 = Charset.forName("UTF-8");private static final byte[] EVENT_FIELD;private static final byte[] DATA_FIELD;private static final byte[] COMMENT_FIELD;private static final byte[] CRLF;private ScheduledExecutorService scheduler;private int heartBeatPeriod = 10;public EventSourceServlet() {}public void init() throws ServletException {//为什么需要心跳检测,浏览器已经实现断线重连了吗?String heartBeatPeriodParam = this.getServletConfig().getInitParameter("heartBeatPeriod");if (heartBeatPeriodParam != null) {this.heartBeatPeriod = Integer.parseInt(heartBeatPeriodParam);}this.scheduler = Executors.newSingleThreadScheduledExecutor();}public void destroy() {if (this.scheduler != null) {this.scheduler.shutdown();}}protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {Enumeration<String> acceptValues = request.getHeaders("Accept");String accept;do {if (!acceptValues.hasMoreElements()) {super.doGet(request, response);return;}accept = (String)acceptValues.nextElement();} while(!accept.equals("text/event-stream"));EventSource eventSource = this.newEventSource(request);if (eventSource == null) {response.sendError(503);} else {this.respond(request, response);Continuation continuation = ContinuationSupport.getContinuation(request);continuation.setTimeout(0L);continuation.suspend(response);EventSourceEmitter emitter = new EventSourceEmitter(eventSource, continuation);emitter.scheduleHeartBeat();this.open(eventSource, emitter);}}protected abstract EventSource newEventSource(HttpServletRequest var1);protected void respond(HttpServletRequest request, HttpServletResponse response) throws IOException {response.setStatus(200);response.setCharacterEncoding(UTF_8.name());response.setContentType("text/event-stream");//这个为什么要close呢,不是基于keepalive实现的吗?,看git说是不会断开连接。response.addHeader("Connection", "close");response.flushBuffer();}protected void open(EventSource eventSource, EventSource.Emitter emitter) throws IOException {eventSource.onOpen(emitter);}static {EVENT_FIELD = "event: ".getBytes(UTF_8);DATA_FIELD = "data: ".getBytes(UTF_8);COMMENT_FIELD = ": ".getBytes(UTF_8);CRLF = new byte[]{13, 10};}protected class EventSourceEmitter implements EventSource.Emitter, Runnable {private final EventSource eventSource;private final Continuation continuation;private final ServletOutputStream output;private Future<?> heartBeat;private boolean closed;public EventSourceEmitter(EventSource eventSource, Continuation continuation) throws IOException {this.eventSource = eventSource;this.continuation = continuation;this.output = continuation.getServletResponse().getOutputStream();}public void data(String data) throws IOException {synchronized(this) {this.output.write(EventSourceServlet.DATA_FIELD);this.output.write(data.getBytes(EventSourceServlet.UTF_8.name()));this.output.write(EventSourceServlet.CRLF);this.flush();}}public void comment(String comment) throws IOException {synchronized(this) {this.output.write(EventSourceServlet.COMMENT_FIELD);this.output.write(comment.getBytes(EventSourceServlet.UTF_8.name()));this.output.write(EventSourceServlet.CRLF);this.flush();}}public void run() {try {synchronized(this) {this.output.write(13);this.flush();this.output.write(10);this.flush();}this.scheduleHeartBeat();} catch (IOException var4) {this.close();this.eventSource.onClose();}}protected void flush() throws IOException {this.continuation.getServletResponse().flushBuffer();}public void close() {synchronized(this) {this.closed = true;this.heartBeat.cancel(false);}this.continuation.complete();}private void scheduleHeartBeat() {synchronized(this) {if (!this.closed) {this.heartBeat = EventSourceServlet.this.scheduler.schedule(this, (long)EventSourceServlet.this.heartBeatPeriod, TimeUnit.SECONDS);}}}}
}
## springboot实现
偷懒在网上找了一篇
[Web消息推送之SSE](https://blog.csdn.net/lemon_TT/article/details/126063422?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168715486416800186571803%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=168715486416800186571803&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-2-126063422-null-null.142%5Ev88%5Econtrol_2,239%5Ev2%5Einsert_chatgpt&utm_term=SSE%20web&spm=1018.2226.3001.4187)感慨一下,不得不说spring是真的方便,现在java程序员真是面向spring开发程序员。有了spring开发变得无比简单,离开spirng直接gg。突然想起以前看过一个笑话,说是程序员的核心竞争力就是写别人看不懂的代码,不写注释,公司离开你直接暴毙。现在各种规范,技术栈都大差不差,框架屏蔽了很多技术上的活,剩下的谁来干活都一样。技术门槛大大降低,前浪后浪一起卷。忘了以前在那本书上说过,技术的发展和就业永远是反向的。科技是第一生产力,但是科技发展和社会学的发展貌似并不成比例,科技越发展,社会问题越多。嗯。。。卷?卷的过同行怕是卷不过ai,再过几年chatgpt也许会把80%程序员干掉。何去何从呢?这个世界还是精英的,普通人活着就已经不容易了。。。一想到行业寒冬,薪资越来越低,由于信息差,后面一批又一批的学生等着入行,计算机的红利怕是要到头了,反正我也没吃上。蓝海变红海,又没有什么门槛,预测一波,行业薪资会慢慢变得和制造业这些差不多。好处可能会和这些一样稳定吧。


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部