@@ -1976,6 +1976,24 @@ def run(self):
19761976 sys .stdout .write (" server: read CB tls-unique from client, sending our CB data...\n " )
19771977 data = self .sslconn .get_channel_binding ("tls-unique" )
19781978 self .write (repr (data ).encode ("us-ascii" ) + b"\n " )
1979+ elif stripped == b'PHA' :
1980+ if support .verbose and self .server .connectionchatty :
1981+ sys .stdout .write (
1982+ " server: initiating post handshake auth\n " )
1983+ try :
1984+ self .sslconn .verify_client_post_handshake ()
1985+ except ssl .SSLError as e :
1986+ self .write (repr (e ).encode ("us-ascii" ) + b"\n " )
1987+ else :
1988+ self .write (b"OK\n " )
1989+ elif stripped == b'HASCERT' :
1990+ if self .sslconn .getpeercert () is not None :
1991+ self .write (b'TRUE\n ' )
1992+ else :
1993+ self .write (b'FALSE\n ' )
1994+ elif stripped == b'GETCERT' :
1995+ cert = self .sslconn .getpeercert ()
1996+ self .write (repr (cert ).encode ("us-ascii" ) + b"\n " )
19791997 else :
19801998 if (support .verbose and
19811999
6284
self .server .connectionchatty ):
@@ -3629,6 +3647,194 @@ def test_session_handling(self):
36293647 'Session refers to a different SSLContext.' )
36303648
36313649
3650+ def testing_context ():
3651+ """Create context
3652+
3653+ client_context, server_context, hostname = testing_context()
3654+ """
3655+ client_context = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
3656+ client_context .load_verify_locations (SIGNING_CA )
3657+
3658+ server_context = ssl .SSLContext (ssl .PROTOCOL_TLS_SERVER )
3659+ server_context .load_cert_chain (SIGNED_CERTFILE )
3660+ server_context .load_verify_locations (SIGNING_CA )
3661+
3662+ return client_context , server_context , 'localhost'
3663+
3664+
3665+ @unittest .skipUnless (ssl .HAS_TLSv1_3 , "Test needs TLS 1.3" )
3666+ class TestPostHandshakeAuth (unittest .TestCase ):
3667+ def test_pha_setter (self ):
3668+ protocols = [
3669+ ssl .PROTOCOL_TLS , ssl .PROTOCOL_TLS_SERVER , ssl .PROTOCOL_TLS_CLIENT
3670+ ]
3671+ for protocol in protocols :
3672+ ctx = ssl .SSLContext (protocol )
3673+ self .assertEqual (ctx .post_handshake_auth , False )
3674+
3675+ ctx .post_handshake_auth = True
3676+ self .assertEqual (ctx .post_handshake_auth , True )
3677+
3678+ ctx .verify_mode = ssl .CERT_REQUIRED
3679+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
3680+ self .assertEqual (ctx .post_handshake_auth , True )
3681+
3682+ ctx .post_handshake_auth = False
3683+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
3684+ self .assertEqual (ctx .post_handshake_auth , False )
3685+
3686+ ctx .verify_mode = ssl .CERT_OPTIONAL
3687+ ctx .post_handshake_auth = True
3688+ self .assertEqual (ctx .verify_mode , ssl .CERT_OPTIONAL )
3689+ self .assertEqual (ctx .post_handshake_auth , True )
3690+
3691+ def test_pha_required (self ):
3692+ client_context , server_context , hostname = testing_context ()
3693+ server_context .post_handshake_auth = True
3694+ server_context .verify_mode = ssl .CERT_REQUIRED
3695+ client_context .post_handshake_auth = True
3696+ client_context .load_cert_chain (SIGNED_CERTFILE )
3697+
3698+ server = ThreadedEchoServer (context = server_context , chatty = False )
3699+ with server :
3700+ with client_context .wrap_socket (socket .socket (),
3701+ server_hostname = hostname ) as s :
3702+ s .connect ((HOST , server .port ))
3703+ s .write (b'HASCERT' )
3704+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3705+ s .write (b'PHA' )
3706+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3707+ s .write (b'HASCERT' )
3708+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3709+ # PHA method just returns true when cert is already available
3710+ s .write (b'PHA' )
3711+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3712+ s .write (b'GETCERT' )
3713+ cert_text = s .recv (4096 ).decode ('us-ascii' )
3714+ self .assertIn ('Python Software Foundation CA' , cert_text )
3715+
3716+ def test_pha_required_nocert (self ):
3717+ client_context , server_context , hostname = testing_context ()
3718+ server_context .post_handshake_auth = True
3719+ server_context .verify_mode = ssl .CERT_REQUIRED
3720+ client_context .post_handshake_auth = True
3721+
3722+ server = ThreadedEchoServer (context = server_context , chatty = False )
3723+ with server :
3724+ with client_context .wrap_socket (socket .socket (),
3725+ server_hostname = hostname ) as s :
3726+ s .connect ((HOST , server .port ))
3727+ s .write (b'PHA' )
3728+ # receive CertificateRequest
3729+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3730+ # send empty Certificate + Finish
3731+ s .write (b'HASCERT' )
3732+ # receive alert
3733+ with self .assertRaisesRegex (
3734+ ssl .SSLError ,
3735+ 'tlsv13 alert certificate required' ):
3736+ s .recv (1024 )
3737+
3738+ def test_pha_optional (self ):
3739+ if support .verbose :
3740+ sys .stdout .write ("\n " )
3741+
3742+ client_context , server_context , hostname = testing_context ()
3743+ server_context .post_handshake_auth = True
3744+ server_context .verify_mode = ssl .CERT_REQUIRED
3745+ client_context .post_handshake_auth = True
3746+ client_context .load_cert_chain (SIGNED_CERTFILE )
3747+
3748+ # check CERT_OPTIONAL
3749+ server_context .verify_mode = ssl .CERT_OPTIONAL
3750+ server = ThreadedEchoServer (context = server_context , chatty = False )
3751+ with server :
3752+ with client_context .wrap_socket (socket .socket (),
3753+ server_hostname = hostname ) as s :
3754+ s .connect ((HOST , server .port ))
3755+ s .write (b'HASCERT' )
3756+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3757+ s .write (b'PHA' )
3758+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3759+ s .write (b'HASCERT' )
3760+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3761+
3762+ def test_pha_optional_nocert (self ):
3763+ if support .verbose :
3764+ sys .stdout .write ("\n " )
3765+
3766+ client_context , server_context , hostname = testing_context ()
3767+ server_context .post_handshake_auth = True
3768+ server_context .verify_mode <
436E
span class=pl-c1>= ssl .CERT_OPTIONAL
3769+ client_context .post_handshake_auth = True
3770+
3771+ server = ThreadedEchoServer (context = server_context , chatty = False )
3772+ with server :
3773+ with client_context .wrap_socket (socket .socket (),
3774+ server_hostname = hostname ) as s :
3775+ s .connect ((HOST , server .port ))
3776+ s .write (b'HASCERT' )
3777+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3778+ s .write (b'PHA' )
3779+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3780+ # optional doens't fail when client does not have a cert
3781+ s .write (b'HASCERT' )
3782+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3783+
3784+ def test_pha_no_pha_client (self ):
3785+ client_context , server_context , hostname = testing_context ()
3786+ server_context .post_handshake_auth = True
3787+ server_context .verify_mode = ssl .CERT_REQUIRED
3788+ client_context .load_cert_chain (SIGNED_CERTFILE )
3789+
3790+ server = ThreadedEchoServer (context = server_context , chatty = False )
3791+ with server :
3792+ with client_context .wrap_socket (socket .socket (),
3793+ server_hostname = hostname ) as s :
3794+ s .connect ((HOST , server .port ))
3795+ with self .assertRaisesRegex (ssl .SSLError , 'not server' ):
3796+ s .verify_client_post_handshake ()
3797+ s .write (b'PHA' )
3798+ self .assertIn (b'extension not received' , s .recv (1024 ))
3799+
3800+ def test_pha_no_pha_server (self ):
3801+ # server doesn't have PHA enabled, cert is requested in handshake
3802+ client_context , server_context , hostname = testing_context ()
3803+ server_context .verify_mode = ssl .CERT_REQUIRED
3804+ client_context .post_handshake_auth = True
3805+ client_context .load_cert_chain (SIGNED_CERTFILE )
3806+
3807+ server = ThreadedEchoServer (context = server_context , chatty = False )
3808+ with server :
3809+ with client_context .wrap_socket (socket .socket (),
3810+ server_hostname = hostname ) as s :
3811+ s .connect ((HOST , server .port ))
3812+ s .write (b'HASCERT' )
3813+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3814+ # PHA doesn't fail if there is already a cert
3815+ s .write (b'PHA' )
3816+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3817+ s .write (b'HASCERT' )
3818+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3819+
3820+ def test_pha_not_tls13 (self ):
3821+ # TLS 1.2
3822+ client_context , server_context , hostname = testing_context ()
3823+ server_context .verify_mode = ssl .CERT_REQUIRED
3824+ client_context .options |= ssl .OP_NO_TLSv1_3
3825+ client_context .post_handshake_auth = True
3826+ client_context .load_cert_chain (SIGNED_CERTFILE )
3827+
3828+ server = ThreadedEchoServer (context = server_context , chatty = False )
3829+ with server :
3830+ with client_context .wrap_socket (socket .socket (),
3831+ server_hostname = hostname ) as s :
3832+ s .connect ((HOST , server .port ))
3833+ # PHA fails for TLS != 1.3
3834+ s .write (b'PHA' )
3835+ self .assertIn (b'WRONG_SSL_VERSION' , s .recv (1024 ))
3836+
3837+
36323838def test_main (verbose = False ):
36333839 if support .verbose :
36343840 import warnings
@@ -3681,6 +3887,7 @@ def test_main(verbose=False):
36813887 thread_info = support .threading_setup ()
36823888 if thread_info :
36833889 tests .append (ThreadedTests )
3890+ tests .append (TestPostHandshakeAuth )
36843891
36853892 try :
36863893 support .run_unittest (* tests )
00)