This repository was archived by the owner on Jun 11, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy path__init__.py
More file actions
1177 lines (1003 loc) · 42 KB
/
__init__.py
File metadata and controls
1177 lines (1003 loc) · 42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2010 University of Chicago
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Library for using the Globus Transfer API. Tested with python 2.6;
will likely also work with 2.7, but not with earlier releases or 3.x.
Can also be run with python -i or ipython and used as an interactive shell
for experimenting with the API:
ipython -- transfer_api.py USERNAME -k ~/.globus/userkey.pem \
-c ~/.globus/usercert.pem \
OR
python -i transfer_api.py ...
It creates a TransferAPIClient instance called "api" with the credentials
passed on the command line, which you can use to make requests:
> print dir(api) # See a list of available methods.
> code, reason, data = api.tasksummary() # Test out tasksummary.
See https://transfer.api.globusonline.org for API documentation.
"""
from __future__ import print_function
import os.path
import os
import sys
import platform
import socket
import json
import urllib
import time
import ssl
try:
from urlparse import urlparse
except:
from urllib.parse import urlparse
try:
from httplib import BadStatusLine
except:
from http.client import BadStatusLine
from datetime import datetime, timedelta
from globusonline.transfer.api_client.verified_https \
import VerifiedHTTPSConnection
from globusonline.transfer.api_client.goauth import get_access_token
# API version string; appended to the host to form DEFAULT_BASE_URL.
API_VERSION = "v0.10"
# Base URL used by TransferAPIClient when the caller passes no base_url.
DEFAULT_BASE_URL = "https://transfer.api.globusonline.org/" + API_VERSION
# Seconds TransferAPIClient._request sleeps between retry attempts.
RETRY_WAIT_SECONDS=30
# NOTE(review): presumably the CA bundle path used by get_ca(); it is not
# referenced anywhere in the visible part of this file -- confirm.
CA_FILE = "ca/all-ca.pem"
# Public names exported by "from <this module> import *".
__all__ = ["TransferAPIClient", "TransferAPIError", "InterfaceError",
"APIError", "ClientError", "ServerError", "ExternalError",
"ServiceUnavailable", "Transfer", "Delete"]
# Client library version; sent to the server in the X-Transfer-API-Client
# header (see TransferAPIClient.__init__ / _request).
__version__ = "0.10.17"
class TransferAPIClient(object):
"""
Maintains a connection to the server as a specific user. Not thread
safe. Uses the JSON representations.
Convenience api methods return a triple:
(status_code, status_message, data)
data is either the JSON response loaded as a python dictionary,
or None if the response was empty, or a convenience wrapper around
the JSON data if the data itself is hard to use directly.
Endpoint names can be full canonical names of the form
ausername#epname, or simply epname, in which case the API looks at
the logged in user's endpoints.
"""
def __init__(self, username, server_ca_file=None,
             cert_file=None, key_file=None,
             base_url=DEFAULT_BASE_URL,
             timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             httplib_debuglevel=0, max_attempts=1,
             goauth=None):
    """
    Validate the credentials and store the client configuration. No
    connection is opened here; see connect().

    Exactly one authentication method must be given: x509 (@cert_file
    and/or @key_file) or @goauth.

    x509 requires configuring your Globus Online account with the x509
    certificate and having the private key available. goauth requires
    fetching an access token from nexus - see the goauth module and the
    nexus documentation.

    @param username: username to connect to the service with. With x509
                     auth it is sent in the X-Transfer-API-X509-User
                     header.
    @param server_ca_file: path to a file containing one or more x509
                           certificates, used to verify the server
                           certificate. If None, get_ca(base_url) is
                           asked to pick one.
    @param cert_file: path to the file containing the x509 client
                      certificate. If blank and key_file is passed,
                      key_file is used.
    @param key_file: path to the file containing the RSA key for client
                     authentication. If blank and cert_file is passed,
                     cert_file is used.
    @param base_url: optionally specify an alternate base url, if testing
                     out an unreleased or alternatively hosted version of
                     the API.
    @param timeout: timeout handed to the connection by connect().
    @param httplib_debuglevel: debug level applied to the connection by
                               connect().
    @param max_attempts: None or a positive integer (converted with
                         int()). Every API call is retried on network
                         errors and 503 responses up to this many times,
                         sleeping RETRY_WAIT_SECONDS between attempts.
                         max_attempts=1 implies no retrying.
    @param goauth: goauth access token retrieved from nexus.
    @raise InterfaceError: if no CA file can be determined or found, if
                           zero or two auth methods are passed, if the
                           certificate or key file does not exist, or if
                           max_attempts is less than 1.
    """
    # Server verification: pick a CA bundle if the caller gave none.
    if server_ca_file is None:
        server_ca_file = get_ca(base_url)
        if server_ca_file is None:
            raise InterfaceError("no CA found for base URL '%s'"
                                 % base_url)
    if not os.path.isfile(server_ca_file):
        raise InterfaceError("server_ca_file not found: '%s'"
                             % server_ca_file)

    # Client authentication: goauth XOR x509.
    self.headers = {}
    x509_requested = cert_file or key_file
    if goauth:
        if x509_requested:
            raise InterfaceError("pass only one auth method")
    elif not x509_requested:
        raise InterfaceError("pass one auth method")
    else:
        # A single file may hold both the certificate and the key.
        key_file = key_file or cert_file
        cert_file = cert_file or key_file
        if not os.path.isfile(cert_file):
            raise InterfaceError("cert_file not found: %s" % cert_file)
        if not os.path.isfile(key_file):
            raise InterfaceError("key_file not found: %s" % key_file)
        self.headers["X-Transfer-API-X509-User"] = username

    if max_attempts is not None:
        max_attempts = int(max_attempts)
        if max_attempts < 1:
            raise InterfaceError(
                "max_attempts must be None or a positive integer")

    # Credentials and retry policy.
    self.max_attempts = max_attempts
    self.goauth = goauth
    self.cert_file = cert_file
    self.key_file = key_file
    self.username = username

    # Connection settings; the connection itself is created lazily.
    self.server_ca_file = server_ca_file
    self.httplib_debuglevel = httplib_debuglevel
    self.base_url = base_url
    self.host, self.port = _get_host_port(base_url)
    self.timeout = timeout
    self.c = None

    # Debug printing is off until set_debug_print() is called.
    self.print_request = False
    self.print_response = False

    # Identification headers sent with every request.
    self.user_agent = "Python-httplib/%s (%s)" % (
        platform.python_version(), platform.system())
    self.client_info = "globusonline.transfer.api_client/%s" % __version__
def connect(self):
    """
    Open a new VerifiedHTTPSConnection to self.host:self.port and store
    it in self.c. Called automatically by the request methods when no
    connection exists.
    """
    options = {
        "ca_certs": self.server_ca_file,
        "strict": False,
        "timeout": self.timeout,
    }
    # Client certificate options are only passed for x509 auth.
    if self.cert_file:
        options.update(cert_file=self.cert_file, key_file=self.key_file)
    connection = VerifiedHTTPSConnection(self.host, self.port, **options)
    self.c = connection
    connection.set_debuglevel(self.httplib_debuglevel)
def set_http_connection_debug(self, value):
    """
    Switch debugging of the underlying VerifiedHTTPSConnection on (any
    true @value) or off. The setting is remembered for connections
    created later and applied immediately to an existing one.

    Note: this may print sensitive information, like auth tokens, to
    standard out.
    """
    self.httplib_debuglevel = 1 if value else 0
    if self.c:
        self.c.set_debuglevel(self.httplib_debuglevel)
def set_debug_print(self, print_request, print_response):
    """
    Choose whether _request prints each outgoing request and/or each
    received response to standard out.
    """
    self.print_request, self.print_response = print_request, print_response
def close(self):
    """
    Close the wrapped VerifiedHTTPSConnection, if there is one, and
    forget it so the next request opens a fresh connection.
    """
    connection = self.c
    if not connection:
        return
    connection.close()
    self.c = None
def _request(self, method, path, body=None, content_type=None):
if not path.startswith("/"):
path = "/" + path
url = self.base_url + path
headers = self.headers.copy()
if content_type:
headers["Content-Type"] = content_type
if self.print_request:
print("")
print(">>>REQUEST>>>:")
print("%s %s" % (method, url))
for h in headers.iteritems():
print("%s: %s" % h)
print("")
if body:
print(body)
if self.goauth:
headers["Authorization"] = "Globus-Goauthtoken %s" % self.goauth
headers["User-Agent"] = self.user_agent
headers["X-Transfer-API-Client"] = self.client_info
def do_request():
if self.c is None:
self.connect()
self.c.request(method, url, body=body, headers=headers)
r = self.c.getresponse()
response_body = r.read()
return r, response_body
for attempt in range(self.max_attempts):
r = None
try:
try:
r, response_body = do_request()
except BadStatusLine:
# This happens when the connection is closed by the server
# in between request, which is very likely when using
# interactively, in a client that waits for user input
# between requests, or after a retry wait. This does not
# count as an attempt - it just means the old connection
# has gone stale and we need a new one.
# TODO: find a more elegant way to re-use the connection
# on closely spaced requests. Can we tell that the
# connection is dead without making a request?
self.close()
r, response_body = do_request()
except ssl.SSLError:
# This probably has to do with failed authentication, so
# retrying is not useful.
self.close()
raise
except socket.error:
# Network error. If the last attempt failed, raise,
# otherwise do nothing and go on to next attempt.
self.close()
if attempt == self.max_attempts - 1:
raise
# Check for 503 ServiceUnavailable, which is treated just like
# network errors.
if (r is not None and r.status == 503
and attempt < self.max_attempts - 1):
# Force sleep below and continue loop, unless we are on
# the last attempt in which case skip this and return
# the 503 error.
self.close()
r = None
if r is not None:
break
else:
time.sleep(RETRY_WAIT_SECONDS)
if self.print_response:
print("")
print("<<<RESPONSE<<<:")
print(r.status, r.reason)
for h in r.getheaders():
print("%s: %s" % h)
print("")
print(response_body)
return r, response_body
def _request_json(self, method, path, body=None, content_type=None):
    """
    Make a request and load the response body as JSON, if the response
    is not empty.

    @param method: HTTP method name.
    @param path: request path, passed to _request.
    @param body: optional request body.
    @param content_type: optional Content-Type of the request body.
    @return: whatever api_result(response, data) returns, where data is
             the decoded JSON, the raw body for text/html responses, or
             None otherwise.
    @raise InterfaceError: if a 200 response is not JSON, or if a JSON
                           body cannot be parsed.
    """
    r, response_body = self._request(method, path, body, content_type)
    response_content_type = r.getheader("content-type")

    # Compare on the bare media type: the header may be missing (None)
    # or carry parameters such as "; charset=utf-8".
    if response_content_type:
        media_type = response_content_type.split(";")[0].strip().lower()
    else:
        media_type = None

    # Raised explicitly instead of asserted so the check is not stripped
    # when Python runs with -O.
    if r.status == 200 and media_type != "application/json":
        raise InterfaceError(
            "Expected application/json for a 200 response, got '%s'"
            % response_content_type)

    data = None
    if response_body and media_type == "application/json":
        try:
            data = json.loads(response_body)
        except Exception as e:
            raise InterfaceError(
                ("Unable to parse JSON in response: err='%s', "
                 + "body len='%d', status='%d %s'")
                % (e, len(response_body), r.status, r.reason))
    elif media_type == "text/html":
        # Pass HTML replies through undecoded.
        data = response_body
    return api_result(r, data)
# Generic API methods:
def get(self, path):
    """
    Issue a GET request for @path via _request_json.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    result = self._request_json("GET", path)
    return result
