Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
# Note that the input file is hardcoded and Blast-specific
exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ",
user="wfcommons", stdout=True, stderr=True)
# print(output.decode())
# Check sanity
assert (exit_code == 0)
# this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files",
Expand Down
86 changes: 64 additions & 22 deletions wfcommons/wfbench/translator/cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ class CWLTranslator(Translator):

:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
:type workflow: Union[Workflow, pathlib.Path],
:param generate_stdout_files: If true, each step will generate a .out file with stdout from the step's execution
:type generate_stdout_files: Optional[bool]
:param generate_stderr_files: If true, each step will generate a .err file with stderr from the step's execution
:type generate_stderr_files: Optional[bool]
:param logger: The logger where to log information/warning or errors (optional).
:type logger: Logger
"""
def __init__(self,
workflow: Union[Workflow, pathlib.Path],
generate_stdout_files: Optional[bool] = True,
generate_stderr_files: Optional[bool] = True,
logger: Optional[logging.Logger] = None) -> None:
super().__init__(workflow, logger)
self.cwl_script = ["cwlVersion: v1.0",
Expand All @@ -41,6 +47,10 @@ def __init__(self,
" MultipleInputFeatureRequirement: {}",
" StepInputExpressionRequirement: {}",
" InlineJavascriptRequirement: {}\n"]

self.generate_stdout_files : bool = (generate_stdout_files is None) or generate_stdout_files
self.generate_stderr_files : bool = (generate_stderr_files is None) or generate_stderr_files

self.yml_script = []
self.parsed_tasks = []
self.task_level_map = defaultdict(lambda: [])
Expand Down Expand Up @@ -147,13 +157,23 @@ def _parse_steps(self) -> None:
" step_name:",
f" valueFrom: \"{task.task_id}\"",
f" output_filenames: {{default: {output_files}}}",
" out: [out, err, output_files]\n"
" out: [" # Completed below
]
# Add stdout file?
if self.generate_stdout_files:
code[-1] += "out, "
# Add stderr file?
if self.generate_stderr_files:
code[-1] += "err, "
# Always add output files
code[-1] += "output_files]\n"

self.cwl_script.extend(code)
output_files_sources.append(f" - {task.task_id}/output_files")
log_files_sources.append(f" - {task.task_id}/out")
log_files_sources.append(f" - {task.task_id}/err")
if self.generate_stdout_files:
log_files_sources.append(f" - {task.task_id}/out")
if self.generate_stderr_files:
log_files_sources.append(f" - {task.task_id}/err")

code = [
" compile_output_files:",
Expand All @@ -172,22 +192,24 @@ def _parse_steps(self) -> None:
" out: [out]\n"
]

code += [
" compile_log_files:",
" run: clt/folder.cwl",
" in:",
" - id: name",
" valueFrom: \"logs\"",
" - id: item",
" linkMerge: merge_flattened",
" source:",
]

code += log_files_sources

code += [
" out: [out]\n"
]
# Only deal with log files if there are any
if len(log_files_sources) > 0:
code += [
" compile_log_files:",
" run: clt/folder.cwl",
" in:",
" - id: name",
" valueFrom: \"logs\"",
" - id: item",
" linkMerge: merge_flattened",
" source:",
]

code += log_files_sources

code += [
" out: [out]\n"
]

self.cwl_script.extend(code)

Expand All @@ -214,10 +236,15 @@ def _parse_inputs_outputs(self) -> None:
code = ["\noutputs:",
" data_folder:",
" type: Directory",
" outputSource: compile_output_files/out",
" outputSource: compile_output_files/out"]

if self.generate_stdout_files or self.generate_stderr_files:
code += [
" log_folder:",
" type: Directory",
" outputSource: compile_log_files/out\n"]
" outputSource: compile_log_files/out"]

code[-1] += "\n"

self.cwl_script.extend(code)

Expand All @@ -227,7 +254,22 @@ def _write_cwl_files(self, output_folder: pathlib.Path) -> None:
clt_folder = cwl_folder.joinpath("clt")
clt_folder.mkdir(exist_ok=True)
shutil.copy(this_dir.joinpath("templates/cwl/folder.cwl"), clt_folder)
shutil.copy(this_dir.joinpath("templates/cwl/shell.cwl"), clt_folder)

# Create the shell.cwl file
# shutil.copy(this_dir.joinpath("templates/cwl/shell.cwl"), clt_folder)
updates_shell_cwl = ""
with open(this_dir.joinpath("templates/cwl/shell.cwl"), "r", encoding="utf-8") as f:
for line in f.readlines():
if line.endswith("#OPTIONAL_STDOUT_FILE\n"):
if self.generate_stdout_files:
updates_shell_cwl += line.replace("#OPTIONAL_STDOUT_FILE", "")
elif line.endswith("#OPTIONAL_STDERR_FILE\n"):
if self.generate_stderr_files:
updates_shell_cwl += line.replace("#OPTIONAL_STDERR_FILE", "")
else:
updates_shell_cwl += line
with open(cwl_folder.joinpath(clt_folder / "shell.cwl"), "w", encoding="utf-8") as f:
f.write(updates_shell_cwl)

with open(cwl_folder.joinpath("main.cwl"), "w", encoding="utf-8") as f:
f.write("\n".join(self.cwl_script))
Expand Down
12 changes: 11 additions & 1 deletion wfcommons/wfbench/translator/streamflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@ class StreamflowTranslator(Translator):

:param workflow: Workflow benchmark object or path to the workflow benchmark JSON instance.
:type workflow: Union[Workflow, pathlib.Path],
:param generate_stdout_files: If true, each CWL step will generate a .out file with stdout from the step's execution
:type generate_stdout_files: Optional[bool]
:param generate_stderr_files: If true, each CWL step will generate a .err file with stderr from the step's execution
:type generate_stderr_files: Optional[bool]
:param logger: The logger where to log information/warning or errors (optional).
:type logger: Logger
"""
def __init__(self,
workflow: Union[Workflow, pathlib.Path],
generate_stdout_files: Optional[bool] = True,
generate_stderr_files: Optional[bool] = True,
logger: Optional[logging.Logger] = None) -> None:
super().__init__(workflow, logger)
self.generate_stdout_files : bool = (generate_stdout_files is None) or generate_stdout_files
self.generate_stderr_files : bool = (generate_stderr_files is None) or generate_stderr_files

