if self.mosquitto_process is None:
self.mosquitto_process = QProcess()
self.mosquitto_process.start("mosquitto.cmd")
self.port = utils.pick_free_port()
_logger().debug('starting server process: %s',
' '.join([self._interpreter, server.__file__,
str(self.port)]))
self._process = QtCore.QProcess()
self._process.readyRead.connect(self._on_ready_read)
self._process.setProcessChannelMode(self._process.MergedChannels)
self._process.finished.connect(self._on_finished)
self._process.stateChanged.connect(self._on_state_changed)
self._process.start(
self._interpreter, (server.__file__, str(self.port)))
proc = QProcess()
proc.start(*py_proc('print("Hello World")'))
dev = qtutils.PyQIODevice(proc)
assert not dev.closed
with pytest.raises(OSError) as excinfo:
dev.seek(0)
assert str(excinfo.value) == 'Random access not allowed!'
with pytest.raises(OSError) as excinfo:
dev.tell()
assert str(excinfo.value) == 'Random access not allowed!'
proc.waitForFinished(1000)
proc.kill()
assert bytes(dev.read()).rstrip() == b'Hello World'
References
https://programtalk.com/python-examples/PyQt5.QtCore.QProcess/