diff --git a/distribution/pom.xml b/distribution/pom.xml
index 38c4602d1135b..025a67b379d66 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -53,6 +53,12 @@
library-udf
2.0.7-SNAPSHOT
+
+ org.apache.iotdb
+ mqtt
+ 2.0.7-SNAPSHOT
+ provided
+
@@ -74,6 +80,7 @@
src/assembly/confignode.xml
src/assembly/cli.xml
src/assembly/library-udf.xml
+ src/assembly/external-service-impl.xml
apache-iotdb-${project.version}
@@ -107,6 +114,7 @@
apache-iotdb-${project.version}-cli-bin.zip
apache-iotdb-${project.version}-confignode-bin.zip
apache-iotdb-${project.version}-library-udf-bin.zip
+ apache-iotdb-${project.version}-external-service-impl-bin.zip
diff --git a/distribution/src/assembly/all.xml b/distribution/src/assembly/all.xml
index b77f71a32dcd9..255912d75e611 100644
--- a/distribution/src/assembly/all.xml
+++ b/distribution/src/assembly/all.xml
@@ -92,6 +92,10 @@
conf
0755
+
+ ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar
+ lib
+
common-files.xml
diff --git a/distribution/src/assembly/datanode.xml b/distribution/src/assembly/datanode.xml
index 016059a903a4f..8994667fb873a 100644
--- a/distribution/src/assembly/datanode.xml
+++ b/distribution/src/assembly/datanode.xml
@@ -74,6 +74,12 @@
0755
+
+
+ ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar
+ lib
+
+
common-files.xml
diff --git a/distribution/src/assembly/external-service-impl.xml b/distribution/src/assembly/external-service-impl.xml
new file mode 100644
index 0000000000000..273efe6eddfb5
--- /dev/null
+++ b/distribution/src/assembly/external-service-impl.xml
@@ -0,0 +1,51 @@
+
+
+
+ external-service-impl-bin
+
+ dir
+ zip
+
+ apache-iotdb-${project.version}-external-service-impl-bin
+
+
+ ${project.basedir}/../licenses
+ licenses
+
+
+
+
+ ${project.basedir}/../LICENSE-binary
+ licenses
+ LICENSE
+
+
+ ${project.basedir}/../NOTICE-binary
+ licenses
+ NOTICE
+
+
+ ${maven.multiModuleProjectDirectory}/external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar
+ /
+
+
+
diff --git a/example/mqtt-customize/pom.xml b/example/mqtt-customize/pom.xml
index 2624571f4e87d..6d0ba2d0953d5 100644
--- a/example/mqtt-customize/pom.xml
+++ b/example/mqtt-customize/pom.xml
@@ -29,7 +29,11 @@
customize-mqtt-example
IoTDB: Example: Customized MQTT
-
+
+ org.apache.iotdb
+ mqtt
+ ${project.version}
+
org.apache.iotdb
iotdb-server
diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
index 8c3a962173985..b16c4928a0b7a 100644
--- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
+++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.mqtt.server;
-import org.apache.iotdb.db.protocol.mqtt.Message;
-import org.apache.iotdb.db.protocol.mqtt.PayloadFormatter;
-import org.apache.iotdb.db.protocol.mqtt.TreeMessage;
+import org.apache.iotdb.mqtt.Message;
+import org.apache.iotdb.mqtt.PayloadFormatter;
+import org.apache.iotdb.mqtt.TreeMessage;
import io.netty.buffer.ByteBuf;
import org.apache.tsfile.external.commons.lang3.NotImplementedException;
diff --git a/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter
similarity index 100%
rename from example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
rename to example/mqtt-customize/src/main/resources/META-INF/services/org.apache.iotdb.PayloadFormatter
diff --git a/external-service-impl/mqtt/pom.xml b/external-service-impl/mqtt/pom.xml
new file mode 100644
index 0000000000000..5ec51069b212e
--- /dev/null
+++ b/external-service-impl/mqtt/pom.xml
@@ -0,0 +1,177 @@
+
+
+
+ 4.0.0
+
+ org.apache.iotdb
+ external-service-impl
+ 2.0.7-SNAPSHOT
+
+ mqtt
+ IoTDB: External-Service-Impl: MQTT
+
+ 8
+ 8
+ UTF-8
+
+
+
+ com.github.moquette-io.moquette
+ moquette-broker
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+ io.netty
+ netty-buffer
+
+
+ io.netty
+ netty-common
+
+
+ io.netty
+ netty-transport
+
+
+ io.netty
+ netty-resolver
+
+
+ io.netty
+ netty-handler
+
+
+ io.netty
+ netty-transport-native-unix-common
+
+
+ io.netty
+ netty-codec
+
+
+ io.netty
+ netty-codec-http
+
+
+
+
+ org.apache.iotdb
+ iotdb-server
+ provided
+ 2.0.7-SNAPSHOT
+
+
+ com.google.code.gson
+ gson
+ provided
+
+
+ io.netty
+ netty-buffer
+ provided
+
+
+ org.apache.iotdb
+ iotdb-thrift-commons
+ 2.0.7-SNAPSHOT
+ provided
+
+
+ org.apache.tsfile
+ tsfile
+ ${tsfile.version}
+ provided
+
+
+ org.apache.tsfile
+ common
+ ${tsfile.version}
+ provided
+
+
+ com.google.guava
+ guava
+ provided
+
+
+ org.apache.iotdb
+ iotdb-thrift
+ 2.0.7-SNAPSHOT
+ provided
+
+
+ org.apache.iotdb
+ node-commons
+ 2.0.7-SNAPSHOT
+ provided
+
+
+ org.slf4j
+ slf4j-api
+ provided
+
+
+ org.apache.iotdb
+ service-rpc
+ 2.0.7-SNAPSHOT
+ provided
+
+
+
+
+
+ maven-assembly-plugin
+ ${maven.assembly.version}
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+
+
+ single
+
+
+ package
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ io.netty:netty-codec-mqtt
+ org.apache.tsfile:common
+
+
+
+
+
+
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
similarity index 97%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
index a05c8264b8225..14647a8582f04 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticator.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/BrokerAuthenticator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
similarity index 99%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
index cc857b7295cf1..7a348c850b42d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/JSONPayloadFormatter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
similarity index 99%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
index 8b596ee2c89ce..f80c3eb0b5e4f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/LinePayloadFormatter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import io.netty.buffer.ByteBuf;
import org.apache.tsfile.enums.TSDataType;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
similarity index 99%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
index 443b50160aa42..b13b1e89934a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MPPPublishHandler.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
similarity index 84%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
index c6cc3fa47ee10..3b2be5ac3f4f1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/MQTTService.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/MQTTService.java
@@ -16,14 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.service;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
+package org.apache.iotdb.mqtt;
+
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.protocol.mqtt.BrokerAuthenticator;
-import org.apache.iotdb.db.protocol.mqtt.MPPPublishHandler;
+import org.apache.iotdb.externalservice.api.IExternalService;
import io.moquette.BrokerConstants;
import io.moquette.broker.Server;
@@ -40,12 +38,10 @@
import java.util.Properties;
/** The IoTDB MQTT Service. */
-public class MQTTService implements IService {
+public class MQTTService implements IExternalService {
private static final Logger LOG = LoggerFactory.getLogger(MQTTService.class);
private final Server server = new Server();
- private MQTTService() {}
-
@Override
public void start() {
startup();
@@ -106,20 +102,4 @@ private IConfig createBrokerConfig(IoTDBConfig iotDBConfig) {
public void shutdown() {
server.stopServer();
}
-
- @Override
- public ServiceType getID() {
- return ServiceType.MQTT_SERVICE;
- }
-
- public static MQTTService getInstance() {
- return MQTTServiceHolder.INSTANCE;
- }
-
- private static class MQTTServiceHolder {
-
- private static final MQTTService INSTANCE = new MQTTService();
-
- private MQTTServiceHolder() {}
- }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
similarity index 96%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
index ba31d86976035..9a4311754533f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/Message.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/Message.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
/** Generic parsing of messages */
public class Message {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
similarity index 99%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
index c0b48539cd744..7bb051a852692 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManager.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatManager.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
similarity index 97%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
index c86648ac16136..672c512d54a21 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/PayloadFormatter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import io.netty.buffer.ByteBuf;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
similarity index 98%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
index b8aec19da5873..5c8fcc5d708d6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TableMessage.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TableMessage.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import org.apache.tsfile.enums.TSDataType;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
similarity index 97%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
rename to external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
index 9416ea3c83805..07ff8ed0b3d84 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/TreeMessage.java
+++ b/external-service-impl/mqtt/src/main/java/org/apache/iotdb/mqtt/TreeMessage.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+package org.apache.iotdb.mqtt;
import org.apache.tsfile.enums.TSDataType;
diff --git a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter b/external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter
similarity index 87%
rename from iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
rename to external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter
index 488d6d02d5039..f42aa83ff8678 100644
--- a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.protocol.mqtt.PayloadFormatter
+++ b/external-service-impl/mqtt/src/main/resources/META-INF/services/org.apache.iotdb.mqtt.PayloadFormatter
@@ -17,5 +17,5 @@
# under the License.
#
-org.apache.iotdb.db.protocol.mqtt.JSONPayloadFormatter
-org.apache.iotdb.db.protocol.mqtt.LinePayloadFormatter
+org.apache.iotdb.mqtt.JSONPayloadFormatter
+org.apache.iotdb.mqtt.LinePayloadFormatter
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
similarity index 77%
rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
index 1f66b517e9196..6884a644fedc0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/BrokerAuthenticatorTest.java
+++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/BrokerAuthenticatorTest.java
@@ -15,29 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
-import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+package org.apache.iotdb.mqtt;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-
public class BrokerAuthenticatorTest {
- @Before
- public void before() {
- EnvironmentUtils.envSetUp();
- }
-
- @After
- public void after() throws IOException, StorageEngineException {
- EnvironmentUtils.cleanEnv();
- }
-
@Test
public void checkValid() {
// In the previous implementation, the datanode will init a root file,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
similarity index 99%
rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
index deecf607d8162..eab4b42243d1e 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java
+++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/JSONPayloadFormatterTest.java
@@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+
+package org.apache.iotdb.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
similarity index 98%
rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
index 7bf9bce0702d3..16ed807c9ae69 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java
+++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/LinePayloadFormatterTest.java
@@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
+
+package org.apache.iotdb.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
similarity index 84%
rename from iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
rename to external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
index 096f5d0d90d2e..afbe88cf047a2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatManagerTest.java
+++ b/external-service-impl/mqtt/src/test/java/org/apache/iotdb/mqtt/PayloadFormatManagerTest.java
@@ -15,20 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.iotdb.db.protocol.mqtt;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+package org.apache.iotdb.mqtt;
-import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
public class PayloadFormatManagerTest {
- @After
- public void tearDown() throws Exception {
- EnvironmentUtils.cleanAllDir();
- }
@Test(expected = IllegalArgumentException.class)
public void getPayloadFormat() {
diff --git a/external-service-impl/pom.xml b/external-service-impl/pom.xml
new file mode 100644
index 0000000000000..986522e6275d2
--- /dev/null
+++ b/external-service-impl/pom.xml
@@ -0,0 +1,53 @@
+
+
+
+ 4.0.0
+
+ org.apache.iotdb
+ iotdb-parent
+ 2.0.7-SNAPSHOT
+
+ external-service-impl
+ IoTDB: External-Service-Impl
+ pom
+
+ mqtt
+
+
+ 8
+ 8
+ UTF-8
+
+
+
+ org.apache.iotdb
+ external-service-api
+ provided
+ 2.0.7-SNAPSHOT
+
+
+ junit
+ junit
+ test
+
+
+
diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index f566c8e5995b7..c82a63c3f3b15 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -227,6 +227,18 @@
httpcore
test
+
+ org.apache.iotdb
+ external-service-api
+ 2.0.7-SNAPSHOT
+
+
+ org.apache.iotdb
+ mqtt
+ 2.0.7-SNAPSHOT
+
+ provided
+
diff --git a/integration-test/src/assembly/mpp-share.xml b/integration-test/src/assembly/mpp-share.xml
index 01fce3555def2..96ac13e64d179 100644
--- a/integration-test/src/assembly/mpp-share.xml
+++ b/integration-test/src/assembly/mpp-share.xml
@@ -30,4 +30,10 @@
lib
+
+
+ ${project.basedir}/../external-service-impl/mqtt/target/mqtt-${project.version}-jar-with-dependencies.jar
+ lib
+
+
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 72c4acba58615..b55fd8e5c6827 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -212,10 +212,6 @@
org.eclipse.jetty
jetty-http
-
- io.netty
- netty-codec-mqtt
-
org.apache.commons
commons-pool2
@@ -267,18 +263,10 @@
org.glassfish.jersey.containers
jersey-container-servlet-core
-
- com.github.moquette-io.moquette
- moquette-broker
-
com.google.code.gson
gson
-
- io.netty
- netty-buffer
-
org.eclipse.jetty
jetty-servlet
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 27a5ce3b7b795..d0eef468d4f94 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -1190,14 +1190,14 @@ private void getTriggerInformationList(List allTriggerInformation) {
private void prepareExternalServiceResources() throws StartupException {
long startTime = System.currentTimeMillis();
- if (resourcesInformationHolder.getExternalServiceEntryList() == null
- || resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) {
- return;
- }
try {
- ExternalServiceManagementService.getInstance()
- .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList());
+ if (resourcesInformationHolder.getExternalServiceEntryList() != null
+ && !resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) {
+ ExternalServiceManagementService.getInstance()
+ .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList());
+ }
+
ExternalServiceManagementService.getInstance().restoreRunningServiceInstance();
} catch (Exception e) {
throw new StartupException(e);
@@ -1290,6 +1290,7 @@ public void deleteDataNodeSystemProperties() {
public void stop() {
stopTriggerRelatedServices();
registerManager.deregisterAll();
+ ExternalServiceManagementService.getInstance().stopRunningServices();
JMXService.deregisterMBean(mbeanName);
MetricService.getInstance().stop();
if (schemaRegionConsensusStarted) {
@@ -1310,9 +1311,6 @@ public void stop() {
}
private void initProtocols() throws StartupException {
- if (config.isEnableMQTTService()) {
- registerManager.register(MQTTService.getInstance());
- }
if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
registerManager.register(RestService.getInstance());
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
index 7bed3ffe4711d..4dee890e65fd8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
@@ -19,17 +19,18 @@
package org.apache.iotdb.db.service.externalservice;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
import java.util.function.Supplier;
public enum BuiltinExternalServices {
MQTT(
"MQTT",
- "org.apache.iotdb.externalservice.Mqtt",
- // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService
- () -> false),
+ "org.apache.iotdb.mqtt.MQTTService",
+ IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService),
REST(
"REST",
- "org.apache.iotdb.externalservice.Rest",
+ "org.apache.iotdb.rest.RestService",
// IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService
() -> false);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
index 71f0df341a687..54cc576c048de 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.commons.file.SystemFileFactory;
import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
@@ -65,10 +66,25 @@ public class ExternalServiceManagementService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ExternalServiceManagementService.class);
+ public static final String INSTANCE_NULL_ERROR_MSG =
+ "External Service instance is null when state is RUNNING!";
+
private ExternalServiceManagementService(String libRoot) {
this.serviceInfos = new HashMap<>();
restoreBuiltInServices();
this.libRoot = libRoot;
+ makeDir(libRoot);
+ }
+
+ private static void makeDir(String dir) {
+ try {
+ SystemFileFactory.INSTANCE.makeDirIfNecessary(dir);
+ } catch (IOException e) {
+ LOGGER.error("Failed to make external service dir", e);
+ throw new ExternalServiceManagementException(
+ new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
+ .setMessage(e.getMessage()));
+ }
}
public Iterator showService(int dataNodeId)
@@ -170,7 +186,10 @@ public void startService(String serviceName) throws ClientManagerException, TExc
private IExternalService createExternalServiceInstance(String serviceName, String className) {
// close ClassLoader automatically to release the file handle
- try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) {
+ try {
+ // Remind: this classLoader should be closed when service is dropped after user-defined
+ // service supported
+ ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot);
return (IExternalService)
Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance();
} catch (IOException
@@ -235,7 +254,7 @@ public void stopService(String serviceName) throws ClientManagerException, TExce
private void stopService(ServiceInfo serviceInfo) {
checkState(
serviceInfo.getServiceInstance() != null,
- "External Service instance is null when state is RUNNING!",
+ INSTANCE_NULL_ERROR_MSG,
serviceInfo.getServiceName());
serviceInfo.getServiceInstance().stop();
}
@@ -311,11 +330,34 @@ public void restoreRunningServiceInstance() {
serviceInfo -> {
// start services with RUNNING state
if (serviceInfo.getState() == RUNNING) {
- IExternalService serviceInstance =
- createExternalServiceInstance(
- serviceInfo.getServiceName(), serviceInfo.getClassName());
- serviceInfo.setServiceInstance(serviceInstance);
- serviceInstance.start();
+
+ try {
+ IExternalService serviceInstance =
+ createExternalServiceInstance(
+ serviceInfo.getServiceName(), serviceInfo.getClassName());
+ checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG);
+ serviceInfo.setServiceInstance(serviceInstance);
+ serviceInstance.start();
+ } finally {
+ // set STOPPED to avoid the case: service is RUNNING, but its instance is null
+ if (serviceInfo.getServiceInstance() == null) {
+ serviceInfo.setState(STOPPED);
+ }
+ }
+ }
+ });
+ }
+
+ public void stopRunningServices() {
+ serviceInfos
+ .values()
+ .forEach(
+ serviceInfo -> {
+ // stop services with RUNNING state
+ if (serviceInfo.getState() == RUNNING) {
+ IExternalService serviceInstance = serviceInfo.getServiceInstance();
+ checkState(serviceInstance != null, INSTANCE_NULL_ERROR_MSG);
+ serviceInstance.stop();
}
});
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 7267c79a66551..55fad14e5a10f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -25,7 +25,6 @@ public enum ServiceType {
METRIC_SERVICE("Metrics ServerService", "MetricService"),
RPC_SERVICE("RPC ServerService", "RPCService"),
INFLUX_SERVICE("InfluxDB Protocol Service", "InfluxDB Protocol"),
- MQTT_SERVICE("MQTTService", "MqttService"),
AIR_GAP_SERVICE("AirGapService", "AirGapService"),
MONITOR_SERVICE("Monitor ServerService", "Monitor"),
STAT_MONITOR_SERVICE("Statistics ServerService", "StatMonitorService"),
diff --git a/pom.xml b/pom.xml
index 6696f783d804b..ab4b25e01034c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@
distribution
example
library-udf
+ external-service-impl