My question
How do I avoid a data race when using two threads to send and receive over an asio::ip::tcp::iostream?
Design
I am writing a program that uses an asio::ip::tcp::iostream for input and output. The program accepts commands from the (remote) user over port 5555 and sends messages to the user over that same TCP connection. Because these events (commands received from the user or messages sent to the user) occur asynchronously, I have separate transmit and receive threads.
In this toy version, the commands are "one", "two" and "quit". Of course "quit" quits the program. The other commands do nothing, and any unrecognized command causes the server to close the TCP connection.
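The command rules above can be sketched in isolation from the networking code. This is only an illustration: the names CommandResult and classifyCommand are hypothetical and do not appear in concurrent.cpp, which does the same checks inline.

```cpp
#include <algorithm>
#include <array>
#include <string>

// Hypothetical helper: the three outcomes described in the text.
enum class CommandResult { Quit, Known, Unknown };

// Classify one whitespace-delimited command word.
CommandResult classifyCommand(const std::string& command) {
    static const std::array<std::string, 3> cmds{{"quit", "one", "two"}};
    if (command == cmds.front()) {
        return CommandResult::Quit;      // "quit" ends the program
    }
    if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
        return CommandResult::Unknown;   // server closes the connection
    }
    return CommandResult::Known;         // "one" and "two" do nothing
}
```

Keeping the classification as a pure function like this makes it testable without a socket, though the toy program does not need it.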
The transmitted messages are simple serial-numbered messages that are sent once per second.
In both this toy version and the real code I'm trying to write, the transmit and receive threads both use blocking IO, so there doesn't appear to be a good way to use a std::mutex or other synchronization mechanism. (In my attempts, one thread would grab the mutex and then block, which isn't going to work for this.)
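The mutex problem described above can be reproduced without any networking. In this minimal sketch, the blocking read is simulated with a std::promise, and the names streamMutex and writerGotLockWhileReaderBlocked are made up for illustration; it does not use asio and is not a fix, only a demonstration of the starvation.

```cpp
#include <chrono>
#include <future>
#include <mutex>
#include <thread>

// Returns true if the "writer" managed to lock the mutex while the "reader"
// was parked in a simulated blocking read with the lock held.
bool writerGotLockWhileReaderBlocked() {
    std::timed_mutex streamMutex;        // hypothetical lock guarding the stream
    std::promise<void> inputArrives;     // stands in for data arriving on the socket
    std::promise<void> readerHoldsLock;  // signals that the reader owns the lock
    std::future<void> inputReady = inputArrives.get_future();
    std::future<void> lockTaken = readerHoldsLock.get_future();

    std::thread reader([&] {
        std::lock_guard<std::timed_mutex> lock(streamMutex);
        readerHoldsLock.set_value();
        inputReady.wait();               // "blocking read" while holding the lock
    });

    lockTaken.wait();
    // The writer gives up after 50 ms; a plain lock() would wait until
    // the remote user happened to type something.
    bool acquired = streamMutex.try_lock_for(std::chrono::milliseconds(50));
    if (acquired) {
        streamMutex.unlock();
    }

    inputArrives.set_value();            // let the simulated read finish
    reader.join();
    return acquired;
}
```

In this sketch the function returns false: the writer cannot send its once-per-second message for as long as the reader sits in the read with the mutex held.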
Build and test
To build and test this, I'm using gcc version 7.2.1 and valgrind 3.13 on a 64-bit Linux machine. Build:
g++ -DASIO_STANDALONE -Wall -Wextra -pedantic -std=c++14 concurrent.cpp -o concurrent -lpthread
To test, I run the server with this command:
valgrind --tool=helgrind --log-file=helgrind.txt ./concurrent
Then I use telnet 127.0.0.1 5555 in another window to create a connection to the server. What helgrind correctly points out is that there is a data race because both runTx and runRx are trying to access the same stream asynchronously:
==16188== Possible data race during read of size 1 at 0x1FFEFFF1CC by thread #1
==16188== Locks held: none
... many more lines elided
concurrent.cpp
#include <asio.hpp>
#include <algorithm>  // std::find
#include <iostream>
#include <fstream>
#include <string>
#include <thread>
#include <array>
#include <chrono>

class Console {
public:
    Console() :
        want_quit{false},
        want_reset{false}
    {}
    bool getQuitValue() const { return want_quit; }
    int run(std::istream *in, std::ostream *out);
    bool wantReset() const { return want_reset; }
private:
    int runTx(std::istream *in);
    int runRx(std::ostream *out);
    bool want_quit;
    bool want_reset;
};

int Console::runTx(std::istream *in) {
    static const std::array<std::string, 3> cmds{
        "quit", "one", "two",
    };
    std::string command;
    while (!want_quit && !want_reset && *in >> command) {
        if (command == cmds.front()) {
            want_quit = true;
        }
        if (std::find(cmds.cbegin(), cmds.cend(), command) == cmds.cend()) {
            want_reset = true;
            std::cout << "unknown command [" << command << "]\n";
        } else {
            std::cout << command << '\n';
        }
    }
    return 0;
}

int Console::runRx(std::ostream *out) {
    for (int i=0; !(want_reset || want_quit); ++i) {
        (*out) << "This is message number " << i << '\n';
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        out->flush();
    }
    return 0;
}

int Console::run(std::istream *in, std::ostream *out) {
    want_reset = false;
    std::thread t1{&Console::runRx, this, out};
    int status = runTx(in);
    t1.join();
    return status;
}

int main()
{
    Console con;
    asio::io_service ios;
    // IPv4 address, port 5555
    asio::ip::tcp::acceptor acceptor(ios,
            asio::ip::tcp::endpoint{asio::ip::tcp::v4(), 5555});
    while (!con.getQuitValue()) {
        asio::ip::tcp::iostream stream;
        acceptor.accept(*stream.rdbuf());
        con.run(&stream, &stream);
        if (con.wantReset()) {
            std::cout << "resetting\n";
        }
    }
}