diff --git a/celements-s3/pom.xml b/celements-s3/pom.xml new file mode 100644 index 00000000..f6d1df17 --- /dev/null +++ b/celements-s3/pom.xml @@ -0,0 +1,45 @@ + + + com.celements + celements + 7.0-SNAPSHOT + + 4.0.0 + celements-s3 + 7.0-SNAPSHOT + Celements S3 Integration + + + com.celements + celements-config-source + 7.0-SNAPSHOT + provided + + + com.celements + celements-servlet + 7.0-SNAPSHOT + provided + + + com.celements + celements-model + 7.0-SNAPSHOT + provided + + + software.amazon.awssdk + s3 + 2.41.5 + + + + scm:git:git@github.com:celements/celements-base.git + scm:git:git@github.com:celements/celements-base.git + https://github.com/celements/celements-base/tree/dev/celements-s3 + HEAD + + diff --git a/celements-s3/src/main/java/com/celements/store/s3/S3Config.java b/celements-s3/src/main/java/com/celements/store/s3/S3Config.java new file mode 100644 index 00000000..681353bc --- /dev/null +++ b/celements-s3/src/main/java/com/celements/store/s3/S3Config.java @@ -0,0 +1,98 @@ +package com.celements.store.s3; + +import java.net.URI; +import java.util.Optional; + +import javax.annotation.Nullable; +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.xwiki.configuration.ConfigurationSource; + +import com.celements.configuration.CelementsAllPropertiesConfigurationSource; + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.checksums.RequestChecksumCalculation; +import software.amazon.awssdk.core.checksums.ResponseChecksumValidation; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; + +@Configuration +public class S3Config { + + private static final Logger LOGGER = LoggerFactory.getLogger(S3Config.class); + + private final ConfigurationSource cfgSrc; + + @Inject + public S3Config(CelementsAllPropertiesConfigurationSource cfgSrc) { + this.cfgSrc = cfgSrc; + } + + @Bean(destroyMethod = "close") + @Nullable + public S3Client s3Client() { + var endpoint = cfgSrc.getProperty("celements.s3.endpoint", "").trim(); + var region = cfgSrc.getProperty("celements.s3.region", "eu-central").trim(); + if (endpoint.isEmpty()) { + LOGGER.info("S3 endpoint not configured"); + return null; + } + var client = S3Client.builder() + .endpointOverride(URI.create(endpoint)) + .region(Region.of(region)) + .credentialsProvider(StaticCredentialsProvider.create(buildCredentials())) + .requestChecksumCalculation(RequestChecksumCalculation.WHEN_REQUIRED) + .responseChecksumValidation(ResponseChecksumValidation.WHEN_REQUIRED) + .build(); + testClient(client); + LOGGER.info("S3 configured: {}", endpoint); + return client; + } + + private void testClient(S3Client client) { + try { + client.listBuckets(); + } catch (Exception exc) { + client.close(); + throw exc; + } + } + + private AwsCredentials buildCredentials() { + var accessKey = cfgSrc.getProperty("celements.s3.accessKey", "").trim(); + var secretKey = cfgSrc.getProperty("celements.s3.secretKey", "").trim(); + if (accessKey.isEmpty() || secretKey.isEmpty()) { + throw new IllegalArgumentException("celements.s3.accessKey/secretKey missing"); + } + return AwsBasicCredentials.builder() + .accessKeyId(accessKey) + .secretAccessKey(secretKey) + .build(); + } + + @Bean(name = "s3BucketFilebase") + @Nullable + public String s3BucketFilebase(Optional s3Client) { + var bucket = cfgSrc.getProperty("celements.s3.bucket.filebase", "").trim(); + if (bucket.isEmpty()) { + LOGGER.info("S3 filebase bucket not configured"); + return null; + } + testBucket(s3Client, bucket); + LOGGER.info("S3 filebase bucket configured: {}", bucket); + return bucket; + } + + private void testBucket(Optional s3Client, String bucket) { + s3Client + .orElseThrow(() -> new IllegalStateException("S3 client not configured")) + .headBucket(builder -> builder.bucket(bucket)); + } + +} diff --git a/celements-s3/src/main/java/com/celements/store/s3/att/S3AttachmentContentStore.java b/celements-s3/src/main/java/com/celements/store/s3/att/S3AttachmentContentStore.java new file mode 100644 index 00000000..14acd387 --- /dev/null +++ b/celements-s3/src/main/java/com/celements/store/s3/att/S3AttachmentContentStore.java @@ -0,0 +1,182 @@ +package com.celements.store.s3.att; + +import java.util.List; +import java.util.Optional; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; + +import com.celements.servlet.NodeConfig.NodeIdentity; +import com.xpn.xwiki.doc.XWikiAttachment; +import com.xpn.xwiki.doc.XWikiAttachmentContent; +import com.xpn.xwiki.store.AttachmentContentStore; + +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Exception; + +@Component +@Named(S3AttachmentContentStore.STORE_NAME) +@Lazy +public class S3AttachmentContentStore implements AttachmentContentStore { + + private static final Logger LOGGER = LoggerFactory + .getLogger(S3AttachmentContentStore.class); + + public static final String STORE_NAME = "store.attachment.content.s3"; + + private final NodeIdentity nodeIdentity; + private final S3Client s3Client; + private final String s3BucketFilebase; + + @Inject + public S3AttachmentContentStore( + NodeIdentity nodeIdentity, + Optional s3Client, + @Named("s3BucketFilebase") Optional s3BucketFilebase) { + this.nodeIdentity = nodeIdentity; + this.s3Client = s3Client + .orElseThrow(() -> new IllegalStateException("S3Client missing")); + this.s3BucketFilebase = s3BucketFilebase + .orElseThrow(() -> new IllegalStateException("s3BucketFilebase missing")); + } + + @Override + public String getStoreName() { + return STORE_NAME; + } + + /** + * Builds the S3 key for the given attachment. The key structure is as follows: + * attachment/{appName}/{wikiName}/{docId}/{attachmentId} + */ + public String buildS3AttachmentKey(XWikiAttachment attachment) { + var doc = attachment.getDoc(); + var wiki = doc.getDocumentReference().getWikiReference(); + return String.join("/", + nodeIdentity.clusterName(), // allow bucket multi-tenancy by cluster name + "attachments", // subbucket for attachments + wiki.getName(), // identify wiki + Long.toString(doc.getId()), // identify document + Long.toString(attachment.getId())); // identify attachment + } + + /** + * Builds the S3 key for the given attachment. The key structure is as follows: + * attachment/{appName}/{wikiName}/{docId}/{attachmentId}/{version} + */ + public String buildS3AttachmentVersionKey(XWikiAttachment attachment) { + return String.join("/", + buildS3AttachmentKey(attachment), + attachment.getVersion()); // identify attachment version + } + + public boolean hasContent(XWikiAttachment attachment) throws AttachmentContentStoreException { + var s3Key = buildS3AttachmentVersionKey(attachment); + LOGGER.debug("hasContent - {} : {}", s3Key, attachment); + try { + s3Client.headObject(builder -> builder + .bucket(s3BucketFilebase) + .key(s3Key)); + return true; + } catch (NoSuchKeyException e) { + return false; + } catch (S3Exception e) { + throw new AttachmentContentStoreException(buildS3ErrorMessage(s3Key, e), e); + } catch (Exception e) { + throw new AttachmentContentStoreException("Failed checking attachment", e); + } + } + + @Override + public void saveContent(XWikiAttachmentContent content) throws AttachmentContentStoreException { + var s3Key = buildS3AttachmentVersionKey(content.getAttachment()); + LOGGER.info("saveContent - {} : {}", s3Key, content.getAttachment()); + try { + try (var data = content.getContentInputStream()) { + s3Client.putObject(builder -> builder + .bucket(s3BucketFilebase) + .key(s3Key) + .contentLength((long) content.getSize()) + .contentType(content.getAttachment().getMimeType()), + RequestBody.fromInputStream(data, content.getSize())); + } + } catch (S3Exception e) { + throw new AttachmentContentStoreException(buildS3ErrorMessage(s3Key, e), e); + } catch (Exception e) { + throw new AttachmentContentStoreException("Failed saving attachment", e); + } + } + + @Override + public void loadContent(XWikiAttachmentContent content) throws AttachmentContentStoreException { + var s3Key = buildS3AttachmentVersionKey(content.getAttachment()); + try { + try (var data = s3Client.getObject(builder -> builder + .bucket(s3BucketFilebase) + .key(s3Key))) { + content.setContent(data); + } + LOGGER.debug("loadContent - {} : {}", s3Key, content.getAttachment()); + } catch (NoSuchKeyException e) { + LOGGER.debug("loadContent - {} not found : {}", s3Key, content.getAttachment()); + throw new AttachmentContentStoreException("Attachment content not found in S3: " + s3Key, e); + } catch (S3Exception e) { + throw new AttachmentContentStoreException(buildS3ErrorMessage(s3Key, e), e); + } catch (Exception e) { + throw new AttachmentContentStoreException("Failed loading attachment", e); + } + } + + @Override + public void deleteContent(XWikiAttachment attachment) throws AttachmentContentStoreException { + var s3Prefix = buildS3AttachmentKey(attachment) + "/"; + LOGGER.info("deleteContent - {} : {}", s3Prefix, attachment); + List batch = s3Client.listObjectsV2(builder -> builder + .bucket(s3BucketFilebase) + .prefix(s3Prefix)) + .contents() + .stream() + .map(s3Object -> ObjectIdentifier.builder().key(s3Object.key()).build()) + .toList(); + if (batch.isEmpty()) { + return; + } else if (batch.size() >= 1000) { + throw new AttachmentContentStoreException( + "Too many objects to delete in S3 for attachment: " + attachment, null); + } + s3Client.deleteObjects(builder -> builder + .bucket(s3BucketFilebase) + .delete(deleteBuilder -> deleteBuilder.objects(batch))); + } + + @Override + public void deleteContent(XWikiAttachmentContent content) throws AttachmentContentStoreException { + var s3Key = buildS3AttachmentVersionKey(content.getAttachment()); + LOGGER.info("deleteContent - {} : {}", s3Key, content.getAttachment()); + try { + s3Client.deleteObject(builder -> builder + .bucket(s3BucketFilebase) + .key(s3Key)); + } catch (S3Exception e) { + throw new AttachmentContentStoreException(buildS3ErrorMessage(s3Key, e), e); + } catch (Exception e) { + throw new AttachmentContentStoreException("Failed deleting attachment", e); + } + } + + private static String buildS3ErrorMessage(String s3Key, S3Exception e) { + return String.format("S3 error for attachment (key=%s, status=%d, code=%s)", + s3Key, + e.statusCode(), + e.awsErrorDetails() != null ? e.awsErrorDetails().errorCode() : "n/a"); + } + +} diff --git a/celements-s3/src/main/java/com/celements/store/s3/att/migration/S3AttachmentContentMigrationService.java b/celements-s3/src/main/java/com/celements/store/s3/att/migration/S3AttachmentContentMigrationService.java new file mode 100644 index 00000000..9a966f9e --- /dev/null +++ b/celements-s3/src/main/java/com/celements/store/s3/att/migration/S3AttachmentContentMigrationService.java @@ -0,0 +1,172 @@ +package com.celements.store.s3.att.migration; + +import static com.celements.logging.LogUtils.*; + +import java.util.List; + +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Service; +import org.xwiki.model.reference.DocumentReference; +import org.xwiki.model.reference.EntityReference; +import org.xwiki.model.reference.WikiReference; + +import com.celements.init.XWikiProvider; +import com.celements.model.access.IModelAccessFacade; +import com.celements.model.util.ModelUtils; +import com.celements.query.QueryExecutionService; +import com.celements.store.s3.att.S3AttachmentContentStore; +import com.xpn.xwiki.XWikiException; +import com.xpn.xwiki.doc.XWikiAttachment; +import com.xpn.xwiki.doc.XWikiDocument; +import com.xpn.xwiki.store.AttachmentContentStore.AttachmentContentStoreException; +import com.xpn.xwiki.store.AttachmentVersioningStore; +import com.xpn.xwiki.store.hibernate.HibernateAttachmentContentStore; + +import one.util.streamex.StreamEx; + +@Service +@Lazy +public class S3AttachmentContentMigrationService { + + static final Logger LOGGER = LoggerFactory.getLogger(S3AttachmentContentMigrationService.class); + + private final QueryExecutionService queryExecutor; + private final S3AttachmentContentStore s3AttContentStore; + private final HibernateAttachmentContentStore hibAttContentStore; + private final IModelAccessFacade modelAccess; + private final ModelUtils modelUtils; + private final XWikiProvider xwikiProvider; + + @Inject + public S3AttachmentContentMigrationService( + QueryExecutionService queryExecutor, + S3AttachmentContentStore s3AttContentStore, + HibernateAttachmentContentStore hibAttContentStore, + IModelAccessFacade modelAccess, + ModelUtils modelUtils, + XWikiProvider xwikiProvider) { + this.queryExecutor = queryExecutor; + this.s3AttContentStore = s3AttContentStore; + this.hibAttContentStore = hibAttContentStore; + this.modelAccess = modelAccess; + this.modelUtils = modelUtils; + this.xwikiProvider = xwikiProvider; + } + + public void migrate(WikiReference wiki, boolean cleanup) + throws XWikiException, AttachmentContentStoreException { + migrateArchive(wiki, cleanup); + // migrateRecycleBin(wiki); // TODO implement xwikiattrecyclebin migration + } + + private static final String SQL_ATTACHMENTS_WITH_CONTENT = "" + + "SELECT DISTINCT d.XWD_FULLNAME, a.XWA_FILENAME " + + "FROM xwikidoc d " + + "JOIN xwikiattachment a ON d.XWD_ID = a.XWA_DOC_ID " + + "JOIN xwikiattachment_content c ON a.XWA_ID = c.XWA_ID " + + "ORDER BY d.XWD_FULLNAME, a.XWA_FILENAME"; + + public void migrateArchive(WikiReference wiki, boolean cleanup) + throws XWikiException, AttachmentContentStoreException { + var result = queryExecutor.executeReadSql(wiki, String.class, SQL_ATTACHMENTS_WITH_CONTENT); + LOGGER.info("[{}] migrating {} attachments to S3 store", wiki.getName(), result.size()); + var countPushed = 0; + var countProcessed = 0; + var countCleaned = 0; + var countError = 0; + XWikiDocument doc = null; + for (List row : result) { + var docRef = modelUtils.resolveRef(row.get(0), DocumentReference.class, wiki); + if ((doc == null) || !doc.getDocumentReference().equals(docRef)) { + doc = modelAccess.getOrCreateDocument(docRef); + } + var att = doc.getAttachment(row.get(1)); + if (att != null) { + try { + countPushed += migrate(att); + if (cleanup) { + cleanup(att); // content moved to s3, cleanup content in db + countCleaned++; + } + } catch (XWikiException exc) { + countError++; + LOGGER.error("[{}] failed migrating {}", wiki.getName(), + serialize(att.getAttachmentReference()), exc); + } + } + countProcessed++; + if ((countProcessed % 100) == 0) { + LOGGER.info("[{}] processed {}/{} attachments", + wiki.getName(), countProcessed, result.size()); + } + } + LOGGER.info("[{}] migration finished: {} processed, {} failed, {} cleaned, {} pushed contents", + wiki.getName(), countProcessed, countError, countCleaned, countPushed); + } + + public int migrate(XWikiAttachment att) + throws XWikiException, AttachmentContentStoreException { + var count = 0; + var archive = att.loadArchive(); + for (var v : StreamEx.of(archive.getVersions()).append(att.getRCSVersion()).distinct()) { + try { + if (pushToS3(archive.getRevision(v))) { + count++; + } + } catch (XWikiException exc) { + throw new XWikiException(0, 0, "Failed migrating " + + serialize(att.getAttachmentReference()) + "@" + v, exc); + } + } + LOGGER.trace("[{}] migrated {} with {} contents", + defer(() -> att.getWikiReference().getName()), + defer(() -> serialize(att.getAttachmentReference())), + count); + return count; + } + + private boolean pushToS3(XWikiAttachment att) + throws XWikiException, AttachmentContentStoreException { + if (!s3AttContentStore.hasContent(att)) { + LOGGER.trace("[{}] pushToS3 {}", + defer(() -> att.getWikiReference().getName()), + defer(() -> s3AttContentStore.buildS3AttachmentVersionKey(att))); + s3AttContentStore.saveContent(att.loadContent()); + return true; + } + return false; + } + + // let's rebuild the archive without content blobs and delete the content + public void cleanup(XWikiAttachment att) throws XWikiException { + LOGGER.trace("[{}] cleanup {}", + defer(() -> att.getWikiReference().getName()), + defer(() -> serialize(att.getAttachmentReference()))); + hibAttContentStore.executeWrite(att.getWikiReference(), true, session -> { + try { + var archive = att.loadArchive(); + archive.rebuildArchive(false); + getAttachmentVersioningStore().saveArchive(archive, false); + hibAttContentStore.deleteContent(att); + } catch (AttachmentContentStoreException e) { + throw new XWikiException(0, 0, "Failed deleting content from fallback store for " + + serialize(att.getAttachmentReference()), e); + } + return null; + }); + } + + private String serialize(EntityReference ref) { + return modelUtils.serializeRefLocal(ref); + } + + private AttachmentVersioningStore getAttachmentVersioningStore() { + return xwikiProvider.get().orElseThrow(IllegalStateException::new) + .getAttachmentStore() + .getVersioningStore(); + } +}