def put(self, path, body):
    """
    Issue a PUT request for @path with @body sent as application/json.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    json_type = "application/json"
    return self._request_json("PUT", path, body, json_type)
def post(self, path, body):
    """
    Issue a POST request for @path with @body sent as application/json.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    json_type = "application/json"
    return self._request_json("POST", path, body, json_type)
def _delete(self, path):
"""
@return: (status_code, status_reason, data)
@raise TransferAPIError
TODO: this conflicts with the method for submitting delete
jobs, so it's named inconsistently from the other HTTP method
functions. Maybe they should all be _ prefixed?
"""
return self._request_json("DELETE", path)
# Convenience API methods:
def tasksummary(self, **kw):
    """
    GET /tasksummary. Keyword arguments are run through encode_qs()
    and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/tasksummary" + encode_qs(kw)
    return self.get(resource)
def task_list(self, **kw):
    """
    GET /task_list. Keyword arguments are run through encode_qs()
    and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task_list" + encode_qs(kw)
    return self.get(resource)
def task(self, task_id, **kw):
    """
    GET /task/<task_id>. Keyword arguments are run through encode_qs()
    and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task/%s" % task_id
    return self.get(resource + encode_qs(kw))
def task_update(self, task_id, task_data, **kw):
    """
    POST @task_data, serialized as JSON, to /task/<task_id>. Keyword
    arguments are run through encode_qs() and appended to the path.

    @param task_id: id of the task to update.
    @param task_data: JSON-serializable document sent as the body.
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    # post() requires a body; previously task_data was never sent and
    # the call failed with TypeError.
    return self.post("/task/%s" % task_id + encode_qs(kw),
                     json.dumps(task_data))
