[DNM] Write to sockets in order, instead of out of order

This commit is contained in:
Peter Cai 2025-03-09 16:39:46 -04:00
parent 4259730938
commit 1f4ca74cf9
2 changed files with 34 additions and 15 deletions

View file

@ -145,6 +145,28 @@ impl<'a> WlMsgWriter<'a> {
self.write_queue.push(msg);
}
pub async fn write(&mut self, msg: WlRawMsg) -> io::Result<()> {
let (buf, fds) = msg.into_parts();
let mut written = 0usize;
while written < buf.len() {
self.egress.writable().await?;
let res = self
.egress
.send_with_fd(&buf[written..], unsafe { std::mem::transmute(fds.deref()) });
match res {
Ok(new_written) => written += new_written,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Err(e),
}
}
Ok(())
}
/// Try to make progress by flushing some of the queued up messages into the stream.
/// When this resolves, note that we might have only partially written. In that
/// case the buffer is saved internally in this structure.

View file

@ -141,7 +141,7 @@ impl<'a> ConnDuplex<'a> {
match verdict {
WlMitmVerdict::Allowed => {
self.downstream_write.queue_write(wl_raw_msg);
self.downstream_write.write(wl_raw_msg).await?;
}
WlMitmVerdict::Terminate => {
return Err(io::Error::new(
@ -180,18 +180,20 @@ impl<'a> ConnDuplex<'a> {
match verdict {
WlMitmVerdict::Allowed => {
self.upstream_write.queue_write(wl_raw_msg);
self.upstream_write.write(wl_raw_msg).await?;
}
WlMitmVerdict::Rejected(error_code) => {
self.downstream_write.queue_write(
WlDisplayErrorEvent::new(
WL_DISPLAY_OBJECT_ID,
wl_raw_msg.obj_id,
error_code,
"Rejected by wl-mitm",
self.downstream_write
.write(
WlDisplayErrorEvent::new(
WL_DISPLAY_OBJECT_ID,
wl_raw_msg.obj_id,
error_code,
"Rejected by wl-mitm",
)
.build(),
)
.build(),
);
.await?;
}
WlMitmVerdict::Terminate => {
return Err(io::Error::new(
@ -213,11 +215,6 @@ impl<'a> ConnDuplex<'a> {
pub async fn run_to_completion(mut self) -> io::Result<()> {
loop {
tokio::select! {
biased;
res = self.downstream_write.dequeue_write() => res?,
res = self.upstream_write.dequeue_write() => res?,
msg = self.upstream_read.read() => {
control_flow!(self.handle_s2c_event(msg?).await?);
}