feat(JobWrapper): Report the status of the job to DiracX#111
feat(JobWrapper): Report the status of the job to DiracX#111Loxeris wants to merge 11 commits intoDIRACGrid:mainfrom
Conversation
71983e2 to
2ff453b
Compare
test/test_job_wrapper.py
Outdated
| This module tests the functionalities of the job wrapper. | ||
| """ | ||
|
|
||
| import asyncio |
There was a problem hiding this comment.
Could you try to use pytest-asyncio as it's done in diracx please?
https://github.com/DIRACGrid/diracx/blob/b7358959d3bdc4aee25e74cf2bfbf0da0e59a521/diracx-testing/pyproject.toml#L17
test/test_integration.py
Outdated
| usage scenarios. | ||
| """ | ||
|
|
||
| import asyncio |
test/test_execution_hooks_core.py
Outdated
| abstract interfaces. | ||
| """ | ||
|
|
||
| import asyncio |
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| asyncio.run(main()) |
There was a problem hiding this comment.
Instead of calling sys.exit from the main (now async) function, I think it would be cleaner to return an int, and sys.exit from here like:
return 1
if __name__ == "__main__:
sys.exit(asyncio.run(main())There was a problem hiding this comment.
I think you can just rename the data_management_mocks into mocks
| ret = await set_job_status( | ||
| jobID, | ||
| "Done", | ||
| "Execution Complete", | ||
| source=src, | ||
| ) |
There was a problem hiding this comment.
Can we move the calls to set_job_status within the JobWrapper?
The reason is that we also use the JobWrapper but without the JobWrapperTemplate with HPC with no external connectivity.
| ret = await set_job_status( | ||
| jobID, | ||
| "Failed", | ||
| source=src, | ||
| ) |
src/dirac_cwl/job/job_wrapper.py
Outdated
| :param job_path: Path to the job working directory. | ||
| """ | ||
| assert arguments.sandbox is not None | ||
| await set_job_status(self.jobID, minor_status="Downloading InputSandbox", source=src) |
There was a problem hiding this comment.
FYI, in DIRAC, there is a list of the existing minor status: https://github.com/DIRACGrid/DIRAC/blob/88fb54782ecae01d2b58205d0720efef0ae8d8ac/src/DIRAC/WorkloadManagementSystem/Client/JobMinorStatus.py
For now, you can make an StrEnum in dirac-cwl out of it (that will be later moved to diracx).
I think you could potentially even add more minor status like: InputSandbox successfully downloaded for instance.
src/dirac_cwl/job/job_wrapper.py
Outdated
|
|
||
| async def initialize(self) -> None: | ||
| """Initialize the JobWrapper.""" | ||
| await set_job_status(self.jobID, "Running", "Job Initialization", source=src) |
There was a problem hiding this comment.
And you could name them uniformly like: Initializing job..., Downloading input sandbox..., Resolving input data..., etc.
src/dirac_cwl/job/job_wrapper.py
Outdated
|
|
||
| # Execute the task | ||
| logger.info("Executing Task: %s", command) | ||
| await set_job_status(self.jobID, minor_status="Application", source=src) |
2ff453b to
140b8bc
Compare
src/dirac_cwl/job/job_report.py
Outdated
| class JobStatus(StrEnum): | ||
| """List of all available job statuses.""" | ||
|
|
||
| SUBMITTING = "Submitting" | ||
| RECEIVED = "Received" | ||
| CHECKING = "Checking" | ||
| STAGING = "Staging" | ||
| WAITING = "Waiting" | ||
| MATCHED = "Matched" | ||
| RUNNING = "Running" | ||
| STALLED = "Stalled" | ||
| COMPLETING = "Completing" | ||
| DONE = "Done" | ||
| COMPLETED = "Completed" | ||
| FAILED = "Failed" | ||
| DELETED = "Deleted" | ||
| KILLED = "Killed" | ||
| RESCHEDULED = "Rescheduled" |
There was a problem hiding this comment.
Can't you reuse the job status from diracx-core?
| class JobStatus(StrEnum): | |
| """List of all available job statuses.""" | |
| SUBMITTING = "Submitting" | |
| RECEIVED = "Received" | |
| CHECKING = "Checking" | |
| STAGING = "Staging" | |
| WAITING = "Waiting" | |
| MATCHED = "Matched" | |
| RUNNING = "Running" | |
| STALLED = "Stalled" | |
| COMPLETING = "Completing" | |
| DONE = "Done" | |
| COMPLETED = "Completed" | |
| FAILED = "Failed" | |
| DELETED = "Deleted" | |
| KILLED = "Killed" | |
| RESCHEDULED = "Rescheduled" | |
| from diracx.core.models import JobStatus |
src/dirac_cwl/job/job_report.py
Outdated
| # add job status record | ||
| self.job_status_info.append((status, minor_status, application_status.strip(' "' + "'"), timestamp)) | ||
|
|
||
| def setApplicationStatus(self, appStatus): |
There was a problem hiding this comment.
To be (almost) consistent with setJobStatus?
| def setApplicationStatus(self, appStatus): | |
| def setApplicationStatus(self, application_status: str | None = None): |
There was a problem hiding this comment.
Not entirely sure if the best is str = "" or str | None = None, can you check?
src/dirac_cwl/job/job_report.py
Outdated
| self._client = client | ||
|
|
||
| def setJobStatus( | ||
| self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str = "" |
There was a problem hiding this comment.
Isn't it better like this? I am just surprised you can have status = None with application status = ""
Shouldn't JobStatus be mandatory?
| self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str = "" | |
| self, status: JobStatus | None = None, minor_status: JobMinorStatus | None = None, application_status: str | None = None |
src/dirac_cwl/job/job_report.py
Outdated
| if appStatus: | ||
| self.job_status_info.append(("", "", (appStatus.strip(' "' + "'"), timeStamp))) | ||
|
|
||
| def setJobParameter(self, par_name, par_value): |
There was a problem hiding this comment.
What about?
| def setJobParameter(self, par_name, par_value): | |
| def setJobParameter(self, key: str, value: str): |
src/dirac_cwl/job/job_report.py
Outdated
| :param par_name: name of the parameter | ||
| :param par_value: value of the parameter | ||
| """ | ||
| self.job_parameters.append((par_name, par_value)) |
There was a problem hiding this comment.
Any reason of having a list of tuples instead of a dictionary here? Because then I see you are building a dictionary from that when you send them
There was a problem hiding this comment.
I kept how it was in DIRAC. A dictionary sounds better indeed.
src/dirac_cwl/job/job_report.py
Outdated
| def dump(self): | ||
| """Print out the contents of the internal cached information.""" | ||
| print("Job status info:") | ||
| for status, minor, app, timeStamp in self.job_status_info: | ||
| if not status: | ||
| status = "" | ||
| if not minor: | ||
| minor = "" | ||
| print(status.ljust(20), minor.ljust(30), app.ljust(30), timeStamp) | ||
|
|
||
| print("Job parameters:") | ||
| for pname, pvalue in self.job_parameters: | ||
| print(pname.ljust(20), pvalue.ljust(30)) |
There was a problem hiding this comment.
I don't think we want that function here? At least not for now and not with print statements.
| def dump(self): | |
| """Print out the contents of the internal cached information.""" | |
| print("Job status info:") | |
| for status, minor, app, timeStamp in self.job_status_info: | |
| if not status: | |
| status = "" | |
| if not minor: | |
| minor = "" | |
| print(status.ljust(20), minor.ljust(30), app.ljust(30), timeStamp) | |
| print("Job parameters:") | |
| for pname, pvalue in self.job_parameters: | |
| print(pname.ljust(20), pvalue.ljust(30)) |
There was a problem hiding this comment.
I copied it from DIRAC implementation but I agree it should probably go.
src/dirac_cwl/job/job_wrapper.py
Outdated
| async def initialize(self) -> None: | ||
| """Initialize the JobWrapper.""" | ||
| self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION) | ||
| await self.job_report.commit() |
There was a problem hiding this comment.
Let's say there is no initialize for now (I want to see if we will need it in the future). The job_report call is then made in __init__.
| async def initialize(self) -> None: | |
| """Initialize the JobWrapper.""" | |
| self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION) | |
| await self.job_report.commit() | |
| self.job_report.setJobStatus(JobStatus.RUNNING, JobMinorStatus.JOB_INITIALIZATION) | |
| await self.job_report.commit() |
There was a problem hiding this comment.
We need an await to commit the job report, await needs to be in an async method and __init__ can't be one afaik. I guess we could just remove the problem entirely by not committing here. I need to check when it's needed.
There was a problem hiding this comment.
Ah ok I see. I think it's better to do the commit at the end of every public methods.
So I don't say anything wrong, that would be at the end of preprocess/execute/postprocess.
What do you think?
There was a problem hiding this comment.
I agree this should probably work out. Maybe we could also add a commit just after the JobMinorStatus.APPLICATION so the user can see the payload is running in case it takes a while.
| assert arguments.sandbox is not None | ||
| self.job_report.setJobStatus(minor_status=JobMinorStatus.DOWNLOADING_INPUT_SANDBOX) | ||
| if not self.execution_hooks_plugin: | ||
| self.job_report.setJobStatus(minor_status=JobMinorStatus.FAILED_DOWNLOADING_INPUT_SANDBOX) |
There was a problem hiding this comment.
Don't you need to commit here?
Unless you want to catch exceptions in the JobWrapperTemplate and then commit?
There was a problem hiding this comment.
I actually think it's fine not to commit here because Exceptions are caught in a try catch in run_job() and there is a commit there.
except Exception:
logger.exception("JobWrapper: Failed to execute workflow")
self.job_report.setJobStatus(JobStatus.FAILED)
await self.job_report.commit()
return FalseI might also just move this commit to the finally block, to not have one for each return of the method too.
src/dirac_cwl/job/job_wrapper.py
Outdated
| else: | ||
| self.job_report = JobReport(self.job_id, src, AsyncDiracClient()) | ||
|
|
||
| async def initialize(self) -> None: |
There was a problem hiding this comment.
Good thing to make everything async! (I think the whole code should be async to fit with the diracx code base)
| await self.execution_hooks_plugin.store_output(outputs) | ||
| self.job_report.setJobStatus(status=JobStatus.COMPLETING, minor_status=JobMinorStatus.OUTPUT_DATA_UPLOADED) | ||
| except RuntimeError as err: | ||
| self.job_report.setJobStatus(status=JobStatus.FAILED, minor_status=JobMinorStatus.UPLOADING_OUTPUT_DATA) |
4e793fe to
ae4abc4
Compare
a6ac83f to
c7b7188
Compare
| class JobMinorStatus(StrEnum): | ||
| """List of all available job minor statuses.""" | ||
|
|
||
| APPLICATION = "Executing Payload" | ||
| APP_ERRORS = "Application Finished With Errors" | ||
| # APP_NOT_FOUND = "Application not found" | ||
| APP_SUCCESS = "Application Finished Successfully" | ||
| # APP_THREAD_FAILED = "Application thread failed" | ||
| # APP_THREAD_NOT_COMPLETE = "Application thread did not complete" | ||
| DOWNLOADING_INPUT_SANDBOX = "Downloading InputSandbox" | ||
| # DOWNLOADING_INPUT_SANDBOX_LFN = "Downloading InputSandbox LFN(s)" | ||
| # EXCEPTION_DURING_EXEC = "Exception During Execution" | ||
| EXEC_COMPLETE = "Execution Complete" | ||
| FAILED_DOWNLOADING_INPUT_SANDBOX = "Failed Downloading InputSandbox" | ||
| # FAILED_DOWNLOADING_INPUT_SANDBOX_LFN = "Failed Downloading InputSandbox LFN(s)" | ||
| # FAILED_SENDING_REQUESTS = "Failed sending requests" | ||
| # GOING_RESCHEDULE = "Going to reschedule job" | ||
| # ILLEGAL_JOB_JDL = "Illegal Job JDL" | ||
| INPUT_DATA_RESOLUTION = "Resolving Input Data" | ||
| # INPUT_NOT_AVAILABLE = "Input Data Not Available" | ||
| # JOB_EXCEEDED_CPU = "Job has reached the CPU limit of the queue" | ||
| # JOB_EXCEEDED_WALL_CLOCK = "Job has exceeded maximum wall clock time" | ||
| JOB_INITIALIZATION = "Initializing Job" | ||
| # JOB_INSUFFICIENT_DISK = "Job has insufficient disk space to continue" | ||
| # JOB_WRAPPER_EXECUTION = "JobWrapper execution" | ||
| # JOB_WRAPPER_INITIALIZATION = "Job Wrapper Initialization" | ||
| # MARKED_FOR_TERMINATION = "Marked for termination" | ||
| # NO_CANDIDATE_SITE_FOUND = "No candidate sites available" | ||
| OUTPUT_DATA_UPLOADED = "Output Data Uploaded" | ||
| OUTPUT_SANDBOX_UPLOADED = "Output Sandbox Uploaded" | ||
| # PENDING_REQUESTS = "Pending Requests" | ||
| # PILOT_AGENT_SUBMISSION = "Pilot Agent Submission" | ||
| # RECEIVED_KILL_SIGNAL = "Received Kill signal" | ||
| # REQUESTS_DONE = "Requests done" | ||
| # RESCHEDULED = "Job Rescheduled" | ||
| RESOLVING_OUTPUT_SANDBOX = "Resolving Output Sandbox" | ||
| # STALLED_PILOT_NOT_RUNNING = "Job stalled: pilot not running" | ||
| # UPLOADING_JOB_OUTPUTS = "Uploading Outputs" | ||
| UPLOADING_OUTPUT_DATA = "Uploading Output Data" | ||
| UPLOADING_OUTPUT_SANDBOX = "Uploading Output Sandbox" | ||
| # WATCHDOG_STALLED = "Watchdog identified this job as stalled" |
There was a problem hiding this comment.
I commented all minor statuses unused in the job wrapper when working on it. I'm wondering if I should restore all these minor statuses, keep them commented, or delete them entirely from the enum.
Reports the status of the job to DiracX using
diracx-api.closes #83