diff --git a/src/scripts/blah_common_submit_functions.sh b/src/scripts/blah_common_submit_functions.sh index d4d8fef8..269c0fa0 100644 --- a/src/scripts/blah_common_submit_functions.sh +++ b/src/scripts/blah_common_submit_functions.sh @@ -712,6 +712,11 @@ function bls_start_job_wrapper () echo "# Move all relative outputsand paths out of temp home" echo "cd \$new_home" bls_fl_subst_relative_paths_and_dump outputsand "mv \"@@F_WORKNAME\" \"@@F_REMOTE\" 2> /dev/null" "\\\$old_home" + echo "if declare -f blah_stageout_trap &>/dev/null; then" + echo " pushd \$old_home" + echo " blah_stageout_trap" + echo " popd" + echo "fi" echo "# Move any remapped outputsand file to shared directories" bls_fl_subst_relative_paths_and_dump outputmove "mv \"@@F_REMOTE\" \"@@F_LOCAL\" 2> /dev/null" diff --git a/src/scripts/condor-ce-stager.service b/src/scripts/condor-ce-stager.service new file mode 100644 index 00000000..ff6ec117 --- /dev/null +++ b/src/scripts/condor-ce-stager.service @@ -0,0 +1,9 @@ +[Unit] +Description=Simple app to stage HTCondor-CE files to/from worker nodes + +[Service] +ExecStart=/usr/libexec/blahp/slurm_file_stager.py +Restart=always + +[Install] +WantedBy=condor-ce.service diff --git a/src/scripts/slurm_file_stager.py b/src/scripts/slurm_file_stager.py new file mode 100755 index 00000000..62a17664 --- /dev/null +++ b/src/scripts/slurm_file_stager.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python + +# +# Simple HTTP-based service to stage files between the CEs and WNs +# + +import os +import os.path +import stat +from BaseHTTPServer import BaseHTTPRequestHandler +from cgi import parse_multipart, parse_header +from urlparse import parse_qs +import urlparse + +class StagerHandler(BaseHTTPRequestHandler): + def check_auth(self): + if self.client_address[0] == "127.0.0.1": + return True + elif self.client_address[0].startswith("10.0"): + return True + else: + self.send_response(403) + self.end_headers() + self.wfile.write("This IP is not authorized: %s.\r\n" % self.client_address[0]) + return False + + def do_GET(self): + if not self.check_auth(): + return + parsed_path = self.path + parsed_path = os.path.normpath(parsed_path) + parsed_path = os.path.realpath(parsed_path) + if not parsed_path.startswith('/var/lib/condor-ce/spool/'): + print "Illegal path %s" % os.path.realpath(parsed_path) + self.fail("PATH") + return + if not os.path.exists(parsed_path): + print "404: %s" % parsed_path + self.fail() + else: + self.send_response(200) + self.end_headers() + size = os.stat(parsed_path).st_size + print "Sending %s (%s bytes)" % (parsed_path, size) + self.wfile.write(open(parsed_path, "rb").read()) + + def fail(self, msg="FAIL"): + self.send_response(500) + self.end_headers() + self.wfile.write("%s\r\n" % msg) + return + + def do_POST(self): + if not self.check_auth(): + return + parsed_path = self.path + parsed_path = os.path.normpath(parsed_path) + parsed_path = os.path.realpath(parsed_path) + if not parsed_path.startswith('/var/lib/condor-ce/spool/'): + print "Illegal path %s" % os.path.realpath(parsed_path) + self.fail("PATH") + return + ctype, pdict = parse_header(self.headers['content-type']) + if ctype == 'multipart/form-data': + if int(self.headers['content-length']) > 1024*1024*20: + # Don't accept files larger than 20MB + print "Too large %s" % self.headers['content-length'] + self.fail() + return + + postvars = parse_multipart(self.rfile, pdict) + elif ctype == 'application/x-www-form-urlencoded': + length = int(self.headers['content-length']) + postvars = parse_qs( + self.rfile.read(length), + keep_blank_values=1) + else: + postvars = {} + outdata = postvars['data'][0] + try: + parent_dir = os.path.dirname(parsed_path) + if not os.path.exists(parent_dir): + self.fail("PATH") + return + dirstat = os.stat(parent_dir) + uid = dirstat.st_uid + gid = dirstat.st_gid + with open(parsed_path, 'wb') as fh: + os.fchown(fh.fileno(), uid, gid) + fh.write(outdata) + fh.flush() + os.fsync(fh.fileno()) + size = os.stat(parsed_path).st_size + print "Received %s (%s bytes)" % (parsed_path, size) + except IOError, e: + print "ioerror: %s" % e + self.fail("IO") + return + + message = 'OK\r\n' + self.send_response(200) + self.end_headers() + self.wfile.write(message) + return + + +if __name__ == '__main__': + from BaseHTTPServer import HTTPServer + server = HTTPServer(('0.0.0.0', 8080), StagerHandler) + print 'Starting server, use to stop' + server.serve_forever() + diff --git a/src/scripts/slurm_submit.sh b/src/scripts/slurm_submit.sh index f02f31f2..223e32df 100755 --- a/src/scripts/slurm_submit.sh +++ b/src/scripts/slurm_submit.sh @@ -75,7 +75,16 @@ fi bls_set_up_local_and_extra_args # Input and output sandbox setup. -# Assume all filesystems are shared. +echo "# Begin file staging" >> $bls_tmp_file +echo "cd \$HOME" >> $bls_tmp_file +bls_fl_subst_and_dump inputsand "curl --retry 5 -s -o @@F_REMOTE http://`hostname -s`:8080@@F_LOCAL" >> $bls_tmp_file +bls_fl_subst_and_dump inputsand "chmod go-rwx @@F_REMOTE" >> $bls_tmp_file +echo "function blah_stageout_trap() {" >> $bls_tmp_file +bls_fl_subst_and_dump outputsand " curl --retry 5 -s -F 'data=@@@F_REMOTE' http://`hostname -s`:8080@@F_LOCAL" >> $bls_tmp_file +bls_fl_subst_and_dump outputsand " rm -f @@F_REMOTE" >> $bls_tmp_file +echo " sleep 5" >> $bls_tmp_file +echo "}" >> $bls_tmp_file +echo "# End file staging" >> $bls_tmp_file bls_add_job_wrapper