文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

如何解决Java Socket通信技术收发线程互斥的问题

2023-06-17 15:53

关注

本篇内容介绍了“如何解决Java Socket通信技术收发线程互斥的问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。

  1. package com.bill99.svr;   

  2. import java.io.IOException;   

  3. import java.io.InputStream;   

  4. import java.io.OutputStream;   

  5. import java.net.InetSocketAddress;   

  6. import java.net.Socket;   

  7. import java.net.SocketException;   

  8. import java.net.SocketTimeoutException;   

  9. import java.text.SimpleDateFormat;   

  10. import java.util.Date;   

  11. import java.util.Properties;   

  12. import java.util.Timer;   

  13. import java.util.TimerTask;   

  14. import java.util.concurrent.ConcurrentHashMap;   

  15. import java.util.concurrent.TimeUnit;   

  16. import java.util.concurrent.locks.Condition;   

  17. import java.util.concurrent.locks.ReentrantLock;   

  18. import org.apache.log4j.Logger;   

  19.  public class SocketConnection {   

  20. private volatile Socket socket;   

  21. private int timeout = 1000*10; //超时时间,初始值10秒   

  22. private boolean isLaunchHeartcheck = false;//是否已启动心跳检测   

  23. private boolean isNetworkConnect = false; //网络是否已连接   

  24. private static String host = "";   

  25. private static int port;   

  26. static InputStream inStream = null;   

  27. static OutputStream outStream = null;   

  28. private static Logger log =Logger.getLogger
    (SocketConnection.class);   

  29. private static SocketConnection socketConnection = null;   

  30. private static java.util.Timer heartTimer=null;     

  31. //private final Map<String, Object> recMsgMap= Collections.
    synchronizedMap(new HashMap<String, Object>());   

  32. private final ConcurrentHashMap<String, Object> recMsgMap 
    = new ConcurrentHashMap<String, Object>();   

  33. private static Thread receiveThread = null;   

  34. private final ReentrantLock lock = new ReentrantLock();   

  35. private SocketConnection(){   

  36. Properties conf = new Properties();   

  37. try {   

  38. conf.load(SocketConnection.class.getResourceAsStream
    ("test.conf"));   

  39. this.timeout = Integer.valueOf(conf.getProperty("timeout"));   

  40. init(conf.getProperty("ip"),Integer.valueOf
    (conf.getProperty("port")));   

  41. } catch(IOException e) {   

  42. log.fatal("socket初始化异常!",e);   

  43. throw new RuntimeException("socket初始化异常,请检查配置参数");   

  44. }   

  45. }   

  46.    

  47. public static SocketConnection getInstance() {   

  48. if(socketConnection==null) {   

  49. synchronized(SocketConnection.class) {   

  50. if(socketConnection==null) {   

  51. socketConnection = new SocketConnection();   

  52. return socketConnection;   

  53. }   

  54. }   

  55. }   

  56. return socketConnection;   

  57. }   

  58. private void init(String host,int port) throws IOException {   

  59. InetSocketAddress addr = new InetSocketAddress(host,port);   

  60. socket = new Socket();   

  61. synchronized (this) {   

  62. log.info("【准备与"+addr+"建立连接】");   

  63. socket.connect(addr, timeout);   

  64. log.info("【与"+addr+"连接已建立】");   

  65. inStream = socket.getInputStream();   

  66. outStream = socket.getOutputStream();   

  67. socket.setTcpNoDelay(true);//数据不作缓冲,立即发送   

  68. socket.setSoLinger(true, 0);//socket关闭时,立即释放资源   

  69. socket.setKeepAlive(true);   

  70. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输   

  71. isNetworkConnect=true;   

  72. receiveThread = new Thread(new ReceiveWorker());   

  73. receiveThread.start();   

  74. SocketConnection.host=host;   

  75. SocketConnection.port=port;   

  76. if(!isLaunchHeartcheck)   

  77. launchHeartcheck();   

  78. }   

  79. }   

  80.    

  81. private void launchHeartcheck() {   

  82. if(socket == null)   

  83. throw new IllegalStateException("socket is not 
    established!");   

  84. heartTimer = new Timer();   

  85. isLaunchHeartcheck = true;   

  86. heartTimer.schedule(new TimerTask() {   

  87. public void run() {   

  88. String msgStreamNo = StreamNoGenerator.getStreamNo("kq");   

  89. int mstType =9999;//999-心跳包请求   

  90. SimpleDateFormat dateformate = new SimpleDateFormat
    ("yyyyMMddHHmmss");   

  91. String msgDateTime = dateformate.format(new Date());   

  92. int msgLength =38;//消息头长度   

  93. String commandstr = "00" +msgLength + mstType + msgStreamNo;   

  94. log.info("心跳检测包 -> IVR "+commandstr);   

  95. int reconnCounter = 1;   

  96. while(true) {   

  97. String responseMsg =null;   

  98. try {   

  99. responseMsg = readReqMsg(commandstr);   

  100. } catch (IOException e) {   

  101. log.error("IO流异常",e);   

  102. reconnCounter ++;   

  103. }   

  104. if(responseMsg!=null) {   

  105. log.info("心跳响应包 <- IVR "+responseMsg);   

  106. reconnCounter = 1;   

  107. break;   

  108. } else {   

  109. reconnCounter ++;   

  110. }   

  111. if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,
    重新建立连接。连接未被建立时不释放锁   

  112. reConnectToCTCC(); break;   

  113. }   

  114. }   

  115. }   

  116. },1000 * 60*1,1000*60*2);   

  117. }   

  118.    

  119. private void reConnectToCTCC() {   

  120. new Thread(new Runnable(){   

  121. public void run(){   

  122. log.info("重新建立与"+host+":"+port+"的连接");   

  123. //清理工作,中断计时器,中断接收线程,恢复初始变量   

  124. heartTimer.cancel();   

  125. isLaunchHeartcheck=false;   

  126. isNetworkConnect = false;   

  127. receiveThread.interrupt();   

  128. try {   

  129. socket.close();   

  130. } catch (IOException e1) {log.error("重连时,关闭socket连
    接发生IO流异常",e1);}   

  131. //----------------   

  132. synchronized(this){   

  133. for(; ;){   

  134. try {   

  135. Thread.currentThread();   

  136. Thread.sleep(1000 * 1);   

  137. init(host,port);   

  138. this.notifyAll();   

  139. break ;   

  140. } catch (IOException e) {   

  141. log.error("重新建立连接未成功",e);   

  142. } catch (InterruptedException e){   

  143. log.error("重连线程中断",e);   

  144. }   

  145. }   

  146. }   

  147. }   

  148. }).start();   

  149. }   

  150.    

  151. public String readReqMsg(String requestMsg) throws IOException {   

  152. if(requestMsg ==null) {   

  153. return null;   

  154. }   

  155. if(!isNetworkConnect) {   

  156. synchronized(this){   

  157. try {   

  158. this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常   

  159. if(!isNetworkConnect) {   

  160. throw new IOException("网络连接中断!");   

  161. }   

  162. } catch (InterruptedException e) {   

  163. log.error("发送线程中断",e);   

  164. }   

  165. }   

  166. }   

  167. String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号   

  168. outStream = socket.getOutputStream();   

  169. outStream.write(requestMsg.getBytes());   

  170. outStream.flush();   

  171. Condition msglock = lock.newCondition(); //消息锁   

  172. //注册等待接收消息   

  173. recMsgMap.put(msgNo, msglock);   

  174. try {   

  175. lock.lock();   

  176. msglock.await(timeout,TimeUnit.MILLISECONDS);   

  177. } catch (InterruptedException e) {   

  178. log.error("发送线程中断",e);   

  179. } finally {   

  180. lock.unlock();   

  181. }   

  182. Object respMsg = recMsgMap.remove(msgNo); //响应信息   

  183. if(respMsg!=null &&(respMsg != msglock)) {   

  184. //已经接收到消息,注销等待,成功返回消息   

  185. return (String) respMsg;   

  186. } else {   

  187. log.error(msgNo+" 超时,未收到响应消息");   

  188. throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息");   

  189. }   

  190. }   

  191. public void finalize() {   

  192. if (socket != null) {   

  193. try {   

  194. socket.close();   

  195. } catch (IOException e) {   

  196. e.printStackTrace();   

  197. }   

  198. }   

  199. }   

  200. //消息接收线程   

  201. private class ReceiveWorker implements Runnable {   

  202. String intStr= null;   

  203. public void run() {   

  204. while(!Thread.interrupted()){   

  205. try {   

  206. byte[] headBytes = new byte[4];   

  207. if(inStream.read(headBytes)==-1){   

  208. log.warn("读到流未尾,对方已关闭流!");   

  209. reConnectToCTCC();//读到流未尾,对方已关闭流   

  210. return;   

  211. }   

  212. byte[] tmp =new byte[4];   

  213. tmp = headBytes;   

  214. String tempStr = new String(tmp).trim();   

  215. if(tempStr==null || tempStr.equals("")) {   

  216. log.error("received message is null");   

  217. continue;   

  218. }   

  219. intStr = new String(tmp);   

  220. int totalLength =Integer.parseInt(intStr);   

  221. //----------------   

  222. byte[] msgBytes = new byte[totalLength-4];   

  223. inStream.read(msgBytes);   

  224. String resultMsg = new String(headBytes)+ new 
    String(msgBytes);   

  225. //抽出消息ID   

  226. String msgNo = resultMsg.substring(8, 8 + 24);   

  227. Condition msglock =(Condition) recMsgMap.get(msgNo);   

  228. if(msglock ==null) {   

  229. log.warn(msgNo+"序号可能已被注销!响应消息丢弃");   

  230. recMsgMap.remove(msgNo);   

  231. continue;   

  232. }   

  233. recMsgMap.put(msgNo, resultMsg);   

  234. try{   

  235. lock.lock();   

  236. msglock.signalAll();   

  237. }finally {   

  238. lock.unlock();   

  239. }   

  240. }catch(SocketException e){   

  241. log.error("服务端关闭socket",e);   

  242. reConnectToCTCC();   

  243. } catch(IOException e) {   

  244. log.error("接收线程读取响应数据时发生IO流异常",e);   

  245. } catch(NumberFormatException e){   

  246. log.error("收到没良心包,String转int异常,异常字符:"+intStr);   

  247. }   

  248. }   

  249. }   

  250. }   

“如何解决Java Socket通信技术收发线程互斥的问题”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