Frage Thread-Interrupt, der nicht beendet wird, blockiert Anruf beim Eingangsstrom, der gelesen wird


Ich benutze RXTX, um Daten von einem seriellen Port zu lesen. Das Lesen wird in einem Thread ausgeführt, der wie folgt erzeugt wird:

CommPortIdentifier portIdentifier = CommPortIdentifier.getPortIdentifier(port);
CommPort comm = portIdentifier.open("Whatever", 2000);
SerialPort serial = (SerialPort)comm;
...settings
Thread t = new Thread(new SerialReader(serial.getInputStream()));
t.start();

Die SerialReader-Klasse implementiert Runnable und führt eine unbegrenzte Anzahl von Loops durch, liest vom Port und erstellt die Daten in nützliche Pakete, bevor sie an andere Anwendungen gesendet wird. Allerdings habe ich es auf die folgende Einfachheit reduziert:

public void run() {
  ReadableByteChannel byteChan = Channels.newChannel(in); //in = InputStream passed to SerialReader
  ByteBuffer buffer = ByteBuffer.allocate(100);
  while (true) {
    try {
      byteChan.read(buffer);
    } catch (Exception e) {
      System.out.println(e);
    }
  }
}

Wenn ein Benutzer auf eine Stoppschaltfläche klickt, wird die folgende Funktionalität ausgelöst, die theoretisch den Eingabestream schließen und den blockierenden byteChan.read (Puffer) -Aufruf unterbrechen sollte. Der Code ist wie folgt:

public void stop() {
  t.interrupt();
  serial.close();
}

Wenn ich diesen Code jedoch ausführe, erhalte ich nie eine ClosedByInterruptException, die ausgelöst werden soll, sobald der Eingabestream geschlossen wird. Außerdem blockiert die Ausführung beim Aufruf von serial.close () - weil der zugrundeliegende Eingabestream beim Lesen weiterhin blockiert. Ich habe versucht, den Unterbrechungsaufruf mit byteChan.close () zu ersetzen, der dann eine AsynchronousCloseException verursachen sollte, jedoch erhalte ich die gleichen Resultate.

Jede Hilfe bei dem, was mir fehlt, wäre sehr willkommen.


8
2017-10-01 22:11


Ursprung


Antworten:


Sie können keinen Stream erstellen, der unterbrechbare E / A nicht unterstützt InterruptibleChannel einfach durch das Einwickeln (und sowieso ReadableByteChannel erstreckt sich nicht InterruptibleChannel).

Sie müssen sich den Kontrakt des Basiswerts ansehen InputStream. Was macht SerialPort.getInputStream() über die Unterbrechbarkeit seines Ergebnisses sagen? Wenn es nichts sagt, sollten Sie davon ausgehen, dass es Interrupts ignoriert.

Für alle E / A, die die Unterbrechbarkeit nicht explizit unterstützen, besteht die einzige Option darin, den Stream im Allgemeinen aus einem anderen Thread zu schließen. Dies kann sofort eine Erhöhung auslösen IOException (obwohl es nicht sein könnte AsynchronousCloseException) in dem Thread, der bei einem Aufruf des Streams blockiert ist.

Aber auch dies ist extrem abhängig von der Umsetzung der InputStream- und das zugrunde liegende Betriebssystem kann auch ein Faktor sein.


Notieren Sie sich den Kommentar zum Quellcode auf dem ReadableByteChannelImpl Klasse zurückgegeben von newChannel():

  private static class ReadableByteChannelImpl
    extends AbstractInterruptibleChannel       // Not really interruptible
    implements ReadableByteChannel
  {
    InputStream in;
    ⋮

5
2017-10-01 22:22



Der RXTX SerialInputStream (was vom Aufruf serial.getInputStream () zurückgegeben wird) unterstützt ein Timeout-Schema, das letztendlich alle meine Probleme löste. Wenn vor dem Erstellen des neuen SerialReader-Objekts Folgendes hinzugefügt wird, werden die Lesevorgänge nicht mehr unbegrenzt blockiert:

serial.enableReceiveTimeout(1000);

Innerhalb des SerialReader-Objekts musste ich einige Dinge ändern, um direkt aus dem InputStream zu lesen, anstatt den ReadableByteChannel zu erstellen, aber jetzt kann ich den Reader stoppen und neu starten, ohne dass es Probleme gibt.


3
2017-10-13 17:31



Ich verwende den folgenden Code, um rxtx herunterzufahren. Ich führe Tests aus, die sie starten und herunterfahren und das scheint gut zu funktionieren. Mein Leser sieht so aus:

private void addPartsToQueue(final InputStream inputStream) {
    byte[] buffer = new byte[1024];
    int len = -1;
    boolean first = true;
    // the read can throw
    try {
        while ((len = inputStream.read(buffer)) > -1) {
            if (len > 0) {
                if (first) {
                    first = false;
                    t0 = System.currentTimeMillis();
                } else
                    t1 = System.currentTimeMillis();
                final String part = new String(new String(buffer, 0, len));
                queue.add(part);
                //System.out.println(part + " " + (t1 - t0));
            }
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                //System.out.println(Thread.currentThread().getName() + " interrupted " + e);
                break;
            }
        }
    } catch (IOException e) {
        System.err.println(Thread.currentThread().getName() + " " + e);
        //if(interruSystem.err.println(e);
        e.printStackTrace();
    }
    //System.out.println(Thread.currentThread().getName() + " is ending.");
}

Vielen Dank

public void shutdown(final Device device) {
    shutdown(serialReaderThread);
    shutdown(messageAssemblerThread);
    serialPort.close();
    if (device != null)
        device.setSerialPort(null);
}

public static void shutdown(final Thread thread) {
    if (thread != null) {
        //System.out.println("before intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        thread.interrupt();
        //System.out.println("after intterupt() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted trying to sleep after interrupting" + thread.getName() + " " + e);
        }
        //System.out.println("before join() on thread " + thread.getName() + ", it's state is " + thread.getState());
        try {
            thread.join();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " join interruped");
        }
        //System.out.println(Thread.currentThread().getName() + " after join() on thread " + thread.getName() + ", it's state is" + thread.getState());
    }

1
2017-11-30 08:50