def task_cancel(self, task_id, **kw):
    """
    POST to /task/<task_id>/cancel with no request body. Keyword
    arguments are run through encode_qs() and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task/%s/cancel" % task_id
    return self.post(resource + encode_qs(kw), body=None)
def task_successful_transfers(self, task_id, **kw):
    """
    Get a list of source and destination paths for files successfully
    transferred in a transfer task. Raises an error if task_id is not
    a transfer task.

    @param marker: start from specified marker, copied from the
                   next_marker field of the previous page
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task/%s/successful_transfers" % task_id
    return self.get(resource + encode_qs(kw))
def subtask_list(self, parent_task_id, **kw):
    """
    DEPRECATED, see task_successful_transfers

    GET /task/<parent_task_id>/subtask_list.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task/%s/subtask_list" % parent_task_id
    return self.get(resource + encode_qs(kw))
def subtask(self, task_id, **kw):
    """
    DEPRECATED, see task_successful_transfers

    GET /subtask/<task_id>.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/subtask/%s" % task_id
    return self.get(resource + encode_qs(kw))
def task_event_list(self, parent_task_id, **kw):
    """
    GET /task/<parent_task_id>/event_list. Keyword arguments are run
    through encode_qs() and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/task/%s/event_list" % parent_task_id
    return self.get(resource + encode_qs(kw))
def endpoint_list(self, **kw):
    """
    GET /endpoint_list. Keyword arguments are run through encode_qs()
    and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = "/endpoint_list" + encode_qs(kw)
    return self.get(resource)
