View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.openr66.thrift;
21  
22  import org.apache.thrift.TException;
23  import org.waarp.common.command.exception.CommandAbstractException;
24  import org.waarp.common.database.data.AbstractDbData;
25  import org.waarp.common.database.exception.WaarpDatabaseException;
26  import org.waarp.common.logging.SysErrLogger;
27  import org.waarp.common.logging.WaarpLogger;
28  import org.waarp.common.logging.WaarpLoggerFactory;
29  import org.waarp.openr66.client.AbstractTransfer;
30  import org.waarp.openr66.commander.ClientRunner;
31  import org.waarp.openr66.context.R66Session;
32  import org.waarp.openr66.context.filesystem.R66File;
33  import org.waarp.openr66.database.data.DbRule;
34  import org.waarp.openr66.database.data.DbTaskRunner;
35  import org.waarp.openr66.protocol.configuration.Configuration;
36  import org.waarp.openr66.protocol.configuration.PartnerConfiguration;
37  import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
38  import org.waarp.openr66.protocol.localhandler.LocalServerHandler;
39  import org.waarp.openr66.protocol.localhandler.packet.ErrorPacket;
40  import org.waarp.openr66.protocol.localhandler.packet.RequestPacket;
41  import org.waarp.openr66.protocol.utils.TransferUtils;
42  import org.waarp.thrift.r66.Action;
43  import org.waarp.thrift.r66.ErrorCode;
44  import org.waarp.thrift.r66.R66Request;
45  import org.waarp.thrift.r66.R66Result;
46  import org.waarp.thrift.r66.R66Service;
47  import org.waarp.thrift.r66.RequestMode;
48  
49  import java.sql.Timestamp;
50  import java.text.ParseException;
51  import java.text.SimpleDateFormat;
52  import java.util.ArrayList;
53  import java.util.Date;
54  import java.util.List;
55  
56  import static org.waarp.common.database.DbConstant.*;
57  
58  /**
59   * Embedded service attached with the Thrift service
60   */
61  public class R66EmbeddedServiceImpl implements R66Service.Iface {
62    /**
63     * Internal Logger
64     */
65    private static final WaarpLogger logger =
66        WaarpLoggerFactory.getLogger(R66EmbeddedServiceImpl.class);
67  
68    private DbTaskRunner initRequest(final R66Request request) {
69      Timestamp ttimestart = null;
70      if (request.isSetStart()) {
71        final Date date;
72        try {
73          final SimpleDateFormat dateFormat =
74              new SimpleDateFormat(AbstractTransfer.TIMESTAMP_FORMAT);
75          date = dateFormat.parse(request.getStart());
76          ttimestart = new Timestamp(date.getTime());
77        } catch (final ParseException ignored) {
78          // nothing
79        }
80      } else if (request.isSetDelay()) {
81        if (request.getDelay().charAt(0) == '+') {
82          ttimestart = new Timestamp(System.currentTimeMillis() + Long.parseLong(
83              request.getDelay().substring(1)));
84        } else {
85          ttimestart = new Timestamp(Long.parseLong(request.getDelay()));
86        }
87      }
88      final DbRule rule;
89      try {
90        rule = new DbRule(request.getRule());
91      } catch (final WaarpDatabaseException e) {
92        logger.warn("Cannot get Rule: " + request.getRule() + " : {}",
93                    e.getMessage());
94        return null;
95      }
96      int mode = rule.getMode();
97      if (request.isMd5()) {
98        mode = RequestPacket.getModeMD5(mode);
99      }
100     final DbTaskRunner taskRunner;
101     long tid = ILLEGALVALUE;
102     if (request.isSetTid()) {
103       tid = request.getTid();
104     }
105     if (tid != ILLEGALVALUE) {
106       try {
107         taskRunner = new DbTaskRunner(tid, request.getDestuid());
108         // requested
109         taskRunner.setSenderByRequestToValidate(true);
110       } catch (final WaarpDatabaseException e) {
111         logger.warn("Cannot get task" + " : {}", e.getMessage());
112         return null;
113       }
114     } else {
115       final String sep =
116           PartnerConfiguration.getSeparator(request.getDestuid());
117       final RequestPacket requestPacket =
118           new RequestPacket(request.getRule(), mode, request.getFile(),
119                             request.getBlocksize(), 0, tid, request.getInfo(),
120                             -1, sep);
121       // Not isRecv since it is the requester, so send => isRetrieve is true
122       final boolean isRetrieve =
123           !RequestPacket.isRecvMode(requestPacket.getMode());
124       try {
125         taskRunner = new DbTaskRunner(rule, isRetrieve, requestPacket,
126                                       request.getDestuid(), ttimestart);
127       } catch (final WaarpDatabaseException e) {
128         logger.warn("Cannot get task" + " : {}", e.getMessage());
129         return null;
130       }
131     }
132     return taskRunner;
133   }
134 
135   @Override
136   public R66Result transferRequestQuery(final R66Request request)
137       throws TException {
138     final DbTaskRunner runner = initRequest(request);
139     if (runner != null) {
140       runner.changeUpdatedInfo(AbstractDbData.UpdatedInfo.TOSUBMIT);
141       final boolean isSender = runner.isSender();
142       if (!runner.forceSaveStatus()) {
143         logger.warn("Cannot prepare task");
144         return new R66Result(request.getMode(), ErrorCode.CommandNotFound,
145                              "ERROR: Cannot prepare transfer");
146       }
147       final R66Result result =
148           new R66Result(request.getMode(), ErrorCode.InitOk,
149                         "Transfer Scheduled");
150       if (request.getMode() == RequestMode.SYNCTRANSFER) {
151         // now need to wait but first, reload the runner
152         try {
153           runner.select();
154           while (!runner.isFinished()) {
155             try {
156               Thread.sleep(1000);
157               runner.select();
158             } catch (final InterruptedException e) {//NOSONAR
159               SysErrLogger.FAKE_LOGGER.ignoreLog(e);
160               break;
161             }
162           }
163           runner.setSender(isSender);
164         } catch (final WaarpDatabaseException ignored) {
165           // nothing
166         }
167         setResultFromRunner(runner, result);
168         if (runner.isAllDone()) {
169           result.setCode(ErrorCode.CompleteOk);
170           result.setResultinfo("Transfer Done");
171         } else {
172           result.setCode(ErrorCode.valueOf(runner.getErrorInfo().name()));
173           result.setResultinfo(runner.getErrorInfo().getMesg());
174         }
175       } else {
176         try {
177           runner.select();
178         } catch (final WaarpDatabaseException ignored) {
179           // nothing
180         }
181         runner.setSender(isSender);
182         setResultFromRunner(runner, result);
183       }
184       return result;
185     } else {
186       logger.warn("ERROR: Transfer NOT scheduled");
187       return new R66Result(request.getMode(), ErrorCode.Internal,
188                            "ERROR: Transfer NOT scheduled");
189     }
190   }
191 
192   private void setResultFromRunner(final DbTaskRunner runner,
193                                    final R66Result result) {
194     result.setDestuid(runner.getRequested());
195     result.setFromuid(runner.getRequester());
196     result.setTid(runner.getSpecialId());
197     result.setRule(runner.getRuleId());
198     result.setBlocksize(runner.getBlocksize());
199     result.setFile(runner.getFilename());
200     result.setOriginalfilename(runner.getOriginalFilename());
201     result.setIsmoved(runner.isFileMoved());
202     result.setModetransfer(runner.getMode());
203     result.setRetrievemode(runner.isSender());
204     result.setStep(runner.getStep());
205     result.setGloballaststep(runner.getGloballaststep());
206     result.setRank(runner.getRank());
207     result.setStart(runner.getStart().toString());
208     result.setStop(runner.getStop().toString());
209     result.setResultinfo(runner.getFileInformation());
210   }
211 
212   private void setResultFromLCR(final LocalChannelReference lcr,
213                                 final R66Result result) {
214     final R66Session session = lcr.getSession();
215     DbTaskRunner runner = null;
216     if (session != null) {
217       runner = session.getRunner();
218     } else {
219       final ClientRunner run = lcr.getClientRunner();
220       if (run != null) {
221         runner = run.getTaskRunner();
222       }
223     }
224     if (runner != null) {
225       setResultFromRunner(runner, result);
226     }
227   }
228 
229   private int stopOrCancelRunner(final long id, final String reqd,
230                                  final String reqr,
231                                  final org.waarp.openr66.context.ErrorCode code) {
232     try {
233       final DbTaskRunner taskRunner =
234           new DbTaskRunner(null, null, id, reqr, reqd);
235       return taskRunner.stopOrCancelRunner(code)? 1 : 0;
236     } catch (final WaarpDatabaseException ignored) {
237       // nothing
238     }
239     logger.warn(
240         "Cannot accomplished action on task: " + id + ' ' + code.name());
241     return -1;
242   }
243 
244   private R66Result stopOrCancel(final R66Request request,
245                                  final LocalChannelReference lcr,
246                                  final org.waarp.openr66.context.ErrorCode r66code) {
247     // stop the current transfer
248     final R66Result resulttest;
249     if (lcr != null) {
250       int rank = 0;
251       if (r66code == org.waarp.openr66.context.ErrorCode.StoppedTransfer &&
252           lcr.getSession() != null) {
253         final DbTaskRunner taskRunner = lcr.getSession().getRunner();
254         if (taskRunner != null) {
255           rank = taskRunner.getRank();
256         }
257       }
258       final ErrorPacket error =
259           new ErrorPacket(r66code.name() + ' ' + rank, r66code.getCode(),
260                           ErrorPacket.FORWARDCLOSECODE);
261       try {
262         // inform local instead of remote
263         LocalServerHandler.channelRead0(lcr, error);
264       } catch (final Exception ignored) {
265         // nothing
266       }
267       resulttest = new R66Result(request.getMode(), ErrorCode.CompleteOk,
268                                  r66code.name());
269       setResultFromLCR(lcr, resulttest);
270     } else {
271       // Transfer is not running
272       // but maybe need action on database
273       final int test =
274           stopOrCancelRunner(request.getTid(), request.getDestuid(),
275                              request.getFromuid(), r66code);
276       if (test > 0) {
277         resulttest = new R66Result(request.getMode(), ErrorCode.CompleteOk,
278                                    r66code.name());
279       } else if (test == 0) {
280         resulttest = new R66Result(request.getMode(), ErrorCode.TransferOk,
281                                    r66code.name());
282       } else {
283         resulttest = new R66Result(request.getMode(), ErrorCode.CommandNotFound,
284                                    "Error: cannot accomplished task on transfer");
285       }
286     }
287     return resulttest;
288   }
289 
290   private R66Result restart(final R66Request request,
291                             final LocalChannelReference lcr) {
292     // Try to validate a restarting transfer
293     // validLimit on requested side
294     if (Configuration.configuration.getConstraintLimitHandler()
295                                    .checkConstraints()) {
296       logger.warn(
297           "Limit exceeded {} while asking to relaunch a task " + request,
298           Configuration.configuration.getConstraintLimitHandler().lastAlert);
299       return new R66Result(request.getMode(), ErrorCode.ServerOverloaded,
300                            "Limit exceeded while asking to relaunch a task");
301     }
302     // Try to validate a restarting transfer
303     // header = ?; middle = requested+blank+requester+blank+specialId
304     final DbTaskRunner taskRunner;
305     try {
306       taskRunner =
307           new DbTaskRunner(null, null, request.getTid(), request.getFromuid(),
308                            request.getDestuid());
309       final org.waarp.openr66.context.R66Result resulttest =
310           TransferUtils.restartTransfer(taskRunner, lcr);
311       return new R66Result(request.getMode(),
312                            ErrorCode.valueOf(resulttest.getCode().name()),
313                            resulttest.getMessage());
314     } catch (final WaarpDatabaseException e1) {
315       logger.warn("Exception while trying to restart transfer" + " : {}",
316                   e1.getMessage());
317       return new R66Result(request.getMode(), ErrorCode.Internal,
318                            "Exception while trying to restart transfer");
319     }
320   }
321 
322   @Override
323   public R66Result infoTransferQuery(final R66Request request)
324       throws TException {
325     final RequestMode mode = request.getMode();
326     if (mode != RequestMode.INFOREQUEST) {
327       // error
328       logger.warn("Mode is uncompatible with infoTransferQuery");
329       return new R66Result(request.getMode(), ErrorCode.Unimplemented,
330                            "Mode is uncompatible with infoTransferQuery");
331     }
332     // now check if enough arguments are provided
333     if (!request.isSetTid() ||
334         !request.isSetDestuid() && !request.isSetFromuid() ||
335         !request.isSetAction()) {
336       // error
337       logger.warn("Not enough arguments");
338       return new R66Result(request.getMode(), ErrorCode.RemoteError,
339                            "Not enough arguments");
340     }
341     // requested+blank+requester+blank+specialId
342     final LocalChannelReference lcr =
343         Configuration.configuration.getLocalTransaction().getFromRequest(
344             request.getDestuid() + ' ' + request.getFromuid() + ' ' +
345             request.getTid());
346     final org.waarp.openr66.context.ErrorCode r66code;
347     switch (request.getAction()) {
348       case Detail: {
349         final R66Result result =
350             new R66Result(request.getMode(), ErrorCode.CompleteOk,
351                           "Existence test OK");
352         result.setAction(Action.Exist);
353         result.setDestuid(request.getDestuid());
354         result.setFromuid(request.getFromuid());
355         result.setTid(request.getTid());
356         if (lcr != null) {
357           setResultFromLCR(lcr, result);
358         } else {
359           try {
360             final DbTaskRunner runner =
361                 new DbTaskRunner(null, null, request.getTid(),
362                                  request.getFromuid(), request.getDestuid());
363             setResultFromRunner(runner, result);
364           } catch (final WaarpDatabaseException e) {
365             result.setCode(ErrorCode.FileNotFound);
366           }
367         }
368         return result;
369       }
370       case Restart:
371         return restart(request, lcr);
372       case Cancel:
373         r66code = org.waarp.openr66.context.ErrorCode.CanceledTransfer;
374         return stopOrCancel(request, lcr, r66code);
375       case Stop:
376         r66code = org.waarp.openr66.context.ErrorCode.StoppedTransfer;
377         return stopOrCancel(request, lcr, r66code);
378       default:
379         logger.warn("Uncompatible with " + request.getAction().name());
380         return new R66Result(request.getMode(), ErrorCode.Unimplemented,
381                              "Uncompatible with " + request.getAction().name());
382     }
383   }
384 
385   @Override
386   public final boolean isStillRunning(final String fromuid, final String touid,
387                                       final long tid) throws TException {
388     // now check if enough arguments are provided
389     if (fromuid == null || touid == null || tid == ILLEGALVALUE) {
390       // error
391       logger.warn("Not enough arguments");
392       return false;
393     }
394     // header = ?; middle = requested+blank+requester+blank+specialId
395     final LocalChannelReference lcr =
396         Configuration.configuration.getLocalTransaction().getFromRequest(
397             touid + ' ' + fromuid + ' ' + tid);
398     return lcr != null;
399   }
400 
401   @Override
402   public final List<String> infoListQuery(final R66Request request)
403       throws TException {
404     List<String> list = new ArrayList<String>();
405     final RequestMode mode = request.getMode();
406     if (mode != RequestMode.INFOFILE) {
407       // error
408       logger.warn("Not correct mode for infoListQuery");
409       list.add("Not correct mode for infoListQuery");
410       return list;
411     }
412     // now check if enough arguments are provided
413     if (!request.isSetRule() || !request.isSetAction()) {
414       // error
415       logger.warn("Not enough arguments");
416       list.add("Not enough arguments");
417       return list;
418     }
419     final R66Session session = new R66Session(false);
420     session.getAuth().specialNoSessionAuth(false,
421                                            Configuration.configuration.getHostId());
422     final DbRule rule;
423     try {
424       rule = new DbRule(request.getRule());
425     } catch (final WaarpDatabaseException e) {
426       logger.warn("Rule is unknown: " + request.getRule());
427       list.add("Rule is unknown: " + request.getRule());
428       return list;
429     }
430     try {
431       if (RequestPacket.isRecvMode(rule.getMode())) {
432         session.getDir().changeDirectory(rule.getWorkPath());
433       } else {
434         session.getDir().changeDirectory(rule.getSendPath());
435       }
436 
437       if (request.getAction() == Action.List ||
438           request.getAction() == Action.Mlsx) {
439         // ls or mls from current directory
440         if (request.getAction() == Action.List) {
441           list = session.getDir().list(request.getFile());
442         } else {
443           list = session.getDir().listFull(request.getFile(), false);
444         }
445       } else {
446         // ls pr mls from current directory and filename
447         if (!request.isSetFile()) {
448           logger.warn("File missing");
449           list.add("File missing");
450           return list;
451         }
452         final R66File file =
453             (R66File) session.getDir().setFile(request.getFile(), false);
454         String sresult;
455         if (request.getAction() == Action.Exist) {
456           sresult = String.valueOf(file.exists());
457           list.add(sresult);
458         } else if (request.getAction() == Action.Detail) {
459           sresult = session.getDir().fileFull(request.getFile(), false);
460           final String[] slist = sresult.split("\n");
461           sresult = slist[1];
462           list.add(sresult);
463         }
464       }
465       return list;
466     } catch (final CommandAbstractException e) {
467       logger.warn("Error occurs during: " + request + " : {}", e.getMessage());
468       list.add("Error occurs during: " + request);
469       return list;
470     }
471   }
472 
473 }