View Javadoc
1   /*
2    * This file is part of Waarp Project (named also Waarp or GG).
3    *
4    *  Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
5    *  tags. See the COPYRIGHT.txt in the distribution for a full listing of
6    * individual contributors.
7    *
8    *  All Waarp Project is free software: you can redistribute it and/or
9    * modify it under the terms of the GNU General Public License as published by
10   * the Free Software Foundation, either version 3 of the License, or (at your
11   * option) any later version.
12   *
13   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
14   * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
15   * A PARTICULAR PURPOSE. See the GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License along with
18   * Waarp . If not, see <http://www.gnu.org/licenses/>.
19   */
20  package org.waarp.ftp.core.data.handler;
21  
22  import io.netty.buffer.ByteBuf;
23  import io.netty.buffer.ByteBufAllocator;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.handler.codec.ByteToMessageCodec;
26  import org.waarp.common.exception.InvalidArgumentException;
27  import org.waarp.common.file.DataBlock;
28  import org.waarp.common.future.WaarpFuture;
29  import org.waarp.common.logging.SysErrLogger;
30  import org.waarp.common.utility.WaarpNettyUtil;
31  import org.waarp.compress.zlib.ZlibCodec;
32  import org.waarp.ftp.core.command.FtpArgumentCode.TransferMode;
33  import org.waarp.ftp.core.command.FtpArgumentCode.TransferStructure;
34  import org.waarp.ftp.core.config.FtpInternalConfiguration;
35  import org.waarp.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
36  
37  import java.io.IOException;
38  import java.util.List;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  
41  /**
42   * First CODEC :<br>
43   * - encode : takes a {@link DataBlock} and transforms it to a ByteBuf<br>
44   * - decode : takes a ByteBuf and transforms it to a {@link DataBlock}<br>
 * STREAM, BLOCK and ZLIB modes are implemented (ZLIB only without RECORD
 * structure). COMPRESSED mode is not implemented.
46   */
47  public class FtpDataModeCodec extends ByteToMessageCodec<DataBlock> {
  /*
   * Excerpt from RFC 959, section 3.4 (Transmission Modes).
   *
   * 3.4.1. STREAM MODE
   *
   * The data is transmitted as a stream of bytes. There is no restriction on
   * the representation type used; record structures are allowed.
   *
   * In a record structured file EOR and EOF will each be indicated by a
   * two-byte control code. The first byte of the control code will be all
   * ones, the escape character. The second byte will have the low order bit
   * on and zeros elsewhere for EOR and the second low order bit on for EOF;
   * that is, the byte will have value 1 for EOR and value 2 for EOF. EOR and
   * EOF may be indicated together on the last byte transmitted by turning
   * both low order bits on (i.e., the value 3). If a byte of all ones was
   * intended to be sent as data, it should be repeated in the second byte of
   * the control code.
   *
   * If the structure is a file structure, the EOF is indicated by the sending
   * host closing the data connection and all bytes are data bytes.
   *
   * 3.4.2. BLOCK MODE
   *
   * The file is transmitted as a series of data blocks preceded by one or
   * more header bytes. The header bytes contain a count field, and descriptor
   * code. The count field indicates the total length of the data block in
   * bytes, thus marking the beginning of the next data block (there are no
   * filler bits). The descriptor code defines: last block in the file (EOF)
   * last block in the record (EOR), restart marker (see the Section on Error
   * Recovery and Restart) or suspect data (i.e., the data being transferred
   * is suspected of errors and is not reliable). This last code is NOT
   * intended for error control within FTP. It is motivated by the desire of
   * sites exchanging certain types of data (e.g., seismic or weather data) to
   * send and receive all the data despite local errors (such as "magnetic
   * tape read errors"), but to indicate in the transmission that certain
   * portions are suspect). Record structures are allowed in this mode, and
   * any representation type may be used.
   *
   * The header consists of the three bytes. Of the 24 bits of header
   * information, the 16 low order bits shall represent byte count, and the 8
   * high order bits shall represent descriptor codes as shown below.
   *
   *   Block Header
   *   +----------------+----------------+----------------+
   *   | Descriptor     |    Byte Count                   |
   *   |         8 bits |                      16 bits    |
   *   +----------------+----------------+----------------+
   *
   * The descriptor codes are indicated by bit flags in the descriptor byte.
   * Four codes have been assigned, where each code number is the decimal
   * value of the corresponding bit in the byte.
   *
   *   Code  Meaning
   *   128   End of data block is EOR
   *    64   End of data block is EOF
   *    32   Suspected errors in data block
   *    16   Data block is a restart marker
   *
   * With this encoding, more than one descriptor coded condition may exist
   * for a particular block. As many bits as necessary may be flagged.
   *
   * The restart marker is embedded in the data stream as an integral number
   * of 8-bit bytes representing printable characters in the language being
   * used over the control connection (e.g., default--NVT-ASCII). <SP> (Space,
   * in the appropriate language) must not be used WITHIN a restart marker.
   *
   * For example, to transmit a six-character marker, the following would be
   * sent:
   *
   *   +--------+--------+--------+
   *   |Descrptr|  Byte count     |
   *   |code= 16|             = 6 |
   *   +--------+--------+--------+
   *
   *   +--------+--------+--------+
   *   | Marker | Marker | Marker |
   *   | 8 bits | 8 bits | 8 bits |
   *   +--------+--------+--------+
   *
   *   +--------+--------+--------+
   *   | Marker | Marker | Marker |
   *   | 8 bits | 8 bits | 8 bits |
   *   +--------+--------+--------+
   */
83    /**
84     * Transfer Mode
85     */
86    private TransferMode mode;
87  
88    /**
89     * Structure Mode
90     */
91    private TransferStructure structure;
92  
93    /**
94     * Ftp Data Block
95     */
96    private DataBlock dataBlock;
97  
98    /**
99     * Last byte for STREAM+RECORD
100    */
101   private int lastbyte;
102 
103   /**
104    * if in Z mode
105    */
106   private ZlibCodec zlibCodec = new ZlibCodec();
107 
108   /**
109    * Is the underlying DataNetworkHandler ready to receive block
110    */
111   private final AtomicBoolean isReady = new AtomicBoolean(false);
112 
113   /**
114    * Blocking step between DataNetworkHandler and this Codec in order to wait
115    * that the DataNetworkHandler is
116    * ready
117    */
118   private final WaarpFuture codecLocked = new WaarpFuture();
119 
120   /**
121    * @param mode
122    * @param structure
123    */
124   public FtpDataModeCodec(final TransferMode mode,
125                           final TransferStructure structure) {
126     this.mode = mode;
127     this.structure = structure;
128   }
129 
130   /**
131    * Inform the Codec that DataNetworkHandler is ready (called from
132    * DataNetworkHandler after setCorrectCodec).
133    */
134   public final void setCodecReady() {
135     codecLocked.setSuccess();
136   }
137 
138   protected final DataBlock decodeRecordStandard(final ByteBuf buf,
139                                                  final int length) {
140     final ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length, length);
141     if (lastbyte == 0xFF) {
142       if (readByteForDataBlock(buf, newbuf)) {
143         lastbyte = 0;
144       }
145     }
146     try {
147       while (buf.readableBytes() > 0) {
148         lastbyte = buf.readUnsignedByte();
149         if (lastbyte == 0xFF) {
150           readByteForDataBlock(buf, newbuf);
151         } else {
152           newbuf.writeByte((byte) (lastbyte & 0xFF));
153         }
154         lastbyte = 0;
155       }
156     } catch (final IndexOutOfBoundsException e) {
157       // End of read
158     }
159     dataBlock.setBlock(newbuf);
160     return dataBlock;
161   }
162 
163   private boolean readByteForDataBlock(final ByteBuf buf,
164                                        final ByteBuf newbuf) {
165     final int nextbyte = buf.readUnsignedByte();
166     if (nextbyte == 0xFF) {
167       newbuf.writeByte((byte) (lastbyte & 0xFF));
168       return false;
169     } else {
170       if (nextbyte == 1) {
171         dataBlock.setEOR(true);
172       } else if (nextbyte == 2) {
173         dataBlock.setEOF(true);
174       } else if (nextbyte == 3) {
175         dataBlock.setEOR(true);
176         dataBlock.setEOF(true);
177       }
178       return true;
179     }
180   }
181 
182   protected final DataBlock decodeRecord(final ByteBuf buf, final int length) {
183     final FtpSeekAheadData sad;
184     try {
185       sad = new FtpSeekAheadData(buf);
186     } catch (final SeekAheadNoBackArrayException e1) {
187       return decodeRecordStandard(buf, length);
188     }
189     final ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length, length);
190     if (lastbyte == 0xFF) {
191       if (readBytesFromSad(sad, newbuf)) {
192         lastbyte = 0;
193       }
194     }
195     try {
196       while (sad.pos < sad.limit) {
197         lastbyte = sad.bytes[sad.pos++] & 0xFF;
198         if (lastbyte == 0xFF) {
199           readBytesFromSad(sad, newbuf);
200         } else {
201           newbuf.writeByte((byte) (lastbyte & 0xFF));
202         }
203         lastbyte = 0;
204       }
205     } catch (final IndexOutOfBoundsException e) {
206       // End of read
207     }
208     sad.setReadPosition(0);
209     dataBlock.setBlock(newbuf);
210     return dataBlock;
211   }
212 
213   private boolean readBytesFromSad(final FtpSeekAheadData sad,
214                                    final ByteBuf newbuf) {
215     final int nextbyte = sad.bytes[sad.pos++] & 0xFF;
216     if (nextbyte == 0xFF) {
217       newbuf.writeByte((byte) (lastbyte & 0xFF));
218       return false;
219     } else {
220       if (nextbyte == 1) {
221         dataBlock.setEOR(true);
222       } else if (nextbyte == 2) {
223         dataBlock.setEOF(true);
224       } else if (nextbyte == 3) {
225         dataBlock.setEOR(true);
226         dataBlock.setEOF(true);
227       }
228       return true;
229     }
230   }
231 
232   private void checkCodecUnlocked() throws InterruptedException {
233     if (!isReady.get()) {
234       for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
235         if (!codecLocked.awaitOrInterruptible(
236             FtpInternalConfiguration.RETRYINMS)) {
237           Thread.sleep(FtpInternalConfiguration.RETRYINMS);
238         } else {
239           break;
240         }
241       }
242       if (!codecLocked.awaitOrInterruptible(
243           FtpInternalConfiguration.RETRYINMS)) {
244         Thread.sleep(FtpInternalConfiguration.RETRYINMS);
245         // Force Codec Ready
246         setCodecReady();
247       }
248       isReady.set(true);
249     }
250   }
251 
252   @Override
253   protected void decode(final ChannelHandlerContext ctx, final ByteBuf buf,
254                         final List<Object> out) throws Exception {
255     // First test if the connection is fully ready (block might be
256     // transferred by client before connection is ready)
257     checkCodecUnlocked();
258     if (buf.readableBytes() == 0) {
259       return;
260     }
261     // If STREAM Mode, no task to do, just next filter
262     if (mode == TransferMode.STREAM) {
263       dataBlock = new DataBlock();
264       if (structure != TransferStructure.RECORD) {
265         final ByteBuf newbuf = buf.slice();
266         buf.readerIndex(buf.readableBytes());
267         WaarpNettyUtil.retain(newbuf);
268         dataBlock.setBlock(newbuf);
269         out.add(dataBlock);
270         return;
271       }
272       // Except if RECORD Structure!
273       final int length = buf.readableBytes();
274       out.add(decodeRecord(buf, length));
275       return;
276     } else if (mode == TransferMode.BLOCK) {
277       // Now we are in BLOCK Mode
278       // Make sure if the length field was received.
279       if (buf.readableBytes() < 3) {
280         // The length field was not received yet - return null.
281         // This method will be invoked again when more packets are
282         // received and appended to the buffer.
283         return;
284       }
285 
286       // The length field is in the buffer.
287 
288       // Mark the current buffer position before reading the length field
289       // because the whole frame might not be in the buffer yet.
290       // We will reset the buffer position to the marked position if
291       // there's not enough bytes in the buffer.
292       buf.markReaderIndex();
293 
294       if (dataBlock == null) {
295         dataBlock = new DataBlock();
296       }
297       // Read the descriptor
298       dataBlock.setDescriptor(buf.readByte());
299 
300       // Read the length field.
301       final byte upper = buf.readByte();
302       final byte lower = buf.readByte();
303       dataBlock.setByteCount(upper, lower);
304 
305       // Make sure if there's enough bytes in the buffer.
306       if (buf.readableBytes() < dataBlock.getByteCount()) {
307         // The whole bytes were not received yet - return null.
308         // This method will be invoked again when more packets are
309         // received and appended to the buffer.
310 
311         // Reset to the marked position to read the length field again
312         // next time.
313         buf.resetReaderIndex();
314 
315         return;
316       }
317       if (dataBlock.getByteCount() > 0) {
318         // There's enough bytes in the buffer. Read it.
319         dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
320       }
321       final DataBlock returnDataBlock = dataBlock;
322       // Free the datablock for next frame
323       dataBlock = null;
324       // Successfully decoded a frame. Return the decoded frame.
325       out.add(returnDataBlock);
326       return;
327     } else if (mode == TransferMode.ZLIB) {
328       dataBlock = new DataBlock();
329       if (structure != TransferStructure.RECORD) {
330         zlibCodec.writeForDecompression(buf);
331         dataBlock.setBlock(zlibCodec.readCodec());
332         out.add(dataBlock);
333         return;
334       }
335       // Except if RECORD Structure!
336       throw new InvalidArgumentException(
337           "Mode unimplemented: " + mode.name() + " with " + structure.name());
338     }
339     // Type unimplemented
340     throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
341   }
342 
343   @Override
344   protected void decodeLast(final ChannelHandlerContext ctx, final ByteBuf in,
345                             final List<Object> out) throws Exception {
346     if (mode == TransferMode.ZLIB) {
347       final byte[] bytes = zlibCodec.finishCodec();
348       if (bytes != null && bytes.length != 0) {
349         dataBlock = new DataBlock();
350         dataBlock.setBlock(bytes);
351         dataBlock.setEOF(true);
352         out.add(dataBlock);
353       }
354     }
355   }
356 
357   protected final ByteBuf encodeRecord(final DataBlock msg,
358                                        final byte[] buffer) {
359     final int size = msg.getByteCount();
360     final ByteBuf newbuf = ByteBufAllocator.DEFAULT.ioBuffer(size);
361     int newbyte;
362     try {
363       int pos = 0;
364       final int limit = buffer.length;
365       while (pos < limit) {
366         newbyte = buffer[pos++] & 0xFF;
367         if (newbyte == 0xFF) {
368           newbuf.writeByte((byte) 0xFF);
369         }
370         newbuf.writeByte((byte) (newbyte & 0xFF));
371       }
372     } catch (final IndexOutOfBoundsException e) {
373       // end of read
374     }
375     int value = 0;
376     if (msg.isEOF()) {
377       value += 2;
378     }
379     if (msg.isEOR()) {
380       value += 1;
381     }
382     if (value > 0) {
383       newbuf.writeByte((byte) 0xFF);
384       newbuf.writeByte((byte) (value & 0xFF));
385     }
386     msg.clear();
387     return newbuf;
388   }
389 
390   /**
391    * Encode a DataBlock in the correct format for Mode
392    *
393    * @param msg
394    *
395    * @return the ByteBuf or null when the last block is already done
396    *
397    * @throws InvalidArgumentException
398    */
399   protected final ByteBuf encode(final DataBlock msg)
400       throws InvalidArgumentException {
401     if (msg.isCleared()) {
402       return null;
403     }
404     final byte[] bytes = msg.getByteBlock();
405     if (mode == TransferMode.STREAM) {
406       // If record structure, special attention
407       if (structure == TransferStructure.RECORD) {
408         return encodeRecord(msg, bytes);
409       }
410       msg.clear();
411       return WaarpNettyUtil.wrappedBuffer(bytes);
412     } else if (mode == TransferMode.BLOCK) {
413       int length = msg.getByteCount();
414       final int size = length > 0xFFFF? 0xFFFF + 3 : length + 3;
415       final ByteBuf newbuf = ByteBufAllocator.DEFAULT.ioBuffer(size, size);
416       final byte[] header = { 0, 0, 0 };
417       // Is there any data left
418       if (length == 0) {
419         // It could be an empty block for EOR or EOF
420         if (msg.isEOF() || msg.isEOR()) {
421           header[0] = msg.getDescriptor();
422           header[1] = 0;
423           header[2] = 0;
424           newbuf.writeBytes(header);
425           // Next call will be the last one
426           msg.clear();
427           // return the last block
428           return newbuf;
429         }
430         // This was the very last call
431         msg.clear();
432         // return the end of encode
433         return null;
434       }
435       // Is this a Restart so only Markers
436       if (msg.isRESTART()) {
437         header[0] = msg.getDescriptor();
438         header[1] = msg.getByteCountUpper();
439         header[2] = msg.getByteCountLower();
440         newbuf.writeBytes(header);
441         newbuf.writeBytes(msg.getByteMarkers());
442         // Next call will be the last one
443         msg.clear();
444         // return the last block
445         return newbuf;
446       }
447       // Work on sub block, ignoring descriptor since it is not the last
448       // one
449       if (length > 0xFFFF) {
450         header[0] = 0;
451         header[1] = (byte) 0xFF;
452         header[2] = (byte) 0xFF;
453         newbuf.writeBytes(header);
454         // Now take the first 0xFFFF bytes from buffer
455         newbuf.writeBytes(bytes, msg.getOffset(), 0xFFFF);
456         msg.addOffset(0xFFFF);
457         length -= 0xFFFF;
458         msg.setByteCount(length);
459         // return the sub block
460         return newbuf;
461       }
462       // Last final block, using the descriptor
463       header[0] = msg.getDescriptor();
464       header[1] = msg.getByteCountUpper();
465       header[2] = msg.getByteCountLower();
466       newbuf.writeBytes(header);
467       // real data
468       newbuf.writeBytes(bytes, msg.getOffset(), length);
469       // Next call will be the last one
470       msg.clear();
471       // return the last block
472       return newbuf;
473     } else if (mode == TransferMode.ZLIB) {
474       // If record structure, special attention
475       if (structure == TransferStructure.RECORD) {
476         throw new InvalidArgumentException(
477             "Mode unimplemented: " + mode.name() + " with " + structure.name());
478       }
479       final boolean last = msg.isEOF();
480       msg.clear();
481       try {
482         zlibCodec.writeForCompression(bytes);
483       } catch (final IOException e) {
484         throw new InvalidArgumentException(e.getMessage());
485       }
486       if (last) {
487         try {
488           return WaarpNettyUtil.wrappedBuffer(zlibCodec.finishCodec());
489         } catch (final IOException e) {
490           throw new InvalidArgumentException(e.getMessage());
491         }
492       }
493       return WaarpNettyUtil.wrappedBuffer(zlibCodec.readCodec());
494     } else {
495       // Mode unimplemented
496       throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
497     }
498   }
499 
500   /**
501    * @return the mode
502    */
503   public final TransferMode getMode() {
504     return mode;
505   }
506 
507   /**
508    * @param mode the mode to set
509    */
510   public final void setMode(final TransferMode mode) {
511     this.mode = mode;
512     try {
513       zlibCodec.finishCodec();
514     } catch (final IOException e) {
515       SysErrLogger.FAKE_LOGGER.ignoreLog(e);
516     }
517   }
518 
519   /**
520    * @return the structure
521    */
522   public final TransferStructure getStructure() {
523     return structure;
524   }
525 
526   /**
527    * @param structure the structure to set
528    */
529   public final void setStructure(final TransferStructure structure) {
530     this.structure = structure;
531   }
532 
533   @Override
534   protected void encode(final ChannelHandlerContext ctx, final DataBlock msg,
535                         final ByteBuf out) throws Exception {
536     // First test if the connection is fully ready (block might be
537     // transferred by client before connection is ready)
538     checkCodecUnlocked();
539     ByteBuf next = encode(msg);
540     // Could be splitten in several block
541     while (next != null) {
542       out.writeBytes(next);
543       WaarpNettyUtil.release(next);
544       next = encode(msg);
545     }
546   }
547 }