##// END OF EJS Templates
GRESB Read/Write ops are working but Read has a memory leak.
Jeandet Alexis -
r76:630afd0dda2f GRESB
parent child
Show More
@@ -85,7 +85,8 void GR_ESB_bridge::setIP(QString ip)
85 85
86 86 void GR_ESB_bridge::setVirtualLink(QString vlink)
87 87 {
88 vlink = vlink.section("Virtual link",0,0);
88 //vlink = vlink.section(,0,0);
89 vlink.remove("Virtual link");
89 90 bool success;
90 91 int vlinkTmp = vlink.toInt(&success);
91 92 if(success)
@@ -138,7 +139,7 unsigned int GR_ESB_bridge::Write(unsign
138 139 RMAP_MAX_XFER_SIZE*4,
139 140 writeBuffer);
140 141 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(RMAP_MAX_XFER_SIZE*4));
141 manager->getRMAPanswer(transactionID,&RMAPAckBuff);
142 int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff);
142 143 free(RMAPAckBuff);
143 144 written+=RMAP_MAX_XFER_SIZE;
144 145 count-=RMAP_MAX_XFER_SIZE;
@@ -165,7 +166,7 unsigned int GR_ESB_bridge::Write(unsign
165 166 count*4,
166 167 writeBuffer);
167 168 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(count*4));
168 manager->getRMAPanswer(transactionID,&RMAPAckBuff);
169 int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff);
169 170 free(RMAPAckBuff);
170 171 written+=count;
171 172 if(progress!=NULL)
@@ -276,97 +277,166 int GR_ESB_bridge::pushRMAPPacket(char *
276 277 GR_ESB_Manager::GR_ESB_Manager(socexplorerplugin *plugin, QObject *parent)
277 278 :abstractSpwManager(plugin, parent)
278 279 {
279 this->Read_soc = new QTcpSocket(this);
280 this->Write_soc = new QTcpSocket(this);
280 // this->Read_soc = new QTcpSocket(this);
281 // this->Write_soc = new QTcpSocket(this);
281 282 this->sourceLogicalAddress=32;
282 283 this->destinationLogicalAddress=254;
283 284 this->destinationKey=2;
285 connect(&(this->Read_soc),SIGNAL(readyRead()),this,SLOT(readyRead()));
284 286 }
285 287
286 288 GR_ESB_Manager::~GR_ESB_Manager()
287 289 {
288 290 }
289 291
292 void GR_ESB_Manager::__processPacket(packetBuffer_t *packet)
293 {
294 if(packet->complete)
295 {
296 if(packet->buffer[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet
297 {
298 RMAP_Answer* RMapPacket;
299 //SocExplorerEngine::message(this->plugin,"Got RMAP packet",2);
300 //SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(packet->PacketLen),2);
301 char* packetbuffer = (char*)malloc(packet->PacketLen);
302 if(packetbuffer)
303 {
304 memcpy(packetbuffer,packet->buffer,packet->PacketLen);
305 this->handleMutex->unlock();
306 if(packet->PacketLen==8)
307 {
308 RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen);
309 }
310 else
311 {
312 RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen);
313 }
314 RMAP_AnswersMtx->lock();
315 RMAP_Answers.append(RMapPacket);
316 RMAP_AnswersMtx->unlock();
317 RMAP_AnswersSem->release();
318 }
319 }
320 else //any non-rmap packet will be pushed to the network
321 {
322 char* packetbuffer = (char*)malloc(packet->PacketLen);
323 if(packetbuffer)
324 {
325 //memcpy(packetbuffer,packet->buffer,packet->PacketLen);
326 //emit emitPacket(packetbuffer,packet->PacketLen);
327 //SocExplorerEngine::message(this->plugin,"Got SPW packet",2);
328 }
329 }
330 }
331
332 }
333
334 QByteArray GR_ESB_Manager::__processData(const QByteArray &data, packetBuffer_t *buffer)
335 {
336 if(buffer->complete)
337 {
338 if(Q_UNLIKELY(data.size()<=4))
339 return data;
340 buffer->PacketLen= ((int)data.at(3)& 0x0FF) + (((int)data.at(2)& 0x0FF)<<8) + (((int)data.at(1)& 0x0FF)<<16);
341 if(buffer->PacketLen>(data.size()-4))
342 {
343 memcpy(buffer->buffer,data.data()+4,data.size()-4);
344 buffer->complete=false;
345 buffer->index=data.size()-4;
346 }
347 else
348 {
349 memcpy(buffer->buffer,data.data()+4,data.size()-4);
350 buffer->PacketLen = data.size()-4;
351 buffer->index=data.size()-4;
352 buffer->complete=true;
353 __processPacket(buffer);
354 int len = buffer->PacketLen;
355 buffer->PacketLen = 0;
356 buffer->index=0;
357 if((data.size()-4-len))
358 {
359 return __processData(data.right((data.size()-4-len)),buffer);
360 }
361 }
362
363 }
364 else
365 {
366 if(buffer->PacketLen>(data.size()+buffer->index))
367 {
368 memcpy(buffer->buffer+buffer->index,data.data(),data.size());
369 buffer->complete=false;
370 buffer->index+=data.size();
371 }
372 else
373 {
374 memcpy(buffer->buffer+buffer->index,data.data(),buffer->PacketLen-buffer->index);
375 buffer->complete=true;
376 buffer->index+=data.size();
377 __processPacket(buffer);
378 int len = buffer->PacketLen;
379 buffer->PacketLen = 0;
380 buffer->index=0;
381 if((data.size()-len))
382 {
383 return __processData(data.right((data.size()-len)),buffer);
384 }
385 }
386 }
387 return QByteArray();
388 }
389
290 390 void GR_ESB_Manager::run()
291 391 {
292 char buffer[(RMAP_MAX_XFER_SIZE*4)+50];
293 SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1);
392 char bufferData[(RMAP_MAX_XFER_SIZE*4)+50];
393 packetBuffer_t buffer={bufferData,0,0,true};
394 //SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1);
395 QByteArray data;
294 396 while (!this->isInterruptionRequested())
295 397 {
296 398 if(this->connected)
297 399 {
298 handleMutex->lock();
299 SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",5);
300 if(Read_soc->waitForReadyRead(100))
400 //SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",8);
401 if(!incomingPackets.isEmpty())
301 402 {
302 QByteArray data = Read_soc->readAll();
303 int PacketLen= ((int)data.at(2)& 0x0FF) + (((int)data.at(3)& 0x0FF)<<8) + (((int)data.at(4)& 0x0FF)<<16);
304 if(data[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet
305 {
306 RMAP_Answer* packet;
307 SocExplorerEngine::message(this->plugin,"Got RMAP packet",2);
308 SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(PacketLen),2);
309 char* packetbuffer = (char*)malloc(PacketLen);
310 memcpy(packetbuffer,data.data(),PacketLen);
311 this->handleMutex->unlock();
312 if(PacketLen==8)
313 {
314 packet=new RMAP_Answer(RMAP_get_transactionID(buffer),packetbuffer,PacketLen);
315 }
316 else
317 {
318 packet=new RMAP_Answer(RMAP_get_transactionID(buffer+1),packetbuffer,PacketLen);
319 }
320 RMAP_AnswersMtx->lock();
321 RMAP_Answers.append(packet);
322 RMAP_AnswersMtx->unlock();
323 RMAP_AnswersSem->release();
324 }
325 else //any non-rmap packet will be pushed to the network
326 {
327 char* packetbuffer = (char*)malloc(PacketLen);
328 memcpy(packetbuffer,data.data(),PacketLen);
329 emit emitPacket(packetbuffer,PacketLen);
330 this->handleMutex->unlock();
331 SocExplorerEngine::message(this->plugin,"Got SPW packet",2);
332 }
333
334 }
335 else
336 {
337 handleMutex->unlock();
403 incomingPacketsMutex.lock();
404 data.append(incomingPackets.dequeue());
405 this->incomingPacketsMutex.unlock();
406 data = __processData(data,&buffer);
338 407 }
339 408
340 409 }
341 410 else
342 411 {
343 412 //do some sanity checks!
344
345 413 usleep(RMAPtimeout/2);
346 414 }
347 415 usleep(1000);
348 416 }
349 SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1);
417 //SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1);
350 418 }
351 419
352 420 bool GR_ESB_Manager::connectBridge()
353 421 {
354 422 int timeout=60;
355 if(this->Read_soc->state()==QTcpSocket::UnconnectedState)
423 this->connected = false;
424 if(this->Read_soc.state()==QTcpSocket::UnconnectedState)
356 425 {
357 this->Read_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port);
358 this->Read_soc->waitForConnected(30000);
426 this->Read_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port);
427 this->Read_soc.waitForConnected(30000);
359 428 }
360 if(this->Write_soc->state()==QTcpSocket::UnconnectedState)
429 if(this->Write_soc.state()==QTcpSocket::UnconnectedState)
361 430 {
362 this->Write_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port);
363 this->Write_soc->waitForConnected(30000);
431 this->Write_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port);
432 this->Write_soc.waitForConnected(30000);
364 433 }
365 while((this->Read_soc->state()!=QTcpSocket::ConnectedState) && (this->Write_soc->state()!=QTcpSocket::ConnectedState))
434 while((this->Read_soc.state()!=QTcpSocket::ConnectedState) && (this->Write_soc.state()!=QTcpSocket::ConnectedState))
366 435 {
367 436 usleep(100000);
368 437 if(timeout--==0)return false;
369 438 }
439 this->connected = true;
370 440 return true;
371 441
372 442 }
@@ -374,17 +444,17 bool GR_ESB_Manager::connectBridge()
374 444 bool GR_ESB_Manager::disconnectBridge()
375 445 {
376 446 int timeout=60;
377 if(this->Read_soc->state()!=QTcpSocket::UnconnectedState)
447 if(this->Read_soc.state()!=QTcpSocket::UnconnectedState)
378 448 {
379 this->Read_soc->disconnectFromHost();
380 this->Read_soc->waitForDisconnected(30000);
449 this->Read_soc.disconnectFromHost();
450 this->Read_soc.waitForDisconnected(30000);
381 451 }
382 if(this->Write_soc->state()!=QTcpSocket::UnconnectedState)
452 if(this->Write_soc.state()!=QTcpSocket::UnconnectedState)
383 453 {
384 this->Write_soc->disconnectFromHost();
385 this->Write_soc->waitForDisconnected(30000);
454 this->Write_soc.disconnectFromHost();
455 this->Write_soc.waitForDisconnected(30000);
386 456 }
387 while((this->Read_soc->state()!=QTcpSocket::UnconnectedState) && (this->Write_soc->state()!=QTcpSocket::UnconnectedState))
457 while((this->Read_soc.state()!=QTcpSocket::UnconnectedState) && (this->Write_soc.state()!=QTcpSocket::UnconnectedState))
388 458 {
389 459 usleep(100000);
390 460 if(timeout--==0)return false;
@@ -398,7 +468,7 bool GR_ESB_Manager::sendPacket(char *pa
398 468 bool result = false;
399 469 char protocoleIdentifier;
400 470 SocExplorerEngine::message(this->plugin,"Sending SPW packet",2);
401 if(Q_UNLIKELY(this->Write_soc->state()!=QAbstractSocket::ConnectedState))
471 if(Q_UNLIKELY(this->Write_soc.state()!=QAbstractSocket::ConnectedState))
402 472 {
403 473 SocExplorerEngine::message(this->plugin,"Socket closed",2);
404 474 //TODO handle disconnection
@@ -413,7 +483,8 bool GR_ESB_Manager::sendPacket(char *pa
413 483 SPWpacket[3]=size & 0x0FF;
414 484 }
415 485 this->handleMutex->lock();
416 result = ((size+4) == this->Write_soc->write(SPWpacket,size+4));
486 result = ((size+4) == this->Write_soc.write(SPWpacket,size+4));
487 this->Write_soc.flush();
417 488 this->handleMutex->unlock();
418 489 if (Q_UNLIKELY(!result))
419 490 {
@@ -426,9 +497,16 bool GR_ESB_Manager::sendPacket(char *pa
426 497 // read the protocole identifier
427 498 protocoleIdentifier = packet[2];
428 499 if (protocoleIdentifier == SPW_PROTO_ID_CCSDS)
429 emit ccsdsPacketTransmittedToSpw();
500 emit ccsdsPacketTransmittedToSpw();
430 501 SocExplorerEngine::message(this->plugin,"Packet sent",2);
431 502 }
432 503 return true;
433 504 }
434 505
506 void GR_ESB_Manager::readyRead()
507 {
508 incomingPacketsMutex.lock();
509 incomingPackets.append(Read_soc.readAll());
510 incomingPacketsMutex.unlock();
511 }
512
@@ -5,6 +5,7
5 5 #include <QThread>
6 6 #include <QMutex>
7 7 #include <QSemaphore>
8 #include <QQueue>
8 9
9 10 struct gresb_Conf_str
10 11 {
@@ -15,18 +16,27 struct gresb_Conf_str
15 16
16 17 const struct gresb_Conf_str gresb_Conf[]=
17 18 {
18 {3000,3001}, //Virtual link 0
19 {3002,3003}, //Virtual link 1
20 {3004,3005}, //Virtual link 2
21 {3006,3007}, //Virtual link 3
22 {3008,3009}, //Virtual link 4
23 {3010,3011} //Virtual link 5
19 {3000,3001}, //Virtual link 0
20 {3002,3003}, //Virtual link 1
21 {3004,3005}, //Virtual link 2
22 {3006,3007}, //Virtual link 3
23 {3008,3009}, //Virtual link 4
24 {3010,3011} //Virtual link 5
24 25 };
25 26
26 27
27 28 class GR_ESB_Manager: public abstractSpwManager
28 29 {
29 30 Q_OBJECT
31
32 typedef struct packetBuffer_t
33 {
34 char* buffer;
35 int PacketLen;
36 int index;
37 bool complete;
38 }packetBuffer_t;
39
30 40 public:
31 41 explicit GR_ESB_Manager(socexplorerplugin *plugin = 0,QObject* parent=0);
32 42 ~GR_ESB_Manager();
@@ -41,9 +51,16 private:
41 51 void pushRmapPacket(char* packet,int len);
42 52 char* SPWPacketBuff;
43 53
54 void __processPacket(packetBuffer_t *packet);
55 QByteArray __processData(const QByteArray &data, packetBuffer_t* buffer);
56 private slots:
57 void readyRead();
58
44 59 public:
45 QTcpSocket* Read_soc;
46 QTcpSocket* Write_soc;
60 QTcpSocket Read_soc;
61 QTcpSocket Write_soc;
62 QMutex incomingPacketsMutex;
63 QQueue<QByteArray> incomingPackets;
47 64 QString IP;
48 65 int virtualLinkIndex;
49 66 };
@@ -109,6 +109,7 int abstractSpwManager::getRMAPanswer(in
109 109 *buffer=NULL;
110 110 int count=0;
111 111 SocExplorerEngine::message(this->plugin,"Looking for RMAP answer",2);
112 qApp->processEvents();
112 113 timeout.start();
113 114 while (*buffer==NULL)
114 115 {
@@ -151,6 +152,7 int abstractSpwManager::getRMAPanswer(in
151 152 return -1;
152 153 }
153 154 usleep(1000);
155 qApp->processEvents();
154 156 }
155 157 this->RMAP_AnswersSem->acquire();
156 158 }
General Comments 0
You need to be logged in to leave comments. Login now