# HG changeset patch # User Jeandet Alexis # Date 2015-10-15 17:03:00 # Node ID 630afd0dda2f2ee59b3ed12343b08ee00ca0bbb7 # Parent 00101d96643675c4a495ca0043f9bde0c462359a GRESB Read/Write ops are working but Read has a memory leak. diff --git a/spwplugin/GR-ESB/gr_esb_bridge.cpp b/spwplugin/GR-ESB/gr_esb_bridge.cpp --- a/spwplugin/GR-ESB/gr_esb_bridge.cpp +++ b/spwplugin/GR-ESB/gr_esb_bridge.cpp @@ -85,7 +85,8 @@ void GR_ESB_bridge::setIP(QString ip) void GR_ESB_bridge::setVirtualLink(QString vlink) { - vlink = vlink.section("Virtual link",0,0); + //vlink = vlink.section(,0,0); + vlink.remove("Virtual link"); bool success; int vlinkTmp = vlink.toInt(&success); if(success) @@ -138,7 +139,7 @@ unsigned int GR_ESB_bridge::Write(unsign RMAP_MAX_XFER_SIZE*4, writeBuffer); manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(RMAP_MAX_XFER_SIZE*4)); - manager->getRMAPanswer(transactionID,&RMAPAckBuff); + int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff); free(RMAPAckBuff); written+=RMAP_MAX_XFER_SIZE; count-=RMAP_MAX_XFER_SIZE; @@ -165,7 +166,7 @@ unsigned int GR_ESB_bridge::Write(unsign count*4, writeBuffer); manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(count*4)); - manager->getRMAPanswer(transactionID,&RMAPAckBuff); + int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff); free(RMAPAckBuff); written+=count; if(progress!=NULL) @@ -276,97 +277,166 @@ int GR_ESB_bridge::pushRMAPPacket(char * GR_ESB_Manager::GR_ESB_Manager(socexplorerplugin *plugin, QObject *parent) :abstractSpwManager(plugin, parent) { - this->Read_soc = new QTcpSocket(this); - this->Write_soc = new QTcpSocket(this); + // this->Read_soc = new QTcpSocket(this); + // this->Write_soc = new QTcpSocket(this); this->sourceLogicalAddress=32; this->destinationLogicalAddress=254; this->destinationKey=2; + connect(&(this->Read_soc),SIGNAL(readyRead()),this,SLOT(readyRead())); } GR_ESB_Manager::~GR_ESB_Manager() { } +void GR_ESB_Manager::__processPacket(packetBuffer_t *packet) +{ + if(packet->complete) + { + if(packet->buffer[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet + { + RMAP_Answer* RMapPacket; + //SocExplorerEngine::message(this->plugin,"Got RMAP packet",2); + //SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(packet->PacketLen),2); + char* packetbuffer = (char*)malloc(packet->PacketLen); + if(packetbuffer) + { + memcpy(packetbuffer,packet->buffer,packet->PacketLen); + this->handleMutex->unlock(); + if(packet->PacketLen==8) + { + RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen); + } + else + { + RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen); + } + RMAP_AnswersMtx->lock(); + RMAP_Answers.append(RMapPacket); + RMAP_AnswersMtx->unlock(); + RMAP_AnswersSem->release(); + } + } + else //any non-rmap packet will be pushed to the network + { + char* packetbuffer = (char*)malloc(packet->PacketLen); + if(packetbuffer) + { + //memcpy(packetbuffer,packet->buffer,packet->PacketLen); + //emit emitPacket(packetbuffer,packet->PacketLen); + //SocExplorerEngine::message(this->plugin,"Got SPW packet",2); + } + } + } + +} + +QByteArray GR_ESB_Manager::__processData(const QByteArray &data, packetBuffer_t *buffer) +{ + if(buffer->complete) + { + if(Q_UNLIKELY(data.size()<=4)) + return data; + buffer->PacketLen= ((int)data.at(3)& 0x0FF) + (((int)data.at(2)& 0x0FF)<<8) + (((int)data.at(1)& 0x0FF)<<16); + if(buffer->PacketLen>(data.size()-4)) + { + memcpy(buffer->buffer,data.data()+4,data.size()-4); + buffer->complete=false; + buffer->index=data.size()-4; + } + else + { + memcpy(buffer->buffer,data.data()+4,data.size()-4); + buffer->PacketLen = data.size()-4; + buffer->index=data.size()-4; + buffer->complete=true; + __processPacket(buffer); + int len = buffer->PacketLen; + buffer->PacketLen = 0; + buffer->index=0; + if((data.size()-4-len)) + { + return __processData(data.right((data.size()-4-len)),buffer); + } + } + + } + else + { + if(buffer->PacketLen>(data.size()+buffer->index)) + { + memcpy(buffer->buffer+buffer->index,data.data(),data.size()); + buffer->complete=false; + buffer->index+=data.size(); + } + else + { + memcpy(buffer->buffer+buffer->index,data.data(),buffer->PacketLen-buffer->index); + buffer->complete=true; + buffer->index+=data.size(); + __processPacket(buffer); + int len = buffer->PacketLen; + buffer->PacketLen = 0; + buffer->index=0; + if((data.size()-len)) + { + return __processData(data.right((data.size()-len)),buffer); + } + } + } + return QByteArray(); +} + void GR_ESB_Manager::run() { - char buffer[(RMAP_MAX_XFER_SIZE*4)+50]; - SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1); + char bufferData[(RMAP_MAX_XFER_SIZE*4)+50]; + packetBuffer_t buffer={bufferData,0,0,true}; + //SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1); + QByteArray data; while (!this->isInterruptionRequested()) { if(this->connected) { - handleMutex->lock(); - SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",5); - if(Read_soc->waitForReadyRead(100)) + //SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",8); + if(!incomingPackets.isEmpty()) { - QByteArray data = Read_soc->readAll(); - int PacketLen= ((int)data.at(2)& 0x0FF) + (((int)data.at(3)& 0x0FF)<<8) + (((int)data.at(4)& 0x0FF)<<16); - if(data[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet - { - RMAP_Answer* packet; - SocExplorerEngine::message(this->plugin,"Got RMAP packet",2); - SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(PacketLen),2); - char* packetbuffer = (char*)malloc(PacketLen); - memcpy(packetbuffer,data.data(),PacketLen); - this->handleMutex->unlock(); - if(PacketLen==8) - { - packet=new RMAP_Answer(RMAP_get_transactionID(buffer),packetbuffer,PacketLen); - } - else - { - packet=new RMAP_Answer(RMAP_get_transactionID(buffer+1),packetbuffer,PacketLen); - } - RMAP_AnswersMtx->lock(); - RMAP_Answers.append(packet); - RMAP_AnswersMtx->unlock(); - RMAP_AnswersSem->release(); - } - else //any non-rmap packet will be pushed to the network - { - char* packetbuffer = (char*)malloc(PacketLen); - memcpy(packetbuffer,data.data(),PacketLen); - emit emitPacket(packetbuffer,PacketLen); - this->handleMutex->unlock(); - SocExplorerEngine::message(this->plugin,"Got SPW packet",2); - } - - } - else - { - handleMutex->unlock(); + incomingPacketsMutex.lock(); + data.append(incomingPackets.dequeue()); + this->incomingPacketsMutex.unlock(); + data = __processData(data,&buffer); } } else { //do some sanity checks! - usleep(RMAPtimeout/2); } usleep(1000); } - SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1); + //SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1); } bool GR_ESB_Manager::connectBridge() { int timeout=60; - if(this->Read_soc->state()==QTcpSocket::UnconnectedState) + this->connected = false; + if(this->Read_soc.state()==QTcpSocket::UnconnectedState) { - this->Read_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port); - this->Read_soc->waitForConnected(30000); + this->Read_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port); + this->Read_soc.waitForConnected(30000); } - if(this->Write_soc->state()==QTcpSocket::UnconnectedState) + if(this->Write_soc.state()==QTcpSocket::UnconnectedState) { - this->Write_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port); - this->Write_soc->waitForConnected(30000); + this->Write_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port); + this->Write_soc.waitForConnected(30000); } - while((this->Read_soc->state()!=QTcpSocket::ConnectedState) && (this->Write_soc->state()!=QTcpSocket::ConnectedState)) + while((this->Read_soc.state()!=QTcpSocket::ConnectedState) && (this->Write_soc.state()!=QTcpSocket::ConnectedState)) { usleep(100000); if(timeout--==0)return false; } + this->connected = true; return true; } @@ -374,17 +444,17 @@ bool GR_ESB_Manager::connectBridge() bool GR_ESB_Manager::disconnectBridge() { int timeout=60; - if(this->Read_soc->state()!=QTcpSocket::UnconnectedState) + if(this->Read_soc.state()!=QTcpSocket::UnconnectedState) { - this->Read_soc->disconnectFromHost(); - this->Read_soc->waitForDisconnected(30000); + this->Read_soc.disconnectFromHost(); + this->Read_soc.waitForDisconnected(30000); } - if(this->Write_soc->state()!=QTcpSocket::UnconnectedState) + if(this->Write_soc.state()!=QTcpSocket::UnconnectedState) { - this->Write_soc->disconnectFromHost(); - this->Write_soc->waitForDisconnected(30000); + this->Write_soc.disconnectFromHost(); + this->Write_soc.waitForDisconnected(30000); } - while((this->Read_soc->state()!=QTcpSocket::UnconnectedState) && (this->Write_soc->state()!=QTcpSocket::UnconnectedState)) + while((this->Read_soc.state()!=QTcpSocket::UnconnectedState) && (this->Write_soc.state()!=QTcpSocket::UnconnectedState)) { usleep(100000); if(timeout--==0)return false; @@ -398,7 +468,7 @@ bool GR_ESB_Manager::sendPacket(char *pa bool result = false; char protocoleIdentifier; SocExplorerEngine::message(this->plugin,"Sending SPW packet",2); - if(Q_UNLIKELY(this->Write_soc->state()!=QAbstractSocket::ConnectedState)) + if(Q_UNLIKELY(this->Write_soc.state()!=QAbstractSocket::ConnectedState)) { SocExplorerEngine::message(this->plugin,"Socket closed",2); //TODO handle disconnection @@ -413,7 +483,8 @@ bool GR_ESB_Manager::sendPacket(char *pa SPWpacket[3]=size & 0x0FF; } this->handleMutex->lock(); - result = ((size+4) == this->Write_soc->write(SPWpacket,size+4)); + result = ((size+4) == this->Write_soc.write(SPWpacket,size+4)); + this->Write_soc.flush(); this->handleMutex->unlock(); if (Q_UNLIKELY(!result)) { @@ -426,9 +497,16 @@ bool GR_ESB_Manager::sendPacket(char *pa // read the protocole identifier protocoleIdentifier = packet[2]; if (protocoleIdentifier == SPW_PROTO_ID_CCSDS) - emit ccsdsPacketTransmittedToSpw(); + emit ccsdsPacketTransmittedToSpw(); SocExplorerEngine::message(this->plugin,"Packet sent",2); } return true; } +void GR_ESB_Manager::readyRead() +{ + incomingPacketsMutex.lock(); + incomingPackets.append(Read_soc.readAll()); + incomingPacketsMutex.unlock(); +} + diff --git a/spwplugin/GR-ESB/gr_esb_bridge.h b/spwplugin/GR-ESB/gr_esb_bridge.h --- a/spwplugin/GR-ESB/gr_esb_bridge.h +++ b/spwplugin/GR-ESB/gr_esb_bridge.h @@ -5,6 +5,7 @@ #include #include #include +#include struct gresb_Conf_str { @@ -15,18 +16,27 @@ struct gresb_Conf_str const struct gresb_Conf_str gresb_Conf[]= { - {3000,3001}, //Virtual link 0 - {3002,3003}, //Virtual link 1 - {3004,3005}, //Virtual link 2 - {3006,3007}, //Virtual link 3 - {3008,3009}, //Virtual link 4 - {3010,3011} //Virtual link 5 +{3000,3001}, //Virtual link 0 +{3002,3003}, //Virtual link 1 +{3004,3005}, //Virtual link 2 +{3006,3007}, //Virtual link 3 +{3008,3009}, //Virtual link 4 +{3010,3011} //Virtual link 5 }; class GR_ESB_Manager: public abstractSpwManager { Q_OBJECT + + typedef struct packetBuffer_t + { + char* buffer; + int PacketLen; + int index; + bool complete; + }packetBuffer_t; + public: explicit GR_ESB_Manager(socexplorerplugin *plugin = 0,QObject* parent=0); ~GR_ESB_Manager(); @@ -41,9 +51,16 @@ private: void pushRmapPacket(char* packet,int len); char* SPWPacketBuff; + void __processPacket(packetBuffer_t *packet); + QByteArray __processData(const QByteArray &data, packetBuffer_t* buffer); +private slots: + void readyRead(); + public: - QTcpSocket* Read_soc; - QTcpSocket* Write_soc; + QTcpSocket Read_soc; + QTcpSocket Write_soc; + QMutex incomingPacketsMutex; + QQueue incomingPackets; QString IP; int virtualLinkIndex; }; diff --git a/spwplugin/abstractspwbridge.cpp b/spwplugin/abstractspwbridge.cpp --- a/spwplugin/abstractspwbridge.cpp +++ b/spwplugin/abstractspwbridge.cpp @@ -109,6 +109,7 @@ int abstractSpwManager::getRMAPanswer(in *buffer=NULL; int count=0; SocExplorerEngine::message(this->plugin,"Looking for RMAP answer",2); + qApp->processEvents(); timeout.start(); while (*buffer==NULL) { @@ -151,6 +152,7 @@ int abstractSpwManager::getRMAPanswer(in return -1; } usleep(1000); + qApp->processEvents(); } this->RMAP_AnswersSem->acquire(); }