import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeoutException;
import java.util.logging.Handler;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

import org.json.JSONException;
import org.json.JSONObject;

import com.catalyst.Context;
import com.catalyst.config.ZCThreadLocal;
import com.catalyst.job.CapacityAttributes;
import com.catalyst.job.JOB_STATUS;
import com.catalyst.impl.DefaultContext;
import com.catalyst.job.JobRequest;
import com.catalyst.job.impl.DefaultJobRequest;

class ZcJSONObject extends JSONObject {

	ZcJSONObject(String jsonString) throws JSONException {
		super(jsonString);
	}

	public long getLong(String key, boolean throwErr) throws JSONException {
		try {
			return super.getLong(key);
		} catch (JSONException ex) {
			if(throwErr) {
				throw ex;
			}
			return 0;
		}
	}

	public ZcJSONObject getJSONObject(String key) throws JSONException {
		return this.getJSONObject(key, false);
	}
	public ZcJSONObject getJSONObject(String key, boolean throwErr) throws JSONException {
		try {
			return new ZcJSONObject(super.getJSONObject(key).toString());
		} catch(JSONException ex) {
			if(throwErr) {
				throw ex;
			}
			return null;
		}
	}
}

public class JavajobInvoker {

	private static final Integer MESSAGE_LENGTH = 1500;

	public class LogHandler extends Handler {

		@Override
		public void publish(LogRecord record) {
			try {
				String exceptionMessage = "";
				if (record.getThrown() != null) {
					exceptionMessage = getStackTraceAsString(record.getThrown());
				}
				String message = new SimpleFormatter().format(record) + " " + exceptionMessage;
				if (message.length() > MESSAGE_LENGTH) {
					message = message.substring(0, MESSAGE_LENGTH);
				}

				System.out.println("[" + record.getLevel().getName() + "] : " + message);

			} catch (Exception e) {
				System.out.println(getStackTraceAsString(e));
			}

		}

		@Override
		public void flush() {
			// TODO Auto-generated method stub

		}

		@Override
		public void close() throws SecurityException {
			// TODO Auto-generated method stub

		}

	}

	private static String getStackTraceAsString(Throwable throwable) {
		StringWriter stringWriter = new StringWriter();
		throwable.printStackTrace(new PrintWriter(stringWriter));
		return stringWriter.toString();
	}

	private static <T> HashMap<String, T> jsonToMap(String t) throws Exception {

		HashMap<String, T> map = new HashMap<String, T>();
		JSONObject jObject = new JSONObject(t);
		Iterator<?> keys = jObject.keys();

		while (keys.hasNext()) {
			String key = (String) keys.next();
			Object value = jObject.get(key);
			map.put(key, (T) value);

		}
		return map;
	}

	private static void writeResponse(String response, Integer status, String invokerDir) throws Exception {
		// write response data
		String responseFilePath = Paths.get(invokerDir, "../user_res_body").toString();

		BufferedWriter responseWriter = new BufferedWriter(new FileWriter(responseFilePath));
		responseWriter.write(response);
		responseWriter.close();

		// write response meta
		String metaFilePath = Paths.get(invokerDir, "../user_meta.json").toString();

		BufferedWriter metaWriter = new BufferedWriter(new FileWriter(metaFilePath));
		JSONObject metaJson = new JSONObject();
		metaJson.put("statusCode", status);
		metaWriter.write(new JSONObject().put("response", metaJson).toString());
		metaWriter.close();
	}

	private static void throwAndExit(Exception err, int exitCode, String invokerDir) {
		try {
			String sStackTrace = getStackTraceAsString(err); // stack trace as a string
			if (exitCode == 532) {
				writeResponse("CODE_EXCEPTION", exitCode, invokerDir);
			} else if (exitCode == 500) {
				writeResponse("INTERNAL_SERVER_ERROR", 500, invokerDir);
			} else if (exitCode == 408) {
				writeResponse("TIMEOUT", exitCode, invokerDir);
			}
			System.out.println(sStackTrace);
		} catch (Exception e) {
			System.out.println(e);
		}
		System.exit(exitCode);
	}

	private static void setZCThreadLocalProject(HashMap<String, Object> project) throws Exception {

		JSONObject catalystConfig = new JSONObject();
		catalystConfig.put("project_id", project.get("x-zc-projectid").toString());
		catalystConfig.put("project_key", project.get("x-zc-project-key").toString());
		catalystConfig.put("project_domain", project.get("x-zc-project-domain").toString());
		catalystConfig.put("environment", project.get("x-zc-environment").toString());
		
		ZCThreadLocal.putValue("CATALYST_CONFIG", catalystConfig.toString());
	}

