@@ -218,7 +218,7 @@ def testing_context(server_cert=SIGNED_CERTFILE):
218
218
219
219
server_context = ssl .SSLContext (ssl .PROTOCOL_TLS_SERVER )
220
220
server_context .load_cert_chain (server_cert )
221
- client_context .load_verify_locations (SIGNING_CA )
221
+ server_context .load_verify_locations (SIGNING_CA )
222
222
223
223
return client_context , server_context , hostname
224
224
@@ -2262,6 +2262,23 @@ def run(self):
2262
2262
sys .stdout .write (" server: read CB tls-unique from client, sending our CB data...\n " )
2263
2263
data = self .sslconn .get_channel_binding ("tls-unique" )
2264
2264
self .write (repr (data ).encode ("us-ascii" ) + b"\n " )
2265
+ elif stripped == b'PHA' :
2266
+ if support .verbose and self .server .connectionchatty :
2267
+ sys .stdout .write (" server: initiating post handshake auth\n " )
2268
+ try :
2269
+ self .sslconn .verify_client_post_handshake ()
2270
+ except ssl .SSLError as e :
2271
+ self .write (repr (e ).encode ("us-ascii" ) + b"\n " )
2272
+ else :
2273
+ self .write (b"OK\n " )
2274
+ elif stripped == b'HASCERT' :
2275
+ if self .sslconn .getpeercert () is not None :
2276
+ self .write (b'TRUE\n ' )
2277
+ else :
2278
+ self .write (b'FALSE\n ' )
2279
+ elif stripped == b'GETCERT' :
2280
+ cert = self .sslconn .getpeercert ()
2281
+ self .write (repr (cert ).encode ("us-ascii" ) + b"\n " )
2265
2282
else :
2266
2283
if (support .verbose and
2267
2284
self .server .connectionchatty ):
@@ -4148,6 +4165,179 @@ def test_session_handling(self):
4148
4165
'Session refers to a different SSLContext.' )
4149
4166
4150
4167
4168
+ @unittest .skipUnless (ssl .HAS_TLSv1_3 , "Test needs TLS 1.3" )
4169
+ class TestPostHandshakeAuth (unittest .TestCase ):
4170
+ def test_pha_setter (self ):
4171
+ protocols = [
4172
+ ssl .PROTOCOL_TLS , ssl .PROTOCOL_TLS_SERVER , ssl .PROTOCOL_TLS_CLIENT
4173
+ ]
4174
+ for protocol in protocols :
4175
+ ctx = ssl .SSLContext (protocol )
4176
+ self .assertEqual (ctx .post_handshake_auth , False )
4177
+
4178
+ ctx .post_handshake_auth = True
4179
+ self .assertEqual (ctx .post_handshake_auth , True )
4180
+
4181
+ ctx .verify_mode = ssl .CERT_REQUIRED
4182
+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
4183
+ self .assertEqual (ctx .post_handshake_auth , True )
4184
+
4185
+ ctx .post_handshake_auth = False
4186
+ self .assertEqual (ctx .verify_mode , ssl .CERT_REQUIRED )
4187
+ self .assertEqual (ctx .post_handshake_auth , False )
4188
+
4189
+ ctx .verify_mode = ssl .CERT_OPTIONAL
4190
+ ctx .post_handshake_auth = True
4191
+ self .assertEqual (ctx .verify_mode , ssl .CERT_OPTIONAL )
4192
+ self .assertEqual (ctx .post_handshake_auth , True )
4193
+
4194
+ def test_pha_required (self ):
4195
+ client_context , server_context , hostname = testing_context ()
4196
+ server_context .post_handshake_auth = True
4197
+ server_context .verify_mode = ssl .CERT_REQUIRED
4198
+ client_context .post_handshake_auth = True
4199
+ client_context .load_cert_chain (SIGNED_CERTFILE )
4200
+
4201
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4202
+ with server :
4203
+ with client_context .wrap_socket (socket .socket (),
4204
+ server_hostname = hostname ) as s :
4205
+ s .connect ((HOST , server .port ))
4206
+ s .write (b'HASCERT' )
4207
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4208
+ s .write (b'PHA' )
4209
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4210
+ s .write (b'HASCERT' )
4211
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4212
+ # PHA method just returns true when cert is already available
4213
+ s .write (b'PHA' )
4214
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4215
+ s .write (b'GETCERT' )
4216
+ cert_text = s .recv (4096 ).decode ('us-ascii' )
4217
+ self .assertIn ('Python Software Foundation CA' , cert_text )
4218
+
4219
+ def test_pha_required_nocert (self ):
4220
+ client_context , server_context , hostname = testing_context ()
4221
+ server_context .post_handshake_auth = True
4222
+ server_context .verify_mode = ssl .CERT_REQUIRED
4223
+ client_context .post_handshake_auth = True
4224
+
4225
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4226
+ with server :
4227
+ with client_context .wrap_socket (socket .socket (),
4228
+ server_hostname = hostname ) as s :
4229
+ s .connect ((HOST , server .port ))
4230
+ s .write (b'PHA' )
4231
+ # receive CertificateRequest
4232
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4233
+ # send empty Certificate + Finish
4234
+ s .write (b'HASCERT' )
4235
+ # receive alert
4236
+ with self .assertRaisesRegex (
4237
+ ssl .SSLError ,
4238
+ 'tlsv13 alert certificate required' ):
4239
+ s .recv (1024 )
4240
+
4241
+ def test_pha_optional (self ):
4242
+ if support .verbose :
4243
+ sys .stdout .write ("\n " )
4244
+
4245
+ client_context , server_context , hostname = testing_context ()
4246
+ server_context .post_handshake_auth = True
4247
+ server_context .verify_mode = ssl .CERT_REQUIRED
4248
+ client_context .post_handshake_auth = True
4249
+ client_context .load_cert_chain (SIGNED_CERTFILE )
4250
+
4251
+ # check CERT_OPTIONAL
4252
+ server_context .verify_mode = ssl .CERT_OPTIONAL
4253
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4254
+ with server :
4255
+ with client_context .wrap_socket (socket .socket (),
4256
+ server_hostname = hostname ) as s :
4257
+ s .connect ((HOST , server .port ))
4258
+ s .write (b'HASCERT' )
4259
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4260
+ s .write (b'PHA' )
4261
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4262
+ s .write (b'HASCERT' )
4263
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4264
+
4265
+ def test_pha_optional_nocert (self ):
4266
+ if support .verbose :
4267
+ sys .stdout .write ("\n " )
4268
+
4269
+ client_context , server_context , hostname = testing_context ()
4270
+ server_context .post_handshake_auth = True
4271
+ server_context .verify_mode = ssl .CERT_OPTIONAL
4272
+ client_context .post_handshake_auth = True
4273
+
4274
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4275
+ with server :
4276
+ with client_context .wrap_socket (socket .socket (),
4277
+ server_hostname = hostname ) as s :
4278
+ s .connect ((HOST , server .port ))
4279
+ s .write (b'HASCERT' )
4280
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4281
+ s .write (b'PHA' )
4282
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4283
+ # optional doens't fail when client does not have a cert
4284
+ s .write (b'HASCERT' )
4285
+ self .assertEqual (s .recv (1024 ), b'FALSE\n ' )
4286
+
4287
+ def test_pha_no_pha_client (self ):
4288
+ client_context , server_context , hostname = testing_context ()
4289
+ server_context .post_handshake_auth = True
4290
+ server_context .verify_mode = ssl .CERT_REQUIRED
4291
+ client_context .load_cert_chain (SIGNED_CERTFILE )
4292
+
4293
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4294
+ with server :
4295
+ with client_context .wrap_socket (socket .socket (),
4296
+ server_hostname = hostname ) as s :
4297
+ s .connect ((HOST , server .port ))
4298
+ with self .assertRaisesRegex (ssl .SSLError , 'not server' ):
4299
+ s .verify_client_post_handshake ()
4300
+ s .write (b'PHA' )
4301
+ self .assertIn (b'extension not received' , s .recv (1024 ))
4302
+
4303
+ def test_pha_no_pha_server (self ):
4304
+ # server doesn't have PHA enabled, cert is requested in handshake
4305
+ client_context , server_context , hostname = testing_context ()
4306
+ server_context .verify_mode = ssl .CERT_REQUIRED
4307
+ client_context .post_handshake_auth = True
4308
+ client_context .load_cert_chain (SIGNED_CERTFILE )
4309
+
4310
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4311
+ with server :
4312
+ with client_context .wrap_socket (socket .socket (),
4313
+ server_hostname = hostname ) as s :
4314
+ s .connect ((HOST , server .port ))
4315
+ s .write (b'HASCERT' )
4316
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4317
+ # PHA doesn't fail if there is already a cert
4318
+ s .write (b'PHA' )
4319
+ self .assertEqual (s .recv (1024 ), b'OK\n ' )
4320
+ s .write (b'HASCERT' )
4321
+ self .assertEqual (s .recv (1024 ), b'TRUE\n ' )
4322
+
4323
+ def test_pha_not_tls13 (self ):
4324
+ # TLS 1.2
4325
+ client_context , server_context , hostname = testing_context ()
4326
+ server_context .verify_mode = ssl .CERT_REQUIRED
4327
+ client_context .maximum_version = ssl .TLSVersion .TLSv1_2
4328
+ client_context .post_handshake_auth = True
4329
+ client_context .load_cert_chain (SIGNED_CERTFILE )
4330
+
4331
+ server = ThreadedEchoServer (context = server_context , chatty = False )
4332
+ with server :
4333
+ with client_context .wrap_socket (socket .socket (),
4334
+ server_hostname = hostname ) as s :
4335
+ s .connect ((HOST , server .port ))
4336
+ # PHA fails for TLS != 1.3
4337
+ s .write (b'PHA' )
4338
+ self .assertIn (b'WRONG_SSL_VERSION' , s .recv (1024 ))
4339
+
4340
+
4151
4341
def test_main (verbose = False ):
4152
4342
if support .verbose :
4153
4343
import warnings
@@ -4183,6 +4373,7 @@ def test_main(verbose=False):
4183
4373
tests = [
4184
4374
ContextTests , BasicSocketTests , SSLErrorTests , MemoryBIOTests ,
4185
4375
SSLObjectTests , SimpleBackgroundTests , ThreadedTests ,
4376
+ TestPostHandshakeAuth
4186
4377
]
4187
4378
4188
4379
if support .is_resource_enabled ('network' ):
0 commit comments