@@ -497,19 +497,24 @@ def get_data_extension_rows(self, customer_key, search_filter=None, property_lis
497
497
de_row .props = property_list
498
498
return de_row .get ()
499
499
500
- def run_async_call (self , endpoint , payload ):
500
+ def run_async_call (self , endpoint , method , payload ):
501
501
headers = {'content-type' : 'application/json' , 'Authorization' : 'Bearer {}' .format (self .client .authToken )}
502
- r = requests .post (endpoint , json = payload , headers = headers )
502
+ if method == "POST" :
503
+ r = requests .post (endpoint , json = payload , headers = headers )
504
+ elif method == "PUT" :
505
+ r = requests .put (endpoint , json = payload , headers = headers )
506
+ else :
507
+ raise self .ETApiError ("Invalid Method." )
508
+
503
509
if r .status_code in range (200 , 300 ):
504
510
request_id = r .json ()['requestId' ]
505
511
endpoint = '{}/data/v1/async/{}/status' .format (self .client .base_api_url , request_id )
506
512
status = 'Pending'
507
513
while status == 'Pending' :
508
514
r = requests .get (endpoint , headers = headers )
509
515
if r .status_code in range (200 , 300 ):
510
- status = r .json ()["requestStatus" ]
511
- return True
512
- return False
516
+ status = r .json ()["status" ]["requestStatus" ]
517
+ return r
513
518
514
519
def create_data_extension_rows (self , data_extension_key , keys_list , values_list ):
515
520
endpoint = '{}hub/v1/dataevents/key:{}/rowset' .format (self .client .base_api_url , data_extension_key )
@@ -523,69 +528,19 @@ def create_data_extension_rows(self, data_extension_key, keys_list, values_list)
523
528
524
529
token = self .get_client ().authToken
525
530
res = requests .post (endpoint , json = payload , headers = {"Authorization" : "Bearer {}" .format (token )})
531
+ return res
526
532
527
- if res .status_code in range (200 , 300 ): # Success
528
- return len (values_list )
529
- else : # Error: Try inserting row by row
530
- rows_inserted = 0
531
- for keys_values in payload :
532
- property_dict = {}
533
- for i , key in enumerate (keys_values ["keys" ]):
534
- property_dict [key ] = keys_values ["values" ][i ]
535
- res = self .create_object (ObjectType .DATA_EXTENSION_ROW , property_dict , data_extension_key )
536
- if res .code in range (200 , 300 ):
537
- rows_inserted += 1
538
- return rows_inserted
539
-
540
- def create_data_extension_rows_async (self , data_extension_key , items_list ):
541
- endpoint = '{}/data/v1/async/dataextensions/key:{}/rows' .format (self .client .base_api_url , data_extension_key )
542
-
543
- payload = {'items' : items_list }
544
- res = self .run_async_call (endpoint , payload )
545
-
546
- if res : # Success
547
- return len (items_list )
548
- else : # Error: Try inserting row by row
549
- rows_inserted = 0
550
- for property_dict in items_list :
551
- res = self .create_object (ObjectType .DATA_EXTENSION_ROW , property_dict , data_extension_key )
552
- if res .code in range (200 , 300 ):
553
- rows_inserted += 1
554
- return rows_inserted
555
-
556
- def update_data_extension_rows (self , data_extension_key , rows_list ):
533
+ def create_data_extension_rows_async (self , data_extension_key , rows_list ):
557
534
endpoint = '{}/data/v1/async/dataextensions/key:{}/rows' .format (self .client .base_api_url , data_extension_key )
558
535
payload = {'items' : rows_list }
536
+ res = self .run_async_call (endpoint , "POST" , payload )
537
+ return res
559
538
560
- res = self .run_async_call (endpoint , payload )
561
-
562
- if res : # Success
563
- return len (rows_list )
564
- else : # Error: Try updating row by row
565
- rows_updated = 0
566
- for values_dict in rows_list :
567
- res = self .update_object (ObjectType .DATA_EXTENSION_ROW , data_extension_key = data_extension_key ,
568
- values_dict = values_dict )
569
- if res .code in range (200 , 300 ):
570
- rows_updated += 1
571
- return rows_updated
572
-
573
- def delete_data_extension_rows (self , data_extension_key , rows_list ):
539
+ def upsert_data_extension_rows_async (self , data_extension_key , rows_list ):
574
540
endpoint = '{}/data/v1/async/dataextensions/key:{}/rows' .format (self .client .base_api_url , data_extension_key )
575
541
payload = {'items' : rows_list }
576
-
577
- res = self .run_async_call (endpoint , payload )
578
-
579
- if res : # Success
580
- return len (rows_list )
581
- else : # Error: Try deleting row by row
582
- rows_deleted = 0
583
- for object_id_dict in rows_list :
584
- res = self .delete_object (ObjectType .DATA_EXTENSION_ROW , data_extension_key = data_extension_key ,
585
- object_id_dict = object_id_dict )
586
- if res .code in range (200 , 300 ):
587
- rows_deleted += 1
588
- return rows_deleted
542
+ res = self .run_async_call (endpoint , "PUT" , payload )
543
+ return res
589
544
590
545
# Convenience methods
591
546
@validate_response ()
0 commit comments