##// END OF EJS Templates
GRESB Read/Write ops are working but Read has a memory leak.
Jeandet Alexis -
r76:630afd0dda2f GRESB
parent child
Show More
@@ -85,7 +85,8 void GR_ESB_bridge::setIP(QString ip)
85
85
86 void GR_ESB_bridge::setVirtualLink(QString vlink)
86 void GR_ESB_bridge::setVirtualLink(QString vlink)
87 {
87 {
88 vlink = vlink.section("Virtual link",0,0);
88 //vlink = vlink.section(,0,0);
89 vlink.remove("Virtual link");
89 bool success;
90 bool success;
90 int vlinkTmp = vlink.toInt(&success);
91 int vlinkTmp = vlink.toInt(&success);
91 if(success)
92 if(success)
@@ -138,7 +139,7 unsigned int GR_ESB_bridge::Write(unsign
138 RMAP_MAX_XFER_SIZE*4,
139 RMAP_MAX_XFER_SIZE*4,
139 writeBuffer);
140 writeBuffer);
140 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(RMAP_MAX_XFER_SIZE*4));
141 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(RMAP_MAX_XFER_SIZE*4));
141 manager->getRMAPanswer(transactionID,&RMAPAckBuff);
142 int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff);
142 free(RMAPAckBuff);
143 free(RMAPAckBuff);
143 written+=RMAP_MAX_XFER_SIZE;
144 written+=RMAP_MAX_XFER_SIZE;
144 count-=RMAP_MAX_XFER_SIZE;
145 count-=RMAP_MAX_XFER_SIZE;
@@ -165,7 +166,7 unsigned int GR_ESB_bridge::Write(unsign
165 count*4,
166 count*4,
166 writeBuffer);
167 writeBuffer);
167 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(count*4));
168 manager->sendPacket(writeBuffer,RMAP_WRITE_PACKET_MIN_SZ(count*4));
168 manager->getRMAPanswer(transactionID,&RMAPAckBuff);
169 int len = manager->getRMAPanswer(transactionID,&RMAPAckBuff);
169 free(RMAPAckBuff);
170 free(RMAPAckBuff);
170 written+=count;
171 written+=count;
171 if(progress!=NULL)
172 if(progress!=NULL)
@@ -276,97 +277,166 int GR_ESB_bridge::pushRMAPPacket(char *
276 GR_ESB_Manager::GR_ESB_Manager(socexplorerplugin *plugin, QObject *parent)
277 GR_ESB_Manager::GR_ESB_Manager(socexplorerplugin *plugin, QObject *parent)
277 :abstractSpwManager(plugin, parent)
278 :abstractSpwManager(plugin, parent)
278 {
279 {
279 this->Read_soc = new QTcpSocket(this);
280 // this->Read_soc = new QTcpSocket(this);
280 this->Write_soc = new QTcpSocket(this);
281 // this->Write_soc = new QTcpSocket(this);
281 this->sourceLogicalAddress=32;
282 this->sourceLogicalAddress=32;
282 this->destinationLogicalAddress=254;
283 this->destinationLogicalAddress=254;
283 this->destinationKey=2;
284 this->destinationKey=2;
285 connect(&(this->Read_soc),SIGNAL(readyRead()),this,SLOT(readyRead()));
284 }
286 }
285
287
286 GR_ESB_Manager::~GR_ESB_Manager()
288 GR_ESB_Manager::~GR_ESB_Manager()
287 {
289 {
288 }
290 }
289
291
290 void GR_ESB_Manager::run()
292 void GR_ESB_Manager::__processPacket(packetBuffer_t *packet)
291 {
292 char buffer[(RMAP_MAX_XFER_SIZE*4)+50];
293 SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1);
294 while (!this->isInterruptionRequested())
295 {
293 {
296 if(this->connected)
294 if(packet->complete)
297 {
295 {
298 handleMutex->lock();
296 if(packet->buffer[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet
299 SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",5);
300 if(Read_soc->waitForReadyRead(100))
301 {
297 {
302 QByteArray data = Read_soc->readAll();
298 RMAP_Answer* RMapPacket;
303 int PacketLen= ((int)data.at(2)& 0x0FF) + (((int)data.at(3)& 0x0FF)<<8) + (((int)data.at(4)& 0x0FF)<<16);
299 //SocExplorerEngine::message(this->plugin,"Got RMAP packet",2);
304 if(data[1]==(char)SPW_PROTO_ID_RMAP) //RMAP packet
300 //SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(packet->PacketLen),2);
301 char* packetbuffer = (char*)malloc(packet->PacketLen);
302 if(packetbuffer)
305 {
303 {
306 RMAP_Answer* packet;
304 memcpy(packetbuffer,packet->buffer,packet->PacketLen);
307 SocExplorerEngine::message(this->plugin,"Got RMAP packet",2);
308 SocExplorerEngine::message(this->plugin,QString("Rmap packet size %1").arg(PacketLen),2);
309 char* packetbuffer = (char*)malloc(PacketLen);
310 memcpy(packetbuffer,data.data(),PacketLen);
311 this->handleMutex->unlock();
305 this->handleMutex->unlock();
312 if(PacketLen==8)
306 if(packet->PacketLen==8)
313 {
307 {
314 packet=new RMAP_Answer(RMAP_get_transactionID(buffer),packetbuffer,PacketLen);
308 RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen);
315 }
309 }
316 else
310 else
317 {
311 {
318 packet=new RMAP_Answer(RMAP_get_transactionID(buffer+1),packetbuffer,PacketLen);
312 RMapPacket=new RMAP_Answer(RMAP_get_transactionID(packetbuffer),packetbuffer,packet->PacketLen);
319 }
313 }
320 RMAP_AnswersMtx->lock();
314 RMAP_AnswersMtx->lock();
321 RMAP_Answers.append(packet);
315 RMAP_Answers.append(RMapPacket);
322 RMAP_AnswersMtx->unlock();
316 RMAP_AnswersMtx->unlock();
323 RMAP_AnswersSem->release();
317 RMAP_AnswersSem->release();
324 }
318 }
319 }
325 else //any non-rmap packet will be pushed to the network
320 else //any non-rmap packet will be pushed to the network
326 {
321 {
327 char* packetbuffer = (char*)malloc(PacketLen);
322 char* packetbuffer = (char*)malloc(packet->PacketLen);
328 memcpy(packetbuffer,data.data(),PacketLen);
323 if(packetbuffer)
329 emit emitPacket(packetbuffer,PacketLen);
324 {
330 this->handleMutex->unlock();
325 //memcpy(packetbuffer,packet->buffer,packet->PacketLen);
331 SocExplorerEngine::message(this->plugin,"Got SPW packet",2);
326 //emit emitPacket(packetbuffer,packet->PacketLen);
327 //SocExplorerEngine::message(this->plugin,"Got SPW packet",2);
328 }
329 }
330 }
331
332 }
333
334 QByteArray GR_ESB_Manager::__processData(const QByteArray &data, packetBuffer_t *buffer)
335 {
336 if(buffer->complete)
337 {
338 if(Q_UNLIKELY(data.size()<=4))
339 return data;
340 buffer->PacketLen= ((int)data.at(3)& 0x0FF) + (((int)data.at(2)& 0x0FF)<<8) + (((int)data.at(1)& 0x0FF)<<16);
341 if(buffer->PacketLen>(data.size()-4))
342 {
343 memcpy(buffer->buffer,data.data()+4,data.size()-4);
344 buffer->complete=false;
345 buffer->index=data.size()-4;
346 }
347 else
348 {
349 memcpy(buffer->buffer,data.data()+4,data.size()-4);
350 buffer->PacketLen = data.size()-4;
351 buffer->index=data.size()-4;
352 buffer->complete=true;
353 __processPacket(buffer);
354 int len = buffer->PacketLen;
355 buffer->PacketLen = 0;
356 buffer->index=0;
357 if((data.size()-4-len))
358 {
359 return __processData(data.right((data.size()-4-len)),buffer);
360 }
332 }
361 }
333
362
334 }
363 }
335 else
364 else
336 {
365 {
337 handleMutex->unlock();
366 if(buffer->PacketLen>(data.size()+buffer->index))
367 {
368 memcpy(buffer->buffer+buffer->index,data.data(),data.size());
369 buffer->complete=false;
370 buffer->index+=data.size();
371 }
372 else
373 {
374 memcpy(buffer->buffer+buffer->index,data.data(),buffer->PacketLen-buffer->index);
375 buffer->complete=true;
376 buffer->index+=data.size();
377 __processPacket(buffer);
378 int len = buffer->PacketLen;
379 buffer->PacketLen = 0;
380 buffer->index=0;
381 if((data.size()-len))
382 {
383 return __processData(data.right((data.size()-len)),buffer);
384 }
385 }
386 }
387 return QByteArray();
388 }
389
390 void GR_ESB_Manager::run()
391 {
392 char bufferData[(RMAP_MAX_XFER_SIZE*4)+50];
393 packetBuffer_t buffer={bufferData,0,0,true};
394 //SocExplorerEngine::message(this->plugin,"Starting GRESB pooling thread",1);
395 QByteArray data;
396 while (!this->isInterruptionRequested())
397 {
398 if(this->connected)
399 {
400 //SocExplorerEngine::message(this->plugin,"Looking for new RMAP packets",8);
401 if(!incomingPackets.isEmpty())
402 {
403 incomingPacketsMutex.lock();
404 data.append(incomingPackets.dequeue());
405 this->incomingPacketsMutex.unlock();
406 data = __processData(data,&buffer);
338 }
407 }
339
408
340 }
409 }
341 else
410 else
342 {
411 {
343 //do some sanity checks!
412 //do some sanity checks!
344
345 usleep(RMAPtimeout/2);
413 usleep(RMAPtimeout/2);
346 }
414 }
347 usleep(1000);
415 usleep(1000);
348 }
416 }
349 SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1);
417 //SocExplorerEngine::message(this->plugin,"Exiting Startdundee USB pooling thread",1);
350 }
418 }
351
419
352 bool GR_ESB_Manager::connectBridge()
420 bool GR_ESB_Manager::connectBridge()
353 {
421 {
354 int timeout=60;
422 int timeout=60;
355 if(this->Read_soc->state()==QTcpSocket::UnconnectedState)
423 this->connected = false;
424 if(this->Read_soc.state()==QTcpSocket::UnconnectedState)
356 {
425 {
357 this->Read_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port);
426 this->Read_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Receive_port);
358 this->Read_soc->waitForConnected(30000);
427 this->Read_soc.waitForConnected(30000);
359 }
428 }
360 if(this->Write_soc->state()==QTcpSocket::UnconnectedState)
429 if(this->Write_soc.state()==QTcpSocket::UnconnectedState)
361 {
430 {
362 this->Write_soc->connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port);
431 this->Write_soc.connectToHost(IP,gresb_Conf[virtualLinkIndex].Transmit_port);
363 this->Write_soc->waitForConnected(30000);
432 this->Write_soc.waitForConnected(30000);
364 }
433 }
365 while((this->Read_soc->state()!=QTcpSocket::ConnectedState) && (this->Write_soc->state()!=QTcpSocket::ConnectedState))
434 while((this->Read_soc.state()!=QTcpSocket::ConnectedState) && (this->Write_soc.state()!=QTcpSocket::ConnectedState))
366 {
435 {
367 usleep(100000);
436 usleep(100000);
368 if(timeout--==0)return false;
437 if(timeout--==0)return false;
369 }
438 }
439 this->connected = true;
370 return true;
440 return true;
371
441
372 }
442 }
@@ -374,17 +444,17 bool GR_ESB_Manager::connectBridge()
374 bool GR_ESB_Manager::disconnectBridge()
444 bool GR_ESB_Manager::disconnectBridge()
375 {
445 {
376 int timeout=60;
446 int timeout=60;
377 if(this->Read_soc->state()!=QTcpSocket::UnconnectedState)
447 if(this->Read_soc.state()!=QTcpSocket::UnconnectedState)
378 {
448 {
379 this->Read_soc->disconnectFromHost();
449 this->Read_soc.disconnectFromHost();
380 this->Read_soc->waitForDisconnected(30000);
450 this->Read_soc.waitForDisconnected(30000);
381 }
451 }
382 if(this->Write_soc->state()!=QTcpSocket::UnconnectedState)
452 if(this->Write_soc.state()!=QTcpSocket::UnconnectedState)
383 {
453 {
384 this->Write_soc->disconnectFromHost();
454 this->Write_soc.disconnectFromHost();
385 this->Write_soc->waitForDisconnected(30000);
455 this->Write_soc.waitForDisconnected(30000);
386 }
456 }
387 while((this->Read_soc->state()!=QTcpSocket::UnconnectedState) && (this->Write_soc->state()!=QTcpSocket::UnconnectedState))
457 while((this->Read_soc.state()!=QTcpSocket::UnconnectedState) && (this->Write_soc.state()!=QTcpSocket::UnconnectedState))
388 {
458 {
389 usleep(100000);
459 usleep(100000);
390 if(timeout--==0)return false;
460 if(timeout--==0)return false;
@@ -398,7 +468,7 bool GR_ESB_Manager::sendPacket(char *pa
398 bool result = false;
468 bool result = false;
399 char protocoleIdentifier;
469 char protocoleIdentifier;
400 SocExplorerEngine::message(this->plugin,"Sending SPW packet",2);
470 SocExplorerEngine::message(this->plugin,"Sending SPW packet",2);
401 if(Q_UNLIKELY(this->Write_soc->state()!=QAbstractSocket::ConnectedState))
471 if(Q_UNLIKELY(this->Write_soc.state()!=QAbstractSocket::ConnectedState))
402 {
472 {
403 SocExplorerEngine::message(this->plugin,"Socket closed",2);
473 SocExplorerEngine::message(this->plugin,"Socket closed",2);
404 //TODO handle disconnection
474 //TODO handle disconnection
@@ -413,7 +483,8 bool GR_ESB_Manager::sendPacket(char *pa
413 SPWpacket[3]=size & 0x0FF;
483 SPWpacket[3]=size & 0x0FF;
414 }
484 }
415 this->handleMutex->lock();
485 this->handleMutex->lock();
416 result = ((size+4) == this->Write_soc->write(SPWpacket,size+4));
486 result = ((size+4) == this->Write_soc.write(SPWpacket,size+4));
487 this->Write_soc.flush();
417 this->handleMutex->unlock();
488 this->handleMutex->unlock();
418 if (Q_UNLIKELY(!result))
489 if (Q_UNLIKELY(!result))
419 {
490 {
@@ -432,3 +503,10 bool GR_ESB_Manager::sendPacket(char *pa
432 return true;
503 return true;
433 }
504 }
434
505
506 void GR_ESB_Manager::readyRead()
507 {
508 incomingPacketsMutex.lock();
509 incomingPackets.append(Read_soc.readAll());
510 incomingPacketsMutex.unlock();
511 }
512
@@ -5,6 +5,7
5 #include <QThread>
5 #include <QThread>
6 #include <QMutex>
6 #include <QMutex>
7 #include <QSemaphore>
7 #include <QSemaphore>
8 #include <QQueue>
8
9
9 struct gresb_Conf_str
10 struct gresb_Conf_str
10 {
11 {
@@ -27,6 +28,15 const struct gresb_Conf_str gresb_Conf[]
27 class GR_ESB_Manager: public abstractSpwManager
28 class GR_ESB_Manager: public abstractSpwManager
28 {
29 {
29 Q_OBJECT
30 Q_OBJECT
31
32 typedef struct packetBuffer_t
33 {
34 char* buffer;
35 int PacketLen;
36 int index;
37 bool complete;
38 }packetBuffer_t;
39
30 public:
40 public:
31 explicit GR_ESB_Manager(socexplorerplugin *plugin = 0,QObject* parent=0);
41 explicit GR_ESB_Manager(socexplorerplugin *plugin = 0,QObject* parent=0);
32 ~GR_ESB_Manager();
42 ~GR_ESB_Manager();
@@ -41,9 +51,16 private:
41 void pushRmapPacket(char* packet,int len);
51 void pushRmapPacket(char* packet,int len);
42 char* SPWPacketBuff;
52 char* SPWPacketBuff;
43
53
54 void __processPacket(packetBuffer_t *packet);
55 QByteArray __processData(const QByteArray &data, packetBuffer_t* buffer);
56 private slots:
57 void readyRead();
58
44 public:
59 public:
45 QTcpSocket* Read_soc;
60 QTcpSocket Read_soc;
46 QTcpSocket* Write_soc;
61 QTcpSocket Write_soc;
62 QMutex incomingPacketsMutex;
63 QQueue<QByteArray> incomingPackets;
47 QString IP;
64 QString IP;
48 int virtualLinkIndex;
65 int virtualLinkIndex;
49 };
66 };
@@ -109,6 +109,7 int abstractSpwManager::getRMAPanswer(in
109 *buffer=NULL;
109 *buffer=NULL;
110 int count=0;
110 int count=0;
111 SocExplorerEngine::message(this->plugin,"Looking for RMAP answer",2);
111 SocExplorerEngine::message(this->plugin,"Looking for RMAP answer",2);
112 qApp->processEvents();
112 timeout.start();
113 timeout.start();
113 while (*buffer==NULL)
114 while (*buffer==NULL)
114 {
115 {
@@ -151,6 +152,7 int abstractSpwManager::getRMAPanswer(in
151 return -1;
152 return -1;
152 }
153 }
153 usleep(1000);
154 usleep(1000);
155 qApp->processEvents();
154 }
156 }
155 this->RMAP_AnswersSem->acquire();
157 this->RMAP_AnswersSem->acquire();
156 }
158 }
General Comments 0
You need to be logged in to leave comments. Login now