def translate(self, output_folder: pathlib.Path) -> None:
"""
Expand All @@ -46,7 +54,9 @@ def translate(self, output_folder: pathlib.Path) -> None:
"""
# Perform the CWL translation (which will create the output folder)
from wfcommons.wfbench import CWLTranslator
cwl_translator = CWLTranslator(workflow=self.workflow, logger=self.logger)
cwl_translator = CWLTranslator(workflow=self.workflow, logger=self.logger,
generate_stdout_files=self.generate_stdout_files,
generate_stderr_files=self.generate_stderr_files)
cwl_translator.translate(output_folder)

# Generate the streamflow.yml file
Expand Down
18 changes: 10 additions & 8 deletions wfcommons/wfbench/translator/templates/cwl/shell.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ arguments:
}
}
cmd = cmd + " > " + runtime.outdir + "/" + inputs.step_name + ".out 2> " + runtime.outdir + "/" + inputs.step_name + ".err";
cmd = cmd + " ; echo '-- end of stdout for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".out"; #OPTIONAL_STDOUT_FILE
cmd = cmd + " ; echo '-- end of stderr for " + inputs.step_name + " --' >> " + runtime.outdir + "/" + inputs.step_name + ".err"; #OPTIONAL_STDERR_FILE
return cmd;
}
shellQuote: false
Expand All @@ -30,14 +32,14 @@ inputs:
type: string

outputs:
out:
type: File
outputBinding:
glob: $(inputs.step_name + ".out")
err:
type: File
outputBinding:
glob: $(inputs.step_name + ".err")
out: #OPTIONAL_STDOUT_FILE
type: File #OPTIONAL_STDOUT_FILE
outputBinding: #OPTIONAL_STDOUT_FILE
glob: $(inputs.step_name + ".out") #OPTIONAL_STDOUT_FILE
err: #OPTIONAL_STDERR_FILE
type: File #OPTIONAL_STDERR_FILE
outputBinding: #OPTIONAL_STDERR_FILE
glob: $(inputs.step_name + ".err") #OPTIONAL_STDERR_FILE
output_files:
type: File[]
outputBinding:
Expand Down