def endpoint(self, endpoint_name, **kw):
    """
    GET the resource that _endpoint_path() builds for @endpoint_name.
    Keyword arguments are run through encode_qs() and appended.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name)
    return self.get(resource + encode_qs(kw))
def endpoint_activation_requirements(self, endpoint_name, **kw):
    """
    GET the "/activation_requirements" resource of an endpoint.

    @return: (code, reason, data), where data is wrapped in an
             ActivationRequirementList instead of being a plain
             dictionary when the status is 200 and data is not empty.
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name, "/activation_requirements")
    code, reason, data = self.get(resource + encode_qs(kw))
    if code == 200 and data:
        return code, reason, ActivationRequirementList(data)
    return code, reason, data
def endpoint_activate(self, endpoint_name, filled_requirements,
                      if_expires_in="", timeout=30):
    """
    POST filled-in activation requirements to an endpoint's "/activate"
    resource.

    @param endpoint_name: partial or canonical name of endpoint to
                          activate.
    @param filled_requirements: ActivationRequirementList instance with
                                required values set for one activation
                                type. Its json_data is sent as the body.
    @type filled_requirements: ActivationRequirementList
    @param if_expires_in: don't re-activate endpoint if it doesn't expire
                          for this many minutes. If not passed, always
                          activate, even if already activated.
    @param timeout: timeout in seconds to attempt contacting external
                    servers to get the credential.
    @return: (code, reason, data), where data is wrapped in an
             ActivationRequirementList when the status is 200 and data
             is not empty.
    @raise InterfaceError: if filled_requirements is false.
    @raise TransferAPIError
    """
    if not filled_requirements:
        raise InterfaceError("Use autoactivate instead; using activate "
            "with an empty request body to auto activate is "
            "deprecated.")
    payload = json.dumps(filled_requirements.json_data)

    # Note: blank query parameters are ignored, so we can pass blank
    # values to use the default behavior.
    params = dict(if_expires_in=str(if_expires_in), timeout=str(timeout))
    resource = _endpoint_path(endpoint_name, "/activate" + encode_qs(params))
    code, reason, data = self.post(resource, body=payload)
    if code == 200 and data:
        return code, reason, ActivationRequirementList(data)
    return code, reason, data
def endpoint_autoactivate(self, endpoint_name, if_expires_in="",
                          timeout=30):
    """
    POST to an endpoint's "/autoactivate" resource with no request body.

    @param endpoint_name: partial or canonical name of endpoint to
                          activate.
    @param if_expires_in: don't re-activate endpoint if it doesn't expire
                          for this many minutes. If not passed, always
                          activate, even if already activated.
    @param timeout: timeout in seconds to attempt contacting external
                    servers to get the credential.
    @return: (code, reason, data), where data is wrapped in an
             ActivationRequirementList when the status is 200 and data
             is not empty.
    @raise TransferAPIError
    """
    # Note: blank query parameters are ignored, so we can pass blank
    # values to use the default behavior.
    params = dict(if_expires_in=str(if_expires_in), timeout=str(timeout))
    resource = _endpoint_path(endpoint_name,
                              "/autoactivate" + encode_qs(params))
    code, reason, data = self.post(resource, body=None)
    if code == 200 and data:
        return code, reason, ActivationRequirementList(data)
    return code, reason, data
