nio通道,接收数据new一个新线程,导致数据发送多次触发。却找不到原因。求大佬帮忙看看

每次执行new 一个线程处理数据时候,
当new的线程多了,
大概率会出现SocketChannel.write()多次执行
而内置循环却无法卡主问题。。

目前一直找不到是什么原因引起 SocketChannel.write()多次执行。而且没有触发到do while的等待

你这开局一句话,几张图。。。就指望人能帮你找到BUG?

K哥说的有理。。我去整理整理先

1 Like

//出问题的线程代码

public class   FileTransmitThread extends Thread{

	private ITransfeType tramsfe;
	private  Map<Integer, byte[]> fullData = new HashMap<Integer, byte[]>();
	//存储每个通道ip对应的数据编码快
	public static Map<String, Integer> Number = new HashMap<String, Integer>();
	//存储每个通道ip对应的下一个数据编码快
	private static Map<String, Integer> UsedNumber = new HashMap<String, Integer>();
	private List<UserGroup_EquipmentPojo> channelID;
	private String remoteAddress;
	private PortDataMapper portDataMapper;
	private Object ProtocolType;
	
	public FileTransmitThread(){
		super();
	}
	/**
	 * 文件数据传输线程
	 * @param sc
	 * @param buffer
	 * @param fullData
	 */
	@Autowired
	public FileTransmitThread(ITransfeType tramsfe,
								Map<Integer, byte[]> fullData,
								List<UserGroup_EquipmentPojo> channelID,
								PortDataMapper portDataMapper,String remoteAddress,Object ProtocolType){
		this.tramsfe = tramsfe;
		this.fullData = fullData;
		this.channelID = channelID;
		
		this.portDataMapper = portDataMapper;
		this.remoteAddress = remoteAddress;
		this.ProtocolType = ProtocolType;
	}
	
	@Override
	public void run(){
		FileHEX Fhex = new FileHEX();
		
		Integer userGroupId = 0;
		Integer deviceId = 0;
		System.out.println("----设备IP地址:"+remoteAddress);
		System.out.println(Number.get(remoteAddress)+"----"+UsedNumber.get(remoteAddress)+"----"+fullData.size());
		try {
			
			if(fullData.size() > 0 && (Number.get(remoteAddress)) < fullData.size()-1 ){
				

				for (int i = 0; i < channelID.size(); i++) {
					if(channelID.get(i).getDeviceIp().equals(remoteAddress)){
						userGroupId = channelID.get(i).getUserGroupId();
						deviceId = channelID.get(i).getDeviceId();
						break;
					}
				}
			//重发机制循环
			bre:for (int j = 0; j < 3; j++) {
						long start,end = 0;
						start = System.currentTimeMillis();
					if(Number.get(remoteAddress) != null){	
						if(fullData.get(Number.get(remoteAddress)+1) != null){	
								
								System.out.println("升级发送数据"+(Number.get(remoteAddress)+1)+"\n\n");
								//发送数据,然后等待回应,此处在多线程情况下,会执行多次,而不会执行到do while循环处
								tramsfe.send(fullData.get(Number.get(remoteAddress)+1), ProtocolType);
								//存储发送记录到mysql
								WaterLevelPojo waterleve = new WaterLevelPojo();//
								waterleve.setIP(remoteAddress.substring(remoteAddress.indexOf("/")+1, remoteAddress.indexOf(":")));
								waterleve.setPort(Integer.parseInt(remoteAddress.split(":")[1]));
								waterleve.setStatus(1);
								waterleve.setMessage(Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1), fullData.get(Number.get(remoteAddress)+1).length));
	
								portDataMapper.addMessage(waterleve);
								//给ip通道赋予获得编码号
								if(UsedNumber.get(remoteAddress) == null){
									UsedNumber.put(remoteAddress, Number.get(remoteAddress));
								}
								//内循环等待判断nio的数据回应,满足条件马上跳出外循环,否则一分钟后执行失败操作
								do{
									if(Number.get(remoteAddress)!= null && UsedNumber.get(remoteAddress) != null && fullData.size() > 0){
										if(Number.get(remoteAddress) > UsedNumber.get(remoteAddress)){
											UsedNumber.put(remoteAddress, Number.get(remoteAddress)); 
											break bre;
										}
									}
									end = System.currentTimeMillis();  
								}while ((end-start) < 60000);
								System.out.println("结束时间:"+(end-start)+"\n\n");
							if(j == 2 && Number!= null && UsedNumber != null){
								System.out.println("----断开后设备IP地址:"+ProtocolType);
								UsedNumber.put(remoteAddress, 0);

								tramsfe.send(hexbyte.hex2byte(Fhex.END("A0","00000000")),ProtocolType);
								portDataMapper.addStatus("编号:"+Number.get(remoteAddress)+"传输失败",userGroupId,deviceId,Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1), fullData.get(Number.get(remoteAddress)+1).length),5);
								//fullData.clear();
								Number.clear();
								UsedNumber.clear();
								//NioServer.TransmissionMode = false;
								channelID.clear();
							}
						}
					}
				}
			}else if((Number.get(remoteAddress)) == fullData.size()-1){

				tramsfe.send(hexbyte.hex2byte(Fhex.END("A1","00000000")),ProtocolType);
				portDataMapper.addStatus("节点总数:"+(fullData.size()-1)+",编号:"+Number.get(remoteAddress)+"传输完成",
										userGroupId,deviceId,
										Fhex.END("A1","00000000"),5);
				//fullData.clear();
				Number.clear();
				UsedNumber.clear();
				//NioServer.TransmissionMode = false;
				channelID.clear();
			}

		} catch (Exception e) {
			try {
				portDataMapper.addStatus("编号:"+Number.get(remoteAddress)+"传输失败",userGroupId,deviceId,
										Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1),
														  fullData.get(Number.get(remoteAddress)+1).length),5);
				tramsfe.StopK(ProtocolType);
				//fullData.clear();
				Number.clear();
				UsedNumber.clear();
				//NioServer.TransmissionMode = false;
				channelID.clear();
			} catch (IOException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			e.printStackTrace();
		}
	}

}