diff --git a/settings.xml b/settings.xml
index b5281bb65..9b5fb7aa5 100644
--- a/settings.xml
+++ b/settings.xml
@@ -52,7 +52,7 @@
github-fannypack
FannyPack github repository
- https://maven.pkg.github.com/Kowalski-IO/fannypack
+ https://maven.pkg.github.com/BrandonKowalski/fannypack
true
true
diff --git a/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java b/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java
index 8619a140a..5741e1313 100644
--- a/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java
+++ b/src/main/java/org/breedinginsight/api/v1/controller/ExperimentController.java
@@ -28,7 +28,9 @@
import org.breedinginsight.services.ProgramService;
import org.breedinginsight.services.ProgramUserService;
import org.breedinginsight.services.RoleService;
+import org.breedinginsight.services.exceptions.AlreadyExistsException;
import org.breedinginsight.services.exceptions.DoesNotExistException;
+import org.breedinginsight.services.exceptions.CreationBusyException;
import org.breedinginsight.utilities.response.mappers.ExperimentQueryMapper;
import javax.inject.Inject;
@@ -134,6 +136,12 @@ public HttpResponse> createSubEntityDataset(
Response response = new Response(experimentService.createSubEntityDataset(programOptional.get(), experimentId, datasetRequest));
return HttpResponse.ok(response);
+ } catch (AlreadyExistsException e) {
+ log.info(e.getMessage());
+ return HttpResponse.status(HttpStatus.CONFLICT, e.getMessage());
+ } catch (CreationBusyException e) {
+ log.info(e.getMessage());
+ return HttpResponse.status(HttpStatus.SERVICE_UNAVAILABLE, e.getMessage());
} catch (Exception e){
log.info(e.getMessage());
return HttpResponse.status(HttpStatus.UNPROCESSABLE_ENTITY, e.getMessage());
diff --git a/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java
new file mode 100644
index 000000000..e37e23dbe
--- /dev/null
+++ b/src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationLevelDAO.java
@@ -0,0 +1,99 @@
+/*
+ * See the NOTICE file distributed with this work for additional information
+ * regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.breedinginsight.brapi.v2.dao;
+
+import com.google.gson.Gson;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.HttpUrl;
+import okhttp3.MediaType;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import org.brapi.client.v2.JSON;
+import org.brapi.client.v2.model.exceptions.ApiException;
+import org.breedinginsight.model.DatasetLevel;
+import org.breedinginsight.model.Program;
+import org.breedinginsight.utilities.BrAPIDAOUtil;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
+@Slf4j
+@Singleton
+public class BrAPIObservationLevelDAO {
+
+ private static final MediaType JSON_MEDIA_TYPE = MediaType.get("application/json");
+ private final BrAPIDAOUtil brAPIDAOUtil;
+ private final Gson gson = new JSON().getGson();
+
+ @Inject
+ public BrAPIObservationLevelDAO(BrAPIDAOUtil brAPIDAOUtil) {
+ this.brAPIDAOUtil = brAPIDAOUtil;
+ }
+
+ public HttpResponse createObservationLevelName(Program program, String levelName, DatasetLevel levelOrder, String programDbId) throws ApiException {
+ HttpUrl url = HttpUrl.parse(brAPIDAOUtil.getProgramBrAPIBaseUrl(program.getId()))
+ .newBuilder()
+ .addPathSegment("observationlevelnames")
+ .build();
+ JsonObject levelJson = new JsonObject();
+ levelJson.addProperty("levelName", levelName);
+ if (levelOrder != null) {
+ levelJson.addProperty("levelOrder", levelOrder.getValue());
+ }
+ if (programDbId != null) {
+ levelJson.addProperty("programDbId", programDbId);
+ }
+ JsonArray bodyArray = new JsonArray();
+ bodyArray.add(levelJson);
+ RequestBody body = RequestBody.create(gson.toJson(bodyArray), JSON_MEDIA_TYPE);
+ var request = new Request.Builder()
+ .url(url)
+ .post(body)
+ .addHeader("Content-Type", "application/json")
+ .build();
+ return brAPIDAOUtil.makeCall(request);
+ }
+
+ public void deleteObservationLevelName(Program program, String levelDbId) {
+ HttpUrl url = HttpUrl.parse(brAPIDAOUtil.getProgramBrAPIBaseUrl(program.getId()))
+ .newBuilder()
+ .addPathSegment("observationlevelnames")
+ .addPathSegment(levelDbId)
+ .build();
+ var request = new Request.Builder()
+ .url(url)
+ .delete()
+ .addHeader("Content-Type", "application/json")
+ .build();
+ try {
+ HttpResponse response = brAPIDAOUtil.makeCall(request);
+ if (response.getStatus() != HttpStatus.OK && response.getStatus() != HttpStatus.NO_CONTENT && response.getStatus() != HttpStatus.ACCEPTED) {
+ log.warn("Observation level delete returned status {} for {}", response.getStatus(), levelDbId);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to delete observation level {}", levelDbId, e);
+ }
+ }
+
+}
diff --git a/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java b/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java
index 9f5a93410..4c217ca66 100644
--- a/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java
+++ b/src/main/java/org/breedinginsight/brapi/v2/services/BrAPITrialService.java
@@ -6,6 +6,8 @@
import com.github.filosganga.geogson.model.positions.SinglePosition;
import com.google.gson.JsonObject;
import io.micronaut.context.annotation.Property;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.server.exceptions.InternalServerException;
import io.micronaut.http.server.types.files.StreamedFile;
@@ -34,16 +36,20 @@
import org.breedinginsight.model.Program;
import org.breedinginsight.model.*;
import org.breedinginsight.services.TraitService;
+import org.breedinginsight.services.exceptions.AlreadyExistsException;
import org.breedinginsight.services.exceptions.DoesNotExistException;
+import org.breedinginsight.services.exceptions.CreationBusyException;
import org.breedinginsight.services.parsers.experiment.ExperimentFileColumns;
import org.breedinginsight.utilities.*;
import org.jetbrains.annotations.NotNull;
+import org.breedinginsight.services.lock.DistributedLockService;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@@ -51,6 +57,7 @@
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import java.util.concurrent.TimeoutException;
import static org.breedinginsight.brapps.importer.services.processors.experiment.model.ExpImportProcessConstants.OBSERVATION_UNIT_ID_SUFFIX;
@@ -67,8 +74,10 @@ public class BrAPITrialService {
private final BrAPIStudyDAO studyDAO;
private final BrAPISeasonDAO seasonDAO;
private final BrAPIObservationUnitDAO ouDAO;
+ private final BrAPIObservationLevelDAO observationLevelDAO;
private final BrAPIGermplasmDAO germplasmDAO;
private final FileMappingUtil fileMappingUtil;
+ private final DistributedLockService lockService;
private static final String SHEET_NAME = "Data";
@Inject
@@ -81,8 +90,10 @@ public BrAPITrialService(@Property(name = "brapi.server.reference-source") Strin
BrAPIStudyDAO studyDAO,
BrAPISeasonDAO seasonDAO,
BrAPIObservationUnitDAO ouDAO,
+ BrAPIObservationLevelDAO observationLevelDAO,
BrAPIGermplasmDAO germplasmDAO,
- FileMappingUtil fileMappingUtil) {
+ FileMappingUtil fileMappingUtil,
+ DistributedLockService lockService) {
this.referenceSource = referenceSource;
this.trialDAO = trialDAO;
@@ -93,8 +104,10 @@ public BrAPITrialService(@Property(name = "brapi.server.reference-source") Strin
this.studyDAO = studyDAO;
this.seasonDAO = seasonDAO;
this.ouDAO = ouDAO;
+ this.observationLevelDAO = observationLevelDAO;
this.germplasmDAO = germplasmDAO;
this.fileMappingUtil = fileMappingUtil;
+ this.lockService = lockService;
}
public List getExperiments(UUID programId) throws ApiException, DoesNotExistException {
@@ -414,58 +427,99 @@ public List getDatasetsMetadata(Program program, UUID experimen
return datasets;
}
- public Dataset createSubEntityDataset(Program program, UUID experimentId, SubEntityDatasetRequest request) throws ApiException, DoesNotExistException {
- log.debug("creating sub-entity dataset: \"" + request.getName() + "\" for experiment: \"" + experimentId + "\" with: \"" + request.getRepeatedMeasures() + "\" repeated measures.");
- UUID subEntityDatasetId = UUID.randomUUID();
- List subObsUnits = new ArrayList<>();
- BrAPITrial experiment = getExperiment(program, experimentId);
- // Get top level dataset ObservationUnits.
- DatasetMetadata topLevelDataset = DatasetUtil.getTopLevelDataset(experiment);
- if (topLevelDataset == null) {
- log.error("Experiment {} has no top level dataset.", experiment.getTrialDbId());
- throw new RuntimeException("Cannot create sub-entity dataset for experiment without top level dataset.");
- }
-
- List expOUs = ouDAO.getObservationUnitsForDataset(topLevelDataset.getId().toString(), program);
- for (BrAPIObservationUnit expUnit : expOUs) {
-
- // Get environment number from study.
- String envSeqValue = studyDAO.getStudyByDbId(expUnit.getStudyDbId(), program).orElseThrow()
- .getAdditionalInfo().get(BrAPIAdditionalInfoFields.ENVIRONMENT_NUMBER).getAsString();
-
- for (int i=1; i<=request.getRepeatedMeasures(); i++) {
- // Create subObsUnit and add to list.
- subObsUnits.add(
- createSubObservationUnit(
- request.getName(),
- Integer.toString(i),
- program,
- envSeqValue,
- expUnit,
- this.referenceSource,
- subEntityDatasetId,
- UUID.randomUUID()
- )
- );
- }
- }
+ /**
+ * Creates sub-entity dataset
+ * TODO: Handle compensating transactions in event of failure. Currently brapi server does not support
+ * deleting observation units. Will need to add batch delete support for observation units before this
+ * can be done.
+ *
+ * @param program
+ * @param experimentId
+ * @param request
+ * @return
+ * @throws ApiException
+ * @throws DoesNotExistException
+ * @throws AlreadyExistsException
+ * @throws CreationBusyException
+ */
+ public Dataset createSubEntityDataset(Program program, UUID experimentId, SubEntityDatasetRequest request)
+ throws ApiException, DoesNotExistException, AlreadyExistsException, CreationBusyException {
+ final String datasetName = request.getName().trim().toLowerCase();
+ String lockKey = String.format("sub-entity-dataset:%s", experimentId);
+ try {
+ return lockService.withLock(lockKey, Duration.ofSeconds(30), Duration.ofMinutes(5), () -> {
+ log.debug("creating sub-entity dataset: \"{}\" for experiment: \"{}\" with: \"{}\" repeated measures.", datasetName, experimentId, request.getRepeatedMeasures());
+ UUID subEntityDatasetId = UUID.randomUUID();
+ List subObsUnits = new ArrayList<>();
+ BrAPITrial experiment = getExperiment(program, experimentId);
+ DatasetMetadata topLevelDataset = DatasetUtil.getTopLevelDataset(experiment);
+ if (topLevelDataset == null) {
+ log.error("Experiment {} has no top level dataset.", experiment.getTrialDbId());
+ throw new RuntimeException("Cannot create sub-entity dataset for experiment without top level dataset.");
+ }
+
+ List existingDatasets = DatasetUtil.datasetsFromJson(experiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS));
+ if (existingDatasets.stream().anyMatch(dataset -> dataset.getName().equalsIgnoreCase(datasetName))) {
+ throw new AlreadyExistsException("Dataset name already exists in this experiment");
+ }
- List createdObservationUnits = observationUnitDAO.createBrAPIObservationUnits(subObsUnits, program.getId());
+ String programDbId = program.getBrapiProgram() != null ? program.getBrapiProgram().getProgramDbId() : null;
+ HttpResponse levelResponse = observationLevelDAO.createObservationLevelName(program, datasetName, DatasetLevel.SUB_OBS_UNIT, programDbId);
+ if (levelResponse.getStatus() == HttpStatus.CONFLICT) {
+ throw new AlreadyExistsException("Dataset name already exists in this experiment");
+ } else if (levelResponse.getStatus().getCode() < 200 || levelResponse.getStatus().getCode() >= 300) {
+ throw new ApiException(levelResponse.getStatus().getCode(), "Unable to create observation level: " + levelResponse.getStatus().getReason());
+ }
+
+ List expOUs = ouDAO.getObservationUnitsForDataset(topLevelDataset.getId().toString(), program);
+ for (BrAPIObservationUnit expUnit : expOUs) {
+
+ String envSeqValue = studyDAO.getStudyByDbId(expUnit.getStudyDbId(), program).orElseThrow()
+ .getAdditionalInfo().get(BrAPIAdditionalInfoFields.ENVIRONMENT_NUMBER).getAsString();
+
+ for (int i=1; i<=request.getRepeatedMeasures(); i++) {
+ subObsUnits.add(
+ createSubObservationUnit(
+ datasetName,
+ Integer.toString(i),
+ program,
+ envSeqValue,
+ expUnit,
+ this.referenceSource,
+ subEntityDatasetId,
+ UUID.randomUUID()
+ )
+ );
+ }
+ }
- // Add the new dataset metadata to the datasets array in the trial's additionalInfo.
- DatasetMetadata subEntityDatasetMetadata = DatasetMetadata.builder()
- .id(subEntityDatasetId)
- .name(request.getName())
- .level(DatasetLevel.SUB_OBS_UNIT)
- .build();
- List datasets = DatasetUtil.datasetsFromJson(experiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS));
- datasets.add(subEntityDatasetMetadata);
- experiment.getAdditionalInfo().add(BrAPIAdditionalInfoFields.DATASETS, DatasetUtil.jsonArrayFromDatasets(datasets));
- // Ask the DAO to persist the updated trial.
- trialDAO.updateBrAPITrial(experiment.getTrialDbId(), experiment, program.getId());
+ observationUnitDAO.createBrAPIObservationUnits(subObsUnits, program.getId());
- // Return the new dataset.
- return getDatasetData(program, experimentId, subEntityDatasetId, false);
+ DatasetMetadata subEntityDatasetMetadata = DatasetMetadata.builder()
+ .id(subEntityDatasetId)
+ .name(datasetName)
+ .level(DatasetLevel.SUB_OBS_UNIT)
+ .build();
+
+ // Refresh experiment so we merge with the latest dataset metadata and avoid clobbering concurrent updates.
+ BrAPITrial latestExperiment = getExperiment(program, experimentId);
+ List datasets = DatasetUtil.datasetsFromJson(latestExperiment.getAdditionalInfo().getAsJsonArray(BrAPIAdditionalInfoFields.DATASETS));
+ if (datasets.stream().anyMatch(dataset -> dataset.getName().equalsIgnoreCase(datasetName))) {
+ throw new AlreadyExistsException("Dataset name already exists in this experiment");
+ }
+ datasets.add(subEntityDatasetMetadata);
+ latestExperiment.getAdditionalInfo().add(BrAPIAdditionalInfoFields.DATASETS, DatasetUtil.jsonArrayFromDatasets(datasets));
+ trialDAO.updateBrAPITrial(latestExperiment.getTrialDbId(), latestExperiment, program.getId());
+
+ return getDatasetData(program, experimentId, subEntityDatasetId, false);
+ });
+ } catch (TimeoutException e) {
+ throw new CreationBusyException("Dataset creation is busy, please retry");
+ } catch (ApiException | DoesNotExistException | AlreadyExistsException | CreationBusyException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected error creating sub-entity dataset", e);
+ }
}
public BrAPIObservationUnit createSubObservationUnit(
@@ -535,6 +589,7 @@ public BrAPIObservationUnit createSubObservationUnit(
// Put level in additional info: keep this in case we decide to rename levels in future.
observationUnit.putAdditionalInfoItem(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL, subEntityDatasetName);
+
// Put RTK in additional info.
JsonElement rtk = expUnit.getAdditionalInfo().get(BrAPIAdditionalInfoFields.RTK);
if (rtk != null) {
@@ -554,14 +609,14 @@ public BrAPIObservationUnit createSubObservationUnit(
// ObservationLevel entry for Sub-Obs Unit.
BrAPIObservationUnitLevelRelationship level = new BrAPIObservationUnitLevelRelationship();
- // TODO: consider removing toLowerCase() after BI-2219 is implemented.
- level.setLevelName(subEntityDatasetName.toLowerCase());
+ level.setLevelName(subEntityDatasetName);
level.setLevelCode(Utilities.appendProgramKey(subUnitId, program.getKey(), seqVal));
level.setLevelOrder(DatasetLevel.SUB_OBS_UNIT.getValue());
position.setObservationLevel(level);
// ObservationLevelRelationships.
List levelRelationships = new ArrayList<>();
+ levelRelationships.add(level);
// ObservationLevelRelationships for block.
BrAPIObservationUnitLevelRelationship expBlockLevel = expUnit.getObservationUnitPosition()
.getObservationLevelRelationships().stream()
@@ -586,8 +641,7 @@ public BrAPIObservationUnit createSubObservationUnit(
}
// ObservationLevelRelationships for top-level Exp Unit linking.
BrAPIObservationUnitLevelRelationship expUnitLevel = new BrAPIObservationUnitLevelRelationship();
- // TODO: consider removing toLowerCase() after BI-2219 is implemented.
- expUnitLevel.setLevelName(expUnit.getAdditionalInfo().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString().toLowerCase());
+ expUnitLevel.setLevelName(expUnit.getAdditionalInfo().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString());
String expUnitUUID = Utilities.getExternalReference(expUnit.getExternalReferences(), referenceSource, ExternalReferenceSource.OBSERVATION_UNITS).orElseThrow().getReferenceId();
expUnitLevel.setLevelCode(Utilities.appendProgramKey(expUnitUUID, program.getKey(), seqVal));
expUnitLevel.setLevelOrder(DatasetLevel.EXP_UNIT.getValue());
@@ -602,6 +656,29 @@ public BrAPIObservationUnit createSubObservationUnit(
return observationUnit;
}
+ private String getObservationLevelName(BrAPIObservationUnit observationUnit) {
+ if (observationUnit.getObservationUnitPosition() != null
+ && observationUnit.getObservationUnitPosition().getObservationLevel() != null
+ && StringUtils.isNotBlank(observationUnit.getObservationUnitPosition().getObservationLevel().getLevelName())) {
+ return observationUnit.getObservationUnitPosition().getObservationLevel().getLevelName();
+ }
+ JsonObject additionalInfo = observationUnit.getAdditionalInfo();
+ if (additionalInfo != null
+ && additionalInfo.has(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL)
+ && !additionalInfo.get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).isJsonNull()) {
+ return additionalInfo.get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString();
+ }
+ return null;
+ }
+
+ private String requireObservationLevelName(BrAPIObservationUnit observationUnit) {
+ String levelName = getObservationLevelName(observationUnit);
+ if (StringUtils.isBlank(levelName)) {
+ throw new RuntimeException("Observation level not found for observation unit " + observationUnit.getObservationUnitDbId());
+ }
+ return levelName;
+ }
+
private void addBrAPIObsToRecords(
List dataset,
BrAPITrial experiment,
@@ -773,6 +850,7 @@ private Map createExportRow(
row.put(ExperimentObservation.Columns.TEST_CHECK, testCheck);
row.put(ExperimentObservation.Columns.EXP_TITLE, Utilities.removeProgramKey(experiment.getTrialName(), program.getKey()));
row.put(ExperimentObservation.Columns.EXP_DESCRIPTION, experiment.getTrialDescription());
+
row.put(ExperimentObservation.Columns.EXP_TYPE, experiment.getAdditionalInfo().getAsJsonObject().get(BrAPIAdditionalInfoFields.EXPERIMENT_TYPE).getAsString());
row.put(ExperimentObservation.Columns.ENV, Utilities.removeProgramKeyAndUnknownAdditionalData(study.getStudyName(), program.getKey()));
row.put(ExperimentObservation.Columns.ENV_LOCATION, Utilities.removeProgramKey(study.getLocationName(), program.getKey()));
@@ -828,7 +906,7 @@ private Map createExportRow(
}
//Append observation level to obsUnitID
- String observationLvl = ou.getAdditionalInfo().getAsJsonObject().get(BrAPIAdditionalInfoFields.OBSERVATION_LEVEL).getAsString();
+ String observationLvl = requireObservationLevelName(ou);
row.put(observationLvl + " " + OBSERVATION_UNIT_ID_SUFFIX, ouId);
if (isSubEntity) {
diff --git a/src/main/java/org/breedinginsight/services/exceptions/CreationBusyException.java b/src/main/java/org/breedinginsight/services/exceptions/CreationBusyException.java
new file mode 100644
index 000000000..d32536159
--- /dev/null
+++ b/src/main/java/org/breedinginsight/services/exceptions/CreationBusyException.java
@@ -0,0 +1,7 @@
+package org.breedinginsight.services.exceptions;
+
+public class CreationBusyException extends Exception {
+ public CreationBusyException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java b/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java
new file mode 100644
index 000000000..06f141f33
--- /dev/null
+++ b/src/main/java/org/breedinginsight/services/lock/DistributedLockService.java
@@ -0,0 +1,61 @@
+package org.breedinginsight.services.lock;
+
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RLock;
+import org.redisson.api.RedissonClient;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Small helper to provide a consistent pattern for distributed locks across the service layer.
+ */
+@Slf4j
+@Singleton
+public class DistributedLockService {
+
+ private final RedissonClient redissonClient;
+
+ @Inject
+ public DistributedLockService(RedissonClient redissonClient) {
+ this.redissonClient = redissonClient;
+ }
+
+ /**
+ * Execute the given callback guarded by a distributed lock.
+ *
+ * @param lockKey the key for the distributed lock
+ * @param waitTime how long to wait to acquire the lock
+ * @param leaseTime how long before the lock automatically releases
+ * @param action the work to run while holding the lock
+ * @return result of the callback
+ * @throws TimeoutException if the lock cannot be acquired within the wait time
+ * @throws Exception bubbled up from the callback
+ */
+ public T withLock(String lockKey, Duration waitTime, Duration leaseTime, Callable action) throws Exception {
+ RLock lock = redissonClient.getLock(lockKey);
+ boolean acquired = false;
+ try {
+ acquired = lock.tryLock(waitTime.toMillis(), leaseTime.toMillis(), TimeUnit.MILLISECONDS);
+ if (!acquired) {
+ throw new TimeoutException("Unable to acquire lock " + lockKey);
+ }
+ return action.call();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TimeoutException("Interrupted while acquiring lock " + lockKey);
+ } finally {
+ if (acquired && lock.isHeldByCurrentThread()) {
+ try {
+ lock.unlock();
+ } catch (Exception e) {
+ log.warn("Failed to release lock {}", lockKey, e);
+ }
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java b/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java
new file mode 100644
index 000000000..c3fcbd1c9
--- /dev/null
+++ b/src/test/java/org/breedinginsight/brapi/v2/SubEntityDatasetLockIntegrationTest.java
@@ -0,0 +1,111 @@
+package org.breedinginsight.brapi.v2;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.micronaut.context.annotation.Property;
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.MediaType;
+import io.micronaut.http.client.RxHttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import io.reactivex.Flowable;
+import org.breedinginsight.BrAPITest;
+import org.breedinginsight.model.Program;
+import org.junit.jupiter.api.*;
+
+import javax.inject.Inject;
+import java.time.OffsetDateTime;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@MicronautTest
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SubEntityDatasetLockIntegrationTest extends BrAPITest {
+
+ private Program program;
+ private String experimentId;
+
+ @Inject
+ private BrAPITestUtils brAPITestUtils;
+
+ @Inject
+ @Client("/${micronaut.bi.api.version}")
+ private RxHttpClient client;
+
+ private final Gson gson = new GsonBuilder()
+ .registerTypeAdapter(OffsetDateTime.class, (JsonDeserializer) (json, type, context) -> OffsetDateTime.parse(json.getAsString()))
+ .create();
+
+ @BeforeAll
+ void setup() throws Exception {
+ var setup = brAPITestUtils.setupTestProgram(super.getBrapiDsl(), gson);
+ program = setup.getV1();
+ experimentId = setup.getV2().get(0);
+ }
+
+ @Test
+ void concurrentDatasetCreateReturnsSingleSuccessAndConflict() throws Exception {
+ // Use a fresh name to avoid interference with other runs
+ String datasetName = "LockTest-" + UUID.randomUUID();
+ JsonObject request = new JsonObject();
+ request.addProperty("name", datasetName);
+ request.addProperty("repeatedMeasures", 1);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ CountDownLatch start = new CountDownLatch(1);
+
+ Callable call = () -> {
+ start.await(1, TimeUnit.SECONDS);
+ Flowable> response = client.exchange(
+ HttpRequest.POST(String.format("/programs/%s/experiments/%s/dataset", program.getId(), experimentId), request.toString())
+ .contentType(MediaType.APPLICATION_JSON)
+ .bearerAuth("test-registered-user"),
+ String.class
+ );
+ return response.blockingFirst().getStatus();
+ };
+
+ Future first = executor.submit(call);
+ Future second = executor.submit(call);
+ start.countDown();
+
+ HttpStatus status1 = first.get(10, TimeUnit.SECONDS);
+ HttpStatus status2 = second.get(10, TimeUnit.SECONDS);
+ executor.shutdownNow();
+
+ List statuses = List.of(status1, status2);
+ assertTrue(statuses.contains(HttpStatus.OK));
+ assertTrue(statuses.contains(HttpStatus.CONFLICT));
+
+ // Confirm only one dataset with that name exists
+ Flowable> datasetsCall = client.exchange(
+ HttpRequest.GET(String.format("/programs/%s/experiments/%s/datasets", program.getId(), experimentId))
+ .bearerAuth("test-registered-user"),
+ String.class
+ );
+ HttpResponse datasetsResponse = datasetsCall.blockingFirst();
+ assertEquals(HttpStatus.OK, datasetsResponse.getStatus());
+ var datasetsJson = JsonParser.parseString(Objects.requireNonNull(datasetsResponse.body())).getAsJsonObject()
+ .getAsJsonObject("result")
+ .getAsJsonArray("data");
+ long matching = 0;
+ for (int i = 0; i < datasetsJson.size(); i++) {
+ String name = datasetsJson.get(i).getAsJsonObject().get("name").getAsString();
+ if (name.equalsIgnoreCase(datasetName)) {
+ matching++;
+ }
+ }
+ assertEquals(1, matching);
+ }
+}
diff --git a/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java b/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java
new file mode 100644
index 000000000..bb3275beb
--- /dev/null
+++ b/src/test/java/org/breedinginsight/services/lock/DistributedLockServiceTest.java
@@ -0,0 +1,59 @@
+package org.breedinginsight.services.lock;
+
+import org.breedinginsight.DatabaseTest;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class DistributedLockServiceTest extends DatabaseTest {
+
+ private final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ @AfterEach
+ void cleanup() {
+ executor.shutdownNow();
+ }
+
+ @Test
+ void secondLockAttemptTimesOutWhileFirstHolds() throws Exception {
+ DistributedLockService lockService = new DistributedLockService(super.getRedisConnection());
+ String lockKey = "test-lock-key";
+
+ CountDownLatch firstAcquired = new CountDownLatch(1);
+ CountDownLatch releaseFirst = new CountDownLatch(1);
+
+ Future firstCall = executor.submit(() ->
+ lockService.withLock(lockKey, Duration.ofMillis(500), Duration.ofSeconds(5), () -> {
+ firstAcquired.countDown();
+ // keep the lock held until signaled
+ releaseFirst.await(2, TimeUnit.SECONDS);
+ return "first";
+ })
+ );
+
+ assertTrue(firstAcquired.await(1, TimeUnit.SECONDS), "First lock holder did not start in time");
+
+ assertThrows(TimeoutException.class, () ->
+ lockService.withLock(lockKey, Duration.ofMillis(100), Duration.ofSeconds(2), () -> "second")
+ );
+
+ releaseFirst.countDown();
+ assertEquals("first", firstCall.get(2, TimeUnit.SECONDS));
+
+ String afterRelease = lockService.withLock(lockKey, Duration.ofMillis(500), Duration.ofSeconds(2), () -> "after-release");
+ assertEquals("after-release", afterRelease);
+ }
+}