def endpoint_deactivate(self, endpoint_name, **kw):
    """
    POST to an endpoint's "/deactivate" resource with no request body.

    @param endpoint_name: partial or canonical name of endpoint to
                          deactivate.
    @return: (code, reason, data)
    @raise TransferAPIError
    """
    # Note: blank query parameters are ignored, so we can pass blank
    # values to use the default behavior.
    resource = _endpoint_path(endpoint_name, "/deactivate") + encode_qs(kw)
    status_code, status_reason, result = self.post(resource, body=None)
    return status_code, status_reason, result
def endpoint_ls(self, endpoint_name, path="", **kw):
    """
    GET an endpoint's "/ls" resource. @path is sent as the "path"
    parameter together with any extra keyword arguments.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    params = dict(kw, path=path)
    resource = _endpoint_path(endpoint_name, "/ls")
    return self.get(resource + encode_qs(params))
def endpoint_mkdir(self, endpoint_name, path, **kw):
    """
    POST a "mkdir" document naming @path to an endpoint's "/mkdir"
    resource.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name, "/mkdir") + encode_qs(kw)
    document = {"path": path, "DATA_TYPE": "mkdir"}
    return self.post(resource, json.dumps(document))
def endpoint_create(self, endpoint_name, hostname=None, description="",
                    scheme="gsiftp", port=2811, subject=None,
                    myproxy_server=None, myproxy_dn=None,
                    public=False, is_globus_connect=False,
                    default_directory=None, oauth_server=None):
    """
    POST a new "endpoint" document to /endpoint. @endpoint_name is sent
    as the canonical_name. Unless @is_globus_connect is true, the
    document also carries a single "server" entry built from @hostname,
    @scheme, @port and @subject.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    document = {}
    document["DATA_TYPE"] = "endpoint"
    document["myproxy_server"] = myproxy_server
    document["myproxy_dn"] = myproxy_dn
    document["description"] = description
    document["canonical_name"] = endpoint_name
    document["public"] = public
    document["is_globus_connect"] = is_globus_connect
    document["default_directory"] = default_directory
    document["oauth_server"] = oauth_server
    if not is_globus_connect:
        server = {}
        server["DATA_TYPE"] = "server"
        server["hostname"] = hostname
        server["scheme"] = scheme
        server["port"] = port
        server["subject"] = subject
        document["DATA"] = [server]
    return self.post("/endpoint", json.dumps(document))
def endpoint_update(self, endpoint_name, endpoint_data):
    """
    PUT @endpoint_data, serialized as JSON, to the endpoint's resource
    to update its top level fields. Using this method to add or remove
    servers is deprecated; use endpoint_server_add and
    endpoint_server_delete instead.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name)
    payload = json.dumps(endpoint_data)
    return self.put(resource, payload)
def endpoint_rename(self, endpoint_name, new_endpoint_name):
    """
    Rename an endpoint: fetch its document with endpoint(), set
    canonical_name to @new_endpoint_name, drop the "name" field and
    submit the result through endpoint_update().

    @param endpoint_name: current name of the endpoint.
    @param new_endpoint_name: value for the canonical_name field.
    @return: (status_code, status_reason, data) from endpoint_update.
    @raise TransferAPIError
    """
    _, _, endpoint_data = self.endpoint(endpoint_name)
    endpoint_data["canonical_name"] = new_endpoint_name
    # pop() instead of del: a fetched document without a "name" key
    # should not abort the rename with KeyError.
    endpoint_data.pop("name", None)
    return self.endpoint_update(endpoint_name, endpoint_data)
def endpoint_delete(self, endpoint_name):
    """
    Delete the specified endpoint. Existing transfers using the endpoint
    will continue to work, but you will not be able to use the endpoint
    in any new operations, and it will be gone from the endpoint_list.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name)
    return self._delete(resource)
def endpoint_server_list(self, endpoint_name, **kw):
    """
    GET an endpoint's "/server_list" resource. Keyword arguments are
    run through encode_qs() and appended to the path.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name, "/server_list")
    return self.get(resource + encode_qs(kw))
