多路串口转TCP连接,进行数据双向透传
最近在写一个关于物联网的小工具,用linux工控小主机做一个串口服务器,将串口数据与指定的tcp服务器做数据双向透传,使用spring-integration和jssc的方案实现,把主要过程记录下来,以备查询
整个工程是基于jssc和spring-integration-ip在Spring boot上开发,便于后期集成管理界面,总体思路是用jssc接收发和转发串口数据,再用spirng integration将串口数据转发到tcp服务端,大体架构如下图所示:

工程中需要用的几个关键依赖包如下:
org.springframework.integration spring-integration-ip 5.3.1.RELEASE
commons-codec commons-codec 1.14
org.scream3r jssc 2.8.0
对于spring integration的配置还是比较容易的,这里有几个概念对于第一次接触spring integration的来说可有点晕,主要涉及到消息的入站、出站,管道,我个人理解就是入站从外部输入数据,出站是从本地输出数据,管道就是数据的通道,对于出站和入站也分出站管道和入站管道,具体配置如下:
package org.noka.serialservice.config;import org.apache.commons.codec.binary.Hex;
import org.noka.serialservice.Serializer.NByteArrayCrLfSerializer;
import org.noka.serialservice.service.SerialService;
import org.noka.serialservice.service.TcpGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetClientConnectionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageBuilder;
/** ----------------------------------------------------------------------------------* TCP数据转发服务配置* @author xiefangjian@163.com* @version 1.0.0*----------------------------------------------------------------------------------*/
@EnableIntegration
@Configuration
public class TcpConfig implements ApplicationListener {private static Logger logger = LoggerFactory.getLogger(TcpConfig.class);//TCP服务器地址,可以用域名或IP@Value("${socket.client.ip}")private String host;//TCP服务器端口@Value("${socket.client.port}")private int port;@Value("${socket.client.RetryInterval:60}")private long RetryInterval;//连接断开时,多长时间重新连接,以秒秒为单位,默认为1分钟//串口数据转发服务对象private final SerialService serialService;//TCP端数据转发网关private final TcpGateway tcpGateway;/*** 配置构造方法* @param serialService 串口数据转发服务对象* @param tcpGateway TCP端数据转发网关*/public TcpConfig(SerialService serialService, TcpGateway tcpGateway) {this.serialService = serialService;this.tcpGateway = tcpGateway;}/*** 创建TCP连接* @return tcp clinet连接工厂对象 AbstractClientConnectionFactory*/@Beanpublic AbstractClientConnectionFactory clientCF() {TcpNetClientConnectionFactory tc = new TcpNetClientConnectionFactory(this.host, this.port);//创建连接tc.setDeserializer(new NByteArrayCrLfSerializer());//设置自定义反序列化对象,对数据转发做分析处理tc.setSerializer(new NByteArrayCrLfSerializer());return tc;}/*** TCP服务下发数据接收管道配置,spring integration称之为入站管道配置* @param connectionFactory 连接工厂* @return TcpReceivingChannelAdapter 入站管道对象*/@Beanpublic TcpReceivingChannelAdapter tcpInAdapter(AbstractClientConnectionFactory connectionFactory) {TcpReceivingChannelAdapter inGate = new TcpReceivingChannelAdapter();//新建一个TCP入站管道inGate.setConnectionFactory(connectionFactory);//绑定到当前的连接工厂上inGate.setClientMode(true);//设置连接为客户端模式inGate.setOutputChannelName("clientIn");//入站管道名称,后面数据接收的地方需要该名称进行匹配inGate.setRetryInterval(RetryInterval*1000);//连接断开时,多长时间重新连接,以毫秒为单位return inGate;}/*** 服务器有数据下发* @param in 服务器有数据下发时,序列化后的对象,这里使用byte数组*/@ServiceActivator(inputChannel = "clientIn")public void upCase(Message in) {logger.info("[net service data]========================================");logger.info("[net dow data]"+new String(in.getPayload()));//字符串方式打印服务器下发的数据logger.info("[net dow hex]"+Hex.encodeHexString(in.getPayload(),false));//16进制方式打印服务器下发的数据serialService.send(in.getPayload());//将服务器下发的数据转发给串口}/*** 向服务器发送数据管道绑定* @param connectionFactory tcp连接工厂类* @return 消息管道对象*/@Bean@ServiceActivator(inputChannel = "clientOut")public MessageHandler tcpOutAdapter(AbstractClientConnectionFactory connectionFactory) {TcpSendingMessageHandler outGate = new TcpSendingMessageHandler();//创建一个新的出站管道outGate.setConnectionFactory(connectionFactory);//绑定到连接工厂outGate.setClientMode(true);//设置为客户端连接模式return outGate;}/*** 连接成功时调用的方法* @param event 响应事件*/@Overridepublic void onApplicationEvent(ApplicationEvent event) { //监听连接打开事件if (event instanceof TcpConnectionOpenEvent) {/**---------连接时如果需要发送认证类消息时可以写在这里--------------------**/byte[] snc="OK".getBytes();//这里在连接时,简单的向服务器发送一个OK字符串tcpGateway.send(MessageBuilder.withPayload(snc).build());//发送消息}}
}
TcpGateway只是一个接口,用于在其它地方调用该接口向TCP服务器发送消息,具体实现如下:
package org.noka.serialservice.service;import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/**----------------------------------------------------------------* TCP发送消息网关,其它需要发向TCP服务器发送消息时,调用该接口**--------------------------------------------------------------**/
@MessagingGateway(defaultRequestChannel = "clientOut")
@Component
public interface TcpGateway {void send(Message out);//发送消息方法
}
自定义序列列和反序列化对象,主要是处理byte类型的数据,在测试的时候开始使用默认的序列化对象,TCP服务端每次发送数据都需要在结束时加入"\r\n",否则收不到数据,在使用byte类型传输时,显示很不友好,而且这样对TCP服务端造成了特殊要求,不能通用,改造后的序列化对象如下:
package org.noka.serialservice.Serializer;import org.springframework.integration.ip.tcp.serializer.AbstractPooledBufferByteArraySerializer;
import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException;
import java.io.*;/**----------------------------------------------------------------------------------* 自定义序列化工具类* @author xiefangjian@163.com* @version 1.0.0**--------------------------------------------------------------------------------**/
public class NByteArrayCrLfSerializer extends AbstractPooledBufferByteArraySerializer{/*** 单例模式*/public static final NByteArrayCrLfSerializer INSTANCE = new NByteArrayCrLfSerializer();/*** 数据转换输出,当前服务器有数据下发时,读取成byte数组后调用输出方法,传输给出站管道* @param inputStream 输入流* @param buffer 缓存对象* @return 读取之后的bytes* @throws IOException*/@Overridepublic byte[] doDeserialize(InputStream inputStream, byte[] buffer) throws IOException {int n = this.fillToCrLf(inputStream, buffer);return this.copyToSizedArray(buffer, n);}/*** 数据校验及数据处理,原对象是逐个byte读取的,然后校验是否为结束符* 这里做了修改,直接读取到指定长度的bytes中,然后返回,* @param inputStream* @param buffer* @return* @throws IOException*/public int fillToCrLf(InputStream inputStream, byte[] buffer) throws IOException {int n=0; //读到到的数据长度,这里指byte数组长度if (logger.isDebugEnabled()) {logger.debug("Available to read: " + inputStream.available());}try {return inputStream.read(buffer);//读取数据,buffer默认为2048个byte} catch (SoftEndOfStreamException e) {throw e;} catch (IOException e) {publishEvent(e, buffer, n);throw e;} catch (RuntimeException e) {publishEvent(e, buffer, n);throw e;}}/*** 写入数据,去掉了原对象的结束符自动补齐,不需要特定结束符* @param bytes* @param outputStream* @throws IOException*/@Overridepublic void serialize(byte[] bytes, OutputStream outputStream) throws IOException {outputStream.write(bytes);//直接输出}
}
串口部分采用一个服务类加一个工具类完成,一个串口一个监听,有数据就进行TCP转发,同样TCP有数据下发就向串口转发,服务类实现如下:
package org.noka.serialservice.service;import jssc.SerialPort;
import jssc.SerialPortList;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;/**---------------------------------------------------* 串口服务类,主要对串口进行操作* @author xiefangjian@163.com* @version 1.0.0**-------------------------------------------------**/
@Component
public class SerialService{private static Logger logger = LoggerFactory.getLogger(SerialService.class);private final TcpGateway tcpGateway;//TCP数据发送网关,自动注入,调用该接口发送数据到TCP服务端private List COM_LIST = new ArrayList<>();//缓存串口列表/*** 构造方法,注入TCP网关对象* @param tcpGateway TCP网关对象*/public SerialService(TcpGateway tcpGateway){this.tcpGateway = tcpGateway;initComs();//初始化串口}/*** 打开所有串口*/public void initComs(){String[] com_lists= SerialPortList.getPortNames();//获取串口列表for(String com:com_lists){logger.info("init com:"+com);SerialPort serialPort = new SerialPort(com);//设置串口COM_LIST.add(new SerialUtils(serialPort,tcpGateway));//缓存串口列表}}/*** 发送数据到所有打开的串口* @param str 需要发送的数据*/public void send(byte[] str){for(SerialUtils s:COM_LIST){s.sendData(str);//发送数据到串口logger.info("[send "+s.getName()+" data]"+new String(str));//字符串方式日志打印logger.info("[send "+s.getName()+" hex]"+Hex.encodeHexString(str,false));//16进制方式打印日志}}/*** 关闭串口*/@PreDestroypublic void destory(){for(SerialUtils s:COM_LIST){s.close();//关闭串口}}
}
串口工具类如下:
package org.noka.serialservice.service;import jssc.SerialPort;
import jssc.SerialPortEvent;
import jssc.SerialPortEventListener;
import jssc.SerialPortException;
import org.apache.commons.codec.binary.Hex;
import org.noka.serialservice.config.SerialParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.MessageBuilder;
import java.io.ByteArrayOutputStream;
import java.io.IOException;/**--------------------------------------------------------------------* 串口工具类* @author xiefangjian@163.com* @version 1.0.0**------------------------------------------------------------------**/
public class SerialUtils implements SerialPortEventListener {private static Logger logger = LoggerFactory.getLogger(SerialUtils.class);private SerialPort serialPort=null;//串口对象private TcpGateway tcpGateway; //网关对象/*** 构造方法,初始化串口对象和网关对象* @param serialPort 串口对象* @param tcpGateway 网关对象*/public SerialUtils(SerialPort serialPort, TcpGateway tcpGateway) {if(null!=serialPort){this.serialPort=serialPort; //串口对象openCom();//初始化串口}this.tcpGateway = tcpGateway;//网关对象}/*** 获取本串口名称* @return 串口名称*/public String getName(){if(null!=serialPort) {return serialPort.getPortName();}return null;}/*** 打开串口*/public void openCom(){try {serialPort.openPort();//打开串口/*串口波特率,默认为115200串口数据位,默认为8位串口停止位,默认为1位串口奇偶校验,默认无奇偶校验串口发送请求,默认为true串口发送允许,默认为trueserial.baud=115200serial.dataBits=8serial.stopBits=1serial.parity=0serial.rts=trueserial.dtr=true*/serialPort.setParams(SerialParams.baudRate, //串口波特率,默认为115200SerialParams.dataBits, //串口数据位,默认为8位SerialParams.stopBits, //串口停止位,默认为1位SerialParams.parity, //串口奇偶校验,默认无奇偶校验SerialParams.rts, //串口发送请求,默认为trueSerialParams.dtr //串口发送允许,默认为true);int a = serialPort.getFlowControlMode();serialPort.setFlowControlMode(SerialParams.flow);//无硬件流控logger.info("----------------------------------------------------------------------");logger.info("Serial Port:"+serialPort.getPortName()+" baudRate:"+SerialParams.baudRate);logger.info("Serial Port:"+serialPort.getPortName()+" dataBits:"+SerialParams.dataBits);logger.info("Serial Port:"+serialPort.getPortName()+" stopBits:"+SerialParams.stopBits);logger.info("Serial Port:"+serialPort.getPortName()+" parity:"+SerialParams.parity);logger.info("Serial Port:"+serialPort.getPortName()+" rts:"+SerialParams.rts);logger.info("Serial Port:"+serialPort.getPortName()+" dtr:"+SerialParams.dtr);logger.info("Serial Port:"+serialPort.getPortName()+" FlowControlMode:"+SerialParams.flow);logger.info("-----------------------------------------------------------------------");serialPort.addEventListener(this);//开启数据接收监听}catch (SerialPortException ux) {logger.error(ux.getMessage());}}/*** 发送数据* @param str 需要写入的数据*/public void sendData(byte[] str){if(null!=serialPort && serialPort.isOpened()){//如果串口已经打开try {serialPort.writeBytes(str);//向串口写入数据}catch (SerialPortException ex){logger.error(ex.getMessage());}}}/*** 发送数据* @param str 需要写入的数据*/public void sendData(String str){if(null!=serialPort && serialPort.isOpened()){//如果串口已经打开try {serialPort.writeString(str);//向串口写入数据}catch (SerialPortException ex){logger.error(ex.getMessage());}}}/*** 关闭串口*/public void close(){try {serialPort.closePort();}catch (Exception ex){logger.error(ex.getMessage());}}/*** 串口有数据上来,转发数据到服务器上* @param serialPortEvent 事件类型*/@Overridepublic void serialEvent(SerialPortEvent serialPortEvent) {if (serialPortEvent.isRXCHAR()) {//有数据到达事件发生ByteArrayOutputStream xs = new ByteArrayOutputStream();//数据缓存对象try {if(serialPort.getInputBufferBytesCount()>0){//数据池有数据Thread.sleep(10);//等待10毫秒,以便更多的数据进入数据池,以确保数据传输完成while(serialPort.getInputBufferBytesCount()>0) {//循环读取数据Thread.sleep(3);//等待10毫秒byte[] sx = serialPort.readBytes();//读取数据if (null != sx) {xs.write(sx);//放入数据缓存池} else {break;//为空时,说明读取完成,需要跳出循环}}}} catch (SerialPortException | InterruptedException | IOException e) {logger.error(e.getMessage());}byte[] xbs = xs.toByteArray();//获取所有缓存数据try {tcpGateway.send(MessageBuilder.withPayload(xbs).build());//发送串口数据到服务器logger.info("[com up data start for " + getName() + "]----------------------------------------");logger.info("[com up data]" + new String(xbs));//字符串方式打印日志logger.info("[com up hex]" + Hex.encodeHexString(xbs, false));//16进制方式打印日志logger.info("[com up data end for" + getName() + "]-----------------------------------------");}catch (Exception ex){logger.error("[net client is closed]");logger.info("[com up data start for " + getName() + "]----------------------------------------");logger.info("[com up data]" + new String(xbs));//字符串方式打印日志logger.info("[com up hex]" + Hex.encodeHexString(xbs, false));//16进制方式打印日志logger.info("[com up data end for" + getName() + "]-----------------------------------------");}try{xs.close();}catch (Exception es){}//关闭数据缓存池}}
}
SerialParams是自定义的一个参数配置类,主要用于在配置文件中配置串口的各项参数
package org.noka.serialservice.config;import jssc.SerialPort;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/**-------------------------------------------------* 串口参数配置* @author xiefangjian@163.com* @version 1.0.0**-----------------------------------------------**//**-------------------------------------------------** serial.baud=115200 串口波特率,默认为115200* serial.dataBits=8 串口数据位,默认为8位* serial.stopBits=1 串口停止位,默认为1位* serial.parity=0 串口奇偶校验,默认无奇偶校验* serial.rts=true 串口发送请求,默认为true* serial.dtr=true 串口发送允许,默认为true***---------------------------------------------**/
@Component
public class SerialParams {public static Integer baudRate= SerialPort.BAUDRATE_115200;//串口波特率public static Integer dataBits=SerialPort.DATABITS_8;//8位数据位public static Integer stopBits=SerialPort.STOPBITS_1;//1位停止位public static Integer parity=SerialPort.PARITY_NONE;//无奇偶校验public static boolean rts=true;//发送请求public static boolean dtr=true;//发送允许public static Integer flow=SerialPort.FLOWCONTROL_NONE;//无硬件流控制@Value("${serial.baud:115200}")public void setBaudRate(Integer baudRate) {this.baudRate = baudRate;}@Value("${serial.dataBits:8}")public void setDataBits(Integer dataBits) {this.dataBits = dataBits;}@Value("${serial.stopBits:1}")public void setStopBits(Integer stopBits) {this.stopBits = stopBits;}@Value("${serial.parity:0}")public void setParity(Integer parity) {this.parity = parity;}@Value("${serial.rts:true}")public void setRts(boolean rts) { this.rts = rts; }@Value("${serial.dtr:true}")public void setDtr(boolean dtr) {this.dtr = dtr;}@Value("${serial.parity:0}")public void setFlow(Integer flow){this.flow=flow;}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
