3
3
import os
4
4
import shutil
5
5
import time
6
+ from pathlib import Path
6
7
7
8
import pytest
8
9
import requests
@@ -493,6 +494,23 @@ def test_local_jwk():
493
494
assert kb
494
495
495
496
497
+ def test_local_jwk_update ():
498
+ cache_time = 0.1
499
+ _path = full_path ('jwk_private_key.json' )
500
+ kb = KeyBundle (source = 'file://{}' .format (_path ), cache_time = cache_time )
501
+ assert kb
502
+ _ = kb .keys ()
503
+ last1 = kb .last_local
504
+ _ = kb .keys ()
505
+ last2 = kb .last_local
506
+ assert last1 == last2 # file not changed
507
+ time .sleep (cache_time + 0.1 )
508
+ Path (_path ).touch ()
509
+ _ = kb .keys ()
510
+ last3 = kb .last_local
511
+ assert last2 != last3 # file changed
512
+
513
+
496
514
def test_local_jwk_copy ():
497
515
_path = full_path ('jwk_private_key.json' )
498
516
kb = KeyBundle (source = 'file://{}' .format (_path ))
@@ -517,13 +535,14 @@ def mocked_jwks_response():
517
535
def test_httpc_params_1 ():
518
536
source = 'https://login.salesforce.com/id/keys' # From test_jwks_url()
519
537
# Mock response
520
- responses .add ( method = responses . GET , url = source , json = JWKS_DICT , status = 200 )
521
- httpc_params = { 'timeout' : ( 2 , 2 )} # connect, read timeouts in seconds
522
- kb = KeyBundle ( source = source , httpc = requests . request ,
523
- httpc_params = httpc_params )
524
- assert kb . do_remote ( )
525
-
538
+ with responses .RequestsMock () as rsps :
539
+ rsps . add ( method = responses . GET , url = source , json = JWKS_DICT , status = 200 )
540
+ httpc_params = { 'timeout' : ( 2 , 2 )} # connect, read timeouts in seconds
541
+ kb = KeyBundle ( source = source , httpc = requests . request ,
542
+ httpc_params = httpc_params )
543
+ assert kb . do_remote ()
526
544
545
+ @pytest .mark .network
527
546
def test_httpc_params_2 ():
528
547
httpc_params = {'timeout' : 0 }
529
548
kb = KeyBundle (source = 'https://login.salesforce.com/id/keys' ,
@@ -969,3 +988,41 @@ def test_remote():
969
988
assert kb2 .httpc_params == {'timeout' : (2 , 2 )}
970
989
assert kb2 .imp_jwks
971
990
assert kb2 .last_updated
991
+
992
+ def test_remote_not_modified ():
993
+ source = 'https://example.com/keys.json'
994
+ headers = {
995
+ "Date" : "Fri, 15 Mar 2019 10:14:25 GMT" ,
996
+ "Last-Modified" : "Fri, 1 Jan 1970 00:00:00 GMT" ,
997
+ }
998
+ headers = {}
999
+
1000
+ # Mock response
1001
+ httpc_params = {'timeout' : (2 , 2 )} # connect, read timeouts in seconds
1002
+ kb = KeyBundle (source = source , httpc = requests .request ,
1003
+ httpc_params = httpc_params )
1004
+
1005
+ with responses .RequestsMock () as rsps :
1006
+ rsps .add (method = "GET" , url = source , json = JWKS_DICT , status = 200 , headers = headers )
1007
+ assert kb .do_remote ()
1008
+ assert kb .last_remote == headers .get ("Last-Modified" )
1009
+ timeout1 = kb .time_out
1010
+
1011
+ with responses .RequestsMock () as rsps :
1012
+ rsps .add (method = "GET" , url = source , status = 304 , headers = headers )
1013
+ assert kb .do_remote ()
1014
+ assert kb .last_remote == headers .get ("Last-Modified" )
1015
+ timeout2 = kb .time_out
1016
+
1017
+ assert timeout1 == timeout2
1018
+
1019
+ exp = kb .dump ()
1020
+ kb2 = KeyBundle ().load (exp )
1021
+ assert kb2 .source == source
1022
+ assert len (kb2 .keys ()) == 3
1023
+ assert len (kb2 .get ("rsa" )) == 1
1024
+ assert len (kb2 .get ("oct" )) == 1
1025
+ assert len (kb2 .get ("ec" )) == 1
1026
+ assert kb2 .httpc_params == {'timeout' : (2 , 2 )}
1027
+ assert kb2 .imp_jwks
1028
+ assert kb2 .last_updated
0 commit comments