def endpoint_server(self, endpoint_name, server_id, **kw):
    """
    GET one server document of an endpoint.

    @param endpoint_name: name of the endpoint the server belongs to.
    @param server_id: id of the server; converted with str() and
                      URL-quoted before being appended to the path.
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    # urllib.quote only exists on Python 2; Python 3 has it in
    # urllib.parse.
    try:
        from urllib import quote
    except ImportError:
        from urllib.parse import quote
    return self.get(_endpoint_path(endpoint_name, "/server/")
                    + quote(str(server_id)) + encode_qs(kw))
def endpoint_server_delete(self, endpoint_name, server_id, **kw):
    """
    DELETE one server document of an endpoint.

    @param endpoint_name: name of the endpoint the server belongs to.
    @param server_id: id of the server; converted with str() and
                      URL-quoted before being appended to the path.
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    # urllib.quote only exists on Python 2; Python 3 has it in
    # urllib.parse.
    try:
        from urllib import quote
    except ImportError:
        from urllib.parse import quote
    return self._delete(_endpoint_path(endpoint_name, "/server/")
                        + quote(str(server_id)) + encode_qs(kw))
def endpoint_server_add(self, endpoint_name, server_data):
    """
    POST @server_data, serialized as JSON, to an endpoint's "/server"
    resource.

    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    resource = _endpoint_path(endpoint_name, "/server")
    payload = json.dumps(server_data)
    return self.post(resource, payload)
def shared_endpoint_create(self, endpoint_name, host_endpoint,
                           host_path):
    """
    [ALPHA] This API is alpha and may change between minor server
    releases. It is provided for testing and development only.

    Create a shared endpoint on the specified host by POSTing a
    "shared_endpoint" document to /shared_endpoint. Raises an error if
    the host endpoint does not support sharing, if the user is not
    licensed to use sharing, or if the specified path is not allowed
    for sharing.

    @param endpoint_name: name of the new shared endpoint to create
    @param host_endpoint: endpoint that hosts the actual data for the
        shared endpoint. Must be running a newer GridFTP server with
        sharing enabled.
    @param host_path: a path on the host_endpoint to use as the root of
        the new shared endpoint.
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    document = {}
    document["DATA_TYPE"] = "shared_endpoint"
    document["name"] = endpoint_name
    document["host_endpoint"] = host_endpoint
    document["host_path"] = host_path
    return self.post("/shared_endpoint", json.dumps(document))
def submission_id(self):
    """
    GET /submission_id.

    @return: (status_code, status_reason, data)
    @raise: TransferAPIError
    """
    resource = "/submission_id"
    return self.get(resource)

