Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
deadline
  • Loading branch information
youknowone committed Jan 4, 2026
commit 8baade604b546d0308215882b6187093e3f2f125
52 changes: 40 additions & 12 deletions crates/stdlib/src/ssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2724,6 +2724,8 @@ mod _ssl {
recv_method.call((self.sock.clone(), vm.ctx.new_int(size)), vm)
}

/// Socket send - just sends data, caller must handle pending flush
/// Use flush_pending_tls_output before this if ordering is important
pub(crate) fn sock_send(&self, data: &[u8], vm: &VirtualMachine) -> PyResult<PyObjectRef> {
// In BIO mode, write to outgoing BIO
if let Some(ref bio) = self.outgoing_bio {
Expand All @@ -2742,19 +2744,45 @@ mod _ssl {
}

/// Flush any pending TLS output data to the socket
/// This should be called before generating new TLS output
pub(crate) fn flush_pending_tls_output(&self, vm: &VirtualMachine) -> PyResult<()> {
/// Optional deadline parameter allows respecting a read deadline during flush
pub(crate) fn flush_pending_tls_output(
&self,
vm: &VirtualMachine,
deadline: Option<std::time::Instant>,
) -> PyResult<()> {
let mut pending = self.pending_tls_output.lock();
if pending.is_empty() {
return Ok(());
}

let timeout = self.get_socket_timeout(vm)?;
let is_non_blocking = timeout.map(|t| t.is_zero()).unwrap_or(false);
let socket_timeout = self.get_socket_timeout(vm)?;
let is_non_blocking = socket_timeout.map(|t| t.is_zero()).unwrap_or(false);

let mut sent_total = 0;
while sent_total < pending.len() {
let timed_out = self.sock_wait_for_io_impl(SelectKind::Write, vm)?;
// Calculate timeout: use deadline if provided, otherwise use socket timeout
let timeout_to_use = if let Some(dl) = deadline {
let now = std::time::Instant::now();
if now >= dl {
// Deadline already passed
*pending = pending[sent_total..].to_vec();
return Err(
timeout_error_msg(vm, "The operation timed out".to_string()).upcast()
);
}
Some(dl - now)
} else {
socket_timeout
};

// Use sock_select directly with calculated timeout
let py_socket: PyRef<PySocket> = self.sock.clone().try_into_value(vm)?;
let socket = py_socket
.sock()
.map_err(|e| vm.new_os_error(format!("Failed to get socket: {e}")))?;
let timed_out = sock_select(&socket, SelectKind::Write, timeout_to_use)
.map_err(|e| vm.new_os_error(format!("select failed: {e}")))?;

if timed_out {
// Keep unsent data in pending buffer
*pending = pending[sent_total..].to_vec();
Expand Down Expand Up @@ -2888,7 +2916,7 @@ mod _ssl {
);
}

// Try to send pending data
// Try to send pending data (use raw to avoid recursion)
match self.sock_send(&pending_data, vm) {
Ok(result) => {
let sent: usize = result.try_to_value::<isize>(vm)?.try_into().unwrap_or(0);
Expand Down Expand Up @@ -3565,7 +3593,7 @@ mod _ssl {
// This ensures TLS 1.3 Finished message reaches server before application data
// Without this, server may not be ready to process our data
if !is_bio {
self.flush_pending_tls_output(vm)?;
self.flush_pending_tls_output(vm, None)?;
}

// Write data in chunks to avoid filling the internal TLS buffer
Expand Down Expand Up @@ -3599,7 +3627,7 @@ mod _ssl {
} else {
// Socket mode: flush all pending TLS data
// First, try to send any previously pending data
self.flush_pending_tls_output(vm)?;
self.flush_pending_tls_output(vm, None)?;

while conn.wants_write() {
let mut buf = Vec::new();
Expand Down Expand Up @@ -3954,7 +3982,7 @@ mod _ssl {
self.blocking_flush_all_pending(vm)?;
} else {
// BIO mode: non-blocking flush (caller handles pending data)
let _ = self.flush_pending_tls_output(vm);
let _ = self.flush_pending_tls_output(vm, None);
}

conn.send_close_notify();
Expand Down Expand Up @@ -4030,12 +4058,12 @@ mod _ssl {
Some(0.0) => {
// Non-blocking: best-effort flush, ignore errors
// to avoid deadlock with asyncore-based servers
let _ = self.flush_pending_tls_output(vm);
let _ = self.flush_pending_tls_output(vm, None);
}
Some(_t) => {
// Timeout mode: use flush with socket's timeout
// Errors (including timeout) are propagated to caller
self.flush_pending_tls_output(vm)?;
self.flush_pending_tls_output(vm, None)?;
}
None => {
// Blocking mode: wait until all pending data is sent
Expand Down Expand Up @@ -4075,7 +4103,7 @@ mod _ssl {
fn write_pending_tls(&self, conn: &mut TlsConnection, vm: &VirtualMachine) -> PyResult<()> {
// First, flush any previously pending TLS output
// Must succeed before sending new data to maintain order
self.flush_pending_tls_output(vm)?;
self.flush_pending_tls_output(vm, None)?;

loop {
if !conn.wants_write() {
Expand Down
49 changes: 37 additions & 12 deletions crates/stdlib/src/ssl/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,17 +1010,35 @@ pub(super) fn is_blocking_io_error(err: &Py<PyBaseException>, vm: &VirtualMachin
/// Loops until all bytes are sent. For blocking sockets, this will wait
/// until all data is sent. For non-blocking sockets, returns WantWrite
/// if no progress can be made.
fn send_all_bytes(socket: &PySSLSocket, buf: Vec<u8>, vm: &VirtualMachine) -> SslResult<()> {
// First, flush any previously pending TLS data
// Must succeed before sending new data to maintain order
socket.flush_pending_tls_output(vm).map_err(SslError::Py)?;
/// Optional deadline parameter allows respecting a read deadline during flush.
fn send_all_bytes(
socket: &PySSLSocket,
buf: Vec<u8>,
vm: &VirtualMachine,
deadline: Option<std::time::Instant>,
) -> SslResult<()> {
// First, flush any previously pending TLS data with deadline
socket
.flush_pending_tls_output(vm, deadline)
.map_err(SslError::Py)?;

if buf.is_empty() {
return Ok(());
}

let mut sent_total = 0;
while sent_total < buf.len() {
// Check deadline before each send attempt
if let Some(dl) = deadline
&& std::time::Instant::now() >= dl
{
socket
.pending_tls_output
.lock()
.extend_from_slice(&buf[sent_total..]);
return Err(SslError::Timeout("The operation timed out".to_string()));
}

match socket.sock_send(&buf[sent_total..], vm) {
Ok(result) => {
let sent: usize = result
Expand Down Expand Up @@ -1075,7 +1093,9 @@ fn handshake_write_loop(

// Flush any previously pending TLS data before generating new output
// Must succeed before sending new data to maintain order
socket.flush_pending_tls_output(vm).map_err(SslError::Py)?;
socket
.flush_pending_tls_output(vm, None)
.map_err(SslError::Py)?;

while conn.wants_write() || force_initial_write {
if force_initial_write && !conn.wants_write() {
Expand All @@ -1090,7 +1110,7 @@ fn handshake_write_loop(

if written > 0 && !buf.is_empty() {
// Send all bytes to socket, handling partial sends
send_all_bytes(socket, buf, vm)?;
send_all_bytes(socket, buf, vm, None)?;
made_progress = true;
} else if written == 0 {
// No data written but wants_write is true - should not happen normally
Expand Down Expand Up @@ -1209,7 +1229,7 @@ fn handle_handshake_complete(
// Do NOT loop on wants_write() - avoid infinite loop/deadlock
let tls_data = ssl_write_tls_records(conn)?;
if !tls_data.is_empty() {
send_all_bytes(socket, tls_data, vm)?;
send_all_bytes(socket, tls_data, vm, None)?;
}

// IMPORTANT: Don't check wants_write() again!
Expand All @@ -1224,7 +1244,7 @@ fn handle_handshake_complete(
if tls_data.is_empty() {
break;
}
match send_all_bytes(socket, tls_data, vm) {
match send_all_bytes(socket, tls_data, vm, None) {
Ok(()) => {}
Err(SslError::WantWrite) => break,
Err(e) => return Err(e),
Expand Down Expand Up @@ -1314,13 +1334,13 @@ pub(super) fn ssl_do_handshake(
if !is_bio {
conn.send_close_notify();
// Flush any pending TLS data before sending close_notify
let _ = socket.flush_pending_tls_output(vm);
let _ = socket.flush_pending_tls_output(vm, None);
// Actually send the close_notify alert using send_all_bytes
// for proper partial send handling and retry logic
if let Ok(alert_data) = ssl_write_tls_records(conn)
&& !alert_data.is_empty()
{
let _ = send_all_bytes(socket, alert_data, vm);
let _ = send_all_bytes(socket, alert_data, vm, None);
}
}

Expand Down Expand Up @@ -1371,7 +1391,7 @@ pub(super) fn ssl_do_handshake(
break;
}
// Send to outgoing BIO
send_all_bytes(socket, buf[..n].to_vec(), vm)?;
send_all_bytes(socket, buf[..n].to_vec(), vm, None)?;
// Check if there's more to write
if !conn.wants_write() {
break;
Expand Down Expand Up @@ -1496,15 +1516,20 @@ pub(super) fn ssl_read(
}

// Flush pending TLS data before continuing
// CRITICAL: Pass deadline so flush respects read timeout
let tls_data = ssl_write_tls_records(conn)?;
if !tls_data.is_empty() {
// Use best-effort send - don't fail READ just because WRITE couldn't complete
match send_all_bytes(socket, tls_data, vm) {
match send_all_bytes(socket, tls_data, vm, deadline) {
Ok(()) => {}
Err(SslError::WantWrite) => {
// Socket buffer full - acceptable during READ operation
// Pending data will be sent on next write/read call
}
Err(SslError::Timeout(_)) => {
// Timeout during flush is acceptable during READ
// Pending data stays buffered for next operation
}
Err(e) => return Err(e),
}
}
Expand Down