diff --git a/regression-tests/datasets/Kmeans.mar b/regression-tests/datasets/Kmeans.mar
new file mode 100644
index 0000000..5c6953e
Binary files /dev/null and b/regression-tests/datasets/Kmeans.mar differ
diff --git a/regression-tests/datasets/kmeans_test.mar b/regression-tests/datasets/kmeans_test.mar
new file mode 100644
index 0000000..190c523
Binary files /dev/null and b/regression-tests/datasets/kmeans_test.mar differ
diff --git a/regression-tests/lib/__init__.py b/regression-tests/lib/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/regression-tests/lib/__init__.pyc b/regression-tests/lib/__init__.pyc
new file mode 100644
index 0000000..610f526
Binary files /dev/null and b/regression-tests/lib/__init__.pyc differ
diff --git a/regression-tests/lib/config.py b/regression-tests/lib/config.py
new file mode 100644
index 0000000..193a257
--- /dev/null
+++ b/regression-tests/lib/config.py
@@ -0,0 +1,37 @@
+# vim: set encoding=utf-8
+
+# Copyright (c) 2016 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Global config file for test cases, used heavily by automation."""
+import os
+
+
+root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
+dataset_directory = os.path.join(root, "regression-tests", "datasets")
+hdfs_namenode = os.getenv("CDH_MASTER", "localhost")
+user = os.getenv("USER", "hadoop")
+run_mode = os.getenv("RUN_MODE", "yarn_client") == "yarn_client"
+hostname = os.getenv("HOSTNAME", "localhost")
+
+# HDFS paths must NOT be built with os.path.join: HDFS always uses '/' as its
+# path separator, regardless of the local system's path separator.
+hdfs_user_root = "/user/" + user
+hdfs_data_dir = hdfs_user_root + "/qa_data"
+checkpoint_dir = hdfs_user_root + "/sparktk_checkpoint"
+export_dir = "hdfs://" + hostname + ":8020" + hdfs_user_root + "/sparktk_export"
+
+scoring_engine_host = os.getenv("SCORING_ENGINE_HOST", "127.0.0.1")
+port = 8020
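+
+# Illustrative values for the derived settings (an example only, assuming
+# USER=hadoop and a hypothetical HOSTNAME of master.example.com):
+#   hdfs_data_dir -> "/user/hadoop/qa_data"
+#   export_dir    -> "hdfs://master.example.com:8020/user/hadoop/sparktk_export"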
diff --git a/regression-tests/lib/config.pyc b/regression-tests/lib/config.pyc
new file mode 100644
index 0000000..93b2fc5
Binary files /dev/null and b/regression-tests/lib/config.pyc differ
diff --git a/regression-tests/lib/scoring_utils.py b/regression-tests/lib/scoring_utils.py
new file mode 100644
index 0000000..4b26eff
--- /dev/null
+++ b/regression-tests/lib/scoring_utils.py
@@ -0,0 +1,95 @@
+# vim: set encoding=utf-8
+
+# Copyright (c) 2016 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Library to support scoring in TAP for the ATK service."""
+import subprocess as sp
+import requests
+import time
+import signal
+import os
+import config
+
+
+class Scorer(object):
+
+    def __init__(self, hdfs_path=None, port=config.port, pipeline=False, host=config.scoring_engine_host):
+        """Set up the server location, port and model file."""
+        self.name = host.split('.')[0]
+        self.host = host
+        self.pipeline = pipeline
+        self.port = port
+        self.scoring_process = None
+        self.hdfs_path = hdfs_path
+        self.full_host_url = "http://" + str(self.host) + ":" + str(self.port)
+
+    def __enter__(self):
+        """Start the scoring server."""
+        # change the current working directory to the scoring engine dir
+        run_path = os.path.abspath(os.path.join(config.root, "model-scoring-core"))
+
+        # remember the cwd so it can be restored afterwards
+        test_dir = os.getcwd()
+        os.chdir(run_path)
+
+        # launch the server in its own process group (see __exit__)
+        if self.hdfs_path:
+            self.scoring_process = sp.Popen(["./bin/scoring-server.sh",
+                                             "-Dtrustedanalytics.scoring-engine.archive-mar=%s" % self.hdfs_path,
+                                             "-Dtrustedanalytics.scoring.host=%s" % self.host,
+                                             "-Dtrustedanalytics.scoring.port=%s" % self.port], preexec_fn=os.setsid)
+        else:
+            self.scoring_process = sp.Popen(["./bin/scoring-server.sh",
+                                             "-Dtrustedanalytics.scoring.host=%s" % self.host,
+                                             "-Dtrustedanalytics.scoring.port=%s" % self.port], preexec_fn=os.setsid)
+
+        # restore cwd
+        os.chdir(test_dir)
+
+        # wait for the server to start (fixed delay; there is no readiness check)
+        time.sleep(20)
+
+        return self
+
+    def __exit__(self, *args):
+        """Tear down the server."""
+        # kill the whole process group so no subprocesses are left behind
+        pgrp = os.getpgid(self.scoring_process.pid)
+        os.killpg(pgrp, signal.SIGKILL)
+        time.sleep(50)
+
+    def upload_mar_bytes(self, file_bytes):
+        """Give a mar file, as raw bytes, to an empty scoring server."""
+        return requests.post(url=self.full_host_url + "/uploadMarBytes",
+                             data=file_bytes,
+                             headers={"Content-type": "application/octet-stream"})
+
+    def upload_mar_file(self, files):
+        """Give a mar file to an empty scoring server."""
+        return requests.post(url=self.full_host_url + "/uploadMarFile", files=files)
+
+    def score(self, data_val):
+        """Score the json records in data_val."""
+        # The server requires these exact content negotiation headers in
+        # order to respond appropriately.
+        headers = {'Content-type': 'application/json',
+                   'Accept': 'application/json,text/plain'}
+
+        response = requests.post(
+            self.full_host_url + "/v2/score", json={"records": data_val}, headers=headers)
+
+        return response
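+
+
+# Example usage (an illustrative sketch; the .mar path below is hypothetical):
+#
+#     with Scorer(hdfs_path="hdfs://namenode:8020/user/hadoop/Kmeans.mar") as sc:
+#         response = sc.score([{"data": 2.0}])
+#         print response.json()["data"]
+#
+# Entering the context starts the server and waits a fixed delay for it to
+# come up; leaving the context kills the server's whole process group.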
diff --git a/regression-tests/lib/scoring_utils.pyc b/regression-tests/lib/scoring_utils.pyc
new file mode 100644
index 0000000..a1e07be
Binary files /dev/null and b/regression-tests/lib/scoring_utils.pyc differ
diff --git a/regression-tests/lib/sparktk_test.py b/regression-tests/lib/sparktk_test.py
new file mode 100644
index 0000000..ded93ac
--- /dev/null
+++ b/regression-tests/lib/sparktk_test.py
@@ -0,0 +1,109 @@
+# vim: set encoding=utf-8
+
+# Copyright (c) 2016 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Set up tests for regression."""
+
+import unittest
+import uuid
+import datetime
+import os
+import daaltk
+
+import sparktk as stk
+
+import config
+from threading import Lock
+udf_lib_path = os.path.join(config.root, "regression-tests", "sparktkregtests", "lib", "udftestlib")
+udf_files = [os.path.join(udf_lib_path, f) for f in os.listdir(udf_lib_path)]
+
+lock = Lock()
+global_tc = None
+
+
+def get_context():
+    """Build the global TkContext once; reuse it across all test classes."""
+    global global_tc
+    with lock:
+        if global_tc is None:
+            sparktkconf_dict = {'spark.driver.maxPermSize': '512m',
+                                'spark.ui.enabled': 'false',
+                                'spark.driver.maxResultSize': '2g',
+                                'spark.dynamicAllocation.enabled': 'true',
+                                'spark.dynamicAllocation.maxExecutors': '16',
+                                'spark.dynamicAllocation.minExecutors': '1',
+                                'spark.executor.cores': '2',
+                                'spark.executor.memory': '2g',
+                                'spark.shuffle.io.preferDirectBufs': 'true',
+                                'spark.shuffle.service.enabled': 'true',
+                                'spark.yarn.am.waitTime': '1000000',
+                                'spark.yarn.executor.memoryOverhead': '384',
+                                'spark.eventLog.enabled': 'false',
+                                'spark.sql.shuffle.partitions': '6'}
+            if config.run_mode:
+                global_tc = stk.TkContext(master='yarn-client', extra_conf_dict=sparktkconf_dict, py_files=udf_files)
+            else:
+                global_tc = stk.TkContext(py_files=udf_files)
+
+    return global_tc
+
+
+class SparkTKTestCase(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        """Build the context for use."""
+        cls.context = get_context()
+        cls.context.sc.setCheckpointDir(config.checkpoint_dir)
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    @classmethod
+    def tearDownClass(cls):
+        pass
+
+    def get_file(self, filename):
+        """Return the hdfs path to the given file."""
+        # Note: this is an HDFS path, not a local path, so the os.path
+        # library must not be used to build it.
+        placed_path = config.hdfs_data_dir + "/" + filename
+        return placed_path
+
+    def get_export_file(self, filename):
+        """Return the hdfs export path to the given file."""
+        # Note: this is an HDFS path, not a local path, so the os.path
+        # library must not be used to build it.
+        placed_path = config.export_dir + "/" + filename
+        return placed_path
+
+    def get_name(self, prefix):
+        """Build a unique name, hardened with a timestamp and a GUID."""
+        datestamp = datetime.datetime.now().strftime("%m_%d_%H_%M_")
+        name = prefix + datestamp + uuid.uuid1().hex
+        return name
+
+    def get_local_dataset(self, dataset):
+        """Return the local path of the given dataset in the dataset folder."""
+        dataset_directory = config.dataset_directory
+        return os.path.join(dataset_directory, dataset)
+
+    def assertFramesEqual(self, frame1, frame2):
+        """Assert that two frames contain the same rows, in any order."""
+        frame1_take = frame1.take(frame1.count())
+        frame2_take = frame2.take(frame2.count())
+
+        self.assertItemsEqual(frame1_take, frame2_take)
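+
+
+# Example of a test built on this harness (an illustrative sketch; the class
+# and column names below are hypothetical):
+#
+#     class MyFrameTest(SparkTKTestCase):
+#         def test_row_count(self):
+#             frame = self.context.frame.create([[1.0], [2.0]], [("data", float)])
+#             self.assertEqual(2, frame.count())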
diff --git a/regression-tests/testcases/.scala_scoring_engine_endpoint_test.py.swo b/regression-tests/testcases/.scala_scoring_engine_endpoint_test.py.swo
new file mode 100644
index 0000000..5b2b539
Binary files /dev/null and b/regression-tests/testcases/.scala_scoring_engine_endpoint_test.py.swo differ
diff --git a/regression-tests/testcases/python_scoring_engine_endpoint_test.py b/regression-tests/testcases/python_scoring_engine_endpoint_test.py
new file mode 100644
index 0000000..afb4944
--- /dev/null
+++ b/regression-tests/testcases/python_scoring_engine_endpoint_test.py
@@ -0,0 +1,118 @@
+# vim: set encoding=utf-8
+
+# Copyright (c) 2016 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Test cases for the Python scoring engine endpoints."""
+import unittest
+import os
+from lib import scoring_utils
+from lib import sparktk_test
+
+
+class PythonScoring(sparktk_test.SparkTKTestCase):
+
+    def setUp(self):
+        """Import the files to test against."""
+        super(PythonScoring, self).setUp()
+        self.schema = [("data", float),
+                       ("name", str)]
+        self.train_data = [[2, "ab"], [1, "cd"], [7, "ef"],
+                           [1, "gh"], [9, "ij"], [2, "jk"],
+                           [0, "mn"], [6, "op"], [5, "qr"]]
+        self.test_data = [[0, "ab"], [1, "cd"], [4, "ef"],
+                          [3, "gh"], [4, "ij"], [5, "jk"],
+                          [10, "mn"], [10, "op"], [2, "qr"]]
+        self.frame_train = self.context.frame.create(self.train_data, schema=self.schema)
+        self.frame_test = self.context.frame.create(self.test_data, schema=self.schema)
+        reg_test_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+        self.mar_path = os.path.join(reg_test_root, "datasets", "Kmeans.mar")
+        self.model = self.context.models.clustering.kmeans.train(self.frame_train, ["data"], 3, seed=5)
+
+    def test_model_scoring_upload_mar_file_simple(self):
+        """Create an empty scoring server, then upload a mar file and score."""
+        expected_res = self.model.predict(self.frame_test)
+
+        files = {"file": open(self.mar_path, 'rb')}
+
+        with scoring_utils.Scorer() as scorer:
+            scorer.upload_mar_file(files)
+            self._score_and_compare_expected_actual_result(expected_res, scorer)
+
+    def test_model_scoring_simple(self):
+        """Simple model scoring test."""
+        expected_result = self.model.predict(self.frame_test)
+
+        with scoring_utils.Scorer(self.mar_path) as scorer:
+            self._score_and_compare_expected_actual_result(expected_result, scorer)
+
+    def test_model_scoring_send_model_bytes(self):
+        """Start an empty scoring server and test sending the model as bytes."""
+        expected_res = self.model.predict(self.frame_test)
+
+        with open(self.mar_path, 'rb') as mar_file:
+            bytes_data = mar_file.read()
+
+        with scoring_utils.Scorer() as scorer:
+            scorer.upload_mar_bytes(bytes_data)
+            self._score_and_compare_expected_actual_result(expected_res, scorer)
+
+    @unittest.skip("scoring_engine: sending a model twice to the scoring engine should produce a clear error message")
+    def test_send_model_twice(self):
+        """Ensure that sending a second model fails."""
+        with scoring_utils.Scorer(self.mar_path) as scorer:
+            scorer.upload_mar_file({"file": open(self.mar_path, 'rb')})
+            response = scorer.score([{"data": 2}])
+            self.assertIn("500", str(response))
+
+    def test_score_no_model(self):
+        """Score against a scoring server that was started without a model."""
+        with scoring_utils.Scorer() as scorer:
+            response = scorer.score([{"data": 2}])
+            self.assertIn("500", str(response))
+
+    def test_score_invalid_data(self):
+        """Start a scoring server with a valid mar, but send invalid data."""
+        with scoring_utils.Scorer(self.mar_path) as scorer:
+            response = scorer.score([{"data": "apple"}])
+            self.assertIn("500", str(response))
+
+    def _score_and_compare_expected_actual_result(self, expected, scorer):
+        """Compare the predict result against the scoring engine result."""
+        # use a pandas frame for easy access to the predict result
+        pandas_res = expected.to_pandas()
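+
+        # Shape note (inferred from how the response is unpacked below): the
+        # scoring endpoint accepts {"records": [...]} and, for this model,
+        # returns JSON like {"data": [{..., "score": <cluster label>}, ...]}.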
+
+        # The cluster labels assigned by predict and by the scoring engine
+        # may differ for the same grouping; e.g., predict may call a cluster
+        # 0 while the scoring engine labels it 1. We only care that the
+        # groupings match, so we record each equivalent label pair and check
+        # that the mapping stays consistent.
+        map_cluster_labels = {}
+
+        # iterate through the pandas predict result
+        for (index, row) in pandas_res.iterrows():
+            score_result = scorer.score([{"data": row["data"]}])
+            score = score_result.json()["data"][0]["score"]
+
+            # if we have not yet seen this cluster label, record its mapping;
+            # if we have seen it, ensure the mapped label is the same
+            if row["cluster"] not in map_cluster_labels:
+                map_cluster_labels[row["cluster"]] = score
+            else:
+                self.assertEqual(map_cluster_labels[row["cluster"]], score)
+
+
+if __name__ == '__main__':
+    unittest.main()