# Old name of submission_id, kept for backward compatibility.
transfer_submission_id = submission_id
def transfer(self, transfer):
    """
    Submit a transfer request by POSTing transfer.as_json() to
    /transfer.

    @type transfer: Transfer object
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    payload = transfer.as_json()
    return self.post("/transfer", payload)
def delete(self, delete):
    """
    Submit a delete request by POSTing delete.as_json() to /delete.

    @type delete: Delete object
    @return: (status_code, status_reason, data)
    @raise TransferAPIError
    """
    payload = delete.as_json()
    return self.post("/delete", payload)
class Transfer(object):
    """
    Builder for a transfer request: a collection of items, each holding
    a source path, a destination path and flags.

    A transfer can only involve one source and one destination endpoint,
    so both are fixed in the constructor.

    @param **kw: additional top level options, merged into the request
                 document so they do not have to be added to the client
                 API. New options should use this. Example options are
                 encrypt_data and verify_checksum, both of which are
                 boolean and default to False if not specified.
    """

    def __init__(self, submission_id, source_endpoint, destination_endpoint,
                 deadline=None, sync_level=None, label=None, **kw):
        self.submission_id = submission_id
        self.source_endpoint = source_endpoint
        self.destination_endpoint = destination_endpoint
        self.deadline = deadline
        self.sync_level = sync_level
        self.label = label
        self.kw = kw
        self.items = []

    def add_item(self, source_path, destination_path, recursive=False,
                 verify_size=None):
        """Append one "transfer_item" entry to the request."""
        self.items.append({
            "source_path": source_path,
            "destination_path": destination_path,
            "recursive": recursive,
            "verify_size": verify_size,
            "DATA_TYPE": "transfer_item",
        })

    def as_data(self):
        """
        Return the request as a dictionary. The deadline is converted
        with str() unless it is None; options from **kw are merged in
        last and so override the standard fields.
        """
        deadline = self.deadline
        if deadline is not None:
            deadline = str(deadline)
        document = {}
        document["DATA_TYPE"] = "transfer"
        document["length"] = len(self.items)
        document["submission_id"] = self.submission_id
        document["source_endpoint"] = self.source_endpoint
        document["destination_endpoint"] = self.destination_endpoint
        document["deadline"] = deadline
        document["sync_level"] = self.sync_level
        document["label"] = self.label
        document["DATA"] = self.items
        if self.kw:
            document.update(self.kw)
        return document

    def as_json(self):
        """Return as_data() serialized as a JSON string."""
        return json.dumps(self.as_data())


# For backward compatibility; new code should just use Transfer.
SimpleTransfer = Transfer
class Delete(object):
    """
    Builder for a delete request: one endpoint plus a collection of
    items holding the paths to delete on that endpoint. To delete
    directories, the recursive option must be set.
    """

    def __init__(self, submission_id, endpoint, deadline=None, recursive=False,
                 ignore_missing=True, label=None, interpret_globs=False):
        self.submission_id = submission_id
        self.endpoint = endpoint
        self.deadline = deadline
        self.label = label
        # Flags copied verbatim into the request document.
        self.recursive = recursive
        self.ignore_missing = ignore_missing
        self.interpret_globs = interpret_globs
        self.items = []

    def add_item(self, path):
        """Append one "delete_item" entry for @path to the request."""
        self.items.append({"path": path, "DATA_TYPE": "delete_item"})

    def as_data(self):
        """
        Return the request as a dictionary. When no deadline was given,
        one 24 hours after the current UTC time is filled in.
        """
        if self.deadline is not None:
            deadline = str(self.deadline)
        else:
            # TODO: there is a bug in the API that doesn't allow null
            # deadline for delete. Change this once it's fixed.
            deadline = str(datetime.utcnow() + timedelta(seconds=3600 * 24))
        document = {}
        document["DATA_TYPE"] = "delete"
        document["length"] = len(self.items)
        document["submission_id"] = self.submission_id
        document["endpoint"] = self.endpoint
        document["deadline"] = deadline
        document["recursive"] = self.recursive
        document["ignore_missing"] = self.ignore_missing
        document["interpret_globs"] = self.interpret_globs
        document["label"] = self.label
        document["DATA"] = self.items
        return document

    def as_json(self):
        """Return as_data() serialized as a JSON string."""
        return json.dumps(self.as_data())
class ActivationRequirementList(object):
    """
    Wrapper around the activation requirement list data which makes it
    easier to set specific values. The json data itself uses a list to
    preserve order for display in a UI, but that is not so convenient for
    programmatic access.

    Requirements are addressed by activation type and name; internally
    the two are joined into a "type.name" key.
    """
    # Not used by this class; kept so existing attribute access works.
    fields = []

    def __init__(self, json_data):
        """
        @param json_data: dictionary whose DATA_TYPE is
                          "activation_requirements" or
                          "activation_result". It is kept by reference,
                          so values set through this wrapper are written
                          into it.
        @raise InterfaceError: for any other DATA_TYPE.
        """
        if json_data["DATA_TYPE"] not in ("activation_requirements",
                                          "activation_result"):
            raise InterfaceError("Expected activation_requirements or "
                                 "activation_result data, got "
                                 "'%s'" % json_data["DATA_TYPE"])
        self.json_data = json_data

        # The req_list contains references to activation_requirement
        # data dictionaries in self.json_data.
        self.req_list = []

        # Activation types in order of first appearance.
        self.types = []
        self.type_reqs = {}
        # Maps "type.name" to the requirement's position in req_list.
        self.index_map = {}

        for r in json_data.get("DATA", ()):
            if r["DATA_TYPE"] != "activation_requirement":
                continue
            if r["type"] not in self.types:
                self.types.append(r["type"])
            self.req_list.append(r)
            key = self._key(r["type"], r["name"])
            self.index_map[key] = len(self.req_list) - 1

    @staticmethod
    def _key(type, name):
        """Build the "type.name" key used by index_map."""
        return type + "." + name

    def __getitem__(self, key):
        """Return a top level field of the wrapped json data."""
        return self.json_data[key]

    def _get_requirement(self, key):
        """
        Keys should be "type.name"

        @raise KeyError: if requirement not found.
        """
        return self.req_list[self.index_map[key]]

    def set_requirement_value(self, type, name, value):
        """
        @raise KeyError: if requirement not found.
        """
        self._get_requirement(self._key(type, name))["value"] = value

    def get_requirement_value(self, type, name):
        """
        @raise KeyError: if requirement not found.
        """
        return self._get_requirement(self._key(type, name))["value"]

    def is_required(self, type, name):
        """
        @raise KeyError: if requirement not found.
        """
        return self._get_requirement(self._key(type, name))["required"]

    def is_private(self, type, name):
        """
        @raise KeyError: if requirement not found.
        """
        return self._get_requirement(self._key(type, name))["private"]

    def get_requirements_list(self, type):
        """
        If no requirements were found with matching type, that type is not
        supported and we return None.
        """
        reqs = [req for req in self.req_list if req["type"] == type]
        if reqs:
            return reqs
        return None

    def set_submit_type(self, type):
        """
        Removes requirements of other types; this is needed when
        submitting, to indicate what type of activation is actually
        desired. Both the lookup structures and the wrapped json data
        (which is what as_json() serializes) are filtered.
        """
        self.req_list = [req for req in self.req_list if req["type"] == type]
        # remap
        self.index_map = dict(
            (self._key(r["type"], r["name"]), i)
            for i, r in enumerate(self.req_list))

        # as_json() and TransferAPIClient.endpoint_activate send
        # json_data, so the other types must be dropped there as well.
        # Subdocuments that are not activation requirements are kept.
        if "DATA" in self.json_data:
            self.json_data["DATA"] = [
                doc for doc in self.json_data["DATA"]
                if doc["DATA_TYPE"] != "activation_requirement"
                or doc["type"] == type]
            if "length" in self.json_data:
                self.json_data["length"] = len(self.json_data["DATA"])

    def as_json(self):
        """Return the wrapped json data serialized as a JSON string."""
        return json.dumps(self.json_data)

    def supported_types(self):
        """
        Return the activation types found when the list was built, in
        order of first appearance. Not affected by set_submit_type.
        """
        return self.types

    def __str__(self):
        return str(self.json_data)

    def __repr__(self):
        return str(self.json_data)
def _get_host_port(url):
o = urlparse(url)
netloc_parts = o.netloc.split(":")
if len(netloc_parts) == 2:
host = netloc_parts[0]
port = int(netloc_parts[1])
else:
host = o.netloc
if o.scheme == "https":
port = 443
else:
port = 80
return (host, port)
class TransferAPIError(Exception):
    """
    Common base class of the error types defined in this module
    (InterfaceError and APIError).
    """
class InterfaceError(TransferAPIError):
    """
    Raised by the python interface itself, e.g. for invalid arguments
    or a response it cannot interpret, rather than for an error
    reported by the API.
    """
class APIError(TransferAPIError):
    """
    Wrapper around an error returned by the transfer API.

    Attributes: code (API error code, false for non-API replies),
    status_code and status_message (HTTP status), message, and for API
    replies resource and request_id (None otherwise).
    """

    def __init__(self, error_code, status_code, status_message, error_data):
        """
        @param error_code: API error code; a false value means the reply
                           did not come from the API itself.
        @param status_code: HTTP status code of the response.
        @param status_message: HTTP reason phrase of the response.
        @param error_data: error document with "resource", "message"
                           and "request_id" keys when error_code is
                           set, otherwise the response body.
        """
        self.status_code = status_code
        self.status_message = status_message
        self.code = error_code
        # Always define these so reading them on a non-API error gives
        # None instead of raising AttributeError.
        self.resource = None
        self.request_id = None
        # error_data not set unless reply is from the api itself
        self.read_error_data(error_code, error_data)
        Exception.__init__(self, status_message)

    def read_error_data(self, error_code, error_data):
        """Fill in message (and API-only fields) from @error_data."""
        if error_code:
            # API error message
            self.resource = error_data["resource"]
            self._message = error_data["message"]
            self.request_id = error_data["request_id"]
        else:
            # Possibly web server error message, assume plaintext
            # response body
            self._message = error_data

    @property
    def message(self):
        """Error message from the API, or the raw response body."""
        return self._message

    @property
    def status(self):
        """HTTP status as "<status_code> <status_message>"."""
        return "%s %s" % (self.status_code, self.status_message)

    def __str__(self):
        return "%s (%s): %s" % (self.code, self.status, self.message)
class ClientError(APIError):
    """
    DEPRECATED, use APIError instead

    Used for 400 errors.
    """
class ServerError(APIError):
    """
    DEPRECATED, use APIError instead

    Used for 500 error only. Indicates bug in the server.
    """
class ExternalError(APIError):
    """
    DEPRECATED, use APIError instead

    Used for 502 Bad Gateway and 504 Gateway Timeout.
    Indicates problem contacting external resources, like gridftp
    endpoints and myproxy servers.
    """
class ServiceUnavailable(APIError):
    """
    DEPRECATED, use APIError instead

    Used for 503 Service Unavailable.
    """