@@ -112,6 +112,30 @@ def source_size_decorator(source_size_impl):
112112
113113 return source_size_decorator
114114
115+ @staticmethod
116+ def _from_protobuf_remote_mount (remote_mount ):
117+ return Mount (remote_environment = RemoteEnvironment .from_proto (
118+ remote_mount .remote_environment ),
119+ mount_path = remote_mount .mount_path ,
120+ shared_path = remote_mount .shared_path )
121+
122+ @staticmethod
123+ def _get_mounts_from_request (request ):
124+ staged_mount = request .staged_source .staged_mount
125+ mounts = request .staged_source .mounts
126+ if mounts and len (mounts ) > 0 and staged_mount and \
127+ staged_mount .mount_path and len (staged_mount .mount_path ) > 0 \
128+ and staged_mount .remote_environment :
129+ raise PluginRuntimeError (
130+ 'Either staged_mount or mounts can be present for staging source. '
131+ 'Found both staged_mount and mounts.' )
132+
133+ if mounts and len (mounts ) > 0 :
134+ return None , [
135+ LinkedOperations ._from_protobuf_remote_mount (m ) for m in mounts ]
136+ else :
137+ return LinkedOperations ._from_protobuf_remote_mount (staged_mount ), None
138+
115139 def _internal_direct_pre_snapshot (self , request ):
116140 """Pre Snapshot Wrapper for direct plugins.
117141
@@ -331,19 +355,16 @@ def _internal_staged_pre_snapshot(self, request):
331355 linked_source = request .staged_source .linked_source
332356 staged_source_definition = (LinkedSourceDefinition .from_dict (
333357 json .loads (linked_source .parameters .json )))
334- staged_mount = request .staged_source .staged_mount
335- mount = Mount (remote_environment = RemoteEnvironment .from_proto (
336- staged_mount .remote_environment ),
337- mount_path = staged_mount .mount_path ,
338- shared_path = staged_mount .shared_path )
358+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
339359 staged_source = StagedSource (
340360 guid = linked_source .guid ,
341361 source_connection = RemoteConnection .from_proto (
342362 request .staged_source .source_connection ),
343363 parameters = staged_source_definition ,
344- mount = mount ,
364+ mount = staged_mount ,
345365 staged_connection = RemoteConnection .from_proto (
346- request .staged_source .staged_connection ))
366+ request .staged_source .staged_connection ),
367+ mounts = mounts )
347368
348369 repository = RepositoryDefinition .from_dict (
349370 json .loads (request .repository .parameters .json ))
@@ -406,19 +427,16 @@ def to_protobuf(snapshot):
406427
407428 staged_source_definition = LinkedSourceDefinition .from_dict (
408429 json .loads (request .staged_source .linked_source .parameters .json ))
409- mount = Mount (
410- remote_environment = RemoteEnvironment .from_proto (
411- request .staged_source .staged_mount .remote_environment ),
412- mount_path = request .staged_source .staged_mount .mount_path ,
413- shared_path = request .staged_source .staged_mount .shared_path )
430+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
414431 staged_source = StagedSource (
415432 guid = request .staged_source .linked_source .guid ,
416433 source_connection = RemoteConnection .from_proto (
417434 request .staged_source .source_connection ),
418435 parameters = staged_source_definition ,
419- mount = mount ,
436+ mount = staged_mount ,
420437 staged_connection = RemoteConnection .from_proto (
421- request .staged_source .staged_connection ))
438+ request .staged_source .staged_connection ),
439+ mounts = mounts )
422440
423441 repository = RepositoryDefinition .from_dict (
424442 json .loads (request .repository .parameters .json ))
@@ -480,19 +498,16 @@ def _internal_start_staging(self, request):
480498
481499 staged_source_definition = LinkedSourceDefinition .from_dict (
482500 json .loads (request .staged_source .linked_source .parameters .json ))
483- mount = Mount (
484- remote_environment = (RemoteEnvironment .from_proto (
485- request .staged_source .staged_mount .remote_environment )),
486- mount_path = request .staged_source .staged_mount .mount_path ,
487- shared_path = request .staged_source .staged_mount .shared_path )
501+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
488502 staged_source = StagedSource (
489503 guid = request .staged_source .linked_source .guid ,
490504 source_connection = RemoteConnection .from_proto (
491505 request .staged_source .source_connection ),
492506 parameters = staged_source_definition ,
493- mount = mount ,
507+ mount = staged_mount ,
494508 staged_connection = RemoteConnection .from_proto (
495- request .staged_source .staged_connection ))
509+ request .staged_source .staged_connection ),
510+ mounts = mounts )
496511
497512 repository = RepositoryDefinition .from_dict (
498513 json .loads (request .repository .parameters .json ))
@@ -539,19 +554,16 @@ def _internal_stop_staging(self, request):
539554
540555 staged_source_definition = LinkedSourceDefinition .from_dict (
541556 json .loads (request .staged_source .linked_source .parameters .json ))
542- mount = Mount (
543- remote_environment = (RemoteEnvironment .from_proto (
544- request .staged_source .staged_mount .remote_environment )),
545- mount_path = request .staged_source .staged_mount .mount_path ,
546- shared_path = request .staged_source .staged_mount .shared_path )
557+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
547558 staged_source = StagedSource (
548559 guid = request .staged_source .linked_source .guid ,
549560 source_connection = RemoteConnection .from_proto (
550561 request .staged_source .source_connection ),
551562 parameters = staged_source_definition ,
552- mount = mount ,
563+ mount = staged_mount ,
553564 staged_connection = RemoteConnection .from_proto (
554- request .staged_source .staged_connection ))
565+ request .staged_source .staged_connection ),
566+ mounts = mounts )
555567
556568 repository = RepositoryDefinition .from_dict (
557569 json .loads (request .repository .parameters .json ))
@@ -598,19 +610,16 @@ def _internal_status(self, request):
598610
599611 staged_source_definition = LinkedSourceDefinition .from_dict (
600612 json .loads (request .staged_source .linked_source .parameters .json ))
601- mount = Mount (
602- remote_environment = (RemoteEnvironment .from_proto (
603- request .staged_source .staged_mount .remote_environment )),
604- mount_path = request .staged_source .staged_mount .mount_path ,
605- shared_path = request .staged_source .staged_mount .shared_path )
613+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
606614 staged_source = StagedSource (
607615 guid = request .staged_source .linked_source .guid ,
608616 source_connection = RemoteConnection .from_proto (
609617 request .staged_source .source_connection ),
610618 parameters = staged_source_definition ,
611- mount = mount ,
619+ mount = staged_mount ,
612620 staged_connection = RemoteConnection .from_proto (
613- request .staged_source .staged_connection ))
621+ request .staged_source .staged_connection ),
622+ mounts = mounts )
614623
615624 repository = RepositoryDefinition .from_dict (
616625 json .loads (request .repository .parameters .json ))
@@ -661,19 +670,16 @@ def _internal_worker(self, request):
661670
662671 staged_source_definition = LinkedSourceDefinition .from_dict (
663672 json .loads (request .staged_source .linked_source .parameters .json ))
664- mount = Mount (
665- remote_environment = (RemoteEnvironment .from_proto (
666- request .staged_source .staged_mount .remote_environment )),
667- mount_path = request .staged_source .staged_mount .mount_path ,
668- shared_path = request .staged_source .staged_mount .shared_path )
673+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
669674 staged_source = StagedSource (
670675 guid = request .staged_source .linked_source .guid ,
671676 source_connection = RemoteConnection .from_proto (
672677 request .staged_source .source_connection ),
673678 parameters = staged_source_definition ,
674- mount = mount ,
679+ mount = staged_mount ,
675680 staged_connection = RemoteConnection .from_proto (
676- request .staged_source .staged_connection ))
681+ request .staged_source .staged_connection ),
682+ mounts = mounts )
677683
678684 repository = RepositoryDefinition .from_dict (
679685 json .loads (request .repository .parameters .json ))
@@ -712,6 +718,16 @@ def _internal_mount_specification(self, request):
712718 from generated .definitions import RepositoryDefinition
713719 from generated .definitions import LinkedSourceDefinition
714720
721+ def to_protobuf_subset_mount (subset_mount ):
722+ subset_mount_protobuf = common_pb2 .SingleSubsetMount ()
723+ subset_mount_protobuf .mount_path = subset_mount .mount_path
724+ subset_mount_protobuf .remote_environment .CopyFrom (
725+ subset_mount .remote_environment .to_proto ())
726+ if subset_mount .shared_path :
727+ subset_mount_protobuf .shared_path = subset_mount .shared_path
728+
729+ return subset_mount_protobuf
730+
715731 def to_protobuf_single_mount (single_mount ):
716732 if single_mount .shared_path :
717733 raise PluginRuntimeError (
@@ -734,19 +750,16 @@ def to_protobuf_ownership_spec(ownership_spec):
734750
735751 staged_source_definition = LinkedSourceDefinition .from_dict (
736752 json .loads (request .staged_source .linked_source .parameters .json ))
737- mount = Mount (
738- remote_environment = (RemoteEnvironment .from_proto (
739- request .staged_source .staged_mount .remote_environment )),
740- mount_path = request .staged_source .staged_mount .mount_path ,
741- shared_path = request .staged_source .staged_mount .shared_path )
753+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
742754 staged_source = StagedSource (
743755 guid = request .staged_source .linked_source .guid ,
744756 source_connection = RemoteConnection .from_proto (
745757 request .staged_source .source_connection ),
746758 parameters = staged_source_definition ,
747- mount = mount ,
759+ mount = staged_mount ,
748760 staged_connection = RemoteConnection .from_proto (
749- request .staged_source .staged_connection ))
761+ request .staged_source .staged_connection ),
762+ mounts = mounts )
750763
751764 repository = RepositoryDefinition .from_dict (
752765 json .loads (request .repository .parameters .json ))
@@ -760,18 +773,20 @@ def to_protobuf_ownership_spec(ownership_spec):
760773 type (mount_spec ),
761774 MountSpecification )
762775
763- # Only one mount is supported for linked sources.
776+ staged_mount_spec_response = platform_pb2 .StagedMountSpecResponse ()
777+
764778 mount_len = len (mount_spec .mounts )
765- if mount_len != 1 :
779+ if mount_len < 1 :
766780 raise PluginRuntimeError (
767- 'Exactly one mount must be provided for staging sources.'
768- ' Found {}' .format (mount_len ))
769-
770- staged_mount = to_protobuf_single_mount (mount_spec .mounts [0 ])
771-
772- staged_mount_spec_response = platform_pb2 .StagedMountSpecResponse ()
773- staged_mount_spec_response .return_value .staged_mount .CopyFrom (
774- staged_mount )
781+ 'Mount must be provided for staging sources.'
782+ ' Found {} mounts.' .format (mount_len ))
783+ elif mount_len > 1 :
784+ mounts = [to_protobuf_subset_mount (m ) for m in mount_spec .mounts ]
785+ staged_mount_spec_response .return_value .mounts .extend (mounts )
786+ else :
787+ staged_mount = to_protobuf_single_mount (mount_spec .mounts [0 ])
788+ staged_mount_spec_response .return_value .staged_mount .CopyFrom (
789+ staged_mount )
775790
776791 # Ownership spec is optional for linked sources.
777792 if mount_spec .ownership_specification :
@@ -812,19 +827,16 @@ def _internal_staged_source_size(self, request):
812827
813828 staged_source_definition = LinkedSourceDefinition .from_dict (
814829 json .loads (request .staged_source .linked_source .parameters .json ))
815- mount = Mount (
816- remote_environment = (RemoteEnvironment .from_proto (
817- request .staged_source .staged_mount .remote_environment )),
818- mount_path = request .staged_source .staged_mount .mount_path ,
819- shared_path = request .staged_source .staged_mount .shared_path )
830+ staged_mount , mounts = LinkedOperations ._get_mounts_from_request (request )
820831 staged_source = StagedSource (
821832 guid = request .staged_source .linked_source .guid ,
822833 source_connection = RemoteConnection .from_proto (
823834 request .staged_source .source_connection ),
824835 parameters = staged_source_definition ,
825- mount = mount ,
836+ mount = staged_mount ,
826837 staged_connection = RemoteConnection .from_proto (
827- request .staged_source .staged_connection ))
838+ request .staged_source .staged_connection ),
839+ mounts = mounts )
828840
829841 repository = RepositoryDefinition .from_dict (
830842 json .loads (request .repository .parameters .json ))
0 commit comments