Skip to content

Commit 48451ea

Browse files
websocket write bug fix (#88)
* added mutex to protect web socket write queue from concurrent access.
1 parent eb951de commit 48451ea

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
name: 'Checkout'
2020
- name: Install brew dependencies
2121
run: |
22-
brew install openssl zlib cmake wget
22+
brew install openssl@1.1 zlib cmake wget
2323
- name: Install boost
2424
working-directory: ${{ github.workspace }}
2525
run: |
@@ -54,7 +54,7 @@ jobs:
5454
run: |
5555
mkdir build
5656
cd build
57-
cmake .. -DOPENSSL_ROOT_DIR=/usr/local/Cellar/[email protected]/1.1.1n/ -DOPENSSL_LIBRARIES=/usr/local/Cellar/[email protected]/1.1.1n/lib/
57+
cmake .. -DOPENSSL_ROOT_DIR=/usr/local/Cellar/[email protected]/1.1.1q/ -DOPENSSL_LIBRARIES=/usr/local/Cellar/[email protected]/1.1.1q/lib/
5858
make
5959
ubuntu:
6060
runs-on: ubuntu-latest

src/TcpAdapterProxy.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1536,14 +1536,19 @@ namespace aws { namespace iot { namespace securedtunneling {
15361536
BOOST_LOG_SEV(log, trace) << "Put data " << data_to_send->size() << " bytes into the web_socket_outgoing_message_queue for service id: " << service_id;
15371537
tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id);
15381538
data_message temp = std::make_pair(data_to_send, socket_connection->after_send_message);
1539+
1540+
const std::lock_guard<std::mutex> lock(tac.web_socket_outgoing_message_queue_mutex);
15391541
tac.web_socket_outgoing_message_queue.push(temp);
15401542
// Are we already writing?
15411543
if(tac.web_socket_outgoing_message_queue.size() > 1)
1544+
{
15421545
return;
1546+
}
15431547
}
15441548

15451549
// We are not currently writing, so send this immediately
15461550
data_message message_to_send = tac.web_socket_outgoing_message_queue.front();
1551+
15471552
tac.wss->async_write(message_to_send.first->data(), [=, &tac](boost::system::error_code const &ec, std::size_t const bytes_sent)
15481553
{
15491554
if (ec)
@@ -1552,12 +1557,14 @@ namespace aws { namespace iot { namespace securedtunneling {
15521557
}
15531558
BOOST_LOG_SEV(log, trace) << "Sent " << bytes_sent << " bytes over websocket for service id: " << service_id;
15541559
std::function<void()> capture_after_send_message = message_to_send.second;
1555-
tac.web_socket_outgoing_message_queue.pop();
15561560

15571561
if(capture_after_send_message)
15581562
{
15591563
capture_after_send_message();
15601564
}
1565+
1566+
const std::lock_guard<std::mutex> lock(tac.web_socket_outgoing_message_queue_mutex);
1567+
tac.web_socket_outgoing_message_queue.pop();
15611568
if(tac.web_socket_outgoing_message_queue.empty())
15621569
{
15631570
BOOST_LOG_SEV(log, trace) << "web_socket_outgoing_message_queue is empty, no more messages to send.";

src/TcpAdapterProxy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ namespace aws { namespace iot { namespace securedtunneling {
8787
bind_address_actual{ },
8888
is_web_socket_reading{ false },
8989
is_service_ids_received{ false },
90-
web_socket_outgoing_message_queue{}
90+
web_socket_outgoing_message_queue{},
91+
web_socket_outgoing_message_queue_mutex{}
9192
{ }
9293

9394
boost::asio::io_context io_ctx;
@@ -120,6 +121,7 @@ namespace aws { namespace iot { namespace securedtunneling {
120121
bool is_web_socket_reading;
121122
bool is_service_ids_received;
122123
std::queue<data_message> web_socket_outgoing_message_queue;
124+
std::mutex web_socket_outgoing_message_queue_mutex;
123125
};
124126

125127
//simple re-usable structure for a basic retry strategy's state

0 commit comments

Comments
 (0)