每次执行new 一个线程处理数据时候,
当new的线程多了,
大概率会出现SocketChannel.write()多次执行
而内置循环却无法卡主问题。。
目前一直找不到是什么原因引起 SocketChannel.write()多次执行。而且没有触发到do while的等待
每次执行new 一个线程处理数据时候,
当new的线程多了,
大概率会出现SocketChannel.write()多次执行
而内置循环却无法卡主问题。。
目前一直找不到是什么原因引起 SocketChannel.write()多次执行。而且没有触发到do while的等待
你这开局一句话,几张图。。。就指望人能帮你找到BUG?
K哥说的有理。。我去整理整理先
//出问题的线程代码
public class FileTransmitThread extends Thread{
private ITransfeType tramsfe;
private Map<Integer, byte[]> fullData = new HashMap<Integer, byte[]>();
//存储每个通道ip对应的数据编码快
public static Map<String, Integer> Number = new HashMap<String, Integer>();
//存储每个通道ip对应的下一个数据编码快
private static Map<String, Integer> UsedNumber = new HashMap<String, Integer>();
private List<UserGroup_EquipmentPojo> channelID;
private String remoteAddress;
private PortDataMapper portDataMapper;
private Object ProtocolType;
public FileTransmitThread(){
super();
}
/**
* 文件数据传输线程
* @param sc
* @param buffer
* @param fullData
*/
@Autowired
public FileTransmitThread(ITransfeType tramsfe,
Map<Integer, byte[]> fullData,
List<UserGroup_EquipmentPojo> channelID,
PortDataMapper portDataMapper,String remoteAddress,Object ProtocolType){
this.tramsfe = tramsfe;
this.fullData = fullData;
this.channelID = channelID;
this.portDataMapper = portDataMapper;
this.remoteAddress = remoteAddress;
this.ProtocolType = ProtocolType;
}
@Override
public void run(){
FileHEX Fhex = new FileHEX();
Integer userGroupId = 0;
Integer deviceId = 0;
System.out.println("----设备IP地址:"+remoteAddress);
System.out.println(Number.get(remoteAddress)+"----"+UsedNumber.get(remoteAddress)+"----"+fullData.size());
try {
if(fullData.size() > 0 && (Number.get(remoteAddress)) < fullData.size()-1 ){
for (int i = 0; i < channelID.size(); i++) {
if(channelID.get(i).getDeviceIp().equals(remoteAddress)){
userGroupId = channelID.get(i).getUserGroupId();
deviceId = channelID.get(i).getDeviceId();
break;
}
}
//重发机制循环
bre:for (int j = 0; j < 3; j++) {
long start,end = 0;
start = System.currentTimeMillis();
if(Number.get(remoteAddress) != null){
if(fullData.get(Number.get(remoteAddress)+1) != null){
System.out.println("升级发送数据"+(Number.get(remoteAddress)+1)+"\n\n");
//发送数据,然后等待回应,此处在多线程情况下,会执行多次,而不会执行到do while循环处
tramsfe.send(fullData.get(Number.get(remoteAddress)+1), ProtocolType);
//存储发送记录到mysql
WaterLevelPojo waterleve = new WaterLevelPojo();//
waterleve.setIP(remoteAddress.substring(remoteAddress.indexOf("/")+1, remoteAddress.indexOf(":")));
waterleve.setPort(Integer.parseInt(remoteAddress.split(":")[1]));
waterleve.setStatus(1);
waterleve.setMessage(Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1), fullData.get(Number.get(remoteAddress)+1).length));
portDataMapper.addMessage(waterleve);
//给ip通道赋予获得编码号
if(UsedNumber.get(remoteAddress) == null){
UsedNumber.put(remoteAddress, Number.get(remoteAddress));
}
//内循环等待判断nio的数据回应,满足条件马上跳出外循环,否则一分钟后执行失败操作
do{
if(Number.get(remoteAddress)!= null && UsedNumber.get(remoteAddress) != null && fullData.size() > 0){
if(Number.get(remoteAddress) > UsedNumber.get(remoteAddress)){
UsedNumber.put(remoteAddress, Number.get(remoteAddress));
break bre;
}
}
end = System.currentTimeMillis();
}while ((end-start) < 60000);
System.out.println("结束时间:"+(end-start)+"\n\n");
if(j == 2 && Number!= null && UsedNumber != null){
System.out.println("----断开后设备IP地址:"+ProtocolType);
UsedNumber.put(remoteAddress, 0);
tramsfe.send(hexbyte.hex2byte(Fhex.END("A0","00000000")),ProtocolType);
portDataMapper.addStatus("编号:"+Number.get(remoteAddress)+"传输失败",userGroupId,deviceId,Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1), fullData.get(Number.get(remoteAddress)+1).length),5);
//fullData.clear();
Number.clear();
UsedNumber.clear();
//NioServer.TransmissionMode = false;
channelID.clear();
}
}
}
}
}else if((Number.get(remoteAddress)) == fullData.size()-1){
tramsfe.send(hexbyte.hex2byte(Fhex.END("A1","00000000")),ProtocolType);
portDataMapper.addStatus("节点总数:"+(fullData.size()-1)+",编号:"+Number.get(remoteAddress)+"传输完成",
userGroupId,deviceId,
Fhex.END("A1","00000000"),5);
//fullData.clear();
Number.clear();
UsedNumber.clear();
//NioServer.TransmissionMode = false;
channelID.clear();
}
} catch (Exception e) {
try {
portDataMapper.addStatus("编号:"+Number.get(remoteAddress)+"传输失败",userGroupId,deviceId,
Fhex.HEX_FileData(fullData.get(Number.get(remoteAddress)+1),
fullData.get(Number.get(remoteAddress)+1).length),5);
tramsfe.StopK(ProtocolType);
//fullData.clear();
Number.clear();
UsedNumber.clear();
//NioServer.TransmissionMode = false;
channelID.clear();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
e.printStackTrace();
}
}
}