	private static void setZCThreadLocalAuth(HashMap<String, Object> auth) throws Exception {
		JSONObject catalystAuth = new JSONObject();
		
		String adminAuthHeaderType = auth.get("x-zc-admin-cred-type").toString();
		String adminAuthToken = auth.get("x-zc-admin-cred-token").toString();
		
		JSONObject adminAuth = new JSONObject();
		adminAuth.put((adminAuthHeaderType.equals("token")) ? "access_token" : "ticket", adminAuthToken); // No I18N
		catalystAuth.put("admin_cred", adminAuth);

		JSONObject clientAuth = new JSONObject();
		if(auth.containsKey("x-zc-user-cred-type") && auth.containsKey("x-zc-user-cred-token")) {
			String userAuthHeaderType = auth.get("x-zc-user-cred-type").toString();
			String userAuthToken = auth.get("x-zc-user-cred-token").toString();

			clientAuth.put((userAuthHeaderType.equals("token")) ? "access_token" : "ticket", userAuthToken); // No I18N
		}
		if(auth.containsKey("x-zc-user-type")) { // No I18N
			String userType = auth.get("x-zc-user-type").toString(); // No I18N
			clientAuth.put("user_type", userType); // No I18N
		}
		if (auth.containsKey("x-zc-cookie")) {
			String cookie = auth.get("x-zc-cookie").toString();
			clientAuth.put("cookie", cookie);
		}
		catalystAuth.put("client_cred", clientAuth);
		ZCThreadLocal.putValue("CATALYST_AUTH", catalystAuth.toString()); // No I18N

		// For backward compatibility, to be remove in future.
		if(auth.containsKey("x-zc-cookie")) {
			String cookie = auth.get("x-zc-cookie").toString();
			ZCThreadLocal.putValue("client_cookie", cookie); // No I18N
		}
		// to be removed in future
	}

	public static void main(String[] args) {
		String invokerDir = args[0];
		try {
			LogManager manager = LogManager.getLogManager();
			manager.reset();
			Logger rootLogger = manager.getLogger("");
			LogHandler handler = new JavajobInvoker().new LogHandler();
			rootLogger.addHandler(handler);

			HashMap<String, Object> target = jsonToMap(args[1]);
			String fnExeName = (String) target.get("index");
			String fnName = (String) target.get("name");
			String fnExePath = Paths.get(invokerDir, "../../", "functions", fnName).normalize().toString();

			ZcJSONObject userData = null;
			try {
				userData = new ZcJSONObject(args[2]);
			} catch (JSONException jsonEx) {
				// do not handle the exception
			}
			HashMap<String, Object> projectData = jsonToMap(args[3]);
			HashMap<String, Object> authData = jsonToMap(args[4]);

			File[] jarFiles = new File(fnExePath).listFiles(new FilenameFilter() {
				@Override
				public boolean accept(File dir, String name) {
					if(name.endsWith(".jar")) {
						return true;
					}
					return false;
				}
			});
			int jarCount = jarFiles.length;
			URL[] URLs = new URL[jarCount + 1];
			URLs[0] = new File(Paths.get(fnExePath).toString()).toURI().toURL();
			for (int i = 1; i <= jarCount; i++) {
				URLs[i] = jarFiles[i - 1].toURI().toURL();
			}
			URLClassLoader child = new URLClassLoader(URLs, JavajobInvoker.class.getClassLoader());
			Class<?> cls = Class.forName(fnExeName, true, child);

			setZCThreadLocalProject(projectData);
			setZCThreadLocalAuth(authData);

			
			// construct job details from input data
			ZcJSONObject jobDetails = userData.getJSONObject("job_details");
			ZcJSONObject jobMetaDetails = jobDetails != null ? jobDetails.getJSONObject("job_meta_details") : null;
			ZcJSONObject jobpoolDetails = jobMetaDetails != null ? jobMetaDetails.getJSONObject("jobpool_details") : null;
			ZcJSONObject projectDetails = jobpoolDetails != null ? jobpoolDetails.getJSONObject("project_details") : null;
			ZcJSONObject capacity = userData.getJSONObject("capacity");
			ZcJSONObject params = jobMetaDetails != null ? jobMetaDetails.getJSONObject("params") : null;
			
			// construct default job request from input job details
			DefaultJobRequest defaultJob = new DefaultJobRequest();

			defaultJob.setJobDetails(jobDetails != null ? (JSONObject) jobDetails : new JSONObject());
			defaultJob.setJobMetaDetails(jobMetaDetails != null ? (JSONObject) jobDetails : new JSONObject());
			defaultJob.setJobpoolDetails(jobpoolDetails != null ? (JSONObject) jobpoolDetails : new JSONObject());
			defaultJob.setProjectDetails(projectDetails != null ? (JSONObject) projectDetails : new JSONObject());
			CapacityAttributes capacityAttributes = CapacityAttributes.getInstance();
			capacityAttributes.setMemory(capacity != null ? capacity.getLong("memory") : 0l);
			defaultJob.setJobCapacity(capacityAttributes);
			defaultJob.setJobParams(jsonToMap(params != null ? params.toString() : new JSONObject().toString()));

			JobRequest jobRequest = defaultJob;
			
			DefaultContext defaultContext = new DefaultContext(cls.getName(), 900000L);
			Context context = defaultContext;

			Method runner = cls.getMethod("handleJobExecute", JobRequest.class, Context.class);
			JOB_STATUS jobStatus = null;

			if(System.getenv("DEBUG") != null && System.getenv("DEBUG").equals("false")) {
				Timer executionTimer = new Timer(true);
				executionTimer.schedule(new TimerTask() {
					@Override
					public void run() {
						throwAndExit(new TimeoutException("function execution timeout"), 408, invokerDir);
					}
				}, defaultContext.getMaxExecutionTimeMs());
			}

			try {
				jobStatus = (JOB_STATUS) runner.invoke(cls.getDeclaredConstructor().newInstance(), jobRequest, context);
			} catch (Exception e) {
				throwAndExit(e, 532, invokerDir);
			}

			if(jobStatus == null) {
				writeResponse("UNINTENTIONAL_TERMINATION", 531, invokerDir);
			} else {
				int status = jobStatus.getStatus();
				writeResponse(jobStatus.name(), status == 500 ? 530 : status, invokerDir);
			}

			System.exit(0);
		} catch (Exception e) {
			throwAndExit(e, 500, invokerDir);
		}
	}
}
