@@ -1976,6 +1976,24 @@ def run(self):
1976
1976
sys .stdout .write (" server: read CB tls-unique from client, sending our CB data...\n " )
1977
1977
data = self .sslconn .get_channel_binding ("tls-unique" )
1978
1978
self .write (repr (data ).encode ("us-ascii" ) + b"\n " )
1979
+ elif stripped == b'PHA' :
1980
+ if support .verbose and self .server .connectionchatty :
1981
+ sys .stdout .write (
1982
+ " server: initiating post handshake auth\n " )
1983
+ try :
1984
+ self .sslconn .verify_client_post_handshake ()
1985
+ except ssl .SSLError as e :
1986
+ self .write (repr (e ).encode ("us-ascii" ) + b"\n " )
1987
+ else :
1988
+ self .write (b"OK\n " )
1989
+ elif stripped == b'HASCERT' :
1990
+ if self .sslconn .getpeercert () is not None :
1991
+ self .write (b'TRUE\n ' )
1992
+ else :
1993
+ self .write (b'FALSE\n ' )
1994
+ elif stripped == b'GETCERT' :
1995
+ cert = self .sslconn .getpeercert ()
1996
+ self .write (repr (cert ).encode ("us-ascii" ) + b"\n " )
1979
1997
else :
1980
1998
if (support .verbose and
1981
1999
self .server .connectionchatty ):
@@ -3629,6 +3647,194 @@ def test_session_handling(self):
3629
3647
'Session refers to a different SSLContext.' )
3630
3648
3631
3649
3650
+ def testing_context ():
3651
+ """Create context
3652
+
3653
+ client_context, server_context, hostname = testing_context()
3654
+ """
3655
+ client_context = ssl .SSLContext (ssl .PROTOCOL_TLS_CLIENT )
3656
+ client_context .load_verify_locations (SIGNING_CA )
3657
+
3658
+ server_context = ssl .SSLContext (ssl .PROTOCOL_TLS_SERVER )
3659
+ server_context .load_cert_chain (SIGNED_CERTFILE )
3660
+ server_context .load_verify_locations (SIGNING_CA )
3661
+
3662
+ return client_context , server_context , 'localhost'
3663
+
3664
+
3665
+ @unittest .skipUnless (ssl .HAS_TLSv1_3 , "Test needs TLS 1.3" )
3666
+ class TestPostHandshakeAuth (unittest .TestCase ):
3667
+ def test_pha_setter (self ):
3668
+ protocols = [
3669
+ ssl .PROTOCOL_TLS , ssl .PROTOCOL_TLS_SERVER , ssl .PROTOCOL_TLS_CLIENT
3670
+ ]
3671
+ for protocol in protocols :
3672
+ ctx = ssl .SSLContext (protocol )
3673
+ self .assertEqual (ctx .post_handshake_auth , False )
3674
+
3675
+ ctx .post_handshake_auth = True
3676
+ self .assertEqual (ctx .post_handshake_auth , True )
3677
+
3678
+ ctx .verify_mode = ssl .CERT_REQUIRED
3679
+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
3680
+ self .assertEqual (ctx .post_handshake_auth , True )
3681
+
3682
+ ctx .post_handshake_auth = False
3683
+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
3684
+ self .assertEqual (ctx .post_handshake_auth , False )
3685
+
3686
+ ctx .verify_mode = ssl .CERT_OPTIONAL
3687
+ ctx .post_handshake_auth = True
3688
+ self .assertEqual (ctx .verify_mode , ssl .CERT_OPTIONAL )
3689
+ self .assertEqual (ctx .post_handshake_auth , True )
3690
+
3691
+ def test_pha_required (self ):
3692
+ client_context , server_context , hostname = testing_context ()
3693
+ server_context .post_handshake_auth = True
3694
+ server_context .verify_mode = ssl .CERT_REQUIRED
3695
+ client_context .post_handshake_auth = True
3696
+ client_context .load_cert_chain (SIGNED_CERTFILE )
3697
+
3698
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3699
+ with server :
3700
+ with client_context .wrap_socket (socket .socket (),
3701
+ server_hostname = hostname ) as s :
3702
+ s .connect ((HOST , server .port ))
3703
+ s .write (b'HASCERT' )
3704
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3705
+ s .write (b'PHA' )
3706
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3707
+ s .write (b'HASCERT' )
3708
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3709
+ # PHA method just returns true when cert is already available
3710
+ s .write (b'PHA' )
3711
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3712
+ s .write (b'GETCERT' )
3713
+ cert_text = s .recv (4096 ).decode ('us-ascii' )
3714
+ self .assertIn ('Python Software Foundation CA' , cert_text )
3715
+
3716
+ def test_pha_required_nocert (self ):
3717
+ client_context , server_context , hostname = testing_context ()
3718
+ server_context .post_handshake_auth = True
3719
+ server_context .verify_mode = ssl .CERT_REQUIRED
3720
+ client_context .post_handshake_auth = True
3721
+
3722
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3723
+ with server :
3724
+ with client_context .wrap_socket (socket .socket (),
3725
+ server_hostname = hostname ) as s :
3726
+ s .connect ((HOST , server .port ))
3727
+ s .write (b'PHA' )
3728
+ # receive CertificateRequest
3729
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3730
+ # send empty Certificate + Finish
3731
+ s .write (b'HASCERT' )
3732
+ # receive alert
3733
+ with self .assertRaisesRegex (
3734
+ ssl .SSLError ,
3735
+ 'tlsv13 alert certificate required' ):
3736
+ s .recv (1024 )
3737
+
3738
+ def test_pha_optional (self ):
3739
+ if support .verbose :
3740
+ sys .stdout .write ("\n " )
3741
+
3742
+ client_context , server_context , hostname = testing_context ()
3743
+ server_context .post_handshake_auth = True
3744
+ server_context .verify_mode = ssl .CERT_REQUIRED
3745
+ client_context .post_handshake_auth = True
3746
+ client_context .load_cert_chain (SIGNED_CERTFILE )
3747
+
3748
+ # check CERT_OPTIONAL
3749
+ server_context .verify_mode = ssl .CERT_OPTIONAL
3750
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3751
+ with server :
3752
+ with client_context .wrap_socket (socket .socket (),
3753
+ server_hostname = hostname ) as s :
3754
+ s .connect ((HOST , server .port ))
3755
+ s .write (b'HASCERT' )
3756
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3757
+ s .write (b'PHA' )
3758
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3759
+ s .write (b'HASCERT' )
3760
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3761
+
3762
+ def test_pha_optional_nocert (self ):
3763
+ if support .verbose :
3764
+ sys .stdout .write ("\n " )
3765
+
3766
+ client_context , server_context , hostname = testing_context ()
3767
+ server_context .post_handshake_auth = True
3768
+ server_context .verify_mode = ssl .CERT_OPTIONAL
3769
+ client_context .post_handshake_auth = True
3770
+
3771
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3772
+ with server :
3773
+ with client_context .wrap_socket (socket .socket (),
3774
+ server_hostname = hostname ) as s :
3775
+ s .connect ((HOST , server .port ))
3776
+ s .write (b'HASCERT' )
3777
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3778
+ s .write (b'PHA' )
3779
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3780
+ # optional doens't fail when client does not have a cert
3781
+ s .write (b'HASCERT' )
3782
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
3783
+
3784
+ def test_pha_no_pha_client (self ):
3785
+ client_context , server_context , hostname = testing_context ()
3786
+ server_context .post_handshake_auth = True
3787
+ server_context .verify_mode = ssl .CERT_REQUIRED
3788
+ client_context .load_cert_chain (SIGNED_CERTFILE )
3789
+
3790
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3791
+ with server :
3792
+ with client_context .wrap_socket (socket .socket (),
3793
+ server_hostname = hostname ) as s :
3794
+ s .connect ((HOST , server .port ))
3795
+ with self .assertRaisesRegex (ssl .SSLError , 'not server' ):
3796
+ s .verify_client_post_handshake ()
3797
+ s .write (b'PHA' )
3798
+ self .assertIn (b'extension not received' , s .recv (1024 ))
3799
+
3800
+ def test_pha_no_pha_server (self ):
3801
+ # server doesn't have PHA enabled, cert is requested in handshake
3802
+ client_context , server_context , hostname = testing_context ()
3803
+ server_context .verify_mode = ssl .CERT_REQUIRED
3804
+ client_context .post_handshake_auth = True
3805
+ client_context .load_cert_chain (SIGNED_CERTFILE )
3806
+
3807
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3808
+ with server :
3809
+ with client_context .wrap_socket (socket .socket (),
3810
+ server_hostname = hostname ) as s :
3811
+ s .connect ((HOST , server .port ))
3812
+ s .write (b'HASCERT' )
3813
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3814
+ # PHA doesn't fail if there is already a cert
3815
+ s .write (b'PHA' )
3816
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
3817
+ s .write (b'HASCERT' )
3818
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
3819
+
3820
+ def test_pha_not_tls13 (self ):
3821
+ # TLS 1.2
3822
+ client_context , server_context , hostname = testing_context ()
3823
+ server_context .verify_mode = ssl .CERT_REQUIRED
3824
+ client_context .options |= ssl .OP_NO_TLSv1_3
3825
+ client_context .post_handshake_auth = True
3826
+ client_context .load_cert_chain (SIGNED_CERTFILE )
3827
+
3828
+ server = ThreadedEchoServer (context = server_context , chatty = False )
3829
+ with server :
3830
+ with client_context .wrap_socket (socket .socket (),
3831
+ server_hostname = hostname ) as s :
3832
+ s .connect ((HOST , server .port ))
3833
+ # PHA fails for TLS != 1.3
3834
+ s .write (b'PHA' )
3835
+ self .assertIn (b'WRONG_SSL_VERSION' , s .recv (1024 ))
3836
+
3837
+
3632
3838
def test_main (verbose = False ):
3633
3839
if support .verbose :
3634
3840
import warnings
@@ -3681,6 +3887,7 @@ def test_main(verbose=False):
3681
3887
thread_info = support .threading_setup ()
3682
3888
if thread_info :
3683
3889
tests .append (ThreadedTests )
3890
+ tests .append (TestPostHandshakeAuth )
3684
3891
3685
3892
try :
3686
3893
support .run_unittest (* tests )
0 commit comments