Compare commits

...
Sign in to create a new pull request.

2 commits

Author SHA1 Message Date
5a7c4aa781 Enable biased again 2025-03-09 16:51:03 -04:00
1f4ca74cf9 [DNM] Write to sockets in order, instead of out of order 2025-03-09 16:39:46 -04:00
2 changed files with 34 additions and 13 deletions

View file

@ -145,6 +145,28 @@ impl<'a> WlMsgWriter<'a> {
self.write_queue.push(msg); self.write_queue.push(msg);
} }
pub async fn write(&mut self, msg: WlRawMsg) -> io::Result<()> {
let (buf, fds) = msg.into_parts();
let mut written = 0usize;
while written < buf.len() {
self.egress.writable().await?;
let res = self
.egress
.send_with_fd(&buf[written..], unsafe { std::mem::transmute(fds.deref()) });
match res {
Ok(new_written) => written += new_written,
Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
Err(e) => return Err(e),
}
}
Ok(())
}
/// Try to make progress by flushing some of the queued up messages into the stream. /// Try to make progress by flushing some of the queued up messages into the stream.
/// When this resolves, note that we might have only partially written. In that /// When this resolves, note that we might have only partially written. In that
/// case the buffer is saved internally in this structure. /// case the buffer is saved internally in this structure.

View file

@ -141,7 +141,7 @@ impl<'a> ConnDuplex<'a> {
match verdict { match verdict {
WlMitmVerdict::Allowed => { WlMitmVerdict::Allowed => {
self.downstream_write.queue_write(wl_raw_msg); self.downstream_write.write(wl_raw_msg).await?;
} }
WlMitmVerdict::Terminate => { WlMitmVerdict::Terminate => {
return Err(io::Error::new( return Err(io::Error::new(
@ -180,18 +180,20 @@ impl<'a> ConnDuplex<'a> {
match verdict { match verdict {
WlMitmVerdict::Allowed => { WlMitmVerdict::Allowed => {
self.upstream_write.queue_write(wl_raw_msg); self.upstream_write.write(wl_raw_msg).await?;
} }
WlMitmVerdict::Rejected(error_code) => { WlMitmVerdict::Rejected(error_code) => {
self.downstream_write.queue_write( self.downstream_write
WlDisplayErrorEvent::new( .write(
WL_DISPLAY_OBJECT_ID, WlDisplayErrorEvent::new(
wl_raw_msg.obj_id, WL_DISPLAY_OBJECT_ID,
error_code, wl_raw_msg.obj_id,
"Rejected by wl-mitm", error_code,
"Rejected by wl-mitm",
)
.build(),
) )
.build(), .await?;
);
} }
WlMitmVerdict::Terminate => { WlMitmVerdict::Terminate => {
return Err(io::Error::new( return Err(io::Error::new(
@ -215,9 +217,6 @@ impl<'a> ConnDuplex<'a> {
tokio::select! { tokio::select! {
biased; biased;
res = self.downstream_write.dequeue_write() => res?,
res = self.upstream_write.dequeue_write() => res?,
msg = self.upstream_read.read() => { msg = self.upstream_read.read() => {
control_flow!(self.handle_s2c_event(msg?).await?); control_flow!(self.handle_s2c_event(msg?).await?);
} }