Before your edits, I started doing some work on a truly async approach:
Let's get the formalities out of the way:
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
namespace ba = boost::asio;
namespace bp = boost::process;
#include <iostream>
#define LOG(x) std::clog
Now lets create a ProcessManager
that runs all processes on a single io_service
that is shutdown in the destructor.
The IO service is used to schedule all the work (like asynchronous IO). I've
- randomly decided to focus on line-oriented IO operations
- decided that there's likely no reason to use more than 1 IO thread, but in case you would ever, there is the
strand
that correctly synchronizes operations with respect to a child.
#include <map>
#include <list>
#include <thread>
class ProcessManager { // ugh naming smell
using error_code = boost::system::error_code;
private:
ba::io_service _service;
boost::optional<ba::io_service::work> _keep{_service};
boost::process::group _group;
std::thread io_thread;
struct patchProcess : std::enable_shared_from_this<patchProcess> {
using ptr = std::shared_ptr<patchProcess>;
static ptr start(std::string command, std::vector<std::string> args, ProcessManager& mgr) {
ptr p(new patchProcess(std::move(command), std::move(args), mgr));
p->output_read_loop();
return p;
}
boost::optional<std::string> getline() {
std::lock_guard<std::mutex> lk(_mx);
std::string s;
if (has_newline(_output.data()) && std::getline(std::istream(&_output), s))
return s;
return boost::none;
}
void write(std::string message) {
std::lock_guard<std::mutex> lk(_mx);
_input_bufs.push_back({false, std::move(message)});
if (_input_bufs.size() == 1)
input_write_loop();
}
void close_stdin() {
std::lock_guard<std::mutex> lk(_mx);
if (_input_bufs.empty()) {
_strand.post([this, self=shared_from_this()] { _stdin.close(); });
} else {
_input_bufs.push_back({true, {}});
}
}
bool is_running() { return _process.running(); }
private:
patchProcess(std::string command, std::vector<std::string> args, ProcessManager& mgr)
: _strand(mgr._service),
_stdout(mgr._service), _stdin(mgr._service),
_process(command, args, mgr._group, bp::std_out > _stdout, bp::std_in < _stdin, mgr._service)
{ }
void output_read_loop() {
ba::async_read_until(_stdout, pending_output, "
", _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
if (!ec) {
std::lock_guard<std::mutex> lk(_mx);
std::ostream(&_output) << &pending_output;
output_read_loop();
}
}));
}
void input_write_loop() { // assumes _mx locked
if (!_input_bufs.empty()) {
auto& msg = _input_bufs.front();
if (msg.eof) {
_strand.post([this, self=shared_from_this()] { _stdin.close(); });
} else {
ba::async_write(_stdin, ba::buffer(_input_bufs.front().pay_load),
_strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
std::lock_guard<std::mutex> lk(_mx);
_input_bufs.pop_front();
if (!ec)
input_write_loop();
}));
}
}
}
ba::io_service::strand _strand; // thread-safe
// strand-local
bp::async_pipe _stdout, _stdin;
bp::child _process;
ba::streambuf pending_output;
// mutex protected
std::mutex mutable _mx;
struct out_message { bool eof; std::string pay_load; };
std::list<out_message> _input_bufs; // iterator stability again!
ba::streambuf _output;
// static helpers
template <typename T>
static bool has_newline(T buffer) {
return std::find(buffers_begin(buffer), buffers_end(buffer), '
') != buffers_end(buffer);
}
};
using Map = std::map<std::string, patchProcess::ptr>; // iterator stability required!
Map processList;
void eventloop() {
for(;;) try {
if (!_service.run()) break;
} catch(std::exception const& e) {
LOG(error) << "Exception in handler: " << e.what() << "
";
}
}
public:
ProcessManager() : io_thread([this] { eventloop(); }) { }
~ProcessManager() {
status(__FUNCTION__);
_keep.reset();
io_thread.join();
status(__FUNCTION__);
}
void status(std::string const& caption = "Status") const {
for (auto& p : processList) {
LOG(info) << caption << ": '" << p.first << "' is " << (p.second->is_running()? "still running":"done") << "
";
}
}
patchProcess::ptr addNew(std::string name, std::string command, std::vector<std::string> args) {
auto pit = processList.find(name);
if (pit != processList.end()) {
if (pit->second->is_running()) {
LOG(error) << "Process already running ('" << name << "')
";
return {};
}
// TODO make sure process cleaned up etc.
}
LOG(info) << "Creating process for patch " << name << "
";
return processList[name] = patchProcess::start(std::move(command), std::move(args), *this);
}
};
Demos
The most naive run would be:
int main() {
ProcessManager pm;
}
Which, predictably returns after doing nothing. Next, we try
int main() {
ProcessManager pm;
pm.addNew("sleeper", "/bin/bash", {"-c", "sleep 3" });
}
Which predictably waits 3 seconds before exiting. It prints:
Creating process for patch sleeper
~ProcessManager: 'sleeper' is still running
~ProcessManager: 'sleeper' is done
But WAIT! Didn't you specifically say you didn't want waiting? Well, there is none! You can do whatever you please in the mean time. It's just that ProcessManager
's destructor will - by default - wait for the child to finish.
Let's do some IO:
Live On Coliru
int main() {
ProcessManager pm;
auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });
boost::optional<std::string> l;
while ((l = ls->getline()) || ls->is_running()) {
if (l.is_initialized()) {
std::cout << "ls: " << std::quoted(*l) << std::endl;
l.reset();
}
}
}
Prints
Creating process for patch listing
ls: "total 172"
ls: "-rw-rw-rw- 1 2001 2000 5645 Feb 11 00:10 main.cpp"
ls: "-rwxr-xr-x 1 2001 2000 162784 Feb 11 00:10 a.out"
~ProcessManager: 'listing' is done
~ProcessManager: 'listing' is done
To really drive the point home that the processes and their IO are synchronous, we can replace
auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });
with something more time-varied:
auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo "$line"; done" });
Now, to make it really challenging, let's add another child process and send the output of the ls
to the other child
:
Live On Coliru
int main() {
ProcessManager pm;
auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo "$line"; done" });
auto xxd = pm.addNew("hex encoding", "/usr/bin/xxd", {});
boost::optional<std::string> l, x;
bool closed = false;
while ((l || (l = ls->getline())) || (x || (x = xxd->getline())) || ls->is_running() || xxd->is_running()) {
if (l.is_initialized()) {
xxd->write(std::move(*l) + '
');
l.reset();
std::cout << "[forwarded from ls to xxd]" << std::endl;
} else {
if (!closed && !ls->is_running()) {
std::cout << "[closing input to xxd]" << std::endl;
xxd->close_stdin();
closed = true;
}
}
if (x.is_initialized()) {
std::cout << std::quoted(*x) << std::endl;
x.reset();
}
}
}
Now, on Coliru the listing is too small to be interesting, but on my system you get output like:
Creating process for patch listing
Creating process for patch hex encoding
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
"00000000: 746f 7461 6c20 3733 3635 0a2d 7277 2d72 total 7365.-rw-r"
"00000010: 772d 722d 2d20 2031 2073 6568 6520 7365 w-r-- 1 sehe se"
"00000020: 6865 2020 2020 3133 3737 206d 6569 2031 he 1377 mei 1"
"00000030: 3020 2032 3031 3720 636d 616b 655f 696e 0 2017 cmake_in"
"00000040: 7374 616c 6c2e 636d 616b 650a 6c72 7778 stall.cmake.lrwx"
"00000050: 7277 7872 7778 2020 3120 7365 6865 2073 rwxrwx 1 sehe s"
"00000060: 6568 6520 2020 2020 2020 3820 6d65 6920 ehe 8 mei "
"00000070: 3234 2020 3230 3137 206d 6169 6e2e 6370 24 2017 main.cp"
"00000080: 7020 2d3e 2074 6573 742e 6370 700a 2d72 p -> test.cpp.-r"
"00000090: 772d 7277 2d72 2d2d 2020 3120 7365 6865 w-rw-r-- 1 sehe"
"000000a0: 2073 6568 6520 2020 2020 3531 3420 7365 sehe 514 se"
"000000b0: 7020 3133 2030 383a 3336 2063 6f6d 7069 p 13 08:36 compi"
"000000c0: 6c65 5f63 6f6d 6d61 6e64 732e 6a73 6f6e le_commands.json"
"000000d0: 0a2d 7277 2d72 772d 722d 2d20 2031 2073 .-rw-rw-r-- 1 s"
"000000e0: 6568 6520 7365 6865 2020 2020 3135 3834 ehe sehe 1584"
"000000f0: 2073 6570 2032 3020 3232 3a30 3320 576f sep 20 22:03 Wo"
"00000100: 7264 436f 756e 7465 722e 680a 2d72 772d rdCounter.h.-rw-"
"00000110: 7277 2d72 2d2d 2020 3120 7365 6865 2073 rw-r-- 1 sehe s"
"00000120: 6568 6520 2020 2020 3336 3920 7365 7020 ehe 369 sep "
"00000130: 3233 2030 333a 3131 2063 6f6d 6d6f 6e2e 23 03:11 common."
"00000140: