Blockieren Signals/Slots Threads?
Blockieren Signals/Slots Threads?
Hi,
Sachlage:
Ich habe drei Threads, der eine ist der Datensammler, die anderen die Konsumenten.
Wenn der Sammler die Daten bereit hält schiebt er den Konsumenten mit kosument->getData() die Daten und informiert sie mit indiv. Signalen. dass daten anliegen.
So wie ich das verstanden habe sind beide in dem Moment geblockt wenn das Signal rausgeschickt wird.
Der Konsument, kann gerade nicht konsumieren, da Signal eingetroffen ist.
Der Datensammler kann keine weiteren Daten sammeln, solange das "emit"/SLOT von Konusment noch nciht abgearbeitet wurde.
Das was ich machen möchte ist foglendes:
Beide sollen ihre eigentliche Arbeit weitermachen, auch wenn Signals/Slots auftreten...
Geht das überhaupt?
Mein Absicht war es parallelität in die Sache reinzubringen...
- Sammler hält daten bereit..schickt sie an Konsument 1
- Slot von Konsument wird abgearbeitet
- währendessen liegen daten vor für konsument 2
- der sammler sollte nun nicht solange warten bis konusmet1 seinen slot fertig abgearbeitet hat. sondern direkt den konsumenten 2 versorgen
Wie löse ich das am besten?
Sachlage:
Ich habe drei Threads, der eine ist der Datensammler, die anderen die Konsumenten.
Wenn der Sammler die Daten bereit hält schiebt er den Konsumenten mit kosument->getData() die Daten und informiert sie mit indiv. Signalen. dass daten anliegen.
So wie ich das verstanden habe sind beide in dem Moment geblockt wenn das Signal rausgeschickt wird.
Der Konsument, kann gerade nicht konsumieren, da Signal eingetroffen ist.
Der Datensammler kann keine weiteren Daten sammeln, solange das "emit"/SLOT von Konusment noch nciht abgearbeitet wurde.
Das was ich machen möchte ist foglendes:
Beide sollen ihre eigentliche Arbeit weitermachen, auch wenn Signals/Slots auftreten...
Geht das überhaupt?
Mein Absicht war es parallelität in die Sache reinzubringen...
- Sammler hält daten bereit..schickt sie an Konsument 1
- Slot von Konsument wird abgearbeitet
- währendessen liegen daten vor für konsument 2
- der sammler sollte nun nicht solange warten bis konusmet1 seinen slot fertig abgearbeitet hat. sondern direkt den konsumenten 2 versorgen
Wie löse ich das am besten?
-
CaptnChaos
- Beiträge: 605
- Registriert: 28. Juni 2007 15:01
- Kontaktdaten:
in den slots wollte ich kaum etwas machen, also recht minimal halten.
Sagen wir mal ich übertrage meine Daten via Signal und im Slot speichere ich sie lokal z.B. in ein Array.
Jetzt muss ja mein Thread X, der die Daten bekommen hat informiert werden "Es liegen Daten vor"
Dafür müsste ich im SLOT von Thread X ein weiteres Signal auslösen
um Thread X zu informieren, dass Daten da sind.
Denn das erste Signal war nur dazu da um die Daten zu übertragen.
ch will nicht pollen!)
Das wiederum heisst für mich:
- Sammler-Thread schickt Signal 1
- Thread X bekommt Signal 1 und Slot wird ausgeführt
- Im Slot wird ein weiteres Signal ausgelöst um Thread X zu sagen, dass in seinem Array Daten sind
- So würde nun Sammler-Thread solange warten bis der letzte SLOT beendet wurde
Da zerbreche ich mir die Gedanken...
Sagen wir mal ich übertrage meine Daten via Signal und im Slot speichere ich sie lokal z.B. in ein Array.
Jetzt muss ja mein Thread X, der die Daten bekommen hat informiert werden "Es liegen Daten vor"
Dafür müsste ich im SLOT von Thread X ein weiteres Signal auslösen
um Thread X zu informieren, dass Daten da sind.
Denn das erste Signal war nur dazu da um die Daten zu übertragen.
Das wiederum heisst für mich:
- Sammler-Thread schickt Signal 1
- Thread X bekommt Signal 1 und Slot wird ausgeführt
- Im Slot wird ein weiteres Signal ausgelöst um Thread X zu sagen, dass in seinem Array Daten sind
- So würde nun Sammler-Thread solange warten bis der letzte SLOT beendet wurde
Da zerbreche ich mir die Gedanken...
ok das mit dem Flag ist logisch.
Für die Signal/Slot Verbindungen, abgehend oder einkommend ,brauche ich exec().
D.h. in meiner run() kann ich schlecht eine Endlos-Schleife laufen lassen, weder vor exec oder nach exec.
Eine Empfehlung wie ich / wo ich mein Flag überprüfen soll?
Code: Alles auswählen
run(){
exec();
}
D.h. in meiner run() kann ich schlecht eine Endlos-Schleife laufen lassen, weder vor exec oder nach exec.
Eine Empfehlung wie ich / wo ich mein Flag überprüfen soll?
-
CaptnChaos
- Beiträge: 605
- Registriert: 28. Juni 2007 15:01
- Kontaktdaten:
sicher. dafür ist ein thread da. exec ist nichts anderes als eine endlosschleife...
Code: Alles auswählen
run(){
while(!stopMe)
{
if(flag == true)
{
daten_verarbeiten(myStoredData);
}
}
exec();
} Für dein Problem würde ich dir empfehlen QSemaphores zu verwenden, damit bleiben die Threads so lange gestoppt, bis Daten vorhanden sind.
Siehe "Sempahores Example".
Der Sammler könnte dann die Daten (am besten Pointer oder Objekte die sehr schnell kopiert werden können, z.B. QStrings, da sich das kopierte Objekt die Daten mit dem alten teilt) in einen Array schreiben (Die Arrayklassen von Qt funktioneren auch!), den du mithilfe eines Mutexs beim Schreibzugriff schützen musst, da du zwei Arbeitsthreads hast.
Zudem musst die noch, wie bereits vorhanden, eine Variable verwenden, auf die alle Threads Zugriff haben, damit der Auslesethread, den Arbeitern mit teilen kann, dass keine neuen Daten eingelesen werden.
Wenn der Auslesethread fertig ist muss er noch warten bis die Arbeitsthread fertig sind. Bei mehreren Threads kann hier das Problem auftreten, dass ein Arbeitsthread bei einem(einer ?) Semaphore wartet und der Hauptthread auschließen, ebenfalls auf diesen wartet. Dies würde zu einem Deadlock führen, was nur vermieden werden kann, indem du den Semaphore nur max 10 Millisekunden (oder weniger) warten lässt und dann überprüfst, ob der Auslesethread fertig ist, ansonsten sollte die Schleife wieder starten.
Da der Semaphore nur blockiert, wenn keine Daten vorhanden sind, bedeutet dies, dass der Thread nur beendet wird wenn keine Daten mehr vorhanden sind und alle Daten eingelesen sind.
Nach diesem riesen Block Text noch einen (noch längerer
)Codeschnipsel von mir
Ich hoffe, dass das mein Codeschnipsel verständlich ist.
Damit die Konsumenten bei der Methode in deinem ersten Post jeweils in einem eigenen Thread laufen, darfst du sie erst in run() erstellen, da sie ansonsten alle im selben Thread laufen.
Die Signale von Qt werden standardmäßig direkt ausgeführt, wenn das Zielobjekt im selben Thread läuft wie der Aufrufer, wenn beide in einem unterschiedlichen Thread laufen, werden die Aufrufe in eine Queue eingetragen und beim nächsten Durchgang innerhalb von exec() verarbeitet. Wenn du die Threads mit den Slots steuern willst, musst die irgendwie eine Methode finden, die dir klar sagt, dass die Threads fertig sind. Wenn du die Threads über Signale mit Daten versorgst werden die Daten erst gespeichert, nachdem der vorherige Arbeitsvorgang beendet ist.
Wenn die Queue z.B. folgendes enthält:
Daten setzen
Daten verarbeiten
Daten setzen
Daten verarbeiten
werden die einzelnen Einträge nach einander verarbeitet!
Was bedeutet, dass du nach dem Datenverarbeiten nicht sagen kannst, dass die Threads fertig sind.
Alternativ kannst du die auch mal QFuture anschauen, was aber soweit ich es mir angeschaut habe, bereits die zu verarbeitetenden Daten benötigt.
edit:
noch was zu deinem letzten Post:
wenn exec läuft, kommst du beim vorherigen Code tatsächlich nicht mehr zurück in die Schleife, sondern kannst nur den Thread beenden.
Siehe "Sempahores Example".
Der Sammler könnte dann die Daten (am besten Pointer oder Objekte die sehr schnell kopiert werden können, z.B. QStrings, da sich das kopierte Objekt die Daten mit dem alten teilt) in einen Array schreiben (Die Arrayklassen von Qt funktioneren auch!), den du mithilfe eines Mutexs beim Schreibzugriff schützen musst, da du zwei Arbeitsthreads hast.
Zudem musst die noch, wie bereits vorhanden, eine Variable verwenden, auf die alle Threads Zugriff haben, damit der Auslesethread, den Arbeitern mit teilen kann, dass keine neuen Daten eingelesen werden.
Wenn der Auslesethread fertig ist muss er noch warten bis die Arbeitsthread fertig sind. Bei mehreren Threads kann hier das Problem auftreten, dass ein Arbeitsthread bei einem(einer ?) Semaphore wartet und der Hauptthread auschließen, ebenfalls auf diesen wartet. Dies würde zu einem Deadlock führen, was nur vermieden werden kann, indem du den Semaphore nur max 10 Millisekunden (oder weniger) warten lässt und dann überprüfst, ob der Auslesethread fertig ist, ansonsten sollte die Schleife wieder starten.
Da der Semaphore nur blockiert, wenn keine Daten vorhanden sind, bedeutet dies, dass der Thread nur beendet wird wenn keine Daten mehr vorhanden sind und alle Daten eingelesen sind.
Nach diesem riesen Block Text noch einen (noch längerer
Code: Alles auswählen
QByteArray Class::threadedWork() {
QByteArray ret;
QHash<qint64, QByteArray> retData; //Hash für die zurückzugebenden Daten
QHash<qint64, QByteArray> data; //Hash für die zu verarbeitenden Daten
//qint64 ist hier eine Indexziffer, damit die Daten in der richtigen Reihenfolge zusammengesetzt werden
//kann weggelassen werden, wenn die Daten in data in der richtigen Reihenfolge sind, z.B. QQueue<QByteArray> statt QHash<qint64, QByteArray>
QMutex dataMutex; //Mutex für data
QMutex retDataMutex; //Mutex für retData
int maxData = 4; //maximale Anzahl an erlaubten Datenblöcken
QSemaphore freeData(maxData); //Semaphore, die die freien Datenblöcke verwaltet
QSemaphore usedData; //Semaphore, die die benutzten Datenblöcke verwaltet
bool finished = false;
QList<HashThread*> threads; //Liste der Threads
for(qint64 i = 0; i < blockCount(); ++i) { //solange Daten lesen wie vorhanden
if (freeData.available() == 0 && threads.size() < QThread::idealThreadCount()) { //wenn keine freien Datenblöcke vorhanden sind und und mehre Prozessoren vorhanden sind als Threads, dann einen neuen Thread erzeugen
HashThread* thread = new HashThread(&data, &dataMutex, &retData, &retDataMutex, &freeData, &usedData, &finished);
thread->start(); //Thread initialisieren und starten
threads.append(thread);
if (threads.count() * 2 + 2 > maxData) {
freeData.release(2); //erlaube dem einlese Thread zwei weiter Datenblöcke zu benutzen
maxData += 2; //nur Cache vergrößern, da zum verkleinern garantiert werden müsste, dass die Daten nicht verwendet werden
}
}
freeData.acquire(); //Auf freien Datenblock warten
QByteArray tmp = readBlock(i); //Block lesen
dataMutex.lock(); //erst dann(!) den Mutex benutzen und die Daten schreiben
data[i] = tmp; //verbraucht zwar etwas mehr Arbeitsspeicher, verhindert aber, dass die Parallelisierung durch den Mutex verhindert wird
dataMutex.unlock();
usedData.release(); //Einen belegten Datenblock freigeben
}
finished = true; //fertig setzen
for (int i = 0; i < threads.size(); ++i) //warten bis alle Threads fertig sind
if (threads[i]->isRunning())
threads[i]->wait();
//Daten zusammensetzen oder irgendwas machen
}
class HashThread : public QThread {
public:
HashThread(QHash<qint64, QByteArray>* data, QMutex* dataMutex, QHash<qint64, QByteArray>* retData, QMutex* retDataMutex, QSemaphore* freeData, QSemaphore* usedData, bool* finished);
protected:
void run();
QHash<qint64, QByteArray>* pdata;
QMutex* pdataMutex;
QHash<qint64, QByteArray>* pretData;
QMutex* pretDataMutex;
QSemaphore* pfreeData;
QSemaphore* pusedData;
bool* pfinished;
};
HashThread::HashThread(QHash<qint64, QByteArray>* data, QMutex* dataMutex, QHash<qint64, QByteArray>* retData, QMutex* retDataMutex,
QSemaphore* freeData, QSemaphore* usedData, bool* finished)
: QThread(0), pdata(data), pdataMutex(dataMutex), pretData(retData), pretDataMutex(retDataMutex), pfreeData(freeData),
pusedData(usedData), pfinished(finished) { }
void HashThread::run() {
while(!(*pfinished && pusedData->available() == 0)) { //solange in der Schleife bleiben bis keine Daten mehr verarbeitet werden müssen und der Einlesethread fertig ist
if (!pusedData->tryAcquire(1, 10))
if (*pfinished)
return;
else
continue;
pdataMutex->lock(); //Daten lesen und zwischenspeichern um mit dem Mutex nicht alles zu blockieren
qint64 index = pdata->keys()[0];
QByteArray data = (*pdata)[index];
pdata->remove(index);
pdataMutex->unlock();
pfreeData->release(); //Quelldatenblock freigeben
QByteArray tmp = rechenIntensiveOperation(data);
pretDataMutex->lock();
(*pretData)[index] = tmp; //Rückgabedaten schreiben
pretDataMutex->unlock();
}
}
Damit die Konsumenten bei der Methode in deinem ersten Post jeweils in einem eigenen Thread laufen, darfst du sie erst in run() erstellen, da sie ansonsten alle im selben Thread laufen.
Die Signale von Qt werden standardmäßig direkt ausgeführt, wenn das Zielobjekt im selben Thread läuft wie der Aufrufer, wenn beide in einem unterschiedlichen Thread laufen, werden die Aufrufe in eine Queue eingetragen und beim nächsten Durchgang innerhalb von exec() verarbeitet. Wenn du die Threads mit den Slots steuern willst, musst die irgendwie eine Methode finden, die dir klar sagt, dass die Threads fertig sind. Wenn du die Threads über Signale mit Daten versorgst werden die Daten erst gespeichert, nachdem der vorherige Arbeitsvorgang beendet ist.
Wenn die Queue z.B. folgendes enthält:
Daten setzen
Daten verarbeiten
Daten setzen
Daten verarbeiten
werden die einzelnen Einträge nach einander verarbeitet!
Was bedeutet, dass du nach dem Datenverarbeiten nicht sagen kannst, dass die Threads fertig sind.
Alternativ kannst du die auch mal QFuture anschauen, was aber soweit ich es mir angeschaut habe, bereits die zu verarbeitetenden Daten benötigt.
edit:
noch was zu deinem letzten Post:
wenn exec läuft, kommst du beim vorherigen Code tatsächlich nicht mehr zurück in die Schleife, sondern kannst nur den Thread beenden.
Danke für deine recht ausführliche Beschreibung.
Jedoch hilft mir das nicht so ganz, denn ich brauche auch exec().
- Signale müssen in die Threads rein und raus. (daher exec())
- run darf nicht beendet werden, Threads sollen nur pausiert werden und später weiterlaufen (das pausieren eventuell mit semaphore oder einfach ein bool Flag für eine Schleife)
-
Irgendwie scheint es dafür keine richtige Lösung zu geben.
Werde mir da noch etwas Gedanken machen....

Jedoch hilft mir das nicht so ganz, denn ich brauche auch exec().
- Signale müssen in die Threads rein und raus. (daher exec())
- run darf nicht beendet werden, Threads sollen nur pausiert werden und später weiterlaufen (das pausieren eventuell mit semaphore oder einfach ein bool Flag für eine Schleife)
-
Irgendwie scheint es dafür keine richtige Lösung zu geben.
Werde mir da noch etwas Gedanken machen....
-
CaptnChaos
- Beiträge: 605
- Registriert: 28. Juni 2007 15:01
- Kontaktdaten:
Schon klar, dass es ne endlosschleife ist.
Ich brauche aber meine eigene Endlosschleife !?!?
Oder wo kann ich exec sagen, dass es in gewissen Abständen ein Variable überprüfen soll und do_sth() ausführen soll.
edit:
der thread bekommt Daten zugeschickt,..überprüft von mir aus ein Flag und macht was mit diesen Daten.
Wo soll ich deiner Meinung nach dieses überprüfen des Flags realisieren und zusätzlich noch exec aufrufen?
stupides Beispiel:
flag wird durch ein signal von aussen gesetzt( wenn daten vorliegen).
So erreiche ich nie exec()...macht also keinen Sinn!
Ich brauche aber meine eigene Endlosschleife !?!?
Oder wo kann ich exec sagen, dass es in gewissen Abständen ein Variable überprüfen soll und do_sth() ausführen soll.
edit:
der thread bekommt Daten zugeschickt,..überprüft von mir aus ein Flag und macht was mit diesen Daten.
Wo soll ich deiner Meinung nach dieses überprüfen des Flags realisieren und zusätzlich noch exec aufrufen?
stupides Beispiel:
flag wird durch ein signal von aussen gesetzt( wenn daten vorliegen).
So erreiche ich nie exec()...macht also keinen Sinn!
Code: Alles auswählen
run() {
while(1){
if(flag) //flag wird von ausserhalb gesetzt, damit daten verarbeitet werden können
do_sth(); //verarbeitet daten
}
exec(); // das brauche für signals/slots
}
ich glaub ich habs: (noch nicht getestet)
processEvents
Ich glaub es reicht aus wenn ich statt exec(), regelmäig processEvents() aufrufe. Was meint ihr?
- So würde meine eigene Schleife mein FLAG überprüfen und die Daten verarbeiten
- Signals/Slots Verarbeitung in beiden Richtungen dürfte so auch funktionieren
- Thread bleibt am leben, solange ich will!
processEvents
Code: Alles auswählen
run()
{
while(1) {
if(flag)
do_sth();
QEventloop::processEvents()
}
}
Ich glaub es reicht aus wenn ich statt exec(), regelmäig processEvents() aufrufe. Was meint ihr?
- So würde meine eigene Schleife mein FLAG überprüfen und die Daten verarbeiten
- Signals/Slots Verarbeitung in beiden Richtungen dürfte so auch funktionieren
- Thread bleibt am leben, solange ich will!
-
CaptnChaos
- Beiträge: 605
- Registriert: 28. Juni 2007 15:01
- Kontaktdaten:
Du weisst schon das signale und slots callbacks sind, also nicht von der event loop abhängig? und ein QThread hat keine events.
Hier mal ein auszug vom Mandelbrot example:
Hier mal ein auszug vom Mandelbrot example:
Code: Alles auswählen
void RenderThread::run()
{
forever {
mutex.lock();
QSize resultSize = this->resultSize;
double scaleFactor = this->scaleFactor;
double centerX = this->centerX;
double centerY = this->centerY;
mutex.unlock();
int halfWidth = resultSize.width() / 2;
int halfHeight = resultSize.height() / 2;
QImage image(resultSize, QImage::Format_RGB32);
const int NumPasses = 8;
int pass = 0;
while (pass < NumPasses) {
const int MaxIterations = (1 << (2 * pass + 6)) + 32;
const int Limit = 4;
bool allBlack = true;
for (int y = -halfHeight; y < halfHeight; ++y) {
if (restart)
break;
if (abort)
return;
uint *scanLine =
reinterpret_cast<uint *>(image.scanLine(y + halfHeight));
double ay = centerY + (y * scaleFactor);
for (int x = -halfWidth; x < halfWidth; ++x) {
double ax = centerX + (x * scaleFactor);
double a1 = ax;
double b1 = ay;
int numIterations = 0;
do {
++numIterations;
double a2 = (a1 * a1) - (b1 * b1) + ax;
double b2 = (2 * a1 * b1) + ay;
if ((a2 * a2) + (b2 * b2) > Limit)
break;
++numIterations;
a1 = (a2 * a2) - (b2 * b2) + ax;
b1 = (2 * a2 * b2) + ay;
if ((a1 * a1) + (b1 * b1) > Limit)
break;
} while (numIterations < MaxIterations);
if (numIterations < MaxIterations) {
*scanLine++ = colormap[numIterations % ColormapSize];
allBlack = false;
} else {
*scanLine++ = qRgb(0, 0, 0);
}
}
}
if (allBlack && pass == 0) {
pass = 4;
} else {
if (!restart)
emit renderedImage(image, scaleFactor);
++pass;
}
}
mutex.lock();
if (!restart)
condition.wait(&mutex);
restart = false;
mutex.unlock();
}
}
Im Mandelbrot-Beispiel wird nicht auf eingehende Signals eingegangen.
Es wird nur im eigenen Loop was berechnet....das reicht mir aber nicht.
vielleicht sollte ich es mal so erklären,, wird eventuell klar was ich machen möchte.
Akteure:(Threads)
Sammler (Producer)
Consumer0
Consumer1
ConsumerN-1
GUI
Problemstellung:
Sammler arbeitet auf Ebene 0 und sammelt ununterbrochen Daten, diese Daten haben auf dieser Ebene keine Logik können also noch nicht verarbeitet werden.
Daher sollen sie an die richtigen Consumer weitergereicht werden. Der Sammler verteilt die Pakete an die richtigen Consumer (Feste Zuordnung vorhanden, Paket-Typ0 kann nur an Consumer0 usw...)
Die Verteilung an die Consumer soll über Signal-Slots erfolgen.
Die Consumer-Threads werden im Producer-Thread dynamisch erzeugt.
Nach der Verteilung der Daten an den ConsumerN sammelt Producer weiter.
Der ConsumerN bemerkt, dass Daten anliegen. (SLOT wurde aufgerufen, also die Daten sind angekommen)
Er soll nun anfangen diese Daten zu verarbeiten und die Logik, also die höhere Ebene 1, zu interpretieren.
Während ConsumerN aus den Daten die Ebene 1 interpretiert können währendessen vom Sammler neue Daten gesammelt werden.
Diese können wiederum an andere oder den gleichen ConsumerN weitergereicht werden.
Informationen über Veränderungen an der GUI sollen meine Consumer via Signals mitbekommen.
Da die Consumer ( nicht-GUI-Threads) an der GUI nichts verändern können muss ich mir hierfür noch etwas einfallen lassen.
Die empfangenen Daten (Logik Ebene 1) sollen auf der GUI visualisiert werden.
Wie würdet ihr es realiserien? Vielleicht ist mein Ansatz komplett daneben.
Wichtig:
- Consumer-Threads dürfen nicht unnötig sterben
- Signals müssen in die Consumer reinkommen auch rausgehen ( z.B. Rückantwort von Consumer an Producer oder Signal von Consumer für GUI-Veränderung)
Es wird nur im eigenen Loop was berechnet....das reicht mir aber nicht.
vielleicht sollte ich es mal so erklären,, wird eventuell klar was ich machen möchte.
Akteure:(Threads)
Sammler (Producer)
Consumer0
Consumer1
ConsumerN-1
GUI
Problemstellung:
Sammler arbeitet auf Ebene 0 und sammelt ununterbrochen Daten, diese Daten haben auf dieser Ebene keine Logik können also noch nicht verarbeitet werden.
Daher sollen sie an die richtigen Consumer weitergereicht werden. Der Sammler verteilt die Pakete an die richtigen Consumer (Feste Zuordnung vorhanden, Paket-Typ0 kann nur an Consumer0 usw...)
Die Verteilung an die Consumer soll über Signal-Slots erfolgen.
Die Consumer-Threads werden im Producer-Thread dynamisch erzeugt.
Nach der Verteilung der Daten an den ConsumerN sammelt Producer weiter.
Der ConsumerN bemerkt, dass Daten anliegen. (SLOT wurde aufgerufen, also die Daten sind angekommen)
Er soll nun anfangen diese Daten zu verarbeiten und die Logik, also die höhere Ebene 1, zu interpretieren.
Während ConsumerN aus den Daten die Ebene 1 interpretiert können währendessen vom Sammler neue Daten gesammelt werden.
Diese können wiederum an andere oder den gleichen ConsumerN weitergereicht werden.
Informationen über Veränderungen an der GUI sollen meine Consumer via Signals mitbekommen.
Da die Consumer ( nicht-GUI-Threads) an der GUI nichts verändern können muss ich mir hierfür noch etwas einfallen lassen.
Die empfangenen Daten (Logik Ebene 1) sollen auf der GUI visualisiert werden.
Wie würdet ihr es realiserien? Vielleicht ist mein Ansatz komplett daneben.
Wichtig:
- Consumer-Threads dürfen nicht unnötig sterben
- Signals müssen in die Consumer reinkommen auch rausgehen ( z.B. Rückantwort von Consumer an Producer oder Signal von Consumer für GUI-Veränderung)
Wenn du den Signal-Slot-Mechanismus von Qt als Queued-connection verwendest, dann wird das intern als Event verarbeitet, der zum aktuellen Thread hinzugefügt wird. Wenn du dann den Eventloop durchlaufen lässt, werden die Signale verteilt.
Zuerst mal ein paar Tips wie ich es implementieren würde und dann noch ein Vergleich zwischen der Methode mit Signals und Slots und der von mir vorgeschlagenen mit Semaphores.
Die Consumer dürfen erst innerhalb eines QThreads erstellt werden.
Du brauchst eigentlich keine Flag um irgendeinen Befehl im Thread aufzurufen, es sollte reichen mit Signals den Consumer aufzurufen. Wenn jetzt der Producer per Signal dem Consumer die Daten gibt, werden diese abgearbeitet und wenn der Consumer gerade arbeitet, werden sie eben zur Queue hinzugefügt.
Das bedeutet, dass wenn die GUI oder irgendwer anderes dem Consumer irgendwas per Signal mitteilt, dann wird dies erst bearbeitet, nachdem die vorherige Arbeit fertig ist.
Zum überprüfen, ob noch Signals in der Warteschleife sind, kannst du bei Queued-Connections überprüfen, ob noch Events abgearbeitet werden müssen, da die Signale für Queued-Connections als QMetaCallEvents in die Eventschleife hinzugefügt werden.
Damit kannst du am Ende überprüfen, ob die Threads noch Arbeit zu verrichten haben und dann solange warten bis sie fertig sind.
Von den Consumern aus kannst du eigentlich ohne Probleme direkt auf die GUI, entweder über nen Pointer(Mutex nötig!!) oder per Signal, zugreifen.
Die Threads würde ich in einer Liste, eventuell nach Typ geordnet, aufbewahren. Wenn du dynamisch, je nach vorhandener Arbeit mehrere Consumer vom selben Typ erstellen willst, dann musst du mitzählen, wie viel Arbeit die einzelnen Threadtypen haben und ggf. einen weiteren Thread erstellen. Allerdings solltest du die Threadanzahl pro Typ auf QThread::idealThreadCount() beschränken, ansonsten könnten bei hoher CPU-Auslastung unnötig viele Threads erzeugt werden, die dann nur bremsen. Die einmal gestarteten Threads kannst du einfach warten lassen und dann wieder benutzen oder nach einer längeren Zeit, in der sie nichts getan haben beenden.
Damit der Producer-Thread auch die Signals verarbeitet, muss er immer wieder, am besten nach jedem Datenblock, den Eventloop durchlaufen lassen. Allerdings können die dadurch gestarteten Slots nur Variablen innerhalb der Klasse verändern, was heißt, dass sie nur indirekt die Arbeit vom Producer verändern können.
Beim Verarbeiten der Daten musst du aufpassen, dass wenn du QObjects verwendest, die Parents für diese im selben Thread sein müssen, ansonsten weigern sich diese Objekte zu funktionieren, was mir schon einige Probleme verursacht hat.
Dein Ansatz sollte (eigentlich ?) funktionieren.
Da immer wieder der Eventloop gestartet wird werden auch alle Anweisungen von der GUI irgendwann verarbeitet.
Als Ablauf hättest du dann so was wie:
Producer --> Daten lesen, Befehl irgendwie zwischenspeichern, bis Consumer fertig
Consumer0 starten --> Producer Initialisierung melden
Producer --> Anweisung an Consumer0 senden --> Consumer0 arbeitet
Producer liest weitere Daten --> wie an Consumer0 zuteilen --> Befehl muss warten
Consumer0 berechnet ersten Befehl fertig, Rückmeldung an Producer --> zweiten Befehl abarbeiten
Producer verarbeitet Rückmeldung --> ...
Producer ist fertig --> warten bis alle Daten verarbeitet sind --> Threads beenden
Meinen Beispielcode aus dem vorherigen Post, kannst du ebenfalls so anpassen, dass er deine Wünsche erfüllt, dazu müsstest du nur die QSemaphores in einer Liste sammeln und den entsprechenden an den passende Consumer weiterleiten.
Jetzt noch eine Liste mit den Vor- und Nachteilen der beiden Methoden (keine Garantie für Vollständigkeit
):
Signal-Slot-Synchronisierung:
+/-benötigt QObject
-wenn die Daten auch QObjects verwenden, sind die an den Thread gebunden
-Signale werden in fester Reihenfolge ausgeführt, keine Priorisierung möglich (ist wahrscheinlich nicht notwendig)
(wenn die Befehle priorisiert werden können sollen, dann musst du die Daten zwischenspeichern und dann gewählt ausführen, was dann ca ähnlich komplex wird wie mit QSemaphore, allerdings geht dann der Hauptvorteil der Signals verloren, dass Qt selbst die gewünschten Funktionen aufruft)
-Befehle müssen beim Start eines Threads extra zwischengespeichert werden
+Funktionen können direkt aufgerufen werden, ohne eigenen Zusatzcode
+Synchronisierung wird größtenteils von Qt selbst gehandhabt
-Befehle müssen vom eigenen Code auf die Consumer verteilt werden --> sehr schwierig, wenn z.B. 2 gleiche Consumer vorhanden sind (evtl. mit zusätzlichem Synchronisierungsthread zu implementieren --> Synchronisierungsthread bekommt Befehle und vergibt sie an die entsprechenden Consumer. Der Thread sendet immer nur einen Befehl an den Consumer und verteilt den nächsten, wenn dieser fertig ist. Der Producer sollte nicht dazu benutzt werden die Daten zu verteilen, da ansonsten zu große Wartezeiten entstehen, hierbei gibt es wieder das selbe Problem, wie beim Priorisierungsproblem angemerkt. Auch solltest du den Synchronisierungsthread dann selbst das Starten weiterer Threads durchführen lassen)
QSemaphore-Synchronisierung (es sollte ein Semaphore reichen, der die Anzahl der verfügbaren Befehle als Wert hat):
+/-verwendet/benötigt kein QObject
+Befehle lassen sich in beliebiger Reihenfolge ausführen
-Funktionen können nur indirekt aufgerufen werden (am besten Codenummern für die einzelnen Funktionen verteilen, wie beim moc-Code)
+Keine Zwischenspeicherung beim Threadstart notwendig
-schwieriger zu synchronisieren --> viel Code und viele Mutexes notwendig, zudem muss man aufpassen, dass der Thread nicht durch den Semaphore blockiert wird (hier wäre es nützlich, wenn man direkt am Thread feststellen könnte, ob ein Thread pausiert oder nicht; dazu am besten selbst eine Flag setzen die anzeigt, ob gewartet wird oder nicht, dann kann der Producer dies ähnlich wie bei den Signals und Slots überprüfen und den Thread ggf. beenden)
+verteilt selbstständig die Arbeitslast --> skaliert besser als mit Signals
-->Für einfache Synchronisierungsarbeiten sind Signals die einfachere Lösung, wenn es allerdings komplizierter wird, würde ich Semaphores verwenden.
Beide Methoden zu mischen ist meiner Meinung nach sinnlos, da die Semaphores den Thread pausieren, bis neue Befehle vorhanden sind und zum anderen die Signals beim informieren über neue Befehle ohnehin langsamer wären, denn die Semaphores werden praktisch sofort entsperrt.
Mit Semaphores kannst du ebenfalls einen Callback machen. Am besten die Callbacks in eine Liste im Producer schreiben, die dieser dann immer wieder prüft. Dies funktioniert fast gleich wie wenn du Signals verwendest, nur musst du wie in den Threads, das Aufrufen der richtigen Funktion selbst übernehmen. Bei den Signals lässt der Producer immer wieder den Eventloop abarbeiten, der hier durch die Liste ersetzt ist. Beim selbstgeschriebenen Callback kannst du auch die Variablen direkt innerhalb der Funktion verändern, was evtl. nützlich sein könnte.
So das wars erst mal. Ich hoffe mal, dass mein Roman
verständlich ist. Wenn jemand noch Ergänzungen zu meiner Liste hat, bitte posten.
Zuerst mal ein paar Tips wie ich es implementieren würde und dann noch ein Vergleich zwischen der Methode mit Signals und Slots und der von mir vorgeschlagenen mit Semaphores.
Die Consumer dürfen erst innerhalb eines QThreads erstellt werden.
Code: Alles auswählen
class ConsumerN: public QObject {
Q_OBJECT
ConsumerN();
public slots:
doSomething(data);
}
class ConsumerNThread {
ConsumerNThread();
ConsumerN* consumer;
void run() {
consumer = new ConsumerN();
//Producerthread benachrichtigen, dass Consumer initialisiert ist
exec();
}
}
Das bedeutet, dass wenn die GUI oder irgendwer anderes dem Consumer irgendwas per Signal mitteilt, dann wird dies erst bearbeitet, nachdem die vorherige Arbeit fertig ist.
Zum überprüfen, ob noch Signals in der Warteschleife sind, kannst du bei Queued-Connections überprüfen, ob noch Events abgearbeitet werden müssen, da die Signale für Queued-Connections als QMetaCallEvents in die Eventschleife hinzugefügt werden.
Code: Alles auswählen
if(QCoreApplication::hasPendingEvents())
//do something
Von den Consumern aus kannst du eigentlich ohne Probleme direkt auf die GUI, entweder über nen Pointer(Mutex nötig!!) oder per Signal, zugreifen.
Die Threads würde ich in einer Liste, eventuell nach Typ geordnet, aufbewahren. Wenn du dynamisch, je nach vorhandener Arbeit mehrere Consumer vom selben Typ erstellen willst, dann musst du mitzählen, wie viel Arbeit die einzelnen Threadtypen haben und ggf. einen weiteren Thread erstellen. Allerdings solltest du die Threadanzahl pro Typ auf QThread::idealThreadCount() beschränken, ansonsten könnten bei hoher CPU-Auslastung unnötig viele Threads erzeugt werden, die dann nur bremsen. Die einmal gestarteten Threads kannst du einfach warten lassen und dann wieder benutzen oder nach einer längeren Zeit, in der sie nichts getan haben beenden.
Damit der Producer-Thread auch die Signals verarbeitet, muss er immer wieder, am besten nach jedem Datenblock, den Eventloop durchlaufen lassen. Allerdings können die dadurch gestarteten Slots nur Variablen innerhalb der Klasse verändern, was heißt, dass sie nur indirekt die Arbeit vom Producer verändern können.
Code: Alles auswählen
QCoreApplication::processEvents()
Dein Ansatz sollte (eigentlich ?) funktionieren.
Da immer wieder der Eventloop gestartet wird werden auch alle Anweisungen von der GUI irgendwann verarbeitet.
Als Ablauf hättest du dann so was wie:
Producer --> Daten lesen, Befehl irgendwie zwischenspeichern, bis Consumer fertig
Consumer0 starten --> Producer Initialisierung melden
Producer --> Anweisung an Consumer0 senden --> Consumer0 arbeitet
Producer liest weitere Daten --> wie an Consumer0 zuteilen --> Befehl muss warten
Consumer0 berechnet ersten Befehl fertig, Rückmeldung an Producer --> zweiten Befehl abarbeiten
Producer verarbeitet Rückmeldung --> ...
Producer ist fertig --> warten bis alle Daten verarbeitet sind --> Threads beenden
Meinen Beispielcode aus dem vorherigen Post, kannst du ebenfalls so anpassen, dass er deine Wünsche erfüllt, dazu müsstest du nur die QSemaphores in einer Liste sammeln und den entsprechenden an den passende Consumer weiterleiten.
Jetzt noch eine Liste mit den Vor- und Nachteilen der beiden Methoden (keine Garantie für Vollständigkeit
Signal-Slot-Synchronisierung:
+/-benötigt QObject
-wenn die Daten auch QObjects verwenden, sind die an den Thread gebunden
-Signale werden in fester Reihenfolge ausgeführt, keine Priorisierung möglich (ist wahrscheinlich nicht notwendig)
(wenn die Befehle priorisiert werden können sollen, dann musst du die Daten zwischenspeichern und dann gewählt ausführen, was dann ca ähnlich komplex wird wie mit QSemaphore, allerdings geht dann der Hauptvorteil der Signals verloren, dass Qt selbst die gewünschten Funktionen aufruft)
-Befehle müssen beim Start eines Threads extra zwischengespeichert werden
+Funktionen können direkt aufgerufen werden, ohne eigenen Zusatzcode
+Synchronisierung wird größtenteils von Qt selbst gehandhabt
-Befehle müssen vom eigenen Code auf die Consumer verteilt werden --> sehr schwierig, wenn z.B. 2 gleiche Consumer vorhanden sind (evtl. mit zusätzlichem Synchronisierungsthread zu implementieren --> Synchronisierungsthread bekommt Befehle und vergibt sie an die entsprechenden Consumer. Der Thread sendet immer nur einen Befehl an den Consumer und verteilt den nächsten, wenn dieser fertig ist. Der Producer sollte nicht dazu benutzt werden die Daten zu verteilen, da ansonsten zu große Wartezeiten entstehen, hierbei gibt es wieder das selbe Problem, wie beim Priorisierungsproblem angemerkt. Auch solltest du den Synchronisierungsthread dann selbst das Starten weiterer Threads durchführen lassen)
QSemaphore-Synchronisierung (es sollte ein Semaphore reichen, der die Anzahl der verfügbaren Befehle als Wert hat):
+/-verwendet/benötigt kein QObject
+Befehle lassen sich in beliebiger Reihenfolge ausführen
-Funktionen können nur indirekt aufgerufen werden (am besten Codenummern für die einzelnen Funktionen verteilen, wie beim moc-Code)
+Keine Zwischenspeicherung beim Threadstart notwendig
-schwieriger zu synchronisieren --> viel Code und viele Mutexes notwendig, zudem muss man aufpassen, dass der Thread nicht durch den Semaphore blockiert wird (hier wäre es nützlich, wenn man direkt am Thread feststellen könnte, ob ein Thread pausiert oder nicht; dazu am besten selbst eine Flag setzen die anzeigt, ob gewartet wird oder nicht, dann kann der Producer dies ähnlich wie bei den Signals und Slots überprüfen und den Thread ggf. beenden)
+verteilt selbstständig die Arbeitslast --> skaliert besser als mit Signals
-->Für einfache Synchronisierungsarbeiten sind Signals die einfachere Lösung, wenn es allerdings komplizierter wird, würde ich Semaphores verwenden.
Beide Methoden zu mischen ist meiner Meinung nach sinnlos, da die Semaphores den Thread pausieren, bis neue Befehle vorhanden sind und zum anderen die Signals beim informieren über neue Befehle ohnehin langsamer wären, denn die Semaphores werden praktisch sofort entsperrt.
Mit Semaphores kannst du ebenfalls einen Callback machen. Am besten die Callbacks in eine Liste im Producer schreiben, die dieser dann immer wieder prüft. Dies funktioniert fast gleich wie wenn du Signals verwendest, nur musst du wie in den Threads, das Aufrufen der richtigen Funktion selbst übernehmen. Bei den Signals lässt der Producer immer wieder den Eventloop abarbeiten, der hier durch die Liste ersetzt ist. Beim selbstgeschriebenen Callback kannst du auch die Variablen direkt innerhalb der Funktion verändern, was evtl. nützlich sein könnte.
So das wars erst mal. Ich hoffe mal, dass mein Roman