Macros for SAS Application Developers
https://github.com/sasjs/core
Loading...
Searching...
No Matches
mv_jobwaitfor.sas
Go to the documentation of this file.
1/**
2 @file
3 @brief Takes a table of running jobs and waits for ANY/ALL of them to complete
4 @details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
5 jobs are completed. Completion is determined by reference to the returned
6 _state_, as per the following table:
7
8 | state | Wait? | Notes|
9 |-----------|-------|------|
10 | idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
11 | pending | yes | Job is preparing to run |
12 | running | yes | Job is running|
13 | canceled | no | Job was cancelled|
14 | completed | no | Job finished - does not mean it was successful. Check stateDetails|
15 | failed | no | Job failed to execute, could be a problem when calling the apis|
16
17
18 ## Example
19
20 First, compile the macros:
21
22 filename mc url
23 "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
24 %inc mc;
25
26 Next, create a job (in this case, as a web service):
27
28 filename ft15f001 temp;
29 parmcards4;
30 data ;
31 rand=ranuni(0)*1000000;
32 do x=1 to rand;
33 y=rand*x;
34 output;
35 end;
36 run;
37 ;;;;
38 %mv_createwebservice(path=/Public/temp,name=demo)
39
40 Then, execute the job,multiple times, and wait for them all to finish:
41
42 %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
43 %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
44 %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
45 %mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
46
47 data work.jobs;
48 set work.ds1 work.ds2 work.ds3 work.ds4;
49 where method='GET' and rel='state';
50 run;
51
52 %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
53
54 Delete the job:
55
56 %mv_deletejes(path=/Public/temp,name=demo)
57
58 @param [in] access_token_var= The global macro variable to contain the access
59 token
60 @param [in] grant_type= valid values:
61
62 - password
63 - authorization_code
64 - detect - will check if access_token exists, if not will use sas_services
65 if a SASStudioV session else authorization_code. Default option.
66 - sas_services - will use oauth_bearer=sas_services
67
68 @param [in] action=Either ALL (to wait for every job) or ANY (if one job
69 completes, processing will continue). Default=ALL.
70 @param [in] inds= The input dataset containing the list of job uris, in the
71 following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
72 job name. The uri should be in a `uri` variable, and the job path/name
73 should be in a `_program` variable.
74 @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete
75 succcessfully
76 @param [in] mdebug= set to 1 to enable DEBUG messages
77 @param [out] outds= The output dataset containing the list of states by job
78 (default=work.mv_jobexecute)
79 @param [out] outref= A fileref to which the spawned job logs should be
80 appended.
81
82 @version VIYA V.03.04
83 @author Allan Bowe, source: https://github.com/sasjs/core
84
85 <h4> Dependencies </h4>
86 @li mp_abort.sas
87 @li mf_getplatform.sas
88 @li mf_getuniquefileref.sas
89 @li mf_getuniquelibref.sas
90 @li mf_existvar.sas
91 @li mf_nobs.sas
92 @li mv_getjoblog.sas
93
94**/
95
96%macro mv_jobwaitfor(action
97 ,access_token_var=ACCESS_TOKEN
98 ,grant_type=sas_services
99 ,inds=0
100 ,outds=work.mv_jobwaitfor
101 ,outref=0
102 ,raise_err=0
103 ,mdebug=0
104 );
105%local dbg;
106%if &mdebug=1 %then %do;
107 %put &sysmacroname entry vars:;
108 %put _local_;
109%end;
110%else %let dbg=*;
111
112%local oauth_bearer;
113%if &grant_type=detect %then %do;
114 %if %symexist(&access_token_var) %then %let grant_type=authorization_code;
115 %else %let grant_type=sas_services;
116%end;
117%if &grant_type=sas_services %then %do;
118 %let oauth_bearer=oauth_bearer=sas_services;
119 %let &access_token_var=;
120%end;
121
122%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
123 and &grant_type ne sas_services
124 )
125 ,mac=&sysmacroname
126 ,msg=%str(Invalid value for grant_type: &grant_type)
127)
128
129%mp_abort(iftrue=("&inds"="0")
130 ,mac=&sysmacroname
131 ,msg=%str(input dataset not provided)
132)
133%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
134 ,mac=&sysmacroname
135 ,msg=%str(The URI variable was not found in the input dataset(&inds))
136)
137%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
138 ,mac=&sysmacroname
139 ,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
140)
141
142%if %mf_nobs(&inds)=0 %then %do;
143 %put NOTE: Zero observations in &inds, &sysmacroname will now exit;
144 %return;
145%end;
146
147options noquotelenmax;
148%local base_uri; /* location of rest apis */
149%let base_uri=%mf_getplatform(VIYARESTAPI);
150
151data _null_;
152 length jobparams $32767;
153 set &inds end=last;
154 call symputx(cats('joburi',_n_),substr(uri,1,55),'l');
155 call symputx(cats('jobname',_n_),_program,'l');
156 call symputx(cats('jobparams',_n_),jobparams,'l');
157 if last then call symputx('uricnt',_n_,'l');
158run;
159
160%local runcnt;
161%if &action=ALL %then %let runcnt=&uricnt;
162%else %if &action=ANY %then %let runcnt=1;
163%else %let runcnt=&uricnt;
164
165%local fname0 ;
166%let fname0=%mf_getuniquefileref();
167
168data &outds;
169 format _program uri $128. state $32. stateDetails $32. timestamp datetime19.
170 jobparams $32767.;
171 call missing (of _all_);
172 stop;
173run;
174
175%local i;
176%do i=1 %to &uricnt;
177 %if "&&joburi&i" ne "0" %then %do;
178 proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
179 headers "Accept"="application/json"
180 %if &grant_type=authorization_code %then %do;
181 "Authorization"="Bearer &&&access_token_var"
182 %end; ;
183 run;
184 %if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201
185 %then %do;
186 data _null_;infile &fname0;input;putlog _infile_;run;
187 %mp_abort(mac=&sysmacroname
188 ,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
189 )
190 %end;
191
192 %let status=notset;
193
194 %local libref1;
195 %let libref1=%mf_getuniquelibref();
196 libname &libref1 json fileref=&fname0;
197
198 data _null_;
199 length state stateDetails $32;
200 set &libref1..root;
201 call symputx('status',state,'l');
202 call symputx('stateDetails',stateDetails,'l');
203 run;
204
205 libname &libref1 clear;
206
207 %if &status=completed or &status=failed or &status=canceled %then %do;
208 %local plainuri;
209 %let plainuri=%substr(&&joburi&i,1,55);
210 proc sql;
211 insert into &outds set
212 _program="&&jobname&i",
213 uri="&plainuri",
214 state="&status",
215 stateDetails=symget("stateDetails"),
216 timestamp=datetime(),
217 jobparams=symget("jobparams&i");
218 %let joburi&i=0; /* do not re-check */
219 /* fetch log */
220 %if %str(&outref) ne 0 %then %do;
221 %mv_getjoblog(uri=&plainuri,outref=&outref,mdebug=&mdebug)
222 %end;
223 %end;
224 %else %if &status=idle or &status=pending or &status=running %then %do;
225 data _null_;
226 call sleep(1,1);
227 run;
228 %end;
229 %else %do;
230 %mp_abort(mac=&sysmacroname
231 ,msg=%str(status &status not expected!!)
232 )
233 %end;
234
235 %if (&raise_err) %then %do;
236 %if (&status = canceled or &status = failed or %length(&stateDetails)>0)
237 %then %do;
238 %if ("&stateDetails" = "%str(war)ning") %then %let SYSCC=4;
239 %else %let SYSCC=5;
240 %put %str(ERR)OR: Job &&jobname&i. did not complete. &stateDetails;
241 %return;
242 %end;
243 %end;
244
245 %end;
246 %if &i=&uricnt %then %do;
247 %local goback;
248 %let goback=0;
249 proc sql noprint;
250 select count(*) into:goback from &outds;
251 %if &goback lt &runcnt %then %let i=0;
252 %end;
253%end;
254
255%if &mdebug=1 %then %do;
256 %put &sysmacroname exit vars:;
257 %put _local_;
258%end;
259%else %do;
260 /* clear refs */
261 filename &fname0 clear;
262%end;
263%mend mv_jobwaitfor;