diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java index 613b620d72..9447f127bb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBOffsetRecoveryListenerTest.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.activemq.store.kahadb; import java.io.File; @@ -30,6 +29,7 @@ import org.apache.activemq.store.MessageRecoveryContext; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.util.IOHelper; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -57,13 +57,13 @@ public class KahaDBOffsetRecoveryListenerTest { @Rule public TestName testName = new TestName(); - protected final int PRETEST_MSG_COUNT = 17531; + protected final int PRETEST_MSG_COUNT = 7531; @Before public void beforeEach() throws Exception { // Send+Recv a odd number of messages beyond cache sizes // to confirm the queue's sequence number gets pushed off - sendMessages(PRETEST_MSG_COUNT, testName.getMethodName()); + sendMessages(PRETEST_MSG_COUNT, testName.getMethodName(), true); assertEquals(Integer.valueOf(PRETEST_MSG_COUNT), Integer.valueOf(receiveMessages(testName.getMethodName()))); } @@ -85,7 +85,7 @@ protected BrokerService createBroker(KahaDBStore kaha) throws Exception { private KahaDBStore createStore(boolean delete) throws IOException { KahaDBStore kaha = new KahaDBStore(); kaha.setJournalMaxFileLength(1024*100); - kaha.setDirectory(new File("target" + File.separator + "activemq-data" + File.separator + "kahadb-recovery-tests")); + kaha.setDirectory(new File(IOHelper.getDefaultDataDirectory(), "kahadb-recovery-tests")); if( delete ) { kaha.deleteAllMessages(); } @@ -99,8 +99,7 @@ protected void runOffsetTest(final int sendCount, final int expectedMessageCount protected void runOffsetLoopTest(final int sendCount, final int expectedMessageCount, final int recoverOffset, final int recoverCount, final int expectedRecoverCount, final int expectedRecoverIndex, final String queueName, final int loopCount, final boolean repeatExpected) throws Exception { KahaDBStore kahaDBStore = createStore(true); brokerService = createBroker(kahaDBStore); - sendMessages(sendCount, queueName); - + sendMessages(sendCount, queueName, false); MessageStore messageStore = kahaDBStore.createQueueMessageStore(new ActiveMQQueue(queueName)); int tmpExpectedRecoverCount = expectedRecoverCount; @@ -187,8 +186,12 @@ public void testOffsetRepeat() throws Exception { runOffsetLoopTest(10_000, 10_000, 7_000, 133, 133, 7_000, testName.getMethodName(), 10, true); } - private void sendMessages(int count, String queueName) throws JMSException { + private void sendMessages(int count, String queueName, boolean sendAsync) throws JMSException { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + if(sendAsync) { + cf.setProducerWindowSize(800000); + cf.setUseAsyncSend(true); + } cf.setWatchTopicAdvisories(false); Connection connection = cf.createConnection();