diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py index 3d5ca2faab..3b309b7869 100644 --- a/Lib/test/test_urllib2_localnet.py +++ b/Lib/test/test_urllib2_localnet.py @@ -568,8 +568,6 @@ def test_200_with_parameters(self): self.assertEqual(data, expected_response) self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"]) - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON, ValueError: illegal environment variable name") def test_https(self): handler = self.start_https_server() @@ -577,8 +575,6 @@ def test_https(self): data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context) self.assertEqual(data, b"we care a bit") - # TODO: RUSTPYTHON - @unittest.expectedFailure @unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON, ValueError: illegal environment variable name") def test_https_with_cafile(self): handler = self.start_https_server(certfile=CERT_localhost) diff --git a/stdlib/src/ssl.rs b/stdlib/src/ssl.rs index c642d6e087..052248a583 100644 --- a/stdlib/src/ssl.rs +++ b/stdlib/src/ssl.rs @@ -1,5 +1,7 @@ // spell-checker:disable +mod cert; + use crate::vm::{PyRef, VirtualMachine, builtins::PyModule}; use openssl_probe::ProbeResult; @@ -26,15 +28,12 @@ cfg_if::cfg_if! { } #[allow(non_upper_case_globals)] -#[pymodule(with(ossl101, ossl111, windows))] +#[pymodule(with(cert::ssl_cert, ossl101, ossl111, windows))] mod _ssl { use super::{bio, probe}; use crate::{ - common::{ - ascii, - lock::{ - PyMappedRwLockReadGuard, PyMutex, PyRwLock, PyRwLockReadGuard, PyRwLockWriteGuard, - }, + common::lock::{ + PyMappedRwLockReadGuard, PyMutex, PyRwLock, PyRwLockReadGuard, PyRwLockWriteGuard, }, socket::{self, PySocket}, vm::{ @@ -43,7 +42,7 @@ mod _ssl { PyBaseExceptionRef, PyBytesRef, PyListRef, PyOSError, PyStrRef, PyTypeRef, PyWeak, }, class_or_notimplemented, - convert::{ToPyException, ToPyObject}, + convert::ToPyException, exceptions, function::{ ArgBytesLike, ArgCallable, ArgMemoryBuffer, ArgStrOrBytesLike, Either, FsPath, @@ -60,7 +59,7 @@ mod _ssl { error::ErrorStack, nid::Nid, ssl::{self, SslContextBuilder, SslOptions, SslVerifyMode}, - x509::{self, X509, X509Ref}, + x509::X509, }; use openssl_sys as sys; use rustpython_vm::ospath::OsPath; @@ -73,6 +72,14 @@ mod _ssl { time::Instant, }; + // Import certificate types from parent module + use super::cert::{self, cert_to_certificate, cert_to_py}; + + // Re-export PySSLCertificate to make it available in the _ssl module + // It will be automatically exposed to Python via #[pyclass] + #[allow(unused_imports)] + use super::cert::PySSLCertificate; + // Constants #[pyattr] use sys::{ @@ -178,6 +185,18 @@ mod _ssl { #[pyattr] const HAS_PSK: bool = true; + // Encoding constants for Certificate.public_bytes() + #[pyattr] + pub(crate) const ENCODING_PEM: i32 = sys::X509_FILETYPE_PEM; + #[pyattr] + pub(crate) const ENCODING_DER: i32 = sys::X509_FILETYPE_ASN1; + #[pyattr] + const ENCODING_PEM_AUX: i32 = sys::X509_FILETYPE_PEM + 0x100; + + // OpenSSL error codes for unexpected EOF detection + const ERR_LIB_SSL: i32 = 20; + const SSL_R_UNEXPECTED_EOF_WHILE_READING: i32 = 294; + // the openssl version from the API headers #[pyattr(name = "OPENSSL_VERSION")] @@ -349,32 +368,6 @@ mod _ssl { fn _nid2obj(nid: Nid) -> Option { unsafe { ptr2obj(sys::OBJ_nid2obj(nid.as_raw())) } } - fn obj2txt(obj: &Asn1ObjectRef, no_name: bool) -> Option { - let no_name = i32::from(no_name); - let ptr = obj.as_ptr(); - let b = unsafe { - let buflen = sys::OBJ_obj2txt(std::ptr::null_mut(), 0, ptr, no_name); - assert!(buflen >= 0); - if buflen == 0 { - return None; - } - let buflen = buflen as usize; - let mut buf = Vec::::with_capacity(buflen + 1); - let ret = sys::OBJ_obj2txt( - buf.as_mut_ptr() as *mut libc::c_char, - buf.capacity() as _, - ptr, - no_name, - ); - assert!(ret >= 0); - // SAFETY: OBJ_obj2txt initialized the buffer successfully - buf.set_len(buflen); - buf - }; - let s = String::from_utf8(b) - .unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned()); - Some(s) - } type PyNid = (libc::c_int, String, String, Option); fn obj2py(obj: &Asn1ObjectRef, vm: &VirtualMachine) -> PyResult { @@ -387,7 +380,12 @@ mod _ssl { .long_name() .map_err(|_| vm.new_value_error("NID has no long name".to_owned()))? .to_owned(); - Ok((nid.as_raw(), short_name, long_name, obj2txt(obj, true))) + Ok(( + nid.as_raw(), + short_name, + long_name, + cert::obj2txt(obj, true), + )) } #[derive(FromArgs)] @@ -1219,46 +1217,54 @@ mod _ssl { } #[pymethod] - fn get_unverified_chain(&self, vm: &VirtualMachine) -> Option { + fn get_unverified_chain(&self, vm: &VirtualMachine) -> PyResult> { let stream = self.stream.read(); - let chain = stream.ssl().peer_cert_chain()?; + let Some(chain) = stream.ssl().peer_cert_chain() else { + return Ok(None); + }; + // Return Certificate objects let certs: Vec = chain .iter() - .filter_map(|cert| cert.to_der().ok().map(|der| vm.ctx.new_bytes(der).into())) - .collect(); - - Some(vm.ctx.new_list(certs).into()) + .map(|cert| unsafe { + sys::X509_up_ref(cert.as_ptr()); + let owned = X509::from_ptr(cert.as_ptr()); + cert_to_certificate(vm, owned) + }) + .collect::>()?; + Ok(Some(vm.ctx.new_list(certs))) } #[pymethod] - fn get_verified_chain(&self, vm: &VirtualMachine) -> Option { + fn get_verified_chain(&self, vm: &VirtualMachine) -> PyResult> { let stream = self.stream.read(); unsafe { let chain = sys::SSL_get0_verified_chain(stream.ssl().as_ptr()); if chain.is_null() { - return None; + return Ok(None); } let num_certs = sys::OPENSSL_sk_num(chain as *const _); - let mut certs = Vec::new(); + let mut certs = Vec::with_capacity(num_certs as usize); + // Return Certificate objects for i in 0..num_certs { let cert_ptr = sys::OPENSSL_sk_value(chain as *const _, i) as *mut sys::X509; if cert_ptr.is_null() { continue; } - let cert = X509Ref::from_ptr(cert_ptr); - if let Ok(der) = cert.to_der() { - certs.push(vm.ctx.new_bytes(der).into()); - } + // Clone the X509 certificate to create an owned copy + sys::X509_up_ref(cert_ptr); + let owned_cert = X509::from_ptr(cert_ptr); + let cert_obj = cert_to_certificate(vm, owned_cert)?; + certs.push(cert_obj); } - if certs.is_empty() { + Ok(if certs.is_empty() { None } else { Some(vm.ctx.new_list(certs)) - } + }) } } @@ -1978,7 +1984,10 @@ mod _ssl { } #[track_caller] - fn convert_openssl_error(vm: &VirtualMachine, err: ErrorStack) -> PyBaseExceptionRef { + pub(crate) fn convert_openssl_error( + vm: &VirtualMachine, + err: ErrorStack, + ) -> PyBaseExceptionRef { let cls = PySslError::class(&vm.ctx).to_owned(); match err.errors().last() { Some(e) => { @@ -2047,18 +2056,49 @@ mod _ssl { ), ssl::ErrorCode::SYSCALL => match e.io_error() { Some(io_err) => return io_err.to_pyexception(vm), - None => ( - PySslSyscallError::class(&vm.ctx).to_owned(), - "EOF occurred in violation of protocol", - ), + // When no I/O error and OpenSSL error queue is empty, + // this is an EOF in violation of protocol -> SSLEOFError + // Need to set args[0] = SSL_ERROR_EOF for suppress_ragged_eofs check + None => { + return vm.new_exception( + PySslEOFError::class(&vm.ctx).to_owned(), + vec![ + vm.ctx.new_int(SSL_ERROR_EOF).into(), + vm.ctx + .new_str("EOF occurred in violation of protocol") + .into(), + ], + ); + } }, - ssl::ErrorCode::SSL => match e.ssl_error() { - Some(e) => return convert_openssl_error(vm, e.clone()), - None => ( + ssl::ErrorCode::SSL => { + // Check for OpenSSL 3.0 SSL_R_UNEXPECTED_EOF_WHILE_READING + if let Some(ssl_err) = e.ssl_error() { + // In OpenSSL 3.0+, unexpected EOF is reported as SSL_ERROR_SSL + // with this specific reason code instead of SSL_ERROR_SYSCALL + unsafe { + let err_code = sys::ERR_peek_last_error(); + let reason = sys::ERR_GET_REASON(err_code); + let lib = sys::ERR_GET_LIB(err_code); + if lib == ERR_LIB_SSL && reason == SSL_R_UNEXPECTED_EOF_WHILE_READING { + return vm.new_exception( + vm.class("_ssl", "SSLEOFError"), + vec![ + vm.ctx.new_int(SSL_ERROR_EOF).into(), + vm.ctx + .new_str("EOF occurred in violation of protocol") + .into(), + ], + ); + } + } + return convert_openssl_error(vm, ssl_err.clone()); + } + ( PySslError::class(&vm.ctx).to_owned(), "A failure in the SSL library occurred", - ), - }, + ) + } _ => ( PySslError::class(&vm.ctx).to_owned(), "A failure in the SSL library occurred", @@ -2106,93 +2146,6 @@ mod _ssl { (cipher.name(), cipher.version(), cipher.bits().secret) } - fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult { - let r = if binary { - let b = cert.to_der().map_err(|e| convert_openssl_error(vm, e))?; - vm.ctx.new_bytes(b).into() - } else { - let dict = vm.ctx.new_dict(); - - let name_to_py = |name: &x509::X509NameRef| -> PyResult { - let list = name - .entries() - .map(|entry| { - let txt = obj2txt(entry.object(), false).to_pyobject(vm); - let data = vm.ctx.new_str(entry.data().as_utf8()?.to_owned()); - Ok(vm.new_tuple(((txt, data),)).into()) - }) - .collect::>() - .map_err(|e| convert_openssl_error(vm, e))?; - Ok(vm.ctx.new_tuple(list).into()) - }; - - dict.set_item("subject", name_to_py(cert.subject_name())?, vm)?; - dict.set_item("issuer", name_to_py(cert.issuer_name())?, vm)?; - // X.509 version: OpenSSL uses 0-based (0=v1, 1=v2, 2=v3) but Python uses 1-based (1=v1, 2=v2, 3=v3) - dict.set_item("version", vm.new_pyobj(cert.version() + 1), vm)?; - - let serial_num = cert - .serial_number() - .to_bn() - .and_then(|bn| bn.to_hex_str()) - .map_err(|e| convert_openssl_error(vm, e))?; - dict.set_item( - "serialNumber", - vm.ctx.new_str(serial_num.to_owned()).into(), - vm, - )?; - - dict.set_item( - "notBefore", - vm.ctx.new_str(cert.not_before().to_string()).into(), - vm, - )?; - dict.set_item( - "notAfter", - vm.ctx.new_str(cert.not_after().to_string()).into(), - vm, - )?; - - #[allow(clippy::manual_map)] - if let Some(names) = cert.subject_alt_names() { - let san = names - .iter() - .filter_map(|gen_name| { - if let Some(email) = gen_name.email() { - Some(vm.new_tuple((ascii!("email"), email)).into()) - } else if let Some(dnsname) = gen_name.dnsname() { - Some(vm.new_tuple((ascii!("DNS"), dnsname)).into()) - } else if let Some(ip) = gen_name.ipaddress() { - Some( - vm.new_tuple(( - ascii!("IP Address"), - String::from_utf8_lossy(ip).into_owned(), - )) - .into(), - ) - } else { - // TODO: convert every type of general name: - // https://github.com/python/cpython/blob/3.6/Modules/_ssl.c#L1092-L1231 - None - } - }) - .collect(); - dict.set_item("subjectAltName", vm.ctx.new_tuple(san).into(), vm)?; - }; - - dict.into() - }; - Ok(r) - } - - #[pyfunction] - fn _test_decode_cert(path: FsPath, vm: &VirtualMachine) -> PyResult { - let path = path.to_path_buf(vm)?; - let pem = std::fs::read(path).map_err(|e| e.to_pyexception(vm))?; - let x509 = X509::from_pem(&pem).map_err(|e| convert_openssl_error(vm, e))?; - cert_to_py(vm, &x509, false) - } - impl Read for SocketStream { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { let mut socket: &PySocket = &self.0; diff --git a/stdlib/src/ssl/cert.rs b/stdlib/src/ssl/cert.rs new file mode 100644 index 0000000000..9a77dee1ea --- /dev/null +++ b/stdlib/src/ssl/cert.rs @@ -0,0 +1,234 @@ +pub(super) use ssl_cert::{PySSLCertificate, cert_to_certificate, cert_to_py, obj2txt}; + +// Certificate type for SSL module + +#[pymodule(sub)] +pub(crate) mod ssl_cert { + use crate::{ + common::ascii, + vm::{ + PyObjectRef, PyPayload, PyResult, VirtualMachine, + convert::{ToPyException, ToPyObject}, + function::{FsPath, OptionalArg}, + }, + }; + use foreign_types_shared::ForeignTypeRef; + use openssl::{ + asn1::Asn1ObjectRef, + x509::{self, X509, X509Ref}, + }; + use openssl_sys as sys; + use std::fmt; + + // Import constants and error converter from _ssl module + use crate::ssl::_ssl::{ENCODING_DER, ENCODING_PEM, convert_openssl_error}; + + pub(crate) fn obj2txt(obj: &Asn1ObjectRef, no_name: bool) -> Option { + let no_name = i32::from(no_name); + let ptr = obj.as_ptr(); + let b = unsafe { + let buflen = sys::OBJ_obj2txt(std::ptr::null_mut(), 0, ptr, no_name); + assert!(buflen >= 0); + if buflen == 0 { + return None; + } + let buflen = buflen as usize; + let mut buf = Vec::::with_capacity(buflen + 1); + let ret = sys::OBJ_obj2txt( + buf.as_mut_ptr() as *mut libc::c_char, + buf.capacity() as _, + ptr, + no_name, + ); + assert!(ret >= 0); + // SAFETY: OBJ_obj2txt initialized the buffer successfully + buf.set_len(buflen); + buf + }; + let s = String::from_utf8(b) + .unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned()); + Some(s) + } + + #[pyattr] + #[pyclass(module = "ssl", name = "Certificate")] + #[derive(PyPayload)] + pub(crate) struct PySSLCertificate { + cert: X509, + } + + impl fmt::Debug for PySSLCertificate { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Certificate") + } + } + + #[pyclass] + impl PySSLCertificate { + #[pymethod] + fn public_bytes( + &self, + format: OptionalArg, + vm: &VirtualMachine, + ) -> PyResult { + let format = format.unwrap_or(ENCODING_PEM); + + match format { + x if x == ENCODING_DER => { + // DER encoding + let der = self + .cert + .to_der() + .map_err(|e| convert_openssl_error(vm, e))?; + Ok(vm.ctx.new_bytes(der).into()) + } + x if x == ENCODING_PEM => { + // PEM encoding + let pem = self + .cert + .to_pem() + .map_err(|e| convert_openssl_error(vm, e))?; + Ok(vm.ctx.new_bytes(pem).into()) + } + _ => Err(vm.new_value_error("Unsupported format".to_owned())), + } + } + + #[pymethod] + fn get_info(&self, vm: &VirtualMachine) -> PyResult { + cert_to_dict(vm, &self.cert) + } + } + + fn name_to_py(vm: &VirtualMachine, name: &x509::X509NameRef) -> PyResult { + let list = name + .entries() + .map(|entry| { + let txt = obj2txt(entry.object(), false).to_pyobject(vm); + let asn1_str = entry.data(); + let data_bytes = asn1_str.as_slice(); + let data = match std::str::from_utf8(data_bytes) { + Ok(s) => vm.ctx.new_str(s.to_owned()), + Err(_) => vm + .ctx + .new_str(String::from_utf8_lossy(data_bytes).into_owned()), + }; + Ok(vm.new_tuple(((txt, data),)).into()) + }) + .collect::>()?; + Ok(vm.ctx.new_tuple(list).into()) + } + + // Helper to convert X509 to dict (for getpeercert with binary=False) + fn cert_to_dict(vm: &VirtualMachine, cert: &X509Ref) -> PyResult { + let dict = vm.ctx.new_dict(); + + dict.set_item("subject", name_to_py(vm, cert.subject_name())?, vm)?; + dict.set_item("issuer", name_to_py(vm, cert.issuer_name())?, vm)?; + // X.509 version: OpenSSL uses 0-based (0=v1, 1=v2, 2=v3) but Python uses 1-based (1=v1, 2=v2, 3=v3) + dict.set_item("version", vm.new_pyobj(cert.version() + 1), vm)?; + + let serial_num = cert + .serial_number() + .to_bn() + .and_then(|bn| bn.to_hex_str()) + .map_err(|e| convert_openssl_error(vm, e))?; + dict.set_item( + "serialNumber", + vm.ctx.new_str(serial_num.to_owned()).into(), + vm, + )?; + + dict.set_item( + "notBefore", + vm.ctx.new_str(cert.not_before().to_string()).into(), + vm, + )?; + dict.set_item( + "notAfter", + vm.ctx.new_str(cert.not_after().to_string()).into(), + vm, + )?; + + if let Some(names) = cert.subject_alt_names() { + let san: Vec = names + .iter() + .map(|gen_name| { + if let Some(email) = gen_name.email() { + vm.new_tuple((ascii!("email"), email)).into() + } else if let Some(dnsname) = gen_name.dnsname() { + vm.new_tuple((ascii!("DNS"), dnsname)).into() + } else if let Some(ip) = gen_name.ipaddress() { + // Parse IP address properly (IPv4 or IPv6) + let ip_str = if ip.len() == 4 { + // IPv4 + format!("{}.{}.{}.{}", ip[0], ip[1], ip[2], ip[3]) + } else if ip.len() == 16 { + // IPv6 - format like: "X:X:X:X:X:X:X:X" (not compressed) + format!( + "{:X}:{:X}:{:X}:{:X}:{:X}:{:X}:{:X}:{:X}", + (ip[0] as u16) << 8 | ip[1] as u16, + (ip[2] as u16) << 8 | ip[3] as u16, + (ip[4] as u16) << 8 | ip[5] as u16, + (ip[6] as u16) << 8 | ip[7] as u16, + (ip[8] as u16) << 8 | ip[9] as u16, + (ip[10] as u16) << 8 | ip[11] as u16, + (ip[12] as u16) << 8 | ip[13] as u16, + (ip[14] as u16) << 8 | ip[15] as u16 + ) + } else { + // Fallback for unexpected length + String::from_utf8_lossy(ip).into_owned() + }; + vm.new_tuple((ascii!("IP Address"), ip_str)).into() + } else if let Some(uri) = gen_name.uri() { + vm.new_tuple((ascii!("URI"), uri)).into() + } else { + // Handle DirName, Registered ID, and othername + // Check if this is a directory name + if let Some(dirname) = gen_name.directory_name() + && let Ok(py_name) = name_to_py(vm, dirname) + { + return vm.new_tuple((ascii!("DirName"), py_name)).into(); + } + + // TODO: Handle Registered ID (GEN_RID) + // CPython implementation uses i2t_ASN1_OBJECT to convert OID + // This requires accessing GENERAL_NAME union which is complex in Rust + // For now, we return for unhandled types + + // For othername and other unsupported types + vm.new_tuple((ascii!("othername"), ascii!(""))) + .into() + } + }) + .collect(); + dict.set_item("subjectAltName", vm.ctx.new_tuple(san).into(), vm)?; + }; + + Ok(dict.into()) + } + + // Helper to create Certificate object from X509 + pub(crate) fn cert_to_certificate(vm: &VirtualMachine, cert: X509) -> PyResult { + Ok(PySSLCertificate { cert }.into_ref(&vm.ctx).into()) + } + + // For getpeercert() - returns bytes or dict depending on binary flag + pub(crate) fn cert_to_py(vm: &VirtualMachine, cert: &X509Ref, binary: bool) -> PyResult { + if binary { + let b = cert.to_der().map_err(|e| convert_openssl_error(vm, e))?; + Ok(vm.ctx.new_bytes(b).into()) + } else { + cert_to_dict(vm, cert) + } + } + + #[pyfunction] + pub(crate) fn _test_decode_cert(path: FsPath, vm: &VirtualMachine) -> PyResult { + let path = path.to_path_buf(vm)?; + let pem = std::fs::read(path).map_err(|e| e.to_pyexception(vm))?; + let x509 = X509::from_pem(&pem).map_err(|e| convert_openssl_error(vm, e))?; + cert_to_py(vm, &x509, false) + } +}