From 4cfbc034a28ecfc9eb253b0fa0cd1ed50855e598 Mon Sep 17 00:00:00 2001 From: Danilo Lessa Bernardineli Date: Wed, 19 Dec 2018 13:36:06 -0200 Subject: [PATCH 1/2] added an GCS operator --- README.md | 10 +- __init__.py | 3 +- __pycache__/__init__.cpython-36.pyc | Bin 0 -> 835 bytes hooks/__pycache__/__init__.cpython-36.pyc | Bin 0 -> 131 bytes hooks/__pycache__/mongo_hook.cpython-36.pyc | Bin 0 -> 5336 bytes operators/__pycache__/__init__.cpython-36.pyc | Bin 0 -> 135 bytes .../mongo_to_gcs_operator.cpython-36.pyc | Bin 0 -> 3603 bytes .../mongo_to_s3_operator.cpython-36.pyc | Bin 0 -> 3483 bytes .../s3_to_mongo_operator.cpython-36.pyc | Bin 0 -> 5751 bytes operators/mongo_to_gcs_operator.py | 95 ++++++++++++++++++ 10 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 __pycache__/__init__.cpython-36.pyc create mode 100644 hooks/__pycache__/__init__.cpython-36.pyc create mode 100644 hooks/__pycache__/mongo_hook.cpython-36.pyc create mode 100644 operators/__pycache__/__init__.cpython-36.pyc create mode 100644 operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc create mode 100644 operators/__pycache__/mongo_to_s3_operator.cpython-36.pyc create mode 100644 operators/__pycache__/s3_to_mongo_operator.cpython-36.pyc create mode 100644 operators/mongo_to_gcs_operator.py diff --git a/README.md b/README.md index c13d64c..2185339 100644 --- a/README.md +++ b/README.md @@ -1 +1,9 @@ -# mongo_plugin \ No newline at end of file +# mongo_plugin + +MongoDB operators and hooks for Airflow + +This plugin contains an hook for connecting with Airflow and also some operators for syncing with Amazon S3 and Google Cloud Storage: + +* S3ToMongoOperator +* MongoToS3Operator +* MongoToGCSOperator diff --git a/__init__.py b/__init__.py index 6b5026f..d57ecf7 100644 --- a/__init__.py +++ b/__init__.py @@ -2,11 +2,12 @@ from mongo_plugin.hooks.mongo_hook import MongoHook from mongo_plugin.operators.s3_to_mongo_operator import S3ToMongoOperator from mongo_plugin.operators.mongo_to_s3_operator import MongoToS3Operator +from mongo_plugin.operators.mongo_to_gcs_operator import MongoToGCSOperator class MongoPlugin(AirflowPlugin): name = "MongoPlugin" - operators = [MongoToS3Operator, S3ToMongoOperator] + operators = [MongoToS3Operator, S3ToMongoOperator, MongoToGCSOperator] hooks = [MongoHook] executors = [] macros = [] diff --git a/__pycache__/__init__.cpython-36.pyc b/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..4d58572242eb2e7e096a39727cbbfb5cc0976c40 GIT binary patch literal 835 zcmZ`%&2AGh5Vo`bn*>Th;=}_)pzI+?To5YaP!3Qkr6&u?GO@c|#a^d&mckjtdvN5H zeC5O|aANG;q~TCoGyD0oXU6k+7t?9^^T+YWcO!&;qk|CxJ^{1m;5gz~B83UY9*^Wm zMI;J1lCeri5^yY2m60sqMCPg>1xBZcr#$0?BhQ)*B;z9@F!$ugk)y9*7xX{bR0eL)-r*D=!cI8X#Mg-sl%svMv z5WyUg5m+Q*k%+X8NG9?clcGkX1RH}*z@}jLIEE{a+&I+eZ+bSn?{Z38rUa#~q*Q6% zNslL#UUy6mJF)+o1d1B3s7b-OLJ!?os5on-u$EP<7rEL5m2IWG z#e~>%^$-61dl-k3@`#24nSPY0whuxf*CX!{Z=v*frJsnF9ZFI{M=Oil*GK8 z)Z*eq{lv_ow4D5M{eqm*^vt|splp72v3`7fW?p7Ve7s&k+*!`f zth;+aak+a4D8#uaeYE+N3neOU3%0^_&OxIMh8Cf&iwRP<$8nvkX zSfe&Gk4@L1&I7IA`YZI-`g+#c{pkKfZ}{o(BPg^#^acAkN%lcryO+d!$!-*~SO(a* zyiftW#INDOG*_pZYf#-aso`4m+E?0GR}*$zn>w`hSc56QLp!wlSa)0WRk}udAhqdh zbc1e!)S-l4pf7;brLWVA^b$yG^bLBMUID2`uhSRlOCYV&H|Z9=3epC>L0_h?fVBCQ zW@+vPSkPNpM=6Zj`@hvStxMqg>`=WVUvPguXPn4{d=g8>{Xmj0LOCIM^&MaOBUrlX zlfT+c;+O?8Ok#2;3DOy$;A2TOnMgSoJA(mD0QMK!}8in3(7iy?UU;*kEAwJ@49)*GbfXN*qnUJ@L5YY~K$WwN!)b3$8 zV@WFSJ=h_)-+^nTy1Olf^_@|+u0ZrC8~bS_149{@2Ha|z3HK)4VjCV(JJzN;)v57B z*R`d-)I4LUP0gi#jIsr?HMK! zztjHq?qHJ4*ns+R#Dwq%e#pmBaxj=j>0THMs7?08AV&n{?fD_^!s0I@3T=33`i9}? zJ$-Re;fE%_T`nd7(^nT_=fJ%SxA+x202^%u*iH4NJ~gPZ-{EgkGcUm#^Vo(hnptUlW=Cn;)w0&&w1;`!Rk=+n@(KWfY zG{*YU{EZHrbQd=(+quhRFkzzFrn$GNsuwpE2y_Hn?I_@kiq+915ps1jPqCPDGH0Cd znJ+UtN`Q{6JqO+n5>ELR)bgvUi2$`{KHtEC3FeYH=^O@U94(h4^nHt?)n*o&L}sQu z%v!}3;z(rvxPndErMeamvp9uVFnyX-IwaChK^Q{vo;?OPx$ z!6gI3y`>R5$oy1SJQ>HDV#-_^$BOA=$S z2=1)*1D^{Zvfe2qhlu!s-1(5C%G3)d7cyGSHtgF9b^aRY%x}QM?cwSwri#s2>QKUZ zA>SUga#=fnLq~46tc&AQQ1iE7f*-;yaA#=tWxZ`Iw(g+=SDPruwVW$JQ1QaXDE}00 zaSa{_C^#RJ>Q9W7y~GWI25u<>H+-WEKjEYr>aB^3?_@;SG44$7CU4NAwAy^|9v`9mrBcmfnndwDQIJ@m&7cyQ0lj8 z9x^*e|NDSVO#%NN7Eih>j^5>CxbskCZ-AEq{5NX+aqMT^JBvS7+J6!YaLurHhyX_u zF#_3^%EKTNf;{f5f2w-^$2Sc`X^a|xWHsk0Ywy-gn&zIFu!wW;#XRRB;;U>M@LgQ4 zv8{)FW`zGx>0BZVS?eL9i#g^uB3pa;=LIdQROxvN+qD7&Y|{i6P9Ly_bI^SUkp0C8 zbg!I27YF`k*YPU!Jmor++gO@oW2syP#L~Z1irZ$ZePD?&b3 zS=#>(A9+^!G&;I;`Q^1z_6H{Afn-S>RG-84}UttL+yF~H}^xa?5BF8 zMw=!B{5C@VFW`>(=%W+3H#|~f@Mm`THS7=4(JWLUb(BW?P9R1oOaO`0(vYnR%d<92=oi2w2g>J4jejIDFb z{`>k6|BbuqCofdb7w}U;?9Z6zWnIskB{Ypt?s<^X`cYBS@;nGn@VcWu639dN27#w~ zb?8y~8ON+vrB@tt0FI%GqkH7IH96V|N42x{;%h>{qR8{={0tzvgpKf2$uafCj|Jjo z6grk}85^zk)h=GWZm0ZH?S^2uc=%MuSNEmjgT%J_5V4I`;i1S)XyUet^MpF(s@}B% Wxs)Gjf1Km+KAeT}<5tgrKl2}FSh2tW literal 0 HcmV?d00001 diff --git a/operators/__pycache__/__init__.cpython-36.pyc b/operators/__pycache__/__init__.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..af70d7af43575ddee0f7f8cf86c679671312e3c3 GIT binary patch literal 135 zcmXr!<>e~>%^$-61dl-k3@`#24nSPY0whuxf*CX!{Z=v*frJsnFBAQY{M=Oil*GK8 z)Z*eq{lv_ow4D5M{eqm*^vt|s{rrN|qQsK?qGJ8{_{_Y_lK6PNg34PQHo5sJr8%i~ KAk&J0m;nG2`XH|W literal 0 HcmV?d00001 diff --git a/operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc b/operators/__pycache__/mongo_to_gcs_operator.cpython-36.pyc new file mode 100644 index 0000000000000000000000000000000000000000..7d7f2dd6cc7aea0b95b5d34a8029f621cfd00d40 GIT binary patch literal 3603 zcmaJ^QE%k75f&xwu2x?A9LEXJ+$9j&mn@R3?^2)->A^_jq*oM(lg2I*2nhtP$X!d; zN;;CNvxfY%2J+MI@~VeZKVxCYK_I^9^Ud8Op!i-u9pTYwz2C z?^xDx2&cI+?rZg*~(8X8pw=9GG!0^A^K! zNUUcT>$AbN#Rh!1azdZkC)U{eA85A5L=RJ0=@rZFYB7Zj#r)YLsrJCL8ifLkG97FbKbQkyw*J9K}rW`wz{ckP9} z#;&l>w%L&R*CZUUEjGHgLhlF5wZb9Txuf@tNvFlLqmvGiJ^~m&fXA!_9~=g-Y9>A^ zsi2EsfyapABG03QeT0(jb9%-DSyUqCjZAhq!LL!-vpOs zjjA=`2DnaU6IQpj-hVF^ql!);2se_!OIV%E%SheIt=;z`J+HV}-WCiRq>{(UEa9y6 zqY--P4UiigBruoSGPcWY9?7#=3w1Jz&K&g-G261CF2*@8Zpp zjF(iM-9@}t$vvPO=y(qzN^WK)o^-RG+o0MBBGP>H}z!L8mN5 zYHMYucJ0-}e-L8T@E$q!0D8A}pzqZ-^!?g}exSVCN(XAVvc<=YN8y?X1`nYGuf zv8mqeq`IULcsA`PGRdW)dCaw&B~t0FL`G#&@+`@D`ii?9XtL9+kL8HHUJ1E+E+D)q>GEr=&C*@LX0b&46?n7q{_v}9Dx9v>_#?Ix# z&$05oP%S5&V`Ce4!Gn%MRY%a|PtYj~f=N~+wO7`atsvN`yCPRkZKn{>v=1vlgVcjL z2!1*ghqasfFxpy?+=ua~8DF9FHng3(w*q}vef5A5q`;56SA%+x?$)4-SYN}n^%v_a z>l>?f($_$K58CUDtQ@$y93;x+?@WD#{0Kh#p2wAfLNi?kipq1KNfY0Isz6k^N)~)A z90H9LUz9*2a&bJ~7f5mwQti#TGPxu0dTpXN!nv**3b=+L=#AK#RmcRfhbu!mp9?;x ziZ@hqXGzYqA2s(uGF$4Q5;T{yLM*fk;mmdKLL`cZ{i@6g%5Jzb_QV^gasauk`1eZt z9h>t!^g6<8KQ1~uFl}U&DXBeHKym|U=*$u(-a`9K_t0qMjk|siFLD<;%iSTqvrpb} zcFBnB+m~-`ph&w(6F)|W;e)@}j-VO70U6*+gxS|n89uczybvho@}FzppVD(K0}2-@ zoJr7W#)Egur8!t|R0&y#0RH|U1YM5t-c=M5KA#6HG50SOiP_H93I$h{$U$XW+nW34 zZ(K;ct}96Nc~CCZ8N8%LlCSs01Z>6!36I6)1e$yV-5iMi-ToC(K=+y`JB2T(zuQ0C z&s{hRf%F|PKBM5~v8!GDGSWR(ElMd+llTC-v90aNW9>i??zsLJ|0&j+xLA~je?`Cs z{qo5Mf8TIs@-!=An#n^Lbi2V07Y{Uc8kV@gA)!5-La*Mz6~6|sFFQc1clq`!0oPRk z&BJB_jMTp!TnsJ!6#Te$>X)L3p@>0<%B2Aa;wGjvXF(S!3<-ptVvK`}M4bhk#?WPa z!E+T%dH3OjiQPRk`18pZ$ARP`p;>Zih_mLza&5qdQ?1kO-Ejalw_1R8c@dP2sYs6m zCOBvOhExfk1eig%MDL!ZnBf}e$rUo0b!SW~sf;2Sr~;0$HVV%GR0d9Bg#zLim;mPd z&F6v%#x4&H=zY>USrAf90NVvdUsVJ?E!G3@cHT(gN_Mj;@W z8swrk?n3+q)PLPUSl;EMS3!)4cpqfp28HUI(*=j>>qpU|U{!|WQ4~F|Xx6R_q6iRS z`T-W|49aN)-;o)U=6VzFfhI#~fi~#v_CAeymPOHxnSl8#l*FQEJ|OUmM&jM=z2-We zJ$mT9;SbEpcuTue$mJ%9Cm^EjLh82{=AVS-_MM!;|2%TiOySsE9N|0djXl9~C6Z|? uh6UOP;Z34o~7ojs1>)pwi>shPHcO&uKl3VHtn2gw8QMV z>2;~Grw#3Y;Y4kym))sI6F-+>w2N;)r^zJ!F-?!)rM z2jVddnC9tJ1Fdw=qy{x7hG)Ih#yYiV>&*6Al^$N34rrG;XS&y+J=#ChJo}|)XE0L5|PQA_Su+wR_$dVi1+PGUDkrTrjH5q78ko=s@2uy3+xL=$_-s8r2|Xt=;|a%MNE*L;^NkFIysqRKeST zU)sJOCZY8Ga`0A^md?5VO1oO32jxa1)WinItCdbD!bC`t1gx~8P{^_u3O@@o7KI5b zy9*ET4vL#un(;1-hE8b-7L9oyUN~CCTdT3^h*IZs$h#W9hMn4P5k3)R`{)>dF)R9h z5D_7K|8MQhqs_xKW}B2G5fg%Jl8}$1^msFi@(C!pnbv7iY*t7z^+CO6c9hL{4}d;~ zCay!Lb*~z2yTqwwwb;jU596H^GCNqKS8)iRaKs4Sc81^{Fw}P7MjZ zomzAK)GUlCczfE074YEHhB@%{w9B^(YwEzLH`fyf#{Fu1iqZpUYej1g=bg3-bGnWk zm~KG30_`f*Utz_;N@AXTt&$_M{JR${$R%WngPALdI0D~SUUP|bd7j8HW((n=;q|Jx z@)Hs><*?o1HI4*U4lC^mlPXnkEQHESULWV0jKhK$g)PedT*$e484NsHA>(opx zogi&=z;FRmOS%F9P|gS!=puUf1uM8^+1a%M7UHOWg_3 zMEKw@wjF4SP2g+zN}$FW#QC=xdQW-iuZ7X~$q^GSf$|3z;p8AzUK`IWxQh%JwD9Yt zY|%W<1jm#3XV49e(iq(@O)&ihtG{s`V)_WuINN@TfED`5{mbS)C&}nxlm;Xc+c0Q0 zgBljzXJ}T;utG4WXX6wa^#Uxv3s_GYJgs$d=Y4pK5J%;(Dfc4PmtBjFB|Bing-L(m zX#gn+9F)z}`EF2olqA$`{DckxrxWzBdmPF`myrOvh{Y_C?g48)c2w1>JK{dsd%Eii z#zPW?CyF);M$8rpT)xyIy6z2KNT7KP$|b3rRhl9f$NX$#X`V`r(u3r^jC9q6J6AZ1$xB&x2M*9?q zSP}5tu|tCoL4bdO-P^Ivzl6Eppf&W`e2-;Tu}hr?0GM<<3ypCR0qe)F4T5H$++{oD42(@`r!$FmL~jH~$l9OS%@ zV)aE|mDJH8{C7l*swo_+3gO+T3va!=x8IUHi*p65k;=sO!^#xf Q$O3`iP<Y5c}ub>_Y@n9JQWyW=x+*Jyd)g0ivGGRwxZJUhrg&GSA!>bsj&tNBqR`Da5e zq9PX<@jvIoG>Z9W8INh{^dVO1k=v-aLF{!bW_E0Lv*WP0zA?Hc^H}3mqf=u(TX|)4 z>a59D@pjo7yN0*NuCp6>H@-0(qvM0QTjh=2&AnVPHf16DPs1?$0MG7b@aRdP2G#f0 zFcQ%qkegwgXIYrAEt)lZ_6~U<^O1=0+6-t8L@1Ix3li4W6pE7}4PHSYkcCLHgSpN2 zM{%DQb9)6BH`8q_h=2X~NydjH-Na#8$%1wuOleH_%u(biZX@|EhTXbgB(gxw@9xY!CF+|P#uwkT~q z2!7iQ;=??bICqeYLsQE8EXAf~w7&rAk` zRXnwE9uv6~u|aea?8DRWB8X(b6R;zKsM1mZBZ60@0#P05#a5s7LQNI1DHRf%2YKbV zgQFx(mtyT@!jG^tQk+ErTu2aQY$~H7s?iF-EJg4c`4%XR3$1nq)gbRGQbazgmI)De7p7>x874czZR=_@ojkJrNX=ick3P$McC?eIV)aQROiCOP?1I&!4jWGLO z-V8v;Fp`opQcsqPL6oMjgak@7ZrgM;MszNg(p+>7Qp%uFD)S&J3O>MLqr;U~gqI>m zj*$fjodmKsij0C7E+hiSu2ReupIW9R5BG8$BV zd}gu5t^2%v&<_5v9*<;^54b=YvAz{t_&jYXhPKYRyhUzICkq=hoLT^0B~FWVIqUs|ccW2S zz!-V34dh)2s#oeF*w6EnN7~Jlb>w9*Zh3t^=9hPL&t)T&bsf}pwqsu`)OYH#A9ZcG}e{fW^- zX5Dwi7sU#*&&)r9O3!CbzafNb)u@HBI*l=d^6ni$ER}A>_4rrMA*uvahdfTYC)1#? zt|eHfps-%eT~~2l`&Av|+I!pMr@vE}3I;%~3NDIJ0;?;pQRxzSh2BR+RUYh#5#P`z zX<3RTu>!tY&9Wv(2p**uhDnwbVOaXJ#JFsrt@Bsmk{GK~qo~BYZ0b+VobYIdr@Pp` zK>lu&b#3*s@$2K5t8kAk9m&(KSi`s=`zX@lI<<&|xIxuTs)&Sm3svbv;y{-5{t>1*&x^ajzyuqR@!hxbI;6A46LMC4M4SaX3=#|K?{0t zt2dS~H}nvf#N956yp8Gr+E6Ir)ha;jq#7i+@B%si@d!t3As!1%gQ9T zpuPj+)IRI~};A?uHg7dvY1T;G?*#O~D#7jqnD5e&`{aSLyPH)18C zPticstWht(u>u-Riov~AX0rNg$5c|NIdFca@xuRdy4veIe=)^%HP*O#y&CsvT)}H) zXL}E@qBxTL8(13Od=~X77yg{Tz-_u{;{kS*5nbHRaf4ywd(~hdM{$g((j8G^-bIX* zhe(I;D7cRARJ6A7*GYPw;`m1wx&nOZrL8si8r=IzKS5eAizq`jt%9Wu4_4Z6Yh^7# z!j5=HzAIybj-A**Rk~?@fS7hrI!V^eJO2JC?T1GK(`QI*d2?={-T z=Hll>ogAK^vnEKca<$4DVxFd9I1?S{xI>dDk_qySlp9Iv@*X&rVL}fc)rPrt%Ukv8 zp08%LR>Zrs;saEjcXd{wbB;Ft%jnCt?xMD!P}X_7%1o-^-OW&qk$)@KrehG7r&3Ue n?zqDfMMEQiIGZu8Tf}ndJ=QvULLO97?&~2sHDR-+)wI6{9^k3L literal 0 HcmV?d00001 diff --git a/operators/mongo_to_gcs_operator.py b/operators/mongo_to_gcs_operator.py new file mode 100644 index 0000000..32938f4 --- /dev/null +++ b/operators/mongo_to_gcs_operator.py @@ -0,0 +1,95 @@ +from bson import json_util +import json +import os + +from mongo_plugin.hooks.mongo_hook import MongoHook +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.models import BaseOperator + + +class MongoToGCSOperator(BaseOperator): + """ + Mongo -> GCS + :param mongo_conn_id: The source mongo connection id. + :type mongo_conn_id: string + :param mongo_collection: The source mongo collection. + :type mongo_collection: string + :param mongo_database: The source mongo database. + :type mongo_database: string + :param mongo_query: The specified mongo query. + :type mongo_query: string + :param gcs_bucket: The destination gcs bucket. + :type gcs_bucket: string + :param gcs_object: The destination gcs filepath. + :type gcs_object: string + :param gcs_conn_id: The destination gcs connnection id. + :type gcs_conn_id: string + """ + + # TODO This currently sets job = queued and locks job + template_fields = ['gcs_object', 'mongo_query'] + + def __init__(self, + mongo_conn_id, + mongo_collection, + mongo_database, + mongo_query, + gcs_bucket, + gcs_object, + gcs_conn_id, + *args, **kwargs): + super(MongoToGCSOperator, self).__init__(*args, **kwargs) + # Conn Ids + self.mongo_conn_id = mongo_conn_id + self.gcs_conn_id = gcs_conn_id + # Mongo Query Settings + self.mongo_db = mongo_database + self.mongo_collection = mongo_collection + # Grab query and determine if we need to run an aggregate pipeline + self.mongo_query = mongo_query + self.is_pipeline = True if isinstance(self.mongo_query, list) else False + + # GCS Settings + self.gcs_bucket = gcs_bucket + self.gcs_object = gcs_object + + # KWARGS + self.replace = kwargs.pop('replace', False) + + def execute(self, context): + """ + Executed by task_instance at runtime + """ + mongo_conn = MongoHook(self.mongo_conn_id).get_conn() + gcs_conn = GoogleCloudStorageHook(self.gcs_conn_id) + + # Grab collection and execute query according to whether or not it is a pipeline + collection = mongo_conn.get_database(self.mongo_db).get_collection(self.mongo_collection) + results = collection.aggregate(self.mongo_query) if self.is_pipeline else collection.find(self.mongo_query) + + # Performs transform then stringifies the docs results into json format + docs_str = self._stringify(self.transform(results)) + with open("__temp__", "w") as fid: + fid.write(docs_str) + + gcs_conn.upload(self.gcs_bucket, self.gcs_object, "__temp__") + + #os.remove("__temp__") + #s3_conn.load_string(docs_str, self.s3_key, bucket_name=self.s3_bucket, replace=self.replace) + + def _stringify(self, iter, joinable='\n'): + """ + Takes an interable (pymongo Cursor or Array) containing dictionaries and + returns a stringified version using python join + """ + return joinable.join([json.dumps(doc, default=json_util.default) for doc in iter]) + + def transform(self, docs): + """ + Processes pyMongo cursor and returns single array with each element being + a JSON serializable dictionary + MongoToGCSOperator.transform() assumes no processing is needed + ie. docs is a pyMongo cursor of documents and cursor just needs to be + converted into an array. + """ + return [doc for doc in docs] From e0a7c960d31c574900ab30902393c88f6d8e3e09 Mon Sep 17 00:00:00 2001 From: Danillo Lessa Bernardineli Date: Thu, 20 Dec 2018 11:12:06 -0200 Subject: [PATCH 2/2] Update README.md --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 2185339..42721bf 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ MongoDB operators and hooks for Airflow This plugin contains an hook for connecting with Airflow and also some operators for syncing with Amazon S3 and Google Cloud Storage: +* MongoHook * S3ToMongoOperator * MongoToS3Operator * MongoToGCSOperator