diff --git a/libs/wire-subsystems/src/Wire/GalleyAPIAccess/Rpc.hs b/libs/wire-subsystems/src/Wire/GalleyAPIAccess/Rpc.hs index 2562ae6c29..5f8b3e7827 100644 --- a/libs/wire-subsystems/src/Wire/GalleyAPIAccess/Rpc.hs +++ b/libs/wire-subsystems/src/Wire/GalleyAPIAccess/Rpc.hs @@ -36,7 +36,6 @@ import Network.Wai.Utilities.Error qualified as Wai import Polysemy import Polysemy.Error import Polysemy.Input -import Polysemy.TinyLog import Servant.API (toHeader) import System.Logger.Message import Util.Options @@ -59,8 +58,7 @@ import Wire.Rpc interpretGalleyAPIAccessToRpc :: ( Member (Error ParseException) r, - Member Rpc r, - Member TinyLog r + Member Rpc r ) => Set Version -> Endpoint -> @@ -99,17 +97,13 @@ interpretGalleyAPIAccessToRpc disabledVersions galleyEndpoint = InternalGetConversation id' -> internalGetConversation id' getUserLegalholdStatus :: - ( Member TinyLog r, - Member (Error ParseException) r, + ( Member (Error ParseException) r, Member Rpc r ) => Local UserId -> TeamId -> Sem (Input Endpoint : r) UserLegalHoldStatusResponse getUserLegalholdStatus luid tid = do - debug $ - remote "galley" - . msg (val "get legalhold user status") decodeBodyOrThrow "galley" =<< galleyRequest do method GET . paths ["teams", toByteString' tid, "legalhold", toByteString' (tUnqualified luid)] @@ -124,16 +118,12 @@ galleyRequest req = do -- | Calls 'Galley.API.createSelfConversationH'. createSelfConv :: ( Member Rpc r, - Member TinyLog r, Member (Input Endpoint) r ) => Version -> UserId -> Sem r () createSelfConv v u = do - debug $ - remote "galley" - . msg (val "Creating self conversation") void $ galleyRequest $ method POST @@ -145,19 +135,13 @@ createSelfConv v u = do getConv :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => Version -> UserId -> Local ConvId -> Sem r (Maybe OwnConversation) getConv v usr lcnv = do - debug $ - remote "galley" - . field "domain" (toByteString (tDomain lcnv)) - . field "conv" (toByteString (tUnqualified lcnv)) - . msg (val "Getting conversation") rs <- galleyRequest req case Bilge.statusCode rs of 200 -> Just <$> decodeBodyOrThrow "galley" rs @@ -178,8 +162,7 @@ getConv v usr lcnv = do getTeamConv :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => Version -> UserId -> @@ -187,10 +170,6 @@ getTeamConv :: ConvId -> Sem r (Maybe Conv.TeamConversation) getTeamConv v usr tid cnv = do - debug $ - remote "galley" - . field "conv" (toByteString cnv) - . msg (val "Getting team conversation") rs <- galleyRequest req case Bilge.statusCode rs of 200 -> Just <$> decodeBodyOrThrow "galley" rs @@ -211,18 +190,12 @@ getTeamConv v usr tid cnv = do -- | Calls 'Galley.API.addClientH'. newClient :: ( Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => UserId -> ClientId -> Sem r () newClient u c = do - debug $ - remote "galley" - . field "user" (toByteString u) - . field "client" (toByteString c) - . msg (val "new client") void . galleyRequest $ method POST . paths ["i", "clients", toByteString' c] @@ -232,15 +205,11 @@ newClient u c = do -- | Calls 'Galley.API.canUserJoinTeamH'. checkUserCanJoinTeam :: ( Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r (Maybe Wai.Error) checkUserCanJoinTeam tid = do - debug $ - remote "galley" - . msg (val "Check if can add member to team") rs <- galleyRequest req pure $ case Bilge.statusCode rs of 200 -> Nothing @@ -256,8 +225,7 @@ checkUserCanJoinTeam tid = do -- | Calls 'Galley.API.uncheckedAddTeamMemberH'. addTeamMember :: ( Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => UserId -> TeamId -> @@ -265,9 +233,6 @@ addTeamMember :: Role -> Sem r Bool addTeamMember u tid minvmeta role = do - debug $ - remote "galley" - . msg (val "Adding member to team") rs <- galleyRequest req pure $ case Bilge.statusCode rs of 200 -> True @@ -286,17 +251,13 @@ addTeamMember u tid minvmeta role = do -- | Calls 'Galley.API.createBindingTeamH'. createTeam :: ( Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => UserId -> NewTeam -> TeamId -> Sem r () createTeam u t teamid = do - debug $ - remote "galley" - . msg (val "Creating Team") void $ galleyRequest $ req teamid where req tid = @@ -311,16 +272,12 @@ createTeam u t teamid = do getTeamMember :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => UserId -> TeamId -> Sem r (Maybe TeamMember) getTeamMember u tid = do - debug $ - remote "galley" - . msg (val "Get team member") rs <- galleyRequest req case Bilge.statusCode rs of 200 -> Just <$> decodeBodyOrThrow "galley" rs @@ -340,14 +297,12 @@ getTeamMember u tid = do getTeamMembers :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Maybe (Range 1 HardTruncationLimit Int32) -> Sem r TeamMemberList getTeamMembers tid maxResults = do - debug $ remote "galley" . msg (val "Get team members") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -359,14 +314,12 @@ getTeamMembers tid maxResults = do selectTeamMemberInfos :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> [UserId] -> Sem r TeamMemberInfoList selectTeamMemberInfos tid uids = do - debug $ remote "galley" . msg (val "Select team members") let bdy = UserIds uids galleyRequest (req bdy) >>= decodeBodyOrThrow "galley" where @@ -380,13 +333,11 @@ selectTeamMemberInfos tid uids = do getTeamAdmins :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r TeamMemberList getTeamAdmins tid = do - debug $ remote "galley" . msg (val "Get team admins") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -410,13 +361,11 @@ memberIsTeamOwner tid uid = do getTeamId :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => UserId -> Sem r (Maybe TeamId) getTeamId u = do - debug $ remote "galley" . msg (val "Get team from user") rs <- galleyRequest req case Bilge.statusCode rs of 200 -> Just <$> decodeBodyOrThrow "galley" rs @@ -431,13 +380,11 @@ getTeamId u = do getTeam :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r Team.TeamData getTeam tid = do - debug $ remote "galley" . msg (val "Get team info") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -449,13 +396,11 @@ getTeam tid = do getTeamName :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r Team.TeamName getTeamName tid = do - debug $ remote "galley" . msg (val "Get team info") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -467,13 +412,11 @@ getTeamName tid = do getTeamLegalHoldStatus :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r (LockableFeature LegalholdConfig) getTeamLegalHoldStatus tid = do - debug $ remote "galley" . msg (val "Get legalhold settings") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -485,14 +428,12 @@ getTeamLegalHoldStatus tid = do getTeamSearchVisibility :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r TeamSearchVisibility getTeamSearchVisibility tid = coerce @TeamSearchVisibilityView @TeamSearchVisibility <$> do - debug $ remote "galley" . msg (val "Get search visibility settings") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -504,14 +445,12 @@ getFeatureConfigForTeam :: forall feature r. ( IsFeatureConfig feature, Typeable feature, - Member TinyLog r, Member Rpc r, Member (Error ParseException) r ) => TeamId -> Sem (Input Endpoint : r) (LockableFeature feature) getFeatureConfigForTeam tid = do - debug $ remote "galley" . msg (val "Get feature config for team") galleyRequest req >>= decodeBodyOrThrow "galley" where req = @@ -522,13 +461,11 @@ getFeatureConfigForTeam tid = do getVerificationCodeEnabled :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Sem r Bool getVerificationCodeEnabled tid = do - debug $ remote "galley" . msg (val "Get snd factor password challenge settings") response <- galleyRequest req status <- (.status) <$> decodeBodyOrThrow @(LockableFeature SndFactorPasswordChallengeConfig) "galley" response case status of @@ -558,15 +495,13 @@ getAllTeamFeaturesForUser mbUserId = -- | Calls 'Galley.API.updateTeamStatusH'. changeTeamStatus :: ( Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => TeamId -> Team.TeamStatus -> Maybe Currency.Alpha -> Sem r () changeTeamStatus tid s cur = do - debug $ remote "galley" . msg (val "Change Team status") void $ galleyRequest req where req = @@ -579,13 +514,11 @@ changeTeamStatus tid s cur = do getTeamExposeInvitationURLsToTeamAdmin :: ( Member Rpc r, Member (Input Endpoint) r, - Member (Error ParseException) r, - Member TinyLog r + Member (Error ParseException) r ) => TeamId -> Sem r ShowOrHideInvitationUrl getTeamExposeInvitationURLsToTeamAdmin tid = do - debug $ remote "galley" . msg (val "Get expose invitation URLs to team admin settings") response <- galleyRequest req status <- (.status) <$> decodeBodyOrThrow @(LockableFeature ExposeInvitationURLsToTeamAdminConfig) "galley" response case status of @@ -600,14 +533,12 @@ getTeamExposeInvitationURLsToTeamAdmin tid = do checkMLSOne2OneEstablished :: ( Member (Error ParseException) r, Member (Input Endpoint) r, - Member Rpc r, - Member TinyLog r + Member Rpc r ) => Local UserId -> Qualified UserId -> Sem r MLSOneToOneEstablished checkMLSOne2OneEstablished self (Qualified other otherDomain) = do - debug $ remote "galley" . msg (val "Get the MLS one-to-one conversation") responseSelf <- galleyRequest req case HTTP.statusCode (HTTP.responseStatus responseSelf) of 200 -> do @@ -631,8 +562,7 @@ checkMLSOne2OneEstablished self (Qualified other otherDomain) = do unblockConversation :: ( Member (Error ParseException) r, Member (Input Endpoint) r, - Member Rpc r, - Member TinyLog r + Member Rpc r ) => Version -> Local UserId -> @@ -640,11 +570,6 @@ unblockConversation :: Qualified ConvId -> Sem r OwnConversation unblockConversation v lusr mconn (Qualified cnv cdom) = do - debug $ - remote "galley" - . field "conv" (toByteString cnv) - . field "domain" (toByteString cdom) - . msg (val "Unblocking conversation") void $ galleyRequest putReq galleyRequest getReq >>= decodeBodyOrThrow @OwnConversation "galley" where @@ -665,17 +590,13 @@ remote = field "remote" getEJPDConvInfo :: forall r. - ( Member TinyLog r, - Member (Error ParseException) r, + ( Member (Error ParseException) r, Member (Input Endpoint) r, Member Rpc r ) => UserId -> Sem r [EJPDConvInfo] getEJPDConvInfo uid = do - debug $ - remote "galley" - . msg (val "get conversation info for ejpd") decodeBodyOrThrow "galley" =<< galleyRequest getReq where getReq = @@ -685,16 +606,11 @@ getEJPDConvInfo uid = do internalGetConversation :: ( Member (Error ParseException) r, Member Rpc r, - Member (Input Endpoint) r, - Member TinyLog r + Member (Input Endpoint) r ) => ConvId -> Sem r (Maybe Conversation) internalGetConversation convId = do - debug $ - remote "galley" - . field "conv" (toByteString convId) - . msg (val "Getting conversation (internal)") rs <- galleyRequest req case Bilge.statusCode rs of 200 -> Just <$> decodeBodyOrThrow "galley" rs diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs deleted file mode 100644 index 66969fe61d..0000000000 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk.hs +++ /dev/null @@ -1,22 +0,0 @@ -{-# LANGUAGE TemplateHaskell #-} - -module Wire.IndexedUserStore.Bulk where - -import Polysemy -import Wire.UserSearch.Migration - --- | Increase this number any time you want to force reindexing. -expectedMigrationVersion :: MigrationVersion -expectedMigrationVersion = MigrationVersion 6 - --- | Bulk operations, must not be used from any web handler -data IndexedUserStoreBulk m a where - -- | Only changes data if it is not updated since last update, use when users - -- need to be synced because of an outage, or migrating to a new ES instance. - SyncAllUsers :: IndexedUserStoreBulk m () - -- | Overwrite all users in the ES index, use it when trying to fix some - -- inconsistency or while introducing a new field in the mapping. - ForceSyncAllUsers :: IndexedUserStoreBulk m () - MigrateData :: IndexedUserStoreBulk m () - -makeSem ''IndexedUserStoreBulk diff --git a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs index 95166e2081..ee6e733e46 100644 --- a/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs +++ b/libs/wire-subsystems/src/Wire/IndexedUserStore/Bulk/ElasticSearch.hs @@ -3,105 +3,148 @@ module Wire.IndexedUserStore.Bulk.ElasticSearch where import Cassandra.Exec (paginateWithStateC) import Cassandra.Util (Writetime (Writetime)) import Conduit (ConduitT, runConduit, (.|)) +import Control.Error (headMay) +import Control.Exception (try) +import Control.Monad.Extra (mapMaybeM) import Data.Conduit.Combinators qualified as Conduit +import Data.Conduit.Internal (zipSources) +import Data.Conduit.List qualified as CL import Data.Id import Data.Json.Util (UTCTimeMillis (fromUTCTimeMillis)) import Data.Map qualified as Map import Database.Bloodhound qualified as ES import Imports import Polysemy -import Polysemy.Error +import Polysemy.Error hiding (try) import Polysemy.TinyLog import Polysemy.TinyLog qualified as Log import System.Logger.Message qualified as Log +import UnliftIO (pooledForConcurrentlyN) +import Wire.API.Federation.Client (FederatorClient) +import Wire.API.Federation.Error (FederationError) import Wire.API.Team.Feature import Wire.API.Team.Member.Info import Wire.API.Team.Role +import Wire.BlockListStore (BlockListStore) +import Wire.FederationAPIAccess (FederationAPIAccess) +import Wire.FederationConfigStore (FederationConfigStore) import Wire.GalleyAPIAccess -import Wire.IndexedUserStore (IndexedUserStore) +import Wire.IndexedUserStore (IndexedUserStore, IndexedUserStoreError) import Wire.IndexedUserStore qualified as IndexedUserStore -import Wire.IndexedUserStore.Bulk import Wire.IndexedUserStore.MigrationStore import Wire.IndexedUserStore.MigrationStore qualified as MigrationStore -import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe), unsafePooledForConcurrentlyN) +import Wire.ParseException (ParseException) +import Wire.Rpc (Rpc) +import Wire.Sem.Concurrency (Concurrency, ConcurrencySafety (Unsafe)) +import Wire.Sem.Metrics (Metrics) +import Wire.UserKeyStore (UserKeyStore) import Wire.UserSearch.Migration import Wire.UserSearch.Types import Wire.UserStore import Wire.UserStore.IndexUser +import Wire.UserSubsystem.Error (UserSubsystemError) -interpretIndexedUserStoreBulk :: - ( Member TinyLog r, - Member UserStore r, - Member (Concurrency Unsafe) r, - Member GalleyAPIAccess r, - Member IndexedUserStore r, - Member (Error MigrationException) r, - Member IndexedUserMigrationStore r - ) => - InterpreterFor IndexedUserStoreBulk r -interpretIndexedUserStoreBulk = interpret \case - SyncAllUsers -> syncAllUsersImpl - ForceSyncAllUsers -> forceSyncAllUsersImpl - MigrateData -> migrateDataImpl - -syncAllUsersImpl :: - forall r. - ( Member UserStore r, - Member TinyLog r, - Member (Concurrency 'Unsafe) r, - Member GalleyAPIAccess r, - Member IndexedUserStore r - ) => - Sem r () -syncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGT - -forceSyncAllUsersImpl :: - forall r. - ( Member UserStore r, - Member TinyLog r, - Member (Concurrency 'Unsafe) r, - Member GalleyAPIAccess r, - Member IndexedUserStore r - ) => - Sem r () -forceSyncAllUsersImpl = syncAllUsersWithVersion ES.ExternalGTE - -syncAllUsersWithVersion :: - forall r. - ( Member UserStore r, - Member TinyLog r, - Member (Concurrency 'Unsafe) r, - Member GalleyAPIAccess r, - Member IndexedUserStore r - ) => - (ES.ExternalDocVersion -> ES.VersionControl) -> - Sem r () -syncAllUsersWithVersion mkVersion = +type BulkEffectStack = + [ UserKeyStore, + BlockListStore, + Error UserSubsystemError, + FederationAPIAccess FederatorClient, + Error FederationError, + UserStore, + IndexedUserStore, + Error IndexedUserStoreError, + IndexedUserMigrationStore, + Error MigrationException, + FederationConfigStore, + GalleyAPIAccess, + Error ParseException, + Rpc, + Metrics, + TinyLog, + Concurrency 'Unsafe, + Embed IO, + Final IO + ] + +type BulkEffectStackInterpreter = forall a. Sem BulkEffectStack a -> IO a + +-- | Increase this number any time you want to force reindexing. +expectedMigrationVersion :: MigrationVersion +expectedMigrationVersion = MigrationVersion 6 + +syncAllUsers :: BulkEffectStackInterpreter -> IO () +syncAllUsers interpreter = syncAllUsersWithVersion interpreter ES.ExternalGT + +forceSyncAllUsers :: BulkEffectStackInterpreter -> IO () +forceSyncAllUsers interpreter = syncAllUsersWithVersion interpreter ES.ExternalGTE + +syncAllUsersWithVersion :: BulkEffectStackInterpreter -> (ES.ExternalDocVersion -> ES.VersionControl) -> IO () +syncAllUsersWithVersion interpreter mkVersion = runConduit $ - paginateWithStateC (getIndexUsersPaginated 1000) + zipSources (CL.sourceList [1 ..]) (paginateWithStateC (interpreter . getIndexUsersPaginated pageSize)) .| logPage .| mkUserDocs - .| Conduit.mapM_ IndexedUserStore.bulkUpsert + .| Conduit.mapM_ (interpreter . IndexedUserStore.bulkUpsert) where - logPage :: ConduitT [IndexUser] [IndexUser] (Sem r) () - logPage = Conduit.iterM $ \page -> do - info $ - Log.field "size" (length page) - . Log.msg (Log.val "Reindex: processing C* page") + pageSize = 10000 + + logPage :: ConduitT (Int32, [IndexUser]) [IndexUser] IO () + logPage = Conduit.mapM $ \(pageNumber, page) -> do + interpreter $ + info $ + Log.field "estimatedUserSoFar" (length page + fromIntegral (pageSize * pageNumber)) + . Log.msg (Log.val "Received user page") + . Log.field "firstUser" (maybe "N/A" (idToText . (.userId)) (headMay page)) + pure page - mkUserDocs :: ConduitT [IndexUser] [(ES.DocId, UserDoc, ES.VersionControl)] (Sem r) () + mkUserDocs :: ConduitT [IndexUser] [(ES.DocId, UserDoc, ES.VersionControl)] IO () mkUserDocs = Conduit.mapM $ \page -> do let teams :: Map TeamId [IndexUser] = Map.fromListWith (<>) $ mapMaybe (\u -> (,[u]) . value <$> u.teamId) page teamIds = Map.keys teams - visMap <- fmap Map.fromList . unsafePooledForConcurrentlyN 16 teamIds $ \t -> - (t,) <$> teamSearchVisibilityInbound t - roles :: Map UserId (WithWritetime Role) <- fmap (Map.fromList . concat) . unsafePooledForConcurrentlyN 16 (Map.toList teams) $ \(t, us) -> do - tms <- (.members) <$> selectTeamMemberInfos t (fmap (.userId) us) - pure $ mapMaybe mkRoleWithWriteTime tms - let vis indexUser = fromMaybe defaultSearchVisibilityInbound $ (flip Map.lookup visMap . value =<< indexUser.teamId) - mkUserDoc indexUser = indexUserToDoc (vis indexUser) ((.value) <$> Map.lookup indexUser.userId roles) indexUser - mkDocVersion u = mkVersion . ES.ExternalDocVersion . docVersion $ indexUserToVersion (Map.lookup u.userId roles) u - pure $ map (\u -> (userIdToDocId u.userId, mkUserDoc u, mkDocVersion u)) page + + visMap <- fmap Map.fromList . pooledForConcurrentlyN 16 teamIds $ \t -> do + x <- try $ interpreter $ teamSearchVisibilityInbound t + pure (t, x) + + roles :: Map UserId (Either SomeException (WithWritetime Role)) <- + fmap (Map.fromList . concat) . pooledForConcurrentlyN 16 (Map.toList teams) $ \(t, us) -> do + eithMembers <- try $ interpreter $ (.members) <$> selectTeamMemberInfos t (fmap (.userId) us) + case eithMembers of + Left e -> pure $ map (\iu -> (iu.userId, Left e)) us + Right tms -> pure $ mapMaybe (fmap rightSecond . mkRoleWithWriteTime) tms + + let vis :: IndexUser -> Either SomeException SearchVisibilityInbound + vis indexUser = do + let mTeam = value <$> indexUser.teamId + in fromMaybe (Right defaultSearchVisibilityInbound) $ flip Map.lookup visMap =<< mTeam + + mkUserDoc :: IndexUser -> Either SomeException UserDoc + mkUserDoc indexUser = do + currentVis <- vis indexUser + currentRole <- sequence $ Map.lookup indexUser.userId roles + pure $ indexUserToDoc currentVis ((.value) <$> currentRole) indexUser + + mkDocVersion :: IndexUser -> Either SomeException ES.VersionControl + mkDocVersion u = do + roleWithTime <- sequence (Map.lookup u.userId roles) + pure . mkVersion . ES.ExternalDocVersion . docVersion $ indexUserToVersion roleWithTime u + + let docsWithErrors = map (\u -> (userIdToDocId u.userId, mkUserDoc u, mkDocVersion u)) page + interpreter . flip mapMaybeM docsWithErrors $ logAndHush + + rightSecond :: (a, b) -> (a, Either c b) + rightSecond (a, b) = (a, Right b) + + logAndHush :: (Member TinyLog r) => (ES.DocId, Either SomeException UserDoc, Either SomeException ES.VersionControl) -> Sem r (Maybe (ES.DocId, UserDoc, ES.VersionControl)) + logAndHush (docId@(ES.DocId idText), eithUserDoc, eithVersion) = + case (,) <$> eithUserDoc <*> eithVersion of + Left e -> do + Log.err $ + Log.msg (Log.val "Error ocurred while indexing user") + . Log.field "userId" idText + . Log.field "error" (show e) + pure Nothing + Right (userDoc, version) -> pure $ Just (docId, userDoc, version) mkRoleWithWriteTime :: TeamMemberInfo -> Maybe (UserId, WithWritetime Role) mkRoleWithWriteTime tmi = @@ -115,17 +158,10 @@ syncAllUsersWithVersion mkVersion = ) <$> permissionsToRole tmi.permissions -migrateDataImpl :: - ( Member IndexedUserStore r, - Member (Error MigrationException) r, - Member IndexedUserMigrationStore r, - Member UserStore r, - Member (Concurrency Unsafe) r, - Member GalleyAPIAccess r, - Member TinyLog r - ) => - Sem r () -migrateDataImpl = do +migrateData :: + BulkEffectStackInterpreter -> + IO () +migrateData interpreter = interpreter $ do unlessM IndexedUserStore.doesIndexExist $ throw TargetIndexAbsent MigrationStore.ensureMigrationIndex @@ -136,7 +172,7 @@ migrateDataImpl = do Log.msg (Log.val "Migration necessary.") . Log.field "expectedVersion" expectedMigrationVersion . Log.field "foundVersion" foundVersion - forceSyncAllUsersImpl + embed $ forceSyncAllUsers interpreter MigrationStore.persistMigrationVersion expectedMigrationVersion else do Log.info $ diff --git a/libs/wire-subsystems/wire-subsystems.cabal b/libs/wire-subsystems/wire-subsystems.cabal index 8e7deafde7..e1ee09dc11 100644 --- a/libs/wire-subsystems/wire-subsystems.cabal +++ b/libs/wire-subsystems/wire-subsystems.cabal @@ -237,7 +237,6 @@ library Wire.HashPassword.Interpreter Wire.HashPassword.Scrypt Wire.IndexedUserStore - Wire.IndexedUserStore.Bulk Wire.IndexedUserStore.Bulk.ElasticSearch Wire.IndexedUserStore.ElasticSearch Wire.IndexedUserStore.MigrationStore diff --git a/services/brig/brig.cabal b/services/brig/brig.cabal index 67449f7bb0..4e094d0a38 100644 --- a/services/brig/brig.cabal +++ b/services/brig/brig.cabal @@ -341,6 +341,7 @@ executable brig-index build-depends: , base , brig + , extended , imports , optparse-applicative , tinylog diff --git a/services/brig/index/src/Main.hs b/services/brig/index/src/Main.hs index bf2412d475..99ea1f0c59 100644 --- a/services/brig/index/src/Main.hs +++ b/services/brig/index/src/Main.hs @@ -25,7 +25,7 @@ import Brig.Index.Options import Imports import Options.Applicative import System.Exit -import System.Logger.Class qualified as Log +import System.Logger.Extended qualified as Log main :: IO () main = do @@ -39,9 +39,4 @@ main = do header "brig-index" <> progDesc "Brig Search Index Utilities" <> fullDesc - initLogger = - Log.new -- TODO: use mkLogger'? - . Log.setOutput Log.StdOut - . Log.setFormat Nothing - . Log.setBufSize 0 - $ Log.defSettings + initLogger = Log.mkLogger Log.Debug Nothing (Just $ Last Log.JSON) diff --git a/services/brig/src/Brig/Index/Eval.hs b/services/brig/src/Brig/Index/Eval.hs index dbc6a74f95..4b0930d54c 100644 --- a/services/brig/src/Brig/Index/Eval.hs +++ b/services/brig/src/Brig/Index/Eval.hs @@ -25,7 +25,7 @@ import Brig.App (initHttpManagerWithTLSConfig, mkIndexEnv) import Brig.Index.Options import Brig.Options import Brig.User.Search.Index -import Cassandra (Client, runClient) +import Cassandra (Client, ClientState, runClient) import Cassandra.Options import Cassandra.Util (defInitCassandra) import Control.Exception (throwIO) @@ -40,69 +40,35 @@ import Data.Id import Database.Bloodhound qualified as ES import Database.Bloodhound.Internal.Client (BHEnv (..)) import Imports +import Network.HTTP.Client (Manager) import Polysemy import Polysemy.Embed (runEmbedded) import Polysemy.Error -import Polysemy.TinyLog hiding (Logger) import System.Logger qualified as Log import System.Logger.Class (Logger) import Util.Options (initCredentials) -import Wire.API.Federation.Client (FederatorClient) import Wire.API.Federation.Error -import Wire.BlockListStore (BlockListStore) import Wire.BlockListStore.Cassandra -import Wire.FederationAPIAccess import Wire.FederationAPIAccess.Interpreter (noFederationAPIAccess) -import Wire.FederationConfigStore (FederationConfigStore) import Wire.FederationConfigStore.Cassandra (interpretFederationDomainConfig) -import Wire.GalleyAPIAccess import Wire.GalleyAPIAccess.Rpc import Wire.IndexedUserStore -import Wire.IndexedUserStore.Bulk (IndexedUserStoreBulk) -import Wire.IndexedUserStore.Bulk qualified as IndexedUserStoreBulk -import Wire.IndexedUserStore.Bulk.ElasticSearch (interpretIndexedUserStoreBulk) +import Wire.IndexedUserStore.Bulk.ElasticSearch (BulkEffectStack) +import Wire.IndexedUserStore.Bulk.ElasticSearch qualified as IndexedUserStoreBulk import Wire.IndexedUserStore.ElasticSearch -import Wire.IndexedUserStore.MigrationStore (IndexedUserMigrationStore) import Wire.IndexedUserStore.MigrationStore.ElasticSearch import Wire.ParseException import Wire.Rpc -import Wire.Sem.Concurrency import Wire.Sem.Concurrency.IO import Wire.Sem.Logger.TinyLog -import Wire.Sem.Metrics import Wire.Sem.Metrics.IO -import Wire.UserKeyStore (UserKeyStore) import Wire.UserKeyStore.Cassandra import Wire.UserSearch.Migration (MigrationException) -import Wire.UserStore import Wire.UserStore.Cassandra import Wire.UserSubsystem.Error -type BrigIndexEffectStack = - [ IndexedUserStoreBulk, - UserKeyStore, - BlockListStore, - Error UserSubsystemError, - FederationAPIAccess FederatorClient, - Error FederationError, - UserStore, - IndexedUserStore, - Error IndexedUserStoreError, - IndexedUserMigrationStore, - Error MigrationException, - FederationConfigStore, - GalleyAPIAccess, - Error ParseException, - Rpc, - Metrics, - TinyLog, - Concurrency 'Unsafe, - Embed IO, - Final IO - ] - -runSem :: ESConnectionSettings -> CassandraSettings -> Endpoint -> Logger -> Sem BrigIndexEffectStack a -> IO a -runSem esConn cas galleyEndpoint logger action = do +mkSemDeps :: ESConnectionSettings -> CassandraSettings -> Logger -> IO (Manager, ClientState, BHEnv, IndexedUserStoreConfig, RequestId, IndexName) +mkSemDeps esConn cas logger = do mgr <- initHttpManagerWithTLSConfig esConn.esInsecureSkipVerifyTls esConn.esCaCert mEsCreds :: Maybe Credentials <- for esConn.esCredentials initCredentials casClient <- defInitCassandra (toCassandraOpts cas) logger @@ -123,6 +89,10 @@ runSem esConn cas galleyEndpoint logger action = do } reqId = (RequestId "brig-index") migrationIndexName = fromMaybe defaultMigrationIndexName (esMigrationIndexName esConn) + pure (mgr, casClient, bhEnv, indexedUserStoreConfig, reqId, migrationIndexName) + +runSem :: (Manager, ClientState, BHEnv, IndexedUserStoreConfig, RequestId, IndexName) -> Endpoint -> Logger -> Sem BulkEffectStack a -> IO a +runSem (mgr, casClient, bhEnv, indexedUserStoreConfig, reqId, migrationIndexName) galleyEndpoint logger action = do runFinal . embedToFinal . unsafelyPerformConcurrency @@ -144,7 +114,6 @@ runSem esConn cas galleyEndpoint logger action = do . throwErrorToIOFinal @UserSubsystemError . interpretBlockListStoreToCassandra casClient . interpretUserKeyStoreCassandra casClient - . interpretIndexedUserStoreBulk $ action throwErrorToIOFinal :: (Exception e, Member (Final IO) r) => InterpreterFor (Error e) r @@ -162,17 +131,17 @@ runCommand l = \case e <- initIndex l (es ^. esConnection) galley runIndexIO e $ resetIndex (mkCreateIndexSettings es) Reindex es cas galley -> do - runSem (es ^. esConnection) cas galley l $ - IndexedUserStoreBulk.syncAllUsers + semDeps <- mkSemDeps (es ^. esConnection) cas l + IndexedUserStoreBulk.syncAllUsers (runSem semDeps galley l) ReindexSameOrNewer es cas galley -> do - runSem (es ^. esConnection) cas galley l $ - IndexedUserStoreBulk.forceSyncAllUsers + semDeps <- mkSemDeps (es ^. esConnection) cas l + IndexedUserStoreBulk.forceSyncAllUsers (runSem semDeps galley l) UpdateMapping esConn galley -> do e <- initIndex l esConn galley runIndexIO e updateMapping Migrate es cas galley -> do - runSem (es ^. esConnection) cas galley l $ - IndexedUserStoreBulk.migrateData + semDeps <- mkSemDeps (es ^. esConnection) cas l + IndexedUserStoreBulk.migrateData (runSem semDeps galley l) ReindexFromAnotherIndex reindexSettings -> do mgr <- initHttpManagerWithTLSConfig